Module: Pgbus

Defined in:
lib/pgbus.rb,
lib/pgbus/cli.rb,
lib/pgbus/mcp.rb,
lib/pgbus/batch.rb,
lib/pgbus/event.rb,
lib/pgbus/client.rb,
lib/pgbus/engine.rb,
lib/pgbus/outbox.rb,
lib/pgbus/streams.rb,
lib/pgbus/testing.rb,
lib/pgbus/version.rb,
lib/pgbus/bus_record.rb,
lib/pgbus/mcp/runner.rb,
lib/pgbus/mcp/server.rb,
lib/pgbus/serializer.rb,
lib/pgbus/uniqueness.rb,
lib/pgbus/concurrency.rb,
lib/pgbus/dedup_cache.rb,
lib/pgbus/pgmq_schema.rb,
lib/pgbus/stat_buffer.rb,
lib/pgbus/streams/key.rb,
lib/pgbus/mcp/rack_app.rb,
lib/pgbus/mcp/redactor.rb,
lib/pgbus/rate_counter.rb,
lib/pgbus/web/streamer.rb,
lib/pgbus/config_loader.rb,
lib/pgbus/configuration.rb,
lib/pgbus/log_formatter.rb,
lib/pgbus/mcp/base_tool.rb,
lib/pgbus/outbox/poller.rb,
lib/pgbus/queue_factory.rb,
lib/pgbus/retry_backoff.rb,
lib/pgbus/error_reporter.rb,
lib/pgbus/process/worker.rb,
lib/pgbus/recurring/task.rb,
lib/pgbus/streams/cursor.rb,
lib/pgbus/web/stream_app.rb,
app/models/pgbus/job_lock.rb,
app/models/pgbus/job_stat.rb,
lib/pgbus/circuit_breaker.rb,
lib/pgbus/execution_pools.rb,
lib/pgbus/instrumentation.rb,
lib/pgbus/streams/filters.rb,
lib/pgbus/web/data_source.rb,
app/models/pgbus/semaphore.rb,
lib/pgbus/process/consumer.rb,
lib/pgbus/streams/envelope.rb,
lib/pgbus/streams/presence.rb,
lib/pgbus/streams/renderer.rb,
lib/pgbus/testing/minitest.rb,
lib/pgbus/autovacuum_tuning.rb,
lib/pgbus/client/read_after.rb,
lib/pgbus/event_bus/handler.rb,
lib/pgbus/process/heartbeat.rb,
lib/pgbus/process/lifecycle.rb,
lib/pgbus/streams/coalescer.rb,
lib/pgbus/table_maintenance.rb,
app/models/pgbus/batch_entry.rb,
app/models/pgbus/queue_state.rb,
app/models/pgbus/stream_stat.rb,
lib/pgbus/active_job/adapter.rb,
lib/pgbus/event_bus/registry.rb,
lib/pgbus/mcp/tools/dlq_tool.rb,
lib/pgbus/process/dispatcher.rb,
lib/pgbus/process/queue_lock.rb,
lib/pgbus/process/supervisor.rb,
lib/pgbus/recurring/schedule.rb,
lib/pgbus/streams/streamable.rb,
lib/pgbus/testing/assertions.rb,
lib/pgbus/web/authentication.rb,
app/models/pgbus/outbox_entry.rb,
lib/pgbus/active_job/executor.rb,
lib/pgbus/event_bus/publisher.rb,
lib/pgbus/mcp/health_analyzer.rb,
lib/pgbus/mcp/tools/jobs_tool.rb,
lib/pgbus/process/wake_signal.rb,
lib/pgbus/recurring/scheduler.rb,
lib/pgbus/streams/signed_name.rb,
app/models/pgbus/process_entry.rb,
lib/pgbus/client/notify_stream.rb,
lib/pgbus/event_bus/subscriber.rb,
lib/pgbus/mcp/tools/locks_tool.rb,
lib/pgbus/mcp/tools/stats_tool.rb,
lib/pgbus/queue_name_validator.rb,
app/models/pgbus/recurring_task.rb,
app/models/pgbus/uniqueness_key.rb,
lib/pgbus/concurrency/semaphore.rb,
lib/pgbus/failed_event_recorder.rb,
lib/pgbus/mcp/tools/health_tool.rb,
lib/pgbus/mcp/tools/queues_tool.rb,
lib/pgbus/recurring/command_job.rb,
lib/pgbus/web/streamer/instance.rb,
lib/pgbus/web/streamer/listener.rb,
lib/pgbus/web/streamer/registry.rb,
app/helpers/pgbus/streams_helper.rb,
app/models/pgbus/processed_event.rb,
lib/pgbus/integrations/appsignal.rb,
lib/pgbus/process/signal_handler.rb,
lib/pgbus/web/metrics_serializer.rb,
lib/pgbus/web/streamer/heartbeat.rb,
lib/pgbus/web/streamer/io_writer.rb,
lib/pgbus/process/notify_listener.rb,
lib/pgbus/recurring/config_loader.rb,
lib/pgbus/web/streamer/connection.rb,
app/models/pgbus/blocked_execution.rb,
lib/pgbus/mcp/tools/processes_tool.rb,
lib/pgbus/mcp/tools/recurring_tool.rb,
app/models/pgbus/application_record.rb,
lib/generators/pgbus/migration_path.rb,
lib/pgbus/configuration/capsule_dsl.rb,
lib/pgbus/mcp/tools/dlq_detail_tool.rb,
lib/pgbus/mcp/tools/job_detail_tool.rb,
lib/pgbus/mcp/tools/throughput_tool.rb,
lib/pgbus/process/consumer_priority.rb,
app/helpers/pgbus/application_helper.rb,
app/models/pgbus/recurring_execution.rb,
lib/pgbus/client/ensure_stream_queue.rb,
lib/pgbus/execution_pools/async_pool.rb,
lib/pgbus/recurring/already_recorded.rb,
app/controllers/pgbus/jobs_controller.rb,
lib/generators/pgbus/update_generator.rb,
lib/pgbus/execution_pools/thread_pool.rb,
lib/pgbus/generators/config_converter.rb,
lib/pgbus/mcp/tools/queue_detail_tool.rb,
lib/pgbus/streams/turbo_broadcastable.rb,
lib/pgbus/web/streamer/stream_counter.rb,
app/controllers/pgbus/locks_controller.rb,
lib/generators/pgbus/install_generator.rb,
lib/pgbus/integrations/appsignal/probe.rb,
app/controllers/pgbus/events_controller.rb,
app/controllers/pgbus/locale_controller.rb,
app/controllers/pgbus/outbox_controller.rb,
app/controllers/pgbus/queues_controller.rb,
lib/pgbus/concurrency/blocked_execution.rb,
lib/pgbus/generators/migration_detector.rb,
lib/pgbus/streams/turbo_stream_override.rb,
app/controllers/pgbus/batches_controller.rb,
lib/pgbus/streams/broadcastable_override.rb,
lib/pgbus/web/streamer/falcon_connection.rb,
app/controllers/pgbus/insights_controller.rb,
lib/generators/pgbus/add_outbox_generator.rb,
app/controllers/pgbus/api/stats_controller.rb,
app/controllers/pgbus/dashboard_controller.rb,
app/controllers/pgbus/frontends_controller.rb,
app/controllers/pgbus/processes_controller.rb,
lib/generators/pgbus/add_presence_generator.rb,
lib/generators/pgbus/upgrade_pgmq_generator.rb,
lib/pgbus/integrations/appsignal/subscriber.rb,
app/controllers/pgbus/api/metrics_controller.rb,
app/controllers/pgbus/application_controller.rb,
app/controllers/pgbus/dead_letter_controller.rb,
lib/generators/pgbus/add_job_locks_generator.rb,
lib/generators/pgbus/add_job_stats_generator.rb,
lib/generators/pgbus/add_recurring_generator.rb,
lib/pgbus/streams/watermark_cache_middleware.rb,
app/controllers/pgbus/api/insights_controller.rb,
lib/pgbus/generators/database_target_detector.rb,
lib/generators/pgbus/tune_autovacuum_generator.rb,
lib/generators/pgbus/tune_fillfactor_generator.rb,
lib/pgbus/web/streamer/stream_event_dispatcher.rb,
lib/generators/pgbus/add_queue_states_generator.rb,
lib/generators/pgbus/add_stream_stats_generator.rb,
app/controllers/pgbus/recurring_tasks_controller.rb,
lib/generators/pgbus/migrate_job_locks_generator.rb,
lib/generators/pgbus/add_job_stats_latency_generator.rb,
lib/generators/pgbus/add_failed_events_index_generator.rb,
lib/generators/pgbus/add_job_stats_queue_index_generator.rb

Defined Under Namespace

Modules: ActiveJob, Api, ApplicationHelper, AutovacuumTuning, CLI, Concurrency, ConfigLoader, ErrorReporter, EventBus, ExecutionPools, Generators, Instrumentation, Integrations, LogFormatter, MCP, Outbox, PgmqSchema, Process, QueueFactory, QueueNameValidator, Recurring, RetryBackoff, Serializer, Streams, StreamsHelper, TableMaintenance, Testing, Uniqueness, Web Classes: ApplicationController, ApplicationRecord, Batch, BatchEntry, BatchesController, BlockedExecution, BusRecord, CircuitBreaker, Client, ConcurrencyLimitExceeded, Configuration, ConfigurationError, DashboardController, DeadLetterController, DeadLetterError, DedupCache, Engine, Error, Event, EventsController, FailedEventRecorder, FrontendsController, InsightsController, JobLock, JobNotUnique, JobStat, JobsController, LocaleController, LocksController, OutboxController, OutboxEntry, ProcessEntry, ProcessedEvent, ProcessesController, QueueNotFoundError, QueueState, QueuesController, RateCounter, ReadTimeoutError, RecurringExecution, RecurringTask, RecurringTasksController, SchemaNotReady, Semaphore, SerializationError, StatBuffer, StreamStat, UniquenessKey

Constant Summary collapse

DEAD_LETTER_SUFFIX =

Suffix appended to a queue name to derive its dead-letter companion (e.g. “pgbus_default” -> “pgbus_default_dlq”). Hard-coded here because changing it on a running deployment would orphan every existing DLQ message; nothing in the codebase or in user reports has ever needed this to be configurable.

"_dlq"
VERSION =
"0.9.5"

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.stoppingObject



28
29
30
# File 'lib/pgbus.rb', line 28

def stopping
  @stopping || false
end

Class Method Details

.clientObject



97
98
99
# File 'lib/pgbus.rb', line 97

def client
  @client ||= Client.new(configuration)
end

.configurationObject



89
90
91
# File 'lib/pgbus.rb', line 89

def configuration
  @configuration ||= Configuration.new
end

.configure {|configuration| ... } ⇒ Object

Yields:



93
94
95
# File 'lib/pgbus.rb', line 93

def configure
  yield configuration
end

.loaderObject



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/pgbus.rb', line 32

def loader
  @loader ||= begin
    loader = Zeitwerk::Loader.for_gem
    loader.inflector.inflect(
      "pgbus" => "Pgbus",
      "cli" => "CLI",
      "dsl" => "DSL",
      "capsule_dsl" => "CapsuleDSL",
      "mcp" => "MCP"
    )
    loader.ignore("#{__dir__}/generators")
    loader.ignore("#{__dir__}/active_job")
    loader.ignore("#{__dir__}/pgbus/testing")
    # The MCP diagnostic server is optional — its tool classes subclass
    # MCP::Tool from the (optional) `mcp` gem. Keeping it out of Zeitwerk
    # means we never reference the gem's constants at autoload time;
    # `Pgbus::MCP.load!` (called by the `pgbus mcp` CLI command) requires
    # the gem and loads the subsystem explicitly.
    loader.ignore("#{__dir__}/pgbus/mcp")
    loader.ignore("#{__dir__}/pgbus/mcp.rb")
    # Vendor integrations are loaded conditionally (when the vendor gem
    # is present) by lib/pgbus/engine.rb. Keeping them out of Zeitwerk
    # means we don't reference vendor constants at autoload time.
    loader.ignore("#{__dir__}/pgbus/integrations")
    # lib/puma/plugin/pgbus_streams.rb is a Puma plugin — it's required
    # explicitly by the user from config/puma.rb via `plugin :pgbus_streams`.
    # Without this ignore, Zeitwerk scans lib/puma/ under the pgbus loader
    # root and tries to autoload Puma::Plugin, which collides with the real
    # Puma::Plugin class defined by the puma gem itself.
    loader.ignore("#{__dir__}/puma")
    loader
  end
end

.loggerObject



168
169
170
# File 'lib/pgbus.rb', line 168

def logger
  configuration.logger
end

.logger=(value) ⇒ Object



172
173
174
# File 'lib/pgbus.rb', line 172

def logger=(value)
  configuration.logger = value
end

.models_loaderObject

Separate loader for app/models used only in non-Rails contexts (specs, standalone scripts). When the Engine boots, Rails’ autoloader takes over app/models and this loader is torn down to avoid conflicts.



69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/pgbus.rb', line 69

def models_loader
  models_dir = File.expand_path("../app/models", __dir__)
  return nil unless File.directory?(models_dir)

  @models_loader ||= begin
    loader = Zeitwerk::Loader.new
    loader.tag = "pgbus-models"
    loader.push_dir(models_dir)
    loader.setup
    loader
  end
end

.reset!Object



153
154
155
156
157
158
# File 'lib/pgbus.rb', line 153

def reset!
  @client&.close
  @client = nil
  @configuration = nil
  @stream_cache = nil
end

.reset_client!Object

Discard the inherited PGMQ client after fork. Do NOT call close — the parent’s @pgmq_mutex is in undefined state post-fork and attempting to acquire it can deadlock. The next call to Pgbus.client will lazily create a fresh one.



164
165
166
# File 'lib/pgbus.rb', line 164

def reset_client!
  @client = nil
end

.stream(streamables, durable: nil) ⇒ Object

Entry point for the streams subsystem — ‘Pgbus.stream(name).broadcast(html)` or `Pgbus.stream(@order).current_msg_id`. Defined on Pgbus itself rather than inside lib/pgbus/streams.rb because that file is only Zeitwerk-loaded when Pgbus::Streams::Stream is first referenced — the chicken-and-egg problem means `Pgbus.stream(…)` would be undefined on the first call. Referencing Streams::Stream inside the method body forces Zeitwerk to load lib/pgbus/streams.rb lazily on first use, which is fine.

Caches Stream instances by logical name so high-frequency callers (e.g. Turbo::StreamsChannel.broadcast_stream_to inside an after_update_commit callback firing 1000x/sec) don’t allocate a new Stream + Mutex per broadcast. The cache is process-local; reset! clears it. The cache key is the resolved name string, not the raw streamables, so ‘Pgbus.stream(@order)` and `Pgbus.stream(@order)` in the same process return the same instance. `durable:` defaults to `nil` so the configuration resolves the mode: `streams_durable_patterns` first (exact string or regex match), then `streams_default_broadcast_mode`. Pass `durable: true`/`false` explicitly to bypass the resolver for this Stream instance.



120
121
122
123
124
125
126
# File 'lib/pgbus.rb', line 120

def stream(streamables, durable: nil)
  name = Streams::Stream.name_from(streamables)
  resolved = durable.nil? ? configuration.stream_durable?(name) : durable
  cache_key = "#{name}:#{resolved ? "d" : "e"}"
  @stream_cache ||= Concurrent::Map.new
  @stream_cache.compute_if_absent(cache_key) { Streams::Stream.new(streamables, durable: resolved) }
end

.stream_key(*parts) ⇒ Object

Compose a short, pgbus-safe stream identifier from any mix of records, strings, symbols, and arrays. Delegates to ‘Pgbus::Streams::Key.stream_key`; raises `ArgumentError` if the resulting key would overflow the pgbus queue-name budget. See `lib/pgbus/streams/key.rb` for the digest policy and rationale.

Pgbus.stream_key(chat, :messages)
# => "ai_chat_3a4f9c21b7d20e18:messages"

Pgbus.stream(Pgbus.stream_key(chat, :messages)).broadcast(html)


138
139
140
# File 'lib/pgbus.rb', line 138

def stream_key(*parts, **)
  Streams::Key.stream_key(*parts, **)
end

.stream_key!(key) ⇒ Object

Accepts an already-built stream key verbatim (no re-keying), only enforcing the queue-name budget. Use when you hold a key string and want to pass it to both ‘turbo_stream_from` and a broadcaster without the colon-separator guard raising on the second call.

key = Pgbus.stream_key(chat, :messages)
Pgbus.stream(Pgbus.stream_key!(key)).broadcast(html)


149
150
151
# File 'lib/pgbus.rb', line 149

def stream_key!(key)
  Streams::Key.stream_key!(key)
end

.teardown_models_loader!Object



82
83
84
85
86
87
# File 'lib/pgbus.rb', line 82

def teardown_models_loader!
  return unless @models_loader

  @models_loader.unregister
  @models_loader = nil
end