Class: CollavreOpenclaw::ConnectionManager
- Inherits: Object (CollavreOpenclaw::ConnectionManager < Object)
- Includes: Singleton
- Defined in: app/services/collavre_openclaw/connection_manager.rb
Overview
Manages WebSocket connections to OpenClaw Gateways.
Singleton: use ConnectionManager.instance
Features:
- Lazy connect (creates connection on first use)
- Connection sharing (same gateway_url → same connection)
- Idle timeout (disconnects after inactivity)
- Thread-safe access
- Graceful shutdown
Multiple AI agents using the same Gateway share a single WebSocket connection, reducing resource usage and connection overhead.
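The sharing behaviour described above can be sketched as a small registry keyed by gateway URL. This is a minimal illustration under stated assumptions, not the class's actual implementation: `FakeClient` and `MiniRegistry` are hypothetical stand-ins, users are reduced to plain ids, and the gateway URL is a placeholder.

```ruby
require "set"

# Hypothetical stand-in for a WebSocket client; it only records its state.
class FakeClient
  attr_reader :url

  def initialize(url)
    @url = url
    @connected = true
  end

  def connected?
    @connected
  end

  def disconnect!
    @connected = false
  end
end

# Hypothetical registry: one client per gateway URL, shared by all its users.
class MiniRegistry
  def initialize
    @connections = {}                                     # gateway_url => FakeClient
    @gateway_users = Hash.new { |h, k| h[k] = Set.new }   # gateway_url => user ids
    @user_gateways = {}                                   # user_id => gateway_url
    @mutex = Mutex.new
  end

  # Lazy connect: the client is created on first use, then reused.
  def connection_for(user_id, gateway_url)
    @mutex.synchronize do
      client = (@connections[gateway_url] ||= FakeClient.new(gateway_url))
      @gateway_users[gateway_url].add(user_id)
      @user_gateways[user_id] = gateway_url
      client
    end
  end

  # Close the shared client only when its last user leaves.
  def disconnect(user_id)
    @mutex.synchronize do
      gateway_url = @user_gateways.delete(user_id)
      return unless gateway_url

      users = @gateway_users[gateway_url]
      users.delete(user_id)
      if users.empty?
        @gateway_users.delete(gateway_url)
        @connections.delete(gateway_url)&.disconnect!
      end
    end
  end
end

registry = MiniRegistry.new
a = registry.connection_for(1, "wss://gateway.example/ws")
b = registry.connection_for(2, "wss://gateway.example/ws")
puts a.equal?(b)     # both users hold the same client object
registry.disconnect(1)
puts a.connected?    # user 2 is still attached
registry.disconnect(2)
puts a.connected?    # last user has left
```

The three `puts` calls print `true`, `true`, and `false`: the client is shared, survives the first disconnect, and is closed only after the last user leaves.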
Class Method Summary
- .status_summary ⇒ Object
  Safe accessor: returns status without triggering singleton initialization.
Instance Method Summary
- #connected_count ⇒ Object
  Number of active connections.
- #connection_for(user) ⇒ WebsocketClient
  Get or create a WebSocket connection for a user.
- #disconnect(user) ⇒ Object
  Disconnect a specific user from their gateway connection.
- #disconnect_all ⇒ Object
  Disconnect all connections (for app shutdown).
- #initialize ⇒ ConnectionManager (constructor)
  A new instance of ConnectionManager.
- #on_proactive_message(&handler) ⇒ Object
  Register a proactive message handler for all connections.
- #status ⇒ Object
  Status summary.
- #user_connected?(user) ⇒ Boolean
  Check if a user has an active connection.
- #users_for_gateway(gateway_url) ⇒ Object
  Get the number of users sharing a specific gateway.
Constructor Details
#initialize ⇒ ConnectionManager
Returns a new instance of ConnectionManager.
    # File 'app/services/collavre_openclaw/connection_manager.rb', line 18
    def initialize
      @connections = {}      # gateway_url → WebsocketClient
      @gateway_users = {}    # gateway_url → Set<user_id> (users sharing this gateway)
      @user_gateways = {}    # user_id → gateway_url (reverse lookup)
      @mutex = Mutex.new
      @idle_checker = nil
      @proactive_handler = nil
      @proactive_message_handler = ProactiveMessageHandler.new
      setup_default_proactive_handler!
      start_idle_checker!
    end
Class Method Details
.status_summary ⇒ Object
Safe accessor: returns status without triggering singleton initialization. Use this from controllers/monitoring instead of instance_variable_get.
    # File 'app/services/collavre_openclaw/connection_manager.rb', line 121
    def self.status_summary
      if instance_variable_defined?(:@singleton__instance__) && @singleton__instance__
        instance.status
      else
        { total_connections: 0, total_users: 0, connected: 0, connecting: 0, reconnecting: 0, disconnected: 0 }
      end
    end
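The safe-accessor pattern can be tried in isolation. The sketch below is an assumption-laden illustration, not the real class: `ExpensiveService` and its `init_count` counter are hypothetical, and the check relies on `@singleton__instance__`, an internal detail of Ruby's standard `Singleton` module that could change between Ruby versions.

```ruby
require "singleton"

# Hypothetical service whose constructor has side effects that a monitoring
# endpoint should not trigger.
class ExpensiveService
  include Singleton

  @init_count = 0
  class << self
    attr_accessor :init_count
  end

  def initialize
    # Stands in for opening sockets, starting threads, and so on.
    self.class.init_count += 1
  end

  def status
    { connected: 1 }
  end

  # Report live status only if the singleton already exists; otherwise
  # return a placeholder without calling `instance`.
  def self.status_summary
    if instance_variable_defined?(:@singleton__instance__) && @singleton__instance__
      instance.status
    else
      { connected: 0 }
    end
  end
end

p ExpensiveService.status_summary   # placeholder hash, nothing initialized
p ExpensiveService.init_count       # constructor has not run yet
ExpensiveService.instance           # first real use creates the singleton
p ExpensiveService.status_summary   # now delegates to #status
p ExpensiveService.init_count       # constructor ran exactly once
```

Calling `ExpensiveService.instance.status` directly from a health check would create the singleton as a side effect; the class-level accessor avoids that.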
Instance Method Details
#connected_count ⇒ Object
Number of active connections
    # File 'app/services/collavre_openclaw/connection_manager.rb', line 98
    def connected_count
      @mutex.synchronize do
        @connections.count { |_, c| c.connected? }
      end
    end
#connection_for(user) ⇒ WebsocketClient
Get or create a WebSocket connection for a user. Users with the same gateway_url share a single connection.
    # File 'app/services/collavre_openclaw/connection_manager.rb', line 35
    def connection_for(user)
      gateway_url = user.gateway_url.to_s.strip
      if gateway_url.blank?
        Rails.logger.warn("[CollavreOpenclaw::ConnectionManager] User #{user.id} has blank gateway_url, cannot create connection")
        return nil
      end

      @mutex.synchronize do
        client = @connections[gateway_url]

        if client.nil?
          client = create_client(user, gateway_url)
        elsif client.user.llm_api_key != user.llm_api_key
          # Same gateway_url but different API key. This is a configuration
          # error: one Gateway = one API key. Log a warning so the admin
          # notices, rather than silently ignoring the second user's key.
          # In HTTP mode this would surface as a 401 on each request.
          Rails.logger.warn(
            "[CollavreOpenclaw::ConnectionManager] API key mismatch for gateway #{gateway_url}: " \
            "user #{user.id} has a different key than connection owner #{client.user.id}. " \
            "The connection uses the owner's key. Verify AI agent settings."
          )
        end

        # Track this user as using this gateway
        @gateway_users[gateway_url].add(user.id)
        @user_gateways[user.id] = gateway_url

        client
      end
    end
#disconnect(user) ⇒ Object
Disconnect a specific user from their gateway connection. The connection is only closed if no other users are sharing it.
    # File 'app/services/collavre_openclaw/connection_manager.rb', line 69
    def disconnect(user)
      @mutex.synchronize do
        gateway_url = @user_gateways.delete(user.id)
        return unless gateway_url

        users = @gateway_users[gateway_url]
        users&.delete(user.id)

        # Only disconnect if no users are sharing this gateway
        if users.nil? || users.empty?
          client = @connections.delete(gateway_url)
          @gateway_users.delete(gateway_url)
          client&.disconnect!
        end
      end
    end
#disconnect_all ⇒ Object
Disconnect all connections (for app shutdown)
    # File 'app/services/collavre_openclaw/connection_manager.rb', line 87
    def disconnect_all
      @mutex.synchronize do
        @connections.each_value(&:disconnect!)
        @connections.clear
        @gateway_users.clear
        @user_gateways.clear
      end
      stop_idle_checker!
    end
#on_proactive_message(&handler) ⇒ Object
Register a proactive message handler for all connections. New connections will also get this handler.
    # File 'app/services/collavre_openclaw/connection_manager.rb', line 132
    def on_proactive_message(&handler)
      @mutex.synchronize do
        @proactive_handler = handler
        @connections.each_value do |client|
          client.on_proactive_message(&handler)
        end
      end
    end
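How a handler reaches both existing and future connections can be sketched as follows. `PushClient`, `HandlerRegistry`, `client_for`, and `receive` are hypothetical names introduced for illustration; the real manager's client-creation path is not shown in this listing, so the "new connections also get the handler" step here is an assumption about how that could work.

```ruby
# Hypothetical client that stores one handler and invokes it on pushed messages.
class PushClient
  def on_proactive_message(&handler)
    @handler = handler
  end

  # Simulates the gateway pushing a message to this client.
  def receive(message)
    @handler&.call(message)
  end
end

# Hypothetical manager: remembers the handler so later clients receive it too.
class HandlerRegistry
  def initialize
    @clients = {}
    @handler = nil
    @mutex = Mutex.new
  end

  # Store the handler and attach it to every client that already exists.
  def on_proactive_message(&handler)
    @mutex.synchronize do
      @handler = handler
      @clients.each_value { |client| client.on_proactive_message(&handler) }
    end
  end

  # Create the client on first use and attach the stored handler, if any.
  def client_for(url)
    @mutex.synchronize do
      @clients[url] ||= begin
        client = PushClient.new
        client.on_proactive_message(&@handler) if @handler
        client
      end
    end
  end
end

received = []
registry = HandlerRegistry.new
early = registry.client_for("wss://a.example")   # created before registration
registry.on_proactive_message { |msg| received << msg }
late = registry.client_for("wss://b.example")    # created after registration
early.receive("hello")
late.receive("world")
p received
```

Both messages end up in `received`, in the order they were delivered, because the handler is applied to the existing client at registration time and to the later client at creation time.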
#status ⇒ Object
Status summary
    # File 'app/services/collavre_openclaw/connection_manager.rb', line 105
    def status
      @mutex.synchronize do
        states = @connections.values.group_by(&:state)
        {
          connected: states[:connected]&.size || 0,
          connecting: states[:connecting]&.size || 0,
          reconnecting: states[:reconnecting]&.size || 0,
          disconnected: states[:disconnected]&.size || 0,
          total_connections: @connections.size,
          total_users: @user_gateways.size
        }
      end
    end
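The per-state counting in `#status` can be reproduced on plain data. The sketch below is an illustration only: `StateClient`, `STATES`, and `summarize` are hypothetical, and it swaps in `Enumerable#tally` (Ruby 2.7+) as an alternative to the `group_by` plus `size` approach used in the listing above.

```ruby
# Hypothetical client exposing only a state symbol.
StateClient = Struct.new(:state)

# The four states reported by the summary, in display order.
STATES = %i[connected connecting reconnecting disconnected].freeze

# Count clients per state, reporting 0 for states that have no clients.
def summarize(clients)
  counts = clients.map(&:state).tally
  STATES.to_h { |state| [state, counts.fetch(state, 0)] }
        .merge(total_connections: clients.size)
end

clients = [
  StateClient.new(:connected),
  StateClient.new(:connected),
  StateClient.new(:reconnecting)
]
p summarize(clients)
```

For this input the summary reports two connected clients, one reconnecting client, zero for the other two states, and a total of three connections.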
#user_connected?(user) ⇒ Boolean
Check if a user has an active connection
    # File 'app/services/collavre_openclaw/connection_manager.rb', line 149
    def user_connected?(user)
      @mutex.synchronize do
        gateway_url = @user_gateways[user.id]
        return false unless gateway_url
        @connections[gateway_url]&.connected? || false
      end
    end
#users_for_gateway(gateway_url) ⇒ Object
Get the number of users sharing a specific gateway
    # File 'app/services/collavre_openclaw/connection_manager.rb', line 142
    def users_for_gateway(gateway_url)
      @mutex.synchronize do
        @gateway_users[gateway_url]&.size || 0
      end
    end