Pgbus
PostgreSQL-native job processing and event bus for Rails, built on PGMQ.
Why Pgbus? If you already run PostgreSQL, you don't need Redis for background jobs. Pgbus gives you ActiveJob integration, AMQP-style topic routing, dead letter queues, worker memory management, and a live dashboard -- all backed by your existing database.
Table of contents
- Features
- Requirements
- Installation
- Quick start
- Reliability
- Routing and ordering
- Persistence and batching
- Observability
- Real-time broadcasts
- Testing
- Operations
- Reference
- Development
- License
Features
- ActiveJob adapter -- drop-in replacement, zero config migration from other backends
- Turbo Streams replacement -- `pgbus_stream_from` drops into turbo-rails apps with no ActionCable, no Redis, no lost messages on reconnect. Includes transactional broadcasts (deferred until commit), backlog replay on connect, server-side audience filtering, and presence tracking. Fixes rails/rails#52420, hotwired/turbo#1261, and hotwired/turbo-rails#674.
- Event bus -- publish/subscribe with AMQP-style topic routing (`orders.#`, `payments.*`)
- Dead letter queues -- automatic DLQ routing after configurable retries
- Worker recycling -- memory, job count, and lifetime limits prevent runaway processes
- LISTEN/NOTIFY -- instant wake-up, polling as fallback only
- Idempotent events -- deduplication via a `(event_id, handler_class)` unique index with in-memory cache
- Live dashboard -- Turbo Frames auto-refresh with throughput rate, no ActionCable required
- Supervisor/worker model -- forked processes with heartbeat monitoring and lifecycle state machine
- Priority queues -- route jobs to priority sub-queues, highest-priority-first processing
- Circuit breaker -- auto-pause queues after consecutive failures, exponential backoff
- Queue pause/resume -- manual or automatic via dashboard
- Prefetch flow control -- cap in-flight messages per worker to prevent overload
- Archive compaction -- automatic purge of old archived messages
- Transactional outbox -- publish events atomically inside database transactions
- Single active consumer -- advisory-lock-based exclusive queue processing for strict ordering
- Consumer priority -- higher-priority workers get first dibs, lower-priority workers back off
- Job uniqueness -- prevent duplicate jobs with reaper-based crash recovery, no TTL-driven expiry
- Retry backoff -- exponential backoff with jitter for VT-based retries, per-job overrides
- Error reporting -- pluggable error reporters for APM integration (Appsignal, Sentry, etc.)
- Structured logging -- JSON log formatter with component extraction and thread-local context
- Queue health -- dead tuple monitoring, autovacuum tuning, Prometheus metrics
Requirements
- Ruby >= 3.3
- Rails >= 7.1
- PostgreSQL with the PGMQ extension
Installation
Add to your Gemfile:
gem "pgbus"
Then install the PGMQ extension in your database:
CREATE EXTENSION IF NOT EXISTS pgmq;
Quick start
1. Configure (optional)
Pgbus works with zero config in Rails -- it uses your existing ActiveRecord connection. For custom setups, drop a Ruby initializer:
# config/initializers/pgbus.rb
Pgbus.configure do |c|
c.queue_prefix = "myapp"
c.max_retries = 5
c.visibility_timeout = 30.seconds # ActiveSupport::Duration accepted
c.idempotency_ttl = 7.days
# Worker recycling — prevents long-lived processes from leaking memory
c.max_jobs_per_worker = 10_000
c.max_memory_mb = 512
c.max_worker_lifetime = 1.hour
# Capsule string DSL — Sidekiq-style "queues: threads; queues: threads"
c.workers = "default, mailers: 10; critical: 5"
# Or use named capsules with advanced options
c.capsule :ordered, queues: %w[ordered_events], threads: 1, single_active_consumer: true
end
The capsule string DSL is the shortest form for the common case. Use c.capsule when you need named capsules with advanced options like single_active_consumer or consumer_priority. See Routing and ordering for the full set.
Upgrading from an older pgbus? Run `rails generate pgbus:update`. It does two things in one pass:
- Converts any legacy `config/pgbus.yml` to a Ruby initializer at `config/initializers/pgbus.rb` (skipped if the initializer already exists).
- Inspects your live database and adds any missing pgbus migrations to `db/migrate` (or `db/pgbus_migrate` if you use `connects_to`). The generator detects your separate-database config automatically from `Pgbus.configuration.connects_to` or by scanning the initializer / `config/application.rb`, so you don't have to re-specify `--database=pgbus` every time.

Useful flags: `--dry-run` (print the plan without creating files), `--skip-config`, `--skip-migrations`, `--quiet`. Running it on a database with no pgbus tables at all will redirect you to `pgbus:install` instead of stacking individual add_* migrations.
2. Use as ActiveJob backend
# config/application.rb
config.active_job.queue_adapter = :pgbus
That's it. Your existing jobs work unchanged:
class OrderConfirmationJob < ApplicationJob
queue_as :mailers
def perform(order)
OrderMailer.confirmation(order).deliver_now
end
end
# Enqueue
OrderConfirmationJob.perform_later(order)
# Schedule
OrderConfirmationJob.set(wait: 5.minutes).perform_later(order)
3. Event bus (optional)
Publish events with AMQP-style topic routing:
# Publish an event
Pgbus::EventBus::Publisher.publish(
"orders.created",
{ order_id: order.id, total: order.total }
)
# Publish with delay
Pgbus::EventBus::Publisher.publish_later(
"invoices.due",
{ invoice_id: invoice.id },
delay: 30.days
)
Subscribe with handlers:
# app/handlers/order_created_handler.rb
class OrderCreatedHandler < Pgbus::EventBus::Handler
idempotent! # Deduplicate by (event_id, handler_class)
def handle(event)
order_id = event.payload["order_id"]
Analytics.track_order(order_id)
InventoryService.reserve(order_id)
end
end
# Register in an initializer
Pgbus::EventBus::Registry.instance.subscribe(
"orders.created",
OrderCreatedHandler
)
# Wildcard patterns
Pgbus::EventBus::Registry.instance.subscribe(
"orders.#", # matches orders.created, orders.updated, orders.shipped.confirmed
OrderAuditHandler
)
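The wildcard semantics follow AMQP topic matching: `*` matches exactly one dot-separated segment, `#` matches zero or more. As an illustrative sketch of those semantics (not Pgbus's actual matcher):

```ruby
# Illustrative AMQP-style topic matcher: "*" = exactly one segment,
# "#" = zero or more segments. Not Pgbus's actual implementation.
def topic_match?(pattern, topic)
  match_segments(pattern.split("."), topic.split("."))
end

def match_segments(pat, top)
  return top.empty? if pat.empty?
  head, *rest = pat
  case head
  when "#"
    # "#" may consume zero or more segments: try every split point
    (0..top.length).any? { |i| match_segments(rest, top[i..]) }
  when "*"
    !top.empty? && match_segments(rest, top[1..])
  else
    !top.empty? && top.first == head && match_segments(rest, top[1..])
  end
end

topic_match?("orders.#", "orders.shipped.confirmed") # => true
topic_match?("payments.*", "payments.captured.eu")   # => false
```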
4. Start workers
bundle exec pgbus start
This boots a supervisor that manages:
- Workers -- process ActiveJob queues
- Dispatcher -- runs maintenance tasks (idempotency cleanup, stale process reaping)
- Consumers -- process event bus messages
5. Mount the dashboard
# config/routes.rb
mount Pgbus::Engine => "/pgbus"
The dashboard shows queues, jobs, processes, failures, dead letter messages, and event subscribers. It auto-refreshes via Turbo Frames with no WebSocket dependency.
Protect it in production with a simple auth lambda:
Pgbus.configure do |config|
config.web_auth = ->(request) {
request.env["warden"].user&.admin?
}
end
Or inherit from your own authenticated controller (like mission_control-jobs):
Pgbus.configure do |config|
config.base_controller_class = "Admin::BaseController"
end
When base_controller_class is set, all dashboard controllers inherit from that class instead of ActionController::Base. This is the recommended approach when mounting the dashboard inside an authenticated namespace -- your base controller's before_action filters, helper methods, and authentication logic apply automatically without monkey-patching.
Add a "back to app" button in the dashboard nav to return to your main application:
Pgbus.configure do |config|
config.return_to_app_url = "/admin"
end
Reliability
These features stop bad jobs from cascading into outages: deduplication, concurrency caps, automatic queue pausing on repeated failures, in-flight backpressure, and worker recycling.
Job uniqueness
Prevent duplicate jobs from running. Unlike limits_concurrency (which controls how many jobs with the same key run), uniqueness guarantees at most one job with a given key exists in the system at any time.
class ImportOrderJob < ApplicationJob
ensures_uniqueness strategy: :until_executed,
key: ->(order_id) { "import-order-#{order_id}" },
on_conflict: :reject
def perform(order_id)
# Only ONE instance per order_id can exist — from enqueue through completion.
# If another ImportOrderJob for this order_id is already enqueued or running,
# the duplicate is rejected immediately.
end
end
Strategies
| Strategy | Lock acquired | Lock released | Prevents |
|---|---|---|---|
| `:until_executed` | At enqueue | On completion or DLQ | Duplicate enqueue AND execution |
| `:while_executing` | At execution start | On completion or DLQ | Duplicate execution only |
Conflict policies
| Policy | Behavior |
|---|---|
| `:reject` | Raise `Pgbus::JobNotUnique` (default) |
| `:discard` | Silently drop the duplicate |
| `:log` | Log a warning and drop |
Lock lifecycle
The lock is never released by a timer. It is held as long as the job exists in the system:
Enqueue ──→ pgbus_job_locks (state: queued, owner_pid: nil)
│
Worker picks up job
│
▼
claim_for_execution! (state: executing, owner_pid: PID)
│
┌───────┴───────┐
▼ ▼
Success Crash
release! (lock orphaned)
(row deleted) │
▼
Reaper checks:
Is owner_pid in pgbus_processes
with fresh heartbeat?
│
┌─────┴─────┐
No Yes
▼ ▼
release! (keep lock,
(orphaned) job is running)
Crash recovery works through the reaper (runs every 5 minutes in the dispatcher). It cross-references owner_pid in pgbus_job_locks against pgbus_processes heartbeats. If the owning worker has no fresh heartbeat, the lock is orphaned and released — the PGMQ message's visibility timeout will expire and the job will be retried by another worker.
A last-resort TTL (default 24 hours) handles the case where the entire pgbus supervisor is dead and the reaper itself can't run.
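The reaper logic above boils down to a heartbeat check plus the TTL backstop. A minimal sketch, assuming a 5-minute heartbeat freshness window (the threshold and all names here are illustrative, not Pgbus's actual code):

```ruby
# Illustrative reaper decision: a lock is orphaned if it outlived the
# last-resort TTL, or if its owning worker has no fresh heartbeat.
HEARTBEAT_FRESHNESS = 300 # assumed freshness window, in seconds

def lock_orphaned?(lock, heartbeats, now:, ttl: 24 * 3600)
  return true if now - lock[:acquired_at] > ttl  # last-resort TTL backstop
  return false if lock[:owner_pid].nil?          # still queued, no owner yet
  beat = heartbeats[lock[:owner_pid]]
  beat.nil? || now - beat > HEARTBEAT_FRESHNESS  # stale heartbeat => orphaned
end
```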
Uniqueness vs concurrency controls
| | `ensures_uniqueness` | `limits_concurrency` |
|---|---|---|
| Purpose | Prevent duplicate jobs | Limit concurrent execution slots |
| Lock type | Binary lock (one or none) | Counting semaphore (up to N) |
| At enqueue | `:until_executed` blocks duplicates | Checks semaphore, blocks/discards/raises |
| At execution | `:while_executing` blocks duplicate runs | Not checked (semaphore acquired at enqueue) |
| Duplicate in queue | `:until_executed`: impossible. `:while_executing`: allowed, only one runs | Allowed up to N, rest blocked |
| Crash recovery | Reaper checks heartbeats | Semaphore `expires_at` + dispatcher cleanup |
| Use when | "This exact job must not run twice" | "At most N of these can run at once" |
When to use which:
- Payment processing, order import, unique email sends → `ensures_uniqueness`
- Rate-limited API calls, resource-constrained tasks → `limits_concurrency`
- Both at once → combine them (they use separate tables, no conflicts)
Setup
rails generate pgbus:add_job_locks # Add the migration
rails generate pgbus:add_job_locks --database=pgbus # For separate database
Concurrency controls
Limit how many jobs with the same key can run concurrently:
class ProcessOrderJob < ApplicationJob
limits_concurrency to: 1,
key: ->(order_id) { "ProcessOrder-#{order_id}" },
duration: 15.minutes,
on_conflict: :block
def perform(order_id)
# Only one job per order_id runs at a time
end
end
Options
| Option | Default | Description |
|---|---|---|
| `to:` | (required) | Maximum concurrent jobs for the same key |
| `key:` | Job class name | Proc receiving job arguments, returns a string key |
| `duration:` | `15.minutes` | Safety expiry for the semaphore (crashed worker recovery) |
| `on_conflict:` | `:block` | What to do when the limit is reached |
Conflict strategies
| Strategy | Behavior |
|---|---|
| `:block` | Hold the job in a blocked queue. It is automatically released when a slot opens or the semaphore expires. |
| `:discard` | Silently drop the job. |
| `:raise` | Raise `Pgbus::ConcurrencyLimitExceeded` so the caller can handle it. |
How concurrency works
- Enqueue: The adapter checks a semaphore table for the concurrency key. If under the limit, it increments the counter and sends the job to PGMQ. If at the limit, it applies the `on_conflict` strategy.
- Complete: After a job succeeds or is dead-lettered, the executor signals the concurrency system via an `ensure` block (guaranteeing the signal fires even if the archive step fails). It first tries to promote a blocked job (atomic delete + enqueue in a single transaction). If nothing to promote, it releases the semaphore slot.
- Safety net: The dispatcher periodically cleans up expired semaphores and orphaned blocked executions to recover from crashed workers.
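The admission-control half of the enqueue step is a bounded counter keyed by the concurrency key. An in-memory sketch of just that logic (the real semaphore lives in a PostgreSQL table; class and method names are illustrative):

```ruby
require "monitor"

# In-memory sketch of a counting semaphore for admission control.
# Mirrors the described logic only; not Pgbus's actual implementation.
class SemaphoreSketch
  def initialize
    @counts = Hash.new(0)
    @lock = Monitor.new
  end

  # Returns true if a slot was acquired for +key+, false if at the limit
  # (the caller would then apply its on_conflict strategy).
  def try_acquire(key, limit)
    @lock.synchronize do
      return false if @counts[key] >= limit
      @counts[key] += 1
      true
    end
  end

  # Called on completion/DLQ to free a slot.
  def release(key)
    @lock.synchronize { @counts[key] -= 1 if @counts[key] > 0 }
  end
end
```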
Concurrency compared to other backends
Pgbus, SolidQueue, GoodJob, and Sidekiq all offer concurrency controls, but with fundamentally different locking strategies and trade-offs.
Architecture comparison
| | Pgbus | SolidQueue | GoodJob | Sidekiq Enterprise |
|---|---|---|---|---|
| Lock backend | PostgreSQL rows (`pgbus_semaphores` table) | PostgreSQL rows (`solid_queue_semaphores`) | PostgreSQL advisory locks (`pg_advisory_xact_lock`) | Redis sorted sets (lease-based) |
| Lock granularity | Counting semaphore (allows N concurrent) | Counting semaphore (allows N concurrent) | Count query under advisory lock | Sorted set entries with TTL |
| Acquire mechanism | Atomic `INSERT ... ON CONFLICT DO UPDATE WHERE value < max` (single SQL) | `UPDATE ... SET value = value + 1 WHERE value < limit` | `pg_advisory_xact_lock` then `SELECT COUNT(*)` in rolled-back txn | Redis Lua script (atomic check-and-add) |
| At-limit behavior | `:block` (hold in queue), `:discard`, or `:raise` | Blocks in `solid_queue_blocked_executions` | Enqueue: silently dropped. Perform: retry with backoff (forever) | Reschedule with backoff (raises `OverLimit`, middleware re-enqueues) |
| Blocked job storage | `pgbus_blocked_executions` table with priority ordering | `solid_queue_blocked_executions` table | No blocked queue — retries via ActiveJob retry mechanism | No blocked queue — job returns to Redis queue with delay |
| Release on completion | `ensure` block: promote next blocked job or decrement semaphore | Inline after `finished`/`failed_with` (inside same transaction as of PR #689) | Release advisory lock via `pg_advisory_unlock` | Lease auto-expires from sorted set |
| Crash recovery | Semaphore `expires_at` + dispatcher `expire_stale` cleanup | Semaphore `expires_at` + concurrency maintenance task | Advisory locks auto-release on session disconnect | TTL-based lease expiry (default 5 min) |
| Message lifecycle | PGMQ visibility timeout (`FOR UPDATE SKIP LOCKED`) — message stays in queue until archived | AR-backed `claimed_executions` table | AR-backed `good_jobs` table with advisory lock per row | Redis list + sorted set |
Key design differences
Pgbus uses PGMQ's native FOR UPDATE SKIP LOCKED for message claiming and a separate semaphore table for concurrency control. This two-layer approach means the message queue and concurrency system are independent — PGMQ handles exactly-once delivery, the semaphore handles admission control. The semaphore acquire is a single atomic SQL (INSERT ... ON CONFLICT DO UPDATE WHERE value < max), avoiding the need for explicit row locks.
SolidQueue uses AR models for everything — jobs, claimed executions, and semaphores all live in PostgreSQL tables. This means the entire lifecycle can be wrapped in AR transactions. However, as documented in rails/solid_queue#689, this model is vulnerable to race conditions when semaphore expiry, job completion, and blocked-job release interleave across transactions. Pgbus avoids several of these by design: PGMQ's visibility timeout handles message recovery without a claimed_executions table, and there is no "release during shutdown" codepath.
GoodJob takes a different approach entirely: advisory locks. Each job dequeue acquires a session-level advisory lock on the job row, and concurrency checks use transaction-scoped advisory locks on the concurrency key. This means the check and the perform are serialized at the database level. The downside is that advisory locks are session-scoped — if a connection is returned to the pool without unlocking, the lock persists. GoodJob handles this by auto-releasing on session disconnect, but connection pool sharing between web and worker can cause surprising behavior.
Sidekiq Enterprise uses Redis sorted sets with TTL-based leases. Each concurrent slot is a sorted set entry with an expiry timestamp. This is fast and simple but has no durability guarantee — Redis failover can lose leases, temporarily allowing over-limit execution. The sidekiq-unique-jobs gem (open-source) uses a similar Lua-script approach but with more lock strategies (:until_executing, :while_executing, :until_and_while_executing) and configurable conflict handlers (:reject, :reschedule, :replace, :raise).
Race condition resilience
| Scenario | Pgbus | SolidQueue | GoodJob | Sidekiq |
|---|---|---|---|---|
| Worker crash mid-execution | PGMQ visibility timeout expires → message re-read. Semaphore expires via `expire_stale`. | `claimed_execution` survives → supervisor's process pruning calls `fail_all_with`. | Advisory lock released on session disconnect. | Lease TTL expires in Redis. |
| Blocked job released while original still executing | Not possible — promote only happens in `signal_concurrency`, which only runs after job success/DLQ. | Fixed in PR #689 — now checks for claimed executions before releasing. | N/A — no blocked queue; retries independently. | N/A — no blocked queue. |
| Archive succeeds but signal fails | `ensure` block guarantees signal fires even if archive raises. For SIGKILL: semaphore expires via dispatcher. | Fixed in PR #689 — `unblock_next_job` moved inside same transaction as `finished`. | Advisory lock released by session disconnect. | Lease auto-expires. |
| Concurrent enqueue and signal race | Semaphore acquire is a single atomic SQL — no read-then-write gap. | Fixed in PR #689 — `FOR UPDATE` lock on semaphore row serializes enqueue with signal. | `pg_advisory_xact_lock` serializes the concurrency check. | Redis Lua script is atomic. |
Circuit breaker and queue pause/resume
Pgbus automatically pauses queues that fail repeatedly, preventing cascading failures.
Pgbus.configure do |config|
config.circuit_breaker_enabled = true # default
end
The trip threshold (5 consecutive failures), base backoff (30s), and
max backoff (600s) are tuned via constants on Pgbus::CircuitBreaker.
Override the constants in an initializer if you need different values —
they are not exposed as configuration because tweaking them at runtime
has never proved useful in practice.
When a queue hits the failure threshold:
- The circuit breaker auto-pauses the queue with exponential backoff
- After the backoff expires, the queue auto-resumes and the trip counter resets
- If failures continue, each trip doubles the backoff (capped at `MAX_BACKOFF`)
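With the documented defaults, the pause schedule doubles per trip and saturates at the cap. A sketch (constant values taken from the prose above; the function name is illustrative):

```ruby
# Illustrative backoff schedule: doubles per consecutive trip, capped.
# Values mirror the documented defaults on Pgbus::CircuitBreaker.
BASE_BACKOFF = 30   # seconds
MAX_BACKOFF  = 600  # seconds

def circuit_backoff(trip_count)
  [BASE_BACKOFF * 2**(trip_count - 1), MAX_BACKOFF].min
end

circuit_backoff(3) # => 120
```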
You can also manually pause/resume queues from the dashboard. The pause state is stored in the pgbus_queue_states table and survives restarts.
rails generate pgbus:add_queue_states # Add the queue_states migration
rails generate pgbus:add_queue_states --database=pgbus # For separate database
Prefetch flow control
Cap the number of in-flight (claimed but unfinished) messages per worker:
Pgbus.configure do |config|
config.prefetch_limit = 20 # nil = unlimited (default)
end
The worker tracks in-flight messages with an atomic counter and only fetches min(idle_threads, prefetch_available) messages per cycle. The counter is decremented in an ensure block so it never gets stuck.
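That per-cycle fetch calculation can be sketched as follows (function and parameter names are illustrative, not Pgbus's actual code):

```ruby
# Illustrative per-cycle fetch sizing: never claim more messages than
# there are idle threads, and never exceed the prefetch budget.
def fetch_count(idle_threads, prefetch_limit, in_flight)
  return idle_threads if prefetch_limit.nil? # nil = unlimited (default)
  prefetch_available = [prefetch_limit - in_flight, 0].max
  [idle_threads, prefetch_available].min
end

fetch_count(5, 20, 18) # => 2
```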
Worker recycling
Pgbus workers recycle themselves to prevent memory bloat. This is the main reliability difference vs. SolidQueue, which leaves worker processes alive indefinitely.
Pgbus.configure do |config|
config.max_jobs_per_worker = 10_000 # Restart after 10k jobs
config.max_memory_mb = 512 # Restart if memory exceeds 512MB
config.max_worker_lifetime = 1.hour # Restart after 1 hour
end
When a limit is hit, the worker drains its thread pool, exits, and the supervisor forks a fresh process. RSS memory is sampled from /proc/self/statm (Linux) or ps -o rss (macOS).
Retry backoff
When a job fails, Pgbus extends the PGMQ visibility timeout with exponential backoff so retries are spread out instead of bunched at fixed intervals:
Pgbus.configure do |config|
config.retry_backoff = 5 # base delay (seconds)
config.retry_backoff_max = 300 # cap at 5 minutes
config.retry_backoff_jitter = 0.15 # +-15% randomization
end
The delay formula is base * 2^(attempt-1) * (1 + random_jitter). For a job that fails 4 times with defaults: ~5s, ~10s, ~20s, ~40s before hitting DLQ on the 5th read.
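A sketch of that formula using the defaults above (pass `jitter: 0` to see the deterministic schedule; names are illustrative, not Pgbus's actual code):

```ruby
# Illustrative retry delay: base * 2^(attempt-1) * (1 + jitter), capped,
# with jitter drawn uniformly from [-j, +j].
def retry_delay(attempt, base: 5, max: 300, jitter: 0.15)
  delay = base * 2**(attempt - 1)
  delay *= 1 + (rand * 2 - 1) * jitter
  [delay, max].min
end
```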
Jobs can override the global settings per-class:
class FragileApiJob < ApplicationJob
include Pgbus::RetryBackoff::JobMixin
pgbus_retry_backoff base: 10, max: 600, jitter: 0.2
def perform(...)
# ...
end
end
Async execution mode (fibers)
Workers can optionally execute jobs as fibers instead of threads. This is ideal for I/O-bound workloads (HTTP calls, email delivery, LLM API calls) where jobs spend most of their time waiting on network I/O.
Pgbus.configure do |config|
# Global: all workers use async mode
config.execution_mode = :async
# Or per-worker: mix thread and async workers
config.workers = [
{ queues: %w[webhooks emails], threads: 100, execution_mode: :async },
{ queues: %w[default], threads: 5 } # stays thread-based
]
end
Prerequisites:
- Add `gem "async"` to your Gemfile
- Set `config.active_support.isolation_level = :fiber` in your Rails app
Why it reduces connections: In thread mode, each thread holds a database connection while waiting on I/O. With 50 threads, that's 50 connections. In async mode, 50 fibers share 3-5 connections because fibers yield during I/O and only one runs at a time.
CLI flag: pgbus start --execution-mode async
Safety: Messages stay in PGMQ with visibility timeout protection regardless of execution mode. If a fiber or worker crashes, the visibility timeout expires and messages become available for re-read. No data loss risk.
Not recommended for: CPU-bound jobs (image processing, heavy computation). These block the single reactor thread and should use thread mode.
Routing and ordering
How messages flow between producers and the workers that handle them: priority sub-queues, consumer priority for active/standby workers, and single-active-consumer for strict ordering.
Priority queues
Route jobs to priority sub-queues so high-priority work is processed first:
Pgbus.configure do |config|
config.priority_levels = 3 # Creates _p0, _p1, _p2 sub-queues per logical queue
config.default_priority = 1 # Jobs without explicit priority go to _p1
end
Workers read from _p0 (highest) first, then _p1, then _p2. Only when higher-priority sub-queues are empty does the worker read from lower ones.
Use ActiveJob's built-in priority attribute:
class CriticalAlertJob < ApplicationJob
queue_as :default
queue_with_priority 0 # Highest priority
def perform(alert_id)
# ...
end
end
class ReportJob < ApplicationJob
queue_as :default
queue_with_priority 2 # Lowest priority
def perform(report_id)
# ...
end
end
When priority_levels is nil (default), priority queues are disabled and all jobs go to a single queue per logical name.
Consumer priority
When multiple workers subscribe to the same queues, higher-priority workers process messages first. Lower-priority workers back off (3x polling interval) when a higher-priority worker is active.
Pgbus.configure do |c|
c.capsule :primary, queues: %w[default], threads: 10, consumer_priority: 10
c.capsule :fallback, queues: %w[default], threads: 5, consumer_priority: 0
end
Priority is stored in heartbeat metadata. Workers check the pgbus_processes table to discover higher-priority peers. When a high-priority worker goes stale (no heartbeat for 5 minutes), lower-priority workers automatically resume normal polling.
Single active consumer
For queues that require strict ordering, enable single active consumer mode. Only one worker process can read from a queue at a time — others skip it and process other queues.
Pgbus.configure do |c|
c.capsule :ordered_primary, queues: %w[ordered_events], threads: 1, single_active_consumer: true
c.capsule :ordered_standby, queues: %w[ordered_events], threads: 1, single_active_consumer: true
end
Uses PostgreSQL session-level advisory locks (pg_try_advisory_lock). The lock is non-blocking — workers that can't acquire it simply skip the queue. Locks auto-release on connection close (including crashes), so failover is automatic. The standby capsule takes over within one polling tick if the primary dies.
Persistence and batching
How Pgbus integrates with your application's transactions and tracks groups of related work: outbox for atomic publish, batches for fan-out coordination, archive compaction for keeping the queue tables small.
Batches
Coordinate groups of jobs with callbacks when all complete:
batch = Pgbus::Batch.new(
on_finish: BatchFinishedJob,
on_success: BatchSucceededJob,
on_discard: BatchFailedJob,
description: "Import users",
properties: { initiated_by: current_user.id }
)
batch.enqueue do
users.each { |user| ImportUserJob.perform_later(user.id) }
end
Callbacks
| Callback | Fired when |
|---|---|
| `on_finish` | All jobs completed (success or discard) |
| `on_success` | All jobs completed successfully (zero discarded) |
| `on_discard` | At least one job was dead-lettered |
Callback jobs receive the batch properties hash as their argument:
class BatchFinishedJob < ApplicationJob
def perform(properties)
user = User.find(properties["initiated_by"])
ImportMailer.complete(user).deliver_later
end
end
How batches work
- `Batch.new(...)` creates a tracking row in `pgbus_batches` with `status: "pending"`
- `batch.enqueue { ... }` tags each enqueued job with the `pgbus_batch_id` in its payload
- After each job completes or is dead-lettered, the executor atomically updates the batch counters
- When `completed_jobs + discarded_jobs == total_jobs`, the batch status flips to `"finished"` and callback jobs are enqueued
- The dispatcher cleans up finished batches older than 7 days
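The completion check and callback selection described above reduce to a counter comparison. A sketch (field names mirror the prose; this is not Pgbus's actual code):

```ruby
# Illustrative batch completion logic: a batch is finished when every job
# has either completed or been dead-lettered.
def batch_finished?(b)
  b[:completed_jobs] + b[:discarded_jobs] == b[:total_jobs]
end

# on_finish always fires at completion; on_success only with zero discards,
# on_discard when at least one job was dead-lettered.
def callbacks_to_fire(b)
  return [] unless batch_finished?(b)
  b[:discarded_jobs].zero? ? [:on_finish, :on_success] : [:on_finish, :on_discard]
end
```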
Transactional outbox
Publish events atomically inside your database transactions. A background poller moves outbox entries to PGMQ.
rails generate pgbus:add_outbox # Add the outbox migration
rails generate pgbus:add_outbox --database=pgbus # For separate database
Pgbus.configure do |config|
config.outbox_enabled = true
config.outbox_poll_interval = 1.0 # seconds
config.outbox_batch_size = 100
config.outbox_retention = 1.day # ActiveSupport::Duration also accepted
end
Usage:
ActiveRecord::Base.transaction do
order = Order.create!(params)
# Published atomically with the order — if the transaction rolls back,
# the outbox entry is also rolled back. No lost or phantom events.
Pgbus::Outbox.publish("default", { order_id: order.id })
# For topic-based event bus:
Pgbus::Outbox.publish_event("orders.created", { order_id: order.id })
end
The outbox poller uses FOR UPDATE SKIP LOCKED inside a transaction to claim entries, publishes them to PGMQ, and marks them as published. Failed entries are skipped and retried next cycle.
Archive compaction
PGMQ archive tables grow unbounded. Pgbus automatically purges old entries:
Pgbus.configure do |config|
config.archive_retention = 7.days # ActiveSupport::Duration (default 7 days)
end
The compaction loop runs every hour and deletes up to 1000 rows per
queue per cycle. Both knobs live as constants on
Pgbus::Process::Dispatcher (ARCHIVE_COMPACTION_INTERVAL,
ARCHIVE_COMPACTION_BATCH_SIZE) — they have never been worth surfacing
as configuration. The dispatcher runs archive compaction as part of its
maintenance loop, deleting archived messages older than archive_retention
in batches to avoid long-running transactions.
Observability
Error reporting, structured logging, and queue health monitoring.
Error reporting
By default, Pgbus logs caught exceptions and continues. To route them to your APM service (Appsignal, Sentry, Honeybadger, etc.), push callable reporters onto config.error_reporters:
Pgbus.configure do |c|
c.error_reporters << ->(ex, ctx) {
Appsignal.set_error(ex) { |t| t.(ctx) }
}
end
Each reporter receives (exception, context_hash). The context hash includes keys like action, queue, job_class, and msg_id depending on the call site. Reporters that accept a third argument also receive the Pgbus configuration object.
Reporters are wired into all critical rescue paths: job execution failures, worker fetch/process errors, dispatcher maintenance, supervisor fork failures, circuit breaker trips, outbox publish errors, and failed event recording. Non-critical paths (dashboard queries, stat recording) remain log-only.
ErrorReporter.report is guaranteed to never raise — if a reporter or the logger itself throws, the error is swallowed silently. This preserves fault-tolerance invariants at every rescue site.
Structured logging
Pgbus ships two log formatters inspired by Sidekiq's Logger::Formatters:
Pgbus.configure do |c|
c.log_format = :json # or :text (default)
end
Text format (default):
INFO 2025-01-15T10:30:00.000Z pid=1234 tid=abc queue=default: Starting job
JSON format:
{"ts":"2025-01-15T10:30:00.000Z","pid":1234,"tid":"abc","lvl":"INFO","component":"Pgbus","msg":"Starting job","ctx":{"queue":"default"}}
The JSON formatter extracts [Pgbus] and [Pgbus::Web] prefixes from log messages into a separate component field so the msg field stays clean for log aggregators. Thread-local context can be added via Pgbus::LogFormatter.with_context(queue: "default") { ... } and appears under the ctx key.
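The prefix extraction can be approximated with a regex like this (illustrative, not the formatter's actual code; the default component name follows the JSON sample above):

```ruby
# Illustrative component extraction: pull a leading "[Pgbus...]" prefix out
# of the message into its own field, defaulting to "Pgbus".
def extract_component(msg)
  if (m = msg.match(/\A\[(Pgbus(?:::\w+)*)\]\s*/))
    [m[1], msg[m[0].length..]]
  else
    ["Pgbus", msg]
  end
end

extract_component("[Pgbus::Web] GET /queues") # => ["Pgbus::Web", "GET /queues"]
```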
You can also set a formatter directly on the logger:
Pgbus.configure do |c|
c.logger.formatter = Pgbus::LogFormatter::JSON.new
end
Queue health monitoring
The dashboard includes a Queue Health panel showing PostgreSQL vacuum stats per PGMQ table: dead tuple counts, live tuple counts, bloat ratio (dead / total), last vacuum age, and MVCC horizon age. The same stats appear on individual queue detail pages.
Autovacuum tuning
PGMQ queue tables have high insert/delete churn that overwhelms PostgreSQL's default autovacuum settings. Pgbus applies aggressive per-table tuning automatically:
- New queues at runtime: `Client#ensure_single_queue` applies tuning after `pgmq.create()`
- Existing installations: `rails generate pgbus:update` detects untuned tables
- Fresh installs: The install migration includes tuning for the default queue
To apply tuning manually or after db:schema:load (which loses ALTER TABLE settings):
rails generate pgbus:tune_autovacuum # Generate migration
rails generate pgbus:tune_autovacuum --database=pgbus # For separate database
The pgbus:tune_autovacuum rake task also hooks into db:schema:load automatically.
Prometheus metrics
When config.metrics_enabled = true (default), the dashboard exposes Prometheus-compatible gauges:
| Metric | Description |
|---|---|
| `pgbus_table_dead_tuples` | Dead tuple count per PGMQ table |
| `pgbus_table_live_tuples` | Live tuple count per PGMQ table |
| `pgbus_table_bloat_ratio` | Dead / (dead + live) per table |
| `pgbus_table_last_vacuum_age_seconds` | Seconds since last vacuum per table |
| `pgbus_oldest_transaction_age_seconds` | MVCC horizon pin risk |
| `pgbus_worker_pool_capacity` | Total worker thread slots |
| `pgbus_worker_pool_busy` | Currently busy worker threads |
| `pgbus_worker_pool_utilization` | Busy / capacity ratio |
Real-time broadcasts (turbo-streams replacement)
Pgbus ships a drop-in replacement for turbo-rails' turbo_stream_from helper that fixes several well-known ActionCable correctness bugs by using PGMQ message IDs as a replay cursor. Same API as turbo-rails. No Redis. No ActionCable. No lost messages on reconnect.
Bugs fixed:
- rails/rails#52420 -- "page born stale": a broadcast that fires between controller render and WebSocket subscribe is silently lost with ActionCable. Pgbus captures a PGMQ `msg_id` watermark at render time and replays any messages published in the gap via the SSE `Last-Event-ID` mechanism.
- hotwired/turbo#1261 -- missed messages on reconnect. Pgbus persists the cursor on the client (EventSource's built-in `Last-Event-ID`) and replays from the PGMQ archive on every reconnect.
- hotwired/turbo-rails#674 -- no way to detect disconnect. Pgbus dispatches `pgbus:open`, `pgbus:gap-detected`, and `pgbus:close` DOM events on the stream element.
Usage
Swap turbo_stream_from for pgbus_stream_from in your view:
<%# Before %>
<%= turbo_stream_from @order %>
<%# After %>
<%= pgbus_stream_from @order %>
Everything else stays the same. The model concern keeps working unchanged:
```ruby
class Order < ApplicationRecord
  broadcasts_to ->(order) { [order.account, :orders] }
end
```
`broadcasts_to`, `broadcast_replace_to`, `broadcasts_refreshes`, `broadcast_append_later_to`, and every other `Turbo::Broadcastable` helper funnel through a single `Turbo::StreamsChannel.broadcast_stream_to` method that pgbus monkey-patches at engine boot. The signed-stream-name verification reuses `Turbo.signed_stream_verifier_key`, so existing signed tokens Just Work.
Add the Puma plugin to `config/puma.rb` so SSE connections drain cleanly on deploy:

```ruby
# config/puma.rb
plugin :pgbus_streams
```
Without the plugin, Puma closes hijacked SSE sockets abruptly during graceful restart, which looks to browsers like a network error and triggers an immediate reconnect. With the plugin, the streamer writes a `pgbus:shutdown` sentinel before the socket closes; browsers reconnect to the new worker and replay missed messages via `Last-Event-ID`.
Requirements
- Puma 6.1+ or Falcon. Streams use `rack.hijack` by default. Puma 6.1+ supports it via partial hijack (thread-releasing, see puma/puma#1009). Falcon supports both the hijack path (via protocol-rack's emulation) and a native streaming body path (`streams_falcon_streaming_body = true`) that integrates with Falcon's fiber scheduler for better backpressure and connection lifecycle management. Unicorn, Pitchfork, and Passenger return HTTP 501 from the streams endpoint.
- PostgreSQL LISTEN/NOTIFY. `config.listen_notify = true` (the default). Stream queues override PGMQ's 250ms NOTIFY throttle to 0 so every broadcast fires individually.
- HTTP/2 or HTTP/3 in production. SSE has a 6-connection-per-origin limit on HTTP/1.1; HTTP/2 lifts it. Falcon supports HTTP/2 natively without a reverse proxy.
Configuration
```ruby
Pgbus.configure do |c|
  c.streams_enabled = true                  # default
  c.streams_queue_prefix = "pgbus_stream"
  c.streams_default_retention = 5 * 60      # 5 minutes
  c.streams_retention = {                   # per-stream overrides
    /^chat_/ => 7 * 24 * 3600,              # 7 days for chat history
    "presence_room" => 30                   # 30 seconds for presence
  }
  c.streams_heartbeat_interval = 15         # seconds
  c.streams_max_connections = 2_000         # per web-server process (Puma worker or Falcon process)
  c.streams_idle_timeout = 3_600            # close idle connections after 1h
  c.streams_listen_health_check_ms = 250    # PG LISTEN keepalive + ensure_listening ack budget
  c.streams_write_deadline_ms = 5_000       # write_nonblock deadline
  c.streams_falcon_streaming_body = false   # opt-in: Falcon-native streaming body
end
```
Falcon-native streaming body (opt-in)
When running on Falcon, enable native streaming body support for better integration with Falcon's fiber scheduler:
```ruby
c.streams_falcon_streaming_body = true
```

With this flag, `StreamApp` returns `[200, headers, Writable]` instead of hijacking the socket. Falcon drives the response lifecycle with proper backpressure, connection cleanup, and fiber-scheduled IO. SSE writes go through `Protocol::HTTP::Body::Writable`, which is fiber-safe and yields to other fibers when blocked.

Without the flag (default), Falcon uses the same `rack.hijack` path as Puma via protocol-rack's emulation. Both paths are tested and work correctly — the streaming body path is an optimization for Falcon deployments that want tighter scheduler integration.
How it works
Stream broadcasts are stored in PGMQ queues prefixed `pgbus_stream_*`. Each broadcast is assigned a monotonic `msg_id` by PGMQ. The `pgbus_stream_from` helper captures the current `MAX(msg_id)` at render time and embeds it in the HTML as `since-id`. When the SSE client connects, it sends that cursor as `?since=` on the first request and as `Last-Event-ID` on reconnects. The streamer replays from `pgmq.q_*` (live) `UNION` `pgmq.a_*` (archive) for any `msg_id > cursor`, then switches to LISTEN/NOTIFY for the live path. There is no message identity gap between the render and the subscribe — the cursor model guarantees every broadcast is delivered exactly once, in order, even across reconnects.

One Puma worker (or Falcon reactor) hosts one `Pgbus::Web::Streamer::Instance` singleton with three threads (Listener / Dispatcher / Heartbeat) and one dedicated PG connection for LISTEN. Hijacked SSE sockets are held outside the web server's thread pool on Puma (confirmed by an integration test that fires 20 concurrent hijacked connections and observes them complete in parallel on an 8-thread Puma server, puma/puma#1009) and inside a fiber on Falcon (one fiber per hijacked connection, scheduler-backed non-blocking IO).
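The cursor logic can be modeled in a few lines of plain Ruby, independent of PGMQ; here the `msg_id`s are plain integers and the queue is an array:

```ruby
# Model of the replay-then-live handoff: given an ordered log of broadcasts
# and a client cursor, deliver everything after the cursor exactly once,
# then continue from the highest id seen.
Broadcast = Struct.new(:msg_id, :payload)

def replay_after(log, cursor)
  missed = log.select { |b| b.msg_id > cursor }
  new_cursor = missed.empty? ? cursor : missed.last.msg_id
  [missed, new_cursor]
end

log = [Broadcast.new(1, "a"), Broadcast.new(2, "b"), Broadcast.new(3, "c")]
missed, cursor = replay_after(log, 1) # client rendered at watermark 1
missed.map(&:payload) # => ["b", "c"]  (only the gap is replayed)
cursor                # => 3           (next reconnect starts here)
```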
Don't use `ActionController::Live` for pgbus streams. It's the conventional Rails answer for SSE and it's the wrong one. `Live` blocks inside `@app.call(env)` for the lifetime of the connection, which ties up a Puma thread per subscriber — the exact problem the `rack.hijack`-based architecture above exists to avoid. Pgbus's streams endpoint is a mounted Rack app (not a Rails controller) so the temptation isn't even available, and a `rake pgbus:streams:lint_no_live` task fails CI if any pgbus controller includes it. If you're tempted to wire SSE through a Rails controller, use `pgbus_stream_from` in your view instead — the helper handles the cursor, replay, reconnect, and Puma-thread-release concerns for you.
Per-stream retention is handled by the main pgbus dispatcher process on the same interval as the dispatcher's `ARCHIVE_COMPACTION_INTERVAL` constant. Streams default to a 5-minute retention because SSE clients reconnect within seconds; chat-style applications override the retention to days via `streams_retention`.
Stream name helpers
Apps using UUID primary keys with turbo-rails-style DOM IDs can hit PGMQ's 47-character queue-name ceiling (`"gid://app/Ai::Chat/9c14e8b2-...:messages"` exceeds the limit before the `pgbus_` prefix is even added). Pgbus provides helpers to generate short, collision-safe stream names:

```ruby
# In your ApplicationRecord
class ApplicationRecord < ActiveRecord::Base
  primary_abstract_class
  include Pgbus::Streams::Streamable
end
```
This gives every model `short_id` (a 16-hex-character SHA-256 prefix of the GlobalID) and `to_stream_key`:

```ruby
chat = Ai::Chat.find("9c14e8b2-...")
chat.short_id      # => "ai_chat_a3f8c1e9d2b47610"
chat.to_stream_key # => "ai_chat_a3f8c1e9d2b47610"

# Compose multi-part stream names
Pgbus.stream_key(chat, :messages) # => "ai_chat_a3f8c1e9d2b47610_messages"
```

```erb
<%# Use in views %>
<%= pgbus_stream_from Pgbus.stream_key(@chat, :messages) %>
```
The budget is computed from `config.queue_prefix` at call time, so prefix overrides adjust automatically. If a stream name exceeds the budget, `Pgbus::Streams::StreamNameTooLong` is raised immediately with the offending name, the computed budget, and a pointer to `Pgbus.stream_key` — before PGMQ is ever touched.
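The hashing behind `short_id` can be approximated in plain Ruby: a 16-hex-character SHA-256 prefix of the record's GlobalID keeps any name under the ceiling. The name layout and budget arithmetic below are assumptions based on the examples above, not pgbus's actual implementation:

```ruby
require "digest"

# Sketch: derive a short, collision-resistant stream key from a GlobalID
# string. 16 hex chars = 64 bits of the SHA-256 digest.
def short_stream_key(model_prefix, global_id, *suffixes)
  digest = Digest::SHA256.hexdigest(global_id)[0, 16]
  [model_prefix, digest, *suffixes].join("_")
end

key = short_stream_key("ai_chat", "gid://app/Ai::Chat/9c14e8b2-0000", :messages)
# The key stays inside the 47-char ceiling even after a queue prefix is added:
key.length <= 47 - "pgbus_stream_".length # => true
```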
Transactional broadcasts
This is the feature no other Rails real-time stack can offer. A broadcast issued inside an open ActiveRecord transaction is deferred until the transaction commits. If it rolls back, the broadcast silently drops — clients never see the change that the database never persisted.
```ruby
ActiveRecord::Base.transaction do
  @order.update!(status: "shipped")
  @order.broadcast_replace_to :account    # ← deferred until commit
  RelatedService.update_counters!(@order) # ← might raise, rolling back the update
end
# If RelatedService raised, the database state is unchanged AND no SSE client
# ever saw a "shipped" broadcast. The broadcast and the data mutation are
# atomic with respect to each other.
```
ActionCable can't do this because its broadcast path goes through Redis pub/sub, which has no concept of your application's transaction boundary. Pgbus detects the open AR transaction via `ActiveRecord::Base.connection.current_transaction.after_commit`, which is a first-class Rails API — no outbox table, no background worker, no extra storage.

Outside an open transaction, broadcasts are synchronous and return the assigned `msg_id` as before. Inside a transaction, they return `nil` (the id isn't known until commit time).
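The defer-until-commit behavior can be modeled without ActiveRecord: buffer broadcasts while a transaction is open, flush them on commit, drop them on rollback. A toy sketch, with class and method names that are purely illustrative:

```ruby
# Toy model of transactional broadcasts: inside a transaction, broadcasts
# are buffered; commit flushes them, rollback silently drops them.
class TxBroadcaster
  def initialize(sink)
    @sink = sink   # where committed broadcasts go (PGMQ in real life)
    @buffer = nil  # non-nil while a transaction is open
  end

  def broadcast(msg)
    if @buffer
      @buffer << msg # deferred until commit; caller gets no msg_id yet
      nil
    else
      @sink << msg   # synchronous outside a transaction
      @sink.size     # stand-in for the assigned msg_id
    end
  end

  def transaction
    @buffer = []
    yield
    @buffer.each { |m| @sink << m } # commit: flush deferred broadcasts
  rescue StandardError
    # rollback: deferred broadcasts are dropped (error swallowed in this toy)
  ensure
    @buffer = nil
  end
end

sink = []
b = TxBroadcaster.new(sink)
b.transaction { b.broadcast("shipped"); raise "oops" }
sink # => [] (rolled back, clients never saw "shipped")
b.transaction { b.broadcast("shipped") }
sink # => ["shipped"]
```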
Replaying history on connect (replay:)
By default `pgbus_stream_from @room` captures `MAX(msg_id)` at render time and replays only broadcasts published after that — the page-born-stale fix. For chat-history-style applications where the page should show backlog on load, pass `replay:`:

```erb
<%# Show the last 50 messages on load, then stream live %>
<%= pgbus_stream_from @room, replay: 50 %>

<%# Show everything in PGMQ retention on load %>
<%= pgbus_stream_from @room, replay: :all %>

<%# Default behavior (post-render only) — same as omitting the option %>
<%= pgbus_stream_from @room, replay: :watermark %>
```
The replay cap is applied server-side: the helper computes `since_id = max(0, current_msg_id - N)` for integer N and writes that into the HTML attribute. The client just reads the attribute and sends it as `?since=` on connect. Nothing else changes about the transport.

How much history is actually available depends on the stream's retention setting (`streams_retention` or `streams_default_retention`, both in seconds). A chat stream configured with `streams_retention = { /^chat_/ => 7.days }` will replay up to seven days of history with `replay: :all`; a notification stream with the 5-minute default will only go back five minutes.
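The server-side cap reduces to a tiny computation. A sketch of the resolution rule described above (the method name is illustrative):

```ruby
# Resolve the replay: option into the since-id cursor embedded in the HTML.
# current_msg_id is the stream's MAX(msg_id) at render time.
def resolve_since_id(current_msg_id, replay)
  case replay
  when :watermark then current_msg_id                   # default: post-render only
  when :all       then 0                                # everything retention allows
  when Integer    then [current_msg_id - replay, 0].max # last N messages
  else raise ArgumentError, "replay must be :watermark, :all, or an Integer"
  end
end

resolve_since_id(120, :watermark) # => 120
resolve_since_id(120, 50)         # => 70
resolve_since_id(120, :all)       # => 0
resolve_since_id(30, 50)          # => 0 (clamped, never negative)
```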
Server-side audience filtering
Some broadcasts shouldn't reach every subscriber on a stream. Pgbus supports per-connection filtering via a registry of named predicates evaluated against each connection's authorize-hook context:
```ruby
# config/initializers/pgbus_streams.rb
Pgbus::Streams.filters.register(:admin_only) { |user| user&.admin? }

Pgbus::Streams.filters.register(:workspace_member) do |user, stream_name|
  user&.workspace_ids&.include?(stream_name.split(":").last.to_i)
end
```
The authorize hook on `Pgbus::Web::StreamApp` doubles as a context provider — return any non-boolean value (typically a `User` model) and pgbus will pass it to the filter predicate when evaluating broadcasts:

```ruby
Pgbus::Web::StreamApp.new(authorize: ->(env, _stream_name) {
  user = User.find_by(id: env["rack.session"][:user_id])
  return false unless user
  user # ← context attached to the connection
})
```

Then label broadcasts with the filter you want to apply:

```ruby
@order.broadcast_replace_to :account                         # delivered to everyone
Pgbus.stream("ops").broadcast(html, visible_to: :admin_only) # admins only
```
Failure semantics:
- Unknown filter label → fail-CLOSED with a warning log. Audience filtering is a data-isolation feature; failing open on a typo would turn a restricted broadcast into a public one. The warning log is loud enough that typos still get noticed in dev ("why are no subscribers receiving my broadcast?" → check the log).
- Filter predicate raises → fail-CLOSED. A buggy predicate that crashes is treated as "deny" so private data doesn't leak on an exception path.
- No `visible_to` on the broadcast → no filter applied; everyone sees it.
The filter registry is process-local. Each Puma worker (or Falcon reactor) has its own copy populated at boot. Filter predicates run on the subscriber side — the predicate itself can't be serialized through PGMQ, so the broadcast carries only the label name.
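The fail-closed semantics can be sketched as a small registry in plain Ruby; this is an illustration of the rules above, not pgbus's internals:

```ruby
# Sketch of a fail-closed audience filter registry: unknown labels and
# raising predicates both deny delivery, so typos and bugs never leak data.
class FilterRegistry
  def initialize(logger: nil)
    @filters = {}
    @logger = logger
  end

  def register(label, &predicate)
    @filters[label] = predicate
  end

  # Returns true only when delivery is explicitly allowed.
  def allow?(label, context)
    return true if label.nil? # no visible_to: everyone sees it
    predicate = @filters[label]
    unless predicate
      @logger&.warn("pgbus: unknown filter #{label.inspect}, denying")
      return false            # fail closed on a typo
    end
    !!predicate.call(context)
  rescue StandardError
    false                     # fail closed when a predicate raises
  end
end

filters = FilterRegistry.new
filters.register(:admin_only) { |user| user[:admin] }
filters.allow?(:admin_only, { admin: true })  # => true
filters.allow?(:admin_onlyy, { admin: true }) # => false (typo, fail closed)
filters.allow?(nil, {})                       # => true  (no filter applied)
```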
Presence
Pgbus tracks who is currently subscribed to a stream via a `pgbus_presence_members` table. This is the standard "X people are in this room" feature that chat apps and collaboration tools need:

```shell
rails generate pgbus:add_presence
rails db:migrate
```

```ruby
class RoomsController < ApplicationController
  def show
    @room = Room.find(params[:id])
    Pgbus.stream(@room).presence.join(
      member_id: current_user.id.to_s,
      metadata: { name: current_user.name, avatar: current_user.avatar_url }
    ) do |member|
      render_to_string(partial: "presence/joined", locals: { member: member })
    end
  end

  def destroy
    Pgbus.stream(@room).presence.leave(member_id: current_user.id.to_s) do |member|
      "<turbo-stream action=\"remove\" target=\"presence-#{member['id']}\"></turbo-stream>"
    end
  end
end
```
The block passed to `join`/`leave` is rendered into HTML and broadcast through the regular pgbus stream pipeline — so it shows up in every connected client's DOM in real time, alongside the normal `broadcasts_to` output. Reading the current member list:

```ruby
Pgbus.stream(@room).presence.members
# => [{ "id" => "7", "metadata" => {...}, "joined_at" => "...", "last_seen_at" => "..." }]

Pgbus.stream(@room).presence.count
# => 5
```

Heartbeat and expiry. Members that don't ping `presence.touch(member_id: ...)` periodically can be expired by a sweeper:

```ruby
# Run from a cron, ActiveJob, or after each subscriber heartbeat
Pgbus.stream(@room).presence.sweep!(older_than: 60.seconds.ago)
```

The sweep uses `DELETE ... RETURNING` so multiple workers running it concurrently won't double-emit leave events.
Deliberately left to the application:
- Join/leave is explicit, not connection-driven. The controller decides who is "present" — a connected SSE client is not always a present user (think tab-in-background, multi-tab dedup).
- The stale-member sweep is manual. Run it from a cron, an ActiveJob, or your existing heartbeat — pgbus does not assume one over the others.
- The DOM markup for join/leave is whatever your `join`/`leave` block returns. Pgbus does not impose a fixed presence schema on `<pgbus-stream-source>`.
Stream stats (opt-in)
Pgbus can record one row in `pgbus_stream_stats` per broadcast, connect, and disconnect so the `/pgbus/insights` dashboard shows stream throughput alongside job throughput. This is disabled by default because stream event volume can dwarf job volume in chat-style apps — enable it deliberately when you want the observability.
```ruby
# config/initializers/pgbus.rb
Pgbus.configure do |c|
  c.streams_stats_enabled = true
end
```
Then run the migration generator once:
```shell
rails generate pgbus:add_stream_stats                  # Add the migration
rails generate pgbus:add_stream_stats --database=pgbus # For separate database
rails db:migrate
```
The Insights tab gains a "Real-time Streams" section with counts of broadcasts / connects / disconnects, an "active" estimate (connects − disconnects in the selected window), average fanout per broadcast, and a "Top Streams by Broadcast Volume" table. The existing `stats_retention` config covers cleanup, so there is no separate retention knob.
Overhead on a real Puma + PGMQ setup (`bundle exec rake bench:streams`): the most visible cost is an INSERT per connect/disconnect pair, which shows up under thundering-herd connect scenarios (K=50 concurrent connects: ~+20% per-connect latency). Steady-state broadcast and fanout numbers stay in the run-to-run noise band. Enable it if Insights is useful; leave it off if the write traffic worries you.
Testing
Pgbus ships opt-in test helpers for both RSpec and Minitest. The testing module is never autoloaded by Zeitwerk -- you must require it explicitly, so it cannot leak into production.
RSpec setup
Add one line to your `spec/rails_helper.rb` (or `spec/spec_helper.rb`):

```ruby
# spec/rails_helper.rb
require "pgbus/testing/rspec"
```
This does three things:

- Loads `Pgbus::Testing` with the in-memory `EventStore` and mode management
- Registers the `have_published_event` matcher
- Includes `Pgbus::Testing::Assertions` into all example groups
You still need to activate a testing mode and clear the store per test. Add a before/after block:
```ruby
# spec/rails_helper.rb
require "pgbus/testing/rspec"

RSpec.configure do |config|
  config.before { Pgbus::Testing.fake! }

  config.after do
    Pgbus::Testing.disabled!
    Pgbus::Testing.store.clear!
  end
end
```
Or scope it to specific groups:
```ruby
RSpec.configure do |config|
  config.before(:each, :pgbus) { Pgbus::Testing.fake! }

  config.after(:each, :pgbus) do
    Pgbus::Testing.disabled!
    Pgbus::Testing.store.clear!
  end
end

# Usage:
RSpec.describe OrderService, :pgbus do
  it "publishes an event" do
    expect { described_class.create!(attrs) }
      .to have_published_event("orders.created")
  end
end
```
Minitest / TestUnit setup
Add the require and include to your `test/test_helper.rb`:

```ruby
# test/test_helper.rb
require "pgbus/testing/minitest"

class ActiveSupport::TestCase
  include Pgbus::Testing::MinitestHelpers
end
```
`MinitestHelpers` hooks into Minitest's lifecycle automatically:

- `before_setup` -- activates `:fake` mode and clears the event store before each test
- Includes all assertion helpers (`assert_pgbus_published`, `assert_no_pgbus_published`, `pgbus_published_events`, `perform_published_events`)
No additional setup/teardown blocks are needed -- the module handles it.
Event bus assertions
Both RSpec and Minitest share the same assertion helpers via `Pgbus::Testing::Assertions`:

```ruby
# Assert that a block publishes exactly N events
assert_pgbus_published(count: 1, routing_key: "orders.created") do
  OrderService.create!(attrs)
end

# Assert that a block publishes zero events
assert_no_pgbus_published(routing_key: "orders.created") do
  OrderService.preview(attrs)
end

# Inspect captured events directly
events = pgbus_published_events(routing_key: "orders.created")
assert_equal 1, events.size
assert_equal({ "id" => 42 }, events.first.payload)

# Capture events, then dispatch them to registered handlers
perform_published_events do
  OrderService.create!(attrs)
end
# After the block, all captured events have been dispatched to their
# matching handlers synchronously -- useful for testing side effects
```
RSpec matchers
The `have_published_event` matcher supports chainable constraints:

```ruby
# Basic: assert any event was published with the given routing key
expect { publish_order(order) }
  .to have_published_event("orders.created")

# With payload matching (uses RSpec's values_match?, so hash_including works)
expect { publish_order(order) }
  .to have_published_event("orders.created")
  .with_payload(hash_including("id" => order.id))

# With header matching
expect { publish_order(order) }
  .to have_published_event("orders.created")
  .with_headers(hash_including("x-tenant" => "acme"))

# Exact count
expect { publish_order(order) }
  .to have_published_event("orders.created")
  .exactly(1)

# Combine all constraints
expect { publish_order(order) }
  .to have_published_event("orders.created")
  .with_payload(hash_including("id" => order.id))
  .with_headers(hash_including("x-tenant" => "acme"))
  .exactly(1)

# Negated
expect { publish_order(order) }
  .not_to have_published_event("orders.cancelled")
```
Testing modes
Three modes control how `Pgbus::EventBus::Publisher.publish` behaves:

| Mode | Behavior | Use case |
|---|---|---|
| `:fake` | Captures events in-memory, no PGMQ calls, no handler dispatch | Most unit/integration tests |
| `:inline` | Captures events AND immediately dispatches to matching handlers | Testing side effects (handler logic) |
| `:disabled` | Pass-through to real publisher (production behavior) | Default; integration tests with real PGMQ |
Switch modes globally or scoped to a block:
```ruby
# Global (persists until changed)
Pgbus::Testing.fake!
Pgbus::Testing.inline!
Pgbus::Testing.disabled!

# Scoped (restores previous mode after block)
Pgbus::Testing.inline! do
  OrderService.create!(attrs) # handlers fire synchronously
end
# mode is restored to whatever it was before

# Query current mode
Pgbus::Testing.fake?     # => true/false
Pgbus::Testing.inline?   # => true/false
Pgbus::Testing.disabled? # => true/false
```
The `:inline` mode skips delayed publishes (`delay:` > 0) -- those are captured in the store but not dispatched. Use `Pgbus::Testing.store.drain!` to manually dispatch all captured events including delayed ones.
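The scoped form of the mode switchers follows the usual save-and-restore pattern. A standalone sketch of that pattern, not the gem's actual implementation:

```ruby
# Sketch of a scoped mode switch: set a mode for the duration of a block,
# restoring the previous mode even if the block raises.
module TestMode
  @mode = :disabled

  class << self
    attr_reader :mode

    def set!(new_mode)
      if block_given?
        previous = @mode
        @mode = new_mode
        begin
          yield
        ensure
          @mode = previous # restored even when the block raises
        end
      else
        @mode = new_mode   # global: persists until changed
      end
    end
  end
end

TestMode.set!(:inline) { TestMode.mode } # => :inline inside the block
TestMode.mode                            # => :disabled afterwards
```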
SSE streams in tests
When using `use_transactional_fixtures = true` (the default in Rails), pgbus SSE streams are incompatible with transactional test isolation. The `rack.hijack` mechanism spawns background threads that acquire their own database connections outside the test transaction, which causes:

- Connection pool exhaustion after enough system tests
- CI hangs (tests freeze waiting for a connection)
- `Errno::EPIPE` errors when the browser navigates away

Automatic fix with `Pgbus::Testing`: When you activate `:fake` or `:inline` mode (as shown above), pgbus automatically enables `streams_test_mode`. The SSE endpoint returns a stub response (valid SSE headers + a comment + immediate close) without hijacking, without spawning background threads, and without acquiring any database connections. The `<pgbus-stream-source>` custom element still renders and connects, but no PGMQ polling occurs.
If you're using the RSpec or Minitest setup shown above, you don't need to do anything extra -- streams are safe automatically.
Manual configuration (if you don't use `Pgbus::Testing`):

```ruby
# config/initializers/pgbus.rb
Pgbus.configure do |c|
  c.streams_test_mode = true if Rails.env.test?
end
```

Or toggle it per test:

```ruby
# RSpec
before { Pgbus.configuration.streams_test_mode = true }
after  { Pgbus.configuration.streams_test_mode = false }

# Minitest
setup    { Pgbus.configuration.streams_test_mode = true }
teardown { Pgbus.configuration.streams_test_mode = false }
```
What `streams_test_mode` does: The `StreamApp` short-circuits after signature verification and authorization checks but before any hijack, streaming body, or capacity logic. It returns:

```
HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache, no-transform

: pgbus test mode — connection accepted, no polling
```
This is a valid SSE response that the browser's EventSource will accept. No Streamer singleton is created, no PG LISTEN connection is opened, and no dispatcher/heartbeat/listener threads are spawned.
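In test mode the streams endpoint behaves like a plain Rack app that returns this response and closes. A minimal standalone equivalent, with the headers copied from the description above:

```ruby
# A Rack-style app equivalent to pgbus's streams_test_mode stub: valid SSE
# headers plus one comment line, then immediate close. No hijack, no threads.
STUB_SSE = lambda do |_env|
  body = ": pgbus test mode - connection accepted, no polling\n\n"
  [
    200,
    {
      "content-type"  => "text/event-stream",
      "cache-control" => "no-cache, no-transform"
    },
    [body] # enumerable body; the connection closes after this chunk
  ]
end

status, headers, body = STUB_SSE.call({})
status                      # => 200
headers["content-type"]     # => "text/event-stream"
body.first.start_with?(":") # => true (lines starting with ':' are SSE comments)
```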
Testing actual stream delivery: If you need to verify end-to-end SSE message delivery in integration tests, disable `streams_test_mode` and use the `PumaTestHarness` from the pgbus test support:

```ruby
require "pgbus/testing"

Pgbus::Testing.disabled! do
  # streams_test_mode is automatically disabled
  # Use real Puma + real PGMQ for end-to-end stream tests
end
```
See `spec/integration/streams/` in the pgbus source for examples of integration tests that exercise the full SSE pipeline with a real Puma server.
Operations
Day-to-day running of Pgbus: starting and stopping processes, observing what is happening on the dashboard, the database tables Pgbus relies on, and how to migrate from an existing job backend.
CLI
```shell
pgbus start    # Start supervisor with workers + dispatcher + scheduler
pgbus status   # Show running processes
pgbus queues   # List queues with depth/metrics
pgbus version  # Print version
pgbus help     # Show help
```
Role flags (split deployments)
By default, `pgbus start` boots every role in one supervisor (workers, dispatcher, scheduler, event consumers, outbox poller). For containerized deployments where each role lives in a separate process, use the role flags:

```shell
pgbus start --workers-only     # Only worker processes
pgbus start --scheduler-only   # Only the recurring-task scheduler
pgbus start --dispatcher-only  # Only the maintenance dispatcher
```

These flags are mutually exclusive. The auto-tuned `pool_size` adjusts to the role: a `--scheduler-only` deployment with 50 worker threads configured only opens the connections it actually needs (1 for the scheduler), not 51.
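The role-aware pool sizing reduces to a sum over the roles the process actually boots. A sketch under the assumption that a configurable baseline of housekeeping connections is reserved; the exact per-role overhead is an internal detail:

```ruby
# Sketch: compute a connection pool size from the roles a process runs.
# Only roles this process boots contribute their thread counts; `baseline`
# covers the process's own housekeeping connection(s) and is an assumption.
def auto_pool_size(workers:, event_consumers:, roles:, baseline: 1)
  size = baseline
  size += workers.sum { |w| w[:threads] } if roles.include?(:workers)
  size += event_consumers.sum { |c| c[:threads] } if roles.include?(:event_consumers)
  size
end

workers = [{ queues: ["default"], threads: 50 }]
auto_pool_size(workers: workers, event_consumers: [], roles: [:workers])   # => 51
auto_pool_size(workers: workers, event_consumers: [], roles: [:scheduler]) # => 1
```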
Capsule selection
`--capsule NAME` boots a single named capsule. Combine with `--workers-only` to run one capsule per container:

```shell
pgbus start --workers-only --capsule critical
pgbus start --workers-only --capsule default
```

The capsule name is the `:name` you passed to `c.capsule` in your initializer (or the first queue token when using the string DSL).
Dashboard
The dashboard is a mountable Rails engine at /pgbus with:
- Overview — queue depths, enqueued count, active processes, failure count, throughput rate
- Queues — per-queue metrics, purge/pause/resume/delete actions
- Jobs — enqueued and failed jobs, retry/discard actions
- Dead letter — DLQ messages with retry/discard, bulk actions
- Processes — active workers/dispatcher/consumers with heartbeat status
- Events — registered subscribers and processed events
- Outbox — transactional outbox entries pending publication
- Locks — active job uniqueness locks with state (queued/executing), owner PID@hostname, age
- Insights — throughput chart (jobs/min), status distribution donut, slowest job classes table
All tables use Turbo Frames for periodic auto-refresh without page reloads. Destructive actions use styled confirmation dialogs (not browser `confirm()`), and flash messages appear as auto-dismissing toast notifications.
Queue management
The queues page lets you manage PGMQ queues directly:
- Purge — removes all messages from the queue (the queue itself remains)
- Delete — permanently drops the queue from PGMQ (removes the queue table and metadata)
- Pause / Resume — pauses or resumes job processing for a queue
All destructive actions require confirmation. Pause/resume and delete are available on both the queue index and detail pages.
Dark mode
The dashboard supports dark mode via Tailwind CSS `dark:` classes. It respects your system preference on first visit and persists your choice via `localStorage`. Toggle with the sun/moon button in the nav bar.
Job stats and insights
The executor records every job completion to `pgbus_job_stats` (job class, queue, status, duration). The insights page visualizes this data with ApexCharts (loaded via CDN, zero npm dependencies).

```shell
rails generate pgbus:add_job_stats                  # Add the stats migration
rails generate pgbus:add_job_stats --database=pgbus # For separate database
```

Stats collection is enabled by default (`config.stats_enabled = true`). Old stats are cleaned up by the dispatcher based on `config.stats_retention` (default: 30 days). If the migration hasn't been run yet, stat recording is silently skipped.
Database tables
Pgbus uses these tables (created via PGMQ and migrations):
| Table | Purpose |
|---|---|
| `q_pgbus_*` | PGMQ job queues (managed by PGMQ) |
| `a_pgbus_*` | PGMQ archive tables (managed by PGMQ, compacted by dispatcher) |
| `pgbus_processes` | Heartbeat tracking for workers/dispatcher/consumers |
| `pgbus_failed_events` | Failed event dispatch records |
| `pgbus_processed_events` | Idempotency deduplication (event_id, handler_class) |
| `pgbus_semaphores` | Concurrency control counting semaphores |
| `pgbus_blocked_executions` | Jobs waiting for a concurrency semaphore slot |
| `pgbus_batches` | Batch tracking with job counters and callback config |
| `pgbus_job_locks` | Job uniqueness locks (state, owner_pid, reaper correlation) |
| `pgbus_job_stats` | Job execution metrics (class, queue, status, duration) |
| `pgbus_queue_states` | Queue pause/resume and circuit breaker state |
| `pgbus_outbox_entries` | Transactional outbox entries pending publication |
| `pgbus_recurring_tasks` | Recurring job definitions |
| `pgbus_recurring_executions` | Recurring job execution history |
| `pgbus_presence_members` | Stream presence tracking (who is subscribed) |
| `pgbus_stream_stats` | Stream broadcast/connect/disconnect metrics (opt-in) |
Switching from another backend
Already using a different job processor? These guides walk you through the migration:
- Switch from Sidekiq — remove Redis, convert native workers, replace middleware with callbacks
- Switch from SolidQueue — similar architecture, swap config format, gain LISTEN/NOTIFY + worker recycling
- Switch from GoodJob — both PostgreSQL-native, swap advisory locks for PGMQ visibility timeouts
See `docs/README.md` for a full feature comparison table.
Reference
Architectural overview and the full list of configuration settings.
Architecture
```
Supervisor (fork manager)
├── Worker 1 (queues: [default, mailers], threads: 10, priority: 10)
├── Worker 2 (queues: [critical], threads: 5, single_active_consumer: true)
├── Dispatcher (maintenance: cleanup, compaction, reaping, circuit breaker)
├── Scheduler (recurring tasks via cron)
├── Consumer (event bus topics)
└── Outbox Poller (transactional outbox → PGMQ, when enabled)

PostgreSQL + PGMQ
├── pgbus_default (job queue)
├── pgbus_default_dlq (dead letter queue)
├── pgbus_critical (job queue)
├── pgbus_critical_dlq (dead letter queue)
├── pgbus_mailers (job queue)
└── pgbus_queue_states (pause/resume + circuit breaker state)
```
How a job flows through the system
- Enqueue: ActiveJob serializes the job to JSON, Pgbus sends it to the appropriate PGMQ queue
- Read: Workers poll queues (or wake instantly via LISTEN/NOTIFY) and claim messages with a visibility timeout
- Execute: The job is deserialized and executed within the Rails executor
- Archive/Retry: On success, the message is archived. On failure, the visibility timeout expires and the message becomes available again. PGMQ's `read_ct` tracks delivery attempts
- Dead letter: When `read_ct` exceeds `max_retries`, the message is moved to the `_dlq` queue for manual inspection
Configuration reference
| Option | Default | Description |
|---|---|---|
| `database_url` | `nil` | PostgreSQL connection URL (auto-detected in Rails) |
| `queue_prefix` | `"pgbus"` | Prefix for all PGMQ queue names |
| `default_queue` | `"default"` | Default queue for jobs without explicit queue |
| `pool_size` | `nil` (auto) | Connection pool size. Auto-tuned from worker thread counts: `sum(workers.threads) + sum(event_consumers.threads) + 2`. Set explicitly to override. |
| `workers` | `[{queues: ["default"], threads: 5}]` | Worker capsule definitions. String DSL (`"default: 5; critical: 10"`), Array, or `nil`. |
| `event_consumers` | `nil` | Event consumer process definitions (same format as `workers`) |
| `roles` | `nil` (all) | Supervisor role filter — usually set via CLI flags (`--workers-only` etc.) |
| `polling_interval` | `0.1` | Seconds between polls (LISTEN/NOTIFY is primary) |
| `visibility_timeout` | `30` | Time before unacked message becomes visible again. Accepts seconds or `ActiveSupport::Duration` (e.g. `10.minutes`) |
| `max_retries` | `5` | Failed reads before routing to dead letter queue |
| `retry_backoff` | `5` | Base delay in seconds for VT-based retry backoff (exponential: `base * 2^(attempt-1)`) |
| `retry_backoff_max` | `300` | Maximum retry delay in seconds (caps the exponential curve) |
| `retry_backoff_jitter` | `0.15` | Jitter factor (0-1) added to retry delays to spread retries |
| `max_jobs_per_worker` | `nil` | Recycle worker after N jobs (`nil` = unlimited) |
| `max_memory_mb` | `nil` | Recycle worker when memory exceeds N MB |
| `max_worker_lifetime` | `nil` | Recycle worker after N seconds. Accepts seconds or Duration. |
| `listen_notify` | `true` | Use PGMQ's LISTEN/NOTIFY for instant wake-up |
| `prefetch_limit` | `nil` | Max in-flight messages per worker (`nil` = unlimited) |
| `dispatch_interval` | `1.0` | Seconds between dispatcher maintenance ticks |
| `circuit_breaker_enabled` | `true` | Enable auto-pause on consecutive failures (threshold and backoff are tuned via `Pgbus::CircuitBreaker` constants) |
| `priority_levels` | `nil` | Number of priority sub-queues (`nil` = disabled, 2-10) |
| `default_priority` | `1` | Default priority for jobs without explicit priority |
| `archive_retention` | `7.days` | How long to keep archived messages. Accepts seconds, Duration, or `nil` to disable cleanup |
| `outbox_enabled` | `false` | Enable transactional outbox poller process |
| `outbox_poll_interval` | `1.0` | Seconds between outbox poll cycles |
| `outbox_batch_size` | `100` | Max entries per outbox poll cycle |
| `outbox_retention` | `1.day` | How long to keep published outbox entries. Accepts seconds, Duration, or `nil` to disable cleanup |
| `idempotency_ttl` | `7.days` | How long to keep processed event records. Accepts seconds, Duration, or `nil` to disable cleanup |
| `base_controller_class` | `"::ActionController::Base"` | Base class for dashboard controllers (string, constantized at load time) |
| `return_to_app_url` | `nil` | URL for "back to app" button in dashboard nav (`nil` hides the button) |
| `web_auth` | `nil` | Lambda for dashboard authentication |
| `web_refresh_interval` | `5000` | Dashboard auto-refresh interval in milliseconds |
| `web_live_updates` | `true` | Enable Turbo Frames auto-refresh on dashboard |
| `stats_enabled` | `true` | Record job execution stats for insights dashboard |
| `stats_retention` | `30.days` | How long to keep job stats. Accepts seconds, Duration, or `nil` to disable cleanup |
| `streams_test_mode` | `false` | Return a stub SSE response without hijack or background threads. Auto-enabled by `Pgbus::Testing.fake!`/`.inline!`. See SSE streams in tests. |
| `streams_stats_enabled` | `false` | Record stream broadcast/connect/disconnect stats (opt-in, can be high volume) |
| `streams_path` | `nil` | Custom URL path for the SSE endpoint (`nil` = auto-detected from engine mount) |
| `execution_mode` | `:threads` | Global execution mode (`:threads` or `:async`). Per-worker override via capsule config. |
| `error_reporters` | `[]` | Array of callables invoked on caught exceptions. Each receives `(exception, context_hash)`. |
| `log_format` | `:text` | Log formatter (`:text` or `:json`). Sets `logger.formatter` automatically. |
| `metrics_enabled` | `true` | Enable Prometheus-compatible metrics on the dashboard |
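The three retry knobs (`retry_backoff`, `retry_backoff_max`, `retry_backoff_jitter`) combine into a bounded exponential curve. A sketch of how such a delay might be computed; exactly how pgbus applies the jitter term is an assumption:

```ruby
# Sketch: exponential retry backoff with cap and jitter, following the
# documented formula base * 2^(attempt-1), capped at retry_backoff_max.
def retry_delay(attempt, base: 5, max: 300, jitter: 0.15, rng: Random.new)
  delay = [base * 2**(attempt - 1), max].min
  delay + delay * jitter * rng.rand # spread retries to avoid thundering herds
end

# Without jitter the curve is: 5, 10, 20, 40, 80, 160, then capped at 300.
(1..8).map { |n| retry_delay(n, jitter: 0) }
# => [5, 10, 20, 40, 80, 160, 300, 300]
```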
Development
```shell
bundle install
bundle exec rake     # Run tests + rubocop
bundle exec rspec    # Run tests only
bundle exec rubocop  # Run linter only
```
System tests use Playwright via Capybara:
```shell
bun install
bunx --bun playwright install chromium
bundle exec rspec spec/system/
```
End-to-end streams benchmarks (real Puma + real PGMQ + real SSE clients):

```shell
PGBUS_DATABASE_URL=postgres://user@host/db bundle exec rake bench:streams
```

The harness measures single-broadcast roundtrip latency, burst throughput, fanout to many clients, and concurrent connect under thundering herd. See `benchmarks/streams_bench.rb`.
License
The gem is available as open source under the terms of the MIT License.