Class: Pgbus::Process::Dispatcher

Inherits:
Object
  • Object
show all
Includes:
SignalHandler
Defined in:
lib/pgbus/process/dispatcher.rb

Constant Summary collapse

CLEANUP_INTERVAL =

Maintenance runs on coarser intervals than the main loop. Run idempotency cleanup every hour

3600
REAP_INTERVAL =

Run stale process reaping every 5 minutes

300
CONCURRENCY_INTERVAL =

Run concurrency cleanup every 5 minutes

300
BATCH_CLEANUP_INTERVAL =

Run batch cleanup every hour

3600
RECURRING_CLEANUP_INTERVAL =

Run recurring execution cleanup every hour

3600
ARCHIVE_COMPACTION_INTERVAL =

Run archive compaction every hour

3600
OUTBOX_CLEANUP_INTERVAL =

Run outbox cleanup every hour

3600
JOB_LOCK_CLEANUP_INTERVAL =

Run job lock cleanup every 5 minutes

300
STATS_CLEANUP_INTERVAL =

Run stats cleanup every hour

3600
ARCHIVE_COMPACTION_BATCH_SIZE =

Page size for archive compaction. Each cycle deletes up to this many archived rows per queue. Tuned via constant rather than configuration because the value rarely needs adjusting and a too-small value just delays cleanup, never breaks anything.

1000

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from SignalHandler

included, #interruptible_sleep, #process_signals, #restore_signals, #setup_signals

Constructor Details

#initialize(config: Pgbus.configuration) ⇒ Dispatcher

Returns a new instance of Dispatcher.



27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/pgbus/process/dispatcher.rb', line 27

# Builds a dispatcher that is not yet shutting down and whose maintenance
# timers all start "now", so no maintenance task is considered overdue
# immediately after construction.
#
# @param config the configuration object to use; defaults to the global
#   Pgbus.configuration
def initialize(config: Pgbus.configuration)
  @config = config
  @shutting_down = false

  # One "@last_<task>_at" timestamp per periodic maintenance task. Each timer
  # takes its own monotonic clock reading, in this order.
  %i[
    cleanup
    reap
    concurrency
    batch_cleanup
    recurring_cleanup
    archive_compaction
    stream_archive_compaction
    outbox_cleanup
    job_lock_cleanup
    stats_cleanup
  ].each do |task|
    instance_variable_set(:"@last_#{task}_at", monotonic_now)
  end
end

Instance Attribute Details

#config ⇒ Object (readonly)

Returns the value of attribute config.



25
26
27
# File 'lib/pgbus/process/dispatcher.rb', line 25

# Read-only accessor for the configuration object this dispatcher was
# constructed with (stored in @config by #initialize).
def config
  @config
end

Instance Method Details

#graceful_shutdown ⇒ Object



64
65
66
# File 'lib/pgbus/process/dispatcher.rb', line 64

# Requests a graceful stop by raising the @shutting_down flag. The #run loop
# checks this flag between its steps and, once it is set, leaves the loop and
# calls its shutdown routine.
# NOTE(review): presumably triggered by the SignalHandler mixin on a
# termination signal — confirm against that module.
def graceful_shutdown
  @shutting_down = true
end

#immediate_shutdown ⇒ Object



68
69
70
# File 'lib/pgbus/process/dispatcher.rb', line 68

# Requests an immediate stop. As written this only raises the @shutting_down
# flag — the same action as #graceful_shutdown — so the #run loop exits at its
# next flag check rather than being interrupted mid-step.
def immediate_shutdown
  @shutting_down = true
end

#run ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/pgbus/process/dispatcher.rb', line 42

# Main entry point of the dispatcher process.
#
# Installs signal handling, starts the heartbeat, logs the configured
# dispatch interval, and then repeats a cycle of: process pending signals,
# run maintenance, sleep for config.dispatch_interval. The @shutting_down flag
# is re-checked after every step so a shutdown request takes effect before the
# next step starts. Once the loop ends, #shutdown is called.
def run
  setup_signals
  start_heartbeat
  Pgbus.logger.info { "[Pgbus] Dispatcher started: interval=#{config.dispatch_interval}s" }

  loop do
    break if @shutting_down

    process_signals
    # A signal handled just above may have requested shutdown; skip the
    # maintenance pass in that case and fall through to the exit check.
    run_maintenance unless @shutting_down
    break if @shutting_down

    interruptible_sleep(config.dispatch_interval)
  end

  shutdown
end