Module: Pgbus
- Defined in:
- lib/pgbus.rb,
lib/pgbus/cli.rb,
lib/pgbus/batch.rb,
lib/pgbus/event.rb,
lib/pgbus/client.rb,
lib/pgbus/engine.rb,
lib/pgbus/outbox.rb,
lib/pgbus/streams.rb,
lib/pgbus/testing.rb,
lib/pgbus/version.rb,
lib/pgbus/bus_record.rb,
lib/pgbus/serializer.rb,
lib/pgbus/uniqueness.rb,
lib/pgbus/concurrency.rb,
lib/pgbus/dedup_cache.rb,
lib/pgbus/pgmq_schema.rb,
lib/pgbus/stat_buffer.rb,
lib/pgbus/streams/key.rb,
lib/pgbus/rate_counter.rb,
lib/pgbus/web/streamer.rb,
lib/pgbus/config_loader.rb,
lib/pgbus/configuration.rb,
lib/pgbus/log_formatter.rb,
lib/pgbus/outbox/poller.rb,
lib/pgbus/queue_factory.rb,
lib/pgbus/retry_backoff.rb,
lib/pgbus/error_reporter.rb,
lib/pgbus/process/worker.rb,
lib/pgbus/recurring/task.rb,
lib/pgbus/streams/cursor.rb,
lib/pgbus/web/stream_app.rb,
app/models/pgbus/job_lock.rb,
app/models/pgbus/job_stat.rb,
lib/pgbus/circuit_breaker.rb,
lib/pgbus/execution_pools.rb,
lib/pgbus/instrumentation.rb,
lib/pgbus/streams/filters.rb,
lib/pgbus/web/data_source.rb,
app/models/pgbus/semaphore.rb,
lib/pgbus/process/consumer.rb,
lib/pgbus/streams/envelope.rb,
lib/pgbus/streams/presence.rb,
lib/pgbus/testing/minitest.rb,
lib/pgbus/autovacuum_tuning.rb,
lib/pgbus/client/read_after.rb,
lib/pgbus/event_bus/handler.rb,
lib/pgbus/process/heartbeat.rb,
lib/pgbus/process/lifecycle.rb,
app/models/pgbus/batch_entry.rb,
app/models/pgbus/queue_state.rb,
app/models/pgbus/stream_stat.rb,
lib/pgbus/active_job/adapter.rb,
lib/pgbus/event_bus/registry.rb,
lib/pgbus/process/dispatcher.rb,
lib/pgbus/process/queue_lock.rb,
lib/pgbus/process/supervisor.rb,
lib/pgbus/recurring/schedule.rb,
lib/pgbus/streams/streamable.rb,
lib/pgbus/testing/assertions.rb,
lib/pgbus/web/authentication.rb,
app/models/pgbus/outbox_entry.rb,
lib/pgbus/active_job/executor.rb,
lib/pgbus/event_bus/publisher.rb,
lib/pgbus/process/wake_signal.rb,
lib/pgbus/recurring/scheduler.rb,
lib/pgbus/streams/signed_name.rb,
app/models/pgbus/process_entry.rb,
lib/pgbus/event_bus/subscriber.rb,
lib/pgbus/queue_name_validator.rb,
app/models/pgbus/recurring_task.rb,
app/models/pgbus/uniqueness_key.rb,
lib/pgbus/concurrency/semaphore.rb,
lib/pgbus/failed_event_recorder.rb,
lib/pgbus/recurring/command_job.rb,
lib/pgbus/web/streamer/instance.rb,
lib/pgbus/web/streamer/listener.rb,
lib/pgbus/web/streamer/registry.rb,
app/helpers/pgbus/streams_helper.rb,
app/models/pgbus/processed_event.rb,
lib/pgbus/process/signal_handler.rb,
lib/pgbus/web/metrics_serializer.rb,
lib/pgbus/web/streamer/heartbeat.rb,
lib/pgbus/web/streamer/io_writer.rb,
lib/pgbus/recurring/config_loader.rb,
lib/pgbus/web/streamer/connection.rb,
app/models/pgbus/blocked_execution.rb,
app/models/pgbus/application_record.rb,
lib/generators/pgbus/migration_path.rb,
lib/pgbus/configuration/capsule_dsl.rb,
lib/pgbus/process/consumer_priority.rb,
app/helpers/pgbus/application_helper.rb,
app/models/pgbus/recurring_execution.rb,
lib/pgbus/client/ensure_stream_queue.rb,
lib/pgbus/execution_pools/async_pool.rb,
lib/pgbus/recurring/already_recorded.rb,
app/controllers/pgbus/jobs_controller.rb,
lib/generators/pgbus/update_generator.rb,
lib/pgbus/execution_pools/thread_pool.rb,
lib/pgbus/generators/config_converter.rb,
lib/pgbus/streams/turbo_broadcastable.rb,
app/controllers/pgbus/locks_controller.rb,
lib/generators/pgbus/install_generator.rb,
app/controllers/pgbus/events_controller.rb,
app/controllers/pgbus/locale_controller.rb,
app/controllers/pgbus/outbox_controller.rb,
app/controllers/pgbus/queues_controller.rb,
lib/pgbus/concurrency/blocked_execution.rb,
lib/pgbus/generators/migration_detector.rb,
lib/pgbus/streams/turbo_stream_override.rb,
lib/pgbus/web/streamer/falcon_connection.rb,
app/controllers/pgbus/insights_controller.rb,
lib/generators/pgbus/add_outbox_generator.rb,
app/controllers/pgbus/api/stats_controller.rb,
app/controllers/pgbus/dashboard_controller.rb,
app/controllers/pgbus/frontends_controller.rb,
app/controllers/pgbus/processes_controller.rb,
lib/generators/pgbus/add_presence_generator.rb,
lib/generators/pgbus/upgrade_pgmq_generator.rb,
app/controllers/pgbus/api/metrics_controller.rb,
app/controllers/pgbus/application_controller.rb,
app/controllers/pgbus/dead_letter_controller.rb,
lib/generators/pgbus/add_job_locks_generator.rb,
lib/generators/pgbus/add_job_stats_generator.rb,
lib/generators/pgbus/add_recurring_generator.rb,
lib/pgbus/streams/watermark_cache_middleware.rb,
app/controllers/pgbus/api/insights_controller.rb,
lib/pgbus/generators/database_target_detector.rb,
lib/generators/pgbus/tune_autovacuum_generator.rb,
lib/pgbus/web/streamer/stream_event_dispatcher.rb,
lib/generators/pgbus/add_queue_states_generator.rb,
lib/generators/pgbus/add_stream_stats_generator.rb,
app/controllers/pgbus/recurring_tasks_controller.rb,
lib/generators/pgbus/migrate_job_locks_generator.rb,
lib/generators/pgbus/add_job_stats_latency_generator.rb,
lib/generators/pgbus/add_failed_events_index_generator.rb,
lib/generators/pgbus/add_job_stats_queue_index_generator.rb
Defined Under Namespace
Modules: ActiveJob, Api, ApplicationHelper, AutovacuumTuning, CLI, Concurrency, ConfigLoader, ErrorReporter, EventBus, ExecutionPools, Generators, Instrumentation, LogFormatter, Outbox, PgmqSchema, Process, QueueFactory, QueueNameValidator, Recurring, RetryBackoff, Serializer, Streams, StreamsHelper, Testing, Uniqueness, Web
Classes: ApplicationController, ApplicationRecord, Batch, BatchEntry, BlockedExecution, BusRecord, CircuitBreaker, Client, ConcurrencyLimitExceeded, Configuration, ConfigurationError, DashboardController, DeadLetterController, DeadLetterError, DedupCache, Engine, Error, Event, EventsController, FailedEventRecorder, FrontendsController, InsightsController, JobLock, JobNotUnique, JobStat, JobsController, LocaleController, LocksController, OutboxController, OutboxEntry, ProcessEntry, ProcessedEvent, ProcessesController, QueueNotFoundError, QueueState, QueuesController, RateCounter, RecurringExecution, RecurringTask, RecurringTasksController, SchemaNotReady, Semaphore, SerializationError, StatBuffer, StreamStat, UniquenessKey
Constant Summary
collapse
- DEAD_LETTER_SUFFIX =
Suffix appended to a queue name to derive its dead-letter companion (e.g. “pgbus_default” -> “pgbus_default_dlq”). Hard-coded here because changing it on a running deployment would orphan every existing DLQ message; nothing in the codebase or in user reports has ever needed this to be configurable.
"_dlq"
- VERSION =
"0.7.4"
Class Attribute Summary collapse
Class Method Summary
collapse
Class Attribute Details
.stopping ⇒ Object
27
28
29
|
# File 'lib/pgbus.rb', line 27
def stopping
@stopping || false
end
|
Class Method Details
.client ⇒ Object
84
85
86
|
# File 'lib/pgbus.rb', line 84
def client
@client ||= Client.new(configuration)
end
|
.configuration ⇒ Object
76
77
78
|
# File 'lib/pgbus.rb', line 76
def configuration
@configuration ||= Configuration.new
end
|
80
81
82
|
# File 'lib/pgbus.rb', line 80
def configure
yield configuration
end
|
.loader ⇒ Object
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
# File 'lib/pgbus.rb', line 31
def loader
@loader ||= begin
loader = Zeitwerk::Loader.for_gem
loader.inflector.inflect(
"pgbus" => "Pgbus",
"cli" => "CLI",
"dsl" => "DSL",
"capsule_dsl" => "CapsuleDSL"
)
loader.ignore("#{__dir__}/generators")
loader.ignore("#{__dir__}/active_job")
loader.ignore("#{__dir__}/pgbus/testing")
loader.ignore("#{__dir__}/puma")
loader
end
end
|
.logger ⇒ Object
138
139
140
|
# File 'lib/pgbus.rb', line 138
def logger
configuration.logger
end
|
.logger=(value) ⇒ Object
142
143
144
|
# File 'lib/pgbus.rb', line 142
def logger=(value)
configuration.logger = value
end
|
.models_loader ⇒ Object
Separate loader for app/models used only in non-Rails contexts (specs, standalone scripts). When the Engine boots, Rails’ autoloader takes over app/models and this loader is torn down to avoid conflicts.
56
57
58
59
60
61
62
63
64
65
66
67
|
# File 'lib/pgbus.rb', line 56
def models_loader
models_dir = File.expand_path("../app/models", __dir__)
return nil unless File.directory?(models_dir)
@models_loader ||= begin
loader = Zeitwerk::Loader.new
loader.tag = "pgbus-models"
loader.push_dir(models_dir)
loader.setup
loader
end
end
|
.reset! ⇒ Object
123
124
125
126
127
128
|
# File 'lib/pgbus.rb', line 123
def reset!
@client&.close
@client = nil
@configuration = nil
@stream_cache = nil
end
|
.reset_client! ⇒ Object
Discard the inherited PGMQ client after fork. Do NOT call close — the parent’s @pgmq_mutex is in undefined state post-fork and attempting to acquire it can deadlock. The next call to Pgbus.client will lazily create a fresh one.
134
135
136
|
# File 'lib/pgbus.rb', line 134
def reset_client!
@client = nil
end
|
.stream(streamables) ⇒ Object
Entry point for the streams subsystem — ‘Pgbus.stream(name).broadcast(html)` or `Pgbus.stream(@order).current_msg_id`. Defined on Pgbus itself rather than inside lib/pgbus/streams.rb because that file is only Zeitwerk-loaded when Pgbus::Streams::Stream is first referenced — the chicken-and-egg problem means `Pgbus.stream(…)` would be undefined on the first call. Referencing Streams::Stream inside the method body forces Zeitwerk to load lib/pgbus/streams.rb lazily on first use, which is fine.
Caches Stream instances by logical name so high-frequency callers (e.g. Turbo::StreamsChannel.broadcast_stream_to inside an after_update_commit callback firing 1000x/sec) don’t allocate a new Stream + Mutex per broadcast. The cache is process-local; reset! clears it. The cache key is the resolved name string, not the raw streamables, so ‘Pgbus.stream(@order)` and `Pgbus.stream(@order)` in the same process return the same instance.
103
104
105
106
107
|
# File 'lib/pgbus.rb', line 103
def stream(streamables)
name = Streams::Stream.name_from(streamables)
@stream_cache ||= Concurrent::Map.new
@stream_cache.compute_if_absent(name) { Streams::Stream.new(streamables) }
end
|
.stream_key(*parts) ⇒ Object
Compose a short, pgbus-safe stream identifier from any mix of records, strings, symbols, and arrays. Delegates to ‘Pgbus::Streams::Key.stream_key`; raises `ArgumentError` if the resulting key would overflow the pgbus queue-name budget. See `lib/pgbus/streams/key.rb` for the digest policy and rationale.
Pgbus.stream_key(chat, :messages)
Pgbus.stream(Pgbus.stream_key(chat, :messages)).broadcast(html)
119
120
121
|
# File 'lib/pgbus.rb', line 119
def stream_key(*parts, **)
Streams::Key.stream_key(*parts, **)
end
|
.teardown_models_loader! ⇒ Object
69
70
71
72
73
74
|
# File 'lib/pgbus.rb', line 69
def teardown_models_loader!
return unless @models_loader
@models_loader.unregister
@models_loader = nil
end
|