Module: Pgbus::Streams::Key
- Defined in:
- lib/pgbus/streams/key.rb
Overview
Short, pgbus-safe stream identifiers.
PGMQ queue names are bounded by two ceilings: PostgreSQL’s NAMEDATALEN (63 chars for `pgmq.q_<name>`) and pgmq-ruby’s own stricter runtime check (`length >= 48` in `PGMQ::Client#validate_queue_name!`). The effective budget is the lower of the two, exposed as `QueueNameValidator::MAX_QUEUE_NAME_LENGTH` (currently 47). Any stream name composed from UUID primary keys and turbo-rails-style dom ids blows past that budget almost immediately:
"gid://app/Ai::Chat/9c14e8b2-94c3-4c6f-8ca1-f50d2f5e22ca:messages"
# => 64 chars, already too long before the "pgbus_" prefix is added.
`stream_key` produces a deterministic short form suitable as a pgbus stream identifier. It normalizes each part, joins with “:”, and enforces the queue-name budget (derived from the configured `queue_prefix`) at the call site, raising ArgumentError rather than letting the failure surface as an opaque QueueNameValidator error deep inside `Pgbus.stream(…).broadcast(…)`.
Silent truncation is intentionally NOT supported: trimming a too-long key to fit would reintroduce the collision risk that the 64-bit digest is chosen to eliminate. Callers who overflow should shorten their own identifiers or adopt `Pgbus::Streams::Streamable` on their ActiveRecord models.
Usage:
Pgbus.stream_key(chat, :messages)
# => "ai_chat_3a4f9c21b7d20e18:messages"
Pgbus.stream_key([user, :notifications])
# => "user_5fa83c91d44a2701:notifications"
Pgbus.stream(Pgbus.stream_key(chat, :messages)).broadcast("<turbo-stream/>")
Collision horizon: the 64-bit SHA-256 prefix gives a birthday bound of roughly 5 billion records per model class before a 50% chance of collision. For multi-tenant apps where a collision would mean two records share a stream (and receive each other’s broadcasts), this is wide enough in practice. Callers with higher sensitivity can pass `digest_bits: 128`.
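The figures quoted for the collision horizon follow from the usual birthday approximation, n ≈ sqrt(2 · 2^bits · ln 2). The sketch below only reproduces that arithmetic; `birthday_bound` is a hypothetical helper name and is not part of pgbus.

```ruby
# Hypothetical helper (not part of pgbus): approximate number of uniformly
# random values drawn from a space of 2**bits before the chance of at least
# one collision reaches 50%.
def birthday_bound(bits)
  Math.sqrt(2.0 * (2.0**bits) * Math.log(2))
end

puts birthday_bound(32).round  # CRC32-sized space: about 77 thousand
puts birthday_bound(64).round  # default 64-bit prefix: about 5 billion
```

The approximation assumes digest prefixes behave like uniformly random values, which is the usual working assumption for a SHA-256 prefix.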
Constant Summary
- DEFAULT_DIGEST_BITS =
64-bit (default) SHA-256 prefix of the record’s primary key. Stdlib only, deterministic, and fixed-length. CRC32’s 32-bit output is intentionally not used here: its ~77k-row birthday bound is too tight for a multi-tenant stream identifier where a collision would route two records’ broadcasts to the same queue.
64
- MAX_DIGEST_BITS =
Full output size of the backing digest, in bits. Capping digest_bits here matters because `SHA256.hexdigest` only produces 64 hex chars (256 bits) no matter what (slicing `[0, 128]` just returns all 64 chars), so a caller asking for `digest_bits: 512` would silently get the same output as `digest_bits: 256` and walk away believing they’d widened the collision horizon. Raise instead.
::Digest::SHA256.new.digest_length * 8
Class Method Summary
-
.normalize(part, digest_bits: DEFAULT_DIGEST_BITS) ⇒ Object
Normalize a single streamable fragment to a pgbus-safe string.
-
.queue_name_budget ⇒ Object
Budget = effective pgbus queue-name limit - “<queue_prefix>_” length.
-
.short_id(record, digest_bits: DEFAULT_DIGEST_BITS) ⇒ Object
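Returns a fixed-length hex digest prefix of the record’s primary key; raises ArgumentError when digest_bits is invalid or > 256.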
-
.stream_key(*parts, digest_bits: DEFAULT_DIGEST_BITS) ⇒ Object
Compose a short pgbus-safe stream name from any mix of records, strings, symbols, and arrays.
-
.stream_key!(key) ⇒ Object
Accepts an already-built stream key verbatim, skipping the per-fragment colon guard (a pre-built key legitimately contains ‘:’ separators).
-
.validate_budget!(key) ⇒ Object
Returns the key when it fits the pgbus queue-name budget; raises ArgumentError with an actionable message otherwise.
Class Method Details
.normalize(part, digest_bits: DEFAULT_DIGEST_BITS) ⇒ Object
Normalize a single streamable fragment to a pgbus-safe string. Mirrors the shape accepted by Turbo::Streams::StreamName and Pgbus::Streams::Stream.name_from so the two code paths agree on the wire format.
-
Strings and symbols pass through verbatim.
-
ActiveRecord models become “<param_key>_<short_id>”.
-
Anything else falls back to the first of `to_stream_key`, `to_gid_param`, or `to_param` that it responds to, and finally to `to_s`. A UUID primary key would still overflow on these paths, which is why AR models are hashed above.
# File 'lib/pgbus/streams/key.rb', line 161
def normalize(part, digest_bits: DEFAULT_DIGEST_BITS)
  case part
  when String, Symbol
    part.to_s
  else
    if defined?(::ActiveRecord::Base) && part.is_a?(::ActiveRecord::Base)
      "#{part.class.model_name.param_key}_#{short_id(part, digest_bits: digest_bits)}"
    elsif part.respond_to?(:to_stream_key)
      part.to_stream_key
    elsif part.respond_to?(:to_gid_param)
      part.to_gid_param
    elsif part.respond_to?(:to_param)
      part.to_param
    else
      part.to_s
    end
  end
end
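To illustrate the order of the fallback chain in `normalize`, here is a simplified sketch that omits the ActiveRecord branch. `Room`, `Page`, and `normalize_sketch` are hypothetical stand-ins invented for this example; none of them exist in pgbus.

```ruby
# Hypothetical stand-ins (not pgbus classes).
Room = Struct.new(:slug) do
  def to_stream_key
    "room_#{slug}"
  end

  def to_param
    slug
  end
end

Page = Struct.new(:slug) do
  def to_param
    slug
  end
end

# Simplified copy of the fallback chain, without the ActiveRecord branch.
def normalize_sketch(part)
  case part
  when String, Symbol
    part.to_s
  else
    if part.respond_to?(:to_stream_key)
      part.to_stream_key
    elsif part.respond_to?(:to_gid_param)
      part.to_gid_param
    elsif part.respond_to?(:to_param)
      part.to_param
    else
      part.to_s
    end
  end
end

puts normalize_sketch(:messages)          # messages
puts normalize_sketch(Room.new("lobby"))  # room_lobby (to_stream_key wins over to_param)
puts normalize_sketch(Page.new("home"))   # home
```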
.queue_name_budget ⇒ Object
Budget = effective pgbus queue-name limit - “<queue_prefix>_” length. Computed at call time (not a constant) so apps that override `config.queue_prefix` get the correct budget automatically.
# File 'lib/pgbus/streams/key.rb', line 184
def queue_name_budget
  QueueNameValidator::MAX_QUEUE_NAME_LENGTH - Pgbus.configuration.queue_prefix.length - 1
end
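As a worked instance of the budget formula, the sketch below plugs in the limit quoted in the overview (47) and assumes a `queue_prefix` of "pgbus", inferred from the "pgbus_" prefix mentioned there. Both values are hard-coded for illustration and may differ in a real configuration.

```ruby
max_queue_name_length = 47  # value quoted in the overview ("currently 47")
queue_prefix = "pgbus"      # assumed prefix; a real app may configure another

# Subtract the prefix and the "_" that joins it to the stream key.
budget = max_queue_name_length - queue_prefix.length - 1
puts budget  # 41

key = "ai_chat_3a4f9c21b7d20e18:messages"
puts key.length            # 33
puts key.length <= budget  # true
```

A longer prefix shrinks the budget one character at a time, which is why the value is computed per call rather than frozen in a constant.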
.short_id(record, digest_bits: DEFAULT_DIGEST_BITS) ⇒ Object
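Returns the first `digest_bits / 4` hex characters of the SHA-256 digest of `record.id`. Raises ArgumentError when `digest_bits` is not a positive multiple of 4, when it is > 256 (`MAX_DIGEST_BITS`), or when the record is unpersisted (`record.id` is nil).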
# File 'lib/pgbus/streams/key.rb', line 127
def short_id(record, digest_bits: DEFAULT_DIGEST_BITS)
  unless digest_bits.is_a?(Integer) && digest_bits.positive? &&
         (digest_bits % 4).zero? && digest_bits <= MAX_DIGEST_BITS
    raise ArgumentError,
          "digest_bits must be a positive multiple of 4 and <= #{MAX_DIGEST_BITS} " \
          "(SHA-256 produces #{MAX_DIGEST_BITS} bits; asking for more would silently truncate)"
  end

  # Unpersisted records all share id=nil, which hashes to a single
  # constant digest and would collapse every new instance of the
  # same class into one stream. Fail loud at the first unsaved
  # call site: the whole point of the 64-bit digest is to
  # eliminate collisions, so silently producing a shared key here
  # would reintroduce exactly what it was chosen to prevent.
  if record.id.nil?
    raise ArgumentError,
          "#{record.class.name} must be persisted before generating a stream key " \
          "(record.id is nil — all unsaved records would collide on one stream)"
  end

  hex_chars = digest_bits / 4
  ::Digest::SHA256.hexdigest(record.id.to_s)[0, hex_chars]
end
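The slicing behaviour that motivates the `MAX_DIGEST_BITS` cap can be checked with the standard library alone. This sketch uses the example UUID from the overview as the id and does not call any pgbus code.

```ruby
require "digest"

id = "9c14e8b2-94c3-4c6f-8ca1-f50d2f5e22ca"  # example UUID primary key
full = Digest::SHA256.hexdigest(id)

short = full[0, 64 / 4]    # digest_bits: 64  -> 16 hex chars
wide  = full[0, 128 / 4]   # digest_bits: 128 -> 32 hex chars
over  = full[0, 512 / 4]   # asks for 128 chars, but only 64 exist

puts full.length   # 64
puts short.length  # 16
puts wide.length   # 32
puts over.length   # 64, identical to the full digest
```

Because the wider prefix starts with the narrower one, changing `digest_bits` changes every key derived from the same id, so a given stream would need to use one width consistently.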
.stream_key(*parts, digest_bits: DEFAULT_DIGEST_BITS) ⇒ Object
Compose a short pgbus-safe stream name from any mix of records, strings, symbols, and arrays. Returns the joined key when it fits the pgbus queue-name budget; raises ArgumentError otherwise.
Fragments must not contain `:`. It is the join separator, so `stream_key("a:b", :c)` and `stream_key("a", "b:c")` would both produce `"a:b:c"` and collapse two logically distinct streams onto one queue. Colons inside fragments (typically from a `to_stream_key`/`to_gid_param` implementation that forgot to sanitize) raise an ArgumentError at the call site.
# File 'lib/pgbus/streams/key.rb', line 65
def stream_key(*parts, digest_bits: DEFAULT_DIGEST_BITS)
  flattened = Array(parts).flatten

  # Idempotency for an already-built key: a single String argument
  # is treated as a pre-built pgbus stream key and returned
  # unchanged (after the budget check). This lets a consumer hold
  # one `stream_key` value and pass it to both `turbo_stream_from`
  # and the broadcaster without the colon separator guard raising
  # on the second call. The guard only protects against ambiguous
  # *joins* (`stream_key('a:b', :c)` vs `stream_key('a', 'b:c')`),
  # and there is no second fragment here to collapse against, so the
  # hazard cannot arise. Symbols and records are NOT keys: a colon
  # in those never came from `stream_key` and stays a mistake.
  return stream_key!(flattened.first) if flattened.length == 1 && flattened.first.is_a?(String)

  fragments = flattened.map { |part| normalize(part, digest_bits: digest_bits) }
  fragments.each { |fragment| reject_colons!(fragment) }
  validate_budget!(fragments.join(":"))
end
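The join ambiguity that the colon guard protects against is easy to reproduce in plain Ruby. In the sketch below, `reject_colons_sketch!` is a hypothetical stand-in: the source of the real `reject_colons!` is not shown on this page, so its exact message and behaviour are assumptions.

```ruby
# Two different fragment lists that join to the same string.
left  = ["a:b", "c"].join(":")
right = ["a", "b:c"].join(":")
puts left == right  # true: both are "a:b:c"

# Hypothetical stand-in for the guard (assumed behaviour, not pgbus code).
def reject_colons_sketch!(fragment)
  raise ArgumentError, "fragment #{fragment.inspect} contains ':'" if fragment.include?(":")

  fragment
end

fragments = ["ai_chat_3a4f9c21b7d20e18", "messages"]
fragments.each { |fragment| reject_colons_sketch!(fragment) }
puts fragments.join(":")  # ai_chat_3a4f9c21b7d20e18:messages
```

Rejecting the colon in each fragment before joining keeps the mapping from fragment lists to keys one-to-one, which escaping or silent substitution would not guarantee as simply.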
.stream_key!(key) ⇒ Object
Accepts an already-built stream key verbatim, skipping the per-fragment colon guard (a pre-built key legitimately contains ‘:’ separators). Still enforces the queue-name budget so an oversized key fails at the call site rather than deep inside Client#ensure_stream_queue. Use this when you hold a key string and want to be explicit that no re-keying should happen, e.g. passing the same value to `turbo_stream_from` and a broadcaster.
# File 'lib/pgbus/streams/key.rb', line 92
def stream_key!(key)
  raise ArgumentError, "stream_key! key must be a String, got #{key.class}" unless key.is_a?(String)

  validate_budget!(key)
end
.validate_budget!(key) ⇒ Object
Returns the key when it fits the pgbus queue-name budget; raises ArgumentError with an actionable message otherwise. Shared by `stream_key` and `stream_key!` so both paths fail identically.
# File 'lib/pgbus/streams/key.rb', line 101
def validate_budget!(key)
  budget = queue_name_budget
  return key if key.length <= budget

  raise ArgumentError,
        "stream_key #{key.inspect} is #{key.length} chars, " \
        "exceeds pgbus budget of #{budget} " \
        "(queue_prefix=#{Pgbus.configuration.queue_prefix.inspect}, " \
        "pgbus_max_queue_name_length=#{QueueNameValidator::MAX_QUEUE_NAME_LENGTH}). " \
        "Shorten the streamables or use Pgbus::Streams::Streamable on the model."
end