Class: HTM::WorkingMemoryChannel

Inherits:
Object
Defined in:
lib/htm/working_memory_channel.rb

Overview

Provides real-time synchronization of working memory changes across multiple robots using the PostgreSQL LISTEN/NOTIFY pub/sub mechanism.

This class enables distributed robots to maintain synchronized working memory by broadcasting change notifications through PostgreSQL channels. When one robot adds, evicts, or clears working memory, all other robots in the group receive immediate notification.

Examples:

Basic usage

channel = HTM::WorkingMemoryChannel.new('support-team', db_config)

# Subscribe to changes
channel.on_change do |event, node_id, robot_id|
  case event
  when :added   then puts "Node #{node_id} added by robot #{robot_id}"
  when :evicted then puts "Node #{node_id} evicted by robot #{robot_id}"
  when :cleared then puts "Working memory cleared by robot #{robot_id}"
  end
end

# Start listening in background thread
channel.start_listening

# Publish a change
channel.notify(:added, node_id: 123, robot_id: 456)

# Cleanup when done
channel.stop_listening

Constant Summary

CHANNEL_PREFIX = 'htm_wm'

Prefix used for all PostgreSQL channel names.

Returns:

  • (String)

Constructor Details

#initialize(group_name, db_config) ⇒ WorkingMemoryChannel

Creates a new working memory channel for a robot group.

The channel name is derived from the group name with non-alphanumeric characters replaced by underscores to ensure PostgreSQL compatibility.

Examples:

db_config = { host: 'localhost', port: 5432, dbname: 'htm_dev', user: 'postgres' }
channel = HTM::WorkingMemoryChannel.new('customer-support', db_config)
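
The name derivation described above can be tried in isolation. The following is a minimal sketch, assuming the same prefix and substitution pattern shown in the constructor source; `derive_channel_name` is a hypothetical helper for illustration and not part of the class.

```ruby
# Prefix documented under Constant Summary.
CHANNEL_PREFIX = 'htm_wm'

# Hypothetical helper (not part of HTM::WorkingMemoryChannel): replaces every
# character outside a-z, A-Z, 0-9 and underscore with an underscore, then
# prepends the prefix.
def derive_channel_name(group_name)
  "#{CHANNEL_PREFIX}_#{group_name.gsub(/[^a-z0-9_]/i, '_')}"
end

puts derive_channel_name('customer-support')  # => htm_wm_customer_support
puts derive_channel_name('Team A/B')          # => htm_wm_Team_A_B
```

Because several characters map to the same underscore, distinct group names such as 'a-b' and 'a_b' yield the same channel name, so group names are best kept unique after sanitization.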

Parameters:

  • group_name (String)

    Name of the robot group (used to create unique channel)

  • db_config (Hash)

    PostgreSQL connection configuration hash

Options Hash (db_config):

  • :host (String)

    Database host

  • :port (Integer)

    Database port

  • :dbname (String)

    Database name

  • :user (String)

    Database user

  • :password (String)

    Database password (optional)



# File 'lib/htm/working_memory_channel.rb', line 62

def initialize(group_name, db_config)
  @group_name             = group_name
  @channel                = "#{CHANNEL_PREFIX}_#{group_name.gsub(/[^a-z0-9_]/i, '_')}"
  @db_config              = db_config
  @listeners              = []
  @listen_thread          = nil
  @stop_requested         = false
  @notifications_received = 0
  @mutex                  = Mutex.new
end

Instance Attribute Details

#notifications_received ⇒ Integer (readonly)

Number of notifications received since the channel was created.

Returns:

  • (Integer)


# File 'lib/htm/working_memory_channel.rb', line 43

def notifications_received
  @notifications_received
end

Instance Method Details

#channel_name ⇒ String

Returns the PostgreSQL channel name used for notifications.

The channel name is derived from the group name with a prefix and sanitization of special characters.

Examples:

channel = HTM::WorkingMemoryChannel.new('my-group', db_config)
channel.channel_name  # => "htm_wm_my_group"

Returns:

  • (String)

    The PostgreSQL LISTEN/NOTIFY channel name



# File 'lib/htm/working_memory_channel.rb', line 198

def channel_name
  @channel
end

#listening? ⇒ Boolean

Checks if the listener thread is currently active.

Examples:

channel.start_listening
channel.listening?  # => true
channel.stop_listening
channel.listening?  # => false

Returns:

  • (Boolean)

    true if listening for notifications, false otherwise



# File 'lib/htm/working_memory_channel.rb', line 183

def listening?
  @listen_thread&.alive? || false
end

#notify(event, node_id:, robot_id:) ⇒ void

This method returns an undefined value.

Broadcasts a working memory change notification to all listeners.

Uses PostgreSQL’s pg_notify function to send a JSON payload containing the event type, affected node ID, originating robot ID, and timestamp.

Examples:

Notify that a node was added

channel.notify(:added, node_id: 123, robot_id: 1)

Notify that working memory was cleared

channel.notify(:cleared, node_id: nil, robot_id: 1)
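
Payload round trip (sketch)

The JSON payload described above can be built and decoded without a database. This is a minimal sketch: `build_payload` mirrors the hash shown in the method source, while `parse_payload` is a hypothetical decoder, since the listener's actual parsing code is not shown on this page.

```ruby
require 'json'
require 'time'

# Builds the same JSON structure that #notify sends. The `now:` keyword is an
# addition for illustration so the timestamp can be fixed.
def build_payload(event, node_id:, robot_id:, now: Time.now)
  {
    event: event,
    node_id: node_id,
    robot_id: robot_id,
    timestamp: now.iso8601
  }.to_json
end

# Hypothetical decoder: JSON has no symbol type, so the event arrives as a
# string and is converted back to a Symbol here.
def parse_payload(json)
  data = JSON.parse(json, symbolize_names: true)
  [data[:event].to_sym, data[:node_id], data[:robot_id]]
end

json = build_payload(:added, node_id: 123, robot_id: 1)
event, node_id, robot_id = parse_payload(json)
puts "#{event} #{node_id} #{robot_id}"  # => added 123 1
```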

Parameters:

  • event (Symbol)

    Type of change (:added, :evicted, or :cleared)

  • node_id (Integer, nil)

    ID of the affected node (nil for :cleared events)

  • robot_id (Integer)

    ID of the robot that triggered the change



# File 'lib/htm/working_memory_channel.rb', line 91

def notify(event, node_id:, robot_id:)
  payload = {
    event: event,
    node_id: node_id,
    robot_id: robot_id,
    timestamp: Time.now.iso8601
  }.to_json

  with_connection do |conn|
    conn.exec_params('SELECT pg_notify($1, $2)', [@channel, payload])
  end
end

#on_change {|event, node_id, robot_id| ... } ⇒ void

This method returns an undefined value.

Registers a callback to be invoked when working memory changes occur.

Multiple callbacks can be registered; all will be called for each event. Callbacks are invoked synchronously within the listener thread.

Examples:

Register a change handler

channel.on_change do |event, node_id, robot_id|
  puts "Received #{event} event for node #{node_id}"
end
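
Callback dispatch (sketch)

To illustrate how several registered callbacks can all receive one event, here is a minimal sketch. `CallbackRegistry` and its `dispatch` method are hypothetical names; only the mutex-guarded append in `on_change` is taken from the source below, and the dispatch side is an assumption about how the listener thread might invoke the callbacks.

```ruby
# Hypothetical stand-in for the callback bookkeeping inside the channel.
class CallbackRegistry
  def initialize
    @listeners = []
    @mutex     = Mutex.new
  end

  # Same registration pattern as #on_change.
  def on_change(&callback)
    @mutex.synchronize { @listeners << callback }
  end

  # Assumed dispatch step: copy the list under the lock, then call each
  # callback in registration order on the calling thread.
  def dispatch(event, node_id, robot_id)
    callbacks = @mutex.synchronize { @listeners.dup }
    callbacks.each { |callback| callback.call(event, node_id, robot_id) }
  end
end

registry = CallbackRegistry.new
registry.on_change { |event, node_id, _robot_id| puts "first: #{event} #{node_id}" }
registry.on_change { |event, _node_id, robot_id| puts "second: #{event} from #{robot_id}" }
registry.dispatch(:added, 123, 1)
```

Since callbacks run synchronously, a slow callback delays every callback after it and, in the real channel, the listener thread itself.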

Yields:

  • (event, node_id, robot_id)

    Block called for each notification

Yield Parameters:

  • event (Symbol)

    Type of change (:added, :evicted, or :cleared)

  • node_id (Integer, nil)

    ID of the affected node

  • robot_id (Integer)

    ID of the robot that triggered the change



# File 'lib/htm/working_memory_channel.rb', line 124

def on_change(&callback)
  @mutex.synchronize { @listeners << callback }
end

#start_listening ⇒ Thread

Starts listening for notifications in a background thread.

Creates a dedicated PostgreSQL connection that uses LISTEN to receive notifications. The thread polls every 0.5 seconds, allowing for clean shutdown via #stop_listening.

Examples:

Start and verify listening

thread = channel.start_listening
puts "Listening: #{channel.listening?}"  # => true
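
Polling loop (sketch)

The private `listen_loop` is not shown on this page. The sketch below is an assumption about its general shape, not the actual implementation: it issues LISTEN, then repeatedly waits up to 0.5 seconds for a notification so that a stop request is noticed between waits. `listen_until_stopped` and `should_stop` are hypothetical names; `conn` is assumed to behave like a `PG::Connection` from the pg gem (`exec`, `quote_ident`, `wait_for_notify`).

```ruby
# Hypothetical polling loop. Yields each notification payload to the caller's
# block and returns the number of notifications seen.
def listen_until_stopped(conn, channel, should_stop, timeout: 0.5)
  conn.exec("LISTEN #{conn.quote_ident(channel)}")
  received = 0
  until should_stop.call
    # wait_for_notify blocks for at most `timeout` seconds and only runs the
    # block when a notification arrived.
    conn.wait_for_notify(timeout) do |_channel, _pid, payload|
      received += 1
      yield payload
    end
  end
  received
ensure
  # Drop the subscription before the connection is reused or closed.
  conn.exec('UNLISTEN *')
end
```

With a real connection, this might be called as `listen_until_stopped(PG.connect(db_config), channel.channel_name, -> { stop }) { |payload| ... }`, where `stop` is a flag set by another thread.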

Returns:

  • (Thread)

    The background listener thread



# File 'lib/htm/working_memory_channel.rb', line 140

def start_listening
  @stop_requested = false
  @listen_thread  = Thread.new do
    listen_loop
  end
  @listen_thread.abort_on_exception = true
  @listen_thread
end

#stop_listening ⇒ void

This method returns an undefined value.

Stops the background listener thread.

Signals the listener to stop, waits up to 0.5 seconds for a clean exit, then forcibly terminates the thread if it is still running. The PostgreSQL connection is closed automatically.

Examples:

Clean shutdown

channel.stop_listening
puts "Listening: #{channel.listening?}"  # => false
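
Join-then-kill shutdown (sketch)

The shutdown sequence described above (wait briefly, then force termination) can be demonstrated with plain threads and no database. `stop_thread` is a hypothetical helper; it relies on `Thread#join` returning nil when the time limit expires.

```ruby
# Hypothetical helper: waits up to `grace` seconds for the thread to finish,
# kills it otherwise. Returns true when the thread exited on its own.
def stop_thread(thread, grace: 0.5)
  exited_cleanly = !thread.join(grace).nil?
  unless exited_cleanly
    thread.kill
    thread.join # wait until the kill has taken effect
  end
  exited_cleanly
end

# A cooperative worker checks a stop flag, much like the listener thread.
stop_requested = false
worker = Thread.new { sleep 0.01 until stop_requested }
stop_requested = true
puts stop_thread(worker)  # => true

# A worker that never checks the flag has to be killed.
stubborn = Thread.new { sleep }
puts stop_thread(stubborn, grace: 0.05)  # => false
```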


# File 'lib/htm/working_memory_channel.rb', line 161

def stop_listening
  @stop_requested = true
  # Give the thread a moment to exit cleanly
  @listen_thread&.join(0.5)
  @listen_thread&.kill if @listen_thread&.alive?
  @listen_thread = nil
end