Module: Pgbus
- Defined in:
- lib/pgbus.rb,
lib/pgbus/cli.rb,
lib/pgbus/mcp.rb,
lib/pgbus/batch.rb,
lib/pgbus/event.rb,
lib/pgbus/client.rb,
lib/pgbus/engine.rb,
lib/pgbus/outbox.rb,
lib/pgbus/streams.rb,
lib/pgbus/testing.rb,
lib/pgbus/version.rb,
lib/pgbus/bus_record.rb,
lib/pgbus/mcp/runner.rb,
lib/pgbus/mcp/server.rb,
lib/pgbus/serializer.rb,
lib/pgbus/uniqueness.rb,
lib/pgbus/concurrency.rb,
lib/pgbus/dedup_cache.rb,
lib/pgbus/pgmq_schema.rb,
lib/pgbus/stat_buffer.rb,
lib/pgbus/streams/key.rb,
lib/pgbus/mcp/rack_app.rb,
lib/pgbus/mcp/redactor.rb,
lib/pgbus/rate_counter.rb,
lib/pgbus/web/streamer.rb,
lib/pgbus/config_loader.rb,
lib/pgbus/configuration.rb,
lib/pgbus/log_formatter.rb,
lib/pgbus/mcp/base_tool.rb,
lib/pgbus/outbox/poller.rb,
lib/pgbus/queue_factory.rb,
lib/pgbus/retry_backoff.rb,
lib/pgbus/error_reporter.rb,
lib/pgbus/process/worker.rb,
lib/pgbus/recurring/task.rb,
lib/pgbus/streams/cursor.rb,
lib/pgbus/web/stream_app.rb,
app/models/pgbus/job_lock.rb,
app/models/pgbus/job_stat.rb,
lib/pgbus/circuit_breaker.rb,
lib/pgbus/execution_pools.rb,
lib/pgbus/instrumentation.rb,
lib/pgbus/streams/filters.rb,
lib/pgbus/web/data_source.rb,
app/models/pgbus/semaphore.rb,
lib/pgbus/process/consumer.rb,
lib/pgbus/streams/envelope.rb,
lib/pgbus/streams/presence.rb,
lib/pgbus/streams/renderer.rb,
lib/pgbus/testing/minitest.rb,
lib/pgbus/autovacuum_tuning.rb,
lib/pgbus/client/read_after.rb,
lib/pgbus/event_bus/handler.rb,
lib/pgbus/process/heartbeat.rb,
lib/pgbus/process/lifecycle.rb,
lib/pgbus/streams/coalescer.rb,
lib/pgbus/table_maintenance.rb,
app/models/pgbus/batch_entry.rb,
app/models/pgbus/queue_state.rb,
app/models/pgbus/stream_stat.rb,
lib/pgbus/active_job/adapter.rb,
lib/pgbus/event_bus/registry.rb,
lib/pgbus/mcp/tools/dlq_tool.rb,
lib/pgbus/process/dispatcher.rb,
lib/pgbus/process/queue_lock.rb,
lib/pgbus/process/supervisor.rb,
lib/pgbus/recurring/schedule.rb,
lib/pgbus/streams/streamable.rb,
lib/pgbus/testing/assertions.rb,
lib/pgbus/web/authentication.rb,
app/models/pgbus/outbox_entry.rb,
lib/pgbus/active_job/executor.rb,
lib/pgbus/event_bus/publisher.rb,
lib/pgbus/mcp/health_analyzer.rb,
lib/pgbus/mcp/tools/jobs_tool.rb,
lib/pgbus/process/wake_signal.rb,
lib/pgbus/recurring/scheduler.rb,
lib/pgbus/streams/signed_name.rb,
app/models/pgbus/process_entry.rb,
lib/pgbus/client/notify_stream.rb,
lib/pgbus/event_bus/subscriber.rb,
lib/pgbus/mcp/tools/locks_tool.rb,
lib/pgbus/mcp/tools/stats_tool.rb,
lib/pgbus/queue_name_validator.rb,
app/models/pgbus/recurring_task.rb,
app/models/pgbus/uniqueness_key.rb,
lib/pgbus/concurrency/semaphore.rb,
lib/pgbus/failed_event_recorder.rb,
lib/pgbus/mcp/tools/health_tool.rb,
lib/pgbus/mcp/tools/queues_tool.rb,
lib/pgbus/recurring/command_job.rb,
lib/pgbus/web/streamer/instance.rb,
lib/pgbus/web/streamer/listener.rb,
lib/pgbus/web/streamer/registry.rb,
app/helpers/pgbus/streams_helper.rb,
app/models/pgbus/processed_event.rb,
lib/pgbus/integrations/appsignal.rb,
lib/pgbus/process/signal_handler.rb,
lib/pgbus/web/metrics_serializer.rb,
lib/pgbus/web/streamer/heartbeat.rb,
lib/pgbus/web/streamer/io_writer.rb,
lib/pgbus/process/notify_listener.rb,
lib/pgbus/recurring/config_loader.rb,
lib/pgbus/web/streamer/connection.rb,
app/models/pgbus/blocked_execution.rb,
lib/pgbus/mcp/tools/processes_tool.rb,
lib/pgbus/mcp/tools/recurring_tool.rb,
app/models/pgbus/application_record.rb,
lib/generators/pgbus/migration_path.rb,
lib/pgbus/configuration/capsule_dsl.rb,
lib/pgbus/mcp/tools/dlq_detail_tool.rb,
lib/pgbus/mcp/tools/job_detail_tool.rb,
lib/pgbus/mcp/tools/throughput_tool.rb,
lib/pgbus/process/consumer_priority.rb,
app/helpers/pgbus/application_helper.rb,
app/models/pgbus/recurring_execution.rb,
lib/pgbus/client/ensure_stream_queue.rb,
lib/pgbus/execution_pools/async_pool.rb,
lib/pgbus/recurring/already_recorded.rb,
app/controllers/pgbus/jobs_controller.rb,
lib/generators/pgbus/update_generator.rb,
lib/pgbus/execution_pools/thread_pool.rb,
lib/pgbus/generators/config_converter.rb,
lib/pgbus/mcp/tools/queue_detail_tool.rb,
lib/pgbus/streams/turbo_broadcastable.rb,
lib/pgbus/web/streamer/stream_counter.rb,
app/controllers/pgbus/locks_controller.rb,
lib/generators/pgbus/install_generator.rb,
lib/pgbus/integrations/appsignal/probe.rb,
app/controllers/pgbus/events_controller.rb,
app/controllers/pgbus/locale_controller.rb,
app/controllers/pgbus/outbox_controller.rb,
app/controllers/pgbus/queues_controller.rb,
lib/pgbus/concurrency/blocked_execution.rb,
lib/pgbus/generators/migration_detector.rb,
lib/pgbus/streams/turbo_stream_override.rb,
app/controllers/pgbus/batches_controller.rb,
lib/pgbus/streams/broadcastable_override.rb,
lib/pgbus/web/streamer/falcon_connection.rb,
app/controllers/pgbus/insights_controller.rb,
lib/generators/pgbus/add_outbox_generator.rb,
app/controllers/pgbus/api/stats_controller.rb,
app/controllers/pgbus/dashboard_controller.rb,
app/controllers/pgbus/frontends_controller.rb,
app/controllers/pgbus/processes_controller.rb,
lib/generators/pgbus/add_presence_generator.rb,
lib/generators/pgbus/upgrade_pgmq_generator.rb,
lib/pgbus/integrations/appsignal/subscriber.rb,
app/controllers/pgbus/api/metrics_controller.rb,
app/controllers/pgbus/application_controller.rb,
app/controllers/pgbus/dead_letter_controller.rb,
lib/generators/pgbus/add_job_locks_generator.rb,
lib/generators/pgbus/add_job_stats_generator.rb,
lib/generators/pgbus/add_recurring_generator.rb,
lib/pgbus/streams/watermark_cache_middleware.rb,
app/controllers/pgbus/api/insights_controller.rb,
lib/pgbus/generators/database_target_detector.rb,
lib/generators/pgbus/tune_autovacuum_generator.rb,
lib/generators/pgbus/tune_fillfactor_generator.rb,
lib/pgbus/web/streamer/stream_event_dispatcher.rb,
lib/generators/pgbus/add_queue_states_generator.rb,
lib/generators/pgbus/add_stream_stats_generator.rb,
app/controllers/pgbus/recurring_tasks_controller.rb,
lib/generators/pgbus/migrate_job_locks_generator.rb,
lib/generators/pgbus/add_job_stats_latency_generator.rb,
lib/generators/pgbus/add_failed_events_index_generator.rb,
lib/generators/pgbus/add_job_stats_queue_index_generator.rb
Defined Under Namespace
Modules: ActiveJob, Api, ApplicationHelper, AutovacuumTuning, CLI, Concurrency, ConfigLoader, ErrorReporter, EventBus, ExecutionPools, Generators, Instrumentation, Integrations, LogFormatter, MCP, Outbox, PgmqSchema, Process, QueueFactory, QueueNameValidator, Recurring, RetryBackoff, Serializer, Streams, StreamsHelper, TableMaintenance, Testing, Uniqueness, Web
Classes: ApplicationController, ApplicationRecord, Batch, BatchEntry, BatchesController, BlockedExecution, BusRecord, CircuitBreaker, Client, ConcurrencyLimitExceeded, Configuration, ConfigurationError, DashboardController, DeadLetterController, DeadLetterError, DedupCache, Engine, Error, Event, EventsController, FailedEventRecorder, FrontendsController, InsightsController, JobLock, JobNotUnique, JobStat, JobsController, LocaleController, LocksController, OutboxController, OutboxEntry, ProcessEntry, ProcessedEvent, ProcessesController, QueueNotFoundError, QueueState, QueuesController, RateCounter, ReadTimeoutError, RecurringExecution, RecurringTask, RecurringTasksController, SchemaNotReady, Semaphore, SerializationError, StatBuffer, StreamStat, UniquenessKey
Constant Summary
collapse
- DEAD_LETTER_SUFFIX =
Suffix appended to a queue name to derive its dead-letter companion (e.g. “pgbus_default” -> “pgbus_default_dlq”). Hard-coded here because changing it on a running deployment would orphan every existing DLQ message; nothing in the codebase or in user reports has ever needed this to be configurable.
"_dlq"
- VERSION =
"0.9.5"
Class Attribute Summary collapse
Class Method Summary
collapse
Class Attribute Details
.stopping ⇒ Object
28
29
30
|
# File 'lib/pgbus.rb', line 28
# Reads the +@stopping+ flag, normalising an unset or falsy value to +false+.
#
# @return [Object] the stored +@stopping+ value when truthy, otherwise +false+
def stopping
  return false unless @stopping

  @stopping
end
|
Class Method Details
.client ⇒ Object
97
98
99
|
# File 'lib/pgbus.rb', line 97
# Returns the memoized client, building it from the current configuration
# the first time it is requested.
#
# @return [Client] the cached instance stored in +@client+
def client
  return @client if @client

  @client = Client.new(configuration)
end
|
.configuration ⇒ Object
89
90
91
|
# File 'lib/pgbus.rb', line 89
# Returns the memoized configuration object, creating a default
# +Configuration+ on first access.
#
# @return [Configuration] the cached instance stored in +@configuration+
def configuration
  return @configuration if @configuration

  @configuration = Configuration.new
end
|
.configure { |configuration| ... } ⇒ Object
93
94
95
|
# File 'lib/pgbus.rb', line 93
# Yields the configuration object to the caller's block.
#
# @yieldparam config [Object] the value returned by +configuration+
# @return [Object] whatever the block returns
# @raise [LocalJumpError] when called without a block
def configure
  config = configuration
  yield config
end
|
.loader ⇒ Object
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
# File 'lib/pgbus.rb', line 32
# Returns the memoized Zeitwerk loader for the gem.
#
# On first call the loader is obtained from +Zeitwerk::Loader.for_gem+,
# given the custom inflections below, and told to ignore a fixed set of
# paths under this file's directory. The configured loader is then cached
# in +@loader+.
#
# @return [Zeitwerk::Loader] the cached loader
def loader
  return @loader if @loader

  gem_loader = Zeitwerk::Loader.for_gem
  gem_loader.inflector.inflect(
    "pgbus" => "Pgbus",
    "cli" => "CLI",
    "dsl" => "DSL",
    "capsule_dsl" => "CapsuleDSL",
    "mcp" => "MCP"
  )

  # Paths excluded from autoloading, registered in this order.
  [
    "#{__dir__}/generators",
    "#{__dir__}/active_job",
    "#{__dir__}/pgbus/testing",
    "#{__dir__}/pgbus/mcp",
    "#{__dir__}/pgbus/mcp.rb",
    "#{__dir__}/pgbus/integrations",
    "#{__dir__}/puma"
  ].each { |ignored_path| gem_loader.ignore(ignored_path) }

  @loader = gem_loader
end
|
.logger ⇒ Object
168
169
170
|
# File 'lib/pgbus.rb', line 168
# Returns the logger held by the current configuration.
#
# @return [Object] the value of +configuration.logger+
def logger
  config = configuration
  config.logger
end
|
.logger=(value) ⇒ Object
172
173
174
|
# File 'lib/pgbus.rb', line 172
# Assigns the logger on the current configuration.
#
# @param value [Object] the logger to store via +configuration.logger=+
# @return [Object] the assigned value
def logger=(value)
  config = configuration
  config.logger = value
end
|
.models_loader ⇒ Object
Separate loader for app/models used only in non-Rails contexts (specs, standalone scripts). When the Engine boots, Rails’ autoloader takes over app/models and this loader is torn down to avoid conflicts.
69
70
71
72
73
74
75
76
77
78
79
80
|
# File 'lib/pgbus.rb', line 69
# Returns a dedicated Zeitwerk loader for the +app/models+ directory that
# sits one level above this file's directory.
#
# The directory check runs on every call, before the memoized value is
# consulted, so +nil+ is returned whenever the directory is absent, even
# if a loader was built earlier.
#
# @return [Zeitwerk::Loader, nil] the cached loader tagged "pgbus-models",
#   or +nil+ when the models directory does not exist
def models_loader
  models_path = File.expand_path("../app/models", __dir__)
  return unless File.directory?(models_path)
  return @models_loader if @models_loader

  @models_loader = Zeitwerk::Loader.new.tap do |zeitwerk|
    zeitwerk.tag = "pgbus-models"
    zeitwerk.push_dir(models_path)
    zeitwerk.setup
  end
end
|
.reset! ⇒ Object
153
154
155
156
157
158
|
# File 'lib/pgbus.rb', line 153
# Closes the current client (when one exists) and clears the cached
# client, configuration, and stream cache.
#
# If +close+ raises, the exception propagates and none of the cached
# state is cleared.
#
# @return [nil]
def reset!
  @client.close unless @client.nil?
  @client = @configuration = @stream_cache = nil
end
|
.reset_client! ⇒ Object
Discard the inherited PGMQ client after fork. Do NOT call close — the parent’s @pgmq_mutex is in undefined state post-fork and attempting to acquire it can deadlock. The next call to Pgbus.client will lazily create a fresh one.
164
165
166
|
# File 'lib/pgbus.rb', line 164
# Drops the memoized client reference without closing it.
#
# Only the +@client+ ivar is cleared; no method is called on the old
# client object. The +client+ accessor memoizes with +||=+, so it will
# build a new instance the next time it is called.
#
# @return [nil]
def reset_client!
@client = nil
end
|
.stream(streamables, durable: nil) ⇒ Object
Entry point for the streams subsystem — `Pgbus.stream(name).broadcast(html)` or `Pgbus.stream(@order).current_msg_id`. Defined on Pgbus itself rather than inside lib/pgbus/streams.rb because that file is only Zeitwerk-loaded when Pgbus::Streams::Stream is first referenced — the chicken-and-egg problem means `Pgbus.stream(...)` would be undefined on the first call. Referencing Streams::Stream inside the method body forces Zeitwerk to load lib/pgbus/streams.rb lazily on first use, which is fine.
Caches Stream instances by logical name so high-frequency callers (e.g. Turbo::StreamsChannel.broadcast_stream_to inside an after_update_commit callback firing 1000x/sec) don’t allocate a new Stream + Mutex per broadcast. The cache is process-local; reset! clears it. The cache key is the resolved name string plus a suffix for the resolved durability mode, not the raw streamables, so repeated `Pgbus.stream(@order)` calls in the same process that resolve to the same mode return the same instance. `durable:` defaults to `nil` so the configuration resolves the mode: `streams_durable_patterns` first (exact string or regex match), then `streams_default_broadcast_mode`. Pass `durable: true`/`false` explicitly to bypass the resolver for this Stream instance.
120
121
122
123
124
125
126
|
# File 'lib/pgbus.rb', line 120
# Returns a cached +Streams::Stream+ for the given streamables.
#
# The stream name is derived via +Streams::Stream.name_from+. When
# +durable+ is +nil+ the mode comes from +configuration.stream_durable?+;
# otherwise the supplied value is used as-is. Instances are cached in
# +@stream_cache+ under "<name>:d" (durable) or "<name>:e" (not durable).
#
# @param streamables [Object] value(s) identifying the stream
# @param durable [Boolean, nil] explicit mode, or +nil+ to let the
#   configuration decide
# @return [Streams::Stream] the cached stream for this name and mode
def stream(streamables, durable: nil)
  stream_name = Streams::Stream.name_from(streamables)
  is_durable =
    if durable.nil?
      configuration.stream_durable?(stream_name)
    else
      durable
    end
  mode_tag = is_durable ? "d" : "e"

  cache = (@stream_cache ||= Concurrent::Map.new)
  cache.compute_if_absent("#{stream_name}:#{mode_tag}") do
    Streams::Stream.new(streamables, durable: is_durable)
  end
end
|
.stream_key(*parts) ⇒ Object
Compose a short, pgbus-safe stream identifier from any mix of records, strings, symbols, and arrays. Delegates to `Pgbus::Streams::Key.stream_key`; raises `ArgumentError` if the resulting key would overflow the pgbus queue-name budget. See `lib/pgbus/streams/key.rb` for the digest policy and rationale.
Pgbus.stream_key(chat, :messages)
Pgbus.stream(Pgbus.stream_key(chat, :messages)).broadcast(html)
138
139
140
|
# File 'lib/pgbus.rb', line 138
# Forwards all positional parts and keyword options to
# +Streams::Key.stream_key+ and returns its result.
#
# @param parts [Array<Object>] components of the key
# @param options [Hash] keyword options passed through unchanged
# @return [Object] whatever +Streams::Key.stream_key+ returns
def stream_key(*parts, **options)
  Streams::Key.stream_key(*parts, **options)
end
|
.stream_key!(key) ⇒ Object
Accepts an already-built stream key verbatim (no re-keying), only enforcing the queue-name budget. Use when you hold a key string and want to pass it to both `turbo_stream_from` and a broadcaster without the colon-separator guard raising on the second call.
key = Pgbus.stream_key(chat, :messages)
Pgbus.stream(Pgbus.stream_key!(key)).broadcast(html)
.teardown_models_loader! ⇒ Object
82
83
84
85
86
87
|
# File 'lib/pgbus.rb', line 82
# Unregisters the models loader, if one has been built, and forgets it.
#
# Does nothing when +@models_loader+ is unset.
#
# @return [nil]
def teardown_models_loader!
  if @models_loader
    @models_loader.unregister
    @models_loader = nil
  end
end
|