Class: RubyReactor::SidekiqWorkers::Worker
- Inherits:
-
Object
- Object
- RubyReactor::SidekiqWorkers::Worker
- Includes:
- Sidekiq::Worker
- Defined in:
- lib/ruby_reactor/sidekiq_workers/worker.rb
Overview
Sidekiq worker for executing RubyReactor reactors asynchronously with non-blocking retry capabilities
Instance Method Summary collapse
Instance Method Details
#perform(serialized_context, reactor_class_name = nil, snooze_count = 0) ⇒ Object
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/ruby_reactor/sidekiq_workers/worker.rb', line 20 def perform(serialized_context, reactor_class_name = nil, snooze_count = 0) context = ContextSerializer.deserialize(serialized_context) # If reactor_class_name is provided, use it to get the reactor class # This handles cases where the class can't be found via const_get if reactor_class_name && context.reactor_class.nil? begin context.reactor_class = Object.const_get(reactor_class_name) rescue NameError # If not found, try to find it in the current namespace # This is a fallback for test environments context.reactor_class = reactor_class_name.constantize if reactor_class_name.respond_to?(:constantize) end end # Mark that we're executing inline to prevent nested async calls context.inline_async_execution = true begin # Resume execution from the failed step executor = Executor.new(context.reactor_class, {}, context) executor.resume_execution executor.save_context # Return the executor (which now has the result stored in it) executor rescue RubyReactor::Lock::AcquisitionError, RubyReactor::Semaphore::AcquisitionError, RubyReactor::RateLimit::ExceededError => e # Snooze on expected concurrency or rate contention. We avoid # Sidekiq's native retry path so this doesn't burn the job's retry # budget or appear as an error in dashboards. After the configured # cap is reached we escalate by marking the reactor as failed. handle_snooze(serialized_context, reactor_class_name, context, snooze_count, e) end end |