Class: Pgbus::Configuration

Inherits:
Object
  • Object
show all
Defined in:
lib/pgbus/configuration.rb,
lib/pgbus/configuration/capsule_dsl.rb

Defined Under Namespace

Classes: CapsuleDSL

Constant Summary collapse

VALID_ROLES =
%i[workers dispatcher scheduler consumers outbox].freeze
VALID_LOG_FORMATS =
%i[text json].freeze
VALID_BROADCAST_MODES =
%i[ephemeral durable].freeze
VALID_GROUP_MODES =
[nil, :fifo, :round_robin].freeze
VALID_PGMQ_SCHEMA_MODES =
%i[auto extension embedded].freeze
POOL_SIZE_WARN_THRESHOLD =

Returns the connection pool size to use for the PGMQ client.

If pool_size was explicitly set, returns that value unchanged. Otherwise auto-derives from the threads needed by the roles this process actually runs (respects Configuration#roles from –workers-only / –scheduler-only / –dispatcher-only):

workers role      sum(workers.threads)
consumers role    sum(event_consumers.threads)
dispatcher role   +1
scheduler role    +1

A –scheduler-only deployment that has 50 worker threads configured only needs 1 connection (for the scheduler), not 52.

Auto-tune protects users from the common pitfall of running 15 worker threads with a hand-set pool_size of 5 (resulting in ConnectionPool timeouts under load). Setting pool_size explicitly is still supported for advanced cases where you need a tighter or looser pool than the default formula provides.

50
ASYNC_POOL_CONNECTIONS =

Connections needed per async worker: one for the reactor’s serial execution, one for polling, one for headroom. Fibers share connections because only one runs at a time per reactor thread.

3

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeConfiguration

Returns a new instance of Configuration.



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
# File 'lib/pgbus/configuration.rb', line 123

def initialize
  @database_url = nil
  @connection_params = nil
  @pool_size = nil
  @pool_timeout = 5

  @default_queue = "default"
  @queue_prefix = "pgbus"

  @workers = [{ queues: %w[default], threads: 5 }]
  @roles = nil
  @polling_interval = 0.1
  @visibility_timeout = 30

  @prefetch_limit = nil
  @execution_mode = :threads

  @max_jobs_per_worker = nil
  @max_memory_mb = nil
  @max_worker_lifetime = nil

  @dispatch_interval = 1.0

  @circuit_breaker_enabled = true

  @max_retries = 5
  @retry_backoff = 5          # seconds — first VT-retry delay
  @retry_backoff_max = 300    # 5 minutes cap
  @retry_backoff_jitter = 0.15

  @priority_levels = nil
  @default_priority = 1
  @group_mode = nil

  @archive_retention = 7 * 24 * 3600 # 7 days

  @outbox_enabled = false
  @outbox_poll_interval = 1.0
  @outbox_batch_size = 100
  @outbox_retention = 24 * 3600 # 1 day

  @idempotency_ttl = 7 * 24 * 3600 # 7 days
  @allowed_global_id_models = nil # nil = allow all (for backwards compat)

  @logger = (defined?(Rails) && Rails.respond_to?(:logger) && Rails.logger) || Logger.new($stdout)
  @log_format = :text
  @error_reporters = []

  @listen_notify = true

  @pgmq_schema_mode = :auto

  @event_consumers = nil

  @recurring_tasks = nil
  @recurring_schedule_interval = 1.0
  @recurring_tasks_file = nil
  @recurring_tasks_files = nil
  @skip_recurring = false
  @recurring_execution_retention = 7 * 24 * 3600 # 7 days

  @zombie_detection = true

  @stats_enabled = true
  @stats_retention = 30 * 24 * 3600 # 30 days

  @connects_to = nil

  @web_auth = nil
  @web_refresh_interval = 5000
  @web_per_page = 25
  @web_live_updates = true
  @web_data_source = nil
  @insights_default_minutes = 30 * 24 * 60 # 30 days
  @base_controller_class = "::ActionController::Base"
  @return_to_app_url = nil
  @metrics_enabled = true

  @streams_enabled = true
  @streams_path = nil
  @streams_queue_prefix = "pgbus_stream"
  # Streamer-only connection overrides. The Streamer's Listener owns a
  # dedicated long-lived `wait_for_notify` PG connection that can't go
  # through a PgBouncer in transaction mode (LISTEN/NOTIFY don't survive
  # transaction-pool COMMIT boundaries — see PlanetScale's docs). Setting
  # any of these overrides only the Streamer's connection options; the
  # worker, dispatcher, and client publish paths keep using the regular
  # `database_url` / `connection_params` (typically pooled).
  #
  #   streams_host          — override host only
  #   streams_port          — override port only (most common case:
  #                           pooler is on 6432, direct is 5432)
  #   streams_database_url  — full URL override; takes precedence over
  #                           the host/port surgicals when set
  @streams_host = nil
  @streams_port = nil
  @streams_database_url = nil
  @streams_signed_name_secret = nil
  @streams_default_retention = 5 * 60 # 5 minutes
  @streams_retention = {}
  @streams_heartbeat_interval = 15
  @streams_max_connections = 2_000
  @streams_idle_timeout = 3_600 # 1 hour
  # 250ms — this value plays two roles: (1) the TCP keepalive
  # interval for the streamer's PG LISTEN connection, and (2) the
  # upper bound on how long Dispatcher#handle_connect waits for
  # the Listener to acknowledge a synchronous ensure_listening
  # call. 5s was unbounded enough to drop messages on a
  # realistic subscribe burst; 250ms keeps the connect-path race
  # window tight while still leaving headroom over a typical
  # PG keepalive interval.
  @streams_listen_health_check_ms = 250
  @streams_write_deadline_ms = 5_000
  @streams_falcon_streaming_body = false
  # Opt-in: when true, the Dispatcher writes one row to
  # pgbus_stream_stats per broadcast/connect/disconnect. Default
  # off because stream event volume can be much higher than job
  # volume and the Insights surface is only useful if operators
  # actually look at it. Separate from #stats_enabled (which
  # gates pgbus_job_stats recording) on purpose — operators
  # usually want job stats on and stream stats off, or vice versa.
  @streams_stats_enabled = false
  @streams_test_mode = false
  @streams_default_broadcast_mode = :ephemeral
  @streams_orphan_sweep_interval = 3600    # 1 hour
  @streams_orphan_threshold = 86_400       # 24 hours
  @streams_durable_patterns = []

  # AppSignal: auto-on when the appsignal gem is loaded; probe runs in
  # the same process, so the operator can disable it independently.
  @appsignal_enabled = true
  @appsignal_probe_enabled = true
end

Instance Attribute Details

#allowed_global_id_modelsObject

Event bus



63
64
65
# File 'lib/pgbus/configuration.rb', line 63

def allowed_global_id_models
  @allowed_global_id_models
end

#appsignal_enabledObject

AppSignal integration (auto-loaded when ::Appsignal is defined and this is true). Set to false to opt out without uninstalling the appsignal gem.



121
122
123
# File 'lib/pgbus/configuration.rb', line 121

def appsignal_enabled
  @appsignal_enabled
end

#appsignal_probe_enabledObject

AppSignal integration (auto-loaded when ::Appsignal is defined and this is true). Set to false to opt out without uninstalling the appsignal gem.



121
122
123
# File 'lib/pgbus/configuration.rb', line 121

def appsignal_probe_enabled
  @appsignal_probe_enabled
end

#archive_retentionObject

Archive compaction. Only the user-facing retention window is configurable; the loop interval and batch size are tuned via constants on Pgbus::Process::Dispatcher.



56
57
58
# File 'lib/pgbus/configuration.rb', line 56

def archive_retention
  @archive_retention
end

#base_controller_classObject

Web dashboard



104
105
106
# File 'lib/pgbus/configuration.rb', line 104

def base_controller_class
  @base_controller_class
end

#circuit_breaker_enabledObject

Circuit breaker. Only ‘enabled` is user-facing — the trip threshold and backoff curve are tuned via constants on Pgbus::CircuitBreaker because they are implementation details that have never been worth exposing.



35
36
37
# File 'lib/pgbus/configuration.rb', line 35

def circuit_breaker_enabled
  @circuit_breaker_enabled
end

#connection_paramsObject

Connection settings



8
9
10
# File 'lib/pgbus/configuration.rb', line 8

def connection_params
  @connection_params
end

#connects_toObject

Multi-database support (optional separate database for pgbus tables) Set to { database: { writing: :pgbus, reading: :pgbus } } to use a separate database. Requires a matching entry in config/database.yml under the “pgbus” key.



93
94
95
# File 'lib/pgbus/configuration.rb', line 93

def connects_to
  @connects_to
end

#database_urlObject

Connection settings



8
9
10
# File 'lib/pgbus/configuration.rb', line 8

def database_url
  @database_url
end

#default_priorityObject

Priority queues



45
46
47
# File 'lib/pgbus/configuration.rb', line 45

def default_priority
  @default_priority
end

#default_queueObject

Queue settings



11
12
13
# File 'lib/pgbus/configuration.rb', line 11

def default_queue
  @default_queue
end

#dispatch_intervalObject

Dispatcher settings



30
31
32
# File 'lib/pgbus/configuration.rb', line 30

def dispatch_interval
  @dispatch_interval
end

#error_reportersObject

Error reporting — array of callable objects invoked on caught exceptions. Each receives (exception, context_hash) or (exception, context_hash, config).



72
73
74
# File 'lib/pgbus/configuration.rb', line 72

def error_reporters
  @error_reporters
end

#event_consumersObject

Event consumers



83
84
85
# File 'lib/pgbus/configuration.rb', line 83

def event_consumers
  @event_consumers
end

#execution_modeObject

Worker settings



14
15
16
# File 'lib/pgbus/configuration.rb', line 14

def execution_mode
  @execution_mode
end

#group_modeObject

Grouped reads (PGMQ v1.11.0+ FIFO grouping). nil = disabled (default read_batch behavior). :fifo = use read_grouped (drains oldest group first, throughput-optimized). :round_robin = use read_grouped_rr (fair round-robin across groups).



51
52
53
# File 'lib/pgbus/configuration.rb', line 51

def group_mode
  @group_mode
end

#idempotency_ttlObject

rubocop:disable Style/AccessorGrouping



64
65
66
# File 'lib/pgbus/configuration.rb', line 64

def idempotency_ttl
  @idempotency_ttl
end

#insights_default_minutesObject

Web dashboard



104
105
106
# File 'lib/pgbus/configuration.rb', line 104

def insights_default_minutes
  @insights_default_minutes
end

#listen_notifyObject

LISTEN/NOTIFY. Only the on/off switch is user-facing — the throttle interval is a Postgres-side tuning knob that lives as a constant on Pgbus::Client (NOTIFY_THROTTLE_MS).



77
78
79
# File 'lib/pgbus/configuration.rb', line 77

def listen_notify
  @listen_notify
end

#log_formatObject

rubocop:disable Style/AccessorGrouping



68
69
70
# File 'lib/pgbus/configuration.rb', line 68

def log_format
  @log_format
end

#loggerObject

Logging



67
68
69
# File 'lib/pgbus/configuration.rb', line 67

def logger
  @logger
end

#max_jobs_per_workerObject

Worker recycling



27
28
29
# File 'lib/pgbus/configuration.rb', line 27

def max_jobs_per_worker
  @max_jobs_per_worker
end

#max_memory_mbObject

Worker recycling



27
28
29
# File 'lib/pgbus/configuration.rb', line 27

def max_memory_mb
  @max_memory_mb
end

#max_retriesObject

Dead letter queue



38
39
40
# File 'lib/pgbus/configuration.rb', line 38

def max_retries
  @max_retries
end

#max_worker_lifetimeObject

Worker recycling



27
28
29
# File 'lib/pgbus/configuration.rb', line 27

def max_worker_lifetime
  @max_worker_lifetime
end

#metrics_enabledObject

Web dashboard



104
105
106
# File 'lib/pgbus/configuration.rb', line 104

def metrics_enabled
  @metrics_enabled
end

#outbox_batch_sizeObject

Transactional outbox



59
60
61
# File 'lib/pgbus/configuration.rb', line 59

def outbox_batch_size
  @outbox_batch_size
end

#outbox_enabledObject

Transactional outbox



59
60
61
# File 'lib/pgbus/configuration.rb', line 59

def outbox_enabled
  @outbox_enabled
end

#outbox_poll_intervalObject

Transactional outbox



59
60
61
# File 'lib/pgbus/configuration.rb', line 59

def outbox_poll_interval
  @outbox_poll_interval
end

#outbox_retentionObject

rubocop:disable Style/AccessorGrouping



60
61
62
# File 'lib/pgbus/configuration.rb', line 60

def outbox_retention
  @outbox_retention
end

#pgmq_schema_modeObject

PGMQ schema installation mode (:auto, :extension, :embedded)



80
81
82
# File 'lib/pgbus/configuration.rb', line 80

def pgmq_schema_mode
  @pgmq_schema_mode
end

#polling_intervalObject

Worker settings



14
15
16
# File 'lib/pgbus/configuration.rb', line 14

def polling_interval
  @polling_interval
end

#pool_sizeObject

Connection settings



8
9
10
# File 'lib/pgbus/configuration.rb', line 8

def pool_size
  @pool_size
end

#pool_timeoutObject

Connection settings



8
9
10
# File 'lib/pgbus/configuration.rb', line 8

def pool_timeout
  @pool_timeout
end

#prefetch_limitObject

Worker settings



14
15
16
# File 'lib/pgbus/configuration.rb', line 14

def prefetch_limit
  @prefetch_limit
end

#priority_levelsObject

Priority queues



45
46
47
# File 'lib/pgbus/configuration.rb', line 45

def priority_levels
  @priority_levels
end

#queue_prefixObject

Queue settings



11
12
13
# File 'lib/pgbus/configuration.rb', line 11

def queue_prefix
  @queue_prefix
end

#recurring_execution_retentionObject

rubocop:disable Style/AccessorGrouping



88
89
90
# File 'lib/pgbus/configuration.rb', line 88

def recurring_execution_retention
  @recurring_execution_retention
end

#recurring_schedule_intervalObject

Recurring jobs



86
87
88
# File 'lib/pgbus/configuration.rb', line 86

def recurring_schedule_interval
  @recurring_schedule_interval
end

#recurring_tasksObject

Recurring jobs



86
87
88
# File 'lib/pgbus/configuration.rb', line 86

def recurring_tasks
  @recurring_tasks
end

#recurring_tasks_fileObject

Recurring jobs



86
87
88
# File 'lib/pgbus/configuration.rb', line 86

def recurring_tasks_file
  @recurring_tasks_file
end

#recurring_tasks_filesObject



588
589
590
591
592
# File 'lib/pgbus/configuration.rb', line 588

def recurring_tasks_files
  return @recurring_tasks_files if @recurring_tasks_files

  recurring_tasks_file ? [recurring_tasks_file] : nil
end

#retry_backoffObject

Retry backoff for the VT-based retry path (unhandled exceptions). Jobs can override these per-class via Pgbus::RetryBackoff::JobMixin.



42
43
44
# File 'lib/pgbus/configuration.rb', line 42

def retry_backoff
  @retry_backoff
end

#retry_backoff_jitterObject

Retry backoff for the VT-based retry path (unhandled exceptions). Jobs can override these per-class via Pgbus::RetryBackoff::JobMixin.



42
43
44
# File 'lib/pgbus/configuration.rb', line 42

def retry_backoff_jitter
  @retry_backoff_jitter
end

#retry_backoff_maxObject

Retry backoff for the VT-based retry path (unhandled exceptions). Jobs can override these per-class via Pgbus::RetryBackoff::JobMixin.



42
43
44
# File 'lib/pgbus/configuration.rb', line 42

def retry_backoff_max
  @retry_backoff_max
end

#return_to_app_urlObject

Web dashboard



104
105
106
# File 'lib/pgbus/configuration.rb', line 104

def return_to_app_url
  @return_to_app_url
end

#rolesObject

Supervisor role selection. nil = boot all roles (default behavior). Array of role symbols = boot only the listed roles. Set via the CLI flags –workers-only / –scheduler-only / –dispatcher-only, or directly in an initializer for advanced cases.



22
23
24
# File 'lib/pgbus/configuration.rb', line 22

def roles
  @roles
end

#skip_recurringObject

Recurring jobs



86
87
88
# File 'lib/pgbus/configuration.rb', line 86

def skip_recurring
  @skip_recurring
end

#stats_enabledObject

Job stats



100
101
102
# File 'lib/pgbus/configuration.rb', line 100

def stats_enabled
  @stats_enabled
end

#stats_retentionObject

rubocop:disable Style/AccessorGrouping



101
102
103
# File 'lib/pgbus/configuration.rb', line 101

def stats_retention
  @stats_retention
end

#streams_database_urlObject

Streams (turbo-rails replacement, SSE-based)



109
110
111
# File 'lib/pgbus/configuration.rb', line 109

def streams_database_url
  @streams_database_url
end

#streams_default_broadcast_modeObject

rubocop:disable Style/AccessorGrouping



117
118
119
# File 'lib/pgbus/configuration.rb', line 117

def streams_default_broadcast_mode
  @streams_default_broadcast_mode
end

#streams_default_retentionObject

Streams (turbo-rails replacement, SSE-based)



109
110
111
# File 'lib/pgbus/configuration.rb', line 109

def streams_default_retention
  @streams_default_retention
end

#streams_durable_patternsObject

Streams (turbo-rails replacement, SSE-based)



109
110
111
# File 'lib/pgbus/configuration.rb', line 109

def streams_durable_patterns
  @streams_durable_patterns
end

#streams_enabledObject

Streams (turbo-rails replacement, SSE-based)



109
110
111
# File 'lib/pgbus/configuration.rb', line 109

def streams_enabled
  @streams_enabled
end

#streams_falcon_streaming_bodyObject

Streams (turbo-rails replacement, SSE-based)



109
110
111
# File 'lib/pgbus/configuration.rb', line 109

def streams_falcon_streaming_body
  @streams_falcon_streaming_body
end

#streams_heartbeat_intervalObject

Streams (turbo-rails replacement, SSE-based)



109
110
111
# File 'lib/pgbus/configuration.rb', line 109

def streams_heartbeat_interval
  @streams_heartbeat_interval
end

#streams_hostObject

Streams (turbo-rails replacement, SSE-based)



109
110
111
# File 'lib/pgbus/configuration.rb', line 109

def streams_host
  @streams_host
end

#streams_idle_timeoutObject

Streams (turbo-rails replacement, SSE-based)



109
110
111
# File 'lib/pgbus/configuration.rb', line 109

def streams_idle_timeout
  @streams_idle_timeout
end

#streams_listen_health_check_msObject

Streams (turbo-rails replacement, SSE-based)



109
110
111
# File 'lib/pgbus/configuration.rb', line 109

def streams_listen_health_check_ms
  @streams_listen_health_check_ms
end

#streams_max_connectionsObject

Streams (turbo-rails replacement, SSE-based)



109
110
111
# File 'lib/pgbus/configuration.rb', line 109

def streams_max_connections
  @streams_max_connections
end

#streams_orphan_sweep_intervalObject

Streams (turbo-rails replacement, SSE-based)



109
110
111
# File 'lib/pgbus/configuration.rb', line 109

def streams_orphan_sweep_interval
  @streams_orphan_sweep_interval
end

#streams_orphan_thresholdObject

Streams (turbo-rails replacement, SSE-based)



109
110
111
# File 'lib/pgbus/configuration.rb', line 109

def streams_orphan_threshold
  @streams_orphan_threshold
end

#streams_pathObject

Streams (turbo-rails replacement, SSE-based)



109
110
111
# File 'lib/pgbus/configuration.rb', line 109

def streams_path
  @streams_path
end

#streams_portObject

Streams (turbo-rails replacement, SSE-based)



109
110
111
# File 'lib/pgbus/configuration.rb', line 109

def streams_port
  @streams_port
end

#streams_queue_prefixObject

Streams (turbo-rails replacement, SSE-based)



109
110
111
# File 'lib/pgbus/configuration.rb', line 109

def streams_queue_prefix
  @streams_queue_prefix
end

#streams_retentionObject

Streams (turbo-rails replacement, SSE-based)



109
110
111
# File 'lib/pgbus/configuration.rb', line 109

def streams_retention
  @streams_retention
end

#streams_signed_name_secretObject

Streams (turbo-rails replacement, SSE-based)



109
110
111
# File 'lib/pgbus/configuration.rb', line 109

def streams_signed_name_secret
  @streams_signed_name_secret
end

#streams_stats_enabledObject

Streams (turbo-rails replacement, SSE-based)



109
110
111
# File 'lib/pgbus/configuration.rb', line 109

def streams_stats_enabled
  @streams_stats_enabled
end

#streams_test_modeObject

Streams (turbo-rails replacement, SSE-based)



109
110
111
# File 'lib/pgbus/configuration.rb', line 109

def streams_test_mode
  @streams_test_mode
end

#streams_write_deadline_msObject

Streams (turbo-rails replacement, SSE-based)



109
110
111
# File 'lib/pgbus/configuration.rb', line 109

def streams_write_deadline_ms
  @streams_write_deadline_ms
end

#visibility_timeoutObject

rubocop:disable Style/AccessorGrouping



15
16
17
# File 'lib/pgbus/configuration.rb', line 15

def visibility_timeout
  @visibility_timeout
end

#web_authObject

Web dashboard



104
105
106
# File 'lib/pgbus/configuration.rb', line 104

def web_auth
  @web_auth
end

#web_data_sourceObject

Web dashboard



104
105
106
# File 'lib/pgbus/configuration.rb', line 104

def web_data_source
  @web_data_source
end

#web_live_updatesObject

Web dashboard



104
105
106
# File 'lib/pgbus/configuration.rb', line 104

def web_live_updates
  @web_live_updates
end

#web_per_pageObject

Web dashboard



104
105
106
# File 'lib/pgbus/configuration.rb', line 104

def web_per_page
  @web_per_page
end

#web_refresh_intervalObject

Web dashboard



104
105
106
# File 'lib/pgbus/configuration.rb', line 104

def web_refresh_interval
  @web_refresh_interval
end

#workersObject

rubocop:disable Style/AccessorGrouping



15
16
17
# File 'lib/pgbus/configuration.rb', line 15

def workers
  @workers
end

#zombie_detectionObject

Zombie message detection — logs a warning when a message is redelivered (read_ct > 1) without any prior failure recorded in pgbus_failed_events.



97
98
99
# File 'lib/pgbus/configuration.rb', line 97

def zombie_detection
  @zombie_detection
end

Instance Method Details

#capsule(name, queues:, threads:) ⇒ Object

Define a named capsule and append it to the workers list.

c.capsule :critical, queues: %w[critical], threads: 5
c.capsule :gated, queues: %w[gated], threads: 1, single_active_consumer: true

Names must be unique. Queues must not overlap with capsules already defined (would cause double-processing). Composes with the string DSL —+c.workers “…”+ followed by c.capsule :name, … appends the named capsule to the list parsed from the string.

Raises:

  • (ArgumentError)


494
495
496
497
498
499
500
501
502
503
504
505
506
# File 'lib/pgbus/configuration.rb', line 494

def capsule(name, queues:, threads:, **)
  raise ArgumentError, "capsule queues must be a non-empty Array" unless queues.is_a?(Array) && queues.any?
  raise ArgumentError, "capsule threads must be a positive Integer" unless threads.is_a?(Integer) && threads.positive?

  normalized_name = name.to_s
  @workers ||= []

  raise ArgumentError, "capsule #{name.inspect} is already defined" if @workers.any? { |c| capsule_name(c) == normalized_name }

  validate_no_queue_overlap!(queues)

  @workers << { name: normalized_name, queues: queues, threads: threads, ** }
end

#capsule_named(name) ⇒ Object

Look up a capsule by its name. Accepts symbol or string. Returns the matching Hash, or nil. Used by the CLI’s –capsule selector.



510
511
512
513
514
515
# File 'lib/pgbus/configuration.rb', line 510

def capsule_named(name)
  return nil unless @workers

  key = name.to_s
  @workers.find { |c| capsule_name(c) == key }
end

#connection_optionsObject



634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
# File 'lib/pgbus/configuration.rb', line 634

def connection_options
  if database_url
    database_url
  elsif connection_params
    connection_params
  elsif defined?(ActiveRecord::Base)
    # Extract connection config from ActiveRecord so pgmq-ruby creates its
    # own dedicated PG connections. Sharing AR's raw_connection via a Proc
    # is NOT thread-safe: the ConnectionPool caches the PG::Connection from
    # whichever thread first called the Proc, then hands that same object to
    # other threads — causing nil results, segfaults, and data corruption
    # when AR and PGMQ issue concurrent queries on the same connection.
    extract_ar_connection_hash
  else
    raise ConfigurationError, "No database connection configured. " \
                              "Set Pgbus.configuration.database_url, connection_params, or use with Rails."
  end
end

#dead_letter_queue_name(name) ⇒ Object



262
263
264
# File 'lib/pgbus/configuration.rb', line 262

def dead_letter_queue_name(name)
  "#{queue_name(name)}#{Pgbus::DEAD_LETTER_SUFFIX}"
end

#execution_mode_for(worker_config) ⇒ Object

Returns the execution mode for a specific worker config hash, falling back to the global execution_mode setting.



278
279
280
281
# File 'lib/pgbus/configuration.rb', line 278

def execution_mode_for(worker_config)
  mode = worker_config[:execution_mode] || worker_config["execution_mode"] || execution_mode
  ExecutionPools.normalize_mode(mode)
end

#priority_queue_name(name, priority) ⇒ Object



266
267
268
# File 'lib/pgbus/configuration.rb', line 266

def priority_queue_name(name, priority)
  "#{queue_name(name)}_p#{priority}"
end

#priority_queue_names(name) ⇒ Object



270
271
272
273
274
# File 'lib/pgbus/configuration.rb', line 270

def priority_queue_names(name)
  return [queue_name(name)] unless priority_levels && priority_levels > 1

  (0...priority_levels).map { |p| priority_queue_name(name, p) }
end

#queue_name(name) ⇒ Object



257
258
259
260
# File 'lib/pgbus/configuration.rb', line 257

def queue_name(name)
  full = "#{queue_prefix}_#{name}"
  QueueNameValidator.normalize(full)
end

#resolved_pool_sizeObject



621
622
623
624
625
626
627
628
629
630
631
632
# File 'lib/pgbus/configuration.rb', line 621

def resolved_pool_size
  return pool_size if pool_size

  total = 0
  total += sum_thread_counts(workers, default_threads: 5, group: "worker") if role_enabled?(:workers)
  total += sum_thread_counts(event_consumers, default_threads: 3, group: "event_consumer") if role_enabled?(:consumers)
  total += 1 if role_enabled?(:dispatcher)
  total += 1 if role_enabled?(:scheduler)

  warn_if_oversized(total)
  total
end

#role_enabled?(role) ⇒ Boolean

Returns true if the given role should be booted by the supervisor. When roles is nil (the default), every role is enabled — this matches the legacy single-process behavior. When roles is set (e.g. via the CLI’s –workers-only / –scheduler-only / –dispatcher-only flags), only the listed roles boot.

Accepts symbol or string for case-insensitive comparison.

Returns:

  • (Boolean)


524
525
526
527
528
# File 'lib/pgbus/configuration.rb', line 524

def role_enabled?(role)
  return true if @roles.nil?

  @roles.include?(role.to_s.downcase.to_sym)
end

#stream_durable?(name) ⇒ Boolean

Returns true if the given stream name should be durable based on ‘streams_durable_patterns` (exact string or Regexp match) or the `streams_default_broadcast_mode` fallback.

Returns:

  • (Boolean)


423
424
425
426
427
428
# File 'lib/pgbus/configuration.rb', line 423

def stream_durable?(name)
  patterns = streams_durable_patterns || []
  return true if patterns.any? { |p| p.is_a?(Regexp) ? p.match?(name) : p == name }

  streams_default_broadcast_mode == :durable
end

#streams_connection_optionsObject

Connection options the Streamer’s dedicated LISTEN/NOTIFY PG connection should use. Defaults to ‘connection_options` (same as workers and the publish path). If any of `streams_database_url`, `streams_host`, or `streams_port` is set, the Streamer’s connection is reconfigured —everything else keeps using the base options.

The typical use is “workers go through PgBouncer, streamer goes direct”:

c.connects_to = { database: { writing: :pgbus } }   # pooler
c.streams_port = 5432                               # direct

Precedence: streams_database_url > streams_host/port override > base.



665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
# File 'lib/pgbus/configuration.rb', line 665

def streams_connection_options
  return streams_database_url if streams_database_url

  base = connection_options
  return base unless streams_host || streams_port

  case base
  when Hash
    result = base.dup
    result[:host] = streams_host if streams_host
    result[:port] = streams_port if streams_port
    result
  when String
    # libpq's conninfo parser takes later key=value pairs as overrides
    # for earlier ones, so we just append. Handles both URI form
    # (postgres://...) and key=value form.
    parts = [base]
    parts << "host=#{streams_host}" if streams_host
    parts << "port=#{streams_port}" if streams_port
    parts.join(" ")
  else
    base
  end
end

#validate!Object

Raises:

  • (ArgumentError)


339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
# File 'lib/pgbus/configuration.rb', line 339

def validate!
  if pool_size && !(pool_size.is_a?(Numeric) && pool_size.positive?)
    raise ArgumentError, "pool_size must be a positive number or nil (auto-tune)"
  end

  raise ArgumentError, "pool_timeout must be > 0" unless pool_timeout.is_a?(Numeric) && pool_timeout.positive?
  raise ArgumentError, "polling_interval must be > 0" unless polling_interval.is_a?(Numeric) && polling_interval.positive?
  raise ArgumentError, "visibility_timeout must be > 0" unless visibility_timeout.is_a?(Numeric) && visibility_timeout.positive?
  raise ArgumentError, "max_retries must be >= 0" unless max_retries.is_a?(Integer) && max_retries >= 0
  raise ArgumentError, "retry_backoff must be > 0" unless retry_backoff.is_a?(Numeric) && retry_backoff.positive?
  raise ArgumentError, "retry_backoff_max must be > 0" unless retry_backoff_max.is_a?(Numeric) && retry_backoff_max.positive?
  unless retry_backoff_jitter.is_a?(Numeric) && retry_backoff_jitter >= 0 && retry_backoff_jitter <= 1
    raise ArgumentError, "retry_backoff_jitter must be between 0 and 1"
  end

  # Validate global execution_mode
  ExecutionPools.normalize_mode(execution_mode)

  Array(workers).each do |w|
    threads = w[:threads] || w["threads"] || 5
    raise ArgumentError, "worker threads must be > 0" unless threads.is_a?(Integer) && threads.positive?

    # Validate per-worker execution_mode override if present
    mode = w[:execution_mode] || w["execution_mode"]
    ExecutionPools.normalize_mode(mode) if mode
  end

  raise ArgumentError, "prefetch_limit must be > 0" if prefetch_limit && !(prefetch_limit.is_a?(Integer) && prefetch_limit.positive?)

  if priority_levels && !(priority_levels.is_a?(Integer) && priority_levels >= 1 && priority_levels <= 10)
    raise ArgumentError, "priority_levels must be an integer between 1 and 10"
  end

  unless insights_default_minutes.is_a?(Integer) && insights_default_minutes.positive?
    raise ArgumentError, "insights_default_minutes must be a positive integer"
  end

  validate_streams!

  self
end

#validate_streams!Object

Raises:

  • (ArgumentError)


381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
# File 'lib/pgbus/configuration.rb', line 381

def validate_streams!
  unless streams_default_retention.is_a?(Numeric) && streams_default_retention >= 0
    raise ArgumentError, "streams_default_retention must be a non-negative number"
  end

  unless streams_max_connections.is_a?(Integer) && streams_max_connections.positive?
    raise ArgumentError, "streams_max_connections must be a positive integer"
  end

  unless streams_heartbeat_interval.is_a?(Numeric) && streams_heartbeat_interval.positive?
    raise ArgumentError, "streams_heartbeat_interval must be a positive number"
  end

  unless streams_idle_timeout.is_a?(Numeric) && streams_idle_timeout.positive?
    raise ArgumentError, "streams_idle_timeout must be a positive number"
  end

  unless streams_listen_health_check_ms.is_a?(Integer) && streams_listen_health_check_ms.positive?
    raise ArgumentError, "streams_listen_health_check_ms must be a positive integer"
  end

  unless streams_write_deadline_ms.is_a?(Integer) && streams_write_deadline_ms.positive?
    raise ArgumentError, "streams_write_deadline_ms must be a positive integer"
  end

  raise ArgumentError, "streams_retention must be a Hash" unless streams_retention.is_a?(Hash)

  if streams_orphan_sweep_interval && !(streams_orphan_sweep_interval.is_a?(Numeric) && streams_orphan_sweep_interval.positive?)
    raise ArgumentError, "streams_orphan_sweep_interval must be a positive number or nil to disable"
  end

  raise ArgumentError, "streams_durable_patterns must be an Array of strings/regex" unless streams_durable_patterns.is_a?(Array)

  return if streams_orphan_threshold.nil?
  return if streams_orphan_threshold.is_a?(Numeric) && streams_orphan_threshold.positive?

  raise ArgumentError, "streams_orphan_threshold must be a positive number or nil to disable"
end