Class: Harnex::Adapters::Codex
- Inherits:
-
Base
- Object
- Base
- Harnex::Adapters::Codex
show all
- Defined in:
- lib/harnex/adapters/codex.rb
Constant Summary
collapse
- SUBMIT_DELAY_MS =
75
- SUBMIT_DELAY_PER_KB_MS =
50
- SEND_WAIT_SECONDS =
2.0
Constants inherited
from Base
Base::PROMPT_PREFIXES
Instance Attribute Summary
Attributes inherited from Base
#key
Instance Method Summary
collapse
Methods inherited from Base
#build_command, #wait_for_sendable
Constructor Details
#initialize(extra_args = []) ⇒ Codex
Returns a new instance of Codex.
8
9
10
11
|
# File 'lib/harnex/adapters/codex.rb', line 8
# Builds a Codex adapter.
#
# @param extra_args [Array] additional arguments, handed unchanged to the
#   Base constructor after the "codex" name (presumably extra CLI flags for
#   the codex executable — confirm against Base#initialize)
def initialize(extra_args = [])
  super("codex", extra_args)
  # Starts false; #input_state sets it once banner text shows up on screen.
  @banner_seen = false
end
|
Instance Method Details
#base_command ⇒ Object
13
14
15
16
17
18
19
|
# File 'lib/harnex/adapters/codex.rb', line 13
# Command line used to start Codex: the executable name followed by the two
# flags this adapter always passes.
#
# @return [Array<String>] a freshly built argv array on every call
def base_command
  fixed_flags = [
    "--dangerously-bypass-approvals-and-sandbox",
    "--no-alt-screen"
  ]
  ["codex", *fixed_flags]
end
|
#build_send_payload(text:, submit:, enter_only:, screen_text:, force: false) ⇒ Object
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
|
# File 'lib/harnex/adapters/codex.rb', line 92
# Assembles the ordered write steps needed to deliver +text+ (and/or a submit
# keystroke) to the session.
#
# @param text [#to_s] message body; ignored when +enter_only+ is set
# @param submit [Boolean] append the submit bytes after the body
# @param enter_only [Boolean] send only the submit bytes, no body
# @param screen_text [String] current screen contents, passed to #input_state
# @param force [Boolean] skip the input-readiness check
# @return [Hash] +:steps+ (array of step hashes), +:input_state+ (the state
#   hash from #input_state) and +:force+
# @raise [ArgumentError] when a non-forced text submit is requested and the
#   detected state does not report +input_ready: true+
def build_send_payload(text:, submit:, enter_only:, screen_text:, force: false)
  current = input_state(screen_text)

  # Only a real "type text and submit" request is gated on readiness.
  needs_ready_prompt = submit && !enter_only && !force
  if needs_ready_prompt && current[:input_ready] != true
    raise ArgumentError, blocked_message(current, enter_only: enter_only)
  end

  queue = []
  unless enter_only
    typed = text.to_s
    queue.push({ text: typed, newline: false }) unless typed.empty?
  end

  if submit || enter_only
    submit_step = { text: submit_bytes, newline: false }
    # The delay is attached only when body text was queued ahead of the submit.
    submit_step[:delay_ms] = submit_delay_ms(text) unless queue.empty?
    queue.push(submit_step)
  end

  { steps: queue, input_state: current, force: force }
end
|
#infer_repo_path(argv) ⇒ Object
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
# File 'lib/harnex/adapters/codex.rb', line 21
# Works out which directory the session will run in by scanning the CLI
# arguments for a change-directory option.
#
# Recognised spellings: +-C DIR+, +--cd DIR+, +-CDIR+ and +--cd=DIR+. The
# first one found wins.
#
# @param argv [Array<String>] command-line arguments
# @return [String] the directory named by the option, or +Dir.pwd+ when no
#   option (or only a dangling +-C+/+--cd+ with no value) is present
def infer_repo_path(argv)
  argv.each_with_index do |arg, position|
    case arg
    when "-C", "--cd"
      following = argv[position + 1]
      return following if following

      # Flag with nothing after it: stop scanning and fall back to the cwd.
      break
    when /\A-C(.+)\z/, /\A--cd=(.+)\z/
      # Attached-value forms; whichever pattern matched owns the capture.
      return Regexp.last_match(1)
    end
  end

  Dir.pwd
end
|
#inject_exit(writer) ⇒ Object
117
118
119
|
# File 'lib/harnex/adapters/codex.rb', line 117
# Delegates to the superclass implementation of #inject_exit, supplying this
# adapter's SUBMIT_DELAY_MS as the +delay_ms+ keyword.
#
# @param writer [Object] passed through to the superclass untouched
# @return [Object] whatever the superclass implementation returns
def inject_exit(writer)
  pause_ms = SUBMIT_DELAY_MS
  super(writer, delay_ms: pause_ms)
end
|
#input_state(screen_text) ⇒ Object
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
# File 'lib/harnex/adapters/codex.rb', line 39
# Classifies the current screen contents.
#
# Until a line containing "OpenAI Codex" or "gpt-" has been observed, the
# decision is left to the superclass. Once such a line has been seen the flag
# stays set for the lifetime of this adapter, and the result depends only on
# whether any recent line is a prompt line.
#
# @param screen_text [String] screen contents, reduced via #recent_lines
# @return [Hash] +{ state: "prompt", input_ready: true }+ when a prompt line
#   is visible, +{ state: "session", input_ready: nil }+ otherwise (or the
#   superclass result before the banner has appeared)
def input_state(screen_text)
  visible = recent_lines(screen_text)

  banner_showing = visible.any? do |row|
    row.include?("OpenAI Codex") || row.include?("gpt-")
  end
  @banner_seen = true if banner_showing

  return super unless @banner_seen

  at_prompt = visible.any? { |row| prompt_line?(row) }
  return { state: "prompt", input_ready: true } if at_prompt

  { state: "session", input_ready: nil }
end
|
#parse_session_summary(transcript_tail) ⇒ Object
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
# File 'lib/harnex/adapters/codex.rb', line 59
# Pulls token-usage figures and the resumable session id out of the tail of a
# session transcript.
#
# Starts from #empty_session_summary and overwrites entries only for the
# patterns that are actually found. When the "Token usage:" line matches, all
# five token entries are assigned (optional groups that did not participate
# are passed to #parse_token_count as nil).
#
# @param transcript_tail [#to_s] trailing transcript text
# @return [Hash] the summary hash
def parse_session_summary(transcript_tail)
  result = empty_session_summary
  tail = transcript_tail.to_s

  usage = tail.match(/Token usage:\s+total=([\d,]+)\s+input=([\d,]+)(?:\s+\(\+\s+([\d,]+)\s+cached\))?\s+output=([\d,]+)(?:\s+\(reasoning\s+([\d,]+)\))?/)
  if usage
    # Capture groups 1..5 line up with these keys in order.
    token_keys = %i[total_tokens input_tokens cached_tokens output_tokens reasoning_tokens]
    token_keys.each_with_index do |key, offset|
      result[key] = parse_token_count(usage[offset + 1])
    end
  end

  resume = tail.match(/codex resume\s+([0-9a-f-]{36})/)
  result[:agent_session_id] = resume[1] if resume

  result
end
|
#send_wait_seconds(submit:, enter_only:) ⇒ Object
78
79
80
81
82
83
|
# File 'lib/harnex/adapters/codex.rb', line 78
# Wait budget for a send request.
#
# @param submit [Boolean] whether the send will submit
# @param enter_only [Boolean] whether only the submit keystroke is sent
# @return [Float] SEND_WAIT_SECONDS when submitting text (submit set and not
#   enter-only); 0.0 in every other case
def send_wait_seconds(submit:, enter_only:)
  submitting_text = submit && !enter_only
  submitting_text ? SEND_WAIT_SECONDS : 0.0
end
|
#wait_for_sendable_state?(state, submit:, enter_only:) ⇒ Boolean
85
86
87
88
89
90
|
# File 'lib/harnex/adapters/codex.rb', line 85
# Tells the caller whether it should keep waiting before sending.
#
# Waiting only ever applies to a text submit; for anything else the answer is
# false. For a text submit the answer is true unless the state hash reports
# +input_ready+ as exactly true (nil and false both mean "keep waiting").
#
# @param state [Hash] state hash carrying an +:input_ready+ entry
# @param submit [Boolean] whether the send will submit
# @param enter_only [Boolean] whether only the submit keystroke is sent
# @return [Boolean]
def wait_for_sendable_state?(state, submit:, enter_only:)
  return false if enter_only || !submit

  readiness = state[:input_ready]
  readiness != true
end
|