Class: Pgbus::Configuration

Inherits:
Object
  • Object
show all
Defined in:
lib/pgbus/configuration.rb,
lib/pgbus/configuration/capsule_dsl.rb

Defined Under Namespace

Classes: CapsuleDSL

Constant Summary collapse

VALID_ROLES =
%i[workers dispatcher scheduler consumers outbox].freeze
VALID_LOG_FORMATS =
%i[text json].freeze
VALID_BROADCAST_MODES =
%i[ephemeral durable].freeze
VALID_GROUP_MODES =
[nil, :fifo, :round_robin].freeze
VALID_PGMQ_SCHEMA_MODES =
%i[auto extension embedded].freeze
POOL_SIZE_WARN_THRESHOLD =

Returns the connection pool size to use for the PGMQ client.

If pool_size was explicitly set, returns that value unchanged. Otherwise auto-derives from the threads needed by the roles this process actually runs (respects Configuration#roles from –workers-only / –scheduler-only / –dispatcher-only):

workers role      sum(workers.threads)
consumers role    sum(event_consumers.threads)
dispatcher role   +1
scheduler role    +1

A –scheduler-only deployment that has 50 worker threads configured only needs 1 connection (for the scheduler), not 52.

Auto-tune protects users from the common pitfall of running 15 worker threads with a hand-set pool_size of 5 (resulting in ConnectionPool timeouts under load). Setting pool_size explicitly is still supported for advanced cases where you need a tighter or looser pool than the default formula provides.

50
ASYNC_POOL_CONNECTIONS =

Connections needed per async worker: one for the reactor’s serial execution, one for polling, one for headroom. Fibers share connections because only one runs at a time per reactor thread.

3

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeConfiguration

Returns a new instance of Configuration.



138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
# File 'lib/pgbus/configuration.rb', line 138

def initialize
  @database_url = nil
  @connection_params = nil
  @pool_size = nil
  @pool_timeout = 5

  @default_queue = "default"
  @queue_prefix = "pgbus"

  @workers = [{ queues: %w[default], threads: 5 }]
  @roles = nil
  @polling_interval = 0.1
  @visibility_timeout = 30

  @prefetch_limit = nil
  @execution_mode = :threads

  @max_jobs_per_worker = nil
  @max_memory_mb = nil
  @max_worker_lifetime = nil

  @stall_threshold = 90
  @read_timeout = 30

  @dispatch_interval = 1.0

  @circuit_breaker_enabled = true

  @max_retries = 5
  @retry_backoff = 5          # seconds — first VT-retry delay
  @retry_backoff_max = 300    # 5 minutes cap
  @retry_backoff_jitter = 0.15

  @priority_levels = nil
  @default_priority = 1
  @group_mode = nil

  @archive_retention = 7 * 24 * 3600 # 7 days

  @outbox_enabled = false
  @outbox_poll_interval = 1.0
  @outbox_batch_size = 100
  @outbox_retention = 24 * 3600 # 1 day

  @idempotency_ttl = 7 * 24 * 3600 # 7 days
  @allowed_global_id_models = nil # nil = allow all (for backwards compat)

  @logger = (defined?(Rails) && Rails.respond_to?(:logger) && Rails.logger) || Logger.new($stdout)
  @log_format = :text
  @error_reporters = []

  @listen_notify = true

  @worker_notify_wakeup = nil
  @worker_notify_host = nil
  @worker_notify_port = nil
  @worker_notify_database_url = nil

  @pgmq_schema_mode = :auto

  @event_consumers = nil

  @recurring_tasks = nil
  @recurring_schedule_interval = 1.0
  @recurring_tasks_file = nil
  @recurring_tasks_files = nil
  @skip_recurring = false
  @recurring_execution_retention = 7 * 24 * 3600 # 7 days

  @zombie_detection = true

  @stats_enabled = true
  @stats_retention = 30 * 24 * 3600 # 30 days

  @connects_to = nil

  @web_auth = nil
  @web_refresh_interval = 5000
  @web_per_page = 25
  @web_live_updates = true
  @web_data_source = nil
  @insights_default_minutes = 30 * 24 * 60 # 30 days
  @base_controller_class = "::ActionController::Base"
  @return_to_app_url = nil
  @metrics_enabled = true

  @streams_enabled = true
  @streams_path = nil
  @streams_queue_prefix = "pgbus_stream"
  # Streamer-only connection overrides. The Streamer's Listener owns a
  # dedicated long-lived `wait_for_notify` PG connection that can't go
  # through a PgBouncer in transaction mode (LISTEN/NOTIFY don't survive
  # transaction-pool COMMIT boundaries — see PlanetScale's docs). Setting
  # any of these overrides only the Streamer's connection options; the
  # worker, dispatcher, and client publish paths keep using the regular
  # `database_url` / `connection_params` (typically pooled).
  #
  #   streams_host          — override host only
  #   streams_port          — override port only (most common case:
  #                           pooler is on 6432, direct is 5432)
  #   streams_database_url  — full URL override; takes precedence over
  #                           the host/port surgicals when set
  @streams_host = nil
  @streams_port = nil
  @streams_database_url = nil
  @streams_signed_name_secret = nil
  @streams_default_retention = 5 * 60 # 5 minutes
  @streams_retention = {}
  @streams_heartbeat_interval = 15
  @streams_max_connections = 2_000
  @streams_idle_timeout = 3_600 # 1 hour
  # 250ms — this value plays two roles: (1) the TCP keepalive
  # interval for the streamer's PG LISTEN connection, and (2) the
  # upper bound on how long Dispatcher#handle_connect waits for
  # the Listener to acknowledge a synchronous ensure_listening
  # call. 5s was unbounded enough to drop messages on a
  # realistic subscribe burst; 250ms keeps the connect-path race
  # window tight while still leaving headroom over a typical
  # PG keepalive interval.
  @streams_listen_health_check_ms = 250
  @streams_write_deadline_ms = 5_000
  @streams_falcon_streaming_body = false
  # Opt-in: when true, the Dispatcher writes one row to
  # pgbus_stream_stats per broadcast/connect/disconnect. Default
  # off because stream event volume can be much higher than job
  # volume and the Insights surface is only useful if operators
  # actually look at it. Separate from #stats_enabled (which
  # gates pgbus_job_stats recording) on purpose — operators
  # usually want job stats on and stream stats off, or vice versa.
  @streams_stats_enabled = false
  @streams_test_mode = false
  @streams_default_broadcast_mode = :ephemeral
  @streams_orphan_sweep_interval = 3600    # 1 hour
  @streams_orphan_threshold = 86_400       # 24 hours
  @streams_durable_patterns = []
  # Streams matching these patterns get connection-driven presence:
  # auto-join on SSE connect, auto-leave on disconnect, touch on the
  # keepalive heartbeat (issue #169). Empty by default (opt-in).
  @streams_presence_patterns = []
  # Extracts a presence member { id:, metadata: } from a connection's
  # authorize-hook context. Defaults to nil, which uses the built-in
  # extractor (see #presence_member_for): a Hash with :member_id/:id,
  # or an object responding to #id.
  @streams_presence_member = nil

  # AppSignal: auto-on when the appsignal gem is loaded; probe runs in
  # the same process, so the operator can disable it independently.
  @appsignal_enabled = true
  @appsignal_probe_enabled = true
end

Instance Attribute Details

#allowed_global_id_modelsObject

Event bus



69
70
71
# File 'lib/pgbus/configuration.rb', line 69

def allowed_global_id_models
  @allowed_global_id_models
end

#appsignal_enabledObject

AppSignal integration (auto-loaded when ::Appsignal is defined and this is true). Set to false to opt out without uninstalling the appsignal gem.



136
137
138
# File 'lib/pgbus/configuration.rb', line 136

def appsignal_enabled
  @appsignal_enabled
end

#appsignal_probe_enabledObject

AppSignal integration (auto-loaded when ::Appsignal is defined and this is true). Set to false to opt out without uninstalling the appsignal gem.



136
137
138
# File 'lib/pgbus/configuration.rb', line 136

def appsignal_probe_enabled
  @appsignal_probe_enabled
end

#archive_retentionObject

Archive compaction. Only the user-facing retention window is configurable; the loop interval and batch size are tuned via constants on Pgbus::Process::Dispatcher.



62
63
64
# File 'lib/pgbus/configuration.rb', line 62

def archive_retention
  @archive_retention
end

#base_controller_classObject

Web dashboard



110
111
112
# File 'lib/pgbus/configuration.rb', line 110

def base_controller_class
  @base_controller_class
end

#circuit_breaker_enabledObject

Circuit breaker. Only ‘enabled` is user-facing — the trip threshold and backoff curve are tuned via constants on Pgbus::CircuitBreaker because they are implementation details that have never been worth exposing.



41
42
43
# File 'lib/pgbus/configuration.rb', line 41

def circuit_breaker_enabled
  @circuit_breaker_enabled
end

#connection_paramsObject

Connection settings



8
9
10
# File 'lib/pgbus/configuration.rb', line 8

def connection_params
  @connection_params
end

#connects_toObject

Multi-database support (optional separate database for pgbus tables) Set to { database: { writing: :pgbus, reading: :pgbus } } to use a separate database. Requires a matching entry in config/database.yml under the “pgbus” key.



99
100
101
# File 'lib/pgbus/configuration.rb', line 99

def connects_to
  @connects_to
end

#database_urlObject

Connection settings



8
9
10
# File 'lib/pgbus/configuration.rb', line 8

def database_url
  @database_url
end

#default_priorityObject

Priority queues



51
52
53
# File 'lib/pgbus/configuration.rb', line 51

def default_priority
  @default_priority
end

#default_queueObject

Queue settings



11
12
13
# File 'lib/pgbus/configuration.rb', line 11

def default_queue
  @default_queue
end

#dispatch_intervalObject

Dispatcher settings



36
37
38
# File 'lib/pgbus/configuration.rb', line 36

def dispatch_interval
  @dispatch_interval
end

#error_reportersObject

Error reporting — array of callable objects invoked on caught exceptions. Each receives (exception, context_hash) or (exception, context_hash, config).



78
79
80
# File 'lib/pgbus/configuration.rb', line 78

def error_reporters
  @error_reporters
end

#event_consumersObject

Event consumers



89
90
91
# File 'lib/pgbus/configuration.rb', line 89

def event_consumers
  @event_consumers
end

#execution_modeObject

Worker settings



14
15
16
# File 'lib/pgbus/configuration.rb', line 14

def execution_mode
  @execution_mode
end

#group_modeObject

Grouped reads (PGMQ v1.11.0+ FIFO grouping). nil = disabled (default read_batch behavior). :fifo = use read_grouped (drains oldest group first, throughput-optimized). :round_robin = use read_grouped_rr (fair round-robin across groups).



57
58
59
# File 'lib/pgbus/configuration.rb', line 57

def group_mode
  @group_mode
end

#idempotency_ttlObject

rubocop:disable Style/AccessorGrouping



70
71
72
# File 'lib/pgbus/configuration.rb', line 70

def idempotency_ttl
  @idempotency_ttl
end

#insights_default_minutesObject

Web dashboard



110
111
112
# File 'lib/pgbus/configuration.rb', line 110

def insights_default_minutes
  @insights_default_minutes
end

#listen_notifyObject

LISTEN/NOTIFY. Only the on/off switch is user-facing — the throttle interval is a Postgres-side tuning knob that lives as a constant on Pgbus::Client (NOTIFY_THROTTLE_MS).



83
84
85
# File 'lib/pgbus/configuration.rb', line 83

def listen_notify
  @listen_notify
end

#log_formatObject

rubocop:disable Style/AccessorGrouping



74
75
76
# File 'lib/pgbus/configuration.rb', line 74

def log_format
  @log_format
end

#loggerObject

Logging



73
74
75
# File 'lib/pgbus/configuration.rb', line 73

def logger
  @logger
end

#max_jobs_per_workerObject

Worker recycling



27
28
29
# File 'lib/pgbus/configuration.rb', line 27

def max_jobs_per_worker
  @max_jobs_per_worker
end

#max_memory_mbObject

Worker recycling



27
28
29
# File 'lib/pgbus/configuration.rb', line 27

def max_memory_mb
  @max_memory_mb
end

#max_retriesObject

Dead letter queue



44
45
46
# File 'lib/pgbus/configuration.rb', line 44

def max_retries
  @max_retries
end

#max_worker_lifetimeObject

Worker recycling



27
28
29
# File 'lib/pgbus/configuration.rb', line 27

def max_worker_lifetime
  @max_worker_lifetime
end

#metrics_enabledObject

Web dashboard



110
111
112
# File 'lib/pgbus/configuration.rb', line 110

def metrics_enabled
  @metrics_enabled
end

#outbox_batch_sizeObject

Transactional outbox



65
66
67
# File 'lib/pgbus/configuration.rb', line 65

def outbox_batch_size
  @outbox_batch_size
end

#outbox_enabledObject

Transactional outbox



65
66
67
# File 'lib/pgbus/configuration.rb', line 65

def outbox_enabled
  @outbox_enabled
end

#outbox_poll_intervalObject

Transactional outbox



65
66
67
# File 'lib/pgbus/configuration.rb', line 65

def outbox_poll_interval
  @outbox_poll_interval
end

#outbox_retentionObject

rubocop:disable Style/AccessorGrouping



66
67
68
# File 'lib/pgbus/configuration.rb', line 66

def outbox_retention
  @outbox_retention
end

#pgmq_schema_modeObject

PGMQ schema installation mode (:auto, :extension, :embedded)



86
87
88
# File 'lib/pgbus/configuration.rb', line 86

def pgmq_schema_mode
  @pgmq_schema_mode
end

#polling_intervalObject

Worker settings



14
15
16
# File 'lib/pgbus/configuration.rb', line 14

def polling_interval
  @polling_interval
end

#pool_sizeObject

Connection settings



8
9
10
# File 'lib/pgbus/configuration.rb', line 8

def pool_size
  @pool_size
end

#pool_timeoutObject

Connection settings



8
9
10
# File 'lib/pgbus/configuration.rb', line 8

def pool_timeout
  @pool_timeout
end

#prefetch_limitObject

Worker settings



14
15
16
# File 'lib/pgbus/configuration.rb', line 14

def prefetch_limit
  @prefetch_limit
end

#priority_levelsObject

Priority queues



51
52
53
# File 'lib/pgbus/configuration.rb', line 51

def priority_levels
  @priority_levels
end

#queue_prefixObject

Queue settings



11
12
13
# File 'lib/pgbus/configuration.rb', line 11

def queue_prefix
  @queue_prefix
end

#read_timeoutObject

Liveness probe: supervisor kills a worker whose claim loop has not advanced for longer than stall_threshold seconds (default 90). read_timeout caps how long a single PGMQ read can block (default 30s), so a dead socket raises instead of parking the loop forever.



33
34
35
# File 'lib/pgbus/configuration.rb', line 33

def read_timeout
  @read_timeout
end

#recurring_execution_retentionObject

rubocop:disable Style/AccessorGrouping



94
95
96
# File 'lib/pgbus/configuration.rb', line 94

def recurring_execution_retention
  @recurring_execution_retention
end

#recurring_schedule_intervalObject

Recurring jobs



92
93
94
# File 'lib/pgbus/configuration.rb', line 92

def recurring_schedule_interval
  @recurring_schedule_interval
end

#recurring_tasksObject

Recurring jobs



92
93
94
# File 'lib/pgbus/configuration.rb', line 92

def recurring_tasks
  @recurring_tasks
end

#recurring_tasks_fileObject

Recurring jobs



92
93
94
# File 'lib/pgbus/configuration.rb', line 92

def recurring_tasks_file
  @recurring_tasks_file
end

#recurring_tasks_filesObject



662
663
664
665
666
# File 'lib/pgbus/configuration.rb', line 662

def recurring_tasks_files
  return @recurring_tasks_files if @recurring_tasks_files

  recurring_tasks_file ? [recurring_tasks_file] : nil
end

#retry_backoffObject

Retry backoff for the VT-based retry path (unhandled exceptions). Jobs can override these per-class via Pgbus::RetryBackoff::JobMixin.



48
49
50
# File 'lib/pgbus/configuration.rb', line 48

def retry_backoff
  @retry_backoff
end

#retry_backoff_jitterObject

Retry backoff for the VT-based retry path (unhandled exceptions). Jobs can override these per-class via Pgbus::RetryBackoff::JobMixin.



48
49
50
# File 'lib/pgbus/configuration.rb', line 48

def retry_backoff_jitter
  @retry_backoff_jitter
end

#retry_backoff_maxObject

Retry backoff for the VT-based retry path (unhandled exceptions). Jobs can override these per-class via Pgbus::RetryBackoff::JobMixin.



48
49
50
# File 'lib/pgbus/configuration.rb', line 48

def retry_backoff_max
  @retry_backoff_max
end

#return_to_app_urlObject

Web dashboard



110
111
112
# File 'lib/pgbus/configuration.rb', line 110

def return_to_app_url
  @return_to_app_url
end

#rolesObject

Supervisor role selection. nil = boot all roles (default behavior). Array of role symbols = boot only the listed roles. Set via the CLI flags –workers-only / –scheduler-only / –dispatcher-only, or directly in an initializer for advanced cases.



22
23
24
# File 'lib/pgbus/configuration.rb', line 22

def roles
  @roles
end

#skip_recurringObject

Recurring jobs



92
93
94
# File 'lib/pgbus/configuration.rb', line 92

def skip_recurring
  @skip_recurring
end

#stall_thresholdObject

Liveness probe: supervisor kills a worker whose claim loop has not advanced for longer than stall_threshold seconds (default 90). read_timeout caps how long a single PGMQ read can block (default 30s), so a dead socket raises instead of parking the loop forever.



33
34
35
# File 'lib/pgbus/configuration.rb', line 33

def stall_threshold
  @stall_threshold
end

#stats_enabledObject

Job stats



106
107
108
# File 'lib/pgbus/configuration.rb', line 106

def stats_enabled
  @stats_enabled
end

#stats_retentionObject

rubocop:disable Style/AccessorGrouping



107
108
109
# File 'lib/pgbus/configuration.rb', line 107

def stats_retention
  @stats_retention
end

#streams_database_urlObject

Streams (turbo-rails replacement, SSE-based)



115
116
117
# File 'lib/pgbus/configuration.rb', line 115

def streams_database_url
  @streams_database_url
end

#streams_default_broadcast_modeObject

rubocop:disable Style/AccessorGrouping



124
125
126
# File 'lib/pgbus/configuration.rb', line 124

def streams_default_broadcast_mode
  @streams_default_broadcast_mode
end

#streams_default_retentionObject

Streams (turbo-rails replacement, SSE-based)



115
116
117
# File 'lib/pgbus/configuration.rb', line 115

def streams_default_retention
  @streams_default_retention
end

#streams_durable_patternsObject

Streams (turbo-rails replacement, SSE-based)



115
116
117
# File 'lib/pgbus/configuration.rb', line 115

def streams_durable_patterns
  @streams_durable_patterns
end

#streams_enabledObject

Streams (turbo-rails replacement, SSE-based)



115
116
117
# File 'lib/pgbus/configuration.rb', line 115

def streams_enabled
  @streams_enabled
end

#streams_falcon_streaming_bodyObject

Streams (turbo-rails replacement, SSE-based)



115
116
117
# File 'lib/pgbus/configuration.rb', line 115

def streams_falcon_streaming_body
  @streams_falcon_streaming_body
end

#streams_heartbeat_intervalObject

Streams (turbo-rails replacement, SSE-based)



115
116
117
# File 'lib/pgbus/configuration.rb', line 115

def streams_heartbeat_interval
  @streams_heartbeat_interval
end

#streams_hostObject

Streams (turbo-rails replacement, SSE-based)



115
116
117
# File 'lib/pgbus/configuration.rb', line 115

def streams_host
  @streams_host
end

#streams_idle_timeoutObject

Streams (turbo-rails replacement, SSE-based)



115
116
117
# File 'lib/pgbus/configuration.rb', line 115

def streams_idle_timeout
  @streams_idle_timeout
end

#streams_listen_health_check_msObject

Streams (turbo-rails replacement, SSE-based)



115
116
117
# File 'lib/pgbus/configuration.rb', line 115

def streams_listen_health_check_ms
  @streams_listen_health_check_ms
end

#streams_max_connectionsObject

Streams (turbo-rails replacement, SSE-based)



115
116
117
# File 'lib/pgbus/configuration.rb', line 115

def streams_max_connections
  @streams_max_connections
end

#streams_orphan_sweep_intervalObject

Streams (turbo-rails replacement, SSE-based)



115
116
117
# File 'lib/pgbus/configuration.rb', line 115

def streams_orphan_sweep_interval
  @streams_orphan_sweep_interval
end

#streams_orphan_thresholdObject

Streams (turbo-rails replacement, SSE-based)



115
116
117
# File 'lib/pgbus/configuration.rb', line 115

def streams_orphan_threshold
  @streams_orphan_threshold
end

#streams_pathObject

Streams (turbo-rails replacement, SSE-based)



115
116
117
# File 'lib/pgbus/configuration.rb', line 115

def streams_path
  @streams_path
end

#streams_portObject

Streams (turbo-rails replacement, SSE-based)



115
116
117
# File 'lib/pgbus/configuration.rb', line 115

def streams_port
  @streams_port
end

#streams_presence_memberObject

Streams (turbo-rails replacement, SSE-based)



115
116
117
# File 'lib/pgbus/configuration.rb', line 115

def streams_presence_member
  @streams_presence_member
end

#streams_presence_patternsObject

Streams (turbo-rails replacement, SSE-based)



115
116
117
# File 'lib/pgbus/configuration.rb', line 115

def streams_presence_patterns
  @streams_presence_patterns
end

#streams_queue_prefixObject

Streams (turbo-rails replacement, SSE-based)



115
116
117
# File 'lib/pgbus/configuration.rb', line 115

def streams_queue_prefix
  @streams_queue_prefix
end

#streams_retentionObject

Streams (turbo-rails replacement, SSE-based)



115
116
117
# File 'lib/pgbus/configuration.rb', line 115

def streams_retention
  @streams_retention
end

#streams_signed_name_secretObject

Streams (turbo-rails replacement, SSE-based)



115
116
117
# File 'lib/pgbus/configuration.rb', line 115

def streams_signed_name_secret
  @streams_signed_name_secret
end

#streams_stats_enabledObject

Streams (turbo-rails replacement, SSE-based)



115
116
117
# File 'lib/pgbus/configuration.rb', line 115

def streams_stats_enabled
  @streams_stats_enabled
end

#streams_test_modeObject

Streams (turbo-rails replacement, SSE-based)



115
116
117
# File 'lib/pgbus/configuration.rb', line 115

def streams_test_mode
  @streams_test_mode
end

#streams_write_deadline_msObject

Streams (turbo-rails replacement, SSE-based)



115
116
117
# File 'lib/pgbus/configuration.rb', line 115

def streams_write_deadline_ms
  @streams_write_deadline_ms
end

#visibility_timeoutObject

rubocop:disable Style/AccessorGrouping



15
16
17
# File 'lib/pgbus/configuration.rb', line 15

def visibility_timeout
  @visibility_timeout
end

#web_authObject

Web dashboard



110
111
112
# File 'lib/pgbus/configuration.rb', line 110

def web_auth
  @web_auth
end

#web_data_sourceObject

Web dashboard



110
111
112
# File 'lib/pgbus/configuration.rb', line 110

def web_data_source
  @web_data_source
end

#web_live_updatesObject

Web dashboard



110
111
112
# File 'lib/pgbus/configuration.rb', line 110

def web_live_updates
  @web_live_updates
end

#web_per_pageObject

Web dashboard



110
111
112
# File 'lib/pgbus/configuration.rb', line 110

def web_per_page
  @web_per_page
end

#web_refresh_intervalObject

Web dashboard



110
111
112
# File 'lib/pgbus/configuration.rb', line 110

def web_refresh_interval
  @web_refresh_interval
end

#worker_notify_database_urlObject

NOTIFY-gated worker wakeups. When true, each Worker fork owns a dedicated NotifyListener PG connection that LISTENs on its queues’ INSERT channels and wakes the loop on a real insert. Defaults to the value of listen_notify. The worker_notify_* overrides mirror streams_* so the LISTEN connection can bypass PgBouncer.



131
132
133
# File 'lib/pgbus/configuration.rb', line 131

def worker_notify_database_url
  @worker_notify_database_url
end

#worker_notify_hostObject

NOTIFY-gated worker wakeups. When true, each Worker fork owns a dedicated NotifyListener PG connection that LISTENs on its queues’ INSERT channels and wakes the loop on a real insert. Defaults to the value of listen_notify. The worker_notify_* overrides mirror streams_* so the LISTEN connection can bypass PgBouncer.



131
132
133
# File 'lib/pgbus/configuration.rb', line 131

def worker_notify_host
  @worker_notify_host
end

#worker_notify_portObject

NOTIFY-gated worker wakeups. When true, each Worker fork owns a dedicated NotifyListener PG connection that LISTENs on its queues’ INSERT channels and wakes the loop on a real insert. Defaults to the value of listen_notify. The worker_notify_* overrides mirror streams_* so the LISTEN connection can bypass PgBouncer.



131
132
133
# File 'lib/pgbus/configuration.rb', line 131

def worker_notify_port
  @worker_notify_port
end

#worker_notify_wakeupObject

NOTIFY-gated worker wakeups. When true, each Worker fork owns a dedicated NotifyListener PG connection that LISTENs on its queues’ INSERT channels and wakes the loop on a real insert. Defaults to the value of listen_notify. The worker_notify_* overrides mirror streams_* so the LISTEN connection can bypass PgBouncer.



131
132
133
# File 'lib/pgbus/configuration.rb', line 131

def worker_notify_wakeup
  @worker_notify_wakeup
end

#workersObject

rubocop:disable Style/AccessorGrouping



15
16
17
# File 'lib/pgbus/configuration.rb', line 15

def workers
  @workers
end

#zombie_detectionObject

Zombie message detection — logs a warning when a message is redelivered (read_ct > 1) without any prior failure recorded in pgbus_failed_events.



103
104
105
# File 'lib/pgbus/configuration.rb', line 103

def zombie_detection
  @zombie_detection
end

Instance Method Details

#capsule(name, queues:, threads:) ⇒ Object

Define a named capsule and append it to the workers list.

c.capsule :critical, queues: %w[critical], threads: 5
c.capsule :gated, queues: %w[gated], threads: 1, single_active_consumer: true

Names must be unique. Queues must not overlap with capsules already defined (would cause double-processing). Composes with the string DSL —+c.workers “…”+ followed by c.capsule :name, … appends the named capsule to the list parsed from the string.

Raises:

  • (ArgumentError)


568
569
570
571
572
573
574
575
576
577
578
579
580
# File 'lib/pgbus/configuration.rb', line 568

def capsule(name, queues:, threads:, **)
  raise ArgumentError, "capsule queues must be a non-empty Array" unless queues.is_a?(Array) && queues.any?
  raise ArgumentError, "capsule threads must be a positive Integer" unless threads.is_a?(Integer) && threads.positive?

  normalized_name = name.to_s
  @workers ||= []

  raise ArgumentError, "capsule #{name.inspect} is already defined" if @workers.any? { |c| capsule_name(c) == normalized_name }

  validate_no_queue_overlap!(queues)

  @workers << { name: normalized_name, queues: queues, threads: threads, ** }
end

#capsule_named(name) ⇒ Object

Look up a capsule by its name. Accepts symbol or string. Returns the matching Hash, or nil. Used by the CLI’s –capsule selector.



584
585
586
587
588
589
# File 'lib/pgbus/configuration.rb', line 584

def capsule_named(name)
  return nil unless @workers

  key = name.to_s
  @workers.find { |c| capsule_name(c) == key }
end

#connection_optionsObject



708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
# File 'lib/pgbus/configuration.rb', line 708

def connection_options
  if database_url
    database_url
  elsif connection_params
    connection_params
  elsif defined?(ActiveRecord::Base)
    # Extract connection config from ActiveRecord so pgmq-ruby creates its
    # own dedicated PG connections. Sharing AR's raw_connection via a Proc
    # is NOT thread-safe: the ConnectionPool caches the PG::Connection from
    # whichever thread first called the Proc, then hands that same object to
    # other threads — causing nil results, segfaults, and data corruption
    # when AR and PGMQ issue concurrent queries on the same connection.
    extract_ar_connection_hash
  else
    raise ConfigurationError, "No database connection configured. " \
                              "Set Pgbus.configuration.database_url, connection_params, or use with Rails."
  end
end

#dead_letter_queue_name(name) ⇒ Object



294
295
296
# File 'lib/pgbus/configuration.rb', line 294

def dead_letter_queue_name(name)
  "#{queue_name(name)}#{Pgbus::DEAD_LETTER_SUFFIX}"
end

#execution_mode_for(worker_config) ⇒ Object

Returns the execution mode for a specific worker config hash, falling back to the global execution_mode setting.



310
311
312
313
# File 'lib/pgbus/configuration.rb', line 310

def execution_mode_for(worker_config)
  mode = worker_config[:execution_mode] || worker_config["execution_mode"] || execution_mode
  ExecutionPools.normalize_mode(mode)
end

#presence_member_for(context) ⇒ Object

Derives a presence member { id:, metadata: } from a connection’s authorize-hook context, or nil when no member can be derived (an anonymous connection — presence is simply skipped). Uses ‘streams_presence_member` when configured; otherwise the built-in extractor handles the common shapes:

- Hash with :member_id (or :id) and optional :metadata
- an object responding to #id (e.g. a User model)

The id is always coerced to a String and metadata defaults to {}.



492
493
494
495
496
497
498
499
500
501
502
# File 'lib/pgbus/configuration.rb', line 492

def presence_member_for(context)
  return nil if context.nil?

  raw = streams_presence_member ? streams_presence_member.call(context) : default_presence_member(context)
  return nil unless raw.is_a?(Hash)

  id = raw[:id]
  return nil if id.nil? || id.to_s.empty?

  { id: id.to_s, metadata: raw[:metadata] || {} }
end

#priority_queue_name(name, priority) ⇒ Object



298
299
300
# File 'lib/pgbus/configuration.rb', line 298

def priority_queue_name(name, priority)
  "#{queue_name(name)}_p#{priority}"
end

#priority_queue_names(name) ⇒ Object



302
303
304
305
306
# File 'lib/pgbus/configuration.rb', line 302

def priority_queue_names(name)
  return [queue_name(name)] unless priority_levels && priority_levels > 1

  (0...priority_levels).map { |p| priority_queue_name(name, p) }
end

#queue_name(name) ⇒ Object



289
290
291
292
# File 'lib/pgbus/configuration.rb', line 289

def queue_name(name)
  full = "#{queue_prefix}_#{name}"
  QueueNameValidator.normalize(full)
end

#resolved_pool_sizeObject



695
696
697
698
699
700
701
702
703
704
705
706
# File 'lib/pgbus/configuration.rb', line 695

def resolved_pool_size
  return pool_size if pool_size

  total = 0
  total += sum_thread_counts(workers, default_threads: 5, group: "worker") if role_enabled?(:workers)
  total += sum_thread_counts(event_consumers, default_threads: 3, group: "event_consumer") if role_enabled?(:consumers)
  total += 1 if role_enabled?(:dispatcher)
  total += 1 if role_enabled?(:scheduler)

  warn_if_oversized(total)
  total
end

#role_enabled?(role) ⇒ Boolean

Returns true if the given role should be booted by the supervisor. When roles is nil (the default), every role is enabled — this matches the legacy single-process behavior. When roles is set (e.g. via the CLI’s –workers-only / –scheduler-only / –dispatcher-only flags), only the listed roles boot.

Accepts symbol or string for case-insensitive comparison.

Returns:

  • (Boolean)


598
599
600
601
602
# File 'lib/pgbus/configuration.rb', line 598

def role_enabled?(role)
  return true if @roles.nil?

  @roles.include?(role.to_s.downcase.to_sym)
end

#stream_durable?(name) ⇒ Boolean

Returns true if the given stream name should be durable based on ‘streams_durable_patterns` (exact string or Regexp match) or the `streams_default_broadcast_mode` fallback.

Returns:

  • (Boolean)


468
469
470
471
472
473
# File 'lib/pgbus/configuration.rb', line 468

def stream_durable?(name)
  patterns = streams_durable_patterns || []
  return true if patterns.any? { |p| p.is_a?(Regexp) ? p.match?(name) : p == name }

  streams_default_broadcast_mode == :durable
end

#stream_presence?(name) ⇒ Boolean

Returns true if the given stream name should have connection-driven presence based on ‘streams_presence_patterns` (exact string or Regexp match). Presence is opt-in, so the default (no patterns) is false. See issue #169.

Returns:

  • (Boolean)


479
480
481
482
# File 'lib/pgbus/configuration.rb', line 479

def stream_presence?(name)
  patterns = streams_presence_patterns || []
  patterns.any? { |p| p.is_a?(Regexp) ? p.match?(name) : p == name }
end

#streams_connection_optionsObject

Connection options the Streamer’s dedicated LISTEN/NOTIFY PG connection should use. Defaults to ‘connection_options` (same as workers and the publish path). If any of `streams_database_url`, `streams_host`, or `streams_port` is set, the Streamer’s connection is reconfigured —everything else keeps using the base options.

The typical use is “workers go through PgBouncer, streamer goes direct”:

c.connects_to = { database: { writing: :pgbus } }   # pooler
c.streams_port = 5432                               # direct

Precedence: streams_database_url > streams_host/port override > base.



739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
# File 'lib/pgbus/configuration.rb', line 739

def streams_connection_options
  return streams_database_url if streams_database_url

  base = connection_options
  return base unless streams_host || streams_port

  case base
  when Hash
    result = base.dup
    result[:host] = streams_host if streams_host
    result[:port] = streams_port if streams_port
    result
  when String
    # libpq's conninfo parser takes later key=value pairs as overrides
    # for earlier ones, so we just append. Handles both URI form
    # (postgres://...) and key=value form.
    parts = [base]
    parts << "host=#{streams_host}" if streams_host
    parts << "port=#{streams_port}" if streams_port
    parts.join(" ")
  else
    base
  end
end

#validate!Object

Raises:

  • (ArgumentError)


371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
# File 'lib/pgbus/configuration.rb', line 371

def validate!
  if pool_size && !(pool_size.is_a?(Numeric) && pool_size.positive?)
    raise ArgumentError, "pool_size must be a positive number or nil (auto-tune)"
  end

  raise ArgumentError, "pool_timeout must be > 0" unless pool_timeout.is_a?(Numeric) && pool_timeout.positive?
  raise ArgumentError, "polling_interval must be > 0" unless polling_interval.is_a?(Numeric) && polling_interval.positive?
  raise ArgumentError, "visibility_timeout must be > 0" unless visibility_timeout.is_a?(Numeric) && visibility_timeout.positive?
  raise ArgumentError, "max_retries must be >= 0" unless max_retries.is_a?(Integer) && max_retries >= 0
  raise ArgumentError, "retry_backoff must be > 0" unless retry_backoff.is_a?(Numeric) && retry_backoff.positive?
  raise ArgumentError, "retry_backoff_max must be > 0" unless retry_backoff_max.is_a?(Numeric) && retry_backoff_max.positive?
  unless retry_backoff_jitter.is_a?(Numeric) && retry_backoff_jitter >= 0 && retry_backoff_jitter <= 1
    raise ArgumentError, "retry_backoff_jitter must be between 0 and 1"
  end

  unless stall_threshold.nil? || (stall_threshold.is_a?(Numeric) && stall_threshold.positive?)
    raise ArgumentError, "stall_threshold must be a positive number or nil to disable"
  end
  unless read_timeout.nil? || (read_timeout.is_a?(Numeric) && read_timeout.positive?)
    raise ArgumentError, "read_timeout must be a positive number or nil to disable"
  end

  # Validate global execution_mode
  ExecutionPools.normalize_mode(execution_mode)

  Array(workers).each do |w|
    threads = w[:threads] || w["threads"] || 5
    raise ArgumentError, "worker threads must be > 0" unless threads.is_a?(Integer) && threads.positive?

    # Validate per-worker execution_mode override if present
    mode = w[:execution_mode] || w["execution_mode"]
    ExecutionPools.normalize_mode(mode) if mode
  end

  raise ArgumentError, "prefetch_limit must be > 0" if prefetch_limit && !(prefetch_limit.is_a?(Integer) && prefetch_limit.positive?)

  if priority_levels && !(priority_levels.is_a?(Integer) && priority_levels >= 1 && priority_levels <= 10)
    raise ArgumentError, "priority_levels must be an integer between 1 and 10"
  end

  unless insights_default_minutes.is_a?(Integer) && insights_default_minutes.positive?
    raise ArgumentError, "insights_default_minutes must be a positive integer"
  end

  validate_streams!

  self
end

#validate_streams!Object

Raises:

  • (ArgumentError)


420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
# File 'lib/pgbus/configuration.rb', line 420

def validate_streams!
  unless streams_default_retention.is_a?(Numeric) && streams_default_retention >= 0
    raise ArgumentError, "streams_default_retention must be a non-negative number"
  end

  unless streams_max_connections.is_a?(Integer) && streams_max_connections.positive?
    raise ArgumentError, "streams_max_connections must be a positive integer"
  end

  unless streams_heartbeat_interval.is_a?(Numeric) && streams_heartbeat_interval.positive?
    raise ArgumentError, "streams_heartbeat_interval must be a positive number"
  end

  unless streams_idle_timeout.is_a?(Numeric) && streams_idle_timeout.positive?
    raise ArgumentError, "streams_idle_timeout must be a positive number"
  end

  unless streams_listen_health_check_ms.is_a?(Integer) && streams_listen_health_check_ms.positive?
    raise ArgumentError, "streams_listen_health_check_ms must be a positive integer"
  end

  unless streams_write_deadline_ms.is_a?(Integer) && streams_write_deadline_ms.positive?
    raise ArgumentError, "streams_write_deadline_ms must be a positive integer"
  end

  raise ArgumentError, "streams_retention must be a Hash" unless streams_retention.is_a?(Hash)

  if streams_orphan_sweep_interval && !(streams_orphan_sweep_interval.is_a?(Numeric) && streams_orphan_sweep_interval.positive?)
    raise ArgumentError, "streams_orphan_sweep_interval must be a positive number or nil to disable"
  end

  raise ArgumentError, "streams_durable_patterns must be an Array of strings/regex" unless streams_durable_patterns.is_a?(Array)

  raise ArgumentError, "streams_presence_patterns must be an Array of strings/regex" unless streams_presence_patterns.is_a?(Array)

  if !streams_presence_member.nil? && !streams_presence_member.respond_to?(:call)
    raise ArgumentError, "streams_presence_member must respond to #call (a Proc/lambda) or be nil"
  end

  return if streams_orphan_threshold.nil?
  return if streams_orphan_threshold.is_a?(Numeric) && streams_orphan_threshold.positive?

  raise ArgumentError, "streams_orphan_threshold must be a positive number or nil to disable"
end

#worker_notify_connection_optionsObject

Connection options for the Worker’s dedicated NotifyListener connection. Mirrors streams_connection_options: defaults to the base connection_options, overridable via worker_notify_database_url / worker_notify_host / worker_notify_port so the LISTEN connection can bypass PgBouncer.



768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
# File 'lib/pgbus/configuration.rb', line 768

def worker_notify_connection_options
  return worker_notify_database_url if worker_notify_database_url

  base = connection_options
  return base unless worker_notify_host || worker_notify_port

  case base
  when Hash
    result = base.dup
    result[:host] = worker_notify_host if worker_notify_host
    result[:port] = worker_notify_port if worker_notify_port
    result
  when String
    parts = [base]
    parts << "host=#{worker_notify_host}" if worker_notify_host
    parts << "port=#{worker_notify_port}" if worker_notify_port
    parts.join(" ")
  else
    base
  end
end

#worker_notify_wakeup?Boolean

Resolved notify wakeup flag: defaults to listen_notify when nil.

Returns:

  • (Boolean)


791
792
793
794
795
796
797
# File 'lib/pgbus/configuration.rb', line 791

def worker_notify_wakeup?
  if @worker_notify_wakeup.nil?
    listen_notify
  else
    @worker_notify_wakeup
  end
end