Class: Pgbus::Configuration

Inherits:
Object
  • Object
show all
Defined in:
lib/pgbus/configuration.rb,
lib/pgbus/configuration/capsule_dsl.rb

Defined Under Namespace

Classes: CapsuleDSL

Constant Summary collapse

VALID_ROLES =
%i[workers dispatcher scheduler consumers outbox].freeze
VALID_LOG_FORMATS =
%i[text json].freeze
VALID_PGMQ_SCHEMA_MODES =
%i[auto extension embedded].freeze
POOL_SIZE_WARN_THRESHOLD =

Returns the connection pool size to use for the PGMQ client.

If pool_size was explicitly set, returns that value unchanged. Otherwise auto-derives from the threads needed by the roles this process actually runs (respects Configuration#roles from --workers-only / --scheduler-only / --dispatcher-only):

workers role      sum(workers.threads)
consumers role    sum(event_consumers.threads)
dispatcher role   +1
scheduler role    +1

A --scheduler-only deployment that has 50 worker threads configured only needs 1 connection (for the scheduler), not 52.

Auto-tune protects users from the common pitfall of running 15 worker threads with a hand-set pool_size of 5 (resulting in ConnectionPool timeouts under load). Setting pool_size explicitly is still supported for advanced cases where you need a tighter or looser pool than the default formula provides.

50
ASYNC_POOL_CONNECTIONS =

Connections needed per async worker: one for the reactor’s serial execution, one for polling, one for headroom. Fibers share connections because only one runs at a time per reactor thread.

3

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Configuration

Returns a new instance of Configuration.



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/pgbus/configuration.rb', line 108

# Returns a new Configuration with every setting at its default value.
#
# Settings are plain instance variables, grouped below by feature area.
# Three of the nil defaults are read by methods visible on this class:
# pool_size (#resolved_pool_size then derives the size from the enabled
# roles), roles (#role_enabled? then reports every role as enabled) and
# priority_levels (#priority_queue_names then returns a single queue name).
def initialize
  # Connection settings.
  @database_url = nil
  @connection_params = nil
  @pool_size = nil
  @pool_timeout = 5

  # Queue settings.
  @default_queue = "default"
  @queue_prefix = "pgbus"

  # Worker settings, plus the supervisor role selection (nil = all roles).
  @workers = [{ queues: %w[default], threads: 5 }]
  @roles = nil
  @polling_interval = 0.1
  @visibility_timeout = 30

  @prefetch_limit = nil
  @execution_mode = :threads

  # Worker recycling limits.
  @max_jobs_per_worker = nil
  @max_memory_mb = nil
  @max_worker_lifetime = nil

  # Dispatcher settings.
  @dispatch_interval = 1.0

  # Circuit breaker on/off switch.
  @circuit_breaker_enabled = true

  # Dead letter queue threshold and retry backoff.
  @max_retries = 5
  @retry_backoff = 5          # seconds — first VT-retry delay
  @retry_backoff_max = 300    # 5 minutes cap
  @retry_backoff_jitter = 0.15

  # Priority queues.
  @priority_levels = nil
  @default_priority = 1

  # Archive compaction retention window.
  @archive_retention = 7 * 24 * 3600 # 7 days

  # Transactional outbox.
  @outbox_enabled = false
  @outbox_poll_interval = 1.0
  @outbox_batch_size = 100
  @outbox_retention = 24 * 3600 # 1 day

  # Event bus.
  @idempotency_ttl = 7 * 24 * 3600 # 7 days
  @allowed_global_id_models = nil # nil = allow all (for backwards compat)

  # Logging: use Rails.logger when Rails is loaded and provides one,
  # otherwise fall back to a Logger writing to $stdout.
  @logger = (defined?(Rails) && Rails.respond_to?(:logger) && Rails.logger) || Logger.new($stdout)
  @log_format = :text
  @error_reporters = []

  # LISTEN/NOTIFY on/off switch.
  @listen_notify = true

  # PGMQ schema installation mode.
  @pgmq_schema_mode = :auto

  # Event consumers.
  @event_consumers = nil

  # Recurring jobs.
  @recurring_tasks = nil
  @recurring_schedule_interval = 1.0
  @recurring_tasks_file = nil
  @skip_recurring = false
  @recurring_execution_retention = 7 * 24 * 3600 # 7 days

  # Zombie message detection.
  @zombie_detection = true

  # Job stats.
  @stats_enabled = true
  @stats_retention = 30 * 24 * 3600 # 30 days

  # Multi-database support (nil = no separate database configured).
  @connects_to = nil

  # Web dashboard.
  @web_auth = nil
  @web_refresh_interval = 5000
  @web_per_page = 25
  @web_live_updates = true
  @web_data_source = nil
  @insights_default_minutes = 30 * 24 * 60 # 30 days
  @base_controller_class = "::ActionController::Base"
  @return_to_app_url = nil
  @metrics_enabled = true

  # Streams.
  @streams_enabled = true
  @streams_path = nil
  @streams_queue_prefix = "pgbus_stream"
  @streams_signed_name_secret = nil
  @streams_default_retention = 5 * 60 # 5 minutes
  @streams_retention = {}
  @streams_heartbeat_interval = 15
  @streams_max_connections = 2_000
  @streams_idle_timeout = 3_600 # 1 hour
  # 250ms — this value plays two roles: (1) the TCP keepalive
  # interval for the streamer's PG LISTEN connection, and (2) the
  # upper bound on how long Dispatcher#handle_connect waits for
  # the Listener to acknowledge a synchronous ensure_listening
  # call. 5s was unbounded enough to drop messages on a
  # realistic subscribe burst; 250ms keeps the connect-path race
  # window tight while still leaving headroom over a typical
  # PG keepalive interval.
  @streams_listen_health_check_ms = 250
  @streams_write_deadline_ms = 5_000
  @streams_falcon_streaming_body = false
  # Opt-in: when true, the Dispatcher writes one row to
  # pgbus_stream_stats per broadcast/connect/disconnect. Default
  # off because stream event volume can be much higher than job
  # volume and the Insights surface is only useful if operators
  # actually look at it. Separate from #stats_enabled (which
  # gates pgbus_job_stats recording) on purpose — operators
  # usually want job stats on and stream stats off, or vice versa.
  @streams_stats_enabled = false
  @streams_test_mode = false
end

Instance Attribute Details

#allowed_global_id_modelsObject

Event bus



57
58
59
# File 'lib/pgbus/configuration.rb', line 57

# Returns the GlobalID model allow-list. initialize leaves it nil, which
# its inline note describes as "allow all".
def allowed_global_id_models = @allowed_global_id_models

#archive_retentionObject

Archive compaction. Only the user-facing retention window is configurable; the loop interval and batch size are tuned via constants on Pgbus::Process::Dispatcher.



50
51
52
# File 'lib/pgbus/configuration.rb', line 50

# Returns the archive retention window (initialize sets 7 * 24 * 3600,
# annotated there as 7 days).
def archive_retention = @archive_retention

#base_controller_classObject

Web dashboard



97
98
99
# File 'lib/pgbus/configuration.rb', line 97

# Returns the web dashboard's base controller class name
# (initialize sets "::ActionController::Base").
def base_controller_class = @base_controller_class

#circuit_breaker_enabledObject

Circuit breaker. Only `enabled` is user-facing — the trip threshold and backoff curve are tuned via constants on Pgbus::CircuitBreaker because they are implementation details that have never been worth exposing.



35
36
37
# File 'lib/pgbus/configuration.rb', line 35

# Returns the circuit breaker on/off switch (initialize sets true).
def circuit_breaker_enabled = @circuit_breaker_enabled

#connection_paramsObject

Connection settings



8
9
10
# File 'lib/pgbus/configuration.rb', line 8

# Returns the explicit connection parameters, or nil when none were set.
# #connection_options uses them only when database_url is not set.
def connection_params = @connection_params

#connects_toObject

Multi-database support (optional separate database for pgbus tables) Set to { database: { writing: :pgbus, reading: :pgbus } } to use a separate database. Requires a matching entry in config/database.yml under the “pgbus” key.



86
87
88
# File 'lib/pgbus/configuration.rb', line 86

# Returns the multi-database setting, or nil (the initialize default)
# when no separate database was configured.
def connects_to = @connects_to

#database_urlObject

Connection settings



8
9
10
# File 'lib/pgbus/configuration.rb', line 8

# Returns the configured database URL, or nil when unset. When present it
# is the first choice of #connection_options.
def database_url = @database_url

#default_priorityObject

Priority queues



45
46
47
# File 'lib/pgbus/configuration.rb', line 45

# Returns the default priority for priority queues (initialize sets 1).
def default_priority = @default_priority

#default_queueObject

Queue settings



11
12
13
# File 'lib/pgbus/configuration.rb', line 11

# Returns the default queue name (initialize sets "default").
def default_queue = @default_queue

#dispatch_intervalObject

Dispatcher settings



30
31
32
# File 'lib/pgbus/configuration.rb', line 30

# Returns the dispatcher interval setting (initialize sets 1.0).
def dispatch_interval = @dispatch_interval

#error_reportersObject

Error reporting — array of callable objects invoked on caught exceptions. Each receives (exception, context_hash) or (exception, context_hash, config).



66
67
68
# File 'lib/pgbus/configuration.rb', line 66

# Returns the list of error reporter callables (initialize sets an empty
# Array). The same Array object is returned on every call.
def error_reporters = @error_reporters

#event_consumersObject

Event consumers



77
78
79
# File 'lib/pgbus/configuration.rb', line 77

# Returns the event consumer configuration, or nil (the initialize default).
def event_consumers = @event_consumers

#execution_modeObject

Worker settings



14
15
16
# File 'lib/pgbus/configuration.rb', line 14

# Returns the global execution mode (initialize sets :threads). Used by
# #execution_mode_for when a worker has no override of its own.
def execution_mode = @execution_mode

#idempotency_ttlObject

rubocop:disable Style/AccessorGrouping



58
59
60
# File 'lib/pgbus/configuration.rb', line 58

# Returns the idempotency TTL (initialize sets 7 * 24 * 3600, annotated
# there as 7 days).
def idempotency_ttl = @idempotency_ttl

#insights_default_minutesObject

Web dashboard



97
98
99
# File 'lib/pgbus/configuration.rb', line 97

# Returns the dashboard's default insights window in minutes (initialize
# sets 30 * 24 * 60). #validate! requires a positive Integer.
def insights_default_minutes = @insights_default_minutes

#listen_notifyObject

LISTEN/NOTIFY. Only the on/off switch is user-facing — the throttle interval is a Postgres-side tuning knob that lives as a constant on Pgbus::Client (NOTIFY_THROTTLE_MS).



71
72
73
# File 'lib/pgbus/configuration.rb', line 71

# Returns the LISTEN/NOTIFY on/off switch (initialize sets true).
def listen_notify = @listen_notify

#log_formatObject

rubocop:disable Style/AccessorGrouping



62
63
64
# File 'lib/pgbus/configuration.rb', line 62

# Returns the log format (initialize sets :text).
def log_format = @log_format

#loggerObject

Logging



61
62
63
# File 'lib/pgbus/configuration.rb', line 61

# Returns the logger. initialize picks Rails.logger when available and
# otherwise a Logger writing to $stdout.
def logger = @logger

#max_jobs_per_workerObject

Worker recycling



27
28
29
# File 'lib/pgbus/configuration.rb', line 27

# Returns the worker-recycling job-count setting, or nil (the initialize
# default).
def max_jobs_per_worker = @max_jobs_per_worker

#max_memory_mbObject

Worker recycling



27
28
29
# File 'lib/pgbus/configuration.rb', line 27

# Returns the worker-recycling memory setting, or nil (the initialize
# default).
def max_memory_mb = @max_memory_mb

#max_retriesObject

Dead letter queue



38
39
40
# File 'lib/pgbus/configuration.rb', line 38

# Returns the retry limit (initialize sets 5). #validate! requires a
# non-negative Integer.
def max_retries = @max_retries

#max_worker_lifetimeObject

Worker recycling



27
28
29
# File 'lib/pgbus/configuration.rb', line 27

# Returns the worker-recycling lifetime setting, or nil (the initialize
# default).
def max_worker_lifetime = @max_worker_lifetime

#metrics_enabledObject

Web dashboard



97
98
99
# File 'lib/pgbus/configuration.rb', line 97

# Returns the metrics on/off switch (initialize sets true).
def metrics_enabled = @metrics_enabled

#outbox_batch_sizeObject

Transactional outbox



53
54
55
# File 'lib/pgbus/configuration.rb', line 53

# Returns the transactional outbox batch size (initialize sets 100).
def outbox_batch_size = @outbox_batch_size

#outbox_enabledObject

Transactional outbox



53
54
55
# File 'lib/pgbus/configuration.rb', line 53

# Returns the transactional outbox on/off switch (initialize sets false).
def outbox_enabled = @outbox_enabled

#outbox_poll_intervalObject

Transactional outbox



53
54
55
# File 'lib/pgbus/configuration.rb', line 53

# Returns the transactional outbox poll interval (initialize sets 1.0).
def outbox_poll_interval = @outbox_poll_interval

#outbox_retentionObject

rubocop:disable Style/AccessorGrouping



54
55
56
# File 'lib/pgbus/configuration.rb', line 54

# Returns the transactional outbox retention (initialize sets 24 * 3600,
# annotated there as 1 day).
def outbox_retention = @outbox_retention

#pgmq_schema_modeObject

PGMQ schema installation mode (:auto, :extension, :embedded)



74
75
76
# File 'lib/pgbus/configuration.rb', line 74

# Returns the PGMQ schema installation mode (initialize sets :auto).
def pgmq_schema_mode = @pgmq_schema_mode

#polling_intervalObject

Worker settings



14
15
16
# File 'lib/pgbus/configuration.rb', line 14

# Returns the worker polling interval (initialize sets 0.1). #validate!
# requires a positive number.
def polling_interval = @polling_interval

#pool_sizeObject

Connection settings



8
9
10
# File 'lib/pgbus/configuration.rb', line 8

# Returns the explicitly configured pool size, or nil (the initialize
# default), in which case #resolved_pool_size derives one.
def pool_size = @pool_size

#pool_timeoutObject

Connection settings



8
9
10
# File 'lib/pgbus/configuration.rb', line 8

# Returns the connection pool timeout (initialize sets 5). #validate!
# requires a positive number.
def pool_timeout = @pool_timeout

#prefetch_limitObject

Worker settings



14
15
16
# File 'lib/pgbus/configuration.rb', line 14

# Returns the worker prefetch limit, or nil (the initialize default).
# #validate! requires a positive Integer when it is set.
def prefetch_limit = @prefetch_limit

#priority_levelsObject

Priority queues



45
46
47
# File 'lib/pgbus/configuration.rb', line 45

# Returns the number of priority levels, or nil (the initialize default).
# #priority_queue_names fans out only when this is greater than 1.
def priority_levels = @priority_levels

#queue_prefixObject

Queue settings



11
12
13
# File 'lib/pgbus/configuration.rb', line 11

# Returns the queue name prefix (initialize sets "pgbus"). #queue_name
# prepends it to every queue name.
def queue_prefix = @queue_prefix

#recurring_execution_retentionObject

rubocop:disable Style/AccessorGrouping



81
82
83
# File 'lib/pgbus/configuration.rb', line 81

# Returns the recurring execution retention (initialize sets
# 7 * 24 * 3600, annotated there as 7 days).
def recurring_execution_retention = @recurring_execution_retention

#recurring_schedule_intervalObject

Recurring jobs



80
81
82
# File 'lib/pgbus/configuration.rb', line 80

# Returns the recurring-jobs schedule interval (initialize sets 1.0).
def recurring_schedule_interval = @recurring_schedule_interval

#recurring_tasksObject

Recurring jobs



80
81
82
# File 'lib/pgbus/configuration.rb', line 80

# Returns the recurring task definitions, or nil (the initialize default).
def recurring_tasks = @recurring_tasks

#recurring_tasks_fileObject

Recurring jobs



80
81
82
# File 'lib/pgbus/configuration.rb', line 80

# Returns the recurring tasks file setting, or nil (the initialize default).
def recurring_tasks_file = @recurring_tasks_file

#retry_backoffObject

Retry backoff for the VT-based retry path (unhandled exceptions). Jobs can override these per-class via Pgbus::RetryBackoff::JobMixin.



42
43
44
# File 'lib/pgbus/configuration.rb', line 42

# Returns the first retry delay in seconds (initialize sets 5).
# #validate! requires a positive number.
def retry_backoff = @retry_backoff

#retry_backoff_jitterObject

Retry backoff for the VT-based retry path (unhandled exceptions). Jobs can override these per-class via Pgbus::RetryBackoff::JobMixin.



42
43
44
# File 'lib/pgbus/configuration.rb', line 42

# Returns the retry backoff jitter (initialize sets 0.15). #validate!
# requires a number between 0 and 1 inclusive.
def retry_backoff_jitter = @retry_backoff_jitter

#retry_backoff_maxObject

Retry backoff for the VT-based retry path (unhandled exceptions). Jobs can override these per-class via Pgbus::RetryBackoff::JobMixin.



42
43
44
# File 'lib/pgbus/configuration.rb', line 42

# Returns the retry backoff cap (initialize sets 300, annotated there as
# a 5 minute cap). #validate! requires a positive number.
def retry_backoff_max = @retry_backoff_max

#return_to_app_urlObject

Web dashboard



97
98
99
# File 'lib/pgbus/configuration.rb', line 97

# Returns the dashboard's return-to-app URL, or nil (the initialize
# default).
def return_to_app_url = @return_to_app_url

#rolesObject

Supervisor role selection. nil = boot all roles (default behavior). Array of role symbols = boot only the listed roles. Set via the CLI flags --workers-only / --scheduler-only / --dispatcher-only, or directly in an initializer for advanced cases.



22
23
24
# File 'lib/pgbus/configuration.rb', line 22

# Returns the supervisor role selection: nil (the initialize default)
# or the collection of role symbols that #role_enabled? checks against.
def roles = @roles

#skip_recurringObject

Recurring jobs



80
81
82
# File 'lib/pgbus/configuration.rb', line 80

# Returns the skip-recurring flag (initialize sets false).
def skip_recurring = @skip_recurring

#stats_enabledObject

Job stats



93
94
95
# File 'lib/pgbus/configuration.rb', line 93

# Returns the job stats on/off switch (initialize sets true).
def stats_enabled = @stats_enabled

#stats_retentionObject

rubocop:disable Style/AccessorGrouping



94
95
96
# File 'lib/pgbus/configuration.rb', line 94

# Returns the job stats retention (initialize sets 30 * 24 * 3600,
# annotated there as 30 days).
def stats_retention = @stats_retention

#streams_default_retentionObject

Streams (turbo-rails replacement, SSE-based)



102
103
104
# File 'lib/pgbus/configuration.rb', line 102

# Returns the default stream retention (initialize sets 5 * 60, annotated
# there as 5 minutes). #validate_streams! requires a non-negative number.
def streams_default_retention = @streams_default_retention

#streams_enabledObject

Streams (turbo-rails replacement, SSE-based)



102
103
104
# File 'lib/pgbus/configuration.rb', line 102

# Returns the streams on/off switch (initialize sets true).
def streams_enabled = @streams_enabled

#streams_falcon_streaming_bodyObject

Streams (turbo-rails replacement, SSE-based)



102
103
104
# File 'lib/pgbus/configuration.rb', line 102

# Returns the Falcon streaming-body flag (initialize sets false).
def streams_falcon_streaming_body = @streams_falcon_streaming_body

#streams_heartbeat_intervalObject

Streams (turbo-rails replacement, SSE-based)



102
103
104
# File 'lib/pgbus/configuration.rb', line 102

# Returns the streams heartbeat interval (initialize sets 15).
# #validate_streams! requires a positive number.
def streams_heartbeat_interval = @streams_heartbeat_interval

#streams_idle_timeoutObject

Streams (turbo-rails replacement, SSE-based)



102
103
104
# File 'lib/pgbus/configuration.rb', line 102

# Returns the streams idle timeout (initialize sets 3_600, annotated
# there as 1 hour). #validate_streams! requires a positive number.
def streams_idle_timeout = @streams_idle_timeout

#streams_listen_health_check_msObject

Streams (turbo-rails replacement, SSE-based)



102
103
104
# File 'lib/pgbus/configuration.rb', line 102

# Returns the streams LISTEN health-check value in milliseconds
# (initialize sets 250). #validate_streams! requires a positive Integer.
def streams_listen_health_check_ms = @streams_listen_health_check_ms

#streams_max_connectionsObject

Streams (turbo-rails replacement, SSE-based)



102
103
104
# File 'lib/pgbus/configuration.rb', line 102

# Returns the streams connection limit (initialize sets 2_000).
# #validate_streams! requires a positive Integer.
def streams_max_connections = @streams_max_connections

#streams_pathObject

Streams (turbo-rails replacement, SSE-based)



102
103
104
# File 'lib/pgbus/configuration.rb', line 102

# Returns the streams path setting, or nil (the initialize default).
def streams_path = @streams_path

#streams_queue_prefixObject

Streams (turbo-rails replacement, SSE-based)



102
103
104
# File 'lib/pgbus/configuration.rb', line 102

# Returns the streams queue prefix (initialize sets "pgbus_stream").
def streams_queue_prefix = @streams_queue_prefix

#streams_retentionObject

Streams (turbo-rails replacement, SSE-based)



102
103
104
# File 'lib/pgbus/configuration.rb', line 102

# Returns the streams retention Hash (initialize sets an empty Hash).
# #validate_streams! requires it to be a Hash.
def streams_retention = @streams_retention

#streams_signed_name_secretObject

Streams (turbo-rails replacement, SSE-based)



102
103
104
# File 'lib/pgbus/configuration.rb', line 102

# Returns the streams signed-name secret, or nil (the initialize default).
def streams_signed_name_secret = @streams_signed_name_secret

#streams_stats_enabledObject

Streams (turbo-rails replacement, SSE-based)



102
103
104
# File 'lib/pgbus/configuration.rb', line 102

# Returns the opt-in stream stats switch (initialize sets false). It is
# kept separate from #stats_enabled, as the note in initialize explains.
def streams_stats_enabled = @streams_stats_enabled

#streams_test_modeObject

Streams (turbo-rails replacement, SSE-based)



102
103
104
# File 'lib/pgbus/configuration.rb', line 102

# Returns the streams test-mode flag (initialize sets false).
def streams_test_mode = @streams_test_mode

#streams_write_deadline_msObject

Streams (turbo-rails replacement, SSE-based)



102
103
104
# File 'lib/pgbus/configuration.rb', line 102

# Returns the streams write deadline in milliseconds (initialize sets
# 5_000). #validate_streams! requires a positive Integer.
def streams_write_deadline_ms = @streams_write_deadline_ms

#visibility_timeoutObject

rubocop:disable Style/AccessorGrouping



15
16
17
# File 'lib/pgbus/configuration.rb', line 15

# Returns the visibility timeout (initialize sets 30). #validate!
# requires a positive number.
def visibility_timeout = @visibility_timeout

#web_authObject

Web dashboard



97
98
99
# File 'lib/pgbus/configuration.rb', line 97

# Returns the web dashboard auth setting, or nil (the initialize default).
def web_auth = @web_auth

#web_data_sourceObject

Web dashboard



97
98
99
# File 'lib/pgbus/configuration.rb', line 97

# Returns the web dashboard data source, or nil (the initialize default).
def web_data_source = @web_data_source

#web_live_updatesObject

Web dashboard



97
98
99
# File 'lib/pgbus/configuration.rb', line 97

# Returns the web dashboard live-updates switch (initialize sets true).
def web_live_updates = @web_live_updates

#web_per_pageObject

Web dashboard



97
98
99
# File 'lib/pgbus/configuration.rb', line 97

# Returns the web dashboard page size (initialize sets 25).
def web_per_page = @web_per_page

#web_refresh_intervalObject

Web dashboard



97
98
99
# File 'lib/pgbus/configuration.rb', line 97

# Returns the web dashboard refresh interval (initialize sets 5000).
def web_refresh_interval = @web_refresh_interval

#workersObject

rubocop:disable Style/AccessorGrouping



15
16
17
# File 'lib/pgbus/configuration.rb', line 15

# Returns the worker (capsule) definitions. initialize sets one entry:
# queues %w[default] with 5 threads. #capsule appends to this same Array.
def workers = @workers

#zombie_detectionObject

Zombie message detection — logs a warning when a message is redelivered (read_ct > 1) without any prior failure recorded in pgbus_failed_events.



90
91
92
# File 'lib/pgbus/configuration.rb', line 90

# Returns the zombie message detection switch (initialize sets true).
def zombie_detection = @zombie_detection

Instance Method Details

#capsule(name, queues:, threads:) ⇒ Object

Define a named capsule and append it to the workers list.

c.capsule :critical, queues: %w[critical], threads: 5
c.capsule :gated, queues: %w[gated], threads: 1, single_active_consumer: true

Names must be unique. Queues must not overlap with capsules already defined (would cause double-processing). Composes with the string DSL — `c.workers "..."` followed by `c.capsule :name, ...` appends the named capsule to the list parsed from the string.

Raises:

  • (ArgumentError)


401
402
403
404
405
406
407
408
409
410
411
412
413
# File 'lib/pgbus/configuration.rb', line 401

# Defines a named capsule and appends it to the workers list.
#
# name    - capsule name; stored as a String (name.to_s).
# queues  - non-empty Array of queue names.
# threads - positive Integer thread count.
# **      - any further options are stored on the capsule hash unchanged.
#
# Returns the workers Array after the append.
# Raises ArgumentError when queues or threads are invalid or the name is
# already taken; validate_no_queue_overlap! runs last and may raise too.
def capsule(name, queues:, threads:, **)
  unless queues.is_a?(Array) && queues.any?
    raise ArgumentError, "capsule queues must be a non-empty Array"
  end

  unless threads.is_a?(Integer) && threads.positive?
    raise ArgumentError, "capsule threads must be a positive Integer"
  end

  capsule_key = name.to_s
  @workers ||= []

  name_taken = @workers.any? { |existing| capsule_name(existing) == capsule_key }
  raise ArgumentError, "capsule #{name.inspect} is already defined" if name_taken

  validate_no_queue_overlap!(queues)

  definition = { name: capsule_key, queues: queues, threads: threads, ** }
  @workers << definition
end

#capsule_named(name) ⇒ Object

Look up a capsule by its name. Accepts symbol or string. Returns the matching Hash, or nil. Used by the CLI’s --capsule selector.



417
418
419
420
421
422
# File 'lib/pgbus/configuration.rb', line 417

# Looks up a capsule by name. The name is compared as a String, so a
# Symbol and a String with the same text match the same capsule.
#
# Returns the first matching workers entry, or nil when there is no match
# or no workers list.
def capsule_named(name)
  return nil unless @workers

  wanted = name.to_s
  @workers.find { |candidate| capsule_name(candidate) == wanted }
end

#connection_options ⇒ Object



535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
# File 'lib/pgbus/configuration.rb', line 535

# Resolves the connection settings, trying each source in order:
# database_url, then connection_params, then a connection hash extracted
# from ActiveRecord when ActiveRecord::Base is defined.
#
# Raises ConfigurationError when none of the three is available.
def connection_options
  return database_url if database_url
  return connection_params if connection_params

  # Extract connection config from ActiveRecord so pgmq-ruby creates its
  # own dedicated PG connections. Sharing AR's raw_connection via a Proc
  # is NOT thread-safe: the ConnectionPool caches the PG::Connection from
  # whichever thread first called the Proc, then hands that same object to
  # other threads — causing nil results, segfaults, and data corruption
  # when AR and PGMQ issue concurrent queries on the same connection.
  return extract_ar_connection_hash if defined?(ActiveRecord::Base)

  raise ConfigurationError, "No database connection configured. " \
                            "Set Pgbus.configuration.database_url, connection_params, or use with Rails."
end

#dead_letter_queue_name(name) ⇒ Object



220
221
222
# File 'lib/pgbus/configuration.rb', line 220

# Returns the dead letter queue name for +name+: the result of
# #queue_name followed by Pgbus::DEAD_LETTER_SUFFIX.
def dead_letter_queue_name(name)
  base_queue = queue_name(name)
  "#{base_queue}#{Pgbus::DEAD_LETTER_SUFFIX}"
end

#execution_mode_for(worker_config) ⇒ Object

Returns the execution mode for a specific worker config hash, falling back to the global execution_mode setting.



236
237
238
239
# File 'lib/pgbus/configuration.rb', line 236

# Returns the execution mode for one worker config hash. A per-worker
# :execution_mode (Symbol or String key) takes precedence; otherwise the
# global execution_mode is used. The chosen value is passed through
# ExecutionPools.normalize_mode.
def execution_mode_for(worker_config)
  override = worker_config[:execution_mode] || worker_config["execution_mode"]
  ExecutionPools.normalize_mode(override || execution_mode)
end

#priority_queue_name(name, priority) ⇒ Object



224
225
226
# File 'lib/pgbus/configuration.rb', line 224

# Returns the queue name for +name+ at the given priority: the result of
# #queue_name with "_p<priority>" appended.
def priority_queue_name(name, priority)
  base_queue = queue_name(name)
  "#{base_queue}_p#{priority}"
end

#priority_queue_names(name) ⇒ Object



228
229
230
231
232
# File 'lib/pgbus/configuration.rb', line 228

# Returns every queue name backing +name+. With priority_levels unset or
# not greater than 1 that is just the plain queue name; otherwise one
# name per level, from 0 up to priority_levels - 1.
def priority_queue_names(name)
  if priority_levels && priority_levels > 1
    (0...priority_levels).map { |level| priority_queue_name(name, level) }
  else
    [queue_name(name)]
  end
end

#queue_name(name) ⇒ Object



215
216
217
218
# File 'lib/pgbus/configuration.rb', line 215

# Returns the full queue name for +name+: queue_prefix and +name+ joined
# with an underscore, passed through QueueNameValidator.normalize.
def queue_name(name)
  QueueNameValidator.normalize("#{queue_prefix}_#{name}")
end

#resolved_pool_size ⇒ Object



522
523
524
525
526
527
528
529
530
531
532
533
# File 'lib/pgbus/configuration.rb', line 522

# Returns the connection pool size to use.
#
# An explicitly configured pool_size is returned unchanged. Otherwise the
# size is the sum of what each enabled role needs:
#
#   workers role     thread total of workers (default 5 per entry)
#   consumers role   thread total of event_consumers (default 3 per entry)
#   dispatcher role  +1
#   scheduler role   +1
#
# The derived total is passed to warn_if_oversized before it is returned.
def resolved_pool_size
  return pool_size if pool_size

  needed = 0

  if role_enabled?(:workers)
    needed += sum_thread_counts(workers, default_threads: 5, group: "worker")
  end

  if role_enabled?(:consumers)
    needed += sum_thread_counts(event_consumers, default_threads: 3, group: "event_consumer")
  end

  needed += 1 if role_enabled?(:dispatcher)
  needed += 1 if role_enabled?(:scheduler)

  warn_if_oversized(needed)
  needed
end

#role_enabled?(role) ⇒ Boolean

Returns true if the given role should be booted by the supervisor. When roles is nil (the default), every role is enabled — this matches the legacy single-process behavior. When roles is set (e.g. via the CLI’s --workers-only / --scheduler-only / --dispatcher-only flags), only the listed roles boot.

Accepts symbol or string for case-insensitive comparison.

Returns:

  • (Boolean)


431
432
433
434
435
# File 'lib/pgbus/configuration.rb', line 431

# Reports whether +role+ is enabled. Every role is enabled while @roles
# is nil; otherwise +role+ is downcased, converted to a Symbol and looked
# up in @roles, so "Workers" and :workers are treated alike.
def role_enabled?(role)
  @roles.nil? || @roles.include?(role.to_s.downcase.to_sym)
end

#validate! ⇒ Object

Raises:

  • (ArgumentError)


267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
# File 'lib/pgbus/configuration.rb', line 267

# Validates the configuration and returns self when every check passes.
#
# Checks run in a fixed order and the first failure raises ArgumentError:
# pool and timing settings, retry settings, the global execution mode,
# each workers entry (thread count and optional execution_mode override),
# prefetch_limit, priority_levels, insights_default_minutes, and finally
# the streams settings via #validate_streams!. Execution modes are checked
# by calling ExecutionPools.normalize_mode on them.
def validate!
  positive_number = ->(value) { value.is_a?(Numeric) && value.positive? }
  positive_integer = ->(value) { value.is_a?(Integer) && value.positive? }

  # pool_size is optional; nil means "auto-tune".
  if pool_size && !positive_number.call(pool_size)
    raise ArgumentError, "pool_size must be a positive number or nil (auto-tune)"
  end

  unless positive_number.call(pool_timeout)
    raise ArgumentError, "pool_timeout must be > 0"
  end

  unless positive_number.call(polling_interval)
    raise ArgumentError, "polling_interval must be > 0"
  end

  unless positive_number.call(visibility_timeout)
    raise ArgumentError, "visibility_timeout must be > 0"
  end

  if !max_retries.is_a?(Integer) || max_retries.negative?
    raise ArgumentError, "max_retries must be >= 0"
  end

  unless positive_number.call(retry_backoff)
    raise ArgumentError, "retry_backoff must be > 0"
  end

  unless positive_number.call(retry_backoff_max)
    raise ArgumentError, "retry_backoff_max must be > 0"
  end

  jitter_in_range = retry_backoff_jitter.is_a?(Numeric) && retry_backoff_jitter >= 0 && retry_backoff_jitter <= 1
  raise ArgumentError, "retry_backoff_jitter must be between 0 and 1" unless jitter_in_range

  # Validate global execution_mode
  ExecutionPools.normalize_mode(execution_mode)

  Array(workers).each do |worker|
    thread_count = worker[:threads] || worker["threads"] || 5
    raise ArgumentError, "worker threads must be > 0" unless positive_integer.call(thread_count)

    # Validate per-worker execution_mode override if present
    worker_mode = worker[:execution_mode] || worker["execution_mode"]
    ExecutionPools.normalize_mode(worker_mode) if worker_mode
  end

  if prefetch_limit && !positive_integer.call(prefetch_limit)
    raise ArgumentError, "prefetch_limit must be > 0"
  end

  if priority_levels && !(priority_levels.is_a?(Integer) && priority_levels.between?(1, 10))
    raise ArgumentError, "priority_levels must be an integer between 1 and 10"
  end

  unless positive_integer.call(insights_default_minutes)
    raise ArgumentError, "insights_default_minutes must be a positive integer"
  end

  validate_streams!

  self
end

#validate_streams! ⇒ Object

Raises:

  • (ArgumentError)


309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
# File 'lib/pgbus/configuration.rb', line 309

# Validates the streams settings. Checks run in a fixed order and the
# first failure raises ArgumentError; returns nil when all pass.
#
# Numbers are accepted for the retention, heartbeat and idle timeout
# settings; the connection limit and the two *_ms settings must be
# Integers; streams_retention must be a Hash.
def validate_streams!
  positive_number = ->(value) { value.is_a?(Numeric) && value.positive? }
  positive_integer = ->(value) { value.is_a?(Integer) && value.positive? }

  # Zero is allowed here, unlike the other numeric stream settings.
  retention_ok = streams_default_retention.is_a?(Numeric) && streams_default_retention >= 0
  raise ArgumentError, "streams_default_retention must be a non-negative number" unless retention_ok

  unless positive_integer.call(streams_max_connections)
    raise ArgumentError, "streams_max_connections must be a positive integer"
  end

  unless positive_number.call(streams_heartbeat_interval)
    raise ArgumentError, "streams_heartbeat_interval must be a positive number"
  end

  unless positive_number.call(streams_idle_timeout)
    raise ArgumentError, "streams_idle_timeout must be a positive number"
  end

  unless positive_integer.call(streams_listen_health_check_ms)
    raise ArgumentError, "streams_listen_health_check_ms must be a positive integer"
  end

  unless positive_integer.call(streams_write_deadline_ms)
    raise ArgumentError, "streams_write_deadline_ms must be a positive integer"
  end

  raise ArgumentError, "streams_retention must be a Hash" unless streams_retention.is_a?(Hash)
end