Class: Wurk::Processor

Inherits:
Object
  • Object
show all
Includes:
Component
Defined in:
lib/wurk/processor.rb

Overview

Inside each Manager, N Processors run in parallel. Each owns one thread, pulls a UnitOfWork from the capsule’s fetcher, parses the payload, walks the server middleware chain, invokes ‘perform`, then ACKs (removes the payload from the per-process private list).

Shutdown is two-stage:

* `terminate` flips a flag; the run loop exits between jobs.
* `kill` additionally raises `Wurk::Shutdown` into the thread so an
  in-flight perform unwinds. The current UoW is NOT acked, so the
  payload survives in the private list and is reclaimed on next boot.

Spec: docs/target/sidekiq-free.md §14.

Defined Under Namespace

Classes: Counter, SharedWorkState

Constant Summary collapse

PROCESSED =
Counter.new
FAILURE =
Counter.new
EXPIRED =
Counter.new
WORK_STATE =
SharedWorkState.new

Constants included from Component

Component::DEFAULT_THREAD_PRIORITY, Component::PROCESS_NONCE

Instance Attribute Summary collapse

Attributes included from Component

#config

Instance Method Summary collapse

Methods included from Component

#default_tag, #fire_event, #hostname, #identity, #leader?, #logger, #mono_ms, #process_nonce, #real_ms, #redis, #safe_thread, #tid, #watchdog

Constructor Details

#initialize(capsule, &callback) ⇒ Processor

Returns a new instance of Processor.



27
28
29
30
31
32
33
34
35
36
37
# File 'lib/wurk/processor.rb', line 27

def initialize(capsule, &callback)
  @capsule = capsule
  @config = capsule
  @callback = callback
  @done = false
  @job = nil
  @thread = nil
  @reloader = capsule.config[:reloader] || proc { |&b| b.call }
  @job_logger = (capsule.config[:job_logger] || JobLogger).new(capsule.config)
  @retrier = JobRetry.new(capsule)
end

Instance Attribute Details

#capsuleObject (readonly)

Returns the value of attribute capsule.



25
26
27
# File 'lib/wurk/processor.rb', line 25

def capsule
  @capsule
end

#jobObject (readonly)

Returns the value of attribute job.



25
26
27
# File 'lib/wurk/processor.rb', line 25

def job
  @job
end

#threadObject (readonly)

Returns the value of attribute thread.



25
26
27
# File 'lib/wurk/processor.rb', line 25

def thread
  @thread
end

Instance Method Details

#handle_exception(ex, ctx = {}) ⇒ Object

Capsule doesn’t define handle_exception (it’s a Configuration method); override Component’s delegation so error handlers fire.



69
70
71
# File 'lib/wurk/processor.rb', line 69

def handle_exception(ex, ctx = {})
  @capsule.config.handle_exception(ex, ctx)
end

#kill(wait = false) ⇒ Object

Hard-stop: flips the flag and unwinds the in-flight job by raising Wurk::Shutdown into the worker thread. The UoW is intentionally not acked — the payload remains in the private list and is reclaimed on next boot via Reliable#bulk_requeue.



51
52
53
54
55
56
57
# File 'lib/wurk/processor.rb', line 51

def kill(wait = false) # rubocop:disable Style/OptionalBooleanParameter
  @done = true
  return if @thread.nil?

  @thread.raise ::Wurk::Shutdown
  @thread.value if wait
end

#process_oneObject

Single iteration: fetch one UoW, process it. Public so tests can drive the loop step-by-step without spawning a thread.



75
76
77
78
79
# File 'lib/wurk/processor.rb', line 75

def process_one
  @job = fetch
  process(@job) if @job
  @job = nil
end

#startObject



63
64
65
# File 'lib/wurk/processor.rb', line 63

def start
  @thread ||= safe_thread("#{@capsule.name}/processor", &method(:run)) # rubocop:disable Naming/MemoizedInstanceVariableName
end

#stopping?Boolean

Returns:

  • (Boolean)


59
60
61
# File 'lib/wurk/processor.rb', line 59

def stopping?
  @done
end

#terminate(wait = false) ⇒ Object

Sidekiq surface — positional boolean to match the drop-in contract.



40
41
42
43
44
45
# File 'lib/wurk/processor.rb', line 40

def terminate(wait = false) # rubocop:disable Style/OptionalBooleanParameter
  @done = true
  return if @thread.nil?

  @thread.value if wait
end