Class: RubyReactor::Storage::RedisAdapter
Constant Summary
RubyReactor::Storage::RedisOrderedLocking::ADVANCE_SCRIPT, RubyReactor::Storage::RedisOrderedLocking::ASSIGN_SCRIPT, RubyReactor::Storage::RedisOrderedLocking::CAN_PROCEED_SCRIPT, RubyReactor::Storage::RedisOrderedLocking::HEARTBEAT_SCRIPT, RubyReactor::Storage::RedisOrderedLocking::SKIP_SCRIPT
RubyReactor::Storage::RedisLocking::LOCK_ACQUIRE_SCRIPT, RubyReactor::Storage::RedisLocking::LOCK_EXTEND_SCRIPT, RubyReactor::Storage::RedisLocking::LOCK_RELEASE_SCRIPT, RubyReactor::Storage::RedisLocking::RATE_LIMIT_SCRIPT, RubyReactor::Storage::RedisLocking::SEMAPHORE_TTL, RubyReactor::Storage::RedisLocking::SEM_ACQUIRE_SCRIPT, RubyReactor::Storage::RedisLocking::SEM_RELEASE_SCRIPT
Instance Method Summary
collapse
-
#count_map_results(map_id, reactor_class_name) ⇒ Object
-
#decrement_map_counter(map_id, reactor_class_name) ⇒ Object
-
#delete_context(context_id, reactor_class_name) ⇒ Object
-
#delete_correlation_id(correlation_id, reactor_class_name) ⇒ Object
-
#determine_status(data) ⇒ Object
-
#execution_evidence?(data) ⇒ Boolean
-
#expire(key, seconds) ⇒ Object
-
#find_context_by_id(context_id) ⇒ Object
-
#increment_last_queued_index(map_id, reactor_class_name) ⇒ Object
-
#increment_map_counter(map_id, reactor_class_name) ⇒ Object
-
#increment_map_offset(map_id, increment, reactor_class_name) ⇒ Object
-
#initialize(redis_config) ⇒ RedisAdapter
constructor
A new instance of RedisAdapter.
-
#initialize_map_operation(map_id, count, parent_reactor_class_name, reactor_class_info:, strict_ordering: true) ⇒ Object
-
#publish(channel, message) ⇒ Object
-
#retrieve_context(context_id, reactor_class_name) ⇒ Object
-
#retrieve_context_id_by_correlation_id(correlation_id, reactor_class_name) ⇒ Object
-
#retrieve_map_element_context_id(map_id, reactor_class_name, index: -1) ⇒ Object
-
#retrieve_map_element_context_ids(map_id, reactor_class_name) ⇒ Object
-
#retrieve_map_failed_context_id(map_id, reactor_class_name) ⇒ Object
-
#retrieve_map_metadata(map_id, reactor_class_name) ⇒ Object
-
#retrieve_map_offset(map_id, reactor_class_name) ⇒ Object
-
#retrieve_map_results(map_id, reactor_class_name, strict_ordering: true) ⇒ Object
-
#retrieve_map_results_batch(map_id, reactor_class_name, offset:, limit:, strict_ordering: true) ⇒ Object
-
#scan_reactors(pattern: "reactor:*:context:*", count: 50) ⇒ Object
-
#set_last_queued_index(map_id, index, reactor_class_name) ⇒ Object
-
#set_map_counter(map_id, count, reactor_class_name) ⇒ Object
-
#set_map_offset(map_id, offset, reactor_class_name) ⇒ Object
-
#set_map_offset_if_not_exists(map_id, offset, reactor_class_name) ⇒ Object
-
#store_context(context_id, serialized_context, reactor_class_name) ⇒ Object
-
#store_correlation_id(correlation_id, context_id, reactor_class_name) ⇒ Object
-
#store_map_element_context_id(map_id, context_id, reactor_class_name) ⇒ Object
-
#store_map_failed_context_id(map_id, context_id, reactor_class_name) ⇒ Object
-
#store_map_result(map_id, index, serialized_result, reactor_class_name, strict_ordering: true) ⇒ Object
-
#subscribe(channel, &block) ⇒ Object
#ordered_lock_advance, #ordered_lock_assign, #ordered_lock_can_proceed, #ordered_lock_heartbeat, #ordered_lock_keys, #ordered_lock_peek, #ordered_lock_reset, #ordered_lock_skip
#lock_acquire, #lock_extend, #lock_info, #lock_release, #lock_ttl, #period_mark, #period_marker?, #period_seen?, #period_ttl, #rate_limit_check_and_increment, #rate_limit_count, #rate_limit_ttl, #semaphore_acquire, #semaphore_exists?, #semaphore_init, #semaphore_release, #semaphore_reset, #semaphore_state
Constructor Details
#initialize(redis_config) ⇒ RedisAdapter
Returns a new instance of RedisAdapter.
12
13
14
15
|
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 12
# Builds the adapter and opens the Redis client it delegates to.
#
# @param redis_config connection options handed unchanged to +Redis.new+
def initialize(redis_config)
# Explicit empty parentheses: invoke the parent initializer with no arguments.
super()
@redis = Redis.new(redis_config)
end
|
Instance Method Details
#count_map_results(map_id, reactor_class_name) ⇒ Object
285
286
287
288
289
290
291
292
293
294
295
296
|
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 285
# Counts the results stored so far for a map operation.
#
# The results key is inspected with TYPE first: a hash is measured with HLEN,
# a list with LLEN, and any other reported type counts as zero.
#
# @return [Integer] number of stored results
def count_map_results(map_id, reactor_class_name)
  results_key = map_results_key(map_id, reactor_class_name)

  case @redis.type(results_key)
  when "hash" then @redis.hlen(results_key)
  when "list" then @redis.llen(results_key)
  else 0
  end
end
|
#decrement_map_counter(map_id, reactor_class_name) ⇒ Object
93
94
95
96
|
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 93
# Decrements the map counter by one.
#
# @return whatever the Redis client returns from DECR for the counter key
def decrement_map_counter(map_id, reactor_class_name)
  @redis.decr(map_counter_key(map_id, reactor_class_name))
end
|
#delete_context(context_id, reactor_class_name) ⇒ Object
138
139
140
141
|
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 138
# Removes the stored context for the given id and reactor class.
#
# @return whatever the Redis client returns from DEL
def delete_context(context_id, reactor_class_name)
  @redis.del(context_key(context_id, reactor_class_name))
end
|
#delete_correlation_id(correlation_id, reactor_class_name) ⇒ Object
133
134
135
136
|
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 133
# Removes the correlation-id entry for the given reactor class.
#
# @return whatever the Redis client returns from DEL
def delete_correlation_id(correlation_id, reactor_class_name)
  @redis.del(correlation_id_key(correlation_id, reactor_class_name))
end
|
#determine_status(data) ⇒ Object
199
200
201
202
203
204
205
206
207
208
209
|
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 199
# Derives a status string from a parsed context hash.
#
# An explicit, recognised "status" value wins. Otherwise the status is
# inferred, in order, from the "cancelled" flag, a positive "retry_count"
# alongside a "current_step", a bare "current_step", and finally any
# execution evidence; with none of these the result is "pending".
#
# @param data [Hash] context data with string keys
# @return [String]
def determine_status(data)
  explicit = data["status"].to_s
  return explicit if %w[failed paused completed running skipped pending].include?(explicit)

  if data["cancelled"]
    "cancelled"
  elsif data["retry_count"]&.positive? && !data["current_step"].nil?
    "failed"
  elsif data["current_step"]
    "running"
  elsif execution_evidence?(data)
    "completed"
  else
    "pending"
  end
end
|
#execution_evidence?(data) ⇒ Boolean
211
212
213
214
|
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 211
# Reports whether the context shows signs of having executed: a non-empty
# "execution_trace" or a non-empty "intermediate_results".
#
# @param data [Hash] context data with string keys
# @return [Boolean]
def execution_evidence?(data)
  %w[execution_trace intermediate_results].any? do |field|
    # A missing or nil field counts as empty.
    (data[field] || []).any?
  end
end
|
#expire(key, seconds) ⇒ Object
151
152
153
|
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 151
# Sets a time-to-live on a key by delegating to the Redis client's +expire+.
#
# @param key the Redis key to expire
# @param seconds the TTL value handed to Redis
def expire(key, seconds)
@redis.expire(key, seconds)
end
|
#find_context_by_id(context_id) ⇒ Object
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
|
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 182
# Looks up a stored context by id alone, without knowing its reactor class.
#
# Scans for the first key matching "reactor:*:context:<context_id>" and
# returns that key's value parsed as JSON.
#
# @param context_id the context identifier interpolated into the match pattern
# @return [Object, nil] parsed JSON, or nil when no key matches or the key has no value
def find_context_by_id(context_id)
  pattern = "reactor:*:context:#{context_id}"

  # COUNT is only a per-iteration work hint for SCAN, not a cap on matches.
  # A hint of 1 costs roughly one round trip per keyspace slot, so use the
  # same hint as scan_reactors and simply stop at the first match.
  key = @redis.scan_each(match: pattern, count: 100).first
  return nil unless key

  json = @redis.get(key)
  return nil unless json

  JSON.parse(json)
end
|
#increment_last_queued_index(map_id, reactor_class_name) ⇒ Object
103
104
105
106
|
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 103
# Increments the last-queued index for a map operation by one.
#
# @return whatever the Redis client returns from INCR
def increment_last_queued_index(map_id, reactor_class_name)
  @redis.incr(map_last_queued_index_key(map_id, reactor_class_name))
end
|
#increment_map_counter(map_id, reactor_class_name) ⇒ Object
87
88
89
90
91
|
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 87
# Increments the map counter by one and refreshes its 86_400-second (24 h) TTL.
#
# The last call is EXPIRE, so the return value is EXPIRE's reply, not the
# incremented counter.
def increment_map_counter(map_id, reactor_class_name)
  counter_key = map_counter_key(map_id, reactor_class_name)

  @redis.incr(counter_key)
  @redis.expire(counter_key, 86_400)
end
|
#increment_map_offset(map_id, increment, reactor_class_name) ⇒ Object
258
259
260
261
|
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 258
# Adds +increment+ to the stored map offset.
#
# @return whatever the Redis client returns from INCRBY
def increment_map_offset(map_id, increment, reactor_class_name)
  @redis.incrby(map_offset_key(map_id, reactor_class_name), increment)
end
|
#initialize_map_operation(map_id, count, parent_reactor_class_name, reactor_class_info:, strict_ordering: true) ⇒ Object
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 64
# Prepares storage for a new map operation: sets the map counter to +count+,
# then writes a JSON metadata record with an 86_400-second (24 h) expiry.
#
# The metadata holds the count, the ordering flag, the reactor class info and
# the creation time as epoch seconds.
def initialize_map_operation(map_id, count, parent_reactor_class_name, reactor_class_info:, strict_ordering: true)
  set_map_counter(map_id, count, parent_reactor_class_name)

  metadata_key = "reactor:#{parent_reactor_class_name}:map:#{map_id}:metadata"
  payload = {
    count: count,
    strict_ordering: strict_ordering,
    reactor_class_info: reactor_class_info,
    created_at: Time.now.to_i
  }.to_json

  @redis.set(metadata_key, payload, ex: 86_400)
end
|
#publish(channel, message) ⇒ Object
147
148
149
|
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 147
# Publishes a message on a channel by delegating to the Redis client's +publish+.
#
# @param channel the channel name
# @param message the payload handed to Redis
def publish(channel, message)
@redis.publish(channel, message)
end
|
#retrieve_context(context_id, reactor_class_name) ⇒ Object
23
24
25
26
27
28
29
|
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 23
# Loads the stored context for the given id and reactor class.
#
# @return [Object, nil] the stored value parsed as JSON, or nil when nothing is stored
def retrieve_context(context_id, reactor_class_name)
  payload = @redis.get(context_key(context_id, reactor_class_name))
  payload ? JSON.parse(payload) : nil
end
|
#retrieve_context_id_by_correlation_id(correlation_id, reactor_class_name) ⇒ Object
128
129
130
131
|
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 128
# Reads the context id stored under a correlation id.
#
# @return the raw value from GET (nil when the key is absent)
def retrieve_context_id_by_correlation_id(correlation_id, reactor_class_name)
  @redis.get(correlation_id_key(correlation_id, reactor_class_name))
end
|
#retrieve_map_element_context_id(map_id, reactor_class_name, index: -1) ⇒ Object
227
228
229
230
|
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 227
# Reads one element context id from the map's context-id list.
#
# @param index position passed to LINDEX; the default of -1 selects the last element
# @return the raw value from LINDEX
def retrieve_map_element_context_id(map_id, reactor_class_name, index: -1)
  @redis.lindex(map_element_contexts_key(map_id, reactor_class_name), index)
end
|
#retrieve_map_element_context_ids(map_id, reactor_class_name) ⇒ Object
222
223
224
225
|
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 222
# Reads every element context id recorded for a map operation
# (LRANGE from index 0 through -1, i.e. the whole list).
def retrieve_map_element_context_ids(map_id, reactor_class_name)
  @redis.lrange(map_element_contexts_key(map_id, reactor_class_name), 0, -1)
end
|
#retrieve_map_failed_context_id(map_id, reactor_class_name) ⇒ Object
238
239
240
241
|
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 238
# Reads the context id recorded as failed for a map operation.
#
# @return the raw value from GET (nil when the key is absent)
def retrieve_map_failed_context_id(map_id, reactor_class_name)
  @redis.get(map_failed_context_key(map_id, reactor_class_name))
end
|
#retrieve_map_metadata(map_id, reactor_class_name) ⇒ Object
79
80
81
82
83
84
85
|
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 79
# Loads the metadata record of a map operation.
#
# @return [Object, nil] the stored value parsed as JSON, or nil when nothing is stored
def retrieve_map_metadata(map_id, reactor_class_name)
  payload = @redis.get("reactor:#{reactor_class_name}:map:#{map_id}:metadata")
  payload ? JSON.parse(payload) : nil
end
|
#retrieve_map_offset(map_id, reactor_class_name) ⇒ Object
253
254
255
256
|
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 253
# Reads the stored map offset.
#
# @return the raw value from GET, with no conversion applied here
def retrieve_map_offset(map_id, reactor_class_name)
  @redis.get(map_offset_key(map_id, reactor_class_name))
end
|
#retrieve_map_results(map_id, reactor_class_name, strict_ordering: true) ⇒ Object
46
47
48
49
50
51
52
53
54
55
56
57
|
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 46
# Loads every stored result of a map operation, parsed from JSON.
#
# @param strict_ordering [Boolean] when true the results are read from a hash
#   and returned sorted by the integer value of their field names; when false
#   they are read from a list in stored order
# @return [Array] parsed results
def retrieve_map_results(map_id, reactor_class_name, strict_ordering: true)
  results_key = map_results_key(map_id, reactor_class_name)

  raw_entries =
    if strict_ordering
      @redis.hgetall(results_key).sort_by { |index, _| index.to_i }.map(&:last)
    else
      @redis.lrange(results_key, 0, -1)
    end

  raw_entries.map { |entry| JSON.parse(entry) }
end
|
#retrieve_map_results_batch(map_id, reactor_class_name, offset:, limit:, strict_ordering: true) ⇒ Object
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
|
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 263
# Loads one page of map results, parsed from JSON.
#
# @param offset [Integer] index of the first result in the page
# @param limit [Integer] maximum number of results to return
# @param strict_ordering [Boolean] when true the page is read from hash fields
#   named by index; when false it is read as a list range
# @return [Array] parsed results; empty when +limit+ is not positive
def retrieve_map_results_batch(map_id, reactor_class_name, offset:, limit:, strict_ordering: true)
  # Guard the degenerate page. With limit 0 the list branch would compute an
  # end index of offset - 1, which for offset 0 is -1 ("through the last
  # element") and returns the whole list, while the hash branch would issue
  # HMGET with no fields.
  return [] unless limit.positive?

  key = map_results_key(map_id, reactor_class_name)

  raw_entries =
    if strict_ordering
      fields = (offset...(offset + limit)).map(&:to_s)
      # Indexes with no stored result come back as nil and are dropped.
      @redis.hmget(key, *fields).compact
    else
      @redis.lrange(key, offset, offset + limit - 1)
    end

  raw_entries.map { |entry| JSON.parse(entry) }
end
|
#scan_reactors(pattern: "reactor:*:context:*", count: 50) ⇒ Object
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
|
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 156
# Scans keys matching +pattern+ and collects up to +count+ reactors.
#
# Keys are gathered in groups of 50 and each group is passed to
# fetch_and_filter_reactors; scanning stops as soon as enough results have
# been collected.
#
# @param pattern [String] key match pattern handed to the scan
# @param count [Integer] maximum number of results to return
# @return [Array] at most +count+ results
def scan_reactors(pattern: "reactor:*:context:*", count: 50)
  collected = []

  @redis.scan_each(match: pattern, count: 100).each_slice(50) do |key_batch|
    collected.concat(fetch_and_filter_reactors(key_batch))
    break if collected.size >= count
  end

  collected.take(count)
end
|
#set_last_queued_index(map_id, index, reactor_class_name) ⇒ Object
98
99
100
101
|
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 98
# Stores the last-queued index for a map operation with an 86_400-second (24 h) expiry.
def set_last_queued_index(map_id, index, reactor_class_name)
  @redis.set(map_last_queued_index_key(map_id, reactor_class_name), index, ex: 86_400)
end
|
#set_map_counter(map_id, count, reactor_class_name) ⇒ Object
59
60
61
62
|
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 59
# Stores the map counter value with an 86_400-second (24 h) expiry.
def set_map_counter(map_id, count, reactor_class_name)
  @redis.set(map_counter_key(map_id, reactor_class_name), count, ex: 86_400)
end
|
#set_map_offset(map_id, offset, reactor_class_name) ⇒ Object
243
244
245
246
|
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 243
# Stores the map offset, overwriting any existing value, with an
# 86_400-second (24 h) expiry.
def set_map_offset(map_id, offset, reactor_class_name)
  @redis.set(map_offset_key(map_id, reactor_class_name), offset, ex: 86_400)
end
|
#set_map_offset_if_not_exists(map_id, offset, reactor_class_name) ⇒ Object
248
249
250
251
|
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 248
# Stores the map offset only when no offset is stored yet (SET with NX),
# with an 86_400-second (24 h) expiry.
#
# @return whatever the Redis client returns from the conditional SET
def set_map_offset_if_not_exists(map_id, offset, reactor_class_name)
  @redis.set(map_offset_key(map_id, reactor_class_name), offset, nx: true, ex: 86_400)
end
|
#store_context(context_id, serialized_context, reactor_class_name) ⇒ Object
17
18
19
20
21
|
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 17
# Stores a serialized context under its context key with an 86_400-second
# (24 h) expiry.
#
# @param context_id identifier used to build the context key
# @param serialized_context the value written to Redis as-is
# @param reactor_class_name reactor class used to build the context key
# @return whatever the Redis client returns from SET
def store_context(context_id, serialized_context, reactor_class_name)
  key = context_key(context_id, reactor_class_name)
  @redis.set(key, serialized_context, ex: 86_400)
end
|
#store_correlation_id(correlation_id, context_id, reactor_class_name) ⇒ Object
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 108
# Claims a correlation id for a context.
#
# The id is written with SET NX and an 86_400-second (24 h) expiry. If the key
# already exists and points at the same context, only its TTL is refreshed;
# if it points at a different context the claim is rejected.
#
# @raise [Error::ValidationError] when the correlation id belongs to another context
# @return [nil]
def store_correlation_id(correlation_id, context_id, reactor_class_name)
  key = correlation_id_key(correlation_id, reactor_class_name)
  return if @redis.set(key, context_id, nx: true, ex: 86_400)

  unless @redis.get(key) == context_id
    raise Error::ValidationError, "Correlation ID '#{correlation_id}' already exists"
  end

  @redis.expire(key, 86_400)
  nil
end
|
#store_map_element_context_id(map_id, context_id, reactor_class_name) ⇒ Object
216
217
218
219
220
|
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 216
# Appends an element context id to the map's context-id list and refreshes
# the list's 86_400-second (24 h) TTL.
#
# The last call is EXPIRE, so the return value is EXPIRE's reply.
def store_map_element_context_id(map_id, context_id, reactor_class_name)
  contexts_key = map_element_contexts_key(map_id, reactor_class_name)

  @redis.rpush(contexts_key, context_id)
  @redis.expire(contexts_key, 86_400)
end
|
#store_map_failed_context_id(map_id, context_id, reactor_class_name) ⇒ Object
232
233
234
235
236
|
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 232
# Records the failed context id of a map operation, with an 86_400-second
# (24 h) expiry. SET NX is used, so an already-recorded id is left in place.
#
# @return whatever the Redis client returns from the conditional SET
def store_map_failed_context_id(map_id, context_id, reactor_class_name)
  @redis.set(map_failed_context_key(map_id, reactor_class_name), context_id, nx: true, ex: 86_400)
end
|
#store_map_result(map_id, index, serialized_result, reactor_class_name, strict_ordering: true) ⇒ Object
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 31
# Stores one map result as JSON and refreshes the results key's
# 86_400-second (24 h) TTL.
#
# @param index position of the result; used as the hash field name when
#   +strict_ordering+ is true and ignored otherwise
# @param serialized_result the value that is converted with +to_json+ before storing
# @param strict_ordering [Boolean] true writes a hash field (HSET); false
#   appends to a list (RPUSH)
def store_map_result(map_id, index, serialized_result, reactor_class_name, strict_ordering: true)
  results_key = map_results_key(map_id, reactor_class_name)
  payload = serialized_result.to_json

  if strict_ordering
    @redis.hset(results_key, index.to_s, payload)
  else
    @redis.rpush(results_key, payload)
  end

  @redis.expire(results_key, 86_400)
end
|
#subscribe(channel, &block) ⇒ Object
143
144
145
|
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 143
# Subscribes to a channel by delegating to the Redis client's +subscribe+,
# forwarding the caller's block unchanged.
#
# @param channel the channel name
def subscribe(channel, &block)
@redis.subscribe(channel, &block)
end
|