Class: CollavreOpenclaw::ProactiveMessageHandler
- Inherits:
-
Object
- Object
- CollavreOpenclaw::ProactiveMessageHandler
- Defined in:
- app/services/collavre_openclaw/proactive_message_handler.rb
Overview
Handles proactive (unsolicited) chat events from OpenClaw Gateway.
Proactive messages arrive when the Gateway agent initiates communication (e.g., cron jobs, heartbeats, reminders) without a prior request from Collavre.
Events arrive as streaming deltas followed by a final event, just like regular chat responses. This handler buffers deltas per runId and dispatches the complete message to CallbackProcessorJob on final.
Thread safety: called from the EventMachine reactor thread. All operations should be non-blocking. Job enqueueing is safe from any thread.
Stale buffer cleanup: buffers older than BUFFER_TTL are periodically purged to prevent memory leaks when the Gateway never sends a final/error event.
Constant Summary collapse
- BUFFER_TTL =
Maximum age (seconds) for a buffer before itβs considered stale and purged. Generous timeout for long-running AI responses.
30.minutes.to_i
- SWEEP_INTERVAL =
How often (seconds) to check for stale buffers.
60- DISPATCHED_RUN_TTL =
Cooldown (seconds) to remember dispatched runIds and suppress duplicates.
60
Class Method Summary collapse
-
.parse_session_key(session_key) ⇒ Hash
Parse a session key into its component parts.
Instance Method Summary collapse
-
#handle(user, payload) ⇒ Object
Process a single chat event payload from the WebSocket client.
-
#initialize ⇒ ProactiveMessageHandler
constructor
A new instance of ProactiveMessageHandler.
Constructor Details
#initialize ⇒ ProactiveMessageHandler
Returns a new instance of ProactiveMessageHandler.
27 28 29 30 31 32 |
# File 'app/services/collavre_openclaw/proactive_message_handler.rb', line 27 def initialize @buffers = {} # runId β { text:, session_key:, connection_owner_id:, created_at: } @dispatched_runs = {} # runId β monotonic timestamp (dedup for broadcast+nodeSend duplicates) @mutex = Mutex.new @last_sweep_at = monotonic_now end |
Class Method Details
.parse_session_key(session_key) ⇒ Hash
Parse a session key into its component parts. Format: agent:<agent_id>:collavre:<user_id>[:topic:<id>]
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'app/services/collavre_openclaw/proactive_message_handler.rb', line 68 def self.parse_session_key(session_key) return {} unless session_key.present? result = {} parts = session_key.split(":") # Parse key:value pairs i = 0 while i < parts.length - 1 key = parts[i] value = parts[i + 1] case key when "agent" result[:agent_id] = value i += 2 when "collavre" result[:user_id] = value.to_i i += 2 when "creative" result[:creative_id] = value.to_i i += 2 when "topic" result[:topic_id] = value.to_i i += 2 else i += 1 end end result end |
Instance Method Details
#handle(user, payload) ⇒ Object
Process a single chat event payload from the WebSocket client. Called by WebsocketClient#on_proactive_message.
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'app/services/collavre_openclaw/proactive_message_handler.rb', line 39 def handle(user, payload) run_id = payload[:runId] return unless run_id sweep_stale_buffers_if_needed! state = payload[:state]&.to_s session_key = payload[:sessionKey] case state when "delta" buffer_delta(run_id, user, session_key, payload) when "final" handle_final(run_id, user, session_key, payload) when "error" handle_error(run_id, user, payload) when "aborted" cleanup(run_id) else # Unknown state β log and ignore Rails.logger.debug("[CollavreOpenclaw::Proactive] Unknown state '#{state}' for runId=#{run_id}") end end |