Module: DurableStreams::Rails::Broadcastable
- Extended by: ActiveSupport::Concern
- Defined in: app/models/concerns/durable_streams/rails/broadcastable.rb, lib/durable_streams/rails/broadcastable/test_helper.rb
Overview
Durable Streams can be broadcast directly from models that include this module (the engine includes it automatically in Active Record models). This makes it convenient to run both synchronous and asynchronous State Protocol broadcasts from callbacks or controllers. Here’s an example:
class Comment < ApplicationRecord
  belongs_to :post
  after_create_commit :broadcast_later

  private

  def broadcast_later
    stream_insert_later_to post
  end
end
This broadcasts a State Protocol insert event to the stream derived from the post association. All clients subscribed to that stream will receive the event.
There are four basic operations you can broadcast: insert, update, upsert, and delete. As a rule, use the _later versions when broadcasting from a time-sensitive path, such as a controller action or a model callback, because they hand the broadcast to a background job instead of performing it inline.
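The difference between the immediate and _later variants can be illustrated with a minimal plain-Ruby sketch. It does not use the gem: FakeStream, FakeQueue, and SketchComment are hypothetical stand-ins, and the real library enqueues a DurableStreams::Rails::BroadcastJob rather than a stored block.

```ruby
# Hypothetical stand-ins for illustration; none of these classes exist in DurableStreams.
class FakeStream
  attr_reader :events

  def initialize
    @events = []
  end

  def append(event)
    @events << event
  end
end

class FakeQueue
  def initialize
    @jobs = []
  end

  # Store the work instead of running it, as a background job backend would.
  def enqueue(&job)
    @jobs << job
  end

  # Simulate a worker draining the queue.
  def perform_all
    @jobs.shift.call until @jobs.empty?
  end

  def size
    @jobs.size
  end
end

class SketchComment
  def initialize(id, stream, queue)
    @id = id
    @stream = stream
    @queue = queue
  end

  # Immediate variant: the event is written before the method returns.
  def stream_insert_to_sketch
    @stream.append({ key: @id.to_s, operation: "insert" })
  end

  # Deferred variant: only a job is enqueued, so the caller returns right away.
  def stream_insert_later_to_sketch
    @queue.enqueue { stream_insert_to_sketch }
  end
end

stream = FakeStream.new
queue = FakeQueue.new
comment = SketchComment.new(5, stream, queue)

comment.stream_insert_later_to_sketch
pending_events = stream.events.size   # nothing has been written yet
queue.perform_all
delivered_events = stream.events.size # written once the worker has run
```

In this sketch the request path pays only for the enqueue call; the write to the stream happens when the queue is drained.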
Declarative streaming
For the common case of broadcasting creates, updates, and destroys, you can use the class-level streams_to declaration:
class Comment < ApplicationRecord
  belongs_to :post
  streams_to :post
end
This is equivalent to registering after_create_commit, after_update_commit, and after_destroy_commit callbacks that broadcast the appropriate State Protocol events.
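As a rough illustration of one declaration expanding into three callbacks, here is a plain-Ruby sketch. MiniCallbacks, streams_to_sketch, and SketchReply are hypothetical stand-ins for Active Record's commit callbacks and the real macro, and the create/update/destroy to insert/update/delete mapping is an assumption inferred from the sentence above, not taken from the gem's source.

```ruby
# Hypothetical stand-in for Active Record commit callbacks; not the gem's code.
module MiniCallbacks
  def self.included(base)
    base.extend(ClassMethods)
  end

  module ClassMethods
    def commit_callbacks
      @commit_callbacks ||= Hash.new { |hash, event| hash[event] = [] }
    end

    def after_commit_on(event, &block)
      commit_callbacks[event] << block
    end

    # Sketch of a streams_to-style macro: one declaration, three callbacks.
    # The lifecycle-to-operation mapping below is an assumption.
    def streams_to_sketch(target)
      { create: :insert, update: :update, destroy: :delete }.each do |event, operation|
        after_commit_on(event) { record_broadcast(operation, public_send(target)) }
      end
    end
  end

  def broadcasts
    @broadcasts ||= []
  end

  def record_broadcast(operation, target)
    broadcasts << [operation, target]
  end

  # Simulate a committed lifecycle event by running its callbacks on this record.
  def commit(event)
    self.class.commit_callbacks[event].each { |callback| instance_exec(&callback) }
  end
end

class SketchReply
  include MiniCallbacks

  attr_reader :post

  streams_to_sketch :post

  def initialize(post)
    @post = post
  end
end

reply = SketchReply.new("post-1")
reply.commit(:create)
reply.commit(:destroy)
reply.broadcasts # two entries, an insert and a delete, both targeting "post-1"
```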
When the stream target is the model itself, use the self-targeting streams declaration:
class Board < ApplicationRecord
  streams
end
Suppressing broadcasts
Sometimes you need to disable broadcasts temporarily. You can use .suppressing_streams to create an execution context in which broadcasts are disabled:
Comment.suppressing_streams do
  Comment.create!(post: post) # This won't broadcast the insert event
end
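One common way to build such a block-scoped switch is a flag that is set for the duration of the block and restored afterwards. The sketch below shows that pattern in plain Ruby; SketchModel and the *_sketch methods are hypothetical, and the gem's actual implementation of suppressing_streams may differ. Only the "unless suppressed" guard mirrors the documented method bodies.

```ruby
# Hypothetical sketch of block-scoped suppression; not the gem's implementation.
class SketchModel
  class << self
    def suppressing_streams_sketch
      # Thread.current[] storage is local to the running fiber, so other
      # threads keep broadcasting while this block runs.
      previous = Thread.current[:sketch_streams_suppressed]
      Thread.current[:sketch_streams_suppressed] = true
      yield
    ensure
      # Restore the earlier state even if the block raises, so nesting works.
      Thread.current[:sketch_streams_suppressed] = previous
    end

    def suppressed_streams_sketch?
      Thread.current[:sketch_streams_suppressed] == true
    end
  end

  attr_reader :sent

  def initialize
    @sent = []
  end

  # Mirrors the "... unless suppressed_streams?" guard in the documented methods.
  def stream_insert_sketch
    @sent << :insert unless self.class.suppressed_streams_sketch?
  end
end

model = SketchModel.new
SketchModel.suppressing_streams_sketch { model.stream_insert_sketch } # skipped
model.stream_insert_sketch                                            # recorded
```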
Defined Under Namespace
Modules: ClassMethods, TestHelper
Instance Method Summary
- #stream_delete ⇒ Object
  Same as #stream_delete_to, but the designated stream is automatically set to the current model.
- #stream_delete_later ⇒ Object
  Same as #stream_delete_later_to, but the designated stream is automatically set to the current model.
- #stream_delete_later_to(*streamables) ⇒ Object
  Same as #stream_delete_to but run asynchronously via a DurableStreams::Rails::BroadcastJob.
- #stream_delete_to(*streamables) ⇒ Object
  Broadcast a State Protocol delete event to the stream identified by the passed streamables.
- #stream_insert ⇒ Object
  Same as #stream_insert_to, but the designated stream is automatically set to the current model.
- #stream_insert_later ⇒ Object
  Same as #stream_insert_later_to, but the designated stream is automatically set to the current model.
- #stream_insert_later_to(*streamables) ⇒ Object
  Same as #stream_insert_to but run asynchronously via a DurableStreams::Rails::BroadcastJob.
- #stream_insert_to(*streamables) ⇒ Object
  Broadcast a State Protocol insert event to the stream identified by the passed streamables.
- #stream_update ⇒ Object
  Same as #stream_update_to, but the designated stream is automatically set to the current model.
- #stream_update_later ⇒ Object
  Same as #stream_update_later_to, but the designated stream is automatically set to the current model.
- #stream_update_later_to(*streamables) ⇒ Object
  Same as #stream_update_to but run asynchronously via a DurableStreams::Rails::BroadcastJob.
- #stream_update_to(*streamables) ⇒ Object
  Broadcast a State Protocol update event to the stream identified by the passed streamables.
- #stream_upsert ⇒ Object
  Same as #stream_upsert_to, but the designated stream is automatically set to the current model.
- #stream_upsert_later ⇒ Object
  Same as #stream_upsert_later_to, but the designated stream is automatically set to the current model.
- #stream_upsert_later_to(*streamables) ⇒ Object
  Same as #stream_upsert_to but run asynchronously via a DurableStreams::Rails::BroadcastJob.
- #stream_upsert_to(*streamables) ⇒ Object
  Broadcast a State Protocol upsert event to the stream identified by the passed streamables.
Instance Method Details
#stream_delete ⇒ Object
Same as #stream_delete_to, but the designated stream is automatically set to the current model.
# File 'app/models/concerns/durable_streams/rails/broadcastable.rb', line 154
def stream_delete
  stream_delete_to self
end
#stream_delete_later ⇒ Object
Same as #stream_delete_later_to, but the designated stream is automatically set to the current model.
# File 'app/models/concerns/durable_streams/rails/broadcastable.rb', line 194
def stream_delete_later
  stream_delete_later_to self
end
#stream_delete_later_to(*streamables) ⇒ Object
Same as #stream_delete_to but run asynchronously via a DurableStreams::Rails::BroadcastJob.
# File 'app/models/concerns/durable_streams/rails/broadcastable.rb', line 189
def stream_delete_later_to(*streamables)
  DurableStreams.broadcast_event_later_to(*streamables, **stream_event_attributes(operation: :delete, value: nil)) unless suppressed_streams?
end
#stream_delete_to(*streamables) ⇒ Object
Broadcast a State Protocol delete event to the stream identified by the passed streamables. The value is nil because only the key is needed for deletes. Example:
.stream_delete_to post
# File 'app/models/concerns/durable_streams/rails/broadcastable.rb', line 149
def stream_delete_to(*streamables)
  DurableStreams.broadcast_event_to(*streamables, **stream_event_attributes(operation: :delete, value: nil)) unless suppressed_streams?
end
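To make the nil value concrete, the following sketch builds a delete event next to an insert event, using the event shape shown in the stream_insert_to example. build_event_sketch is a hypothetical helper; in the gem the attributes come from stream_event_attributes, and whether a nil value is serialized as null or omitted entirely is an assumption here.

```ruby
require "json"

# Hypothetical helper mirroring the event shape from the stream_insert_to
# example; it is not part of DurableStreams.
def build_event_sketch(type:, key:, operation:, value: nil)
  {
    "type" => type,
    "key" => key.to_s,
    "value" => value,
    "headers" => { "operation" => operation.to_s }
  }
end

insert_event = build_event_sketch(type: "message", key: 5, operation: :insert, value: { "body" => "Hi" })
delete_event = build_event_sketch(type: "message", key: 5, operation: :delete)

puts JSON.generate(insert_event)
# {"type":"message","key":"5","value":{"body":"Hi"},"headers":{"operation":"insert"}}
puts JSON.generate(delete_event)
# {"type":"message","key":"5","value":null,"headers":{"operation":"delete"}}
```

A subscriber only needs the type and key to drop the matching record, which is why the delete event carries no value.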
#stream_insert ⇒ Object
Same as #stream_insert_to, but the designated stream is automatically set to the current model.
# File 'app/models/concerns/durable_streams/rails/broadcastable.rb', line 117
def stream_insert
  stream_insert_to self
end
#stream_insert_later ⇒ Object
Same as #stream_insert_later_to, but the designated stream is automatically set to the current model.
# File 'app/models/concerns/durable_streams/rails/broadcastable.rb', line 164
def stream_insert_later
  stream_insert_later_to self
end
#stream_insert_later_to(*streamables) ⇒ Object
Same as #stream_insert_to but run asynchronously via a DurableStreams::Rails::BroadcastJob.
# File 'app/models/concerns/durable_streams/rails/broadcastable.rb', line 159
def stream_insert_later_to(*streamables)
  DurableStreams.broadcast_event_later_to(*streamables, **stream_event_attributes(operation: :insert)) unless suppressed_streams?
end
#stream_insert_to(*streamables) ⇒ Object
Broadcast a State Protocol insert event to the stream identified by the passed streamables. Returns the txid that can be used for optimistic update confirmation. Example:
# Broadcasts {"type":"message","key":"5","value":{...},"headers":{"operation":"insert"}}
txid = .stream_insert_to post, :messages
render json: { txid: txid }
# File 'app/models/concerns/durable_streams/rails/broadcastable.rb', line 112
def stream_insert_to(*streamables)
  DurableStreams.broadcast_event_to(*streamables, **stream_event_attributes(operation: :insert)) unless suppressed_streams?
end
#stream_update ⇒ Object
Same as #stream_update_to, but the designated stream is automatically set to the current model.
# File 'app/models/concerns/durable_streams/rails/broadcastable.rb', line 129
def stream_update
  stream_update_to self
end
#stream_update_later ⇒ Object
Same as #stream_update_later_to, but the designated stream is automatically set to the current model.
# File 'app/models/concerns/durable_streams/rails/broadcastable.rb', line 174
def stream_update_later
  stream_update_later_to self
end
#stream_update_later_to(*streamables) ⇒ Object
Same as #stream_update_to but run asynchronously via a DurableStreams::Rails::BroadcastJob.
# File 'app/models/concerns/durable_streams/rails/broadcastable.rb', line 169
def stream_update_later_to(*streamables)
  DurableStreams.broadcast_event_later_to(*streamables, **stream_event_attributes(operation: :update)) unless suppressed_streams?
end
#stream_update_to(*streamables) ⇒ Object
Broadcast a State Protocol update event to the stream identified by the passed streamables. Example:
.stream_update_to post
# File 'app/models/concerns/durable_streams/rails/broadcastable.rb', line 124
def stream_update_to(*streamables)
  DurableStreams.broadcast_event_to(*streamables, **stream_event_attributes(operation: :update)) unless suppressed_streams?
end
#stream_upsert ⇒ Object
Same as #stream_upsert_to, but the designated stream is automatically set to the current model.
# File 'app/models/concerns/durable_streams/rails/broadcastable.rb', line 141
def stream_upsert
  stream_upsert_to self
end
#stream_upsert_later ⇒ Object
Same as #stream_upsert_later_to, but the designated stream is automatically set to the current model.
# File 'app/models/concerns/durable_streams/rails/broadcastable.rb', line 184
def stream_upsert_later
  stream_upsert_later_to self
end
#stream_upsert_later_to(*streamables) ⇒ Object
Same as #stream_upsert_to but run asynchronously via a DurableStreams::Rails::BroadcastJob.
# File 'app/models/concerns/durable_streams/rails/broadcastable.rb', line 179
def stream_upsert_later_to(*streamables)
  DurableStreams.broadcast_event_later_to(*streamables, **stream_event_attributes(operation: :upsert)) unless suppressed_streams?
end
#stream_upsert_to(*streamables) ⇒ Object
Broadcast a State Protocol upsert event to the stream identified by the passed streamables. Example:
.stream_upsert_to post
# File 'app/models/concerns/durable_streams/rails/broadcastable.rb', line 136
def stream_upsert_to(*streamables)
  DurableStreams.broadcast_event_to(*streamables, **stream_event_attributes(operation: :upsert)) unless suppressed_streams?
end
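To show how an upsert might differ from insert and update on the consuming side, here is a small plain-Ruby reducer that folds events into a hash of records keyed by event key. apply_event_sketch and event_sketch are hypothetical helpers, and the per-operation semantics (update ignores unknown keys, upsert inserts or merges) are assumptions made for illustration rather than statements about the State Protocol.

```ruby
# Hypothetical consumer-side reducer. The per-operation semantics are
# assumptions made for illustration, not taken from the State Protocol.
def apply_event_sketch(state, event)
  key = event.fetch("key")
  value = event["value"]

  case event.fetch("headers").fetch("operation")
  when "insert"
    state.merge(key => value)
  when "update"
    # Assumed: an update only touches a key that already exists.
    state.key?(key) ? state.merge(key => state[key].merge(value)) : state
  when "upsert"
    # Assumed: an upsert inserts a new key or merges into a known one.
    state.merge(key => state.fetch(key, {}).merge(value))
  when "delete"
    state.reject { |existing_key, _record| existing_key == key }
  else
    state
  end
end

# Builds a minimal event hash with only the fields the reducer reads.
def event_sketch(operation, key, value = nil)
  { "key" => key, "value" => value, "headers" => { "operation" => operation } }
end

state = {}
state = apply_event_sketch(state, event_sketch("upsert", "5", { "body" => "Hi" }))
state = apply_event_sketch(state, event_sketch("upsert", "5", { "edited" => true }))
state = apply_event_sketch(state, event_sketch("delete", "5"))
```

Under these assumptions, upsert is the safe choice when the sender cannot know whether subscribers have already seen the record.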