Class: CollavreOpenclaw::WebsocketClient
- Inherits: Object
  - Object
    - CollavreOpenclaw::WebsocketClient
- Defined in: app/services/collavre_openclaw/websocket_client.rb
Overview
WebSocket client for a single user’s OpenClaw Gateway connection.
Handles:
- Connection lifecycle (connect, disconnect, reconnect)
- OpenClaw protocol handshake (connect.challenge → connect → hello-ok)
- RPC request/response (chat.send, chat.history, chat.abort)
- Event streaming (chat events with delta/final/error states)
- Proactive message detection (unsolicited chat events)
- Tick keepalive
Thread model:
- WebSocket runs in the shared EventMachine reactor thread
- Rails threads call public methods which bridge via EM.next_tick + Queue
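The bridge between a calling thread and the reactor thread can be sketched in isolation. This is a minimal illustration, not the client's actual code: `FakeReactor` and `call_on_reactor` are hypothetical names, and a plain Ruby thread stands in for EventMachine so the example runs without extra gems.

```ruby
require "timeout"

# Hypothetical stand-in for the reactor: one thread draining a queue of blocks.
# The real client schedules work with EventMachine instead.
class FakeReactor
  def initialize
    @jobs = Queue.new
    @thread = Thread.new do
      # A nil job is the shutdown signal.
      while (job = @jobs.pop)
        job.call
      end
    end
  end

  # Schedule a block to run on the reactor thread.
  def next_tick(&block)
    @jobs.push(block)
  end

  def stop
    @jobs.push(nil)
    @thread.join
  end
end

# Run `work` on the reactor thread and block the caller until a reply arrives.
def call_on_reactor(reactor, timeout_seconds: 1, &work)
  reply = Queue.new
  reactor.next_tick do
    begin
      reply.push({ value: work.call })
    rescue StandardError => e
      # Errors travel back through the queue instead of killing the reactor.
      reply.push({ error: e.message })
    end
  end
  result = Timeout.timeout(timeout_seconds) { reply.pop }
  raise result[:error] if result[:error]
  result[:value]
end

reactor = FakeReactor.new
answer = call_on_reactor(reactor) { 6 * 7 }
reactor.stop
```

The caller never touches reactor-owned state directly; it only hands over a block and waits on its own private queue, which is the same shape the public methods in this class follow.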
Constant Summary
- PROTOCOL_VERSION = 3
- COMPLETED_RUN_COOLDOWN = 5
  Seconds to suppress late-arriving events for completed runs.
- SEEN_EVENT_TTL = 30
  Seconds to remember (runId, seq) pairs for dedup.
- CLOSE_POLICIES =
  WebSocket close codes → reconnection policy:
  :reconnect - schedule reconnect with exponential backoff
  :fatal - permanent failure (auth, forbidden); don't retry, propagate error
  :normal - clean shutdown; no reconnect, no error

  {
    1000 => :normal,    # Normal closure
    1001 => :reconnect, # Going away (server shutting down)
    1006 => :reconnect, # Abnormal closure (network issue)
    1008 => :fatal,     # Policy violation (likely auth)
    1011 => :reconnect, # Internal server error
    4001 => :fatal,     # Auth failure (OpenClaw)
    4003 => :fatal      # Forbidden (OpenClaw)
  }.freeze
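One way the close-code table might drive reconnection is sketched below. The `policy_for` and `backoff_delay` helpers are hypothetical, and two details are assumptions rather than documented behavior: unknown close codes fall back to :reconnect, and the backoff doubles from 1 second up to a 30 second cap.

```ruby
CLOSE_POLICIES = {
  1000 => :normal,    # Normal closure
  1001 => :reconnect, # Going away (server shutting down)
  1006 => :reconnect, # Abnormal closure (network issue)
  1008 => :fatal,     # Policy violation (likely auth)
  1011 => :reconnect, # Internal server error
  4001 => :fatal,     # Auth failure (OpenClaw)
  4003 => :fatal      # Forbidden (OpenClaw)
}.freeze

# Assumption: codes missing from the table are treated as transient.
def policy_for(code)
  CLOSE_POLICIES.fetch(code, :reconnect)
end

# Hypothetical exponential backoff: base * 2**attempt, capped at `cap` seconds.
def backoff_delay(attempt, base: 1.0, cap: 30.0)
  [base * (2**attempt), cap].min
end

# Decide what to do after a close, returning a small description hash.
def close_action(code, attempt)
  case policy_for(code)
  when :normal    then { action: :stop }
  when :fatal     then { action: :fail }
  when :reconnect then { action: :retry, delay: backoff_delay(attempt) }
  end
end
```

Keeping the policy lookup separate from the delay calculation makes each piece easy to test on its own.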
Instance Attribute Summary
- #state ⇒ Object (readonly)
  Returns the value of attribute state.
- #user ⇒ Object (readonly)
  Returns the value of attribute user.
Instance Method Summary
- #chat_abort(session_key:, run_id: nil) ⇒ Object
  Abort a running chat.
- #chat_history(session_key:, limit: nil) ⇒ Object
  Fetch chat history for a session.
- #chat_inject(session_key:, message:, label: nil) ⇒ Object
  Inject an assistant message into a session transcript.
- #chat_send(session_key:, message:, attachments: nil, idempotency_key: nil) {|Hash| ... } ⇒ String?
  Send a chat message.
- #connect! ⇒ Object
  Connect to the Gateway.
- #connected? ⇒ Boolean
- #disconnect! ⇒ Object
  Disconnect gracefully.
- #idle_seconds ⇒ Object
  Time since last activity (for idle timeout).
- #initialize(user:) ⇒ WebsocketClient (constructor)
  States: :disconnected, :connecting, :connected, :reconnecting.
- #on_fatal_close(&handler) ⇒ Object
  Register a callback invoked when the connection dies with a fatal close code (auth failure, forbidden, etc.).
- #on_proactive_message(&handler) ⇒ Object
  Register a handler for proactive messages (unsolicited chat events).
Constructor Details
#initialize(user:) ⇒ WebsocketClient
States: :disconnected, :connecting, :connected, :reconnecting
# File 'app/services/collavre_openclaw/websocket_client.rb', line 42

def initialize(user:)
  @user = user
  @state = :disconnected
  @ws = nil
  @mutex = Mutex.new
  @connect_mutex = Mutex.new
  @connect_waiters = []        # Queues for threads waiting on in-progress connect
  @pending_requests = {}       # id → { queue:, timer: }
  @pending_runs = {}           # runId → Queue (for chat.send streaming)
  @completed_runs = {}         # runId → monotonic timestamp (cooldown for late events)
  @seen_chat_events = {}       # "runId:seq" → monotonic timestamp (broadcast+nodeSend dedup)
  @proactive_handler = nil
  @reconnect_attempts = 0
  @last_activity_at = nil
  @rpc_run_registrations = {}  # RPC request_id → run_queue (for EM-thread runId registration)
end
Instance Attribute Details
#state ⇒ Object (readonly)
Returns the value of attribute state.
# File 'app/services/collavre_openclaw/websocket_client.rb', line 39

def state
  @state
end
#user ⇒ Object (readonly)
Returns the value of attribute user.
# File 'app/services/collavre_openclaw/websocket_client.rb', line 39

def user
  @user
end
Instance Method Details
#chat_abort(session_key:, run_id: nil) ⇒ Object
Abort a running chat
# File 'app/services/collavre_openclaw/websocket_client.rb', line 294

def chat_abort(session_key:, run_id: nil)
  ensure_connected!
  touch_activity!

  params = { sessionKey: session_key }
  params[:runId] = run_id if run_id
  send_rpc("chat.abort", params)
end
#chat_history(session_key:, limit: nil) ⇒ Object
Fetch chat history for a session
# File 'app/services/collavre_openclaw/websocket_client.rb', line 284

def chat_history(session_key:, limit: nil)
  ensure_connected!
  touch_activity!

  params = { sessionKey: session_key }
  params[:limit] = limit if limit
  send_rpc("chat.history", params)
end
#chat_inject(session_key:, message:, label: nil) ⇒ Object
Inject an assistant message into a session transcript. Useful for pre-populating context without triggering an agent run.
# File 'app/services/collavre_openclaw/websocket_client.rb', line 274

def chat_inject(session_key:, message:, label: nil)
  ensure_connected!
  touch_activity!

  params = { sessionKey: session_key, message: message }
  params[:label] = label if label
  send_rpc("chat.inject", params)
end
#chat_send(session_key:, message:, attachments: nil, idempotency_key: nil) {|Hash| ... } ⇒ String?
Send a chat message. Blocks and yields streaming events.
# File 'app/services/collavre_openclaw/websocket_client.rb', line 159

def chat_send(session_key:, message:, attachments: nil, idempotency_key: nil, &block)
  ensure_connected!
  touch_activity!

  idempotency_key ||= SecureRandom.uuid
  actual_run_id = nil
  run_queue = Queue.new
  response_text = +""

  # Pre-register with idempotency_key to catch early events
  @mutex.synchronize { @pending_runs[idempotency_key] = run_queue }

  # Send the RPC request to get the real runId.
  # IMPORTANT: We pass the run_queue via @rpc_run_registrations so that
  # handle_response can register @pending_runs[actual_run_id] on the EM
  # thread BEFORE any subsequent chat events arrive. This prevents a race
  # condition where fast Gateway responses send events before the Rails
  # thread can re-register with the actual runId.
  rpc_request_id = SecureRandom.uuid
  @mutex.synchronize { @rpc_run_registrations[rpc_request_id] = run_queue }

  rpc_params = {
    sessionKey: session_key,
    message: message,
    idempotencyKey: idempotency_key
  }
  rpc_params[:attachments] = attachments if attachments.present?

  response = send_rpc("chat.send", rpc_params, request_id: rpc_request_id)

  # The EM thread already registered @pending_runs[actual_run_id] in
  # handle_response. Clean up the idempotency_key entry if a different
  # runId was assigned.
  actual_run_id = response&.dig(:runId) || idempotency_key
  if actual_run_id != idempotency_key
    @mutex.synchronize do
      @pending_runs.delete(idempotency_key)
      # Ensure runId is registered (may already be from handle_response)
      @pending_runs[actual_run_id] ||= run_queue
    end
  end

  # Stream events until final/error/aborted
  last_seq = nil
  loop do
    event = wait_with_timeout(run_queue, config.read_timeout, "chat response")

    break if event[:done]

    if event[:error]
      raise ChatError, event[:error]
    end

    # Gateway may broadcast + nodeSend the same event, causing
    # duplicates on the same WebSocket. Skip already-seen seqs for
    # deltas only. Terminal events (final, error, aborted) must NEVER
    # be skipped: they break the loop and unblock the caller.
    seq = event[:seq]
    event_state = event[:state]
    is_terminal = event_state == "final" || event_state == "error" || event_state == "aborted"

    if !is_terminal && seq && last_seq && seq <= last_seq
      next
    end
    last_seq = seq if seq

    case event_state
    when "delta"
      text = extract_event_text(event)
      if text.present?
        # Gateway sends accumulated content (full text so far) in
        # each delta event. Compute the incremental delta for callers.
        delta = if response_text.present? && text.start_with?(response_text)
          text[response_text.length..]
        else
          text
        end
        response_text.replace(text)
        yield({ state: "delta", text: delta }) if delta.present? && block_given?
      end
    when "final"
      text = extract_event_text(event)
      yield({ state: "final", text: text, message: event[:message] }) if block_given?
      break
    when "error"
      error_msg = event[:errorMessage] || "Unknown error"
      yield({ state: "error", text: error_msg }) if block_given?
      raise ChatError, error_msg
    when "aborted"
      yield({ state: "aborted" }) if block_given?
      break
    end
  end

  response_text.presence
ensure
  @mutex.synchronize do
    @pending_runs.delete(actual_run_id) if actual_run_id
    # Also clean up idempotency_key if send_rpc failed before we got a runId
    @pending_runs.delete(idempotency_key) if idempotency_key
    # Record completed runs so late-arriving events are suppressed
    now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    @completed_runs[actual_run_id] = now if actual_run_id
    @completed_runs[idempotency_key] = now if idempotency_key && idempotency_key != actual_run_id
  end
end
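Two pieces of the streaming loop above are easy to study on their own: skipping duplicate non-terminal events by sequence number, and turning accumulated snapshots into incremental deltas. The sketch below reproduces that logic in plain Ruby without Rails; `dedupe_by_seq`, `incremental_deltas`, and `TERMINAL_STATES` are illustrative names that do not appear in the class.

```ruby
TERMINAL_STATES = %w[final error aborted].freeze

# Drop non-terminal events whose seq is not greater than the last accepted
# seq. Terminal events are always kept so the caller is never left blocked.
def dedupe_by_seq(events)
  last_seq = nil
  events.select do |event|
    seq = event[:seq]
    terminal = TERMINAL_STATES.include?(event[:state])
    duplicate = !terminal && seq && last_seq && seq <= last_seq
    last_seq = seq if seq && !duplicate
    !duplicate
  end
end

# Each snapshot carries the full text so far; return only the newly added
# part. A snapshot that does not extend the previous text is passed whole.
def incremental_deltas(snapshots)
  seen = +""
  snapshots.filter_map do |text|
    next if text.empty?

    delta = if !seen.empty? && text.start_with?(seen)
      text[seen.length..]
    else
      text
    end
    seen.replace(text)
    delta unless delta.empty?
  end
end

events = [
  { seq: 1, state: "delta" },
  { seq: 1, state: "delta" }, # duplicate delivery of the same event
  { seq: 2, state: "delta" },
  { seq: 2, state: "final" }  # terminal, kept even though seq repeats
]
kept = dedupe_by_seq(events)
deltas = incremental_deltas(["Hel", "Hello", "Hello", "Hello, world"])
```

Falling back to the whole snapshot when it does not extend the previous text is a deliberate trade-off: the caller may see repeated content, but never loses any.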
#connect! ⇒ Object
Connect to the Gateway. Blocks until connected or raises on failure. Thread-safe: concurrent callers all wait on the same handshake attempt.
# File 'app/services/collavre_openclaw/websocket_client.rb', line 65

def connect!
  return if connected?

  waiter_queue = nil

  @connect_mutex.synchronize do
    return if connected?

    if @state == :connecting
      # Another thread is already connecting; wait on the same handshake
      waiter_queue = Queue.new
      @connect_waiters << waiter_queue
    else
      @state = :connecting
    end
  end

  if waiter_queue
    # Wait for the initiating thread to finish handshake
    result = wait_with_timeout(waiter_queue, config.ws_connect_timeout, "connect (waiting)")
    if result[:error]
      raise ConnectionError, result[:error]
    end
    return result
  end

  # This thread initiates the connection
  queue = Queue.new

  EmReactor.next_tick do
    begin
      do_connect!(queue)
    rescue StandardError => e
      queue.push({ error: e.message })
    end
  end

  begin
    result = wait_with_timeout(queue, config.ws_connect_timeout, "connect")
  rescue TimeoutError, StandardError => e
    # Timeout or unexpected error: reset state and wake all waiters
    error_result = { error: e.message }
    @connect_mutex.synchronize do
      @state = :disconnected
      @connect_waiters.each { |q| q.push(error_result) }
      @connect_waiters.clear
    end
    raise ConnectionError, e.message
  end

  # Notify all waiting threads
  @connect_mutex.synchronize do
    if result[:error]
      @state = :disconnected
    else
      @state = :connected
      @reconnect_attempts = 0
      touch_activity!
    end

    @connect_waiters.each { |q| q.push(result) }
    @connect_waiters.clear
  end

  if result[:error]
    raise ConnectionError, result[:error]
  end

  result
end
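The "one initiator, many waiters" behavior of connect! can be reproduced in a much smaller form. The sketch below is an illustration only: `SharedConnect` and `perform_handshake` are hypothetical, the handshake is simulated with a short sleep, and timeouts and error propagation are left out for brevity.

```ruby
# Hypothetical miniature of the shared-handshake pattern.
class SharedConnect
  attr_reader :state, :handshakes

  def initialize
    @state = :disconnected
    @mutex = Mutex.new
    @waiters = []
    @handshakes = 0
  end

  def connect!
    waiter = nil
    @mutex.synchronize do
      return :already_connected if @state == :connected

      if @state == :connecting
        # Someone else is mid-handshake: queue up for their result.
        waiter = Queue.new
        @waiters << waiter
      else
        @state = :connecting
      end
    end

    # Followers block here until the initiator publishes a result.
    return waiter.pop if waiter

    result = perform_handshake
    @mutex.synchronize do
      @state = result[:error] ? :disconnected : :connected
      @waiters.each { |q| q.push(result) }
      @waiters.clear
    end
    result
  end

  private

  # Simulated handshake; only the initiating thread ever runs this.
  def perform_handshake
    @handshakes += 1
    sleep 0.05
    { ok: true }
  end
end

client = SharedConnect.new
results = 5.times.map { Thread.new { client.connect! } }.map(&:value)
```

Because the state check and the waiter registration happen under the same mutex, a follower can never register after the initiator has already cleared the waiter list for that attempt.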
#connected? ⇒ Boolean
# File 'app/services/collavre_openclaw/websocket_client.rb', line 59

def connected?
  @state == :connected
end
#disconnect! ⇒ Object
Disconnect gracefully
# File 'app/services/collavre_openclaw/websocket_client.rb', line 137

def disconnect!
  @state = :disconnected
  EmReactor.next_tick do
    @ws&.close
    @ws = nil
  end

  # Unblock any waiting requests
  @pending_requests.each_value { |pr| pr[:queue]&.push({ error: "disconnected" }) }
  @pending_requests.clear
  @pending_runs.each_value { |q| q.push({ done: true }) }
  @pending_runs.clear
  @rpc_run_registrations.clear
end
#idle_seconds ⇒ Object
Time since last activity (for idle timeout)
# File 'app/services/collavre_openclaw/websocket_client.rb', line 316

def idle_seconds
  return Float::INFINITY unless @last_activity_at
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - @last_activity_at
end
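A caller that enforces an idle timeout might use idle_seconds roughly as follows. `IdleTracker`, `touch!`, and `idle?` are hypothetical names for this sketch; only the monotonic-clock arithmetic and the Float::INFINITY default mirror the method above.

```ruby
# Hypothetical tracker showing idle measurement with the monotonic clock.
class IdleTracker
  def initialize
    @last_activity_at = nil
  end

  # Record activity. The monotonic clock is immune to wall-clock jumps.
  def touch!
    @last_activity_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  # Never-used trackers report infinite idle time.
  def idle_seconds
    return Float::INFINITY unless @last_activity_at
    Process.clock_gettime(Process::CLOCK_MONOTONIC) - @last_activity_at
  end

  # Assumed helper: true once the tracker has been idle for the given time.
  def idle?(timeout_seconds)
    idle_seconds >= timeout_seconds
  end
end

tracker = IdleTracker.new
never_used = tracker.idle?(60) # no activity recorded yet
tracker.touch!
just_used = tracker.idle?(60)
```

Returning Float::INFINITY for a tracker with no recorded activity means such a client always compares as idle, whatever timeout is chosen.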
#on_fatal_close(&handler) ⇒ Object
Register a callback invoked when the connection dies with a fatal close code (auth failure, forbidden, etc.). ConnectionManager uses this to remove dead clients.
# File 'app/services/collavre_openclaw/websocket_client.rb', line 311

def on_fatal_close(&handler)
  @on_fatal_close = handler
end
#on_proactive_message(&handler) ⇒ Object
Register a handler for proactive messages (unsolicited chat events). The handler receives (user, payload) where user is the connection owner.
# File 'app/services/collavre_openclaw/websocket_client.rb', line 305

def on_proactive_message(&handler)
  @proactive_handler = handler
end
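The handler registration above stores a block that is later called with (user, payload). The stand-in below shows that calling convention without a live Gateway; `CallbackHolder` and `dispatch_proactive` are hypothetical, and the payload shape is made up for illustration.

```ruby
# Hypothetical stand-in exposing only the registration pattern.
class CallbackHolder
  def initialize(user)
    @user = user
    @proactive_handler = nil
  end

  # Store the block; a later registration replaces the earlier one.
  def on_proactive_message(&handler)
    @proactive_handler = handler
  end

  # Assumed dispatch step: invoke the handler, if any, with (user, payload).
  def dispatch_proactive(payload)
    @proactive_handler&.call(@user, payload)
  end
end

received = []
holder = CallbackHolder.new("alice")
holder.dispatch_proactive({ text: "dropped" }) # no handler yet, ignored
holder.on_proactive_message { |user, payload| received << [user, payload] }
holder.dispatch_proactive({ text: "hi" })
```

Storing a single block keeps registration simple; a caller that needs several listeners would have to fan out inside its own handler.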