Class: RubyReactor::Storage::RedisAdapter

Inherits:
Adapter
  • Object
show all
Includes:
RedisLocking, RedisOrderedLocking
Defined in:
lib/ruby_reactor/storage/redis_adapter.rb

Constant Summary

Constants included from RedisOrderedLocking

RubyReactor::Storage::RedisOrderedLocking::ADVANCE_SCRIPT, RubyReactor::Storage::RedisOrderedLocking::ASSIGN_SCRIPT, RubyReactor::Storage::RedisOrderedLocking::CAN_PROCEED_SCRIPT, RubyReactor::Storage::RedisOrderedLocking::HEARTBEAT_SCRIPT, RubyReactor::Storage::RedisOrderedLocking::SKIP_SCRIPT

Constants included from RedisLocking

RubyReactor::Storage::RedisLocking::LOCK_ACQUIRE_SCRIPT, RubyReactor::Storage::RedisLocking::LOCK_EXTEND_SCRIPT, RubyReactor::Storage::RedisLocking::LOCK_RELEASE_SCRIPT, RubyReactor::Storage::RedisLocking::RATE_LIMIT_SCRIPT, RubyReactor::Storage::RedisLocking::SEMAPHORE_TTL, RubyReactor::Storage::RedisLocking::SEM_ACQUIRE_SCRIPT, RubyReactor::Storage::RedisLocking::SEM_RELEASE_SCRIPT

Instance Method Summary collapse

Methods included from RedisOrderedLocking

#ordered_lock_advance, #ordered_lock_assign, #ordered_lock_can_proceed, #ordered_lock_heartbeat, #ordered_lock_keys, #ordered_lock_peek, #ordered_lock_reset, #ordered_lock_skip

Methods included from RedisLocking

#lock_acquire, #lock_extend, #lock_info, #lock_release, #lock_ttl, #period_mark, #period_marker?, #period_seen?, #period_ttl, #rate_limit_check_and_increment, #rate_limit_count, #rate_limit_ttl, #semaphore_acquire, #semaphore_exists?, #semaphore_init, #semaphore_release, #semaphore_reset, #semaphore_state

Constructor Details

#initialize(redis_config) ⇒ RedisAdapter

Returns a new instance of RedisAdapter.



12
13
14
15
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 12

# Creates the adapter and its Redis client.
#
# @param redis_config configuration passed unchanged to Redis.new
def initialize(redis_config)
  super()
  # The storage methods of this adapter issue their commands through @redis.
  @redis = Redis.new(redis_config)
end

Instance Method Details

#count_map_results(map_id, reactor_class_name) ⇒ Object



285
286
287
288
289
290
291
292
293
294
295
296
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 285

# Counts the results stored for a map operation.
#
# The results key is measured as a hash or as a list, depending on the type
# Redis reports for it; any other reported type counts as zero.
#
# @param map_id identifier of the map operation
# @param reactor_class_name reactor class name used to build the results key
# @return [Integer] number of stored results
def count_map_results(map_id, reactor_class_name)
  results_key = map_results_key(map_id, reactor_class_name)

  case @redis.type(results_key)
  when "hash" then @redis.hlen(results_key)
  when "list" then @redis.llen(results_key)
  else 0
  end
end

#decrement_map_counter(map_id, reactor_class_name) ⇒ Object



93
94
95
96
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 93

# Decrements the counter kept for a map operation by one.
#
# @param map_id identifier of the map operation
# @param reactor_class_name reactor class name used to build the counter key
# @return whatever the Redis client's `decr` call returns
def decrement_map_counter(map_id, reactor_class_name)
  @redis.decr(map_counter_key(map_id, reactor_class_name))
end

#delete_context(context_id, reactor_class_name) ⇒ Object



138
139
140
141
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 138

# Deletes the stored context for the given context id and reactor class.
#
# @param context_id identifier of the context
# @param reactor_class_name reactor class name used to build the context key
# @return whatever the Redis client's `del` call returns
def delete_context(context_id, reactor_class_name)
  @redis.del(context_key(context_id, reactor_class_name))
end

#delete_correlation_id(correlation_id, reactor_class_name) ⇒ Object



133
134
135
136
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 133

# Deletes the key that maps a correlation id to a context id.
#
# @param correlation_id correlation id whose mapping is removed
# @param reactor_class_name reactor class name used to build the key
# @return whatever the Redis client's `del` call returns
def delete_correlation_id(correlation_id, reactor_class_name)
  @redis.del(correlation_id_key(correlation_id, reactor_class_name))
end

#determine_status(data) ⇒ Object



199
200
201
202
203
204
205
206
207
208
209
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 199

# Derives a status string from a context data hash.
#
# An explicit "status" entry wins when it is one of the recognised values.
# Otherwise the status is inferred, in order, from the "cancelled" flag,
# the retry count together with the current step, the current step alone,
# and finally any execution evidence.
#
# @param data [Hash] context data with string keys
# @return [String] one of failed, paused, completed, running, skipped,
#   pending or cancelled
def determine_status(data)
  explicit = data["status"].to_s
  return explicit if %w[failed paused completed running skipped pending].include?(explicit)

  current_step = data["current_step"]

  if data["cancelled"]
    "cancelled"
  elsif data["retry_count"]&.positive? && !current_step.nil?
    # Heuristic: retries recorded while a step is still set is treated as failed.
    "failed"
  elsif current_step
    "running"
  elsif execution_evidence?(data)
    "completed"
  else
    "pending"
  end
end

#execution_evidence?(data) ⇒ Boolean

Returns:

  • (Boolean)


211
212
213
214
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 211

# Tells whether the context data shows signs that execution took place.
#
# @param data [Hash] context data with string keys
# @return [Boolean] true when "execution_trace" or "intermediate_results"
#   has at least one truthy entry; a missing entry counts as empty
def execution_evidence?(data)
  trace = data["execution_trace"] || []
  return true if trace.any?

  intermediate = data["intermediate_results"] || {}
  intermediate.any?
end

#expire(key, seconds) ⇒ Object



151
152
153
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 151

# Sets a time-to-live on an arbitrary key by delegating to the Redis client.
#
# @param key key to expire, used exactly as given
# @param seconds time-to-live handed to the client's `expire`
# @return whatever the Redis client's `expire` call returns
def expire(key, seconds)
  @redis.expire(key, seconds)
end

#find_context_by_id(context_id) ⇒ Object



182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 182

# Finds a stored context by id when the reactor class is unknown.
#
# The keyspace is scanned for "reactor:*:context:<context_id>" and the first
# matching key is loaded and parsed.
#
# @param context_id identifier of the context to look up
# @return [Hash, nil] parsed context, or nil when no key matches or the key
#   holds no value by the time it is read
def find_context_by_id(context_id)
  # We don't know the reactor class, so we search for the ID
  pattern = "reactor:*:context:#{context_id}"

  # COUNT is only a hint for how many keys Redis examines per SCAN call; it
  # does not limit results. A hint of 1 costs roughly one round trip per key
  # in the keyspace, so use a larger batch and stop at the first match.
  key = @redis.scan_each(match: pattern, count: 1_000).first
  return nil unless key

  json = @redis.get(key)
  return nil unless json

  JSON.parse(json)
end

#increment_last_queued_index(map_id, reactor_class_name) ⇒ Object



103
104
105
106
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 103

# Increments the last-queued index kept for a map operation by one.
#
# @param map_id identifier of the map operation
# @param reactor_class_name reactor class name used to build the key
# @return whatever the Redis client's `incr` call returns
def increment_last_queued_index(map_id, reactor_class_name)
  @redis.incr(map_last_queued_index_key(map_id, reactor_class_name))
end

#increment_map_counter(map_id, reactor_class_name) ⇒ Object



87
88
89
90
91
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 87

# Increments the counter kept for a map operation and resets its TTL to
# 86,400 seconds.
#
# @param map_id identifier of the map operation
# @param reactor_class_name reactor class name used to build the counter key
# @return the result of the `expire` call (the incremented value is not returned)
def increment_map_counter(map_id, reactor_class_name)
  counter_key = map_counter_key(map_id, reactor_class_name)
  @redis.incr(counter_key)
  @redis.expire(counter_key, 86_400)
end

#increment_map_offset(map_id, increment, reactor_class_name) ⇒ Object



258
259
260
261
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 258

# Adds `increment` to the offset kept for a map operation.
#
# @param map_id identifier of the map operation
# @param increment amount handed to the client's `incrby`
# @param reactor_class_name reactor class name used to build the offset key
# @return whatever the Redis client's `incrby` call returns
def increment_map_offset(map_id, increment, reactor_class_name)
  @redis.incrby(map_offset_key(map_id, reactor_class_name), increment)
end

#initialize_map_operation(map_id, count, parent_reactor_class_name, reactor_class_info:, strict_ordering: true) ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 64

# Prepares the Redis state for a new map operation.
#
# Sets the map counter to `count`, then stores a JSON metadata document under
# "reactor:<parent_reactor_class_name>:map:<map_id>:metadata" with a TTL of
# 86,400 seconds.
#
# @param map_id identifier of the map operation
# @param count value stored in the counter and in the metadata
# @param parent_reactor_class_name reactor class name used to build the keys
# @param reactor_class_info value stored verbatim in the metadata
# @param strict_ordering [Boolean] ordering flag recorded in the metadata
# @return whatever the Redis client's `set` call returns
def initialize_map_operation(map_id, count, parent_reactor_class_name, reactor_class_info:, strict_ordering: true)
  # Ensure counter is set
  set_map_counter(map_id, count, parent_reactor_class_name)

  # Store metadata
  key = "reactor:#{parent_reactor_class_name}:map:#{map_id}:metadata"
  metadata = {
    count: count,
    strict_ordering: strict_ordering,
    reactor_class_info: reactor_class_info,
    created_at: Time.now.to_i
  }
  @redis.set(key, metadata.to_json, ex: 86_400)
end

#publish(channel, message) ⇒ Object



147
148
149
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 147

# Publishes a message on a channel by delegating to the Redis client.
#
# @param channel channel name handed to the client's `publish`
# @param message payload handed to the client's `publish`
# @return whatever the Redis client's `publish` call returns
def publish(channel, message)
  @redis.publish(channel, message)
end

#retrieve_context(context_id, reactor_class_name) ⇒ Object



23
24
25
26
27
28
29
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 23

# Loads and parses the stored context for a context id and reactor class.
#
# @param context_id identifier of the context
# @param reactor_class_name reactor class name used to build the context key
# @return [Object, nil] the parsed JSON document, or nil when nothing is stored
def retrieve_context(context_id, reactor_class_name)
  payload = @redis.get(context_key(context_id, reactor_class_name))
  payload ? JSON.parse(payload) : nil
end

#retrieve_context_id_by_correlation_id(correlation_id, reactor_class_name) ⇒ Object



128
129
130
131
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 128

# Reads the context id stored for a correlation id.
#
# @param correlation_id correlation id to look up
# @param reactor_class_name reactor class name used to build the key
# @return whatever the Redis client's `get` call returns for the mapping key
def retrieve_context_id_by_correlation_id(correlation_id, reactor_class_name)
  @redis.get(correlation_id_key(correlation_id, reactor_class_name))
end

#retrieve_map_element_context_id(map_id, reactor_class_name, index: -1) ⇒ Object



227
228
229
230
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 227

# Reads one entry from the list of element context ids of a map operation.
#
# @param map_id identifier of the map operation
# @param reactor_class_name reactor class name used to build the list key
# @param index list position handed to the client's `lindex`; defaults to -1
# @return whatever the Redis client's `lindex` call returns
def retrieve_map_element_context_id(map_id, reactor_class_name, index: -1)
  @redis.lindex(map_element_contexts_key(map_id, reactor_class_name), index)
end

#retrieve_map_element_context_ids(map_id, reactor_class_name) ⇒ Object



222
223
224
225
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 222

# Reads the full list of element context ids recorded for a map operation.
#
# @param map_id identifier of the map operation
# @param reactor_class_name reactor class name used to build the list key
# @return whatever the Redis client's `lrange` call returns for range 0..-1
def retrieve_map_element_context_ids(map_id, reactor_class_name)
  @redis.lrange(map_element_contexts_key(map_id, reactor_class_name), 0, -1)
end

#retrieve_map_failed_context_id(map_id, reactor_class_name) ⇒ Object



238
239
240
241
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 238

# Reads the failed context id recorded for a map operation.
#
# @param map_id identifier of the map operation
# @param reactor_class_name reactor class name used to build the key
# @return whatever the Redis client's `get` call returns for the key
def retrieve_map_failed_context_id(map_id, reactor_class_name)
  @redis.get(map_failed_context_key(map_id, reactor_class_name))
end

#retrieve_map_metadata(map_id, reactor_class_name) ⇒ Object



79
80
81
82
83
84
85
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 79

# Loads and parses the metadata document stored for a map operation.
#
# Reads the key "reactor:<reactor_class_name>:map:<map_id>:metadata".
#
# @param map_id identifier of the map operation
# @param reactor_class_name reactor class name used to build the key
# @return [Object, nil] the parsed JSON document, or nil when nothing is stored
def retrieve_map_metadata(map_id, reactor_class_name)
  key = "reactor:#{reactor_class_name}:map:#{map_id}:metadata"
  json = @redis.get(key)
  return nil unless json

  JSON.parse(json)
end

#retrieve_map_offset(map_id, reactor_class_name) ⇒ Object



253
254
255
256
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 253

# Reads the offset kept for a map operation.
#
# @param map_id identifier of the map operation
# @param reactor_class_name reactor class name used to build the offset key
# @return whatever the Redis client's `get` call returns; no conversion is applied
def retrieve_map_offset(map_id, reactor_class_name)
  @redis.get(map_offset_key(map_id, reactor_class_name))
end

#retrieve_map_results(map_id, reactor_class_name, strict_ordering: true) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 46

# Loads every stored result of a map operation, parsed from JSON.
#
# With strict ordering the results key is read as a hash whose fields are
# element indexes; values are returned ordered by the integer value of their
# field. Otherwise the key is read as a list and returned in list order.
#
# @param map_id identifier of the map operation
# @param reactor_class_name reactor class name used to build the results key
# @param strict_ordering [Boolean] selects the hash (true) or list (false) layout
# @return [Array] parsed results
def retrieve_map_results(map_id, reactor_class_name, strict_ordering: true)
  results_key = map_results_key(map_id, reactor_class_name)

  unless strict_ordering
    return @redis.lrange(results_key, 0, -1).map { |raw| JSON.parse(raw) }
  end

  # Hash fields are the element indexes; order the values by them.
  @redis.hgetall(results_key)
        .sort_by { |field, _raw| field.to_i }
        .map { |_field, raw| JSON.parse(raw) }
end

#retrieve_map_results_batch(map_id, reactor_class_name, offset:, limit:, strict_ordering: true) ⇒ Object



263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 263

# Loads one window of stored map results, parsed from JSON.
#
# With strict ordering the results key is read as a hash with 0-based index
# fields, and the fields offset...(offset + limit) are fetched; fields that
# are missing are dropped from the output. Otherwise the key is read as a
# list and the matching range is returned.
#
# @param map_id identifier of the map operation
# @param reactor_class_name reactor class name used to build the results key
# @param offset [Integer] index of the first result in the window
# @param limit [Integer] maximum number of results; zero or negative yields []
# @param strict_ordering [Boolean] selects the hash (true) or list (false) layout
# @return [Array] parsed results in the window
def retrieve_map_results_batch(map_id, reactor_class_name, offset:, limit:, strict_ordering: true)
  # An empty window must never reach Redis: HMGET needs at least one field,
  # and for the list layout offset 0 with limit 0 would give an end index of
  # -1, which LRANGE reads as "through the last element" (the whole list).
  return [] unless limit.positive?

  key = map_results_key(map_id, reactor_class_name)

  raw_results =
    if strict_ordering
      # Index keys are 0-based, so the fields of the batch can be generated.
      fields = (offset...(offset + limit)).map(&:to_s)
      # HMGET returns nil for missing fields; map results are usually dense,
      # so holes are dropped rather than preserved.
      @redis.hmget(key, *fields).compact
    else
      # LRANGE uses an inclusive ending index.
      @redis.lrange(key, offset, offset + limit - 1)
    end

  raw_results.map { |r| JSON.parse(r) }
end

#scan_reactors(pattern: "reactor:*:context:*", count: 50) ⇒ Object

New methods for API



156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 156

# Scans the keyspace for reactor context keys and returns up to `count`
# entries produced by fetch_and_filter_reactors.
#
# Keys are gathered from SCAN (with a COUNT hint of 100) and handed to
# fetch_and_filter_reactors in groups of 50. Scanning stops early once
# enough entries have been collected.
#
# @param pattern [String] key pattern to match
# @param count [Integer] maximum number of entries to return
# @return [Array] at most `count` entries
def scan_reactors(pattern: "reactor:*:context:*", count: 50)
  collected = []
  pending_keys = []

  @redis.scan_each(match: pattern, count: 100) do |key|
    pending_keys.push(key)
    # Keep buffering until a full group of 50 keys is available.
    next if pending_keys.size < 50

    collected.concat(fetch_and_filter_reactors(pending_keys))
    pending_keys = []

    # Enough entries gathered: stop scanning.
    return collected.take(count) if collected.size >= count
  end

  # Flush whatever is left in the final, partial group.
  collected.concat(fetch_and_filter_reactors(pending_keys)) unless pending_keys.empty?

  collected.take(count)
end

#set_last_queued_index(map_id, index, reactor_class_name) ⇒ Object



98
99
100
101
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 98

# Stores the last-queued index of a map operation with a TTL of 86,400 seconds.
#
# @param map_id identifier of the map operation
# @param index value to store
# @param reactor_class_name reactor class name used to build the key
# @return whatever the Redis client's `set` call returns
def set_last_queued_index(map_id, index, reactor_class_name)
  @redis.set(map_last_queued_index_key(map_id, reactor_class_name), index, ex: 86_400)
end

#set_map_counter(map_id, count, reactor_class_name) ⇒ Object



59
60
61
62
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 59

# Stores the counter of a map operation with a TTL of 86,400 seconds.
#
# @param map_id identifier of the map operation
# @param count value to store
# @param reactor_class_name reactor class name used to build the counter key
# @return whatever the Redis client's `set` call returns
def set_map_counter(map_id, count, reactor_class_name)
  @redis.set(map_counter_key(map_id, reactor_class_name), count, ex: 86_400)
end

#set_map_offset(map_id, offset, reactor_class_name) ⇒ Object



243
244
245
246
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 243

# Stores the offset of a map operation with a TTL of 86,400 seconds,
# overwriting any existing value.
#
# @param map_id identifier of the map operation
# @param offset value to store
# @param reactor_class_name reactor class name used to build the offset key
# @return whatever the Redis client's `set` call returns
def set_map_offset(map_id, offset, reactor_class_name)
  @redis.set(map_offset_key(map_id, reactor_class_name), offset, ex: 86_400)
end

#set_map_offset_if_not_exists(map_id, offset, reactor_class_name) ⇒ Object



248
249
250
251
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 248

# Stores the offset of a map operation only when no value exists yet
# (SET with NX), with a TTL of 86,400 seconds.
#
# @param map_id identifier of the map operation
# @param offset value to store
# @param reactor_class_name reactor class name used to build the offset key
# @return whatever the Redis client's `set` call returns
def set_map_offset_if_not_exists(map_id, offset, reactor_class_name)
  @redis.set(map_offset_key(map_id, reactor_class_name), offset, nx: true, ex: 86_400)
end

#store_context(context_id, serialized_context, reactor_class_name) ⇒ Object



17
18
19
20
21
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 17

# Stores a serialized context under its context key with a 24h TTL.
#
# @param context_id identifier of the context
# @param serialized_context value written as-is
# @param reactor_class_name reactor class name used to build the context key
# @return whatever the Redis client's `set` call returns
def store_context(context_id, serialized_context, reactor_class_name)
  # A plain SET keeps this compatible with servers lacking ReJSON.
  @redis.set(
    context_key(context_id, reactor_class_name),
    serialized_context,
    ex: 86_400 # 24h TTL
  )
end

#store_correlation_id(correlation_id, context_id, reactor_class_name) ⇒ Object



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 108

# Records the mapping correlation_id -> context_id with a 24h TTL.
#
# The mapping is written only when no mapping exists yet. When one already
# exists and points at the same context id, its TTL is refreshed instead.
#
# @param correlation_id correlation id to register
# @param context_id context id the correlation id should point at
# @param reactor_class_name reactor class name used to build the key
# @return [nil]
# @raise [Error::ValidationError] when the stored mapping does not equal
#   `context_id`
def store_correlation_id(correlation_id, context_id, reactor_class_name)
  mapping_key = correlation_id_key(correlation_id, reactor_class_name)

  # SET NX succeeds only when the key did not exist before.
  return if @redis.set(mapping_key, context_id, nx: true, ex: 86_400) # 24h TTL

  # The key is taken: only the same context id may keep using it.
  unless @redis.get(mapping_key) == context_id
    raise Error::ValidationError, "Correlation ID '#{correlation_id}' already exists"
  end

  # Same owner: refresh the TTL.
  @redis.expire(mapping_key, 86_400)
  nil
end

#store_map_element_context_id(map_id, context_id, reactor_class_name) ⇒ Object



216
217
218
219
220
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 216

# Appends a context id to the list of element contexts of a map operation
# and resets the list's TTL to 86,400 seconds.
#
# @param map_id identifier of the map operation
# @param context_id context id pushed onto the end of the list
# @param reactor_class_name reactor class name used to build the list key
# @return the result of the `expire` call
def store_map_element_context_id(map_id, context_id, reactor_class_name)
  list_key = map_element_contexts_key(map_id, reactor_class_name)
  @redis.rpush(list_key, context_id)
  @redis.expire(list_key, 86_400)
end

#store_map_failed_context_id(map_id, context_id, reactor_class_name) ⇒ Object



232
233
234
235
236
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 232

# Records the failed context id of a map operation with a TTL of 86,400
# seconds.
#
# @param map_id identifier of the map operation
# @param context_id context id of the failure
# @param reactor_class_name reactor class name used to build the key
# @return whatever the Redis client's `set` call returns
def store_map_failed_context_id(map_id, context_id, reactor_class_name)
  # nx: true keeps the first recorded failure; later ones do not overwrite it.
  @redis.set(map_failed_context_key(map_id, reactor_class_name), context_id, nx: true, ex: 86_400)
end

#store_map_result(map_id, index, serialized_result, reactor_class_name, strict_ordering: true) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 31

# Stores one result of a map operation and resets the results key's TTL to
# 86,400 seconds.
#
# With strict ordering the result is written into a hash under the field
# `index.to_s`; otherwise it is pushed onto the end of a list. In both cases
# the stored value is `serialized_result.to_json`.
#
# @param map_id identifier of the map operation
# @param index element index, used as the hash field for strict ordering
# @param serialized_result result value; `to_json` is applied before storing
# @param reactor_class_name reactor class name used to build the results key
# @param strict_ordering [Boolean] selects the hash (true) or list (false) layout
# @return the result of the `expire` call
def store_map_result(map_id, index, serialized_result, reactor_class_name, strict_ordering: true)
  results_key = map_results_key(map_id, reactor_class_name)
  payload = serialized_result.to_json

  if strict_ordering
    # Hash layout: HSET key index payload keeps each result addressable by index.
    @redis.hset(results_key, index.to_s, payload)
  else
    # Loose ordering: results land in arrival order.
    @redis.rpush(results_key, payload)
  end

  @redis.expire(results_key, 86_400)
end

#subscribe(channel, &block) ⇒ Object



143
144
145
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 143

# Subscribes to a channel by delegating to the Redis client.
#
# @param channel channel name handed to the client's `subscribe`
# @param block block forwarded unchanged to the client's `subscribe`
# @return whatever the Redis client's `subscribe` call returns
def subscribe(channel, &block)
  @redis.subscribe(channel, &block)
end