Class: Evilution::Parallel::WorkQueue::Dispatcher::DeadlineTracker

Inherits:
Object
  • Object
show all
Defined in:
lib/evilution/parallel/work_queue/dispatcher/deadline_tracker.rb

Overview

Owns the per-worker item-timeout deadline clock for the Dispatcher: arming a worker’s deadline when it goes busy, re-arming it on each result, surfacing the workers whose deadline has passed, and computing how long IO.select may block. Each worker carries its own deadline so a single stuck worker is reaped in isolation rather than aborting the whole pool (EV-gl1e). Pulling this cohesive timeout concern out of the Dispatcher keeps the dispatcher focused on the collect/recycle orchestration (EV-9mij).

‘workers` is the Dispatcher’s live array (mutated in place as workers recycle), so the tracker always reads the current pool. ‘clock` is injectable for tests.

Instance Method Summary collapse

Constructor Details

#initialize(item_timeout:, workers:, clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }) ⇒ DeadlineTracker

Returns a new instance of DeadlineTracker.



16
17
18
19
20
# File 'lib/evilution/parallel/work_queue/dispatcher/deadline_tracker.rb', line 16

def initialize(item_timeout:, workers:, clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
  @item_timeout = item_timeout
  @workers = workers
  @clock = clock
end

Instance Method Details

#enabled?Boolean

Returns:

  • (Boolean)


22
23
24
# File 'lib/evilution/parallel/work_queue/dispatcher/deadline_tracker.rb', line 22

def enabled?
  !@item_timeout.nil?
end

#overdueObject

Workers whose deadline has passed while still holding in-flight work.



38
39
40
41
42
43
# File 'lib/evilution/parallel/work_queue/dispatcher/deadline_tracker.rb', line 38

def overdue
  return [] unless enabled?

  moment = now
  @workers.select { |worker| worker.deadline && worker.deadline <= moment && worker.pending.positive? }
end

#refresh(worker) ⇒ Object

After a result: re-arm while work remains, otherwise stop the clock.



54
55
56
# File 'lib/evilution/parallel/work_queue/dispatcher/deadline_tracker.rb', line 54

def refresh(worker)
  worker.deadline = (now + @item_timeout if enabled? && worker.pending.positive?)
end

#select_timeoutObject

Seconds IO.select may block: until the nearest worker deadline (never negative), or the raw timeout when no worker is currently on the clock.



28
29
30
31
32
33
34
35
# File 'lib/evilution/parallel/work_queue/dispatcher/deadline_tracker.rb', line 28

def select_timeout
  return @item_timeout unless enabled?

  deadlines = @workers.filter_map(&:deadline)
  return @item_timeout if deadlines.empty?

  [deadlines.min - now, 0].max
end

#start(worker) ⇒ Object

Arm a worker’s clock when it first goes busy; idempotent for the in-flight item so a refresh does not extend an already-running deadline.



47
48
49
50
51
# File 'lib/evilution/parallel/work_queue/dispatcher/deadline_tracker.rb', line 47

def start(worker)
  return unless enabled?

  worker.deadline ||= now + @item_timeout
end