Class: RubyReactor::SidekiqWorkers::Worker
- Inherits: Object
- Inheritance chain: Object → RubyReactor::SidekiqWorkers::Worker
- Includes:
- Sidekiq::Worker
- Defined in:
- lib/ruby_reactor/sidekiq_workers/worker.rb
Overview
Sidekiq worker for executing RubyReactor reactors asynchronously with non-blocking retry capabilities
Instance Method Summary
Instance Method Details
#perform(serialized_context, reactor_class_name = nil, snooze_count = 0) ⇒ Object
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/ruby_reactor/sidekiq_workers/worker.rb', line 20

# Sidekiq entry point: rebuilds the reactor context from its serialized form
# and resumes the reactor inline on this worker.
#
# @param serialized_context [Object] payload handed to
#   ContextSerializer.deserialize
# @param reactor_class_name [String, nil] fallback constant name, consulted
#   only when the deserialized context has no reactor class
# @param snooze_count [Integer] forwarded unchanged to the snooze and
#   escalation handlers
# @return [Object, nil] the executor after a completed run; nil after a
#   deserialization failure; otherwise whatever the snooze or escalation
#   handler returns
def perform(serialized_context, reactor_class_name = nil, snooze_count = 0)
  context =
    begin
      ContextSerializer.deserialize(serialized_context)
    rescue RubyReactor::Error::DeserializationError, RubyReactor::Error::SchemaVersionError => e
      # These are permanent: the same blob would fail again on every retry.
      # Record the failure (best-effort) and bail out so Sidekiq's retry
      # budget is not spent on it.
      handle_deserialization_failure(serialized_context, reactor_class_name, e)
      return
    end

  # The explicit class name is only a fallback for contexts that came back
  # without a reactor class (i.e. the class could not be found via const_get).
  needs_class_lookup = reactor_class_name && context.reactor_class.nil?
  if needs_class_lookup
    begin
      context.reactor_class = Object.const_get(reactor_class_name)
    rescue NameError
      # Second attempt through the current namespace; a fallback for test
      # environments. Skipped when the name does not respond to constantize.
      if reactor_class_name.respond_to?(:constantize)
        context.reactor_class = reactor_class_name.constantize
      end
    end
  end

  # Flag inline execution so nested async calls are prevented.
  context.inline_async_execution = true

  begin
    # Pick the run back up from the failed step.
    runner = Executor.new(context.reactor_class, {}, context)
    runner.resume_execution

    # When the executor deliberately suppressed persistence (stale-batch
    # redelivery of an already-terminal context), saving here would
    # overwrite the stored terminal record with this run's stale in-memory
    # status, so the post-run save is skipped.
    persist_suppressed = runner.skip_context_persist?
    runner.save_context unless persist_suppressed

    # Hand back the executor, which now holds the result.
    runner
  rescue RubyReactor::Lock::AcquisitionError,
         RubyReactor::Semaphore::AcquisitionError,
         RubyReactor::RateLimit::ExceededError,
         RubyReactor::OrderedLock::WaitError => contention
    # Expected concurrency, rate, or ordering contention: snooze rather than
    # fail. OrderedLock::WaitError carries a poison-pill-derived retry hint
    # that compute_snooze_delay consumes. Sidekiq's native retry path is
    # avoided so the job's retry budget is untouched and nothing shows up as
    # an error in dashboards. Once the configured cap is reached, the reactor
    # is escalated to failed.
    handle_snooze(serialized_context, reactor_class_name, context, snooze_count, contention)
  rescue RubyReactor::RateLimitRegistry::UnknownLimitError => config_error
    # Permanent configuration error: snoozing or retrying the same job would
    # keep failing, so the context is marked failed immediately.
    escalate_snooze(context, snooze_count, config_error)
  end
end