Module: DurableStreams::Rails::Broadcasts
- Included in:
- DurableStreams
- Defined in:
- lib/durable_streams/rails/broadcasts.rb
Overview
Provides the broadcast actions in synchronous and asynchronous form for the DurableStreams module. See DurableStreams::Rails::Broadcastable for the user-facing API that invokes these methods with most of the paperwork filled out already.
All broadcasts produce JSON. Two flavors are supported:
State Protocol events
broadcast_event_to and broadcast_event_later_to produce State Protocol events —the keyed insert/update/delete format consumed by @durable-streams/state and @tanstack/db for reactive collections on the client:
DurableStreams.broadcast_event_to :room, type: "message", key: "5",
value: { content: "Hello" }, operation: :insert
Arbitrary JSON
broadcast_to sends free-form JSON with no State Protocol structure:
DurableStreams.broadcast_to :room, event: "typing", user_id: 5
Although the underlying Durable Streams server supports any content type (bytes, text, etc.), this gem only creates application/json streams. Non-JSON broadcasts would require extending append_to_stream to accept a configurable content type.
Instance Method Summary collapse
- #broadcast_event_later_to(*streamables, type:, key:, value:, operation:) ⇒ Object
-
#broadcast_event_to(*streamables, type:, key:, value:, operation:) ⇒ Object
Broadcast a State Protocol event to one or more streamables.
- #broadcast_to(*streamables, **payload) ⇒ Object
-
#build_broadcast_event_to(*streamables, type:, key:, value:, operation:) ⇒ Object
Build the
BroadcastJobfor a State Protocol event without enqueuing it, so a fan-out to many distinct streams can be enqueued in a single Active Job insert viaActiveJob.perform_all_later— one bulk insert instead of oneperform_laterper recipient.
Instance Method Details
#broadcast_event_later_to(*streamables, type:, key:, value:, operation:) ⇒ Object
45 46 47 |
# File 'lib/durable_streams/rails/broadcasts.rb', line 45 def broadcast_event_later_to(*streamables, type:, key:, value:, operation:) build_broadcast_event_to(*streamables, type: type, key: key, value: value, operation: operation)&.enqueue end |
#broadcast_event_to(*streamables, type:, key:, value:, operation:) ⇒ Object
Broadcast a State Protocol event to one or more streamables. Returns the generated txid for optimistic update confirmation on the client, or nil if streamables are blank.
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/durable_streams/rails/broadcasts.rb', line 28 def broadcast_event_to(*streamables, type:, key:, value:, operation:) streamables.flatten! streamables.compact_blank! if streamables.present? txid = SecureRandom.uuid append_to_stream( stream_name_from(streamables), { type: type, key: key.to_s, value: value, headers: { operation: operation, txid: txid } }.to_json ) txid end end |
#broadcast_to(*streamables, **payload) ⇒ Object
76 77 78 79 80 81 82 83 |
# File 'lib/durable_streams/rails/broadcasts.rb', line 76 def broadcast_to(*streamables, **payload) streamables.flatten! streamables.compact_blank! if streamables.present? append_to_stream(stream_name_from(streamables), payload.to_json) end end |
#build_broadcast_event_to(*streamables, type:, key:, value:, operation:) ⇒ Object
Build the BroadcastJob for a State Protocol event without enqueuing it, so a fan-out to many distinct streams can be enqueued in a single Active Job insert via ActiveJob.perform_all_later — one bulk insert instead of one perform_later per recipient. Returns nil when streamables are blank, so a collection can be filter_mapped straight into perform_all_later:
jobs = residencies.filter_map { |residency|
DurableStreams.build_broadcast_event_to residency.envoy, :residencies,
type: "residency", key: residency.sync_id, value: residency.to_h, operation: :update
}
ActiveJob.perform_all_later(jobs)
Turbo has no counterpart: its broadcasts target one stream per call, so it never needs to bulk-enqueue. We fan out per recipient (one stream each), where N separate broadcast_event_later_to calls would be N enqueues. broadcast_event_later_to is defined in terms of this builder (...&.enqueue) so job construction lives in one place.
65 66 67 68 69 70 71 72 73 74 |
# File 'lib/durable_streams/rails/broadcasts.rb', line 65 def build_broadcast_event_to(*streamables, type:, key:, value:, operation:) streamables.flatten! streamables.compact_blank! if streamables.present? DurableStreams::Rails::BroadcastJob.new \ stream_name_from(streamables), type: type, key: key, value: value, operation: operation.to_s end end |