Class: CollavreOpenclaw::ConnectionManager

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
app/services/collavre_openclaw/connection_manager.rb

Overview

Manages WebSocket connections to OpenClaw Gateways.

Singleton: use ConnectionManager.instance

Features:

  • Lazy connect (creates connection on first use)

  • Connection sharing (same gateway_url → same connection)

  • Idle timeout (disconnects after inactivity)

  • Thread-safe access

  • Graceful shutdown

Multiple AI agents using the same Gateway share a single WebSocket connection, reducing resource usage and connection overhead.

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ ConnectionManager

Returns a new instance of ConnectionManager.



18
19
20
21
22
23
24
25
26
27
28
# File 'app/services/collavre_openclaw/connection_manager.rb', line 18

# Builds the empty connection registry, then runs the two setup helpers.
#
# The three hashes below are kept in step with one another by the public
# methods of this class, which access them inside @mutex.synchronize.
def initialize
  @connections = {} # gateway_url → WebsocketClient
  @gateway_users = {} # gateway_url → Set<user_id> (users sharing this gateway)
  @user_gateways = {} # user_id → gateway_url (reverse lookup)
  @mutex = Mutex.new
  # Handle for the idle checker; nil until start_idle_checker! runs below
  # (presumably that helper assigns it — it is defined outside this listing).
  @idle_checker = nil
  # Block most recently registered through #on_proactive_message, if any.
  @proactive_handler = nil
  @proactive_message_handler = ProactiveMessageHandler.new
  # NOTE(review): both helpers are defined elsewhere in the file and are
  # called only after all state above exists; keep them last.
  setup_default_proactive_handler!
  start_idle_checker!
end

Class Method Details

.status_summary ⇒ Object

Safe accessor: returns status without triggering singleton initialization. Use this from controllers/monitoring instead of instance_variable_get.



121
122
123
124
125
126
127
128
# File 'app/services/collavre_openclaw/connection_manager.rb', line 121

# Reports connection status without forcing the singleton into existence.
#
# When an instance has already been built, its #status is returned;
# otherwise an all-zero summary is returned and no instance is created.
#
# @return [Hash] status counters
def self.status_summary
  already_built = instance_variable_defined?(:@singleton__instance__) && @singleton__instance__
  return instance.status if already_built

  {
    total_connections: 0,
    total_users: 0,
    connected: 0,
    connecting: 0,
    reconnecting: 0,
    disconnected: 0
  }
end

Instance Method Details

#connected_count ⇒ Object

Number of active connections



98
99
100
101
102
# File 'app/services/collavre_openclaw/connection_manager.rb', line 98

# Counts the clients in the registry that currently report connected?.
#
# @return [Integer] number of connected clients
def connected_count
  @mutex.synchronize { @connections.each_value.count(&:connected?) }
end

#connection_for(user) ⇒ WebsocketClient

Get or create a WebSocket connection for a user. Users with the same gateway_url share a single connection.

Parameters:

  • user (User)

    must respond to #gateway_url and #llm_api_key

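Returns:

  • (WebsocketClient, nil)

    the shared client for the user's gateway, or nil when the user's gateway_url is blank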



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'app/services/collavre_openclaw/connection_manager.rb', line 35

# Get or create the WebSocket client for a user's gateway.
# Users with the same gateway_url share a single client.
#
# @param user [User] must respond to #id, #gateway_url and #llm_api_key
# @return [WebsocketClient, nil] the client registered for the gateway, or
#   nil (after logging a warning) when the user's gateway_url is blank
def connection_for(user)
  gateway_url = user.gateway_url.to_s.strip
  if gateway_url.blank?
    Rails.logger.warn("[CollavreOpenclaw::ConnectionManager] User #{user.id} has blank gateway_url, cannot create connection")
    return nil
  end

  @mutex.synchronize do
    client = @connections[gateway_url]

    if client.nil?
      client = create_client(user, gateway_url)
    elsif client.user.llm_api_key != user.llm_api_key
      # Same gateway_url but different API key. This is a configuration
      # error: one Gateway = one API key. Log a warning so the admin
      # notices, rather than silently ignoring the second user's key.
      # In HTTP mode this would surface as a 401 on each request.
      Rails.logger.warn(
        "[CollavreOpenclaw::ConnectionManager] API key mismatch for gateway #{gateway_url}: " \
        "user #{user.id} has a different key than connection owner #{client.user.id}. " \
        "The connection uses the owner's key. Verify AI agent settings."
      )
    end

    # A user is tracked under at most one gateway. If they were previously
    # registered elsewhere, drop that stale membership; otherwise the old
    # gateway would keep counting them and #disconnect could never see it
    # as unshared. The old client itself is left untouched here.
    previous_url = @user_gateways[user.id]
    if previous_url && previous_url != gateway_url
      @gateway_users[previous_url]&.delete(user.id)
    end

    # Track this user as using this gateway. The member set is created on
    # demand so a gateway that has a client but no set yet cannot raise.
    (@gateway_users[gateway_url] ||= Set.new).add(user.id)
    @user_gateways[user.id] = gateway_url

    client
  end
end

#disconnect(user) ⇒ Object

Disconnect a specific user from their gateway connection. The connection is only closed if no other users are sharing it.



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'app/services/collavre_openclaw/connection_manager.rb', line 69

# Removes a user from the registry and closes their gateway's client once
# nobody else is sharing it.
#
# @param user [User] must respond to #id
# @return [Object, nil] whatever the client's disconnect! returns when the
#   client was closed; nil when the user was not tracked, the gateway is
#   still shared, or no client was registered for it
def disconnect(user)
  @mutex.synchronize do
    url = @user_gateways.delete(user.id)
    next unless url

    sharers = @gateway_users[url]
    sharers&.delete(user.id)

    # Leave the client alone while other users still depend on it.
    next unless sharers.nil? || sharers.empty?

    @gateway_users.delete(url)
    @connections.delete(url)&.disconnect!
  end
end

#disconnect_all ⇒ Object

Disconnect all connections (for app shutdown)



87
88
89
90
91
92
93
94
95
# File 'app/services/collavre_openclaw/connection_manager.rb', line 87

# Disconnects every client and empties the registry (for app shutdown).
#
# Every client gets a disconnect! call even if an earlier one raises, and
# the registry is cleared and the idle checker stopped regardless. The
# first error raised by a client, if any, is re-raised once cleanup is done
# so callers still see the failure.
#
# @return [Object] the return value of stop_idle_checker!
# @raise [StandardError] the first error raised by a client's disconnect!
def disconnect_all
  first_failure = nil

  @mutex.synchronize do
    @connections.each_value do |client|
      begin
        client.disconnect!
      rescue StandardError => e
        # Remember the failure but keep going so one bad client cannot
        # leave the remaining ones open.
        first_failure ||= e
      end
    end
    @connections.clear
    @gateway_users.clear
    @user_gateways.clear
  end

  outcome = stop_idle_checker!
  raise first_failure if first_failure

  outcome
end

#on_proactive_message(&handler) ⇒ Object

Register a proactive message handler for all connections. New connections will also get this handler.



132
133
134
135
136
137
138
139
# File 'app/services/collavre_openclaw/connection_manager.rb', line 132

# Stores the given block as the manager-wide proactive message handler and
# installs it on every client currently in the registry.
#
# @yield block passed on to each client's #on_proactive_message
# @return [Hash] the connection registry (result of each_value)
def on_proactive_message(&handler)
  @mutex.synchronize do
    # Kept so it can be read back later by code outside this listing.
    @proactive_handler = handler
    @connections.each_value { |client| client.on_proactive_message(&handler) }
  end
end

#status ⇒ Object

Status summary: number of connections in each state, plus the total number of connections and of tracked users.



105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'app/services/collavre_openclaw/connection_manager.rb', line 105

# Summarises the registry: how many clients are in each of the four known
# states, how many clients exist in total, and how many users are tracked.
# Clients in any other state count only towards total_connections.
#
# @return [Hash{Symbol => Integer}]
def status
  @mutex.synchronize do
    per_state = Hash.new(0)
    @connections.each_value { |client| per_state[client.state] += 1 }

    {
      connected: per_state[:connected],
      connecting: per_state[:connecting],
      reconnecting: per_state[:reconnecting],
      disconnected: per_state[:disconnected],
      total_connections: @connections.size,
      total_users: @user_gateways.size
    }
  end
end

#user_connected?(user) ⇒ Boolean

Check if a user has an active connection

Returns:

  • (Boolean)


149
150
151
152
153
154
155
# File 'app/services/collavre_openclaw/connection_manager.rb', line 149

# Tells whether the client for the user's tracked gateway reports connected?.
#
# @param user [User] must respond to #id
# @return [Boolean] false when the user is not tracked, no client is
#   registered for their gateway, or the client is not connected
def user_connected?(user)
  @mutex.synchronize do
    url = @user_gateways[user.id]
    next false unless url

    live = @connections[url]&.connected?
    live || false
  end
end

#users_for_gateway(gateway_url) ⇒ Object

Get the number of users sharing a specific gateway



142
143
144
145
146
# File 'app/services/collavre_openclaw/connection_manager.rb', line 142

# Number of users currently tracked as sharing the given gateway.
#
# @param gateway_url [String] registry key of the gateway
# @return [Integer] 0 when the gateway has no member set
def users_for_gateway(gateway_url)
  @mutex.synchronize do
    members = @gateway_users[gateway_url]
    members.nil? ? 0 : members.size
  end
end