Class: HTM::WorkingMemoryChannel
- Inherits:
-
Object
- Object
- HTM::WorkingMemoryChannel
- Defined in:
- lib/htm/working_memory_channel.rb
Overview
Provides real-time synchronization of working memory changes across multiple robots using PostgreSQL LISTEN/NOTIFY pub/sub mechanism.
This class enables distributed robots to maintain synchronized working memory by broadcasting change notifications through PostgreSQL channels. When one robot adds, evicts, or clears working memory, all other robots in the group receive immediate notification.
Constant Summary collapse
- CHANNEL_PREFIX =
Prefix used for all PostgreSQL channel names
'htm_wm'
Instance Attribute Summary collapse
-
#notifications_received ⇒ Integer
readonly
Number of notifications received since channel was created.
Publishing collapse
-
#notify(event, node_id:, robot_id:) ⇒ void
Broadcasts a working memory change notification to all listeners.
Subscribing collapse
-
#on_change {|event, node_id, robot_id| ... } ⇒ void
Registers a callback to be invoked when working memory changes occur.
-
#start_listening ⇒ Thread
Starts listening for notifications in a background thread.
-
#stop_listening ⇒ void
Stops the background listener thread.
Status collapse
-
#channel_name ⇒ String
Returns the PostgreSQL channel name used for notifications.
-
#listening? ⇒ Boolean
Checks if the listener thread is currently active.
Instance Method Summary collapse
-
#initialize(group_name, db_config) ⇒ WorkingMemoryChannel
constructor
Creates a new working memory channel for a robot group.
Constructor Details
#initialize(group_name, db_config) ⇒ WorkingMemoryChannel
Creates a new working memory channel for a robot group.
The channel name is derived from the group name with non-alphanumeric characters replaced by underscores to ensure PostgreSQL compatibility.
62 63 64 65 66 67 68 69 70 71 |
# File 'lib/htm/working_memory_channel.rb', line 62 def initialize(group_name, db_config) @group_name = group_name @channel = "#{CHANNEL_PREFIX}_#{group_name.gsub(/[^a-z0-9_]/i, '_')}" @db_config = db_config @listeners = [] @listen_thread = nil @stop_requested = false @notifications_received = 0 @mutex = Mutex.new end |
Instance Attribute Details
#notifications_received ⇒ Integer (readonly)
Number of notifications received since channel was created
43 44 45 |
# File 'lib/htm/working_memory_channel.rb', line 43 def notifications_received @notifications_received end |
Instance Method Details
#channel_name ⇒ String
Returns the PostgreSQL channel name used for notifications.
The channel name is derived from the group name with a prefix and sanitization of special characters.
198 199 200 |
# File 'lib/htm/working_memory_channel.rb', line 198 def channel_name @channel end |
#listening? ⇒ Boolean
Checks if the listener thread is currently active.
183 184 185 |
# File 'lib/htm/working_memory_channel.rb', line 183 def listening? @listen_thread&.alive? || false end |
#notify(event, node_id:, robot_id:) ⇒ void
This method returns an undefined value.
Broadcasts a working memory change notification to all listeners.
Uses PostgreSQL’s pg_notify function to send a JSON payload containing the event type, affected node ID, originating robot ID, and timestamp.
91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/htm/working_memory_channel.rb', line 91 def notify(event, node_id:, robot_id:) payload = { event: event, node_id: node_id, robot_id: robot_id, timestamp: Time.now.iso8601 }.to_json with_connection do |conn| conn.exec_params('SELECT pg_notify($1, $2)', [@channel, payload]) end end |
#on_change {|event, node_id, robot_id| ... } ⇒ void
This method returns an undefined value.
Registers a callback to be invoked when working memory changes occur.
Multiple callbacks can be registered; all will be called for each event. Callbacks are invoked synchronously within the listener thread.
124 125 126 |
# File 'lib/htm/working_memory_channel.rb', line 124 def on_change(&callback) @mutex.synchronize { @listeners << callback } end |
#start_listening ⇒ Thread
Starts listening for notifications in a background thread.
Creates a dedicated PostgreSQL connection that uses LISTEN to receive notifications. The thread polls every 0.5 seconds, allowing for clean shutdown via #stop_listening.
140 141 142 143 144 145 146 147 |
# File 'lib/htm/working_memory_channel.rb', line 140 def start_listening @stop_requested = false @listen_thread = Thread.new do listen_loop end @listen_thread.abort_on_exception = true @listen_thread end |
#stop_listening ⇒ void
This method returns an undefined value.
Stops the background listener thread.
Signals the listener to stop, waits up to 0.5 seconds for clean exit, then forcefully terminates if still running. The PostgreSQL connection is closed automatically.
161 162 163 164 165 166 167 |
# File 'lib/htm/working_memory_channel.rb', line 161 def stop_listening @stop_requested = true # Give the thread a moment to exit cleanly @listen_thread&.join(0.5) @listen_thread&.kill if @listen_thread&.alive? @listen_thread = nil end |