Module: DurableStreams

Extended by:
ActiveSupport::Autoload, Rails::Broadcasts, Rails::Feeds, Rails::StreamName, Rails::StreamProvisioner
Defined in:
app/jobs/durable_streams/rails/broadcast_job.rb,
lib/durable_streams-rails.rb,
lib/durable_streams/rails/engine.rb,
lib/durable_streams/rails/version.rb,
lib/durable_streams/rails/server_config.rb,
lib/durable_streams/rails/broadcastable/test_helper.rb,
lib/generators/durable_streams/install/install_generator.rb

Overview

The job that powers all the stream_*_later_to broadcasts available in DurableStreams::Rails::Broadcastable.

Defined Under Namespace

Modules: Generators, Rails

Constant Summary

Constants included from Rails::StreamName

Rails::StreamName::PERMISSIONS

Class Attribute Summary

Class Method Summary

Methods included from Rails::Broadcasts

broadcast_event_later_to, broadcast_event_to, broadcast_to

Class Attribute Details

.signing_key ⇒ Object



# File 'lib/durable_streams-rails.rb', line 30

def signing_key
  @signing_key or raise ArgumentError, "DurableStreams requires a signing_key (ES256 private key PEM)"
end
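
The error message above asks for an ES256 private key in PEM form. As a rough sketch, such a key could be generated with Ruby's standard OpenSSL bindings. How the PEM is then handed to the gem (for example through a `DurableStreams.signing_key =` writer in an initializer) is an assumption, since only the reader is shown here.

```ruby
require "openssl"

# ES256 is ECDSA over the P-256 curve, which OpenSSL names "prime256v1".
key = OpenSSL::PKey::EC.generate("prime256v1")

# PEM-encoded private key. Store it in credentials or an environment
# variable rather than in source control.
pem = key.to_pem

# Assumption: the gem exposes a writer matching the reader above, e.g.
#   DurableStreams.signing_key = pem

# Round-trip check: the PEM parses back into a usable private key.
reloaded = OpenSSL::PKey.read(pem)
puts reloaded.private_key?
puts reloaded.group.curve_name
```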

Class Method Details

.signed_stream_primitives(*streamables, permissions: [ :read ], identity: nil, expires_in: DurableStreams.signed_stream_url_expires_in) ⇒ Object

Returns the stream URL and JWT token as separate values.

The URL is the stable endpoint that never changes. The token is a short-lived ES256 JWT that must be refreshed before expiry. Returning them separately lets the frontend pass the token as a function-valued param to the Durable Streams client, enabling automatic token refresh on reconnect.

url, token = DurableStreams.signed_stream_primitives(room, :messages)
# url   => "http://localhost:4437/v1/streams/gid://app/Room/1/messages"
# token => "eyJ..."


# File 'lib/durable_streams-rails.rb', line 61

def signed_stream_primitives(*streamables, permissions: [ :read ], identity: nil, expires_in: DurableStreams.signed_stream_url_expires_in)
  ensure_known_permissions!(permissions)

  path = stream_name_from(streamables)
  ensure_stream_exists(path)

  claims = {
    "stream" => path,
    "permissions" => permissions.map(&:to_s),
    "identity" => identity&.to_s
  }.compact

  token = DurableStreams::Rails::Gatekeeper.encode(claims, expires_in: expires_in)
  [ "#{client_url}/#{path}", token ]
end
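
To make the claims construction above concrete, here is a minimal standalone sketch of the same hash-building step. `build_claims` is a hypothetical helper that exists only for this illustration; it mirrors the method body but does not sign anything or touch the network.

```ruby
# Hypothetical helper mirroring the claims hash built in
# signed_stream_primitives. Not part of the gem.
def build_claims(path, permissions: [ :read ], identity: nil)
  {
    "stream" => path,
    "permissions" => permissions.map(&:to_s),
    "identity" => identity&.to_s
  }.compact # drops "identity" when no identity was given
end

anonymous = build_claims("gid://app/Room/1/messages")
named = build_claims("gid://app/Room/1/messages", permissions: [ :read, :write ], identity: 42)

p anonymous
p named
```

Because of `compact`, a token issued without an identity carries no "identity" key at all rather than a null value.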

.signed_stream_url(*streamables, permissions: [ :read ], identity: nil, expires_in: DurableStreams.signed_stream_url_expires_in) ⇒ Object

Generates a signed URL that grants a client read (or read/write) access to a Durable Stream identified by the given streamables. The URL embeds an ES256 JWT with the stream name, permissions, and expiry.

Calls ensure_stream_exists to guarantee the stream has been created on the server (a PUT, cached after the first call per stream per process; see StreamProvisioner). It makes no other network requests.

DurableStreams.signed_stream_url(room, :messages)
# => "http://localhost:4437/v1/streams/gid://app/Room/1/messages?token=..."


# File 'lib/durable_streams-rails.rb', line 45

def signed_stream_url(*streamables, permissions: [ :read ], identity: nil, expires_in: DurableStreams.signed_stream_url_expires_in)
  url, token = signed_stream_primitives(*streamables, permissions: permissions, identity: identity, expires_in: expires_in)
  "#{url}?token=#{token}"
end
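
Since the signed URL is simply the stable URL with the token appended as a `token` query parameter, the two forms can be converted back and forth with plain string handling. The sketch below uses made-up placeholder values and two hypothetical helpers, `join_signed_url` and `split_signed_url`, which are not part of the gem.

```ruby
# Hypothetical helpers for illustration only.
def join_signed_url(url, token)
  "#{url}?token=#{token}" # same interpolation as signed_stream_url
end

def split_signed_url(signed_url)
  # rpartition splits on the last occurrence of the separator.
  url, _separator, token = signed_url.rpartition("?token=")
  [ url, token ]
end

url = "http://localhost:4437/v1/streams/gid://app/Room/1/messages"
token = "header.payload.signature" # placeholder, not a real JWT

signed = join_signed_url(url, token)
puts signed
p split_signed_url(signed) == [ url, token ]
```

A JWT in compact form consists only of base64url characters and dots, so the token needs no extra escaping inside the query string.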

.stream_offset(*streamables) ⇒ Object

Returns the current tail offset of the stream as an opaque string. The tail offset is the position where the next append will land — a client that connects from this offset will receive every event appended after it.

Uses the same *streamables argument convention as signed_stream_url.

Network cost

Calls ensure_stream_exists (a PUT, cached after the first call per stream per process; see StreamProvisioner), then issues one HEAD request per call. A HEAD response carries headers only and no body, so it is about as cheap as an HTTP request gets. On an internal network between Rails and the stream server, the round trip is typically sub-millisecond.

The offset must be fresh: caching it would risk serving a stale position that skips events. One HEAD request per call is the price of knowing where the stream is right now.

DurableStreams.stream_offset(room, :messages)
# => "0000000001-0000000000"


# File 'lib/durable_streams-rails.rb', line 96

def stream_offset(*streamables)
  path = stream_name_from(streamables)
  ensure_stream_exists(path)
  feed_head_offset(path)
end
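
The tail-offset contract described above can be illustrated with a small in-memory model. `FakeStream` is purely hypothetical: it uses integer offsets instead of the opaque strings the real server returns, and it stands in for the stream server only to show why an offset captured at render time yields exactly the events appended afterwards.

```ruby
# Hypothetical in-memory stand-in for a stream. Not part of the gem.
class FakeStream
  def initialize
    @events = []
  end

  # Position where the next append will land.
  def tail_offset
    @events.length
  end

  def append(event)
    @events << event
  end

  # Everything appended at or after the given offset.
  def read_from(offset)
    @events.drop(offset)
  end
end

stream = FakeStream.new
stream.append("joined")

offset = stream.tail_offset # captured when the page is rendered
stream.append("hello")
stream.append("bye")

p stream.read_from(offset)
```

A client resuming from `offset` sees "hello" and "bye" but not "joined", which was already part of the state when the offset was taken.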