Class: Wurk::Processor
- Inherits:
- Object
- Includes:
- Component
- Defined in:
- lib/wurk/processor.rb
Overview
Inside each Manager, N Processors run in parallel. Each owns one thread, pulls a UnitOfWork from the capsule’s fetcher, parses the payload, walks the server middleware chain, invokes `perform`, then ACKs (removes the payload from the per-process private list).
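The fetch, parse, middleware, perform, ACK sequence described above can be sketched with in-memory stand-ins. This is an illustration only, not Wurk's implementation: `EchoJob`, `ToyFetcher`, `retrieve_work`, `acknowledge`, `walk_chain` and `process_once` are hypothetical names, and a plain array replaces the Redis-backed private list.

```ruby
require "json"

# Hypothetical job class; only the `perform` entry point mirrors the text.
class EchoJob
  def perform(*args)
    args.sum
  end
end

# Hypothetical in-memory fetcher: a queue plus a per-process private list.
class ToyFetcher
  attr_reader :private_list

  def initialize(payloads)
    @queue = payloads.dup
    @private_list = []
  end

  # Hand out one payload and remember it in the private list until ACKed.
  def retrieve_work
    payload = @queue.shift
    @private_list << payload if payload
    payload
  end

  # ACK: remove the payload from the private list.
  def acknowledge(payload)
    @private_list.delete(payload)
  end
end

# Middleware is modelled as callables taking (job_hash, &next_link).
def walk_chain(chain, job_hash, &perform)
  return perform.call if chain.empty?

  head, *rest = chain
  head.call(job_hash) { walk_chain(rest, job_hash, &perform) }
end

# One pass of the pipeline; returns false when there is nothing to fetch.
def process_once(fetcher, chain, results)
  payload = fetcher.retrieve_work
  return false if payload.nil?

  job_hash = JSON.parse(payload)                     # parse the payload
  klass = Object.const_get(job_hash.fetch("class"))
  walk_chain(chain, job_hash) do                     # walk the middleware chain
    results << klass.new.perform(*job_hash.fetch("args"))
  end
  fetcher.acknowledge(payload)                       # ACK only after perform returns
  true
end

trace = []
logging = lambda do |job, &next_link|
  trace << "before #{job['class']}"
  next_link.call
  trace << "after #{job['class']}"
end

fetcher = ToyFetcher.new([JSON.generate("class" => "EchoJob", "args" => [1, 2])])
results = []
process_once(fetcher, [logging], results)
```

Because the ACK comes last, an exception anywhere earlier leaves the payload in `private_list`, which is the property the shutdown logic relies on.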
Shutdown is two-stage:
* `terminate` flips a flag; the run loop exits between jobs.
* `kill` additionally raises `Wurk::Shutdown` into the thread so an
in-flight perform unwinds. The current UoW is NOT acked, so the
payload survives in the private list and is reclaimed on next boot.
Spec: docs/target/sidekiq-free.md §14.
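A minimal sketch of the two stages, using a toy worker rather than Wurk itself. `ToyWorker`, `ToyShutdown`, `wait_until_busy` and the `job_seconds:` parameter are hypothetical; `ToyShutdown` stands in for `Wurk::Shutdown`, whose superclass is not stated on this page.

```ruby
# Hypothetical stand-in for Wurk::Shutdown.
class ToyShutdown < StandardError; end

# Hypothetical worker: one thread, a queue, a private list, and a done flag.
class ToyWorker
  attr_reader :acked, :private_list

  def initialize(jobs, job_seconds:)
    @queue = jobs.dup
    @private_list = []
    @acked = []
    @job_seconds = job_seconds
    @done = false
    @started = Queue.new   # lets the caller wait until a job is in flight
  end

  def start
    @thread ||= Thread.new { run }
  end

  def wait_until_busy
    @started.pop
  end

  # Stage one: flip the flag; the loop exits after the current job.
  def terminate(wait = false)
    @done = true
    return if @thread.nil?

    @thread.value if wait
  end

  # Stage two: also raise into the thread so the current job unwinds.
  def kill(wait = false)
    @done = true
    return if @thread.nil?

    @thread.raise ToyShutdown
    @thread.value if wait
  end

  private

  def run
    until @done
      job = @queue.shift
      break if job.nil?

      @private_list << job
      @started << job
      sleep @job_seconds          # simulated perform
      @private_list.delete(job)   # ACK
      @acked << job
    end
  rescue ToyShutdown
    # Unwound mid-job: the ACK is skipped, so the job stays in the private list.
  end
end

soft = ToyWorker.new(%i[a b], job_seconds: 0.2)
soft.start
soft.wait_until_busy
soft.terminate(true)   # :a finishes and is acked; :b is never started

hard = ToyWorker.new(%i[a], job_seconds: 30)
hard.start
hard.wait_until_busy
hard.kill(true)        # :a is interrupted and stays in the private list
```

In the soft case the in-flight job completes before the thread exits; in the hard case the job is abandoned but its payload is still recoverable.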
Defined Under Namespace
Classes: Counter, SharedWorkState
Constant Summary
- PROCESSED = Counter.new
- FAILURE = Counter.new
- EXPIRED = Counter.new
- WORK_STATE = SharedWorkState.new
Constants included from Component
Component::DEFAULT_THREAD_PRIORITY, Component::PROCESS_NONCE
Instance Attribute Summary
- #capsule ⇒ Object (readonly): Returns the value of attribute capsule.
- #job ⇒ Object (readonly): Returns the value of attribute job.
- #thread ⇒ Object (readonly): Returns the value of attribute thread.
Attributes included from Component
Instance Method Summary
- #handle_exception(ex, ctx = {}) ⇒ Object: Capsule doesn’t define handle_exception (it’s a Configuration method); override Component’s delegation so error handlers fire.
- #initialize(capsule, &callback) ⇒ Processor (constructor): A new instance of Processor.
- #kill(wait = false) ⇒ Object: Hard-stop: flips the flag and unwinds the in-flight job by raising Wurk::Shutdown into the worker thread.
- #process_one ⇒ Object: Single iteration: fetch one UoW, process it.
- #start ⇒ Object
- #stopping? ⇒ Boolean
- #terminate(wait = false) ⇒ Object: Soft stop: flips the flag so the run loop exits between jobs. Takes a positional boolean to match the Sidekiq surface (the drop-in contract).
Methods included from Component
#default_tag, #fire_event, #hostname, #identity, #leader?, #logger, #mono_ms, #process_nonce, #real_ms, #redis, #safe_thread, #tid, #watchdog
Constructor Details
#initialize(capsule, &callback) ⇒ Processor
Returns a new instance of Processor.
# File 'lib/wurk/processor.rb', line 27
def initialize(capsule, &callback)
  @capsule = capsule
  @config = capsule
  @callback = callback
  @done = false
  @job = nil
  @thread = nil
  @reloader = capsule.config[:reloader] || proc { |&b| b.call }
  @job_logger = (capsule.config[:job_logger] || JobLogger).new(capsule.config)
  @retrier = JobRetry.new(capsule)
end
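The `@reloader` fallback in the constructor is a proc that calls the block it is given. The sketch below shows that default next to a custom reloader; the `config` hash stands in for `capsule.config`, and `tracing` is a hypothetical reloader invented for illustration.

```ruby
# Same shape as the constructor's default: call the block, return its value.
passthrough = proc { |&b| b.call }

# Hypothetical custom reloader that records entry and exit around the block.
events = []
tracing = proc do |&b|
  events << :enter
  begin
    b.call
  ensure
    events << :exit
  end
end

config = {}   # stand-in for capsule.config with no :reloader set
reloader = config[:reloader] || passthrough
default_result = reloader.call { :ran }

config[:reloader] = tracing
reloader = config[:reloader] || passthrough
traced_result = reloader.call { :ran }
```

Either way the caller wraps work as `reloader.call { ... }`, so code that uses the reloader needs no branch for the unconfigured case.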
Instance Attribute Details
#capsule ⇒ Object (readonly)
Returns the value of attribute capsule.
# File 'lib/wurk/processor.rb', line 25
def capsule
  @capsule
end
#job ⇒ Object (readonly)
Returns the value of attribute job.
# File 'lib/wurk/processor.rb', line 25
def job
  @job
end
#thread ⇒ Object (readonly)
Returns the value of attribute thread.
# File 'lib/wurk/processor.rb', line 25
def thread
  @thread
end
Instance Method Details
#handle_exception(ex, ctx = {}) ⇒ Object
Capsule doesn’t define handle_exception (it’s a Configuration method); override Component’s delegation so error handlers fire.
# File 'lib/wurk/processor.rb', line 69
def handle_exception(ex, ctx = {})
  @capsule.config.handle_exception(ex, ctx)
end
#kill(wait = false) ⇒ Object
Hard-stop: flips the flag and unwinds the in-flight job by raising Wurk::Shutdown into the worker thread. The UoW is intentionally not acked — the payload remains in the private list and is reclaimed on next boot via Reliable#bulk_requeue.
# File 'lib/wurk/processor.rb', line 51
def kill(wait = false) # rubocop:disable Style/OptionalBooleanParameter
  @done = true
  return if @thread.nil?
  @thread.raise ::Wurk::Shutdown
  @thread.value if wait
end
#process_one ⇒ Object
Single iteration: fetch one UoW, process it. Public so tests can drive the loop step-by-step without spawning a thread.
# File 'lib/wurk/processor.rb', line 75
def process_one
  @job = fetch
  process(@job) if @job
  @job = nil
end
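Driving the loop one step at a time, with no thread involved, might look like the following. `ToyProcessor` is a hypothetical stand-in that copies only the `process_one` body shown above; its `fetch` and `process` are simplified placeholders, not Wurk's private methods.

```ruby
# Hypothetical stand-in: same process_one body, trivial fetch/process.
class ToyProcessor
  attr_reader :job, :handled

  def initialize(queue)
    @queue = queue
    @handled = []
    @job = nil
  end

  def process_one
    @job = fetch
    process(@job) if @job
    @job = nil
  end

  private

  # Placeholder fetch: returns nil once the queue is empty.
  def fetch
    @queue.shift
  end

  # Placeholder process: just records the unit of work.
  def process(uow)
    @handled << uow
  end
end

stepper = ToyProcessor.new(%w[first second])
stepper.process_one
after_one = stepper.handled.dup
stepper.process_one
stepper.process_one   # queue is empty: a no-op
```

Each call handles at most one unit of work and resets `job` to nil, so a test can assert on state between steps.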
#start ⇒ Object
# File 'lib/wurk/processor.rb', line 63
def start
  @thread ||= safe_thread("#{@capsule.name}/processor", &method(:run)) # rubocop:disable Naming/MemoizedInstanceVariableName
end
#stopping? ⇒ Boolean
# File 'lib/wurk/processor.rb', line 59
def stopping?
  @done
end
#terminate(wait = false) ⇒ Object
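Soft stop: sets the done flag so the run loop exits between jobs, and blocks until the thread finishes when wait is true. The positional boolean matches the Sidekiq surface, per the drop-in contract.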
# File 'lib/wurk/processor.rb', line 40
def terminate(wait = false) # rubocop:disable Style/OptionalBooleanParameter
  @done = true
  return if @thread.nil?
  @thread.value if wait
end