Class: RubyReactor::SidekiqWorkers::Worker
- Inherits:
- Object (RubyReactor::SidekiqWorkers::Worker < Object)
- Includes:
- Sidekiq::Worker
- Defined in:
- lib/ruby_reactor/sidekiq_workers/worker.rb
Overview
Sidekiq worker that executes RubyReactor reactors asynchronously and supports non-blocking retries.
Instance Method Summary
- #perform(context_id, reactor_class_name = nil, snooze_count = 0) ⇒ Object
Identity-only payload: storage is the source of truth.
Instance Method Details
#perform(context_id, reactor_class_name = nil, snooze_count = 0) ⇒ Object
Identity-only payload: the job carries only identifiers, and storage is the source of truth. The worker rehydrates the live context from storage by id, then resumes it. A nil read means the context was swept, expired, or already terminal and collected, so there is nothing to resume.
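The identity-only pattern can be sketched without Sidekiq or RubyReactor loaded. This is a minimal illustration, not the library's implementation: `MemoryStore`, `storage_name`, `resume`, and `DEFAULT_REACTOR_NAME` are hypothetical names, and the key layout is taken from the "reactor:AnonymousReactor:context:<id>" example in the source comments.

```ruby
# Hypothetical in-memory stand-in for a storage adapter (not RubyReactor's API).
class MemoryStore
  def initialize
    @data = {}
  end

  # Key layout assumed from the source comment: "reactor:<name>:context:<id>".
  def key_for(context_id, reactor_name)
    "reactor:#{reactor_name}:context:#{context_id}"
  end

  def store_context(context_id, reactor_name, blob)
    @data[key_for(context_id, reactor_name)] = blob
  end

  def retrieve_context(context_id, reactor_name)
    @data[key_for(context_id, reactor_name)]
  end
end

# Assumed default name, mirroring the "AnonymousReactor" key in the source comment.
DEFAULT_REACTOR_NAME = "AnonymousReactor"

# Normalize a nil or empty name so reads and writes build the same key.
def storage_name(name)
  name.nil? || name.empty? ? DEFAULT_REACTOR_NAME : name
end

# Identity-only resume: the caller passes ids, and all state comes from storage.
def resume(store, context_id, reactor_name = nil)
  data = store.retrieve_context(context_id, storage_name(reactor_name))
  return :nothing_to_resume if data.nil?

  [:resumed, data]
end

store = MemoryStore.new
store.store_context("ctx-1", storage_name(nil), { "step" => "charge_card" })
```

With this setup, `resume(store, "ctx-1")` finds the stored blob because both sides normalize the name, while a raw `store.retrieve_context("ctx-1", nil)` builds "reactor::context:ctx-1" and misses it, which is the silent no-op the source comment warns about.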
    # File 'lib/ruby_reactor/sidekiq_workers/worker.rb', line 23

    def perform(context_id, reactor_class_name = nil, snooze_count = 0)
      # Normalize so a nil/omitted name resolves to the same storage key the
      # enqueue path wrote (always via reactor_storage_name). Without this a
      # nil here builds "reactor::context:<id>" and misses the stored
      # "reactor:AnonymousReactor:context:<id>", silently no-op'ing.
      reactor_class_name ||= RubyReactor.reactor_storage_name(nil)

      data = RubyReactor.configuration.storage_adapter.retrieve_context(context_id, reactor_class_name)
      return if data.nil?

      begin
        context = ContextSerializer.deserialize_hash(data)
      rescue RubyReactor::Error::DeserializationError, RubyReactor::Error::SchemaVersionError => e
        # Permanent failures: re-reading the same stored blob will keep
        # failing. Mark the context as failed (best-effort) and return so
        # Sidekiq does not burn its retry budget.
        handle_deserialization_failure(context_id, reactor_class_name, e)
        return
      end

      # If reactor_class_name is provided, use it to get the reactor class
      # This handles cases where the class can't be found via const_get
      if reactor_class_name && context.reactor_class.nil?
        begin
          context.reactor_class = Object.const_get(reactor_class_name)
        rescue NameError
          # If not found, try to find it in the current namespace
          # This is a fallback for test environments
          context.reactor_class = reactor_class_name.constantize if reactor_class_name.respond_to?(:constantize)
        end
      end

      # Mark that we're executing inline to prevent nested async calls
      context.inline_async_execution = true

      begin
        # Resume execution from the failed step
        executor = Executor.new(context.reactor_class, {}, context)
        executor.resume_execution

        # No explicit save here: resume_execution's ensure block already persists
        # the final root state (`save_context unless skip_context_persist?`), and
        # in the worker the executor's context IS the root, so an extra checkpoint!
        # would just re-write the identical blob to the identical key. The
        # skip_context_persist? guard (stale-batch redelivery of an already-terminal
        # context) is likewise honored there.

        # Return the executor (which now has the result stored in it)
        executor
      rescue RubyReactor::Lock::AcquisitionError, RubyReactor::Semaphore::AcquisitionError,
             RubyReactor::RateLimit::ExceededError, RubyReactor::OrderedLock::WaitError => e
        # Snooze on expected concurrency, rate, or ordering contention.
        # OrderedLock::WaitError carries a poison-pill-derived retry hint,
        # consumed by compute_snooze_delay below. We avoid Sidekiq's native
        # retry path so this doesn't burn the job's retry budget or appear
        # as an error in dashboards. After the configured cap is reached we
        # escalate by marking the reactor as failed.
        handle_snooze(context_id, reactor_class_name, context, snooze_count, e)
      rescue RubyReactor::RateLimitRegistry::UnknownLimitError => e
        # Permanent configuration error: snoozing or retrying the same job
        # will keep failing. Mark the context failed immediately.
        escalate_snooze(context, snooze_count, e)
      end
    end
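The bodies of handle_snooze, compute_snooze_delay, and escalate_snooze are not shown on this page, so the following is only a sketch of the snooze-instead-of-raise idea under stated assumptions. `MAX_SNOOZES`, `BASE_DELAY`, the 60-second ceiling, `ContentionError`, `snooze_delay`, and `snooze_or_escalate` are all hypothetical; the real cap and delay policy come from RubyReactor's configuration.

```ruby
# Hypothetical limits; the real cap and delays are configured elsewhere.
MAX_SNOOZES = 5
BASE_DELAY = 2 # seconds

# Hypothetical stand-in for the contention errors rescued in #perform.
class ContentionError < StandardError
  attr_reader :retry_after

  def initialize(msg = "resource busy", retry_after: nil)
    super(msg)
    @retry_after = retry_after
  end
end

# Prefer a retry hint carried by the error; otherwise back off
# exponentially, capped at 60 seconds (an assumed ceiling).
def snooze_delay(snooze_count, error)
  return error.retry_after if error.retry_after

  [BASE_DELAY * (2**snooze_count), 60].min
end

# Decide what the worker should do instead of re-raising the error.
def snooze_or_escalate(context_id, snooze_count, error)
  if snooze_count >= MAX_SNOOZES
    # Cap reached: stop snoozing and mark the context as failed.
    { action: :fail, context_id: context_id, reason: error.message }
  else
    # Reschedule the same identity-only job with an incremented counter.
    { action: :reschedule, context_id: context_id,
      delay: snooze_delay(snooze_count, error),
      snooze_count: snooze_count + 1 }
  end
end
```

In a real Sidekiq worker, the `:reschedule` branch would plausibly map to something like `self.class.perform_in(delay, context_id, reactor_class_name, snooze_count + 1)`. Because the exception is rescued rather than propagated, Sidekiq treats the job as completed, so the job's retry budget is untouched and no error is reported.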