Module: DispatchPolicy::TickLoop

Defined in:
lib/dispatch_policy/tick_loop.rb

Overview

Drives admission until ‘stop_when` fires (deadline, shutdown signal, etc). Runs one Tick per policy per loop iteration; sleeps `idle_pause` when no jobs were admitted across all policies. Periodically (every `sweep_every_ticks` iterations) sweeps stale inflight rows and inactive partitions.

Class Method Summary collapse

Class Method Details

.policy_names(filter) ⇒ Object



61
62
63
64
65
66
67
# File 'lib/dispatch_policy/tick_loop.rb', line 61

def policy_names(filter)
  if filter
    [filter.to_s]
  else
    DispatchPolicy.registry.names
  end
end

.run(policy_name: nil, shard: nil, stop_when: -> { false }) ⇒ Object

Parameters:

  • policy_name (String, nil) (defaults to: nil)

    limit to one policy. nil = all registered.

  • shard (String, nil) (defaults to: nil)

    limit to one shard. nil = all shards.



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/dispatch_policy/tick_loop.rb', line 14

def run(policy_name: nil, shard: nil, stop_when: -> { false })
  config       = DispatchPolicy.config
  logger       = config.logger
  iteration    = 0

  loop do
    break if stop_when.call

    unless DispatchPolicy.config.enabled
      # Master switch off: stop polling. The job that drives
      # TickLoop.run will re-schedule itself; we exit cleanly so
      # the next iteration sees the flag and stops too.
      logger&.info("[dispatch_policy] TickLoop exiting because config.enabled = false")
      break
    end

    names = policy_names(policy_name)
    if names.empty?
      sleep(config.idle_pause)
      next
    end

    admitted = 0
    names.each do |name|
      break if stop_when.call

      begin
        result = Tick.run(policy_name: name, shard: shard)
        admitted += result.jobs_admitted
      rescue StandardError => e
        logger&.error("[dispatch_policy] tick error policy=#{name} shard=#{shard.inspect} #{e.class}: #{e.message}\n#{e.backtrace.first(10).join("\n")}")
      end
    end

    iteration += 1
    if (iteration % config.sweep_every_ticks).zero?
      sweep!
    end

    if admitted.zero?
      sleep(config.idle_pause)
    elsif config.busy_pause.to_f.positive?
      sleep(config.busy_pause)
    end
  end
end

.sweep!Object



69
70
71
72
73
74
75
76
# File 'lib/dispatch_policy/tick_loop.rb', line 69

def sweep!
  cfg = DispatchPolicy.config
  Repository.sweep_stale_inflight!(cutoff_seconds: cfg.inflight_stale_after)
  Repository.sweep_inactive_partitions!(cutoff_seconds: cfg.partition_inactive_after)
  Repository.sweep_old_tick_samples!(cutoff_seconds: cfg.metrics_retention)
rescue StandardError => e
  DispatchPolicy.config.logger&.error("[dispatch_policy] sweep error: #{e.class}: #{e.message}")
end