Module: Pgbus

Defined in:
lib/pgbus.rb,
lib/pgbus/cli.rb,
lib/pgbus/batch.rb,
lib/pgbus/event.rb,
lib/pgbus/client.rb,
lib/pgbus/engine.rb,
lib/pgbus/outbox.rb,
lib/pgbus/streams.rb,
lib/pgbus/testing.rb,
lib/pgbus/version.rb,
lib/pgbus/bus_record.rb,
lib/pgbus/serializer.rb,
lib/pgbus/uniqueness.rb,
lib/pgbus/concurrency.rb,
lib/pgbus/dedup_cache.rb,
lib/pgbus/pgmq_schema.rb,
lib/pgbus/stat_buffer.rb,
lib/pgbus/streams/key.rb,
lib/pgbus/rate_counter.rb,
lib/pgbus/web/streamer.rb,
lib/pgbus/config_loader.rb,
lib/pgbus/configuration.rb,
lib/pgbus/log_formatter.rb,
lib/pgbus/outbox/poller.rb,
lib/pgbus/queue_factory.rb,
lib/pgbus/retry_backoff.rb,
lib/pgbus/error_reporter.rb,
lib/pgbus/process/worker.rb,
lib/pgbus/recurring/task.rb,
lib/pgbus/streams/cursor.rb,
lib/pgbus/web/stream_app.rb,
app/models/pgbus/job_lock.rb,
app/models/pgbus/job_stat.rb,
lib/pgbus/circuit_breaker.rb,
lib/pgbus/execution_pools.rb,
lib/pgbus/instrumentation.rb,
lib/pgbus/streams/filters.rb,
lib/pgbus/web/data_source.rb,
app/models/pgbus/semaphore.rb,
lib/pgbus/process/consumer.rb,
lib/pgbus/streams/envelope.rb,
lib/pgbus/streams/presence.rb,
lib/pgbus/testing/minitest.rb,
lib/pgbus/autovacuum_tuning.rb,
lib/pgbus/client/read_after.rb,
lib/pgbus/event_bus/handler.rb,
lib/pgbus/process/heartbeat.rb,
lib/pgbus/process/lifecycle.rb,
lib/pgbus/table_maintenance.rb,
app/models/pgbus/batch_entry.rb,
app/models/pgbus/queue_state.rb,
app/models/pgbus/stream_stat.rb,
lib/pgbus/active_job/adapter.rb,
lib/pgbus/event_bus/registry.rb,
lib/pgbus/process/dispatcher.rb,
lib/pgbus/process/queue_lock.rb,
lib/pgbus/process/supervisor.rb,
lib/pgbus/recurring/schedule.rb,
lib/pgbus/streams/streamable.rb,
lib/pgbus/testing/assertions.rb,
lib/pgbus/web/authentication.rb,
app/models/pgbus/outbox_entry.rb,
lib/pgbus/active_job/executor.rb,
lib/pgbus/event_bus/publisher.rb,
lib/pgbus/process/wake_signal.rb,
lib/pgbus/recurring/scheduler.rb,
lib/pgbus/streams/signed_name.rb,
app/models/pgbus/process_entry.rb,
lib/pgbus/event_bus/subscriber.rb,
lib/pgbus/queue_name_validator.rb,
app/models/pgbus/recurring_task.rb,
app/models/pgbus/uniqueness_key.rb,
lib/pgbus/concurrency/semaphore.rb,
lib/pgbus/failed_event_recorder.rb,
lib/pgbus/recurring/command_job.rb,
lib/pgbus/web/streamer/instance.rb,
lib/pgbus/web/streamer/listener.rb,
lib/pgbus/web/streamer/registry.rb,
app/helpers/pgbus/streams_helper.rb,
app/models/pgbus/processed_event.rb,
lib/pgbus/integrations/appsignal.rb,
lib/pgbus/process/signal_handler.rb,
lib/pgbus/web/metrics_serializer.rb,
lib/pgbus/web/streamer/heartbeat.rb,
lib/pgbus/web/streamer/io_writer.rb,
lib/pgbus/recurring/config_loader.rb,
lib/pgbus/web/streamer/connection.rb,
app/models/pgbus/blocked_execution.rb,
app/models/pgbus/application_record.rb,
lib/generators/pgbus/migration_path.rb,
lib/pgbus/configuration/capsule_dsl.rb,
lib/pgbus/process/consumer_priority.rb,
app/helpers/pgbus/application_helper.rb,
app/models/pgbus/recurring_execution.rb,
lib/pgbus/client/ensure_stream_queue.rb,
lib/pgbus/execution_pools/async_pool.rb,
lib/pgbus/recurring/already_recorded.rb,
app/controllers/pgbus/jobs_controller.rb,
lib/generators/pgbus/update_generator.rb,
lib/pgbus/execution_pools/thread_pool.rb,
lib/pgbus/generators/config_converter.rb,
lib/pgbus/streams/turbo_broadcastable.rb,
app/controllers/pgbus/locks_controller.rb,
lib/generators/pgbus/install_generator.rb,
lib/pgbus/integrations/appsignal/probe.rb,
app/controllers/pgbus/events_controller.rb,
app/controllers/pgbus/locale_controller.rb,
app/controllers/pgbus/outbox_controller.rb,
app/controllers/pgbus/queues_controller.rb,
lib/pgbus/concurrency/blocked_execution.rb,
lib/pgbus/generators/migration_detector.rb,
lib/pgbus/streams/turbo_stream_override.rb,
lib/pgbus/web/streamer/falcon_connection.rb,
app/controllers/pgbus/insights_controller.rb,
lib/generators/pgbus/add_outbox_generator.rb,
app/controllers/pgbus/api/stats_controller.rb,
app/controllers/pgbus/dashboard_controller.rb,
app/controllers/pgbus/frontends_controller.rb,
app/controllers/pgbus/processes_controller.rb,
lib/generators/pgbus/add_presence_generator.rb,
lib/generators/pgbus/upgrade_pgmq_generator.rb,
lib/pgbus/integrations/appsignal/subscriber.rb,
app/controllers/pgbus/api/metrics_controller.rb,
app/controllers/pgbus/application_controller.rb,
app/controllers/pgbus/dead_letter_controller.rb,
lib/generators/pgbus/add_job_locks_generator.rb,
lib/generators/pgbus/add_job_stats_generator.rb,
lib/generators/pgbus/add_recurring_generator.rb,
lib/pgbus/streams/watermark_cache_middleware.rb,
app/controllers/pgbus/api/insights_controller.rb,
lib/pgbus/generators/database_target_detector.rb,
lib/generators/pgbus/tune_autovacuum_generator.rb,
lib/generators/pgbus/tune_fillfactor_generator.rb,
lib/pgbus/web/streamer/stream_event_dispatcher.rb,
lib/generators/pgbus/add_queue_states_generator.rb,
lib/generators/pgbus/add_stream_stats_generator.rb,
app/controllers/pgbus/recurring_tasks_controller.rb,
lib/generators/pgbus/migrate_job_locks_generator.rb,
lib/generators/pgbus/add_job_stats_latency_generator.rb,
lib/generators/pgbus/add_failed_events_index_generator.rb,
lib/generators/pgbus/add_job_stats_queue_index_generator.rb

Defined Under Namespace

Modules: ActiveJob, Api, ApplicationHelper, AutovacuumTuning, CLI, Concurrency, ConfigLoader, ErrorReporter, EventBus, ExecutionPools, Generators, Instrumentation, Integrations, LogFormatter, Outbox, PgmqSchema, Process, QueueFactory, QueueNameValidator, Recurring, RetryBackoff, Serializer, Streams, StreamsHelper, TableMaintenance, Testing, Uniqueness, Web

Classes: ApplicationController, ApplicationRecord, Batch, BatchEntry, BlockedExecution, BusRecord, CircuitBreaker, Client, ConcurrencyLimitExceeded, Configuration, ConfigurationError, DashboardController, DeadLetterController, DeadLetterError, DedupCache, Engine, Error, Event, EventsController, FailedEventRecorder, FrontendsController, InsightsController, JobLock, JobNotUnique, JobStat, JobsController, LocaleController, LocksController, OutboxController, OutboxEntry, ProcessEntry, ProcessedEvent, ProcessesController, QueueNotFoundError, QueueState, QueuesController, RateCounter, RecurringExecution, RecurringTask, RecurringTasksController, SchemaNotReady, Semaphore, SerializationError, StatBuffer, StreamStat, UniquenessKey

Constant Summary collapse

DEAD_LETTER_SUFFIX =

Suffix appended to a queue name to derive its dead-letter companion (e.g. “pgbus_default” -> “pgbus_default_dlq”). Hard-coded here because changing it on a running deployment would orphan every existing DLQ message; nothing in the codebase or in user reports has ever needed this to be configurable.

"_dlq"
VERSION =
"0.7.9"

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.stopping ⇒ Object



27
28
29
# File 'lib/pgbus.rb', line 27

# Reads the stopping flag held in @stopping.
#
# Returns the stored value when it is truthy; an unset, nil or false flag
# always reads as false, so callers never see nil.
def stopping
  return false unless @stopping

  @stopping
end

Class Method Details

.client ⇒ Object



88
89
90
# File 'lib/pgbus.rb', line 88

# Returns the shared Client, building it from the current configuration on
# first use and keeping it in @client for every later call.
def client
  return @client if @client

  @client = Client.new(configuration)
end

.configuration ⇒ Object



80
81
82
# File 'lib/pgbus.rb', line 80

# Returns the Configuration held in @configuration, creating a fresh
# Configuration the first time it is requested.
def configuration
  @configuration || (@configuration = Configuration.new)
end

.configure {|configuration| ... } ⇒ Object

Yields:



84
85
86
# File 'lib/pgbus.rb', line 84

# Hands the current configuration object to the caller's block so it can be
# adjusted in place. The block is required; its result is the return value.
def configure
  config = configuration
  yield config
end

.loader ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/pgbus.rb', line 31

# Returns the gem's Zeitwerk loader, building and memoizing it in @loader on
# first use. The loader is only configured here (acronym inflections plus the
# directories it must not manage); this method does not call setup on it.
def loader
  return @loader if @loader

  gem_loader = Zeitwerk::Loader.for_gem
  gem_loader.inflector.inflect(
    "pgbus" => "Pgbus",
    "cli" => "CLI",
    "dsl" => "DSL",
    "capsule_dsl" => "CapsuleDSL"
  )

  # Directories excluded from autoloading, ignored in this order.
  [
    "#{__dir__}/generators",
    "#{__dir__}/active_job",
    "#{__dir__}/pgbus/testing",
    # Vendor integrations are loaded conditionally (when the vendor gem
    # is present) by lib/pgbus/engine.rb. Keeping them out of Zeitwerk
    # means we don't reference vendor constants at autoload time.
    "#{__dir__}/pgbus/integrations",
    # lib/puma/plugin/pgbus_streams.rb is a Puma plugin — it's required
    # explicitly by the user from config/puma.rb via `plugin :pgbus_streams`.
    # Without this ignore, Zeitwerk scans lib/puma/ under the pgbus loader
    # root and tries to autoload Puma::Plugin, which collides with the real
    # Puma::Plugin class defined by the puma gem itself.
    "#{__dir__}/puma"
  ].each { |ignored_path| gem_loader.ignore(ignored_path) }

  @loader = gem_loader
end

.logger ⇒ Object



142
143
144
# File 'lib/pgbus.rb', line 142

# Convenience reader: returns the logger held by the current configuration.
def logger = configuration.logger

.logger=(value) ⇒ Object



146
147
148
# File 'lib/pgbus.rb', line 146

# Convenience writer: stores +value+ as the logger on the current
# configuration object.
def logger=(value)
  config = configuration
  config.logger = value
end

.models_loader ⇒ Object

Separate loader for app/models used only in non-Rails contexts (specs, standalone scripts). When the Engine boots, Rails’ autoloader takes over app/models and this loader is torn down to avoid conflicts.



60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/pgbus.rb', line 60

# Returns a dedicated Zeitwerk loader rooted at ../app/models (relative to
# this file's directory), or nil when that directory does not exist.
#
# The directory check runs on every call, before the memoized loader is
# consulted. On first success the loader is tagged "pgbus-models", pointed at
# the models directory, set up, and cached in @models_loader.
def models_loader
  models_root = File.expand_path("../app/models", __dir__)
  return nil unless File.directory?(models_root)
  return @models_loader if @models_loader

  @models_loader = Zeitwerk::Loader.new.tap do |zeitwerk|
    zeitwerk.tag = "pgbus-models"
    zeitwerk.push_dir(models_root)
    zeitwerk.setup
  end
end

.reset! ⇒ Object



127
128
129
130
131
132
# File 'lib/pgbus.rb', line 127

# Closes the current client, if there is one, and then forgets the client,
# the configuration and the stream cache by setting all three to nil.
# Always returns nil.
def reset!
  @client.close unless @client.nil?
  @client = @configuration = @stream_cache = nil
end

.reset_client! ⇒ Object

Discard the inherited PGMQ client after fork. Do NOT call close — the parent’s @pgmq_mutex is in an undefined state post-fork and attempting to acquire it can deadlock. The next call to Pgbus.client will lazily create a fresh one.



138
139
140
# File 'lib/pgbus.rb', line 138

# Drops the reference to the current client without closing it; only
# @client is touched. Returns nil.
def reset_client! = (@client = nil)

.stream(streamables) ⇒ Object

Entry point for the streams subsystem — `Pgbus.stream(name).broadcast(html)` or `Pgbus.stream(@order).current_msg_id`. Defined on Pgbus itself rather than inside lib/pgbus/streams.rb because that file is only Zeitwerk-loaded when Pgbus::Streams::Stream is first referenced — a chicken-and-egg problem that would leave `Pgbus.stream(…)` undefined on the first call. Referencing Streams::Stream inside the method body forces Zeitwerk to load lib/pgbus/streams.rb lazily on first use, which is fine.

Caches Stream instances by logical name so high-frequency callers (e.g. Turbo::StreamsChannel.broadcast_stream_to inside an after_update_commit callback firing 1000x/sec) don’t allocate a new Stream + Mutex per broadcast. The cache is process-local; reset! clears it. The cache key is the resolved name string, not the raw streamables, so repeated calls to `Pgbus.stream(@order)` in the same process return the same instance.



107
108
109
110
111
# File 'lib/pgbus.rb', line 107

# Returns the Stream for +streamables+, reusing a cached instance when one
# already exists for the same resolved name.
#
# The name is resolved first via Streams::Stream.name_from; the cache
# (a Concurrent::Map kept in @stream_cache) is created on demand, and a new
# Stream is only constructed when the name has no entry yet.
def stream(streamables)
  cache_key = Streams::Stream.name_from(streamables)
  cache = (@stream_cache ||= Concurrent::Map.new)
  cache.compute_if_absent(cache_key) do
    Streams::Stream.new(streamables)
  end
end

.stream_key(*parts) ⇒ Object

Compose a short, pgbus-safe stream identifier from any mix of records, strings, symbols, and arrays. Delegates to `Pgbus::Streams::Key.stream_key`; raises `ArgumentError` if the resulting key would overflow the pgbus queue-name budget. See `lib/pgbus/streams/key.rb` for the digest policy and rationale.

Pgbus.stream_key(chat, :messages)
# => "ai_chat_3a4f9c21b7d20e18:messages"

Pgbus.stream(Pgbus.stream_key(chat, :messages)).broadcast(html)


123
124
125
# File 'lib/pgbus.rb', line 123

# Thin delegator: forwards every positional part and every keyword option
# unchanged to Streams::Key.stream_key and returns its result.
def stream_key(*parts, **) = Streams::Key.stream_key(*parts, **)

.teardown_models_loader! ⇒ Object



73
74
75
76
77
78
# File 'lib/pgbus.rb', line 73

# Unregisters the loader held in @models_loader and forgets it. Does nothing
# when no loader is held. Returns nil in both cases.
def teardown_models_loader!
  active_loader = @models_loader
  return unless active_loader

  active_loader.unregister
  @models_loader = nil
end