Module: DurableStreams::Rails::Broadcasts

Included in:
DurableStreams
Defined in:
lib/durable_streams/rails/broadcasts.rb

Overview

Provides the broadcast actions in synchronous and asynchronous form for the DurableStreams module. See DurableStreams::Rails::Broadcastable for the user-facing API that invokes these methods with most of the paperwork filled out already.

All broadcasts produce JSON. Two flavors are supported:

State Protocol events

broadcast_event_to and broadcast_event_later_to produce State Protocol events, the keyed insert/update/delete format consumed by @durable-streams/state and @tanstack/db for reactive collections on the client:

DurableStreams.broadcast_event_to :room, type: "message", key: "5",
  value: { content: "Hello" }, operation: :insert
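
To make the wire format concrete, the following is a minimal sketch of the JSON envelope such a call serializes, based on the hash literal in the broadcast_event_to source listed further down this page. The helper name state_protocol_event and the fixed txid are illustrative assumptions, not part of the gem; the real method generates the txid with SecureRandom.uuid.

```ruby
require "json"
require "securerandom"

# Hypothetical helper (not part of the gem) that mirrors the hash literal
# used by broadcast_event_to to build a State Protocol event.
def state_protocol_event(type:, key:, value:, operation:, txid: SecureRandom.uuid)
  { type: type, key: key.to_s, value: value,
    headers: { operation: operation, txid: txid } }.to_json
end

# A fixed txid is passed only to keep the output reproducible.
json = state_protocol_event(type: "message", key: 5,
  value: { content: "Hello" }, operation: :insert, txid: "example-txid")

puts json
# {"type":"message","key":"5","value":{"content":"Hello"},"headers":{"operation":"insert","txid":"example-txid"}}
```

Note that the key is stringified and the operation symbol is serialized as a JSON string.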

Arbitrary JSON

broadcast_to sends free-form JSON with no State Protocol structure:

DurableStreams.broadcast_to :room, event: "typing", user_id: 5

Although the underlying Durable Streams server supports any content type (bytes, text, etc.), this gem only creates application/json streams. Non-JSON broadcasts would require extending append_to_stream to accept a configurable content type.
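
For contrast with the State Protocol flavor, here is a small sketch of what the free-form flavor puts on the stream. It assumes only what the broadcast_to source shows (payload.to_json); the helper name free_form_body is hypothetical.

```ruby
require "json"

# Hypothetical helper (not part of the gem): captures keywords the way
# broadcast_to's **payload does and serializes them with no envelope.
def free_form_body(**payload)
  payload.to_json
end

body = free_form_body(event: "typing", user_id: 5)

puts body
# {"event":"typing","user_id":5}
```

There is no type, key, or headers wrapper, so the client has to interpret the payload on its own terms.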

Instance Method Details

#broadcast_event_later_to(*streamables, type:, key:, value:, operation:) ⇒ Object



# File 'lib/durable_streams/rails/broadcasts.rb', line 45

def broadcast_event_later_to(*streamables, type:, key:, value:, operation:)
  build_broadcast_event_to(*streamables, type: type, key: key, value: value, operation: operation)&.enqueue
end

#broadcast_event_to(*streamables, type:, key:, value:, operation:) ⇒ Object

Broadcast a State Protocol event to one or more streamables. Returns the generated txid for optimistic update confirmation on the client, or nil if streamables are blank.



# File 'lib/durable_streams/rails/broadcasts.rb', line 28

def broadcast_event_to(*streamables, type:, key:, value:, operation:)
  streamables.flatten!
  streamables.compact_blank!

  if streamables.present?
    txid = SecureRandom.uuid

    append_to_stream(
      stream_name_from(streamables),
      { type: type, key: key.to_s, value: value,
        headers: { operation: operation, txid: txid } }.to_json
    )

    txid
  end
end
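
The return contract (a txid on success, nil when every streamable is blank) can be exercised without a server. The sketch below is a plain-Ruby stand-in, not the gem's implementation: FakeBroadcaster, its in-memory appended list, and joining names with ":" in place of stream_name_from are all assumptions made for illustration, and the blank check only approximates compact_blank!.

```ruby
require "json"
require "securerandom"

# Hypothetical stand-in for the real module: records appends in memory
# instead of writing to a Durable Streams server.
class FakeBroadcaster
  attr_reader :appended

  def initialize
    @appended = []
  end

  def broadcast_event_to(*streamables, type:, key:, value:, operation:)
    # Plain-Ruby approximation of flatten! followed by compact_blank!.
    names = streamables.flatten.reject { |s| s.nil? || s.to_s.strip.empty? }
    return nil if names.empty?

    txid = SecureRandom.uuid
    body = { type: type, key: key.to_s, value: value,
             headers: { operation: operation, txid: txid } }.to_json
    # Assumption: ":"-joined names stand in for stream_name_from.
    @appended << [names.join(":"), body]
    txid
  end
end

broadcaster = FakeBroadcaster.new
txid = broadcaster.broadcast_event_to(:room, type: "message", key: 5,
  value: { content: "Hello" }, operation: :insert)

# The returned txid is the one embedded in the event headers, which is what
# lets a client match its optimistic update to the confirmed event.
event = JSON.parse(broadcaster.appended.last.last)
puts event.dig("headers", "txid") == txid # true

# Blank streamables: nothing is appended and nil comes back.
skipped = broadcaster.broadcast_event_to(nil, "", type: "message", key: 5,
  value: {}, operation: :insert)
p skipped # nil
```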

#broadcast_to(*streamables, **payload) ⇒ Object



# File 'lib/durable_streams/rails/broadcasts.rb', line 76

def broadcast_to(*streamables, **payload)
  streamables.flatten!
  streamables.compact_blank!

  if streamables.present?
    append_to_stream(stream_name_from(streamables), payload.to_json)
  end
end

#build_broadcast_event_to(*streamables, type:, key:, value:, operation:) ⇒ Object

Build the BroadcastJob for a State Protocol event without enqueuing it, so that a fan-out to many distinct streams can be enqueued with a single ActiveJob.perform_all_later call (one bulk insert instead of one perform_later per recipient). Returns nil when streamables are blank, so a collection can be passed through filter_map straight into perform_all_later:

jobs = residencies.filter_map { |residency|
  DurableStreams.build_broadcast_event_to residency.envoy, :residencies,
    type: "residency", key: residency.sync_id, value: residency.to_h, operation: :update
}
ActiveJob.perform_all_later(jobs)

Turbo has no counterpart: its broadcasts target one stream per call, so it never needs to bulk-enqueue. We fan out per recipient (one stream each), where N separate broadcast_event_later_to calls would be N enqueues. broadcast_event_later_to is defined in terms of this builder (...&.enqueue) so job construction lives in one place.



# File 'lib/durable_streams/rails/broadcasts.rb', line 65

def build_broadcast_event_to(*streamables, type:, key:, value:, operation:)
  streamables.flatten!
  streamables.compact_blank!

  if streamables.present?
    DurableStreams::Rails::BroadcastJob.new \
      stream_name_from(streamables),
      type: type, key: key, value: value, operation: operation.to_s
  end
end
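
The build-then-bulk-enqueue pattern can be tried outside Rails with stand-ins. In this sketch FakeJob, build_job, and the bulk_enqueue lambda are hypothetical substitutes for BroadcastJob, build_broadcast_event_to, and ActiveJob.perform_all_later; it only illustrates how filter_map drops the nil results so that a single enqueue call covers every remaining recipient.

```ruby
# Hypothetical stand-in for DurableStreams::Rails::BroadcastJob.
FakeJob = Struct.new(:stream_name, :event)

# Mirrors the builder's contract: a job when any streamable survives, else nil.
# Joining names with ":" is an assumption, not the gem's stream_name_from.
def build_job(*streamables, **event)
  names = streamables.flatten.reject { |s| s.nil? || s.to_s.strip.empty? }
  return nil if names.empty?

  FakeJob.new(names.join(":"), event)
end

# Stand-in for ActiveJob.perform_all_later that counts how often it is called.
enqueue_calls = 0
bulk_enqueue = lambda do |batch|
  enqueue_calls += 1
  batch.size
end

recipients = [:alice, nil, :bob] # nil models a recipient with no stream
jobs = recipients.filter_map do |recipient|
  build_job(recipient, type: "residency", key: "1", value: {}, operation: "update")
end

enqueued = bulk_enqueue.call(jobs)
puts "#{enqueued} jobs in #{enqueue_calls} enqueue call"
# 2 jobs in 1 enqueue call
```

As in the real builder, a job is skipped only when all of its streamables are blank; a call that mixes one blank and one present streamable still produces a job.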