Class: Harnex::Adapters::Pi

Inherits:
Base
  • Object
show all
Defined in:
lib/harnex/adapters/pi.rb

Overview

Pi RPC adapter — JSONL command/event protocol over stdio.

Protocol docs: ‘pi –mode rpc` (strict LF-delimited JSON lines).

Constant Summary collapse

STOP_TERM_GRACE_SECONDS =
0.5
STOP_KILL_GRACE_SECONDS =
1.0
DIALOG_UI_METHODS =
%w[select confirm input editor].freeze

Constants inherited from Base

Base::AGENT_VERSION_TIMEOUT_SECONDS, Base::PROMPT_PREFIXES

Instance Attribute Summary collapse

Attributes inherited from Base

#key

Instance Method Summary collapse

Methods inherited from Base

#agent_version, #infer_repo_path, #parse_session_summary, #send_wait_seconds, #wait_for_sendable, #wait_for_sendable_state?

Constructor Details

#initialize(extra_args = []) ⇒ Pi

Returns a new instance of Pi.



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/harnex/adapters/pi.rb', line 16

def initialize(extra_args = [])
  super("pi", extra_args)
  @initial_prompt = extract_initial_prompt(extra_args)
  @notification_handler = nil
  @disconnect_handler = nil
  @read_io = nil
  @write_io = nil
  @pid = nil
  @reader_thread = nil
  @closed = false
  @disconnect_signaled = false
  @state = :disconnected
  @next_id = 1
  @pending = {}
  @id_mutex = Mutex.new
  @write_mutex = Mutex.new
  @summary_mutex = Mutex.new
  @session_summary = {}
  @model = nil
  @provider = nil
  @session_stats_requested = false
  @last_completed_at = nil
end

Instance Attribute Details

#initial_promptObject (readonly)

Returns the value of attribute initial_prompt.



14
15
16
# File 'lib/harnex/adapters/pi.rb', line 14

def initial_prompt
  @initial_prompt
end

#last_completed_atObject (readonly)

Returns the value of attribute last_completed_at.



14
15
16
# File 'lib/harnex/adapters/pi.rb', line 14

def last_completed_at
  @last_completed_at
end

Instance Method Details

#base_commandObject



52
53
54
# File 'lib/harnex/adapters/pi.rb', line 52

def base_command
  ["pi", "--mode", "rpc"]
end

#build_commandObject



56
57
58
# File 'lib/harnex/adapters/pi.rb', line 56

def build_command
  base_command + cli_extra_args
end

#build_send_payload(text:, submit:, enter_only:, screen_text:, force: false) ⇒ Object

Raises:

  • (ArgumentError)


84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/harnex/adapters/pi.rb', line 84

def build_send_payload(text:, submit:, enter_only:, screen_text:, force: false)
  state = input_state(nil)
  if !force && submit && !enter_only && state[:input_ready] != true
    raise ArgumentError, blocked_message(state, enter_only: enter_only)
  end

  raise ArgumentError, "Pi RPC cannot stage input without submitting it" unless submit || enter_only
  raise ArgumentError, "Pi RPC does not support submit-only input" if enter_only

  {
    dispatch: { prompt: text.to_s },
    input_state: state,
    force: force
  }
end

#closeObject



190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# File 'lib/harnex/adapters/pi.rb', line 190

def close
  return if @closed

  @closed = true
  fail_pending_requests(StandardError.new("pi rpc client closed"))

  begin
    @write_io.close unless @write_io&.closed?
  rescue IOError
    nil
  end

  @reader_thread&.join(2)
ensure
  terminate_subprocess(
    term_grace_seconds: STOP_TERM_GRACE_SECONDS,
    kill_grace_seconds: STOP_KILL_GRACE_SECONDS
  )
end

#collect_session_summaryObject



171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
# File 'lib/harnex/adapters/pi.rb', line 171

def collect_session_summary
  attempt_live_summary_refresh if connected?

  @summary_mutex.synchronize do
    {
      input_tokens: @session_summary[:input_tokens],
      output_tokens: @session_summary[:output_tokens],
      reasoning_tokens: nil,
      cached_tokens: @session_summary[:cached_tokens],
      total_tokens: @session_summary[:total_tokens],
      agent_session_id: @session_summary[:agent_session_id],
      tool_calls: @session_summary[:tool_calls],
      cost_usd: @session_summary[:cost_usd],
      model: @session_summary[:model],
      agent_provider: @session_summary[:agent_provider]
    }
  end
end

#current_modelObject



48
49
50
# File 'lib/harnex/adapters/pi.rb', line 48

def current_model
  @model
end

#describeObject



60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/harnex/adapters/pi.rb', line 60

def describe
  {
    transport: transport,
    protocol: "jsonl",
    events: %w[
      agent_start agent_end turn_start turn_end message_start message_update message_end
      tool_execution_start tool_execution_update tool_execution_end queue_update
      compaction_start compaction_end auto_retry_start auto_retry_end extension_error
      extension_ui_request
    ]
  }
end

#dispatch(prompt:, model: nil, effort: nil) ⇒ Object



129
130
131
132
133
134
135
136
137
138
139
# File 'lib/harnex/adapters/pi.rb', line 129

def dispatch(prompt:, model: nil, effort: nil)
  ensure_open!

  payload = { "type" => "prompt", "message" => prompt.to_s }
  payload["model"] = model if model
  payload["thinkingLevel"] = effort if effort

  request(payload)
  @state = :busy
  nil
end

#inject_exit(_writer, **_kwargs) ⇒ Object



100
101
102
# File 'lib/harnex/adapters/pi.rb', line 100

def inject_exit(_writer, **_kwargs)
  nil
end

#input_state(_screen_text = nil) ⇒ Object



77
78
79
80
81
82
# File 'lib/harnex/adapters/pi.rb', line 77

def input_state(_screen_text = nil)
  {
    state: @state.to_s,
    input_ready: @state == :prompt
  }
end

#interrupt(turn_id: nil) ⇒ Object



141
142
143
144
145
146
# File 'lib/harnex/adapters/pi.rb', line 141

def interrupt(turn_id: nil)
  ensure_open!
  request("type" => "abort")
rescue StandardError
  nil
end

#on_disconnect(&block) ⇒ Object



108
109
110
# File 'lib/harnex/adapters/pi.rb', line 108

def on_disconnect(&block)
  @disconnect_handler = block
end

#on_notification(&block) ⇒ Object



104
105
106
# File 'lib/harnex/adapters/pi.rb', line 104

def on_notification(&block)
  @notification_handler = block
end

#pidObject



230
231
232
# File 'lib/harnex/adapters/pi.rb', line 230

def pid
  @pid
end

#providerObject



44
45
46
# File 'lib/harnex/adapters/pi.rb', line 44

def provider
  @provider
end

#request_session_stats_asyncObject



148
149
150
151
152
153
154
155
156
# File 'lib/harnex/adapters/pi.rb', line 148

def request_session_stats_async
  return if @closed
  return if @session_stats_requested

  @session_stats_requested = true
  write_line("type" => "get_session_stats")
rescue StandardError
  nil
end

#respond_extension_ui_cancel(request_id:, method:) ⇒ Object



158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/harnex/adapters/pi.rb', line 158

def respond_extension_ui_cancel(request_id:, method:)
  return false unless DIALOG_UI_METHODS.include?(method.to_s)

  write_line(
    "type" => "extension_ui_response",
    "id" => request_id,
    "cancelled" => true
  )
  true
rescue StandardError
  false
end

#start_rpc(env: nil, cwd: nil, read_io: nil, write_io: nil, pid: nil) ⇒ Object



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/harnex/adapters/pi.rb', line 112

def start_rpc(env: nil, cwd: nil, read_io: nil, write_io: nil, pid: nil)
  if read_io && write_io
    @read_io = read_io
    @write_io = write_io
    @pid = pid
  else
    @pid, @write_io, @read_io = spawn_subprocess(env, cwd)
  end

  @closed = false
  @disconnect_signaled = false
  @state = :prompt
  @reader_thread = Thread.new { read_loop }
  request_state_async
  self
end

#stateObject



73
74
75
# File 'lib/harnex/adapters/pi.rb', line 73

def state
  @state
end

#terminate_subprocess(term_grace_seconds: STOP_TERM_GRACE_SECONDS, kill_grace_seconds: STOP_KILL_GRACE_SECONDS) ⇒ Object



210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
# File 'lib/harnex/adapters/pi.rb', line 210

def terminate_subprocess(term_grace_seconds: STOP_TERM_GRACE_SECONDS, kill_grace_seconds: STOP_KILL_GRACE_SECONDS)
  return false unless @pid

  begin
    Process.kill("TERM", @pid)
  rescue Errno::ESRCH
    return true
  end

  return true if wait_for_process_exit(@pid, term_grace_seconds)

  begin
    Process.kill("KILL", @pid)
  rescue Errno::ESRCH
    return true
  end

  wait_for_process_exit(@pid, kill_grace_seconds)
end

#transportObject



40
41
42
# File 'lib/harnex/adapters/pi.rb', line 40

def transport
  :stdio_jsonl_rpc
end