Module: Pgbus

Defined in:
lib/pgbus.rb,
lib/pgbus/cli.rb,
lib/pgbus/batch.rb,
lib/pgbus/event.rb,
lib/pgbus/client.rb,
lib/pgbus/engine.rb,
lib/pgbus/outbox.rb,
lib/pgbus/streams.rb,
lib/pgbus/testing.rb,
lib/pgbus/version.rb,
lib/pgbus/bus_record.rb,
lib/pgbus/serializer.rb,
lib/pgbus/uniqueness.rb,
lib/pgbus/concurrency.rb,
lib/pgbus/dedup_cache.rb,
lib/pgbus/pgmq_schema.rb,
lib/pgbus/stat_buffer.rb,
lib/pgbus/streams/key.rb,
lib/pgbus/rate_counter.rb,
lib/pgbus/web/streamer.rb,
lib/pgbus/config_loader.rb,
lib/pgbus/configuration.rb,
lib/pgbus/log_formatter.rb,
lib/pgbus/outbox/poller.rb,
lib/pgbus/queue_factory.rb,
lib/pgbus/retry_backoff.rb,
lib/pgbus/error_reporter.rb,
lib/pgbus/process/worker.rb,
lib/pgbus/recurring/task.rb,
lib/pgbus/streams/cursor.rb,
lib/pgbus/web/stream_app.rb,
app/models/pgbus/job_lock.rb,
app/models/pgbus/job_stat.rb,
lib/pgbus/circuit_breaker.rb,
lib/pgbus/execution_pools.rb,
lib/pgbus/instrumentation.rb,
lib/pgbus/streams/filters.rb,
lib/pgbus/web/data_source.rb,
app/models/pgbus/semaphore.rb,
lib/pgbus/process/consumer.rb,
lib/pgbus/streams/envelope.rb,
lib/pgbus/streams/presence.rb,
lib/pgbus/testing/minitest.rb,
lib/pgbus/autovacuum_tuning.rb,
lib/pgbus/client/read_after.rb,
lib/pgbus/event_bus/handler.rb,
lib/pgbus/process/heartbeat.rb,
lib/pgbus/process/lifecycle.rb,
app/models/pgbus/batch_entry.rb,
app/models/pgbus/queue_state.rb,
app/models/pgbus/stream_stat.rb,
lib/pgbus/active_job/adapter.rb,
lib/pgbus/event_bus/registry.rb,
lib/pgbus/process/dispatcher.rb,
lib/pgbus/process/queue_lock.rb,
lib/pgbus/process/supervisor.rb,
lib/pgbus/recurring/schedule.rb,
lib/pgbus/streams/streamable.rb,
lib/pgbus/testing/assertions.rb,
lib/pgbus/web/authentication.rb,
app/models/pgbus/outbox_entry.rb,
lib/pgbus/active_job/executor.rb,
lib/pgbus/event_bus/publisher.rb,
lib/pgbus/process/wake_signal.rb,
lib/pgbus/recurring/scheduler.rb,
lib/pgbus/streams/signed_name.rb,
app/models/pgbus/process_entry.rb,
lib/pgbus/event_bus/subscriber.rb,
lib/pgbus/queue_name_validator.rb,
app/models/pgbus/recurring_task.rb,
app/models/pgbus/uniqueness_key.rb,
lib/pgbus/concurrency/semaphore.rb,
lib/pgbus/failed_event_recorder.rb,
lib/pgbus/recurring/command_job.rb,
lib/pgbus/web/streamer/instance.rb,
lib/pgbus/web/streamer/listener.rb,
lib/pgbus/web/streamer/registry.rb,
app/helpers/pgbus/streams_helper.rb,
app/models/pgbus/processed_event.rb,
lib/pgbus/process/signal_handler.rb,
lib/pgbus/web/metrics_serializer.rb,
lib/pgbus/web/streamer/heartbeat.rb,
lib/pgbus/web/streamer/io_writer.rb,
lib/pgbus/recurring/config_loader.rb,
lib/pgbus/web/streamer/connection.rb,
app/models/pgbus/blocked_execution.rb,
app/models/pgbus/application_record.rb,
lib/generators/pgbus/migration_path.rb,
lib/pgbus/configuration/capsule_dsl.rb,
lib/pgbus/process/consumer_priority.rb,
app/helpers/pgbus/application_helper.rb,
app/models/pgbus/recurring_execution.rb,
lib/pgbus/client/ensure_stream_queue.rb,
lib/pgbus/execution_pools/async_pool.rb,
lib/pgbus/recurring/already_recorded.rb,
app/controllers/pgbus/jobs_controller.rb,
lib/generators/pgbus/update_generator.rb,
lib/pgbus/execution_pools/thread_pool.rb,
lib/pgbus/generators/config_converter.rb,
lib/pgbus/streams/turbo_broadcastable.rb,
app/controllers/pgbus/locks_controller.rb,
lib/generators/pgbus/install_generator.rb,
app/controllers/pgbus/events_controller.rb,
app/controllers/pgbus/locale_controller.rb,
app/controllers/pgbus/outbox_controller.rb,
app/controllers/pgbus/queues_controller.rb,
lib/pgbus/concurrency/blocked_execution.rb,
lib/pgbus/generators/migration_detector.rb,
lib/pgbus/streams/turbo_stream_override.rb,
lib/pgbus/web/streamer/falcon_connection.rb,
app/controllers/pgbus/insights_controller.rb,
lib/generators/pgbus/add_outbox_generator.rb,
app/controllers/pgbus/api/stats_controller.rb,
app/controllers/pgbus/dashboard_controller.rb,
app/controllers/pgbus/frontends_controller.rb,
app/controllers/pgbus/processes_controller.rb,
lib/generators/pgbus/add_presence_generator.rb,
lib/generators/pgbus/upgrade_pgmq_generator.rb,
app/controllers/pgbus/api/metrics_controller.rb,
app/controllers/pgbus/application_controller.rb,
app/controllers/pgbus/dead_letter_controller.rb,
lib/generators/pgbus/add_job_locks_generator.rb,
lib/generators/pgbus/add_job_stats_generator.rb,
lib/generators/pgbus/add_recurring_generator.rb,
lib/pgbus/streams/watermark_cache_middleware.rb,
app/controllers/pgbus/api/insights_controller.rb,
lib/pgbus/generators/database_target_detector.rb,
lib/generators/pgbus/tune_autovacuum_generator.rb,
lib/pgbus/web/streamer/stream_event_dispatcher.rb,
lib/generators/pgbus/add_queue_states_generator.rb,
lib/generators/pgbus/add_stream_stats_generator.rb,
app/controllers/pgbus/recurring_tasks_controller.rb,
lib/generators/pgbus/migrate_job_locks_generator.rb,
lib/generators/pgbus/add_job_stats_latency_generator.rb,
lib/generators/pgbus/add_failed_events_index_generator.rb,
lib/generators/pgbus/add_job_stats_queue_index_generator.rb

Defined Under Namespace

Modules: ActiveJob, Api, ApplicationHelper, AutovacuumTuning, CLI, Concurrency, ConfigLoader, ErrorReporter, EventBus, ExecutionPools, Generators, Instrumentation, LogFormatter, Outbox, PgmqSchema, Process, QueueFactory, QueueNameValidator, Recurring, RetryBackoff, Serializer, Streams, StreamsHelper, Testing, Uniqueness, Web

Classes: ApplicationController, ApplicationRecord, Batch, BatchEntry, BlockedExecution, BusRecord, CircuitBreaker, Client, ConcurrencyLimitExceeded, Configuration, ConfigurationError, DashboardController, DeadLetterController, DeadLetterError, DedupCache, Engine, Error, Event, EventsController, FailedEventRecorder, FrontendsController, InsightsController, JobLock, JobNotUnique, JobStat, JobsController, LocaleController, LocksController, OutboxController, OutboxEntry, ProcessEntry, ProcessedEvent, ProcessesController, QueueNotFoundError, QueueState, QueuesController, RateCounter, RecurringExecution, RecurringTask, RecurringTasksController, SchemaNotReady, Semaphore, SerializationError, StatBuffer, StreamStat, UniquenessKey

Constant Summary collapse

DEAD_LETTER_SUFFIX =

Suffix appended to a queue name to derive its dead-letter companion (e.g. “pgbus_default” -> “pgbus_default_dlq”). Hard-coded here because changing it on a running deployment would orphan every existing DLQ message; nothing in the codebase or in user reports has ever needed this to be configurable.

"_dlq"
VERSION =
"0.7.4"

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.stopping ⇒ Object



27
28
29
# File 'lib/pgbus.rb', line 27

# Reader for the stopping flag.
#
# @return [Object, false] the value held in +@stopping+, or +false+ when
#   the variable is unset or otherwise falsy
def stopping
  return false unless @stopping

  @stopping
end

Class Method Details

.client ⇒ Object



84
85
86
# File 'lib/pgbus.rb', line 84

# Returns the memoized client, constructing it from +configuration+ the
# first time it is requested.
#
# @return [Client] the instance stored in +@client+
def client
  return @client if @client

  @client = Client.new(configuration)
end

.configuration ⇒ Object



76
77
78
# File 'lib/pgbus.rb', line 76

# Returns the memoized Configuration, creating a fresh one with no
# arguments on first access.
#
# @return [Configuration] the instance stored in +@configuration+
def configuration
  return @configuration if @configuration

  @configuration = Configuration.new
end

.configure {|configuration| ... } ⇒ Object

Yields: the shared `configuration` object (the same instance returned by `.configuration`).



80
81
82
# File 'lib/pgbus.rb', line 80

# Hands the shared configuration object to the caller's block.
#
# @yieldparam configuration [Object] the value returned by +configuration+
# @return [Object] whatever the block returns
def configure
  settings = configuration
  yield settings
end

.loader ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/pgbus.rb', line 31

# Builds the gem's Zeitwerk loader on first call and memoizes it in
# +@loader+. Custom inflections are registered, and several directories
# next to this file are excluded from autoloading.
#
# @return [Zeitwerk::Loader]
def loader
  return @loader if @loader

  gem_loader = Zeitwerk::Loader.for_gem
  gem_loader.inflector.inflect(
    "pgbus" => "Pgbus",
    "cli" => "CLI",
    "dsl" => "DSL",
    "capsule_dsl" => "CapsuleDSL"
  )

  excluded = [
    "#{__dir__}/generators",
    "#{__dir__}/active_job",
    "#{__dir__}/pgbus/testing",
    # lib/puma/plugin/pgbus_streams.rb is a Puma plugin that the user
    # requires explicitly from config/puma.rb via `plugin :pgbus_streams`.
    # If lib/puma/ were not ignored, Zeitwerk would scan it under the pgbus
    # loader root and try to autoload Puma::Plugin, colliding with the real
    # Puma::Plugin class that the puma gem defines.
    "#{__dir__}/puma"
  ]
  excluded.each { |path| gem_loader.ignore(path) }

  @loader = gem_loader
end

.logger ⇒ Object



138
139
140
# File 'lib/pgbus.rb', line 138

# Convenience reader that delegates to the logger held by +configuration+.
#
# @return [Object] whatever +configuration.logger+ returns
def logger = configuration.logger

.logger=(value) ⇒ Object



142
143
144
# File 'lib/pgbus.rb', line 142

# Convenience writer that stores +value+ as the logger on +configuration+.
#
# @param value [Object] the logger to install
# @return [Object] +value+
def logger=(value)
  settings = configuration
  settings.logger = value
end

.models_loader ⇒ Object

Separate loader for app/models used only in non-Rails contexts (specs, standalone scripts). When the Engine boots, Rails’ autoloader takes over app/models and this loader is torn down to avoid conflicts.



56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/pgbus.rb', line 56

# Lazily builds a dedicated Zeitwerk loader, tagged "pgbus-models", for the
# app/models directory one level above this file's directory. The loader
# is set up immediately and memoized in +@models_loader+.
#
# @return [Zeitwerk::Loader, nil] the loader, or +nil+ when the models
#   directory does not exist
def models_loader
  app_models = File.expand_path("../app/models", __dir__)
  return unless File.directory?(app_models)

  @models_loader ||= Zeitwerk::Loader.new.tap do |zw|
    zw.tag = "pgbus-models"
    zw.push_dir(app_models)
    zw.setup
  end
end

.reset! ⇒ Object



123
124
125
126
127
128
# File 'lib/pgbus.rb', line 123

# Closes the current client (if there is one) and then clears the memoized
# client, configuration and stream cache so they are rebuilt on next use.
#
# @return [nil]
def reset!
  held = @client
  held.close unless held.nil?

  @client = @configuration = @stream_cache = nil
end

.reset_client! ⇒ Object

Discard the inherited PGMQ client after fork. Do NOT call close — the parent’s @pgmq_mutex is in undefined state post-fork and attempting to acquire it can deadlock. The next call to Pgbus.client will lazily create a fresh one.



134
135
136
# File 'lib/pgbus.rb', line 134

# Drops the memoized client reference without closing it; the next call
# to +client+ then builds a new instance.
#
# @return [nil]
def reset_client! = (@client = nil)

.stream(streamables) ⇒ Object

Entry point for the streams subsystem — `Pgbus.stream(name).broadcast(html)` or `Pgbus.stream(@order).current_msg_id`. Defined on Pgbus itself rather than inside lib/pgbus/streams.rb because that file is only Zeitwerk-loaded when Pgbus::Streams::Stream is first referenced — a chicken-and-egg problem that would leave `Pgbus.stream(…)` undefined on the first call. Referencing Streams::Stream inside the method body forces Zeitwerk to load lib/pgbus/streams.rb lazily on first use, which is fine.

Caches Stream instances by logical name so high-frequency callers (e.g. Turbo::StreamsChannel.broadcast_stream_to inside an after_update_commit callback firing 1000x/sec) don’t allocate a new Stream + Mutex per broadcast. The cache is process-local; reset! clears it. The cache key is the resolved name string, not the raw streamables, so repeated calls to `Pgbus.stream(@order)` in the same process return the same instance.



103
104
105
106
107
# File 'lib/pgbus.rb', line 103

# Looks up, or creates and caches, the Stream for +streamables+.
#
# The cache is keyed by the name that Streams::Stream.name_from derives
# from +streamables+, and is created on first use.
#
# @param streamables [Object] passed to Streams::Stream.name_from and,
#   on a cache miss, to Streams::Stream.new
# @return [Streams::Stream] the cached instance for the resolved name
def stream(streamables)
  cache_key = Streams::Stream.name_from(streamables)
  cache = (@stream_cache ||= Concurrent::Map.new)

  cache.compute_if_absent(cache_key) do
    Streams::Stream.new(streamables)
  end
end

.stream_key(*parts, **) ⇒ Object

Compose a short, pgbus-safe stream identifier from any mix of records, strings, symbols, and arrays. Delegates to `Pgbus::Streams::Key.stream_key`; raises `ArgumentError` if the resulting key would overflow the pgbus queue-name budget. See `lib/pgbus/streams/key.rb` for the digest policy and rationale.

Pgbus.stream_key(chat, :messages)
# => "ai_chat_3a4f9c21b7d20e18:messages"

Pgbus.stream(Pgbus.stream_key(chat, :messages)).broadcast(html)


119
120
121
# File 'lib/pgbus.rb', line 119

# Forwards every positional and keyword argument to
# Streams::Key.stream_key and returns its result.
#
# @param parts [Array<Object>] the components of the key
# @param opts [Hash] keyword options passed through unchanged
# @return [Object] whatever Streams::Key.stream_key returns
def stream_key(*parts, **opts) = Streams::Key.stream_key(*parts, **opts)

.teardown_models_loader! ⇒ Object



69
70
71
72
73
74
# File 'lib/pgbus.rb', line 69

# Unregisters the memoized models loader and forgets it. Does nothing
# when no loader is currently held.
#
# @return [nil]
def teardown_models_loader!
  if (active = @models_loader)
    active.unregister
    @models_loader = nil
  end
end