Module: Pgbus::Streams::Key

Defined in:
lib/pgbus/streams/key.rb

Overview

Short, pgbus-safe stream identifiers.

PGMQ queue names are bounded by two ceilings: PostgreSQL’s NAMEDATALEN (63 chars for ‘pgmq.q_<name>`) and pgmq-ruby’s own stricter runtime check (‘length >= 48` in `PGMQ::Client#validate_queue_name!`). The effective budget is the lower of the two, exposed as `QueueNameValidator::MAX_QUEUE_NAME_LENGTH` (currently 47). Any stream name composed from UUID primary keys and turbo-rails-style dom ids blows past that budget almost immediately:

"gid://app/Ai::Chat/9c14e8b2-94c3-4c6f-8ca1-f50d2f5e22ca:messages"
# => 63 chars, already too long before the "pgbus_" prefix is added.

‘stream_key` produces a deterministic short form suitable as a pgbus stream identifier. It normalizes each part, joins with “:”, and enforces the queue-name budget (derived from the configured `queue_prefix`) at the call site — raising ArgumentError rather than letting the failure surface as an opaque QueueNameValidator error deep inside `Pgbus.stream(…).broadcast(…)`.

Silent truncation is intentionally NOT supported: trimming a too-long key to fit would reintroduce the collision risk that the 64-bit digest is chosen to eliminate. Callers who overflow should shorten their own identifiers or adopt ‘Pgbus::Streams::Streamable` on their ActiveRecord models.

Usage:

Pgbus.stream_key(chat, :messages)
  # => "ai_chat_3a4f9c21b7d20e18:messages"

Pgbus.stream_key([user, :notifications])
  # => "user_5fa83c91d44a2701:notifications"

Pgbus.stream(Pgbus.stream_key(chat, :messages)).broadcast("<turbo-stream/>")

Collision horizon: the 64-bit SHA-256 prefix gives a birthday bound of roughly 5 billion records per model class before a 50% chance of collision. For multi-tenant apps where a collision would mean two records share a stream (and receive each other’s broadcasts), this is wide enough in practice. Callers with higher sensitivity can pass ‘digest_bits: 128`.

Constant Summary collapse

DEFAULT_DIGEST_BITS =
64
MAX_DIGEST_BITS =

64-bit (default) SHA-256 prefix of the record’s primary key. Stdlib only, deterministic, and fixed-length. CRC32’s 32-bit output is intentionally not used here: its ~77k-row birthday bound is too tight for a multi-tenant stream identifier where a collision would route two records’ broadcasts to the same queue. Full output size of the backing digest, in bits. Capping digest_bits here matters because ‘SHA256.hexdigest` only produces 64 hex chars (256 bits) no matter what — slicing `[0, 128]` just returns all 64 chars — so a caller asking for `digest_bits: 512` would silently get the same output as `digest_bits: 256` and walk away believing they’d widened the collision horizon. Raise instead.

::Digest::SHA256.new.digest_length * 8

Class Method Summary collapse

Class Method Details

.normalize(part, digest_bits: DEFAULT_DIGEST_BITS) ⇒ Object

Normalize a single streamable fragment to a pgbus-safe string. Mirrors the shape accepted by Turbo::Streams::StreamName and Pgbus::Streams::Stream.name_from so the two code paths agree on the wire format.

  • Strings and symbols pass through verbatim.

  • ActiveRecord models become “<param_key>_<short_id>”.

  • Anything else responding to ‘to_gid_param` / `to_param` falls back to that; a UUID primary key would still overflow, which is why AR models are hashed above.



128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/pgbus/streams/key.rb', line 128

def normalize(part, digest_bits: DEFAULT_DIGEST_BITS)
  case part
  when String, Symbol
    part.to_s
  else
    if defined?(::ActiveRecord::Base) && part.is_a?(::ActiveRecord::Base)
      "#{part.class.model_name.param_key}_#{short_id(part, digest_bits: digest_bits)}"
    elsif part.respond_to?(:to_stream_key)
      part.to_stream_key
    elsif part.respond_to?(:to_gid_param)
      part.to_gid_param
    elsif part.respond_to?(:to_param)
      part.to_param
    else
      part.to_s
    end
  end
end

.queue_name_budgetObject

Budget = effective pgbus queue-name limit - “<queue_prefix>_” length. Computed at call time (not a constant) so apps that override ‘config.queue_prefix` get the correct budget automatically.



151
152
153
154
# File 'lib/pgbus/streams/key.rb', line 151

def queue_name_budget
  QueueNameValidator::MAX_QUEUE_NAME_LENGTH -
    Pgbus.configuration.queue_prefix.length - 1
end

.short_id(record, digest_bits: DEFAULT_DIGEST_BITS) ⇒ Object

> 256



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/pgbus/streams/key.rb', line 94

def short_id(record, digest_bits: DEFAULT_DIGEST_BITS)
  unless digest_bits.is_a?(Integer) && digest_bits.positive? &&
         (digest_bits % 4).zero? && digest_bits <= MAX_DIGEST_BITS
    raise ArgumentError,
          "digest_bits must be a positive multiple of 4 and <= #{MAX_DIGEST_BITS} " \
          "(SHA-256 produces #{MAX_DIGEST_BITS} bits; asking for more would silently truncate)"
  end

  # Unpersisted records all share id=nil, which hashes to a single
  # constant digest and would collapse every new instance of the
  # same class into one stream. Fail loud at the first unsaved
  # call site — the whole point of the 64-bit digest is to
  # eliminate collisions, so silently producing a shared key here
  # would reintroduce exactly what it was chosen to prevent.
  if record.id.nil?
    raise ArgumentError,
          "#{record.class.name} must be persisted before generating a stream key " \
          "(record.id is nil — all unsaved records would collide on one stream)"
  end

  hex_chars = digest_bits / 4
  ::Digest::SHA256.hexdigest(record.id.to_s)[0, hex_chars]
end

.stream_key(*parts, digest_bits: DEFAULT_DIGEST_BITS) ⇒ Object

Compose a short pgbus-safe stream name from any mix of records, strings, symbols, and arrays. Returns the joined key when it fits the pgbus queue-name budget; raises ArgumentError otherwise.

Fragments must not contain ‘:` — it’s the join separator, so ‘stream_key(“a:b”, :c)` and `stream_key(“a”, “b:c”)` would both produce `“a:b:c”` and collapse two logically distinct streams onto one queue. Colons inside fragments (typically from a `to_stream_key`/`to_gid_param` implementation that forgot to sanitize) raise an ArgumentError at the call site.

Raises:

  • (ArgumentError)


65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/pgbus/streams/key.rb', line 65

def stream_key(*parts, digest_bits: DEFAULT_DIGEST_BITS)
  fragments = Array(parts).flatten.map { |part| normalize(part, digest_bits: digest_bits) }
  fragments.each { |fragment| reject_colons!(fragment) }
  key = fragments.join(":")
  budget = queue_name_budget
  return key if key.length <= budget

  raise ArgumentError,
        "stream_key #{key.inspect} is #{key.length} chars, " \
        "exceeds pgbus budget of #{budget} " \
        "(queue_prefix=#{Pgbus.configuration.queue_prefix.inspect}, " \
        "pgbus_max_queue_name_length=#{QueueNameValidator::MAX_QUEUE_NAME_LENGTH}). " \
        "Shorten the streamables or use Pgbus::Streams::Streamable on the model."
end