Class: Pgbus::Process::Dispatcher
- Inherits: Object
- Includes: SignalHandler
- Defined in: lib/pgbus/process/dispatcher.rb
Constant Summary
Maintenance runs on coarser intervals than the main loop. All interval values are in seconds.
- CLEANUP_INTERVAL = 3600
Run idempotency cleanup every hour
- REAP_INTERVAL = 300
Run stale process reaping every 5 minutes
- CONCURRENCY_INTERVAL = 300
Run concurrency cleanup every 5 minutes
- BATCH_CLEANUP_INTERVAL = 3600
Run batch cleanup every hour
- RECURRING_CLEANUP_INTERVAL = 3600
Run recurring execution cleanup every hour
- ARCHIVE_COMPACTION_INTERVAL = 3600
Run archive compaction every hour
- OUTBOX_CLEANUP_INTERVAL = 3600
Run outbox cleanup every hour
- JOB_LOCK_CLEANUP_INTERVAL = 300
Run job lock cleanup every 5 minutes
- STATS_CLEANUP_INTERVAL = 3600
- ARCHIVE_COMPACTION_BATCH_SIZE = 1000
Page size for archive compaction. Each cycle deletes up to this many archived rows per queue. This is a constant rather than a configuration option because the value rarely needs adjusting, and a value that is too small only delays cleanup; it never breaks anything.
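The interval constants above suggest a "run when enough time has elapsed" pattern. The following is a minimal sketch of that idea, not the Pgbus implementation: the `IntervalTracker` class, its `each_due` method, and the injectable `clock` are all hypothetical names introduced for illustration, and the `run_maintenance` source is not shown on this page.

```ruby
# Hypothetical sketch: tracks when each maintenance task last ran and
# reports which tasks are due. Not part of Pgbus.
class IntervalTracker
  DEFAULT_CLOCK = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }

  # intervals: Hash of task name => interval in seconds
  def initialize(intervals, clock: DEFAULT_CLOCK)
    @intervals = intervals
    @clock = clock
    now = @clock.call
    # Every timer starts at "now", so nothing is due until its first
    # full interval has elapsed.
    @last_run_at = intervals.keys.to_h { |name| [name, now] }
  end

  # Yields each task whose interval has elapsed, then resets its timer.
  def each_due
    now = @clock.call
    @intervals.each do |name, interval|
      next if now - @last_run_at[name] < interval

      yield name
      @last_run_at[name] = now
    end
  end
end

# Usage with a fake clock so the example runs instantly.
fake_time = 0.0
tracker = IntervalTracker.new({ reap: 300, cleanup: 3600 }, clock: -> { fake_time })

fake_time = 300.0
due_after_5_min = []
tracker.each_due { |name| due_after_5_min << name }

fake_time = 3600.0
due_after_1_hour = []
tracker.each_due { |name| due_after_1_hour << name }
```

With the fake clock, only the 300-second task is due at the five-minute mark, and both tasks are due at the one-hour mark.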
Instance Attribute Summary
- #config ⇒ Object (readonly)
Returns the value of attribute config.
Instance Method Summary
- #graceful_shutdown ⇒ Object
- #immediate_shutdown ⇒ Object
- #initialize(config: Pgbus.configuration) ⇒ Dispatcher (constructor)
A new instance of Dispatcher.
- #run ⇒ Object
Methods included from SignalHandler
included, #interruptible_sleep, #process_signals, #restore_signals, #setup_signals
Constructor Details
#initialize(config: Pgbus.configuration) ⇒ Dispatcher
Returns a new instance of Dispatcher.
    # File 'lib/pgbus/process/dispatcher.rb', line 27

    def initialize(config: Pgbus.configuration)
      @config = config
      @shutting_down = false
      @last_cleanup_at = monotonic_now
      @last_reap_at = monotonic_now
      @last_concurrency_at = monotonic_now
      @last_batch_cleanup_at = monotonic_now
      @last_recurring_cleanup_at = monotonic_now
      @last_archive_compaction_at = monotonic_now
      @last_stream_archive_compaction_at = monotonic_now
      @last_outbox_cleanup_at = monotonic_now
      @last_job_lock_cleanup_at = monotonic_now
      @last_stats_cleanup_at = monotonic_now
    end
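The constructor seeds every timestamp from `monotonic_now`, whose definition is not shown on this page. A common way to write such a helper in Ruby is to read the monotonic clock, which is unaffected by wall-clock adjustments; the sketch below assumes that approach and may differ from what Pgbus actually does.

```ruby
# Assumed definition, for illustration only: returns seconds (Float)
# from a clock that never jumps backwards.
def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

started_at = monotonic_now
sleep 0.01
elapsed = monotonic_now - started_at # elapsed seconds as a Float
```

Measuring elapsed time this way avoids skipped or repeated maintenance runs if the system clock is changed while the process is running.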
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
    # File 'lib/pgbus/process/dispatcher.rb', line 25

    def config
      @config
    end
Instance Method Details
#graceful_shutdown ⇒ Object
    # File 'lib/pgbus/process/dispatcher.rb', line 64

    def graceful_shutdown
      @shutting_down = true
    end
#immediate_shutdown ⇒ Object
    # File 'lib/pgbus/process/dispatcher.rb', line 68

    def immediate_shutdown
      @shutting_down = true
    end
#run ⇒ Object
    # File 'lib/pgbus/process/dispatcher.rb', line 42

    def run
      setup_signals
      start_heartbeat
      Pgbus.logger.info do
        "[Pgbus] Dispatcher started: interval=#{config.dispatch_interval}s"
      end

      loop do
        break if @shutting_down

        process_signals
        break if @shutting_down

        run_maintenance
        break if @shutting_down

        interruptible_sleep(config.dispatch_interval)
      end

      shutdown
    end
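The loop in `#run` checks the shutdown flag between each step and then sleeps in a way that can be cut short. The toy class below sketches that shape in isolation. It is not Pgbus code: `ToyLoop` is a hypothetical name, and the condition-variable sleep is an assumption, since the real `SignalHandler#interruptible_sleep` is not shown on this page and may work differently.

```ruby
# Hypothetical stand-in for a dispatcher-style loop. Not part of Pgbus.
class ToyLoop
  attr_reader :ticks

  def initialize(interval:)
    @interval = interval
    @shutting_down = false
    @ticks = 0
    @mutex = Mutex.new
    @wakeup = ConditionVariable.new
  end

  # Sets the flag and wakes the loop if it is currently sleeping.
  def graceful_shutdown
    @mutex.synchronize do
      @shutting_down = true
      @wakeup.signal
    end
  end

  def run
    loop do
      break if @shutting_down

      @ticks += 1 # stands in for the maintenance work
      break if @shutting_down

      interruptible_sleep(@interval)
    end
  end

  private

  # Sleeps up to `seconds`, returning early when signalled.
  def interruptible_sleep(seconds)
    @mutex.synchronize do
      @wakeup.wait(@mutex, seconds) unless @shutting_down
    end
  end
end

toy = ToyLoop.new(interval: 60)
worker = Thread.new { toy.run }
sleep 0.05            # give the loop time to start sleeping
toy.graceful_shutdown # wakes the loop long before the 60 s interval ends
worker.join(2)
```

Because the sleep is interruptible, the worker thread exits shortly after `graceful_shutdown` is called instead of waiting out the full interval.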