Class: RobotLab::Memory

Inherits:
Object
  • Object
Includes:
Utils
Defined in:
lib/robot_lab/memory.rb

Overview

Unified memory system for Robot and Network execution

Memory is a reactive key-value store backed by Redis (if available) or an internal Hash object. It provides persistent storage for runtime data, conversation history, and arbitrary user-defined values.

Reactive Features

Memory supports pub/sub semantics where robots can subscribe to key changes and optionally block until values become available:

  • `set(key, value)` - Write a value and notify subscribers asynchronously

  • `get(key, wait: true)` - Read a value, blocking until it exists if needed

  • `subscribe(*keys)` - Register a callback for key changes

Reserved keys with special accessors:

  • :data - runtime data (StateProxy for method-style access)

  • :results - accumulated robot results

  • :messages - conversation history

  • :session_id - conversation session identifier for history persistence

  • :cache - semantic cache (RubyLLM::SemanticCache), or nil when caching is disabled

Examples:

Basic usage

memory = Memory.new
memory.set(:user_id, 123)
memory.get(:user_id)  # => 123

Blocking read

# In robot A (writer)
memory.set(:sentiment, { score: 0.8 })

# In robot B (reader, may run concurrently)
result = memory.get(:sentiment, wait: true)   # Blocks until available
result = memory.get(:sentiment, wait: 30)     # Blocks up to 30 seconds

Multiple keys

results = memory.get(:sentiment, :entities, :keywords, wait: 60)
# => { sentiment: {...}, entities: [...], keywords: [...] }

Subscriptions (async callbacks)

memory.subscribe(:raw_data) do |change|
  puts "#{change.key} changed by #{change.writer}"
  enriched = enrich(change.value)
  memory.set(:enriched, enriched)
end

Using reserved keys

memory.data[:category] = "billing"
memory.data.category  # => "billing"
memory.results  # => []
memory.cache  # => RubyLLM::SemanticCache (nil when enable_cache: false)

Constant Summary

RESERVED_KEYS = %i[data results messages session_id cache].freeze

Reserved keys that have special behavior

Constructor Details

#initialize(data: {}, results: [], messages: [], session_id: nil, backend: :auto, enable_cache: true, network_name: nil) ⇒ Memory

Creates a new Memory instance.

Examples:

Basic memory with caching enabled

Memory.new(data: { category: nil, resolved: false })

Memory with caching disabled

Memory.new(enable_cache: false)

Network-owned memory

Memory.new(network_name: "support_pipeline")

Parameters:

  • data (Hash) (defaults to: {})

    initial runtime data

  • results (Array<RobotResult>) (defaults to: [])

    pre-loaded robot results

  • messages (Array<Message, Hash>) (defaults to: [])

    pre-loaded conversation messages

  • session_id (String, nil) (defaults to: nil)

    conversation session identifier

  • backend (Symbol) (defaults to: :auto)

    storage backend (:auto, :redis, :hash)

  • enable_cache (Boolean) (defaults to: true)

    whether to enable semantic caching (default: true)

  • network_name (String, nil) (defaults to: nil)

    the network this memory belongs to



# File 'lib/robot_lab/memory.rb', line 92

def initialize(data: {}, results: [], messages: [], session_id: nil, backend: :auto, enable_cache: true,
               network_name: nil)
  @backend = select_backend(backend)
  @mutex = Mutex.new
  @enable_cache = enable_cache
  @network_name = network_name
  @current_writer = nil

  # Initialize reserved keys
  set_internal(:data, data.is_a?(Hash) ? data.transform_keys(&:to_sym) : data)
  set_internal(:results, Array(results))
  set_internal(:messages, Array(messages).map { |m| normalize_message(m) })
  set_internal(:session_id, session_id)
  set_internal(:cache, @enable_cache ? RubyLLM::SemanticCache : nil)

  # Data proxy for method-style access
  @data = nil

  # Reactive infrastructure
  @subscriptions = Hash.new { |h, k| h[k] = [] }
  @pattern_subscriptions = []
  @waiters = Hash.new { |h, k| h[k] = [] }
  @subscription_mutex = Mutex.new
  @waiter_mutex = Mutex.new

  # Notification coalescing — batches multiple key changes into a single
  # drainer fiber rather than spawning one Async task per callback per change.
  @notification_queue       = []
  @notification_queue_mutex = Mutex.new
  @drainer_scheduled        = false
end
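
The constructor builds @subscriptions and @waiters with the block form of Hash.new. The following stdlib-only sketch (variable names are illustrative, not part of the library) shows why the block form is presumably preferred over passing a default object:

```ruby
# Hash.new(obj): every missing key returns the SAME object, and nothing is stored.
shared = Hash.new([])
shared[:a] << 1
shared_b    = shared[:b]      # the one shared array, already mutated via :a
shared_keys = shared.keys     # still empty: no key was ever assigned

# Hash.new { ... }: the block builds and stores a fresh array per key.
per_key = Hash.new { |h, k| h[k] = [] }
per_key[:a] << 1
per_key_b    = per_key[:b]    # a new, empty array
per_key_keys = per_key.keys   # [:a, :b]: reading a missing key stored it
```

One consequence of the block form is that a plain read, such as the @subscriptions[key].any? check in #subscribed?, inserts an empty list for a key that had no entry.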

Instance Attribute Details

#current_writer ⇒ String?

Returns the name of the robot currently writing.

Returns:

  • (String, nil)

    the name of the robot currently writing



# File 'lib/robot_lab/memory.rb', line 71

attr_reader :network_name

#network_name ⇒ Object (readonly)

Returns the value of attribute network_name.



# File 'lib/robot_lab/memory.rb', line 71

def network_name
  @network_name
end

Class Method Details

.from_hash(hash) ⇒ Memory

Reconstruct memory from hash

A new semantic cache instance is created automatically.

Parameters:

  • hash (Hash)

Returns:

  • (Memory)



# File 'lib/robot_lab/memory.rb', line 660

def self.from_hash(hash)
  hash = hash.transform_keys(&:to_sym)
  memory = new(
    data: hash[:data] || {},
    results: (hash[:results] || []).map { |r| RobotResult.from_hash(r) },
    messages: (hash[:messages] || []).map { |m| Message.from_hash(m) },
    session_id: hash[:session_id]
  )

  # Restore custom keys
  (hash[:custom] || {}).each { |k, v| memory[k] = v }

  memory
end
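
The transform_keys(&:to_sym) call matters when the hash has been through JSON, because parsed JSON comes back with String keys. A minimal stdlib-only sketch, using a made-up payload shaped like the #to_h output and no RobotLab classes:

```ruby
require "json"

# Hypothetical payload shaped like Memory#to_h output (illustrative values only).
exported = { data: { category: "billing" }, session_id: "abc", custom: { user_id: 123 } }

# A JSON round trip turns every Symbol key into a String.
parsed = JSON.parse(JSON.generate(exported))

# transform_keys converts only the top level; nested hashes keep String keys.
top_level = parsed.transform_keys(&:to_sym)

session_id  = top_level[:session_id]   # reachable by Symbol again
nested_keys = top_level[:data].keys    # still Strings at this point
```

In the source above, the nested :data keys are symbolized later by the constructor, and custom keys are symbolized by #[]=.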

Instance Method Details

#[](key) ⇒ Object

Get value by key

Parameters:

  • key (Symbol, String)

    the key to retrieve

Returns:

  • (Object)

    the stored value



# File 'lib/robot_lab/memory.rb', line 129

def [](key)
  key = key.to_sym
  return send(key) if RESERVED_KEYS.include?(key) && key != :cache

  get_internal(key)
end

#[]=(key, value) ⇒ Object

Set value by key

For non-reserved keys, this delegates to #set, which provides reactive notifications. Reserved keys are written directly, without notifications, except :cache, which cannot be reassigned and raises ArgumentError.

Parameters:

  • key (Symbol, String)

    the key to set

  • value (Object)

    the value to store

Returns:

  • (Object)

    the stored value

See Also:

  • #set



# File 'lib/robot_lab/memory.rb', line 147

def []=(key, value)
  key = key.to_sym

  # Reserved keys have special handling (no notifications)
  case key
  when :data
    @data = nil  # Reset proxy
    set_internal(:data, value.is_a?(Hash) ? value.transform_keys(&:to_sym) : value)
  when :results
    set_internal(:results, Array(value))
  when :messages
    set_internal(:messages, Array(value).map { |m| normalize_message(m) })
  when :session_id
    set_internal(:session_id, value)
  when :cache
    # Cache is read-only after initialization
    raise ArgumentError, "Cannot reassign cache - it is initialized automatically"
  else
    # Non-reserved keys use reactive set
    set(key, value)
  end
end

#all_keys ⇒ Array<Symbol>

Get all keys including reserved

Returns:

  • (Array<Symbol>)


# File 'lib/robot_lab/memory.rb', line 543

def all_keys
  @mutex.synchronize do
    @backend.keys.map(&:to_sym)
  end
end

#append_result(result) ⇒ self

Append a robot result to history

Parameters:

  • result (RobotResult)

    the robot result to append

Returns:

  • (self)


# File 'lib/robot_lab/memory.rb', line 475

def append_result(result)
  @mutex.synchronize do
    results_array = @backend[:results] || []
    results_array << result
    @backend[:results] = results_array
  end
  self
end

#cache ⇒ RubyLLM::SemanticCache

Get the semantic cache module

When caching is enabled (the default), the cache provides semantic similarity matching for LLM responses, reducing cost and latency by returning cached responses for semantically equivalent queries. When the memory was created with enable_cache: false, this returns nil.

Examples:

Using the cache with fetch

response = memory.cache.fetch("What is Ruby?") do
  RubyLLM.chat.ask("What is Ruby?")
end

Wrapping a chat instance

chat = memory.cache.wrap(RubyLLM.chat(model: "gpt-4"))
chat.ask("What is Ruby?")  # Cached on semantic similarity

Returns:

  • (RubyLLM::SemanticCache, nil)

    the semantic cache module, or nil when caching is disabled



# File 'lib/robot_lab/memory.rb', line 228

def cache
  get_internal(:cache)
end

#clear ⇒ self

Clear all non-reserved keys

Returns:

  • (self)


# File 'lib/robot_lab/memory.rb', line 567

def clear
  @mutex.synchronize do
    keys_to_delete = @backend.keys.map(&:to_sym) - RESERVED_KEYS
    keys_to_delete.each { |k| @backend.delete(k) }
  end
  self
end

#clone ⇒ Memory Also known as: dup

Clone memory for isolated execution

The semantic cache setting and network name are preserved in clones. Subscriptions are NOT cloned - the new memory starts with fresh subscriptions.

Returns:

  • (Memory)



# File 'lib/robot_lab/memory.rb', line 612

def clone
  cloned = Memory.new(
    data: deep_dup(data.to_h),
    results: results,
    messages: messages,
    session_id: session_id,
    backend: @backend.is_a?(Hash) ? :hash : :auto,
    enable_cache: @enable_cache,
    network_name: @network_name
  )
  # Copy non-reserved keys (without triggering notifications)
  keys.each { |k| cloned.send(:set_internal, k, deep_dup(get_internal(k))) }
  cloned
end
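
The clone relies on a private deep_dup helper that is not shown in this section. As a rough illustration of why a deep copy is needed for isolation, the sketch below uses a hypothetical deep_dup_sketch function (an assumption, not the library's implementation) and compares it with a shallow dup:

```ruby
# Hypothetical stand-in for the private deep_dup helper, which is not shown here.
def deep_dup_sketch(obj)
  case obj
  when Hash   then obj.each_with_object({}) { |(k, v), copy| copy[k] = deep_dup_sketch(v) }
  when Array  then obj.map { |v| deep_dup_sketch(v) }
  when String then obj.dup
  else obj # Integers, Symbols, nil and other objects are passed through as-is
  end
end

original = { tags: ["a", "b"], meta: { owner: "alice" } }
deep     = deep_dup_sketch(original)
shallow  = original.dup

shallow[:tags] << "c"        # leaks into original: the inner array is shared
deep[:meta][:owner] = "bob"  # does not leak: the inner hash was copied
```

After these two writes, original[:tags] contains "c" while original[:meta][:owner] is still "alice".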

#data ⇒ StateProxy

Access runtime data through StateProxy

Returns:

  • (StateProxy)

    proxy for method-style data access



# File 'lib/robot_lab/memory.rb', line 174

def data
  @data ||= StateProxy.new(get_internal(:data) || {})
end
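
StateProxy itself is not documented in this section. To illustrate what "method-style access" over a hash can look like, here is a minimal sketch built on method_missing; the class name ProxySketch and its behavior are assumptions for illustration, not the library's StateProxy:

```ruby
# Hypothetical stand-in; the real StateProxy is not shown in this section.
class ProxySketch
  def initialize(hash)
    @hash = hash
  end

  def [](key)
    @hash[key.to_sym]
  end

  def []=(key, value)
    @hash[key.to_sym] = value
  end

  def to_h
    @hash.dup
  end

  # Route zero-argument calls such as proxy.category to the hash when the key exists.
  def method_missing(name, *args)
    return @hash[name] if args.empty? && @hash.key?(name)

    super
  end

  def respond_to_missing?(name, include_private = false)
    @hash.key?(name) || super
  end
end

proxy = ProxySketch.new({})
proxy[:category] = "billing"
category = proxy.category
```

Calling a name that is not a stored key falls through to super and raises NoMethodError, which keeps typos from silently returning nil in this sketch.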

#delete(key) ⇒ Object

Delete a key

Parameters:

  • key (Symbol, String)

Returns:

  • (Object)

    the deleted value

Raises:

  • (ArgumentError)

    if key is a reserved key


# File 'lib/robot_lab/memory.rb', line 554

def delete(key)
  key = key.to_sym
  raise ArgumentError, "Cannot delete reserved key: #{key}" if RESERVED_KEYS.include?(key)

  @mutex.synchronize do
    @backend.delete(key)
  end
end

#delete_document(key) ⇒ self

Remove a document from the store.

Parameters:

  • key (Symbol, String)

Returns:

  • (self)


# File 'lib/robot_lab/memory.rb', line 465

def delete_document(key)
  document_store.delete(key)
  self
end

#document_keys ⇒ Array<Symbol>

Keys of all documents stored in the embedded document store.

Returns:

  • (Array<Symbol>)


# File 'lib/robot_lab/memory.rb', line 457

def document_keys
  document_store.keys
end

#format_history(formatter: nil) ⇒ Array<Message>

Format history for robot prompts

Combines pre-loaded messages with formatted results.

Parameters:

  • formatter (Proc, nil) (defaults to: nil)

    custom result formatter

Returns:

  • (Array<Message>)



# File 'lib/robot_lab/memory.rb', line 600

def format_history(formatter: nil)
  formatter ||= default_formatter
  messages + results.flat_map { |r| formatter.call(r) }
end
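
Because the method uses flat_map, a custom formatter may return several messages per result. The sketch below reproduces that combination step with Struct stand-ins; MessageSketch, ResultSketch, and their fields are hypothetical and only approximate the library's Message and RobotResult:

```ruby
# Hypothetical stand-ins for Message and RobotResult (illustration only).
MessageSketch = Struct.new(:role, :content)
ResultSketch  = Struct.new(:robot_name, :output)

messages = [MessageSketch.new("user", "My invoice is wrong")]
results  = [
  ResultSketch.new("classifier", ["billing"]),
  ResultSketch.new("responder", ["Sorry about that", "Refund issued"])
]

# The formatter returns an array of messages per result; flat_map flattens one level.
formatter = lambda do |r|
  r.output.map { |text| MessageSketch.new("assistant", "#{r.robot_name}: #{text}") }
end

history = messages + results.flat_map { |r| formatter.call(r) }
```

Here history holds four entries: the pre-loaded user message followed by three formatted assistant messages, in result order.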

#get(*keys, wait: false) ⇒ Object, Hash

Get one or more values, optionally waiting until they exist.

Examples:

Immediate read

memory.get(:sentiment)  # => value or nil

Blocking read

memory.get(:sentiment, wait: true)  # Blocks until available

Blocking with timeout

memory.get(:sentiment, wait: 30)  # Blocks up to 30 seconds

Multiple keys

memory.get(:sentiment, :entities, :keywords, wait: 60)
# => { sentiment: {...}, entities: [...], keywords: [...] }

Parameters:

  • keys (Array<Symbol, String>)

    one or more keys to retrieve

  • wait (Boolean, Numeric) (defaults to: false)

    wait behavior:

    • `false` (default): return immediately, nil if missing

    • `true`: block indefinitely until value(s) exist

    • `Numeric`: block up to that many seconds, raise AwaitTimeout if exceeded

Returns:

  • (Object, Hash)

    single value for one key, hash for multiple keys

Raises:

  • (AwaitTimeout)

    if timeout expires before value is available



# File 'lib/robot_lab/memory.rb', line 295

def get(*keys, wait: false)
  keys = keys.flatten.map(&:to_sym)

  if keys.one?
    get_single(keys.first, wait: wait)
  else
    get_multiple(keys, wait: wait)
  end
end
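
The private get_single and get_multiple helpers are not shown here. One plausible way to implement the three wait modes is a Mutex with a ConditionVariable, sketched below. WaitableStore and AwaitTimeoutSketch are hypothetical names, and the library's own waiter mechanism may work differently:

```ruby
# Hypothetical sketch of a blocking read; not the library's implementation.
class WaitableStore
  class AwaitTimeoutSketch < StandardError; end

  def initialize
    @values = {}
    @mutex  = Mutex.new
    @cond   = ConditionVariable.new
  end

  def set(key, value)
    @mutex.synchronize do
      @values[key] = value
      @cond.broadcast # wake every waiting reader so it can re-check its key
    end
    value
  end

  def get(key, wait: false)
    now      = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    deadline = wait.is_a?(Numeric) ? now + wait : nil

    @mutex.synchronize do
      until @values.key?(key)
        return nil unless wait # wait: false returns immediately

        if deadline
          remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
          raise AwaitTimeoutSketch, "timed out waiting for #{key}" if remaining <= 0

          @cond.wait(@mutex, remaining)
        else
          @cond.wait(@mutex) # wait: true blocks until a writer broadcasts
        end
      end
      @values[key]
    end
  end
end

store  = WaitableStore.new
writer = Thread.new do
  sleep 0.05
  store.set(:sentiment, { score: 0.8 })
end
value = store.get(:sentiment, wait: 2) # blocks until the writer thread sets the key
writer.join
```

The loop re-checks the key after every wake-up, which guards against spurious wake-ups and against broadcasts triggered by writes to other keys.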

#key?(key) ⇒ Boolean Also known as: has_key?, include?

Check if key exists

Parameters:

  • key (Symbol, String)

Returns:

  • (Boolean)


# File 'lib/robot_lab/memory.rb', line 518

def key?(key)
  key = key.to_sym
  return true if RESERVED_KEYS.include?(key)

  @mutex.synchronize do
    @backend.key?(key)
  end
end

#keys ⇒ Array<Symbol>

Get all keys (excluding reserved keys)

Returns:

  • (Array<Symbol>)


# File 'lib/robot_lab/memory.rb', line 533

def keys
  @mutex.synchronize do
    @backend.keys.map(&:to_sym) - RESERVED_KEYS
  end
end

#merge!(values) ⇒ self

Merge additional values into memory

Parameters:

  • values (Hash)

    key-value pairs to merge

Returns:

  • (self)


# File 'lib/robot_lab/memory.rb', line 508

def merge!(values)
  values.each { |k, v| self[k] = v }
  self
end

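#messages ⇒ Array<Message>

Get a copy of the messages. The returned array is a shallow duplicate, so adding or removing elements does not affect the stored history.

Returns:

  • (Array<Message>)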



# File 'lib/robot_lab/memory.rb', line 190

def messages
  (get_internal(:messages) || []).dup
end

#redis? ⇒ Boolean

Check if using Redis backend

Returns:

  • (Boolean)


# File 'lib/robot_lab/memory.rb', line 679

def redis?
  @backend.is_a?(RedisBackend)
end

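#reset ⇒ self

Reset memory to its initial state: all keys are cleared, the reserved keys are restored to empty defaults, and the existing cache is preserved.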

Returns:

  • (self)


# File 'lib/robot_lab/memory.rb', line 579

def reset
  cached = get_internal(:cache)  # Preserve cache instance
  @mutex.synchronize do
    @backend.clear
    @backend[:data] = {}
    @backend[:results] = []
    @backend[:messages] = []
    @backend[:session_id] = nil
    @backend[:cache] = cached  # Restore cache instance
  end
  @data = nil
  self
end

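#results ⇒ Array<RobotResult>

Get a copy of the results. The returned array is a shallow duplicate, so adding or removing elements does not affect the stored results.

Returns:

  • (Array<RobotResult>)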



# File 'lib/robot_lab/memory.rb', line 182

def results
  (get_internal(:results) || []).dup
end

#results_from(start_index) ⇒ Array<RobotResult>

Get results from a specific index (for incremental save)

Parameters:

  • start_index (Integer)

Returns:

  • (Array<RobotResult>)



# File 'lib/robot_lab/memory.rb', line 499

def results_from(start_index)
  (get_internal(:results) || [])[start_index..] || []
end
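
The trailing || [] covers the case where start_index is past the end of the array. A short sketch with plain strings standing in for results shows the three cases of slicing with an endless range:

```ruby
results = %w[r0 r1 r2]

from_one  = results[1..]        # ["r1", "r2"]: the tail starting at index 1
at_end    = results[3..]        # []: a start equal to the length is still in range
past_end  = results[5..]        # nil: a start beyond the length is out of range
safe_past = results[5..] || []  # []: the fallback used by results_from
```

With the fallback in place, a caller doing an incremental save can always iterate over the return value without a nil check.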

#search_documents(query, limit: 5) ⇒ Array<Hash>

Search stored documents for the ones most semantically similar to query.

Examples:

hits = memory.search_documents("how to configure redis", limit: 3)
hits.each { |h| puts "#{h[:key]} (#{h[:score].round(3)}): #{h[:text][0..80]}" }

Parameters:

  • query (String)

    natural-language query

  • limit (Integer) (defaults to: 5)

    maximum number of results to return (default 5)

Returns:

  • (Array<Hash>)

    results sorted by score descending; each hash has :key, :text, and :score (Float 0.0..1.0)



# File 'lib/robot_lab/memory.rb', line 450

def search_documents(query, limit: 5)
  document_store.search(query, limit: limit)
end

#session_id ⇒ String?

Get session identifier

Returns:

  • (String, nil)


# File 'lib/robot_lab/memory.rb', line 198

def session_id
  get_internal(:session_id)
end

#session_id=(id) ⇒ self

Set session identifier

Parameters:

  • id (String, nil)

Returns:

  • (self)


# File 'lib/robot_lab/memory.rb', line 207

def session_id=(id)
  set_internal(:session_id, id)
end

#set(key, value) ⇒ Object

Set a value and notify subscribers asynchronously.

This is the primary write method for reactive memory. It stores the value, wakes any threads waiting for this key, and asynchronously notifies subscribers.

Examples:

Basic set

memory.set(:sentiment, { score: 0.8, confidence: 0.95 })

Set triggers notifications

memory.subscribe(:status) { |change| puts "Status: #{change.value}" }
memory.set(:status, "complete")  # Subscriber callback fires async

Parameters:

  • key (Symbol, String)

    the key to set

  • value (Object)

    the value to store

Returns:

  • (Object)

    the stored value



# File 'lib/robot_lab/memory.rb', line 253

def set(key, value)
  key = key.to_sym
  old_value = nil

  # Store the value
  @mutex.synchronize do
    old_value = @backend[key]
    @backend[key] = value
  end

  # Wake any threads waiting for this key (synchronous - they need the value)
  wake_waiters(key, value)

  # Notify subscribers asynchronously
  notify_subscribers_async(key, value, old_value)

  value
end
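
The private notify_subscribers_async helper is not shown in this section; the constructor comments describe a queue drained by a single fiber. The sketch below approximates that shape with a Thread and a Queue instead of fibers, so it is a substitute technique rather than the library's mechanism. NotifyingStore and ChangeSketch are hypothetical names:

```ruby
# Thread-and-Queue stand-in for the asynchronous notification step.
ChangeSketch = Struct.new(:key, :value, :previous)

class NotifyingStore
  def initialize
    @values      = {}
    @mutex       = Mutex.new
    @subscribers = Hash.new { |h, k| h[k] = [] }
    @queue       = Queue.new
    # Single drainer thread: callbacks never run on the writer's thread.
    @drainer = Thread.new do
      while (change = @queue.pop)
        callbacks = @mutex.synchronize { @subscribers[change.key].dup }
        callbacks.each { |callback| callback.call(change) }
      end
    end
  end

  def subscribe(key, &block)
    @mutex.synchronize { @subscribers[key] << block }
  end

  def set(key, value)
    previous = nil
    @mutex.synchronize do
      previous = @values[key]
      @values[key] = value
    end
    @queue << ChangeSketch.new(key, value, previous) # enqueue and return at once
    value
  end

  # Stop the drainer after it has delivered everything queued so far.
  def shutdown
    @queue << nil
    @drainer.join
  end
end

store = NotifyingStore.new
seen  = Queue.new
store.subscribe(:status) { |change| seen << change }
store.set(:status, "pending")
store.set(:status, "complete")
store.shutdown
```

Because one drainer consumes a FIFO queue, subscribers in this sketch see changes in write order, and each change carries the value it replaced.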

#set_results(results) ⇒ self

Set results (used when loading from persistence)

Parameters:

  • results (Array<RobotResult>)

    the results to store

Returns:

  • (self)


# File 'lib/robot_lab/memory.rb', line 489

def set_results(results)
  set_internal(:results, Array(results))
  self
end

#store_document(key, text) ⇒ self

Embed text and store it under key for later semantic search.

The embedding model (BAAI/bge-small-en-v1.5 via fastembed) is initialised lazily on the first call. The model file is downloaded once and cached.

Examples:

memory.store_document(:readme,    File.read("README.md"))
memory.store_document(:changelog, File.read("CHANGELOG.md"))

Parameters:

  • key (Symbol, String)

    identifier for the document

  • text (String)

    text to embed and store

Returns:

  • (self)


# File 'lib/robot_lab/memory.rb', line 434

def store_document(key, text)
  document_store.store(key, text)
  self
end

#subscribe(*keys) {|MemoryChange| ... } ⇒ Object

Subscribe to changes on one or more keys.

The callback is invoked asynchronously whenever a subscribed key changes. The callback receives a MemoryChange object with details about the change.

Examples:

Subscribe to a single key

memory.subscribe(:raw_data) do |change|
  puts "#{change.key} changed from #{change.previous} to #{change.value}"
  puts "Written by: #{change.writer}"
end

Subscribe to multiple keys

memory.subscribe(:sentiment, :entities) do |change|
  update_dashboard(change.key, change.value)
end

Parameters:

  • keys (Array<Symbol, String>)

    keys to subscribe to

Yields:

  • (MemoryChange)

    callback invoked when a subscribed key changes

Returns:

  • (Object)

    subscription identifier (for unsubscribe)

Raises:

  • (ArgumentError)

    if no block is given


# File 'lib/robot_lab/memory.rb', line 325

def subscribe(*keys, &block)
  raise ArgumentError, "Block required for subscribe" unless block_given?

  keys = keys.flatten.map(&:to_sym)
  subscription_id = generate_subscription_id

  @subscription_mutex.synchronize do
    keys.each do |key|
      @subscriptions[key] << { id: subscription_id, callback: block }
    end
  end

  subscription_id
end

#subscribe_pattern(pattern) {|MemoryChange| ... } ⇒ Object

Subscribe to keys matching a pattern.

Pattern uses glob-style matching:

  • `*` matches any characters

  • `?` matches a single character

Examples:

Subscribe to namespace

memory.subscribe_pattern("analysis:*") do |change|
  puts "Analysis key #{change.key} updated"
end

Parameters:

  • pattern (String)

    glob pattern to match keys

Yields:

  • (MemoryChange)

    callback invoked when a matching key changes

Returns:

  • (Object)

    subscription identifier (for unsubscribe)

Raises:

  • (ArgumentError)

    if no block is given


# File 'lib/robot_lab/memory.rb', line 355

def subscribe_pattern(pattern, &block)
  raise ArgumentError, "Block required for subscribe_pattern" unless block_given?

  subscription_id = generate_subscription_id
  regex = pattern_to_regex(pattern)

  @subscription_mutex.synchronize do
    @pattern_subscriptions << { id: subscription_id, pattern: regex, callback: block }
  end

  subscription_id
end
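
The private pattern_to_regex helper is not shown in this section. A common way to turn a glob into a regular expression is to escape the whole pattern and then re-enable the two wildcards, as sketched below. The function name glob_to_regex_sketch and the choice to anchor at both ends are assumptions:

```ruby
# Hypothetical stand-in for the private pattern_to_regex helper (not shown).
def glob_to_regex_sketch(pattern)
  escaped = Regexp.escape(pattern)                     # "*" becomes "\*", "?" becomes "\?"
  body    = escaped.gsub('\*', ".*").gsub('\?', ".")  # restore the two glob wildcards
  Regexp.new("\\A#{body}\\z")                          # anchor so the whole key must match
end

namespace = glob_to_regex_sketch("analysis:*")
in_namespace  = namespace.match?("analysis:sentiment")
out_namespace = namespace.match?("raw:analysis:x")

single = glob_to_regex_sketch("step_?")
one_char  = single.match?("step_1")
two_chars = single.match?("step_10")
```

Escaping first means characters such as "." or "+" in a key pattern are matched literally rather than being interpreted as regex syntax.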

#subscribed?(key) ⇒ Boolean

Check if there are any subscribers for a key.

Parameters:

  • key (Symbol, String)

    the key to check

Returns:

  • (Boolean)


# File 'lib/robot_lab/memory.rb', line 407

def subscribed?(key)
  key = key.to_sym

  @subscription_mutex.synchronize do
    return true if @subscriptions[key].any?

    @pattern_subscriptions.any? { |s| s[:pattern].match?(key.to_s) }
  end
end

#to_h ⇒ Hash

Export memory to hash for serialization

Note: The cache is not serialized as it is recreated on initialization.

Returns:

  • (Hash)


# File 'lib/robot_lab/memory.rb', line 634

def to_h
  {
    data: data.to_h,
    results: results.map(&:export),
    messages: messages.map(&:to_h),
    session_id: session_id,
    custom: keys.to_h { |k| [k, self[k]] }
  }.compact
end

#to_json ⇒ String

Convert to JSON

Parameters:

  • args (Array)

    arguments passed to to_json

Returns:

  • (String)


# File 'lib/robot_lab/memory.rb', line 649

def to_json(*)
  to_h.to_json(*)
end

#unsubscribe(subscription_id) ⇒ Boolean

Remove a subscription.

Parameters:

  • subscription_id (Object)

    the subscription identifier from subscribe

Returns:

  • (Boolean)

    true if subscription was found and removed



# File 'lib/robot_lab/memory.rb', line 373

def unsubscribe(subscription_id)
  removed = false

  @subscription_mutex.synchronize do
    @subscriptions.each_value do |subs|
      removed = true if subs.reject! { |s| s[:id] == subscription_id }
    end

    removed = true if @pattern_subscriptions.reject! { |s| s[:id] == subscription_id }
  end

  removed
end
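
The removed flag depends on a detail of Array#reject!: it returns the array when at least one element was removed and nil when nothing matched. A short sketch with made-up subscription hashes:

```ruby
subs = [{ id: 1 }, { id: 2 }]

hit  = subs.reject! { |s| s[:id] == 1 }   # returns the array itself: something was removed
miss = subs.reject! { |s| s[:id] == 99 }  # returns nil: nothing matched

remaining_ids = subs.map { |s| s[:id] }
```

Array#delete_if would not work for this purpose, since it returns the array whether or not anything was removed.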

#unsubscribe_keys(*keys) ⇒ self

Remove all subscriptions for specific keys.

Parameters:

  • keys (Array<Symbol, String>)

    keys to unsubscribe from

Returns:

  • (self)


# File 'lib/robot_lab/memory.rb', line 392

def unsubscribe_keys(*keys)
  keys = keys.flatten.map(&:to_sym)

  @subscription_mutex.synchronize do
    keys.each { |key| @subscriptions.delete(key) }
  end

  self
end