Module: DispatchPolicy::TickLoop

Defined in:
lib/dispatch_policy/tick_loop.rb

Overview

Drives admission until `stop_when` fires (deadline, shutdown signal, etc). Runs one Tick per policy per loop iteration; sleeps `idle_pause` when no jobs were admitted across all policies. Periodically (every `sweep_every_ticks` iterations) sweeps stale inflight rows and inactive partitions.

Class Method Summary collapse

Class Method Details

.pause(seconds) ⇒ Object

sleep, but never with a negative argument (which would raise ArgumentError mid-loop) — a non-positive pause just means “no pause”.



64
65
66
67
# File 'lib/dispatch_policy/tick_loop.rb', line 64

# Sleeps for `seconds`, skipping the sleep entirely when the value is not
# strictly positive, so `sleep` is never handed a negative argument.
#
# @param seconds [#to_f] requested pause; coerced with `to_f`.
# @return [Integer, nil] whatever `sleep` returns, or nil when no sleep
#   was performed.
def pause(seconds)
  duration = seconds.to_f
  return unless duration.positive?

  sleep(duration)
end

.policy_names(filter) ⇒ Object



69
70
71
72
73
74
75
# File 'lib/dispatch_policy/tick_loop.rb', line 69

# Resolves which policy names to operate on.
#
# @param filter [#to_s, nil] a single policy to restrict to; a falsy
#   value selects every name reported by `DispatchPolicy.registry.names`.
# @return [Array] a one-element array holding `filter.to_s`, or the
#   registry's names.
def policy_names(filter)
  return [filter.to_s] if filter

  DispatchPolicy.registry.names
end

.run(policy_name: nil, shard: nil, stop_when: -> { false }) ⇒ Object

Parameters:

  • policy_name (String, nil) (defaults to: nil)

    limit to one policy. nil = all registered.

  • shard (String, nil) (defaults to: nil)

    limit to one shard. nil = all shards.



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/dispatch_policy/tick_loop.rb', line 14

# Drives the admission loop until `stop_when` returns truthy or
# `DispatchPolicy.config.enabled` is falsy.
#
# Each iteration runs `Tick.run` once per policy name, then calls `sweep!`
# every `config.sweep_every_ticks` iterations, then pauses for
# `config.idle_pause` when nothing was admitted or `config.busy_pause`
# otherwise. Errors raised by a tick are logged and do not end the loop.
#
# @param policy_name [String, nil] limit to one policy. nil = all registered.
# @param shard [String, nil] limit to one shard. nil = all shards.
# @param stop_when [#call] polled at the top of every iteration and before
#   every policy tick; a truthy result ends the loop immediately, without
#   a trailing sweep or pause.
# @return [nil]
def run(policy_name: nil, shard: nil, stop_when: -> { false })
  config    = DispatchPolicy.config
  logger    = config.logger
  iteration = 0

  loop do
    break if stop_when.call

    unless DispatchPolicy.config.enabled
      # Master switch off: stop polling. The job that drives
      # TickLoop.run will re-schedule itself; we exit cleanly so
      # the next iteration sees the flag and stops too.
      logger&.info("[dispatch_policy] TickLoop exiting because config.enabled = false")
      break
    end

    names = policy_names(policy_name)
    if names.empty?
      pause(config.idle_pause)
      next
    end

    admitted = 0
    stopped  = false
    names.each do |name|
      if stop_when.call
        stopped = true
        break
      end

      begin
        result = Tick.run(policy_name: name, shard: shard)
        admitted += result.jobs_admitted
      rescue StandardError => e
        # Exception#backtrace may be nil; Array() keeps the error handler
        # itself from raising and taking the whole loop down.
        logger&.error("[dispatch_policy] tick error policy=#{name} shard=#{shard.inspect} #{e.class}: #{e.message}\n#{Array(e.backtrace).first(10).join("\n")}")
      end
    end

    # Stop was requested mid-iteration: leave now rather than sweeping and
    # sleeping a full pause before the next top-of-loop check.
    break if stopped

    iteration += 1
    # sweep_every_ticks <= 0 means "never sweep" (rather than crashing
    # the loop with ZeroDivisionError on `iteration % 0`).
    sweep_every = config.sweep_every_ticks.to_i
    sweep! if sweep_every.positive? && (iteration % sweep_every).zero?

    if admitted.zero?
      pause(config.idle_pause)
    else
      pause(config.busy_pause)
    end
  end
end

.sweep! ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
# File 'lib/dispatch_policy/tick_loop.rb', line 77

# Runs the periodic cleanup passes in order: stale inflight rows, inactive
# partitions, then old tick samples, each with its cutoff read from
# `DispatchPolicy.config`.
#
# Any StandardError is logged through the configured logger (if one is
# set) and not re-raised; a failure in one pass skips the passes after it.
#
# @return [Object] the result of the last sweep call, or of the logger
#   call when an error was rescued.
def sweep!
  begin
    settings = DispatchPolicy.config

    Repository.sweep_stale_inflight!(
      cutoff_seconds: settings.inflight_stale_after,
      queued_cutoff_seconds: settings.inflight_queued_stale_after
    )
    Repository.sweep_inactive_partitions!(
      cutoff_seconds: settings.partition_inactive_after
    )
    Repository.sweep_old_tick_samples!(
      cutoff_seconds: settings.metrics_retention
    )
  rescue StandardError => error
    DispatchPolicy.config.logger&.error("[dispatch_policy] sweep error: #{error.class}: #{error.message}")
  end
end