Class: RobotLab::Memory

Inherits:
Object
  • Object
show all
Includes:
Utils
Defined in:
lib/robot_lab/memory.rb

Overview

Unified memory system for Robot and Network execution

Memory is a reactive key-value store backed by Redis (if available) or an internal Hash object. It provides persistent storage for runtime data, conversation history, and arbitrary user-defined values.

Reactive Features

Memory supports pub/sub semantics where robots can subscribe to key changes and optionally block until values become available:

  • ‘set(key, value)` - Write a value and notify subscribers asynchronously

  • ‘get(key, wait: true)` - Read a value, blocking until it exists if needed

  • ‘subscribe(*keys)` - Register a callback for key changes

Reserved keys with special accessors:

  • :data - runtime data (StateProxy for method-style access)

  • :results - accumulated robot results

  • :messages - conversation history

  • :session_id - conversation session identifier for history persistence

  • :cache - semantic cache instance (RubyLLM::SemanticCache)

Examples:

Basic usage

memory = Memory.new
memory.set(:user_id, 123)
memory.get(:user_id)  # => 123

Blocking read

# In robot A (writer)
memory.set(:sentiment, { score: 0.8 })

# In robot B (reader, may run concurrently)
result = memory.get(:sentiment, wait: true)   # Blocks until available
result = memory.get(:sentiment, wait: 30)     # Blocks up to 30 seconds

Multiple keys

results = memory.get(:sentiment, :entities, :keywords, wait: 60)
# => { sentiment: {...}, entities: [...], keywords: [...] }

Subscriptions (async callbacks)

memory.subscribe(:raw_data) do |change|
  puts "#{change.key} changed by #{change.writer}"
  enriched = enrich(change.value)
  memory.set(:enriched, enriched)
end

Using reserved keys

memory.data[:category] = "billing"
memory.data.category  # => "billing"
memory.results  # => []
memory.cache  # => RubyLLM::SemanticCache instance

Constant Summary collapse

RESERVED_KEYS =

Reserved keys that have special behavior

%i[data results messages session_id cache].freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(data: {}, results: [], messages: [], session_id: nil, backend: :auto, enable_cache: true, network_name: nil) ⇒ Memory

Creates a new Memory instance.

Examples:

Basic memory with caching enabled

Memory.new(data: { category: nil, resolved: false })

Memory with caching disabled

Memory.new(enable_cache: false)

Network-owned memory

Memory.new(network_name: "support_pipeline")

Parameters:

  • data (Hash) (defaults to: {})

    initial runtime data

  • results (Array<RobotResult>) (defaults to: [])

    pre-loaded robot results

  • messages (Array<Message, Hash>) (defaults to: [])

    pre-loaded conversation messages

  • session_id (String, nil) (defaults to: nil)

    conversation session identifier

  • backend (Symbol) (defaults to: :auto)

    storage backend (:auto, :redis, :hash)

  • enable_cache (Boolean) (defaults to: true)

    whether to enable semantic caching (default: true)

  • network_name (String, nil) (defaults to: nil)

    the network this memory belongs to



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/robot_lab/memory.rb', line 92

def initialize(data: {}, results: [], messages: [], session_id: nil, backend: :auto, enable_cache: true, network_name: nil)
  @backend = select_backend(backend)
  @mutex = Mutex.new
  @enable_cache = enable_cache
  @network_name = network_name
  @current_writer = nil

  # Initialize reserved keys
  set_internal(:data, data.is_a?(Hash) ? data.transform_keys(&:to_sym) : data)
  set_internal(:results, Array(results))
  set_internal(:messages, Array(messages).map { |m| normalize_message(m) })
  set_internal(:session_id, session_id)
  set_internal(:cache, @enable_cache ? RubyLLM::SemanticCache : nil)

  # Data proxy for method-style access
  @data_proxy = nil

  # Reactive infrastructure
  @subscriptions = Hash.new { |h, k| h[k] = [] }
  @pattern_subscriptions = []
  @waiters = Hash.new { |h, k| h[k] = [] }
  @subscription_mutex = Mutex.new
  @waiter_mutex = Mutex.new

  # Notification coalescing — batches multiple key changes into a single
  # drainer fiber rather than spawning one Async task per callback per change.
  @notification_queue       = []
  @notification_queue_mutex = Mutex.new
  @drainer_scheduled        = false
end

Instance Attribute Details

#current_writerString?

Returns the name of the robot currently writing.

Returns:

  • (String, nil)

    the name of the robot currently writing



71
# File 'lib/robot_lab/memory.rb', line 71

attr_reader :network_name

#network_nameObject (readonly)

Returns the value of attribute network_name.



71
72
73
# File 'lib/robot_lab/memory.rb', line 71

def network_name
  @network_name
end

Class Method Details

.from_hash(hash) ⇒ Memory

Reconstruct memory from hash

A new semantic cache instance is created automatically.

Parameters:

  • hash (Hash)

Returns:



664
665
666
667
668
669
670
671
672
673
674
675
676
677
# File 'lib/robot_lab/memory.rb', line 664

def self.from_hash(hash)
  hash = hash.transform_keys(&:to_sym)
  memory = new(
    data: hash[:data] || {},
    results: (hash[:results] || []).map { |r| RobotResult.from_hash(r) },
    messages: (hash[:messages] || []).map { |m| Message.from_hash(m) },
    session_id: hash[:session_id]
  )

  # Restore custom keys
  (hash[:custom] || {}).each { |k, v| memory[k] = v }

  memory
end

Instance Method Details

#[](key) ⇒ Object

Get value by key

Parameters:

  • key (Symbol, String)

    the key to retrieve

Returns:

  • (Object)

    the stored value



128
129
130
131
132
133
# File 'lib/robot_lab/memory.rb', line 128

def [](key)
  key = key.to_sym
  return send(key) if RESERVED_KEYS.include?(key) && key != :cache

  get_internal(key)
end

#[]=(key, value) ⇒ Object

Set value by key

For non-reserved keys, this delegates to #set which provides reactive notifications. For reserved keys, it bypasses notifications.

Parameters:

  • key (Symbol, String)

    the key to set

  • value (Object)

    the value to store

Returns:

  • (Object)

    the stored value

See Also:



146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/robot_lab/memory.rb', line 146

def []=(key, value)
  key = key.to_sym

  # Reserved keys have special handling (no notifications)
  case key
  when :data
    @data_proxy = nil  # Reset proxy
    set_internal(:data, value.is_a?(Hash) ? value.transform_keys(&:to_sym) : value)
  when :results
    set_internal(:results, Array(value))
  when :messages
    set_internal(:messages, Array(value).map { |m| normalize_message(m) })
  when :session_id
    set_internal(:session_id, value)
  when :cache
    # Cache is read-only after initialization
    raise ArgumentError, "Cannot reassign cache - it is initialized automatically"
  else
    # Non-reserved keys use reactive set
    set(key, value)
  end

  value
end

#all_keysArray<Symbol>

Get all keys including reserved

Returns:

  • (Array<Symbol>)


547
548
549
550
551
# File 'lib/robot_lab/memory.rb', line 547

def all_keys
  @mutex.synchronize do
    @backend.keys.map(&:to_sym)
  end
end

#append_result(result) ⇒ self

Append a robot result to history

Parameters:

Returns:

  • (self)


479
480
481
482
483
484
485
486
# File 'lib/robot_lab/memory.rb', line 479

def append_result(result)
  @mutex.synchronize do
    results_array = @backend[:results] || []
    results_array << result
    @backend[:results] = results_array
  end
  self
end

#cacheRubyLLM::SemanticCache

Get the semantic cache module

The cache is always active and provides semantic similarity matching for LLM responses, reducing costs and latency by returning cached responses for semantically equivalent queries.

Examples:

Using the cache with fetch

response = memory.cache.fetch("What is Ruby?") do
  RubyLLM.chat.ask("What is Ruby?")
end

Wrapping a chat instance

chat = memory.cache.wrap(RubyLLM.chat(model: "gpt-4"))
chat.ask("What is Ruby?")  # Cached on semantic similarity

Returns:

  • (RubyLLM::SemanticCache)

    the semantic cache module



230
231
232
# File 'lib/robot_lab/memory.rb', line 230

def cache
  get_internal(:cache)
end

#clearself

Clear all non-reserved keys

Returns:

  • (self)


571
572
573
574
575
576
577
# File 'lib/robot_lab/memory.rb', line 571

def clear
  @mutex.synchronize do
    keys_to_delete = @backend.keys.map(&:to_sym) - RESERVED_KEYS
    keys_to_delete.each { |k| @backend.delete(k) }
  end
  self
end

#cloneMemory Also known as: dup

Clone memory for isolated execution

The semantic cache setting and network name are preserved in clones. Subscriptions are NOT cloned - the new memory starts with fresh subscriptions.

Returns:



616
617
618
619
620
621
622
623
624
625
626
627
628
629
# File 'lib/robot_lab/memory.rb', line 616

def clone
  cloned = Memory.new(
    data: deep_dup(data.to_h),
    results: results,
    messages: messages,
    session_id: session_id,
    backend: @backend.is_a?(Hash) ? :hash : :auto,
    enable_cache: @enable_cache,
    network_name: @network_name
  )
  # Copy non-reserved keys (without triggering notifications)
  keys.each { |k| cloned.send(:set_internal, k, deep_dup(get_internal(k))) }
  cloned
end

#dataStateProxy

Access runtime data through StateProxy

Returns:

  • (StateProxy)

    proxy for method-style data access



175
176
177
# File 'lib/robot_lab/memory.rb', line 175

def data
  @data_proxy ||= StateProxy.new(get_internal(:data) || {})
end

#delete(key) ⇒ Object

Delete a key

Parameters:

  • key (Symbol, String)

Returns:

  • (Object)

    the deleted value

Raises:

  • (ArgumentError)


558
559
560
561
562
563
564
565
# File 'lib/robot_lab/memory.rb', line 558

def delete(key)
  key = key.to_sym
  raise ArgumentError, "Cannot delete reserved key: #{key}" if RESERVED_KEYS.include?(key)

  @mutex.synchronize do
    @backend.delete(key)
  end
end

#delete_document(key) ⇒ self

Remove a document from the store.

Parameters:

  • key (Symbol, String)

Returns:

  • (self)


469
470
471
472
# File 'lib/robot_lab/memory.rb', line 469

def delete_document(key)
  @document_store&.delete(key)
  self
end

#document_keysArray<Symbol>

Keys of all documents stored in the embedded document store.

Returns:

  • (Array<Symbol>)


461
462
463
# File 'lib/robot_lab/memory.rb', line 461

def document_keys
  @document_store&.keys || []
end

#format_history(formatter: nil) ⇒ Array<Message>

Format history for robot prompts

Combines pre-loaded messages with formatted results.

Parameters:

  • formatter (Proc, nil) (defaults to: nil)

    custom result formatter

Returns:



604
605
606
607
# File 'lib/robot_lab/memory.rb', line 604

def format_history(formatter: nil)
  formatter ||= default_formatter
  messages + results.flat_map { |r| formatter.call(r) }
end

#get(*keys, wait: false) ⇒ Object, Hash

Get one or more values, optionally waiting until they exist.

Examples:

Immediate read

memory.get(:sentiment)  # => value or nil

Blocking read

memory.get(:sentiment, wait: true)  # Blocks until available

Blocking with timeout

memory.get(:sentiment, wait: 30)  # Blocks up to 30 seconds

Multiple keys

memory.get(:sentiment, :entities, :keywords, wait: 60)
# => { sentiment: {...}, entities: [...], keywords: [...] }

Parameters:

  • keys (Array<Symbol, String>)

    one or more keys to retrieve

  • wait (Boolean, Numeric) (defaults to: false)

    wait behavior:

    • ‘false` (default): return immediately, nil if missing

    • ‘true`: block indefinitely until value(s) exist

    • ‘Numeric`: block up to that many seconds, raise AwaitTimeout if exceeded

Returns:

  • (Object, Hash)

    single value for one key, hash for multiple keys

Raises:

  • (AwaitTimeout)

    if timeout expires before value is available



297
298
299
300
301
302
303
304
305
# File 'lib/robot_lab/memory.rb', line 297

def get(*keys, wait: false)
  keys = keys.flatten.map(&:to_sym)

  if keys.one?
    get_single(keys.first, wait: wait)
  else
    get_multiple(keys, wait: wait)
  end
end

#key?(key) ⇒ Boolean Also known as: has_key?, include?

Check if key exists

Parameters:

  • key (Symbol, String)

Returns:

  • (Boolean)


522
523
524
525
526
527
528
529
# File 'lib/robot_lab/memory.rb', line 522

def key?(key)
  key = key.to_sym
  return true if RESERVED_KEYS.include?(key)

  @mutex.synchronize do
    @backend.key?(key)
  end
end

#keysArray<Symbol>

Get all keys (excluding reserved keys)

Returns:

  • (Array<Symbol>)


537
538
539
540
541
# File 'lib/robot_lab/memory.rb', line 537

def keys
  @mutex.synchronize do
    @backend.keys.map(&:to_sym) - RESERVED_KEYS
  end
end

#merge!(values) ⇒ self

Merge additional values into memory

Parameters:

  • values (Hash)

    key-value pairs to merge

Returns:

  • (self)


512
513
514
515
# File 'lib/robot_lab/memory.rb', line 512

def merge!(values)
  values.each { |k, v| self[k] = v }
  self
end

#messagesArray<Message>

Get copy of messages (immutable access)

Returns:



191
192
193
# File 'lib/robot_lab/memory.rb', line 191

def messages
  (get_internal(:messages) || []).dup
end

#redis?Boolean

Check if using Redis backend

Returns:

  • (Boolean)


683
684
685
# File 'lib/robot_lab/memory.rb', line 683

def redis?
  @backend.is_a?(RedisBackend)
end

#resetself

Reset memory to initial state

Returns:

  • (self)


583
584
585
586
587
588
589
590
591
592
593
594
595
# File 'lib/robot_lab/memory.rb', line 583

def reset
  cached = get_internal(:cache)  # Preserve cache instance
  @mutex.synchronize do
    @backend.clear
    @backend[:data] = {}
    @backend[:results] = []
    @backend[:messages] = []
    @backend[:session_id] = nil
    @backend[:cache] = cached  # Restore cache instance
  end
  @data_proxy = nil
  self
end

#resultsArray<RobotResult>

Get copy of results (immutable access)

Returns:



183
184
185
# File 'lib/robot_lab/memory.rb', line 183

def results
  (get_internal(:results) || []).dup
end

#results_from(start_index) ⇒ Array<RobotResult>

Get results from a specific index (for incremental save)

Parameters:

  • start_index (Integer)

Returns:



503
504
505
# File 'lib/robot_lab/memory.rb', line 503

def results_from(start_index)
  (get_internal(:results) || [])[start_index..] || []
end

#search_documents(query, limit: 5) ⇒ Array<Hash>

Search stored documents for the ones most semantically similar to query.

Examples:

hits = memory.search_documents("how to configure redis", limit: 3)
hits.each { |h| puts "#{h[:key]} (#{h[:score].round(3)}): #{h[:text][0..80]}" }

Parameters:

  • query (String)

    natural-language query

  • limit (Integer) (defaults to: 5)

    maximum number of results to return (default 5)

Returns:

  • (Array<Hash>)

    results sorted by score descending; each hash has :key, :text, and :score (Float 0.0..1.0)



452
453
454
455
456
# File 'lib/robot_lab/memory.rb', line 452

def search_documents(query, limit: 5)
  return [] unless @document_store

  @document_store.search(query, limit: limit)
end

#session_idString?

Get session identifier

Returns:

  • (String, nil)


199
200
201
# File 'lib/robot_lab/memory.rb', line 199

def session_id
  get_internal(:session_id)
end

#session_id=(id) ⇒ self

Set session identifier

Parameters:

  • id (String, nil)

Returns:

  • (self)


208
209
210
211
# File 'lib/robot_lab/memory.rb', line 208

def session_id=(id)
  set_internal(:session_id, id)
  self
end

#set(key, value) ⇒ Object

Set a value and notify subscribers asynchronously.

This is the primary write method for reactive memory. It stores the value, wakes any threads waiting for this key, and asynchronously notifies subscribers.

Examples:

Basic set

memory.set(:sentiment, { score: 0.8, confidence: 0.95 })

Set triggers notifications

memory.subscribe(:status) { |change| puts "Status: #{change.value}" }
memory.set(:status, "complete")  # Subscriber callback fires async

Parameters:

  • key (Symbol, String)

    the key to set

  • value (Object)

    the value to store

Returns:

  • (Object)

    the stored value



255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
# File 'lib/robot_lab/memory.rb', line 255

def set(key, value)
  key = key.to_sym
  old_value = nil

  # Store the value
  @mutex.synchronize do
    old_value = @backend[key]
    @backend[key] = value
  end

  # Wake any threads waiting for this key (synchronous - they need the value)
  wake_waiters(key, value)

  # Notify subscribers asynchronously
  notify_subscribers_async(key, value, old_value)

  value
end

#set_results(results) ⇒ self

Set results (used when loading from persistence)

Parameters:

Returns:

  • (self)


493
494
495
496
# File 'lib/robot_lab/memory.rb', line 493

def set_results(results)
  set_internal(:results, Array(results))
  self
end

#store_document(key, text) ⇒ self

Embed text and store it under key for later semantic search.

The embedding model (BAAI/bge-small-en-v1.5 via fastembed) is initialised lazily on the first call. The model file is downloaded once and cached.

Examples:

memory.store_document(:readme,    File.read("README.md"))
memory.store_document(:changelog, File.read("CHANGELOG.md"))

Parameters:

  • key (Symbol, String)

    identifier for the document

  • text (String)

    text to embed and store

Returns:

  • (self)


436
437
438
439
# File 'lib/robot_lab/memory.rb', line 436

def store_document(key, text)
  document_store.store(key, text)
  self
end

#subscribe(*keys) {|MemoryChange| ... } ⇒ Object

Subscribe to changes on one or more keys.

The callback is invoked asynchronously whenever a subscribed key changes. The callback receives a MemoryChange object with details about the change.

Examples:

Subscribe to a single key

memory.subscribe(:raw_data) do |change|
  puts "#{change.key} changed from #{change.previous} to #{change.value}"
  puts "Written by: #{change.writer}"
end

Subscribe to multiple keys

memory.subscribe(:sentiment, :entities) do |change|
  update_dashboard(change.key, change.value)
end

Parameters:

  • keys (Array<Symbol, String>)

    keys to subscribe to

Yields:

  • (MemoryChange)

    callback invoked when a subscribed key changes

Returns:

  • (Object)

    subscription identifier (for unsubscribe)

Raises:

  • (ArgumentError)


327
328
329
330
331
332
333
334
335
336
337
338
339
340
# File 'lib/robot_lab/memory.rb', line 327

def subscribe(*keys, &block)
  raise ArgumentError, "Block required for subscribe" unless block_given?

  keys = keys.flatten.map(&:to_sym)
  subscription_id = generate_subscription_id

  @subscription_mutex.synchronize do
    keys.each do |key|
      @subscriptions[key] << { id: subscription_id, callback: block }
    end
  end

  subscription_id
end

#subscribe_pattern(pattern) {|MemoryChange| ... } ⇒ Object

Subscribe to keys matching a pattern.

Pattern uses glob-style matching:

  • ‘*` matches any characters

  • ‘?` matches a single character

Examples:

Subscribe to namespace

memory.subscribe_pattern("analysis:*") do |change|
  puts "Analysis key #{change.key} updated"
end

Parameters:

  • pattern (String)

    glob pattern to match keys

Yields:

  • (MemoryChange)

    callback invoked when a matching key changes

Returns:

  • (Object)

    subscription identifier (for unsubscribe)

Raises:

  • (ArgumentError)


357
358
359
360
361
362
363
364
365
366
367
368
# File 'lib/robot_lab/memory.rb', line 357

def subscribe_pattern(pattern, &block)
  raise ArgumentError, "Block required for subscribe_pattern" unless block_given?

  subscription_id = generate_subscription_id
  regex = pattern_to_regex(pattern)

  @subscription_mutex.synchronize do
    @pattern_subscriptions << { id: subscription_id, pattern: regex, callback: block }
  end

  subscription_id
end

#subscribed?(key) ⇒ Boolean

Check if there are any subscribers for a key.

Parameters:

  • key (Symbol, String)

    the key to check

Returns:

  • (Boolean)


409
410
411
412
413
414
415
416
417
# File 'lib/robot_lab/memory.rb', line 409

def subscribed?(key)
  key = key.to_sym

  @subscription_mutex.synchronize do
    return true if @subscriptions[key].any?

    @pattern_subscriptions.any? { |s| s[:pattern].match?(key.to_s) }
  end
end

#to_hHash

Export memory to hash for serialization

Note: The cache is not serialized as it is recreated on initialization.

Returns:

  • (Hash)


638
639
640
641
642
643
644
645
646
# File 'lib/robot_lab/memory.rb', line 638

def to_h
  {
    data: data.to_h,
    results: results.map(&:export),
    messages: messages.map(&:to_h),
    session_id: session_id,
    custom: keys.each_with_object({}) { |k, h| h[k] = self[k] }
  }.compact
end

#to_json(*args) ⇒ String

Convert to JSON

Parameters:

  • args (Array)

    arguments passed to to_json

Returns:

  • (String)


653
654
655
# File 'lib/robot_lab/memory.rb', line 653

def to_json(*args)
  to_h.to_json(*args)
end

#unsubscribe(subscription_id) ⇒ Boolean

Remove a subscription.

Parameters:

  • subscription_id (Object)

    the subscription identifier from subscribe

Returns:

  • (Boolean)

    true if subscription was found and removed



375
376
377
378
379
380
381
382
383
384
385
386
387
# File 'lib/robot_lab/memory.rb', line 375

def unsubscribe(subscription_id)
  removed = false

  @subscription_mutex.synchronize do
    @subscriptions.each_value do |subs|
      removed = true if subs.reject! { |s| s[:id] == subscription_id }
    end

    removed = true if @pattern_subscriptions.reject! { |s| s[:id] == subscription_id }
  end

  removed
end

#unsubscribe_keys(*keys) ⇒ self

Remove all subscriptions for specific keys.

Parameters:

  • keys (Array<Symbol, String>)

    keys to unsubscribe from

Returns:

  • (self)


394
395
396
397
398
399
400
401
402
# File 'lib/robot_lab/memory.rb', line 394

def unsubscribe_keys(*keys)
  keys = keys.flatten.map(&:to_sym)

  @subscription_mutex.synchronize do
    keys.each { |key| @subscriptions.delete(key) }
  end

  self
end