Class: Harnex::Adapters::Pi
- Inherits:
-
Base
- Object
- Base
- Harnex::Adapters::Pi
show all
- Defined in:
- lib/harnex/adapters/pi.rb
Overview
Pi RPC adapter — JSONL command/event protocol over stdio.
Protocol docs: `pi --mode rpc` (strict LF-delimited JSON lines).
Constant Summary
collapse
- STOP_TERM_GRACE_SECONDS =
0.5
- STOP_KILL_GRACE_SECONDS =
1.0
- DIALOG_UI_METHODS =
%w[select confirm input editor].freeze
Constants inherited
from Base
Base::AGENT_VERSION_TIMEOUT_SECONDS, Base::PROMPT_PREFIXES
Instance Attribute Summary collapse
Attributes inherited from Base
#key
Instance Method Summary
collapse
-
#base_command ⇒ Object
-
#build_command ⇒ Object
-
#build_send_payload(text:, submit:, enter_only:, screen_text:, force: false) ⇒ Object
-
#close ⇒ Object
-
#collect_session_summary ⇒ Object
-
#current_model ⇒ Object
-
#describe ⇒ Object
-
#dispatch(prompt:, model: nil, effort: nil) ⇒ Object
-
#initialize(extra_args = []) ⇒ Pi
constructor
-
#inject_exit(_writer, **_kwargs) ⇒ Object
-
#input_state(_screen_text = nil) ⇒ Object
-
#interrupt(turn_id: nil) ⇒ Object
-
#on_disconnect(&block) ⇒ Object
-
#on_notification(&block) ⇒ Object
-
#pid ⇒ Object
-
#provider ⇒ Object
-
#request_session_stats_async ⇒ Object
-
#respond_extension_ui_cancel(request_id:, method:) ⇒ Object
-
#start_rpc(env: nil, cwd: nil, read_io: nil, write_io: nil, pid: nil) ⇒ Object
-
#state ⇒ Object
-
#terminate_subprocess(term_grace_seconds: STOP_TERM_GRACE_SECONDS, kill_grace_seconds: STOP_KILL_GRACE_SECONDS) ⇒ Object
-
#transport ⇒ Object
Methods inherited from Base
#agent_version, #infer_repo_path, #parse_session_summary, #send_wait_seconds, #wait_for_sendable, #wait_for_sendable_state?
Constructor Details
#initialize(extra_args = []) ⇒ Pi
Returns a new instance of Pi.
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
# File 'lib/harnex/adapters/pi.rb', line 16
# Builds a Pi adapter in its initial, disconnected state. Only instance
# variables are set up here; no subprocess is spawned and no IO is opened.
#
# NOTE(review): the parameter name was missing from this listing. It is restored
# as `extra_args` from the documented signature `#initialize(extra_args = [])`;
# confirm against lib/harnex/adapters/pi.rb.
#
# @param extra_args [Array] forwarded to the Base constructor after the "pi" key
def initialize(extra_args = [])
  super("pi", extra_args)
  # NOTE(review): the right-hand side of this assignment looks truncated in this
  # listing (an empty `()` evaluates to nil) — restore it from the source file.
  @initial_prompt = ()
  @notification_handler = nil
  @disconnect_handler = nil
  @read_io = nil
  @write_io = nil
  @pid = nil
  @reader_thread = nil
  @closed = false
  @disconnect_signaled = false
  @state = :disconnected
  @next_id = 1
  @pending = {}
  @id_mutex = Mutex.new
  @write_mutex = Mutex.new
  @summary_mutex = Mutex.new
  @session_summary = {}
  @model = nil
  @provider = nil
  @session_stats_requested = false
  @last_completed_at = nil
end
|
Instance Attribute Details
#initial_prompt ⇒ Object
Returns the value of attribute initial_prompt.
14
15
16
|
# File 'lib/harnex/adapters/pi.rb', line 14
# Reader for the +@initial_prompt+ instance variable.
#
# @return [Object, nil] whatever is currently stored in +@initial_prompt+
def initial_prompt
@initial_prompt
end
|
#last_completed_at ⇒ Object
Returns the value of attribute last_completed_at.
14
15
16
|
# File 'lib/harnex/adapters/pi.rb', line 14
# Reader for the +@last_completed_at+ instance variable (nil after construction).
#
# @return [Object, nil] whatever is currently stored in +@last_completed_at+
def last_completed_at
@last_completed_at
end
|
Instance Method Details
#base_command ⇒ Object
52
53
54
|
# File 'lib/harnex/adapters/pi.rb', line 52
# Fixed argv prefix used for the pi executable: the program name followed by
# the flag and value selecting RPC mode.
#
# @return [Array<String>] a fresh +["pi", "--mode", "rpc"]+ array on every call
def base_command
["pi", "--mode", "rpc"]
end
|
#build_command ⇒ Object
56
57
58
|
# File 'lib/harnex/adapters/pi.rb', line 56
# Builds the full command line, starting from +base_command+.
#
# NOTE(review): the right-hand operand of `+` is missing from this listing, so
# the method does not parse as shown. Restore the operand from
# lib/harnex/adapters/pi.rb before relying on this snippet.
def build_command
base_command +
end
|
#build_send_payload(text:, submit:, enter_only:, screen_text:, force: false) ⇒ Object
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
# File 'lib/harnex/adapters/pi.rb', line 84
# Validates a send request against the current input state and, when it is
# acceptable, returns the payload describing the prompt to dispatch.
#
# @param text [#to_s] prompt text; stringified into the payload
# @param submit [Boolean] whether the text is to be submitted
# @param enter_only [Boolean] submit-only request; always rejected
# @param screen_text [Object] accepted but never read by this method
# @param force [Boolean] when truthy, skips the input-readiness check
# @return [Hash] +{ dispatch: { prompt: }, input_state:, force: }+
# @raise [ArgumentError] if +enter_only+ is truthy, if nothing would be
#   submitted, or if input is not ready and +force+ is falsy
def build_send_payload(text:, submit:, enter_only:, screen_text:, force: false)
  current = input_state(nil)

  # A submit-only request is refused regardless of the input state.
  raise ArgumentError, "Pi RPC does not support submit-only input" if enter_only
  raise ArgumentError, "Pi RPC cannot stage input without submitting it" unless submit

  # Past the guards above, submit is truthy and enter_only is falsy, so only
  # readiness (unless forced) can still block the send.
  unless force || current[:input_ready] == true
    raise ArgumentError, blocked_message(current, enter_only: enter_only)
  end

  { dispatch: { prompt: text.to_s }, input_state: current, force: force }
end
|
#close ⇒ Object
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
|
# File 'lib/harnex/adapters/pi.rb', line 190
# Shuts the RPC client down: marks it closed, fails every pending request,
# closes the write stream, gives the reader thread up to 2 seconds to finish,
# and finally terminates the subprocess.
#
# The +ensure+ clause runs on every exit path, including the early return
# taken when the client is already closed, so +terminate_subprocess+ is
# attempted on each call.
#
# @return [Thread, nil] the result of joining the reader thread, or nil when
#   already closed or when no reader thread exists
def close
  return if @closed

  @closed = true
  fail_pending_requests(StandardError.new("pi rpc client closed"))
  begin
    # Explicit nil guard: with no write stream (adapter never started) the
    # previous `unless @write_io&.closed?` form called #close on nil and
    # raised NoMethodError, which the IOError rescue below does not cover.
    @write_io.close if @write_io && !@write_io.closed?
  rescue IOError
    nil
  end
  @reader_thread&.join(2)
ensure
  terminate_subprocess(
    term_grace_seconds: STOP_TERM_GRACE_SECONDS,
    kill_grace_seconds: STOP_KILL_GRACE_SECONDS
  )
end
|
#collect_session_summary ⇒ Object
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
|
# File 'lib/harnex/adapters/pi.rb', line 171
# Returns a snapshot of the session summary with a fixed set of keys.
#
# When +connected?+ is truthy, +attempt_live_summary_refresh+ is called first.
# The snapshot itself is built while holding +@summary_mutex+.
#
# @return [Hash{Symbol=>Object}] values copied from +@session_summary+ for each
#   reported field; +:reasoning_tokens+ is always nil
def collect_session_summary
  attempt_live_summary_refresh if connected?

  reported_fields = %i[
    input_tokens output_tokens reasoning_tokens cached_tokens total_tokens
    agent_session_id tool_calls cost_usd model agent_provider
  ]

  @summary_mutex.synchronize do
    reported_fields.each_with_object({}) do |field, snapshot|
      # reasoning_tokens is reported as nil and never read from the summary.
      snapshot[field] = field == :reasoning_tokens ? nil : @session_summary[field]
    end
  end
end
|
#current_model ⇒ Object
48
49
50
|
# File 'lib/harnex/adapters/pi.rb', line 48
# Reader for the +@model+ instance variable (nil after construction).
#
# @return [Object, nil] whatever is currently stored in +@model+
def current_model
@model
end
|
#describe ⇒ Object
60
61
62
63
64
65
66
67
68
69
70
71
|
# File 'lib/harnex/adapters/pi.rb', line 60
# Describes this adapter: its transport, wire protocol and the event names
# it lists.
#
# @return [Hash] +{ transport:, protocol: "jsonl", events: [String, ...] }+
def describe
  event_names = %w[
    agent_start agent_end turn_start turn_end message_start message_update message_end
    tool_execution_start tool_execution_update tool_execution_end queue_update
    compaction_start compaction_end auto_retry_start auto_retry_end extension_error
    extension_ui_request
  ]

  { transport: transport, protocol: "jsonl", events: event_names }
end
|
#dispatch(prompt:, model: nil, effort: nil) ⇒ Object
129
130
131
132
133
134
135
136
137
138
139
|
# File 'lib/harnex/adapters/pi.rb', line 129
# Sends a "prompt" command through +request+ and marks the adapter busy.
#
# @param prompt [#to_s] prompt text, stringified into the "message" field
# @param model [Object, nil] added as "model" only when truthy
# @param effort [Object, nil] added as "thinkingLevel" only when truthy
# @return [nil]
def dispatch(prompt:, model: nil, effort: nil)
  ensure_open!

  # Optional fields are included only when a truthy value was supplied.
  optional_fields = { "model" => model, "thinkingLevel" => effort }.select { |_name, value| value }
  request({ "type" => "prompt", "message" => prompt.to_s }.merge(optional_fields))

  @state = :busy
  nil
end
|
#inject_exit(_writer, **_kwargs) ⇒ Object
100
101
102
|
# File 'lib/harnex/adapters/pi.rb', line 100
# No-op for this adapter: both the writer and any keyword arguments are
# ignored.
#
# @return [nil]
def inject_exit(_writer, **_kwargs)
nil
end
|
#input_state(_screen_text = nil) ⇒ Object
77
78
79
80
81
82
|
# File 'lib/harnex/adapters/pi.rb', line 77
# Reports the adapter state in the shape used for input-readiness checks.
# The argument is ignored; readiness is derived from +@state+ alone.
#
# @return [Hash] +{ state: String, input_ready: Boolean }+, where
#   +input_ready+ is true only while +@state+ is +:prompt+
def input_state(_screen_text = nil)
  label = @state.to_s
  ready = @state == :prompt

  { state: label, input_ready: ready }
end
|
#interrupt(turn_id: nil) ⇒ Object
141
142
143
144
145
146
|
# File 'lib/harnex/adapters/pi.rb', line 141
# Best-effort abort: sends an "abort" request after checking the connection
# with +ensure_open!+. Any StandardError raised along the way is swallowed.
#
# @param turn_id [Object, nil] accepted but not read by this method
# @return [Object, nil] the value returned by +request+, or nil when an error
#   was rescued
def interrupt(turn_id: nil)
ensure_open!
request("type" => "abort")
rescue StandardError
nil
end
|
#on_disconnect(&block) ⇒ Object
108
109
110
|
# File 'lib/harnex/adapters/pi.rb', line 108
# Stores the given block in +@disconnect_handler+, replacing any previously
# stored one. The place where the handler is invoked is not part of this
# section.
#
# @return [Proc, nil] the stored block (nil when called without a block)
def on_disconnect(&block)
@disconnect_handler = block
end
|
#on_notification(&block) ⇒ Object
104
105
106
|
# File 'lib/harnex/adapters/pi.rb', line 104
# Stores the given block in +@notification_handler+, replacing any previously
# stored one. The place where the handler is invoked is not part of this
# section.
#
# @return [Proc, nil] the stored block (nil when called without a block)
def on_notification(&block)
@notification_handler = block
end
|
#pid ⇒ Object
230
231
232
|
# File 'lib/harnex/adapters/pi.rb', line 230
# Reader for +@pid+, which +start_rpc+ sets either from its +pid:+ argument or
# from the spawned subprocess. It is nil after construction.
#
# @return [Object, nil] whatever is currently stored in +@pid+
def pid
@pid
end
|
#provider ⇒ Object
44
45
46
|
# File 'lib/harnex/adapters/pi.rb', line 44
# Reader for the +@provider+ instance variable (nil after construction).
#
# @return [Object, nil] whatever is currently stored in +@provider+
def provider
@provider
end
|
#request_session_stats_async ⇒ Object
148
149
150
151
152
153
154
155
156
|
# File 'lib/harnex/adapters/pi.rb', line 148
# Writes a "get_session_stats" command at most once, without waiting for a
# reply. Does nothing when the client is closed or the command was already
# requested. Any StandardError raised while writing is swallowed.
#
# @return [Object, nil] the value returned by +write_line+, or nil when the
#   call was skipped or an error was rescued
def request_session_stats_async
  unless @closed || @session_stats_requested
    # The flag is raised before writing, so it stays set even if the write fails.
    @session_stats_requested = true
    write_line("type" => "get_session_stats")
  end
rescue StandardError
  nil
end
|
#respond_extension_ui_cancel(request_id:, method:) ⇒ Object
158
159
160
161
162
163
164
165
166
167
168
169
|
# File 'lib/harnex/adapters/pi.rb', line 158
# Answers an extension UI request with a cancellation, but only for methods
# listed in DIALOG_UI_METHODS. Any StandardError raised along the way is
# swallowed and reported as false.
#
# @param request_id [Object] sent back as the "id" field of the response
# @param method [#to_s] UI method name of the request being answered
# @return [Boolean] true when the cancellation was written; false when the
#   method is not a dialog method or an error was rescued
def respond_extension_ui_cancel(request_id:, method:)
  if DIALOG_UI_METHODS.include?(method.to_s)
    write_line(
      "type" => "extension_ui_response",
      "id" => request_id,
      "cancelled" => true
    )
    true
  else
    false
  end
rescue StandardError
  false
end
|
#start_rpc(env: nil, cwd: nil, read_io: nil, write_io: nil, pid: nil) ⇒ Object
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
# File 'lib/harnex/adapters/pi.rb', line 112
# Connects the adapter to an RPC stream pair and starts the reader thread.
#
# When both +read_io+ and +write_io+ are supplied they are used as-is together
# with +pid+; in every other case (including when only one stream is given) a
# subprocess is started through +spawn_subprocess(env, cwd)+.
#
# @param env [Object, nil] passed to +spawn_subprocess+ when spawning
# @param cwd [Object, nil] passed to +spawn_subprocess+ when spawning
# @param read_io [Object, nil] pre-opened stream stored as +@read_io+
# @param write_io [Object, nil] pre-opened stream stored as +@write_io+
# @param pid [Object, nil] stored as +@pid+ when streams are supplied
# @return [self]
def start_rpc(env: nil, cwd: nil, read_io: nil, write_io: nil, pid: nil)
if read_io && write_io
@read_io = read_io
@write_io = write_io
@pid = pid
else
# spawn_subprocess yields the pid first, then the write and read streams.
@pid, @write_io, @read_io = spawn_subprocess(env, cwd)
end
# Reset lifecycle flags before the reader thread is started below.
@closed = false
@disconnect_signaled = false
# :prompt is the state for which input_state reports input_ready.
@state = :prompt
@reader_thread = Thread.new { read_loop }
request_state_async
self
end
|
#state ⇒ Object
73
74
75
|
# File 'lib/harnex/adapters/pi.rb', line 73
# Reader for +@state+. Within this section it is set to +:disconnected+ by the
# constructor, +:prompt+ by +start_rpc+ and +:busy+ by +dispatch+; other
# assignments may exist outside this section.
#
# @return [Symbol, nil] whatever is currently stored in +@state+
def state
@state
end
|
#terminate_subprocess(term_grace_seconds: STOP_TERM_GRACE_SECONDS, kill_grace_seconds: STOP_KILL_GRACE_SECONDS) ⇒ Object
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
|
# File 'lib/harnex/adapters/pi.rb', line 210
# Stops the subprocess identified by +@pid+: sends TERM, waits up to
# +term_grace_seconds+ via +wait_for_process_exit+, then escalates to KILL and
# waits up to +kill_grace_seconds+.
#
# @param term_grace_seconds [Numeric] passed to +wait_for_process_exit+ after TERM
# @param kill_grace_seconds [Numeric] passed to +wait_for_process_exit+ after KILL
# @return [Object] false when there is no pid; true when the process is
#   already gone (ESRCH) or exits within the TERM grace period; otherwise the
#   result of +wait_for_process_exit+ after KILL
def terminate_subprocess(term_grace_seconds: STOP_TERM_GRACE_SECONDS, kill_grace_seconds: STOP_KILL_GRACE_SECONDS)
  return false unless @pid

  # Delivers one signal; false means the process no longer exists (ESRCH).
  # Any other error from Process.kill propagates to the caller.
  deliver = lambda do |signal_name|
    begin
      Process.kill(signal_name, @pid)
      true
    rescue Errno::ESRCH
      false
    end
  end

  return true unless deliver.call("TERM")
  return true if wait_for_process_exit(@pid, term_grace_seconds)
  return true unless deliver.call("KILL")

  wait_for_process_exit(@pid, kill_grace_seconds)
end
|
#transport ⇒ Object
40
41
42
|
# File 'lib/harnex/adapters/pi.rb', line 40
# Identifies the transport this adapter uses.
#
# @return [Symbol] always +:stdio_jsonl_rpc+
def transport
:stdio_jsonl_rpc
end
|