Module: DurableStreams::Rails::Broadcasts

Included in:
DurableStreams
Defined in:
lib/durable_streams/rails/broadcasts.rb

Overview

Provides the broadcast actions in synchronous and asynchronous form for the DurableStreams module. See DurableStreams::Rails::Broadcastable for the user-facing API that invokes these methods with most of the paperwork filled out already.

All broadcasts produce JSON. Two flavors are supported:

State Protocol events

broadcast_event_to and broadcast_event_later_to produce State Protocol events: the keyed insert/update/delete format consumed by @durable-streams/state and @tanstack/db for reactive collections on the client:

DurableStreams.broadcast_event_to :room, type: "message", key: "5",
  value: { content: "Hello" }, operation: :insert
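
To make the wire format concrete, here is a minimal sketch of the JSON envelope such a call appends, mirroring the hash built in the broadcast_event_to source further down this page. The helper name state_protocol_event and the fixed txid are hypothetical, used only for illustration; the real method generates the txid with SecureRandom.uuid.

```ruby
require "json"
require "securerandom"

# Hypothetical helper (not part of the gem): builds the same hash that
# broadcast_event_to serializes before appending it to the stream.
def state_protocol_event(type:, key:, value:, operation:, txid: SecureRandom.uuid)
  { type: type, key: key.to_s, value: value,
    headers: { operation: operation, txid: txid } }.to_json
end

json = state_protocol_event(
  type: "message", key: 5, value: { content: "Hello" },
  operation: :insert, txid: "example-txid" # fixed txid for readability only
)

puts json
# {"type":"message","key":"5","value":{"content":"Hello"},"headers":{"operation":"insert","txid":"example-txid"}}
```

Note that the key is always stringified, and a Symbol operation is serialized as a plain JSON string.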

Arbitrary JSON

broadcast_to sends free-form JSON with no State Protocol structure:

DurableStreams.broadcast_to :room, event: "typing", user_id: 5
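
For comparison, the keyword arguments of the call above are collected into a payload hash and serialized as-is, with no type, key, or headers wrapper. A small sketch using only Ruby's standard json library:

```ruby
require "json"

# The **payload hash that broadcast_to would receive for the call above.
payload = { event: "typing", user_id: 5 }

body = payload.to_json
puts body
# {"event":"typing","user_id":5}
```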

Although the underlying Durable Streams server supports any content type (bytes, text, etc.), this gem only creates application/json streams. Non-JSON broadcasts would require extending append_to_stream to accept a configurable content type.


Instance Method Details

#broadcast_event_later_to(*streamables, type:, key:, value:, operation:) ⇒ Object



# File 'lib/durable_streams/rails/broadcasts.rb', line 45

def broadcast_event_later_to(*streamables, type:, key:, value:, operation:)
  streamables.flatten!
  streamables.compact_blank!

  if streamables.present?
    DurableStreams::Rails::BroadcastJob.perform_later \
      stream_name_from(streamables),
      type: type, key: key, value: value, operation: operation.to_s
  end
end
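
The asynchronous variant passes operation.to_s to the job, whereas the synchronous variant passes the operation through unchanged. Assuming the job ends up building the same envelope (BroadcastJob's source is not shown on this page, so that is an assumption), the two forms serialize to identical JSON, as this small check illustrates:

```ruby
require "json"

# Header fragment as built by broadcast_event_to (Symbol operation).
sync_headers = { operation: :insert }

# Header fragment if built from the job argument (String operation).
async_headers = { operation: :insert.to_s }

same_json = sync_headers.to_json == async_headers.to_json
puts same_json
# true
```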

#broadcast_event_to(*streamables, type:, key:, value:, operation:) ⇒ Object

Broadcast a State Protocol event to one or more streamables. Returns the generated txid for optimistic update confirmation on the client, or nil if streamables are blank.



# File 'lib/durable_streams/rails/broadcasts.rb', line 28

def broadcast_event_to(*streamables, type:, key:, value:, operation:)
  streamables.flatten!
  streamables.compact_blank!

  if streamables.present?
    txid = SecureRandom.uuid

    append_to_stream(
      stream_name_from(streamables),
      { type: type, key: key.to_s, value: value,
        headers: { operation: operation, txid: txid } }.to_json
    )

    txid
  end
end
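
The return-value contract (a txid when something was appended, nil when every streamable is blank) can be exercised without a server. The sketch below is a stand-in, not the gem itself: FakeStreams, its in-memory APPENDED log, the ":"-joined stream naming, and the plain-Ruby blank check are all assumptions made for illustration. The real module relies on ActiveSupport's compact_blank! and present?, and on its own stream_name_from and append_to_stream.

```ruby
require "json"
require "securerandom"

# Illustrative stand-in that records appends in memory.
module FakeStreams
  APPENDED = []

  module_function

  # Hypothetical naming scheme; the gem's stream_name_from may differ.
  def stream_name_from(streamables)
    streamables.map(&:to_s).join(":")
  end

  # Records the append instead of sending it to a Durable Streams server.
  def append_to_stream(name, json)
    APPENDED << [name, json]
  end

  def broadcast_event_to(*streamables, type:, key:, value:, operation:)
    # Plain-Ruby approximation of flatten! / compact_blank! / present?
    streamables = streamables.flatten.reject { |s| s.nil? || s.to_s.strip.empty? }
    return nil if streamables.empty?

    txid = SecureRandom.uuid
    append_to_stream(
      stream_name_from(streamables),
      { type: type, key: key.to_s, value: value,
        headers: { operation: operation, txid: txid } }.to_json
    )
    txid
  end
end

txid = FakeStreams.broadcast_event_to(
  :room, type: "message", key: 5, value: { content: "Hello" }, operation: :insert
)
skipped = FakeStreams.broadcast_event_to(
  nil, "", type: "message", key: 5, value: {}, operation: :insert
)

puts txid.inspect    # a random UUID string
puts skipped.inspect # nil, and nothing was appended
```

A client that applied an optimistic update could hold on to the returned txid and treat the update as confirmed once an event carrying the same headers.txid arrives on the stream.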

#broadcast_to(*streamables, **payload) ⇒ Object



# File 'lib/durable_streams/rails/broadcasts.rb', line 56

def broadcast_to(*streamables, **payload)
  streamables.flatten!
  streamables.compact_blank!

  if streamables.present?
    append_to_stream(stream_name_from(streamables), payload.to_json)
  end
end