Class: RubyReactor::SidekiqWorkers::Worker

Inherits:
Object
  • Object
show all
Includes:
Sidekiq::Worker
Defined in:
lib/ruby_reactor/sidekiq_workers/worker.rb

Overview

Sidekiq worker for executing RubyReactor reactors asynchronously with non-blocking retry capabilities

Instance Method Summary collapse

Instance Method Details

#perform(context_id, reactor_class_name = nil, snooze_count = 0) ⇒ Object

Identity-only payload: storage is the source of truth. Rehydrate the live context from storage by id, then resume. A nil read means the context was swept, expired, or already terminal-and-collected — nothing to resume.



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/ruby_reactor/sidekiq_workers/worker.rb', line 23

# Sidekiq entry point: rehydrates a stored reactor context and resumes it.
#
# The job payload carries identity only; the context itself is read back from
# the configured storage adapter. When nothing is stored under the id (the
# context was swept, expired, or already collected) the job is a no-op.
#
# @param context_id [Object] identifier the context was stored under
# @param reactor_class_name [String, nil] reactor class name; a nil/omitted
#   value is replaced by RubyReactor.reactor_storage_name(nil)
# @param snooze_count [Integer] forwarded to handle_snooze / escalate_snooze
# @return [Executor, Object, nil] the executor after a resume; nil when no
#   context is stored or the stored blob cannot be deserialized; otherwise
#   whatever handle_snooze / escalate_snooze returns
def perform(context_id, reactor_class_name = nil, snooze_count = 0)
  # A nil/omitted name has to map onto the storage key the enqueue path wrote
  # (always via reactor_storage_name). Left as nil, the lookup would build
  # "reactor::context:<id>", miss the stored
  # "reactor:AnonymousReactor:context:<id>", and silently do nothing.
  reactor_class_name ||= RubyReactor.reactor_storage_name(nil)

  storage = RubyReactor.configuration.storage_adapter
  stored = storage.retrieve_context(context_id, reactor_class_name)
  return if stored.nil?

  live_context =
    begin
      ContextSerializer.deserialize_hash(stored)
    rescue RubyReactor::Error::DeserializationError,
           RubyReactor::Error::SchemaVersionError => failure
      # Permanent failure: reading the same blob again cannot succeed, so
      # record the failure (best-effort) and finish the job normally instead
      # of letting Sidekiq spend its retry budget on it.
      handle_deserialization_failure(context_id, reactor_class_name, failure)
      return
    end

  # The deserialized context may come back without a reactor class; in that
  # case resolve it from the class name carried by the job.
  if reactor_class_name && live_context.reactor_class.nil?
    begin
      live_context.reactor_class = Object.const_get(reactor_class_name)
    rescue NameError
      # Fallback (mainly for test environments): use String#constantize when
      # the runtime provides it.
      if reactor_class_name.respond_to?(:constantize)
        live_context.reactor_class = reactor_class_name.constantize
      end
    end
  end

  # Flag inline execution so the run does not trigger nested async calls.
  live_context.inline_async_execution = true

  begin
    # Resume from the failed step and hand back the executor, which holds the
    # result. No explicit save is needed: resume_execution's ensure block
    # already persists the final root state
    # (`save_context unless skip_context_persist?`), and in the worker the
    # executor's context IS the root, so another checkpoint! would only
    # rewrite the identical blob under the identical key. That same
    # skip_context_persist? guard also covers stale-batch redelivery of an
    # already-terminal context.
    Executor.new(live_context.reactor_class, {}, live_context).tap(&:resume_execution)
  rescue RubyReactor::Lock::AcquisitionError,
         RubyReactor::Semaphore::AcquisitionError,
         RubyReactor::RateLimit::ExceededError,
         RubyReactor::OrderedLock::WaitError => contention
    # Expected concurrency, rate, or ordering contention: snooze rather than
    # use Sidekiq's native retry, so the job's retry budget is untouched and
    # dashboards show no error. OrderedLock::WaitError carries a
    # poison-pill-derived retry hint that compute_snooze_delay consumes. Once
    # the configured cap is reached, the reactor is escalated to failed.
    handle_snooze(context_id, reactor_class_name, live_context, snooze_count, contention)
  rescue RubyReactor::RateLimitRegistry::UnknownLimitError => config_error
    # Permanent configuration error: neither snoozing nor retrying can fix
    # it, so mark the context failed right away.
    escalate_snooze(live_context, snooze_count, config_error)
  end
end