Module: Pgbus::Streams::Key
- Defined in:
- lib/pgbus/streams/key.rb
Overview
Short, pgbus-safe stream identifiers.
PGMQ queue names are bounded by two ceilings: PostgreSQL’s NAMEDATALEN (63 chars for ‘pgmq.q_<name>`) and pgmq-ruby’s own stricter runtime check (‘length >= 48` in `PGMQ::Client#validate_queue_name!`). The effective budget is the lower of the two, exposed as `QueueNameValidator::MAX_QUEUE_NAME_LENGTH` (currently 47). Any stream name composed from UUID primary keys and turbo-rails-style dom ids blows past that budget almost immediately:
"gid://app/Ai::Chat/9c14e8b2-94c3-4c6f-8ca1-f50d2f5e22ca:messages"
# => 63 chars, already too long before the "pgbus_" prefix is added.
‘stream_key` produces a deterministic short form suitable as a pgbus stream identifier. It normalizes each part, joins with “:”, and enforces the queue-name budget (derived from the configured `queue_prefix`) at the call site — raising ArgumentError rather than letting the failure surface as an opaque QueueNameValidator error deep inside `Pgbus.stream(…).broadcast(…)`.
Silent truncation is intentionally NOT supported: trimming a too-long key to fit would reintroduce the collision risk that the 64-bit digest is chosen to eliminate. Callers who overflow should shorten their own identifiers or adopt ‘Pgbus::Streams::Streamable` on their ActiveRecord models.
Usage:
Pgbus.stream_key(chat, :messages)
# => "ai_chat_3a4f9c21b7d20e18:messages"
Pgbus.stream_key([user, :notifications])
# => "user_5fa83c91d44a2701:notifications"
Pgbus.stream(Pgbus.stream_key(chat, :messages)).broadcast("<turbo-stream/>")
Collision horizon: the 64-bit SHA-256 prefix gives a birthday bound of roughly 5 billion records per model class before a 50% chance of collision. For multi-tenant apps where a collision would mean two records share a stream (and receive each other’s broadcasts), this is wide enough in practice. Callers with higher sensitivity can pass ‘digest_bits: 128`.
Constant Summary collapse
- DEFAULT_DIGEST_BITS =
64- MAX_DIGEST_BITS =
64-bit (default) SHA-256 prefix of the record’s primary key. Stdlib only, deterministic, and fixed-length. CRC32’s 32-bit output is intentionally not used here: its ~77k-row birthday bound is too tight for a multi-tenant stream identifier where a collision would route two records’ broadcasts to the same queue. Full output size of the backing digest, in bits. Capping digest_bits here matters because ‘SHA256.hexdigest` only produces 64 hex chars (256 bits) no matter what — slicing `[0, 128]` just returns all 64 chars — so a caller asking for `digest_bits: 512` would silently get the same output as `digest_bits: 256` and walk away believing they’d widened the collision horizon. Raise instead.
::Digest::SHA256.new.digest_length * 8
Class Method Summary collapse
-
.normalize(part, digest_bits: DEFAULT_DIGEST_BITS) ⇒ Object
Normalize a single streamable fragment to a pgbus-safe string.
-
.queue_name_budget ⇒ Object
Budget = effective pgbus queue-name limit - “<queue_prefix>_” length.
-
.short_id(record, digest_bits: DEFAULT_DIGEST_BITS) ⇒ Object
> 256.
-
.stream_key(*parts, digest_bits: DEFAULT_DIGEST_BITS) ⇒ Object
Compose a short pgbus-safe stream name from any mix of records, strings, symbols, and arrays.
Class Method Details
.normalize(part, digest_bits: DEFAULT_DIGEST_BITS) ⇒ Object
Normalize a single streamable fragment to a pgbus-safe string. Mirrors the shape accepted by Turbo::Streams::StreamName and Pgbus::Streams::Stream.name_from so the two code paths agree on the wire format.
-
Strings and symbols pass through verbatim.
-
ActiveRecord models become “<param_key>_<short_id>”.
-
Anything else responding to ‘to_gid_param` / `to_param` falls back to that; a UUID primary key would still overflow, which is why AR models are hashed above.
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
# File 'lib/pgbus/streams/key.rb', line 128 def normalize(part, digest_bits: DEFAULT_DIGEST_BITS) case part when String, Symbol part.to_s else if defined?(::ActiveRecord::Base) && part.is_a?(::ActiveRecord::Base) "#{part.class.model_name.param_key}_#{short_id(part, digest_bits: digest_bits)}" elsif part.respond_to?(:to_stream_key) part.to_stream_key elsif part.respond_to?(:to_gid_param) part.to_gid_param elsif part.respond_to?(:to_param) part.to_param else part.to_s end end end |
.queue_name_budget ⇒ Object
Budget = effective pgbus queue-name limit - “<queue_prefix>_” length. Computed at call time (not a constant) so apps that override ‘config.queue_prefix` get the correct budget automatically.
151 152 153 154 |
# File 'lib/pgbus/streams/key.rb', line 151 def queue_name_budget QueueNameValidator::MAX_QUEUE_NAME_LENGTH - Pgbus.configuration.queue_prefix.length - 1 end |
.short_id(record, digest_bits: DEFAULT_DIGEST_BITS) ⇒ Object
> 256
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/pgbus/streams/key.rb', line 94 def short_id(record, digest_bits: DEFAULT_DIGEST_BITS) unless digest_bits.is_a?(Integer) && digest_bits.positive? && (digest_bits % 4).zero? && digest_bits <= MAX_DIGEST_BITS raise ArgumentError, "digest_bits must be a positive multiple of 4 and <= #{MAX_DIGEST_BITS} " \ "(SHA-256 produces #{MAX_DIGEST_BITS} bits; asking for more would silently truncate)" end # Unpersisted records all share id=nil, which hashes to a single # constant digest and would collapse every new instance of the # same class into one stream. Fail loud at the first unsaved # call site — the whole point of the 64-bit digest is to # eliminate collisions, so silently producing a shared key here # would reintroduce exactly what it was chosen to prevent. if record.id.nil? raise ArgumentError, "#{record.class.name} must be persisted before generating a stream key " \ "(record.id is nil — all unsaved records would collide on one stream)" end hex_chars = digest_bits / 4 ::Digest::SHA256.hexdigest(record.id.to_s)[0, hex_chars] end |
.stream_key(*parts, digest_bits: DEFAULT_DIGEST_BITS) ⇒ Object
Compose a short pgbus-safe stream name from any mix of records, strings, symbols, and arrays. Returns the joined key when it fits the pgbus queue-name budget; raises ArgumentError otherwise.
Fragments must not contain ‘:` — it’s the join separator, so ‘stream_key(“a:b”, :c)` and `stream_key(“a”, “b:c”)` would both produce `“a:b:c”` and collapse two logically distinct streams onto one queue. Colons inside fragments (typically from a `to_stream_key`/`to_gid_param` implementation that forgot to sanitize) raise an ArgumentError at the call site.
65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/pgbus/streams/key.rb', line 65 def stream_key(*parts, digest_bits: DEFAULT_DIGEST_BITS) fragments = Array(parts).flatten.map { |part| normalize(part, digest_bits: digest_bits) } fragments.each { |fragment| reject_colons!(fragment) } key = fragments.join(":") budget = queue_name_budget return key if key.length <= budget raise ArgumentError, "stream_key #{key.inspect} is #{key.length} chars, " \ "exceeds pgbus budget of #{budget} " \ "(queue_prefix=#{Pgbus.configuration.queue_prefix.inspect}, " \ "pgbus_max_queue_name_length=#{QueueNameValidator::MAX_QUEUE_NAME_LENGTH}). " \ "Shorten the streamables or use Pgbus::Streams::Streamable on the model." end |