Pgbus

PostgreSQL-native job processing and event bus for Rails, built on PGMQ.

Why Pgbus? If you already run PostgreSQL, you don't need Redis for background jobs. Pgbus gives you ActiveJob integration, AMQP-style topic routing, dead letter queues, worker memory management, and a live dashboard -- all backed by your existing database.

Features

  • ActiveJob adapter -- drop-in replacement, zero config migration from other backends
  • Turbo Streams replacement -- pgbus_stream_from drops into turbo-rails apps with no ActionCable, no Redis, no lost messages on reconnect. Includes transactional broadcasts (deferred until commit), backlog replay on connect, server-side audience filtering, and presence tracking. Fixes rails/rails#52420, hotwired/turbo#1261, and hotwired/turbo-rails#674.
  • Event bus -- publish/subscribe with AMQP-style topic routing (orders.#, payments.*)
  • Dead letter queues -- automatic DLQ routing after configurable retries
  • Worker recycling -- memory, job count, and lifetime limits prevent runaway processes
  • LISTEN/NOTIFY -- instant wake-up, polling as fallback only
  • Idempotent events -- deduplication via (event_id, handler_class) unique index with in-memory cache
  • Live dashboard -- Turbo Frames auto-refresh with throughput rate, no ActionCable required
  • Supervisor/worker model -- forked processes with heartbeat monitoring and lifecycle state machine
  • Priority queues -- route jobs to priority sub-queues, highest-priority-first processing
  • Circuit breaker -- auto-pause queues after consecutive failures, exponential backoff
  • Queue pause/resume -- manual or automatic via dashboard
  • Prefetch flow control -- cap in-flight messages per worker to prevent overload
  • Archive compaction -- automatic purge of old archived messages
  • Transactional outbox -- publish events atomically inside database transactions
  • Single active consumer -- advisory-lock-based exclusive queue processing for strict ordering
  • Consumer priority -- higher-priority workers get first dibs, lower-priority workers back off
  • Job uniqueness -- prevent duplicate jobs with reaper-based crash recovery, no TTL-driven expiry
  • Retry backoff -- exponential backoff with jitter for VT-based retries, per-job overrides
  • Error reporting -- pluggable error reporters for APM integration (Appsignal, Sentry, etc.)
  • Structured logging -- JSON log formatter with component extraction and thread-local context
  • Queue health -- dead tuple monitoring, autovacuum tuning, Prometheus metrics

Installation

Add to your Gemfile:

gem "pgbus"

Then install the PGMQ extension in your database:

CREATE EXTENSION IF NOT EXISTS pgmq;

Quick start

1. Configure (optional)

Pgbus works with zero config in Rails -- it uses your existing ActiveRecord connection. For custom setups, drop a Ruby initializer:

# config/initializers/pgbus.rb
Pgbus.configure do |c|
  c.queue_prefix      = "myapp"
  c.max_retries       = 5
  c.visibility_timeout = 30.seconds   # ActiveSupport::Duration accepted
  c.idempotency_ttl   = 7.days

  # Worker recycling — prevents long-lived processes from leaking memory
  c.max_jobs_per_worker = 10_000
  c.max_memory_mb       = 512
  c.max_worker_lifetime = 1.hour

  # Capsule string DSL — Sidekiq-style "queues: threads; queues: threads"
  c.workers = "default, mailers: 10; critical: 5"

  # Or use named capsules with advanced options
  c.capsule :ordered, queues: %w[ordered_events], threads: 1, single_active_consumer: true
end

The capsule string DSL is the shortest form for the common case. Use c.capsule when you need named capsules with advanced options like single_active_consumer or consumer_priority. See Routing and ordering for the full set.

Upgrading from an older pgbus? Run rails generate pgbus:update. It does two things in one pass:

  • Converts any legacy config/pgbus.yml to a Ruby initializer at config/initializers/pgbus.rb (skipped if the initializer already exists).
  • Inspects your live database and adds any missing pgbus migrations to db/migrate (or db/pgbus_migrate if you use connects_to). The generator detects your separate-database config automatically from Pgbus.configuration.connects_to or by scanning the initializer / config/application.rb, so you don't have to re-specify --database=pgbus every time.

Useful flags: --dry-run (print the plan without creating files), --skip-config, --skip-migrations, --quiet. Running it on a database with no pgbus tables at all will redirect you to pgbus:install instead of stacking individual add_* migrations.

2. Use as ActiveJob backend

# config/application.rb
config.active_job.queue_adapter = :pgbus

That's it. Your existing jobs work unchanged:

class OrderConfirmationJob < ApplicationJob
  queue_as :mailers

  def perform(order)
    OrderMailer.confirmation(order).deliver_now
  end
end

# Enqueue
OrderConfirmationJob.perform_later(order)

# Schedule
OrderConfirmationJob.set(wait: 5.minutes).perform_later(order)

3. Event bus (optional)

Publish events with AMQP-style topic routing:

# Publish an event
Pgbus::EventBus::Publisher.publish(
  "orders.created",
  { order_id: order.id, total: order.total }
)

# Publish with delay
Pgbus::EventBus::Publisher.publish_later(
  "invoices.due",
  { invoice_id: invoice.id },
  delay: 30.days
)

Subscribe with handlers:

# app/handlers/order_created_handler.rb
class OrderCreatedHandler < Pgbus::EventBus::Handler
  idempotent!  # Deduplicate by (event_id, handler_class)

  def handle(event)
    order_id = event.payload["order_id"]
    Analytics.track_order(order_id)
    InventoryService.reserve(order_id)
  end
end

# Register in an initializer
Pgbus::EventBus::Registry.instance.subscribe(
  "orders.created",
  OrderCreatedHandler
)

# Wildcard patterns
Pgbus::EventBus::Registry.instance.subscribe(
  "orders.#",           # matches orders.created, orders.updated, orders.shipped.confirmed
  OrderAuditHandler
)
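
The wildcard semantics can be sketched as a pattern-to-regex translation. This is illustrative only, not Pgbus's internal matcher: * matches exactly one dot-separated segment, # matches one or more.

```ruby
# Illustrative AMQP-style topic matching (a sketch, not Pgbus internals):
#   "*" matches exactly one dot-separated segment
#   "#" matches one or more segments
def topic_match?(pattern, topic)
  regex = pattern.split(".").map do |segment|
    case segment
    when "#" then "[^.]+(?:\\.[^.]+)*"   # one or more segments
    when "*" then "[^.]+"                # exactly one segment
    else Regexp.escape(segment)
    end
  end.join("\\.")
  /\A#{regex}\z/.match?(topic)
end
```

With this sketch, "orders.#" matches "orders.shipped.confirmed", while "payments.*" matches "payments.captured" but rejects "payments.refund.partial".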

4. Start workers

bundle exec pgbus start

This boots a supervisor that manages:

  • Workers -- process ActiveJob queues
  • Dispatcher -- runs maintenance tasks (idempotency cleanup, stale process reaping)
  • Consumers -- process event bus messages

5. Mount the dashboard

# config/routes.rb
mount Pgbus::Engine => "/pgbus"

The dashboard shows queues, jobs, processes, failures, dead letter messages, and event subscribers. It auto-refreshes via Turbo Frames with no WebSocket dependency.

Protect it in production with a simple auth lambda:

Pgbus.configure do |config|
  config.web_auth = ->(request) {
    request.env["warden"].user&.admin?
  }
end

Or inherit from your own authenticated controller (like mission_control-jobs):

Pgbus.configure do |config|
  config.base_controller_class = "Admin::BaseController"
end

When base_controller_class is set, all dashboard controllers inherit from that class instead of ActionController::Base. This is the recommended approach when mounting the dashboard inside an authenticated namespace -- your base controller's before_action filters, helper methods, and authentication logic apply automatically without monkey-patching.

Add a "back to app" button in the dashboard nav to return to your main application:

Pgbus.configure do |config|
  config.return_to_app_url = "/admin"
end

Reliability

These features stop bad jobs from cascading into outages: deduplication, concurrency caps, automatic queue pausing on repeated failures, in-flight backpressure, and worker recycling.

Job uniqueness

Prevent duplicate jobs from running. Unlike limits_concurrency (which controls how many jobs with the same key run), uniqueness guarantees at most one job with a given key exists in the system at any time.

class ImportOrderJob < ApplicationJob
  ensures_uniqueness strategy: :until_executed,
                     key: ->(order_id) { "import-order-#{order_id}" },
                     on_conflict: :reject

  def perform(order_id)
    # Only ONE instance per order_id can exist — from enqueue through completion.
    # If another ImportOrderJob for this order_id is already enqueued or running,
    # the duplicate is rejected immediately.
  end
end

Strategies

Strategy          Lock acquired       Lock released         Prevents
:until_executed   At enqueue          On completion or DLQ  Duplicate enqueue AND execution
:while_executing  At execution start  On completion or DLQ  Duplicate execution only

Conflict policies

Policy    Behavior
:reject   Raise Pgbus::JobNotUnique (default)
:discard  Silently drop the duplicate
:log      Log a warning and drop

Lock lifecycle

The lock is never released by a timer. It is held as long as the job exists in the system:

Enqueue ──→ pgbus_job_locks (state: queued, owner_pid: nil)
                  │
  Worker picks up job
                  │
                  ▼
           claim_for_execution! (state: executing, owner_pid: PID)
                  │
          ┌───────┴───────┐
          ▼               ▼
      Success           Crash
      release!        (lock orphaned)
      (row deleted)       │
                          ▼
                    Reaper checks:
                    Is owner_pid in pgbus_processes
                    with fresh heartbeat?
                          │
                    ┌─────┴─────┐
                    No          Yes
                    ▼            ▼
                release!      (keep lock,
                (orphaned)     job is running)

Crash recovery works through the reaper (runs every 5 minutes in the dispatcher). It cross-references owner_pid in pgbus_job_locks against pgbus_processes heartbeats. If the owning worker has no fresh heartbeat, the lock is orphaned and released — the PGMQ message's visibility timeout will expire and the job will be retried by another worker.

A last-resort TTL (default 24 hours) handles the case where the entire pgbus supervisor is dead and the reaper itself can't run.

Uniqueness vs concurrency controls

                    ensures_uniqueness                       limits_concurrency
Purpose             Prevent duplicate jobs                   Limit concurrent execution slots
Lock type           Binary lock (one or none)                Counting semaphore (up to N)
At enqueue          :until_executed blocks duplicates        Checks semaphore, blocks/discards/raises
At execution        :while_executing blocks duplicate runs   Not checked (semaphore acquired at enqueue)
Duplicate in queue  :until_executed: impossible;             Allowed up to N, rest blocked
                    :while_executing: allowed, only one runs
Crash recovery      Reaper checks heartbeats                 Semaphore expires_at + dispatcher cleanup
Use when            "This exact job must not run twice"      "At most N of these can run at once"

When to use which:

  • Payment processing, order import, unique email sends → ensures_uniqueness
  • Rate-limited API calls, resource-constrained tasks → limits_concurrency
  • Both at once → combine them (they use separate tables, no conflicts)

Setup

rails generate pgbus:add_job_locks                  # Add the migration
rails generate pgbus:add_job_locks --database=pgbus # For separate database

Concurrency controls

Limit how many jobs with the same key can run concurrently:

class ProcessOrderJob < ApplicationJob
  limits_concurrency to: 1,
                     key: ->(order_id) { "ProcessOrder-#{order_id}" },
                     duration: 15.minutes,
                     on_conflict: :block

  def perform(order_id)
    # Only one job per order_id runs at a time
  end
end

Options

Option        Default         Description
to:           (required)      Maximum concurrent jobs for the same key
key:          Job class name  Proc receiving job arguments, returns a string key
duration:     15.minutes      Safety expiry for the semaphore (crashed worker recovery)
on_conflict:  :block          What to do when the limit is reached

Conflict strategies

Strategy  Behavior
:block    Hold the job in a blocked queue. It is automatically released when a slot opens or the semaphore expires.
:discard  Silently drop the job.
:raise    Raise Pgbus::ConcurrencyLimitExceeded so the caller can handle it.

How concurrency works

  1. Enqueue: The adapter checks a semaphore table for the concurrency key. If under the limit, it increments the counter and sends the job to PGMQ. If at the limit, it applies the on_conflict strategy.
  2. Complete: After a job succeeds or is dead-lettered, the executor signals the concurrency system via an ensure block (guaranteeing the signal fires even if the archive step fails). It first tries to promote a blocked job (atomic delete + enqueue in a single transaction). If nothing to promote, it releases the semaphore slot.
  3. Safety net: The dispatcher periodically cleans up expired semaphores and orphaned blocked executions to recover from crashed workers.
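
The enqueue-time admission check can be pictured as an in-memory counting semaphore. This is a sketch only: the real semaphore is a Postgres row updated by a single atomic SQL statement, and the Semaphore class name here is hypothetical.

```ruby
# In-memory sketch of the admission-control flow above. A Mutex stands in
# for the atomicity that Postgres provides in the real implementation.
class Semaphore
  def initialize(limit)
    @limit = limit
    @value = 0
    @lock  = Mutex.new
  end

  # Mirrors INSERT ... ON CONFLICT DO UPDATE ... WHERE value < max:
  # increments and returns true only if a slot is free.
  def try_acquire
    @lock.synchronize { @value < @limit && (@value += 1; true) }
  end

  # Called from the executor's ensure block after success or DLQ.
  def release
    @lock.synchronize { @value = [@value - 1, 0].max }
  end
end
```

A job enqueue calls try_acquire; when it returns false, the on_conflict strategy decides whether to block, discard, or raise.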

Concurrency compared to other backends

Pgbus, SolidQueue, GoodJob, and Sidekiq all offer concurrency controls, but with fundamentally different locking strategies and trade-offs.

Architecture comparison

  • Lock backend -- Pgbus: PostgreSQL rows (pgbus_semaphores table). SolidQueue: PostgreSQL rows (solid_queue_semaphores). GoodJob: PostgreSQL advisory locks (pg_advisory_xact_lock). Sidekiq Enterprise: Redis sorted sets (lease-based).
  • Lock granularity -- Pgbus: counting semaphore (allows N concurrent). SolidQueue: counting semaphore (allows N concurrent). GoodJob: count query under advisory lock. Sidekiq Enterprise: sorted set entries with TTL.
  • Acquire mechanism -- Pgbus: atomic INSERT ... ON CONFLICT DO UPDATE WHERE value < max (single SQL). SolidQueue: UPDATE ... SET value = value + 1 WHERE value < limit. GoodJob: pg_advisory_xact_lock then SELECT COUNT(*) in a rolled-back transaction. Sidekiq Enterprise: Redis Lua script (atomic check-and-add).
  • At-limit behavior -- Pgbus: :block (hold in queue), :discard, or :raise. SolidQueue: blocks in solid_queue_blocked_executions. GoodJob: silently dropped at enqueue; retried with backoff (forever) at perform. Sidekiq Enterprise: rescheduled with backoff (raises OverLimit, middleware re-enqueues).
  • Blocked job storage -- Pgbus: pgbus_blocked_executions table with priority ordering. SolidQueue: solid_queue_blocked_executions table. GoodJob: no blocked queue -- retries via the ActiveJob retry mechanism. Sidekiq Enterprise: no blocked queue -- the job returns to the Redis queue with a delay.
  • Release on completion -- Pgbus: ensure block promotes the next blocked job or decrements the semaphore. SolidQueue: inline after finished/failed_with (inside the same transaction as of PR #689). GoodJob: releases the advisory lock via pg_advisory_unlock. Sidekiq Enterprise: lease auto-expires from the sorted set.
  • Crash recovery -- Pgbus: semaphore expires_at + dispatcher expire_stale cleanup. SolidQueue: semaphore expires_at + concurrency maintenance task. GoodJob: advisory locks auto-release on session disconnect. Sidekiq Enterprise: TTL-based lease expiry (default 5 min).
  • Message lifecycle -- Pgbus: PGMQ visibility timeout (FOR UPDATE SKIP LOCKED); the message stays in the queue until archived. SolidQueue: AR-backed claimed_executions table. GoodJob: AR-backed good_jobs table with an advisory lock per row. Sidekiq Enterprise: Redis list + sorted set.

Key design differences

Pgbus uses PGMQ's native FOR UPDATE SKIP LOCKED for message claiming and a separate semaphore table for concurrency control. This two-layer approach means the message queue and concurrency system are independent — PGMQ handles exactly-once delivery, the semaphore handles admission control. The semaphore acquire is a single atomic SQL (INSERT ... ON CONFLICT DO UPDATE WHERE value < max), avoiding the need for explicit row locks.

SolidQueue uses AR models for everything — jobs, claimed executions, and semaphores all live in PostgreSQL tables. This means the entire lifecycle can be wrapped in AR transactions. However, as documented in rails/solid_queue#689, this model is vulnerable to race conditions when semaphore expiry, job completion, and blocked-job release interleave across transactions. Pgbus avoids several of these by design: PGMQ's visibility timeout handles message recovery without a claimed_executions table, and there is no "release during shutdown" codepath.

GoodJob takes a different approach entirely: advisory locks. Each job dequeue acquires a session-level advisory lock on the job row, and concurrency checks use transaction-scoped advisory locks on the concurrency key. This means the check and the perform are serialized at the database level. The downside is that advisory locks are session-scoped — if a connection is returned to the pool without unlocking, the lock persists. GoodJob handles this by auto-releasing on session disconnect, but connection pool sharing between web and worker can cause surprising behavior.

Sidekiq Enterprise uses Redis sorted sets with TTL-based leases. Each concurrent slot is a sorted set entry with an expiry timestamp. This is fast and simple but has no durability guarantee — Redis failover can lose leases, temporarily allowing over-limit execution. The sidekiq-unique-jobs gem (open-source) uses a similar Lua-script approach but with more lock strategies (:until_executing, :while_executing, :until_and_while_executing) and configurable conflict handlers (:reject, :reschedule, :replace, :raise).

Race condition resilience

  • Worker crash mid-execution -- Pgbus: PGMQ visibility timeout expires and the message is re-read; semaphore expires via expire_stale. SolidQueue: claimed_execution survives; the supervisor's process pruning calls fail_all_with. GoodJob: advisory lock released on session disconnect. Sidekiq: lease TTL expires in Redis.
  • Blocked job released while original still executing -- Pgbus: not possible; promote only happens in signal_concurrency, which only runs after job success/DLQ. SolidQueue: fixed in PR #689, which checks for claimed executions before releasing. GoodJob: N/A, no blocked queue; retries independently. Sidekiq: N/A, no blocked queue.
  • Archive succeeds but signal fails -- Pgbus: the ensure block guarantees the signal fires even if archive raises; for SIGKILL, the semaphore expires via the dispatcher. SolidQueue: fixed in PR #689, which moved unblock_next_job inside the same transaction as finished. GoodJob: advisory lock released by session disconnect. Sidekiq: lease auto-expires.
  • Concurrent enqueue and signal race -- Pgbus: semaphore acquire is a single atomic SQL statement with no read-then-write gap. SolidQueue: fixed in PR #689 with a FOR UPDATE lock on the semaphore row that serializes enqueue with signal. GoodJob: pg_advisory_xact_lock serializes the concurrency check. Sidekiq: Redis Lua script is atomic.

Circuit breaker and queue pause/resume

Pgbus automatically pauses queues that fail repeatedly, preventing cascading failures.

Pgbus.configure do |config|
  config.circuit_breaker_enabled = true   # default
end

The trip threshold (5 consecutive failures), base backoff (30s), and max backoff (600s) are tuned via constants on Pgbus::CircuitBreaker. Override the constants in an initializer if you need different values — they are not exposed as configuration because tweaking them at runtime has never proved useful in practice.

When a queue hits the failure threshold:

  1. The circuit breaker auto-pauses the queue with exponential backoff
  2. After the backoff expires, the queue auto-resumes and the trip counter resets
  3. If failures continue, each trip doubles the backoff (capped at MAX_BACKOFF)

You can also manually pause/resume queues from the dashboard. The pause state is stored in the pgbus_queue_states table and survives restarts.
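
The doubling schedule works out like this. A sketch using the default constants quoted above; the real values live on Pgbus::CircuitBreaker:

```ruby
BASE_BACKOFF = 30   # seconds, per the defaults quoted above
MAX_BACKOFF  = 600

# Pause duration for the nth consecutive trip: doubles each time, capped.
def pause_duration(trip_count)
  [BASE_BACKOFF * (2 ** (trip_count - 1)), MAX_BACKOFF].min
end
```

Trips 1 through 6 pause the queue for 30s, 60s, 120s, 240s, 480s, and then stay capped at 600s.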

rails generate pgbus:add_queue_states           # Add the queue_states migration
rails generate pgbus:add_queue_states --database=pgbus  # For separate database

Prefetch flow control

Cap the number of in-flight (claimed but unfinished) messages per worker:

Pgbus.configure do |config|
  config.prefetch_limit = 20  # nil = unlimited (default)
end

The worker tracks in-flight messages with an atomic counter and only fetches min(idle_threads, prefetch_available) messages per cycle. The counter is decremented in an ensure block so it never gets stuck.
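
The fetch-size computation reduces to a few lines (the helper name here is hypothetical):

```ruby
# How many messages to fetch this cycle, given the in-flight cap.
def fetch_count(idle_threads, in_flight, prefetch_limit)
  return idle_threads if prefetch_limit.nil?   # nil = unlimited (default)
  prefetch_available = prefetch_limit - in_flight
  [idle_threads, prefetch_available].min.clamp(0, idle_threads)
end
```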

Worker recycling

Pgbus workers recycle themselves to prevent memory bloat. This is the main reliability difference vs. SolidQueue, which leaves workers alive forever.

Pgbus.configure do |config|
  config.max_jobs_per_worker = 10_000  # Restart after 10k jobs
  config.max_memory_mb = 512           # Restart if memory exceeds 512MB
  config.max_worker_lifetime = 1.hour  # Restart after 1 hour
end

When a limit is hit, the worker drains its thread pool, exits, and the supervisor forks a fresh process. RSS memory is sampled from /proc/self/statm (Linux) or ps -o rss (macOS).
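
The sampling can be sketched like this (assumptions: a 4 KiB page size on Linux, and that ps reports RSS in KiB; Pgbus's actual helper may differ):

```ruby
# Resident set size in MB: /proc/self/statm on Linux, ps elsewhere.
def current_rss_mb
  if File.readable?("/proc/self/statm")
    pages = File.read("/proc/self/statm").split[1].to_i  # second field = resident pages
    pages * 4096 / (1024 * 1024)                         # assumes 4 KiB pages
  else
    `ps -o rss= -p #{Process.pid}`.to_i / 1024           # ps reports KiB
  end
end
```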

Retry backoff

When a job fails, Pgbus extends the PGMQ visibility timeout with exponential backoff so retries are spread out instead of bunched at fixed intervals:

Pgbus.configure do |config|
  config.retry_backoff     = 5       # base delay (seconds)
  config.retry_backoff_max = 300     # cap at 5 minutes
  config.retry_backoff_jitter = 0.15 # +-15% randomization
end

The delay formula is base * 2^(attempt-1) * (1 + random_jitter). For a job that fails 4 times with defaults: ~5s, ~10s, ~20s, ~40s before hitting DLQ on the 5th read.
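
In code, the documented formula looks like this (a sketch, not the internal implementation):

```ruby
# delay = base * 2^(attempt - 1) * (1 + random_jitter), capped at max.
def retry_delay(attempt, base: 5, max: 300, jitter: 0.15)
  delay = base * (2 ** (attempt - 1))
  delay *= 1 + rand(-jitter..jitter)   # +/- 15% by default
  [delay, max].min
end
```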

Jobs can override the global settings per-class:

class FragileApiJob < ApplicationJob
  include Pgbus::RetryBackoff::JobMixin

  pgbus_retry_backoff base: 10, max: 600, jitter: 0.2

  def perform(...)
    # ...
  end
end

Async execution mode (fibers)

Workers can optionally execute jobs as fibers instead of threads. This is ideal for I/O-bound workloads (HTTP calls, email delivery, LLM API calls) where jobs spend most of their time waiting on network I/O.

Pgbus.configure do |config|
  # Global: all workers use async mode
  config.execution_mode = :async

  # Or per-worker: mix thread and async workers
  config.workers = [
    { queues: %w[webhooks emails], threads: 100, execution_mode: :async },
    { queues: %w[default], threads: 5 }  # stays thread-based
  ]
end

Prerequisites:

  1. Add gem "async" to your Gemfile
  2. Set config.active_support.isolation_level = :fiber in your Rails app

Why it reduces connections: In thread mode, each thread holds a database connection while waiting on I/O. With 50 threads, that's 50 connections. In async mode, 50 fibers share 3-5 connections because fibers yield during I/O and only one runs at a time.

CLI flag: pgbus start --execution-mode async

Safety: Messages stay in PGMQ with visibility timeout protection regardless of execution mode. If a fiber or worker crashes, the visibility timeout expires and messages become available for re-read. No data loss risk.

Not recommended for: CPU-bound jobs (image processing, heavy computation). These block the single reactor thread and should use thread mode.

Routing and ordering

How messages flow between producers and the workers that handle them: priority sub-queues, consumer priority for active/standby workers, and single-active-consumer for strict ordering.

Priority queues

Route jobs to priority sub-queues so high-priority work is processed first:

Pgbus.configure do |config|
  config.priority_levels = 3    # Creates _p0, _p1, _p2 sub-queues per logical queue
  config.default_priority = 1   # Jobs without explicit priority go to _p1
end

Workers read from _p0 (highest) first, then _p1, then _p2. Only when higher-priority sub-queues are empty does the worker read from lower ones.

Use ActiveJob's built-in priority attribute:

class CriticalAlertJob < ApplicationJob
  queue_as :default
  queue_with_priority 0  # Highest priority

  def perform(alert_id)
    # ...
  end
end

class ReportJob < ApplicationJob
  queue_as :default
  queue_with_priority 2  # Lowest priority

  def perform(report_id)
    # ...
  end
end

When priority_levels is nil (default), priority queues are disabled and all jobs go to a single queue per logical name.
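
The sub-queue routing amounts to a small name computation (a sketch with a hypothetical helper name; the real naming is handled by the adapter):

```ruby
# Map a logical queue + job priority to its physical PGMQ sub-queue.
def sub_queue_for(queue, priority, levels: 3, default: 1)
  return queue if levels.nil?                   # priority queues disabled
  p = (priority || default).clamp(0, levels - 1)
  "#{queue}_p#{p}"
end
```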

Consumer priority

When multiple workers subscribe to the same queues, higher-priority workers process messages first. Lower-priority workers back off (3x polling interval) when a higher-priority worker is active.

Pgbus.configure do |c|
  c.capsule :primary,  queues: %w[default], threads: 10, consumer_priority: 10
  c.capsule :fallback, queues: %w[default], threads: 5,  consumer_priority: 0
end

Priority is stored in heartbeat metadata. Workers check the pgbus_processes table to discover higher-priority peers. When a high-priority worker goes stale (no heartbeat for 5 minutes), lower-priority workers automatically resume normal polling.
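
The back-off rule described above can be sketched as (hypothetical helper; peer priorities come from fresh heartbeats in pgbus_processes):

```ruby
BACKOFF_FACTOR = 3  # 3x polling interval, per the description above

# Slow down polling when a higher-priority peer is actively heartbeating.
def polling_interval(base_interval, my_priority, fresh_peer_priorities)
  higher_peer_active = fresh_peer_priorities.any? { |p| p > my_priority }
  higher_peer_active ? base_interval * BACKOFF_FACTOR : base_interval
end
```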

Single active consumer

For queues that require strict ordering, enable single active consumer mode. Only one worker process can read from a queue at a time — others skip it and process other queues.

Pgbus.configure do |c|
  c.capsule :ordered_primary,  queues: %w[ordered_events], threads: 1, single_active_consumer: true
  c.capsule :ordered_standby,  queues: %w[ordered_events], threads: 1, single_active_consumer: true
end

Uses PostgreSQL session-level advisory locks (pg_try_advisory_lock). The lock is non-blocking — workers that can't acquire it simply skip the queue. Locks auto-release on connection close (including crashes), so failover is automatic. The standby capsule takes over within one polling tick if the primary dies.
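
For context, session advisory locks key on a 64-bit integer, so the queue name must be hashed to a stable key. A minimal sketch -- the "pgbus:" prefix and the CRC32 choice are assumptions, not Pgbus's actual derivation:

```ruby
require "zlib"

# Derive a stable integer advisory-lock key from a queue name.
def advisory_lock_key(queue_name)
  Zlib.crc32("pgbus:#{queue_name}")
end

# A worker would then attempt the non-blocking lock before reading:
#   SELECT pg_try_advisory_lock(<key>)
# and simply skip the queue if it returns false.
```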

Persistence and batching

How Pgbus integrates with your application's transactions and tracks groups of related work: outbox for atomic publish, batches for fan-out coordination, archive compaction for keeping the queue tables small.

Batches

Coordinate groups of jobs with callbacks when all complete:

batch = Pgbus::Batch.new(
  on_finish: BatchFinishedJob,
  on_success: BatchSucceededJob,
  on_discard: BatchFailedJob,
  description: "Import users",
  properties: { initiated_by: current_user.id }
)

batch.enqueue do
  users.each { |user| ImportUserJob.perform_later(user.id) }
end

Callbacks

Callback    Fired when
on_finish   All jobs completed (success or discard)
on_success  All jobs completed successfully (zero discarded)
on_discard  At least one job was dead-lettered

Callback jobs receive the batch properties hash as their argument:

class BatchFinishedJob < ApplicationJob
  def perform(properties)
    user = User.find(properties["initiated_by"])
    ImportMailer.complete(user).deliver_later
  end
end

How batches work

  1. Batch.new(...) creates a tracking row in pgbus_batches with status: "pending"
  2. batch.enqueue { ... } tags each enqueued job with the pgbus_batch_id in its payload
  3. After each job completes or is dead-lettered, the executor atomically updates the batch counters
  4. When completed_jobs + discarded_jobs == total_jobs, the batch status flips to "finished" and callback jobs are enqueued
  5. The dispatcher cleans up finished batches older than 7 days
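
Steps 3 and 4 boil down to counter bookkeeping. An in-memory sketch -- the real counters are columns on pgbus_batches, updated atomically in SQL:

```ruby
# In-memory stand-in for a pgbus_batches row.
Batch = Struct.new(:total_jobs, :completed_jobs, :discarded_jobs) do
  # Called once per job outcome; returns the batch status afterwards.
  def record!(outcome)
    outcome == :discarded ? self.discarded_jobs += 1 : self.completed_jobs += 1
    finished? ? "finished" : "pending"
  end

  def finished?
    completed_jobs + discarded_jobs == total_jobs
  end

  def success?
    finished? && discarded_jobs.zero?
  end
end
```

When record! flips the status to "finished", the callbacks fire: on_finish always, on_success only when success? holds, on_discard when any job was dead-lettered.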

Transactional outbox

Publish events atomically inside your database transactions. A background poller moves outbox entries to PGMQ.

rails generate pgbus:add_outbox                  # Add the outbox migration
rails generate pgbus:add_outbox --database=pgbus # For separate database

Then enable it in your initializer:

Pgbus.configure do |config|
  config.outbox_enabled = true
  config.outbox_poll_interval = 1.0  # seconds
  config.outbox_batch_size = 100
  config.outbox_retention = 1.day    # ActiveSupport::Duration also accepted
end

Usage:

ActiveRecord::Base.transaction do
  order = Order.create!(params)

  # Published atomically with the order — if the transaction rolls back,
  # the outbox entry is also rolled back. No lost or phantom events.
  Pgbus::Outbox.publish("default", { order_id: order.id })

  # For topic-based event bus:
  Pgbus::Outbox.publish_event("orders.created", { order_id: order.id })
end

The outbox poller uses FOR UPDATE SKIP LOCKED inside a transaction to claim entries, publishes them to PGMQ, and marks them as published. Failed entries are skipped and retried next cycle.

Archive compaction

PGMQ archive tables grow unbounded. Pgbus automatically purges old entries:

Pgbus.configure do |config|
  config.archive_retention = 7.days               # ActiveSupport::Duration (default 7 days)
end

The compaction loop runs every hour and deletes up to 1000 rows per queue per cycle. Both knobs live as constants on Pgbus::Process::Dispatcher (ARCHIVE_COMPACTION_INTERVAL, ARCHIVE_COMPACTION_BATCH_SIZE) — they have never been worth surfacing as configuration. The dispatcher runs archive compaction as part of its maintenance loop, deleting archived messages older than archive_retention in batches to avoid long-running transactions.

Observability

Error reporting, structured logging, and queue health monitoring.

Error reporting

By default, Pgbus logs caught exceptions and continues. To route them to your APM service (Appsignal, Sentry, Honeybadger, etc.), push callable reporters onto config.error_reporters:

Pgbus.configure do |c|
  c.error_reporters << ->(ex, ctx) {
    Appsignal.set_error(ex) { |t| t.set_tags(ctx) }
  }
end

Each reporter receives (exception, context_hash). The context hash includes keys like action, queue, job_class, and msg_id depending on the call site. Reporters that accept a third argument also receive the Pgbus configuration object.

Reporters are wired into all critical rescue paths: job execution failures, worker fetch/process errors, dispatcher maintenance, supervisor fork failures, circuit breaker trips, outbox publish errors, and failed event recording. Non-critical paths (dashboard queries, stat recording) remain log-only.

ErrorReporter.report is guaranteed to never raise — if a reporter or the logger itself throws, the error is swallowed silently. This preserves fault-tolerance invariants at every rescue site.
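
The dispatch contract can be sketched like this (hypothetical function; the real entry point is Pgbus::ErrorReporter.report):

```ruby
# Fan out to every reporter; a failing reporter must never break the
# rescue path it is called from, so individual errors are swallowed.
def report_error(reporters, exception, context, config = nil)
  reporters.each do |reporter|
    if reporter.arity == 3
      reporter.call(exception, context, config)  # 3-arg reporters get the config
    else
      reporter.call(exception, context)
    end
  rescue StandardError
    # intentionally swallowed -- see the never-raise guarantee above
  end
end
```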

Structured logging

Pgbus ships two log formatters inspired by Sidekiq's Logger::Formatters:

Pgbus.configure do |c|
  c.log_format = :json   # or :text (default)
end

Text format (default):

INFO 2025-01-15T10:30:00.000Z pid=1234 tid=abc queue=default: Starting job

JSON format:

{"ts":"2025-01-15T10:30:00.000Z","pid":1234,"tid":"abc","lvl":"INFO","component":"Pgbus","msg":"Starting job","ctx":{"queue":"default"}}

The JSON formatter extracts [Pgbus] and [Pgbus::Web] prefixes from log messages into a separate component field so the msg field stays clean for log aggregators. Thread-local context can be added via Pgbus::LogFormatter.with_context(queue: "default") { ... } and appears under the ctx key.
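
Thread-local context of this kind is typically a small push/merge/restore around Thread.current. A sketch -- the real API is Pgbus::LogFormatter.with_context:

```ruby
# Minimal thread-local log context: merged for the block, restored after.
module LogContext
  KEY = :pgbus_log_ctx

  def self.with(extra)
    previous = Thread.current[KEY]
    Thread.current[KEY] = (previous || {}).merge(extra)
    yield
  ensure
    Thread.current[KEY] = previous
  end

  def self.current
    Thread.current[KEY] || {}
  end
end
```

A JSON formatter reads the current context at format time and emits it under the ctx key.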

You can also set a formatter directly on the logger:

Pgbus.configure do |c|
  c.logger.formatter = Pgbus::LogFormatter::JSON.new
end

Queue health monitoring

The dashboard includes a Queue Health panel showing PostgreSQL vacuum stats per PGMQ table: dead tuple counts, live tuple counts, bloat ratio (dead / total), last vacuum age, and MVCC horizon age. The same stats appear on individual queue detail pages.

Autovacuum tuning

PGMQ queue tables have high insert/delete churn that overwhelms PostgreSQL's default autovacuum settings. Pgbus applies aggressive per-table tuning automatically:

  • New queues at runtime: Client#ensure_single_queue applies tuning after pgmq.create()
  • Existing installations: rails generate pgbus:update detects untuned tables
  • Fresh installs: The install migration includes tuning for the default queue

To apply tuning manually or after db:schema:load (which loses ALTER TABLE settings):

rails generate pgbus:tune_autovacuum                  # Generate migration
rails generate pgbus:tune_autovacuum --database=pgbus # For separate database

The pgbus:tune_autovacuum rake task also hooks into db:schema:load automatically.

Prometheus metrics

When config.metrics_enabled = true (default), the dashboard exposes Prometheus-compatible gauges:

Metric                                Description
pgbus_table_dead_tuples               Dead tuple count per PGMQ table
pgbus_table_live_tuples               Live tuple count per PGMQ table
pgbus_table_bloat_ratio               Dead / (dead + live) per table
pgbus_table_last_vacuum_age_seconds   Seconds since last vacuum per table
pgbus_oldest_transaction_age_seconds  MVCC horizon pin risk
pgbus_worker_pool_capacity            Total worker thread slots
pgbus_worker_pool_busy                Currently busy worker threads
pgbus_worker_pool_utilization         Busy / capacity ratio

Real-time broadcasts (turbo-streams replacement)

Pgbus ships a drop-in replacement for turbo-rails' turbo_stream_from helper that fixes several well-known ActionCable correctness bugs by using PGMQ message IDs as a replay cursor. Same API as turbo-rails. No Redis. No ActionCable. No lost messages on reconnect.

Bugs fixed:

  • rails/rails#52420 -- "page born stale": a broadcast that fires between controller render and WebSocket subscribe is silently lost with ActionCable. Pgbus captures a PGMQ msg_id watermark at render time and replays any messages published in the gap via the SSE Last-Event-ID mechanism.
  • hotwired/turbo#1261 -- missed messages on reconnect. Pgbus persists the cursor on the client (EventSource's built-in Last-Event-ID) and replays from the PGMQ archive on every reconnect.
  • hotwired/turbo-rails#674 -- no way to detect disconnect. Pgbus dispatches pgbus:open, pgbus:gap-detected, and pgbus:close DOM events on the stream element.

Usage

Swap turbo_stream_from for pgbus_stream_from in your view:

<%# Before %>
<%= turbo_stream_from @order %>

<%# After %>
<%= pgbus_stream_from @order %>

Everything else stays the same. The model concern keeps working unchanged:

class Order < ApplicationRecord
  broadcasts_to ->(order) { [order.account, :orders] }
end

broadcasts_to, broadcast_replace_to, broadcasts_refreshes, broadcast_append_later_to, and every other Turbo::Broadcastable helper funnel through a single Turbo::StreamsChannel.broadcast_stream_to method that pgbus monkey-patches at engine boot. The signed-stream-name verification reuses Turbo.signed_stream_verifier_key so existing signed tokens Just Work.

Add the Puma plugin to config/puma.rb so SSE connections drain cleanly on deploy:

# config/puma.rb
plugin :pgbus_streams

Without the plugin, Puma closes hijacked SSE sockets abruptly during graceful restart, which looks to browsers like a network error and triggers an immediate reconnect. With the plugin, the streamer writes a pgbus:shutdown sentinel before the socket closes; browsers reconnect to the new worker and replay missed messages via Last-Event-ID.

Requirements

  • Puma 6.1+ or Falcon. Streams use rack.hijack by default. Puma 6.1+ supports it via partial hijack (thread-releasing, see puma/puma#1009). Falcon supports both the hijack path (via protocol-rack's emulation) and a native streaming body path (streams_falcon_streaming_body = true) that integrates with Falcon's fiber scheduler for better backpressure and connection lifecycle management. Unicorn, Pitchfork, and Passenger return HTTP 501 from the streams endpoint.
  • PostgreSQL LISTEN/NOTIFY. config.listen_notify = true (the default). Stream queues override PGMQ's 250ms NOTIFY throttle to 0 so every broadcast fires individually.
  • HTTP/2 or HTTP/3 in production. SSE has a 6-connection-per-origin limit on HTTP/1.1; HTTP/2 lifts it. Falcon supports HTTP/2 natively without a reverse proxy.

Configuration

Pgbus.configure do |c|
  c.streams_enabled                = true          # default
  c.streams_queue_prefix           = "pgbus_stream"
  c.streams_default_retention      = 5 * 60        # 5 minutes
  c.streams_retention              = {             # per-stream overrides
    /^chat_/        => 7 * 24 * 3600,              # 7 days for chat history
    "presence_room" => 30                          # 30 seconds for presence
  }
  c.streams_heartbeat_interval     = 15            # seconds
  c.streams_max_connections        = 2_000         # per web-server process (Puma worker or Falcon process)
  c.streams_idle_timeout           = 3_600         # close idle connections after 1h
  c.streams_listen_health_check_ms = 250           # PG LISTEN keepalive + ensure_listening ack budget
  c.streams_write_deadline_ms      = 5_000         # write_nonblock deadline
  c.streams_falcon_streaming_body  = false         # opt-in: Falcon-native streaming body
end

Falcon-native streaming body (opt-in)

When running on Falcon, enable native streaming body support for better integration with Falcon's fiber scheduler:

c.streams_falcon_streaming_body = true

With this flag, StreamApp returns [200, headers, Writable] instead of hijacking the socket. Falcon drives the response lifecycle with proper backpressure, connection cleanup, and fiber-scheduled IO. SSE writes go through Protocol::HTTP::Body::Writable which is fiber-safe and yields to other fibers when blocked.

Without the flag (default), Falcon uses the same rack.hijack path as Puma via protocol-rack's emulation. Both paths are tested and work correctly — the streaming body path is an optimization for Falcon deployments that want tighter scheduler integration.

How it works

Stream broadcasts are stored in PGMQ queues prefixed pgbus_stream_*. Each broadcast is assigned a monotonic msg_id by PGMQ. The pgbus_stream_from helper captures the current MAX(msg_id) at render time and embeds it in the HTML as since-id. When the SSE client connects, it sends that cursor as ?since= on the first request and as Last-Event-ID on reconnects. The streamer replays from pgmq.q_* (live) UNION pgmq.a_* (archive) for any msg_id > cursor, then switches to LISTEN/NOTIFY for the live path. There is no message identity gap between the render and the subscribe — the cursor model guarantees every broadcast is delivered exactly once, in order, even across reconnects.
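The cursor model can be sketched in plain Ruby. This is a simplification that assumes messages are hashes carrying PGMQ's monotonic :msg_id; the real replay runs as SQL over the live and archive tables:

```ruby
# Simplified sketch of the replay-cursor model: the client presents the
# last msg_id it has seen, the server replays everything newer in order,
# then switches to the live LISTEN/NOTIFY path.
def replay_since(messages, cursor)
  messages.select { |m| m[:msg_id] > cursor }.sort_by { |m| m[:msg_id] }
end

backlog = [
  { msg_id: 41, body: "already on the page" },
  { msg_id: 42, body: "missed in the render gap" },
  { msg_id: 43, body: "missed during reconnect" }
]

# The page rendered when MAX(msg_id) was 41, so the embedded since-id is 41:
replay_since(backlog, 41).map { |m| m[:body] }
# => ["missed in the render gap", "missed during reconnect"]
```

Because the cursor is an id rather than a timestamp, a reconnecting client can never skip or double-receive a message within the retention window.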

One Puma worker (or Falcon reactor) hosts one Pgbus::Web::Streamer::Instance singleton with three threads (Listener / Dispatcher / Heartbeat) and one dedicated PG connection for LISTEN. Hijacked SSE sockets are held outside the web server's thread pool on Puma (confirmed by an integration test that fires 20 concurrent hijacked connections and observes them complete in parallel on an 8-thread Puma server, puma/puma#1009) and inside a fiber on Falcon (one fiber per hijacked connection, scheduler-backed non-blocking IO).

Don't use ActionController::Live for pgbus streams. It's the conventional Rails answer for SSE and it's the wrong one. Live blocks inside @app.call(env) for the lifetime of the connection, which ties up a Puma thread per subscriber — the exact problem the rack.hijack-based architecture above exists to avoid. Pgbus's streams endpoint is a mounted Rack app (not a Rails controller) so the temptation isn't even available, and a rake pgbus:streams:lint_no_live task fails CI if any pgbus controller includes it. If you're tempted to wire SSE through a Rails controller, use pgbus_stream_from in your view instead — the helper handles the cursor, replay, reconnect, and Puma-thread-release concerns for you.

Per-stream retention is handled by the main pgbus dispatcher process on the same interval as the dispatcher's ARCHIVE_COMPACTION_INTERVAL constant. Streams default to a 5-minute retention because SSE clients reconnect within seconds; chat-style applications override the retention to days via streams_retention.
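The retention lookup resolves streams_retention overrides (Regexp or String keys, first match wins) before falling back to streams_default_retention. A sketch with a hypothetical helper, not pgbus's internal API:

```ruby
# Resolve a stream's retention in seconds: first matching override wins,
# otherwise the default applies. Hypothetical helper for illustration.
def retention_for(stream_name, overrides, default)
  overrides.each do |pattern, seconds|
    matched = pattern.is_a?(Regexp) ? pattern.match?(stream_name) : pattern == stream_name
    return seconds if matched
  end
  default
end

overrides = { /^chat_/ => 7 * 24 * 3600, "presence_room" => 30 }

retention_for("chat_42", overrides, 300)       # => 604800 (7 days)
retention_for("presence_room", overrides, 300) # => 30
retention_for("orders_feed", overrides, 300)   # => 300 (default 5 minutes)
```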

Stream name helpers

Apps using UUID primary keys with turbo-rails-style dom IDs can hit PGMQ's 47-character queue-name ceiling ("gid://app/Ai::Chat/9c14e8b2-...:messages" exceeds the limit before the pgbus_ prefix is even added). Pgbus provides helpers to generate short, collision-safe stream names:

# In your ApplicationRecord
class ApplicationRecord < ActiveRecord::Base
  primary_abstract_class
  include Pgbus::Streams::Streamable
end

This gives every model short_id (16-hex SHA-256 prefix of the GlobalID) and to_stream_key:

chat = Ai::Chat.find("9c14e8b2-...")
chat.short_id        # => "ai_chat_a3f8c1e9d2b47610"
chat.to_stream_key   # => "ai_chat_a3f8c1e9d2b47610"

# Compose multi-part stream names
Pgbus.stream_key(chat, :messages)  # => "ai_chat_a3f8c1e9d2b47610_messages"

# Use in views
<%= pgbus_stream_from Pgbus.stream_key(@chat, :messages) %>

The budget is computed from config.queue_prefix at call time so prefix overrides adjust automatically. If a stream name exceeds the budget, Pgbus::Streams::StreamNameTooLong is raised immediately with the offending name, computed budget, and a pointer to Pgbus.stream_key — before PGMQ is ever touched.
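The short_id scheme described above -- an underscored model name plus the first 16 hex characters of SHA-256 over the GlobalID -- can be sketched as follows. The exact digest input and the full UUID are illustrative assumptions:

```ruby
require "digest"

# Sketch of the short_id scheme: underscored model name + first 16 hex
# chars of SHA-256 over the record's GlobalID URI. The digest input and
# the UUID below are assumptions for illustration.
def short_id(model_name, gid_uri)
  prefix = model_name.gsub("::", "_").downcase       # "Ai::Chat" => "ai_chat"
  "#{prefix}_#{Digest::SHA256.hexdigest(gid_uri)[0, 16]}"
end

key = short_id("Ai::Chat", "gid://app/Ai::Chat/9c14e8b2-0000-0000-0000-000000000000")
key.length  # => 24 ("ai_chat_" is 8 chars + 16 hex chars)
```

A fixed-width digest keeps every stream key the same length regardless of primary-key format, which is what makes the 47-character budget predictable.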

Transactional broadcasts

This is the feature no other Rails real-time stack can offer. A broadcast issued inside an open ActiveRecord transaction is deferred until the transaction commits. If it rolls back, the broadcast silently drops — clients never see the change that the database never persisted.

ActiveRecord::Base.transaction do
  @order.update!(status: "shipped")
  @order.broadcast_replace_to :account           # ← deferred until commit
  RelatedService.update_counters!(@order)        # ← might raise, rolling back the update
end
# If RelatedService raised, the database state is unchanged AND no SSE client
# ever saw a "shipped" broadcast. The broadcast and the data mutation are
# atomic with respect to each other.

ActionCable can't do this because its broadcast path goes through Redis pub/sub, which has no concept of your application's transaction boundary. Pgbus detects the open AR transaction via ActiveRecord::Base.connection.current_transaction.after_commit, which is a first-class Rails API — no outbox table, no background worker, no extra storage.

Outside an open transaction, broadcasts are synchronous and return the assigned msg_id as before. Inside a transaction, they return nil (the id isn't known until commit time).

Replaying history on connect (replay:)

By default pgbus_stream_from @room captures MAX(msg_id) at render time and replays only broadcasts published after that — the page-born-stale fix. For chat-history-style applications where the page should show backlog on load, pass the replay: option:

<%# Show the last 50 messages on load, then stream live %>
<%= pgbus_stream_from @room, replay: 50 %>

<%# Show everything in PGMQ retention on load %>
<%= pgbus_stream_from @room, replay: :all %>

<%# Default behavior (post-render only) — same as omitting the option %>
<%= pgbus_stream_from @room, replay: :watermark %>

The replay cap is applied server-side: the helper computes since_id = max(0, current_msg_id - N) for integer N and writes that into the HTML attribute. The client just reads the attribute and sends it as ?since= on connect. Nothing else changes about the transport.

How much history is actually available depends on the stream's retention setting (streams_retention or streams_default_retention, both in seconds). A chat stream configured with streams_retention = { /^chat_/ => 7.days } will replay up to seven days of history with replay: :all; a notification stream with the 5-minute default will only go back five minutes.
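The since-id arithmetic for the three replay modes is small enough to sketch directly (a hypothetical helper mirroring the description above; current_msg_id is MAX(msg_id) at render time):

```ruby
# Server-side since-id computation for the replay: option.
def since_id_for(replay, current_msg_id)
  case replay
  when :watermark, nil then current_msg_id             # post-render only (default)
  when :all            then 0                          # whole retained backlog
  when Integer         then [0, current_msg_id - replay].max
  else raise ArgumentError, "unsupported replay: #{replay.inspect}"
  end
end

since_id_for(:watermark, 120)  # => 120
since_id_for(50, 120)          # => 70
since_id_for(50, 30)           # => 0 (clamped: fewer than 50 messages exist)
since_id_for(:all, 120)        # => 0
```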

Server-side audience filtering

Some broadcasts shouldn't reach every subscriber on a stream. Pgbus supports per-connection filtering via a registry of named predicates evaluated against each connection's authorize-hook context:

# config/initializers/pgbus_streams.rb
Pgbus::Streams.filters.register(:admin_only) { |user| user&.admin? }
Pgbus::Streams.filters.register(:workspace_member) do |user, stream_name|
  user&.workspace_ids&.include?(stream_name.split(":").last.to_i)
end

The authorize hook on Pgbus::Web::StreamApp doubles as a context provider — return any non-boolean value (typically a User model) and pgbus will pass it to the filter predicate when evaluating broadcasts:

Pgbus::Web::StreamApp.new(authorize: ->(env, _stream_name) {
  user = User.find_by(id: env["rack.session"][:user_id])
  return false unless user
  user  # ← context attached to the connection
})

Then label broadcasts with the filter you want to apply:

@order.broadcast_replace_to :account                                # delivered to everyone
Pgbus.stream("ops").broadcast(html, visible_to: :admin_only)        # admins only

Failure semantics:

  • Unknown filter label → fail-CLOSED with a warning log. Audience filtering is a data-isolation feature; failing open on a typo would turn a restricted broadcast into a public one. The warning log is loud enough that typos still get noticed in dev ("why are no subscribers receiving my broadcast?" → check the log).
  • Filter predicate raises → fail-CLOSED. A buggy predicate that crashes is treated as "deny" so private data doesn't leak on an exception path.
  • No visible_to on the broadcast → no filter applied; everyone sees it.

The filter registry is process-local. Each Puma worker (or Falcon reactor) has its own copy populated at boot. Filter predicates run on the subscriber side — the predicate itself can't be serialized through PGMQ, so the broadcast carries only the label name.
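The fail-closed semantics above can be sketched as a small evaluation function. FILTERS stands in for the process-local registry and the hash contexts are stand-ins for User models; none of this is pgbus's internal code:

```ruby
# Sketch of fail-closed audience filtering. A broadcast carries only a
# label; the predicate runs on the subscriber side against each
# connection's authorize-hook context.
FILTERS = {
  admin_only: ->(user) { !!user&.fetch(:admin, false) },
  buggy:      ->(_user) { raise "predicate crashed" }
}.freeze

def deliver?(visible_to, context)
  return true if visible_to.nil?          # unlabeled broadcast: everyone sees it
  predicate = FILTERS[visible_to]
  return false if predicate.nil?          # unknown label: fail closed
  begin
    !!predicate.call(context)
  rescue StandardError
    false                                 # raising predicate: fail closed
  end
end

deliver?(nil, { admin: false })          # => true  (no filter applied)
deliver?(:admin_only, { admin: true })   # => true
deliver?(:admin_only, { admin: false })  # => false
deliver?(:typo_label, { admin: true })   # => false (unknown label, fail closed)
deliver?(:buggy, { admin: true })        # => false (exception path, fail closed)
```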

Presence

Pgbus tracks who is currently subscribed to a stream via a pgbus_presence_members table. This is the standard "X people are in this room" feature that chat apps and collaboration tools need:

rails generate pgbus:add_presence
rails db:migrate

class RoomsController < ApplicationController
  def show
    @room = Room.find(params[:id])
    Pgbus.stream(@room).presence.join(
      member_id: current_user.id.to_s,
      metadata: { name: current_user.name, avatar: current_user.avatar_url }
    ) do |member|
      render_to_string(partial: "presence/joined", locals: { member: member })
    end
  end

  def destroy
    Pgbus.stream(@room).presence.leave(member_id: current_user.id.to_s) do |member|
      "<turbo-stream action=\"remove\" target=\"presence-#{member['id']}\"></turbo-stream>"
    end
  end
end

The block passed to join/leave is rendered into HTML and broadcast through the regular pgbus stream pipeline — so it shows up in every connected client's DOM in real time, alongside the normal broadcasts_to output. Reading the current member list:

Pgbus.stream(@room).presence.members
# => [{ "id" => "7", "metadata" => {...}, "joined_at" => "...", "last_seen_at" => "..." }]

Pgbus.stream(@room).presence.count
# => 5

Heartbeat and expiry. Members that don't ping presence.touch(member_id: ...) periodically can be expired by a sweeper:

# Run from a cron, ActiveJob, or after each subscriber heartbeat
Pgbus.stream(@room).presence.sweep!(older_than: 60.seconds.ago)

The sweep uses DELETE ... RETURNING so multiple workers running it concurrently won't double-emit leave events.

Deliberately left to the application:

  • Join/leave is explicit, not connection-driven. The controller decides who is "present" — a connected SSE client is not always a present user (think tab-in-background, multi-tab dedup).
  • The stale-member sweep is manual. Run it from a cron, an ActiveJob, or your existing heartbeat — pgbus does not assume one over the others.
  • The DOM markup for join/leave is whatever your join/leave block returns. Pgbus does not impose a fixed presence schema on <pgbus-stream-source>.

Stream stats (opt-in)

Pgbus can record one row in pgbus_stream_stats per broadcast, connect, and disconnect so the /pgbus/insights dashboard shows stream throughput alongside job throughput. This is disabled by default because stream event volume can dwarf job volume in chat-style apps — enable it deliberately when you want the observability.

# config/initializers/pgbus.rb
Pgbus.configure do |c|
  c.streams_stats_enabled = true
end

Then run the migration generator once:

rails generate pgbus:add_stream_stats                  # Add the migration
rails generate pgbus:add_stream_stats --database=pgbus # For separate database
rails db:migrate

The Insights tab gains a "Real-time Streams" section with counts of broadcasts / connects / disconnects, an "active" estimate (connects − disconnects in the selected window), average fanout per broadcast, and a "Top Streams by Broadcast Volume" table. The existing stats_retention config covers cleanup, so there is no separate retention knob.

Overhead on a real Puma + PGMQ setup (bundle exec rake bench:streams): the most visible cost is an INSERT per connect/disconnect pair, which shows up under thundering-herd connect scenarios (K=50 concurrent connects: ~+20% per-connect latency). Steady-state broadcast and fanout numbers stay in the run-to-run noise band. Enable it if Insights is useful; leave it off if the write traffic worries you.

Testing

Pgbus ships opt-in test helpers for both RSpec and Minitest. The testing module is never autoloaded by Zeitwerk -- you must require it explicitly, so it cannot leak into production.

RSpec setup

Add one line to your spec/rails_helper.rb (or spec/spec_helper.rb):

# spec/rails_helper.rb
require "pgbus/testing/rspec"

This does three things:

  1. Loads Pgbus::Testing with the in-memory EventStore and mode management
  2. Registers the have_published_event matcher
  3. Includes Pgbus::Testing::Assertions into all example groups

You still need to activate a testing mode and clear the store per test. Add a before/after block:

# spec/rails_helper.rb
require "pgbus/testing/rspec"

RSpec.configure do |config|
  config.before { Pgbus::Testing.fake! }
  config.after do
    Pgbus::Testing.disabled!
    Pgbus::Testing.store.clear!
  end
end

Or scope it to specific groups:

RSpec.configure do |config|
  config.before(:each, :pgbus) { Pgbus::Testing.fake! }
  config.after(:each, :pgbus) do
    Pgbus::Testing.disabled!
    Pgbus::Testing.store.clear!
  end
end

# Usage:
RSpec.describe OrderService, :pgbus do
  it "publishes an event" do
    expect { described_class.create!(attrs) }
      .to have_published_event("orders.created")
  end
end

Minitest / TestUnit setup

Add the require and include to your test/test_helper.rb:

# test/test_helper.rb
require "pgbus/testing/minitest"

class ActiveSupport::TestCase
  include Pgbus::Testing::MinitestHelpers
end

MinitestHelpers hooks into Minitest's lifecycle automatically:

  • before_setup -- activates :fake mode and clears the event store before each test
  • Includes all assertion helpers (assert_pgbus_published, assert_no_pgbus_published, pgbus_published_events, perform_published_events)

No additional setup/teardown blocks are needed -- the module handles it.

Event bus assertions

Both RSpec and Minitest share the same assertion helpers via Pgbus::Testing::Assertions:

# Assert that a block publishes exactly N events
assert_pgbus_published(count: 1, routing_key: "orders.created") do
  OrderService.create!(attrs)
end

# Assert that a block publishes zero events
assert_no_pgbus_published(routing_key: "orders.created") do
  OrderService.preview(attrs)
end

# Inspect captured events directly
events = pgbus_published_events(routing_key: "orders.created")
assert_equal 1, events.size
assert_equal({ "id" => 42 }, events.first.payload)

# Capture events, then dispatch them to registered handlers
perform_published_events do
  OrderService.create!(attrs)
end
# After the block, all captured events have been dispatched to their
# matching handlers synchronously -- useful for testing side effects

RSpec matchers

The have_published_event matcher supports chainable constraints:

# Basic: assert any event was published with the given routing key
expect { publish_order(order) }
  .to have_published_event("orders.created")

# With payload matching (uses RSpec's values_match?, so hash_including works)
expect { publish_order(order) }
  .to have_published_event("orders.created")
  .with_payload(hash_including("id" => order.id))

# With header matching
expect { publish_order(order) }
  .to have_published_event("orders.created")
  .with_headers(hash_including("x-tenant" => "acme"))

# Exact count
expect { publish_order(order) }
  .to have_published_event("orders.created")
  .exactly(1)

# Combine all constraints
expect { publish_order(order) }
  .to have_published_event("orders.created")
  .with_payload(hash_including("id" => order.id))
  .with_headers(hash_including("x-tenant" => "acme"))
  .exactly(1)

# Negated
expect { publish_order(order) }
  .not_to have_published_event("orders.cancelled")

Testing modes

Three modes control how Pgbus::EventBus::Publisher.publish behaves:

Mode Behavior Use case
:fake Captures events in-memory, no PGMQ calls, no handler dispatch Most unit/integration tests
:inline Captures events AND immediately dispatches to matching handlers Testing side effects (handler logic)
:disabled Pass-through to real publisher (production behavior) Default; integration tests with real PGMQ

Switch modes globally or scoped to a block:

# Global (persists until changed)
Pgbus::Testing.fake!
Pgbus::Testing.inline!
Pgbus::Testing.disabled!

# Scoped (restores previous mode after block)
Pgbus::Testing.inline! do
  OrderService.create!(attrs)  # handlers fire synchronously
end
# mode is restored to whatever it was before

# Query current mode
Pgbus::Testing.fake?     # => true/false
Pgbus::Testing.inline?   # => true/false
Pgbus::Testing.disabled? # => true/false

The :inline mode skips delayed publishes (delay: > 0) -- those are captured in the store but not dispatched. Use Pgbus::Testing.store.drain! to manually dispatch all captured events including delayed ones.
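The capture-then-drain behavior can be sketched with a toy store. This is a hypothetical stand-in for Pgbus::Testing.store, only illustrating the delayed-publish semantics described above:

```ruby
# Toy model of the testing store: :inline dispatches immediate publishes
# as they happen, captures delayed ones, and drain! dispatches everything
# still pending. Not the real Pgbus::Testing store.
class FakeStore
  attr_reader :events, :dispatched

  def initialize
    @events = []
    @dispatched = []
  end

  def publish(routing_key, delay: 0)
    event = { routing_key: routing_key, delay: delay }
    @events << event
    @dispatched << event if delay.zero?  # :inline skips delayed publishes
  end

  def drain!
    (@events - @dispatched).each { |e| @dispatched << e }
  end
end

store = FakeStore.new
store.publish("orders.created")
store.publish("orders.reminder", delay: 3600)
store.dispatched.size  # => 1 (delayed event captured but not dispatched)
store.drain!
store.dispatched.size  # => 2
```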

SSE streams in tests

When using use_transactional_tests = true (the default in Rails), pgbus SSE streams are incompatible with transactional test isolation. The rack.hijack mechanism spawns background threads that acquire their own database connections outside the test transaction, which causes:

  • Connection pool exhaustion after enough system tests
  • CI hangs (tests freeze waiting for a connection)
  • Errno::EPIPE errors when the browser navigates away

Automatic fix with Pgbus::Testing: When you activate :fake or :inline mode (as shown above), pgbus automatically enables streams_test_mode. The SSE endpoint returns a stub response (valid SSE headers + a comment + immediate close) without hijacking, without spawning background threads, and without acquiring any database connections. The <pgbus-stream-source> custom element still renders and connects, but no PGMQ polling occurs.

If you're using the RSpec or Minitest setup shown above, you don't need to do anything extra -- streams are safe automatically.

Manual configuration (if you don't use Pgbus::Testing):

# config/initializers/pgbus.rb
Pgbus.configure do |c|
  c.streams_test_mode = true if Rails.env.test?
end

Or toggle it per test:

# RSpec
before { Pgbus.configuration.streams_test_mode = true }
after  { Pgbus.configuration.streams_test_mode = false }

# Minitest
setup    { Pgbus.configuration.streams_test_mode = true }
teardown { Pgbus.configuration.streams_test_mode = false }

What streams_test_mode does: The StreamApp short-circuits after signature verification and authorization checks but before any hijack, streaming body, or capacity logic. It returns:

HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache, no-transform

: pgbus test mode — connection accepted, no polling

This is a valid SSE response that the browser's EventSource will accept. No Streamer singleton is created, no PG LISTEN connection is opened, and no dispatcher/heartbeat/listener threads are spawned.

Testing actual stream delivery: If you need to verify end-to-end SSE message delivery in integration tests, disable streams_test_mode and use the PumaTestHarness from the pgbus test support:

require "pgbus/testing"

Pgbus::Testing.disabled! do
  # streams_test_mode is automatically disabled
  # Use real Puma + real PGMQ for end-to-end stream tests
end

See spec/integration/streams/ in the pgbus source for examples of integration tests that exercise the full SSE pipeline with a real Puma server.

Operations

Day-to-day running of Pgbus: starting and stopping processes, observing what is happening on the dashboard, the database tables Pgbus relies on, and how to migrate from an existing job backend.

CLI

pgbus start     # Start supervisor with workers + dispatcher + scheduler
pgbus status    # Show running processes
pgbus queues    # List queues with depth/metrics
pgbus version   # Print version
pgbus help      # Show help

Role flags (split deployments)

By default, pgbus start boots every role in one supervisor (workers, dispatcher, scheduler, event consumers, outbox poller). For containerized deployments where each role lives in a separate process, use the role flags:

pgbus start --workers-only      # Only worker processes
pgbus start --scheduler-only    # Only the recurring-task scheduler
pgbus start --dispatcher-only   # Only the maintenance dispatcher

These flags are mutually exclusive. The auto-tuned pool_size adjusts to the role: a --scheduler-only deployment with 50 worker threads configured only opens the connections it actually needs (1 for the scheduler), not 51.
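The role-aware sizing follows the pool_size formula from the configuration reference (sum of worker and event-consumer threads, plus 2). A sketch with a hypothetical helper; the scheduler's single-connection figure comes from the text above:

```ruby
# Sketch of auto-tuned pool sizing: connection slots for every thread the
# booted role actually runs, plus 2 for supervisor/dispatcher overhead.
# Hypothetical helper, not pgbus internals.
def auto_pool_size(workers:, event_consumers:, role: :all)
  return 1 if role == :scheduler  # scheduler-only boots need one connection
  workers.sum { |w| w[:threads] } +
    event_consumers.sum { |c| c[:threads] } + 2
end

workers   = [{ queues: ["default"], threads: 5 }, { queues: ["critical"], threads: 10 }]
consumers = [{ topics: ["orders.#"], threads: 2 }]

auto_pool_size(workers: workers, event_consumers: consumers)        # => 19
auto_pool_size(workers: workers, event_consumers: consumers,
               role: :scheduler)                                    # => 1
```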

Capsule selection

--capsule NAME boots a single named capsule. Combine with --workers-only to run one capsule per container:

pgbus start --workers-only --capsule critical
pgbus start --workers-only --capsule default

The capsule name is the :name you passed to c.capsule in your initializer (or the first queue token when using the string DSL).

Dashboard

The dashboard is a mountable Rails engine at /pgbus with:

  • Overview — queue depths, enqueued count, active processes, failure count, throughput rate
  • Queues — per-queue metrics, purge/pause/resume/delete actions
  • Jobs — enqueued and failed jobs, retry/discard actions
  • Dead letter — DLQ messages with retry/discard, bulk actions
  • Processes — active workers/dispatcher/consumers with heartbeat status
  • Events — registered subscribers and processed events
  • Outbox — transactional outbox entries pending publication
  • Locks — active job uniqueness locks with state (queued/executing), owner PID@hostname, age
  • Insights — throughput chart (jobs/min), status distribution donut, slowest job classes table

All tables use Turbo Frames for periodic auto-refresh without page reloads. Destructive actions use styled confirmation dialogs (not browser confirm()), and flash messages appear as auto-dismissing toast notifications.

Queue management

The queues page lets you manage PGMQ queues directly:

  • Purge — removes all messages from the queue (the queue itself remains)
  • Delete — permanently drops the queue from PGMQ (removes the queue table and metadata)
  • Pause / Resume — pauses or resumes job processing for a queue

All destructive actions require confirmation. Pause/resume and delete are available on both the queue index and detail pages.

Dark mode

The dashboard supports dark mode via Tailwind CSS dark: classes. It respects your system preference on first visit and persists your choice via localStorage. Toggle with the sun/moon button in the nav bar.

Job stats and insights

The executor records every job completion to pgbus_job_stats (job class, queue, status, duration). The insights page visualizes this data with ApexCharts (loaded via CDN, zero npm dependencies).

rails generate pgbus:add_job_stats           # Add the stats migration
rails generate pgbus:add_job_stats --database=pgbus

Stats collection is enabled by default (config.stats_enabled = true). Old stats are cleaned up by the dispatcher based on config.stats_retention (default: 30 days). If the migration hasn't been run yet, stat recording is silently skipped.

Database tables

Pgbus uses these tables (created via PGMQ and migrations):

Table Purpose
q_pgbus_* PGMQ job queues (managed by PGMQ)
a_pgbus_* PGMQ archive tables (managed by PGMQ, compacted by dispatcher)
pgbus_processes Heartbeat tracking for workers/dispatcher/consumers
pgbus_failed_events Failed event dispatch records
pgbus_processed_events Idempotency deduplication (event_id, handler_class)
pgbus_semaphores Concurrency control counting semaphores
pgbus_blocked_executions Jobs waiting for a concurrency semaphore slot
pgbus_batches Batch tracking with job counters and callback config
pgbus_job_locks Job uniqueness locks (state, owner_pid, reaper correlation)
pgbus_job_stats Job execution metrics (class, queue, status, duration)
pgbus_queue_states Queue pause/resume and circuit breaker state
pgbus_outbox_entries Transactional outbox entries pending publication
pgbus_recurring_tasks Recurring job definitions
pgbus_recurring_executions Recurring job execution history
pgbus_presence_members Stream presence tracking (who is subscribed)
pgbus_stream_stats Stream broadcast/connect/disconnect metrics (opt-in)

Switching from another backend

Already using a different job processor? These guides walk you through the migration:

  • Switch from Sidekiq — remove Redis, convert native workers, replace middleware with callbacks
  • Switch from SolidQueue — similar architecture, swap config format, gain LISTEN/NOTIFY + worker recycling
  • Switch from GoodJob — both PostgreSQL-native, swap advisory locks for PGMQ visibility timeouts

See docs/README.md for a full feature comparison table.

Reference

Architectural overview and the full list of configuration settings.

Architecture

Supervisor (fork manager)
  ├── Worker 1        (queues: [default, mailers], threads: 10, priority: 10)
  ├── Worker 2        (queues: [critical], threads: 5, single_active_consumer: true)
  ├── Dispatcher      (maintenance: cleanup, compaction, reaping, circuit breaker)
  ├── Scheduler       (recurring tasks via cron)
  ├── Consumer        (event bus topics)
  └── Outbox Poller   (transactional outbox → PGMQ, when enabled)

PostgreSQL + PGMQ
  ├── pgbus_default          (job queue)
  ├── pgbus_default_dlq      (dead letter queue)
  ├── pgbus_critical         (job queue)
  ├── pgbus_critical_dlq     (dead letter queue)
  ├── pgbus_mailers          (job queue)
  └── pgbus_queue_states     (pause/resume + circuit breaker state)

How a job flows through the system

  1. Enqueue: ActiveJob serializes the job to JSON, Pgbus sends it to the appropriate PGMQ queue
  2. Read: Workers poll queues (or wake instantly via LISTEN/NOTIFY) and claim messages with a visibility timeout
  3. Execute: The job is deserialized and executed within the Rails executor
  4. Archive/Retry: On success, the message is archived. On failure, the visibility timeout expires and the message becomes available again. PGMQ's read_ct tracks delivery attempts
  5. Dead letter: When read_ct exceeds max_retries, the message is moved to the _dlq queue for manual inspection
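Steps 4 and 5 combine with the retry_backoff settings from the configuration reference (base 5s, exponential base * 2^(attempt-1), 300s cap, 0.15 jitter). A sketch of the decision -- a hypothetical helper, not pgbus internals:

```ruby
# Decide what happens after a failed delivery attempt: exponential backoff
# with jitter (capped at retry_backoff_max), or DLQ routing once read_ct
# exceeds max_retries. Defaults mirror the documented configuration.
def next_step(read_ct, max_retries: 5, base: 5, max: 300, jitter: 0.15)
  return :dead_letter if read_ct > max_retries
  delay = [base * 2**(read_ct - 1), max].min
  delay += rand * delay * jitter  # spread retries to avoid stampedes
  [:retry_in, delay]
end

next_step(1)  # => [:retry_in, ~5s]   first redelivery
next_step(4)  # => [:retry_in, ~40s]  exponential growth
next_step(6)  # => :dead_letter       read_ct exceeded max_retries
```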

Configuration reference

Option Default Description
database_url nil PostgreSQL connection URL (auto-detected in Rails)
queue_prefix "pgbus" Prefix for all PGMQ queue names
default_queue "default" Default queue for jobs without explicit queue
pool_size nil (auto) Connection pool size. Auto-tuned from worker thread counts: sum(workers.threads) + sum(event_consumers.threads) + 2. Set explicitly to override.
workers [{queues: ["default"], threads: 5}] Worker capsule definitions. String DSL ("default: 5; critical: 10"), Array, or nil.
event_consumers nil Event consumer process definitions (same format as workers)
roles nil (all) Supervisor role filter — usually set via CLI flags (--workers-only etc.)
polling_interval 0.1 Seconds between polls (LISTEN/NOTIFY is primary)
visibility_timeout 30 Time before unacked message becomes visible again. Accepts seconds or ActiveSupport::Duration (e.g. 10.minutes)
max_retries 5 Failed reads before routing to dead letter queue
retry_backoff 5 Base delay in seconds for VT-based retry backoff (exponential: base * 2^(attempt-1))
retry_backoff_max 300 Maximum retry delay in seconds (caps the exponential curve)
retry_backoff_jitter 0.15 Jitter factor (0-1) added to retry delays to spread retries
max_jobs_per_worker nil Recycle worker after N jobs (nil = unlimited)
max_memory_mb nil Recycle worker when memory exceeds N MB
max_worker_lifetime nil Recycle worker after N seconds. Accepts seconds or Duration.
listen_notify true Use PGMQ's LISTEN/NOTIFY for instant wake-up
prefetch_limit nil Max in-flight messages per worker (nil = unlimited)
dispatch_interval 1.0 Seconds between dispatcher maintenance ticks
circuit_breaker_enabled true Enable auto-pause on consecutive failures (threshold and backoff are tuned via Pgbus::CircuitBreaker constants)
priority_levels nil Number of priority sub-queues (nil = disabled, 2-10)
default_priority 1 Default priority for jobs without explicit priority
archive_retention 7.days How long to keep archived messages. Accepts seconds, Duration, or nil to disable cleanup
outbox_enabled false Enable transactional outbox poller process
outbox_poll_interval 1.0 Seconds between outbox poll cycles
outbox_batch_size 100 Max entries per outbox poll cycle
outbox_retention 1.day How long to keep published outbox entries. Accepts seconds, Duration, or nil to disable cleanup
idempotency_ttl 7.days How long to keep processed event records. Accepts seconds, Duration, or nil to disable cleanup
base_controller_class "::ActionController::Base" Base class for dashboard controllers (string, constantized at load time)
return_to_app_url nil URL for "back to app" button in dashboard nav (nil hides the button)
web_auth nil Lambda for dashboard authentication
web_refresh_interval 5000 Dashboard auto-refresh interval in milliseconds
web_live_updates true Enable Turbo Frames auto-refresh on dashboard
stats_enabled true Record job execution stats for insights dashboard
stats_retention 30.days How long to keep job stats. Accepts seconds, Duration, or nil to disable cleanup
streams_test_mode false Return a stub SSE response without hijack or background threads. Auto-enabled by Pgbus::Testing.fake!/.inline!. See SSE streams in tests.
streams_stats_enabled false Record stream broadcast/connect/disconnect stats (opt-in, can be high volume)
streams_path nil Custom URL path for the SSE endpoint (nil = auto-detected from engine mount)
execution_mode :threads Global execution mode (:threads or :async). Per-worker override via capsule config.
error_reporters [] Array of callables invoked on caught exceptions. Each receives (exception, context_hash).
log_format :text Log formatter (:text or :json). Sets logger.formatter automatically.
metrics_enabled true Enable Prometheus-compatible metrics on the dashboard

Development

bundle install
bundle exec rake          # Run tests + rubocop
bundle exec rspec         # Run tests only
bundle exec rubocop       # Run linter only

System tests use Playwright via Capybara:

bun install
bunx --bun playwright install chromium
bundle exec rspec spec/system/

End-to-end streams benchmarks (real Puma + real PGMQ + real SSE clients):

PGBUS_DATABASE_URL=postgres://user@host/db bundle exec rake bench:streams

The harness measures single-broadcast roundtrip latency, burst throughput, fanout to many clients, and concurrent connect under thundering herd. See benchmarks/streams_bench.rb.

License

The gem is available as open source under the terms of the MIT License.