Class: Evilution::Parallel::WorkQueue::Dispatcher::DeadlineTracker
- Inherits: Object
  - Object
  - Evilution::Parallel::WorkQueue::Dispatcher::DeadlineTracker
- Defined in: lib/evilution/parallel/work_queue/dispatcher/deadline_tracker.rb
Overview
Owns the per-worker item-timeout deadline clock for the Dispatcher: arming a worker’s deadline when it goes busy, re-arming it on each result, surfacing the workers whose deadline has passed, and computing how long IO.select may block. Each worker carries its own deadline so a single stuck worker is reaped in isolation rather than aborting the whole pool (EV-gl1e). Pulling this cohesive timeout concern out of the Dispatcher keeps the dispatcher focused on the collect/recycle orchestration (EV-9mij).
`workers` is the Dispatcher's live array (mutated in place as workers recycle), so the tracker always reads the current pool. `clock` is injectable for tests.
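The lifecycle described above can be sketched end to end with a fake clock. This is a minimal illustration, not the project's test suite: the class body is a condensed copy of the listings on this page so the snippet runs standalone, the `Worker` struct is a hypothetical stand-in that models only `pending` and `deadline`, and the private `now` helper is assumed to simply call the injected clock.

```ruby
# Hypothetical stand-in for the Dispatcher's worker objects; only the
# attributes the tracker reads or writes are modelled.
Worker = Struct.new(:name, :pending, :deadline)

# Condensed copy of the tracker as listed on this page.
class DeadlineTracker
  def initialize(item_timeout:, workers:, clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @item_timeout = item_timeout
    @workers = workers
    @clock = clock
  end

  def enabled?
    !@item_timeout.nil?
  end

  def select_timeout
    return @item_timeout unless enabled?

    deadlines = @workers.filter_map(&:deadline)
    return @item_timeout if deadlines.empty?

    [deadlines.min - now, 0].max
  end

  def overdue
    return [] unless enabled?

    moment = now
    @workers.select { |w| w.deadline && w.deadline <= moment && w.pending.positive? }
  end

  def start(worker)
    return unless enabled?

    worker.deadline ||= now + @item_timeout
  end

  def refresh(worker)
    worker.deadline = (now + @item_timeout if enabled? && worker.pending.positive?)
  end

  private

  # Assumption: the real helper is not shown on this page.
  def now
    @clock.call
  end
end

fake_time = 100.0
workers = [Worker.new("a", 1, nil), Worker.new("b", 0, nil)]
tracker = DeadlineTracker.new(item_timeout: 5.0, workers: workers, clock: -> { fake_time })

tracker.start(workers[0])             # arms "a" at 105.0
fake_time = 102.0
tracker.start(workers[0])             # no-op: deadline stays at 105.0
puts tracker.select_timeout           # 3.0 seconds until the nearest deadline

fake_time = 106.0
puts tracker.overdue.map(&:name).inspect  # only "a" is overdue; "b" was never armed

workers[0].pending = 0
tracker.refresh(workers[0])           # no work left, so the clock is stopped
puts workers[0].deadline.inspect      # nil
```

Because the lambda closes over `fake_time`, reassigning that variable moves the tracker's clock without sleeping, which is what makes the injectable `clock` useful in tests.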
Instance Method Summary
- #enabled? ⇒ Boolean
- #initialize(item_timeout:, workers:, clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }) ⇒ DeadlineTracker (constructor)
  A new instance of DeadlineTracker.
- #overdue ⇒ Object
  Workers whose deadline has passed while still holding in-flight work.
- #refresh(worker) ⇒ Object
  After a result: re-arm while work remains, otherwise stop the clock.
- #select_timeout ⇒ Object
  Seconds IO.select may block: until the nearest worker deadline (never negative), or the raw timeout when no worker is currently on the clock.
- #start(worker) ⇒ Object
  Arm a worker’s clock when it first goes busy; idempotent for the in-flight item so a refresh does not extend an already-running deadline.
Constructor Details
#initialize(item_timeout:, workers:, clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }) ⇒ DeadlineTracker
Returns a new instance of DeadlineTracker.
# File 'lib/evilution/parallel/work_queue/dispatcher/deadline_tracker.rb', line 16
def initialize(item_timeout:, workers:, clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
  @item_timeout = item_timeout
  @workers = workers
  @clock = clock
end
Instance Method Details
#enabled? ⇒ Boolean
# File 'lib/evilution/parallel/work_queue/dispatcher/deadline_tracker.rb', line 22
def enabled?
  !@item_timeout.nil?
end
#overdue ⇒ Object
Workers whose deadline has passed while still holding in-flight work.
# File 'lib/evilution/parallel/work_queue/dispatcher/deadline_tracker.rb', line 38
def overdue
  return [] unless enabled?

  moment = now
  @workers.select { |worker| worker.deadline && worker.deadline <= moment && worker.pending.positive? }
end
#refresh(worker) ⇒ Object
After a result: re-arm while work remains, otherwise stop the clock.
# File 'lib/evilution/parallel/work_queue/dispatcher/deadline_tracker.rb', line 54
def refresh(worker)
  worker.deadline = (now + @item_timeout if enabled? && worker.pending.positive?)
end
#select_timeout ⇒ Object
Seconds IO.select may block: until the nearest worker deadline (never negative), or the raw timeout when no worker is currently on the clock.
# File 'lib/evilution/parallel/work_queue/dispatcher/deadline_tracker.rb', line 28
def select_timeout
  return @item_timeout unless enabled?

  deadlines = @workers.filter_map(&:deadline)
  return @item_timeout if deadlines.empty?

  [deadlines.min - now, 0].max
end
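As a rough sketch of how such a value might feed `IO.select`, the snippet below mirrors the same arithmetic in a standalone helper and passes the result as the select timeout. The helper name `select_timeout_for` and the pipe setup are hypothetical and exist only for illustration; how the real Dispatcher calls `IO.select` is not shown on this page.

```ruby
# Hypothetical helper mirroring the computation above: block until the
# nearest armed deadline, never for a negative duration, and fall back to
# the raw item timeout when no worker is on the clock.
def select_timeout_for(deadlines, item_timeout, now)
  armed = deadlines.compact
  return item_timeout if armed.empty?

  [armed.min - now, 0].max
end

reader, writer = IO.pipe
now = Process.clock_gettime(Process::CLOCK_MONOTONIC)

# Three workers: one due in about 50 ms, one idle (nil), one due in 2 s.
timeout = select_timeout_for([now + 0.05, nil, now + 2.0], 5.0, now)

# Nothing is written to the pipe, so IO.select gives up once the nearest
# deadline is reached and returns nil.
ready = IO.select([reader], nil, nil, timeout)
puts ready.nil? ? "woke on deadline" : "woke on data"

reader.close
writer.close
```

Note that a `nil` timeout makes `IO.select` block indefinitely, which is what the disabled case (`item_timeout` of `nil`) would produce if the value were passed straight through.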
#start(worker) ⇒ Object
Arm a worker’s clock when it first goes busy; idempotent for the in-flight item so a refresh does not extend an already-running deadline.
# File 'lib/evilution/parallel/work_queue/dispatcher/deadline_tracker.rb', line 47
def start(worker)
  return unless enabled?

  worker.deadline ||= now + @item_timeout
end