Class: CollavreOpenclaw::ProactiveMessageHandler

Inherits:
Object
  • Object
show all
Defined in:
app/services/collavre_openclaw/proactive_message_handler.rb

Overview

Handles proactive (unsolicited) chat events from OpenClaw Gateway.

Proactive messages arrive when the Gateway agent initiates communication (e.g., cron jobs, heartbeats, reminders) without a prior request from Collavre.

Events arrive as streaming deltas followed by a final event, just like regular chat responses. This handler buffers deltas per runId and dispatches the complete message to CallbackProcessorJob on final.

Thread safety: called from the EventMachine reactor thread. All operations should be non-blocking. Job enqueueing is safe from any thread.

Stale buffer cleanup: buffers older than BUFFER_TTL are periodically purged to prevent memory leaks when the Gateway never sends a final/error event.

Constant Summary collapse

BUFFER_TTL =

Maximum age (seconds) for a buffer before it’s considered stale and purged. Generous timeout for long-running AI responses.

30.minutes.to_i
SWEEP_INTERVAL =

How often (seconds) to check for stale buffers.

60
DISPATCHED_RUN_TTL =

Cooldown (seconds) to remember dispatched runIds and suppress duplicates.

60

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeProactiveMessageHandler

Returns a new instance of ProactiveMessageHandler.



27
28
29
30
31
32
# File 'app/services/collavre_openclaw/proactive_message_handler.rb', line 27

def initialize
  @buffers = {}           # runId β†’ { text:, session_key:, connection_owner_id:, created_at: }
  @dispatched_runs = {}   # runId β†’ monotonic timestamp (dedup for broadcast+nodeSend duplicates)
  @mutex = Mutex.new
  @last_sweep_at = monotonic_now
end

Class Method Details

.parse_session_key(session_key) ⇒ Hash

Parse a session key into its component parts. Format: agent:<agent_id>:collavre:<user_id>[:topic:<id>]

Parameters:

  • session_key (String)

Returns:

  • (Hash)

    with :agent_id, :user_id, :creative_id, :topic_id keys



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'app/services/collavre_openclaw/proactive_message_handler.rb', line 68

def self.parse_session_key(session_key)
  return {} unless session_key.present?

  result = {}
  parts = session_key.split(":")

  # Parse key:value pairs
  i = 0
  while i < parts.length - 1
    key = parts[i]
    value = parts[i + 1]

    case key
    when "agent"
      result[:agent_id] = value
      i += 2
    when "collavre"
      result[:user_id] = value.to_i
      i += 2
    when "creative"
      result[:creative_id] = value.to_i
      i += 2
    when "topic"
      result[:topic_id] = value.to_i
      i += 2
    else
      i += 1
    end
  end

  result
end

Instance Method Details

#handle(user, payload) ⇒ Object

Process a single chat event payload from the WebSocket client. Called by WebsocketClient#on_proactive_message.

Parameters:

  • user (User)

    the user who owns the Gateway connection

  • payload (Hash)

    the chat event payload with :runId, :state, :message, :sessionKey



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'app/services/collavre_openclaw/proactive_message_handler.rb', line 39

def handle(user, payload)
  run_id = payload[:runId]
  return unless run_id

  sweep_stale_buffers_if_needed!

  state = payload[:state]&.to_s
  session_key = payload[:sessionKey]

  case state
  when "delta"
    buffer_delta(run_id, user, session_key, payload)
  when "final"
    handle_final(run_id, user, session_key, payload)
  when "error"
    handle_error(run_id, user, payload)
  when "aborted"
    cleanup(run_id)
  else
    # Unknown state β€” log and ignore
    Rails.logger.debug("[CollavreOpenclaw::Proactive] Unknown state '#{state}' for runId=#{run_id}")
  end
end