Module: DurableStreams::Rails::Broadcastable

Extended by:
ActiveSupport::Concern
Defined in:
app/models/concerns/durable_streams/rails/broadcastable.rb,
lib/durable_streams/rails/broadcastable/test_helper.rb

Overview

State Protocol events can be broadcast to Durable Streams directly from models that include this module (the engine includes it automatically for Active Record models). This makes it convenient to run both synchronous and asynchronous State Protocol broadcasts from callbacks or controllers. Here's an example:

class Comment < ApplicationRecord
  belongs_to :post

  after_create_commit :broadcast_later

  private
    def broadcast_later
      stream_insert_later_to post
    end
end

This broadcasts a State Protocol insert event to the stream derived from the post association. All clients subscribed to that stream will receive the event.

There are four basic operations you can broadcast: insert, update, upsert, and delete. As a rule, use the _later versions when broadcasting within a real-time path, like a controller action or model callback, since they hand the broadcast to a background job (DurableStreams::Rails::BroadcastJob) instead of performing it inline.
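
The difference between the inline and _later variants can be pictured with a small plain-Ruby sketch. DemoBroadcaster and its methods are hypothetical stand-ins, not part of DurableStreams; the in-memory array only mimics a job queue to show that the deferred path returns before anything is delivered.

```ruby
# Hypothetical stand-in; none of these names come from DurableStreams.
class DemoBroadcaster
  attr_reader :delivered, :queue

  def initialize
    @delivered = [] # events that have reached the stream
    @queue = []     # pending work, standing in for a job backend
  end

  # Inline path: the caller waits until the event is delivered.
  def broadcast(event)
    @delivered << event
    event
  end

  # Deferred path: only records the work and returns immediately.
  def broadcast_later(event)
    @queue << event
    nil
  end

  # Stand-in for a background worker draining the queue.
  def perform_enqueued
    broadcast(@queue.shift) until @queue.empty?
  end
end

demo = DemoBroadcaster.new
demo.broadcast_later({ operation: "insert", key: "5" })
puts demo.delivered.size # 0, nothing delivered yet
demo.perform_enqueued
puts demo.delivered.size # 1, delivered once the worker runs
```

In this sketch the deferred call has no useful return value, so a caller that needs something back from the broadcast itself would use the inline path.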

Declarative streaming

For the common case of broadcasting creates, updates, and destroys, you can use the class-level streams_to declaration:

class Comment < ApplicationRecord
  belongs_to :post
  streams_to :post
end

This is equivalent to registering after_create_commit, after_update_commit, and after_destroy_commit callbacks that broadcast the appropriate State Protocol events.
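
As a rough illustration of how a class-level macro of this kind can work, the plain-Ruby sketch below records a target and translates lifecycle events into operations. Everything in it is hypothetical: MiniStreaming, committed, and the create/update/destroy to insert/update/delete mapping are assumptions made for the example, and the real declaration relies on Rails commit callbacks rather than this stand-in.

```ruby
# Hypothetical stand-in that only mimics the shape of a streams_to macro.
module MiniStreaming
  # Assumed mapping from lifecycle event to State Protocol operation.
  OPERATIONS = { create: :insert, update: :update, destroy: :delete }.freeze

  def self.included(base)
    base.extend(ClassMethods)
  end

  module ClassMethods
    def stream_targets
      @stream_targets ||= []
    end

    # Remember which method names the stream target.
    def streams_to(target)
      stream_targets << target
    end
  end

  # Called by the stand-in persistence layer after a commit; returns one
  # event description per declared target.
  def committed(lifecycle_event)
    operation = OPERATIONS.fetch(lifecycle_event)
    self.class.stream_targets.map do |target|
      { stream: public_send(target), operation: operation }
    end
  end
end

class Comment
  include MiniStreaming
  streams_to :post

  attr_reader :post

  def initialize(post)
    @post = post
  end
end

p Comment.new("post-1").committed(:create)
```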

When the stream target is the model itself, use the self-targeting streams declaration:

class Board < ApplicationRecord
  streams
end

Suppressing broadcasts

To disable broadcasts temporarily, use .suppressing_streams, which runs a block in an execution context where broadcasts are skipped:

Comment.suppressing_streams do
  Comment.create!(post: post) # This won't broadcast the insert event
end
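
One common way to build such a suppression context is a flag that is set for the duration of a block and restored afterwards. The sketch below assumes that approach; SuppressibleStreams is a hypothetical module, its flag is global to the current thread rather than scoped to one model class, and it is not the gem's implementation.

```ruby
# Hypothetical sketch; not part of DurableStreams.
module SuppressibleStreams
  KEY = :suppressible_streams_flag

  # Run the block with broadcasts disabled. Thread.current[] storage is
  # local to the current fiber, so other threads are unaffected.
  def self.suppressing_streams
    previous = Thread.current[KEY]
    Thread.current[KEY] = true
    yield
  ensure
    # Restore the earlier state even if the block raises.
    Thread.current[KEY] = previous
  end

  def self.suppressed_streams?
    Thread.current[KEY] == true
  end

  # Same guard shape as `... unless suppressed_streams?`:
  # evaluates to nil when suppressed.
  def self.broadcast(event)
    event unless suppressed_streams?
  end
end

SuppressibleStreams.suppressing_streams do
  p SuppressibleStreams.broadcast({ key: "5" }) # nil, broadcast skipped
end
```

Saving and restoring the previous value, rather than resetting it to false, keeps nested suppression blocks working.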

Defined Under Namespace

Modules: ClassMethods, TestHelper

Instance Method Details

#stream_delete ⇒ Object

Same as #stream_delete_to, but the designated stream is automatically set to the current model.



# File 'app/models/concerns/durable_streams/rails/broadcastable.rb', line 154

def stream_delete
  stream_delete_to self
end

#stream_delete_later ⇒ Object

Same as #stream_delete_later_to, but the designated stream is automatically set to the current model.



# File 'app/models/concerns/durable_streams/rails/broadcastable.rb', line 194

def stream_delete_later
  stream_delete_later_to self
end

#stream_delete_later_to(*streamables) ⇒ Object

Same as #stream_delete_to, but runs asynchronously via a DurableStreams::Rails::BroadcastJob.



# File 'app/models/concerns/durable_streams/rails/broadcastable.rb', line 189

def stream_delete_later_to(*streamables)
  DurableStreams.broadcast_event_later_to(*streamables, **stream_event_attributes(operation: :delete, value: nil)) unless suppressed_streams?
end

#stream_delete_to(*streamables) ⇒ Object

Broadcast a State Protocol delete event to the stream identified by the passed streamables. The value is nil — only the key is needed for deletes. Example:

message.stream_delete_to post


# File 'app/models/concerns/durable_streams/rails/broadcastable.rb', line 149

def stream_delete_to(*streamables)
  DurableStreams.broadcast_event_to(*streamables, **stream_event_attributes(operation: :delete, value: nil)) unless suppressed_streams?
end

#stream_insert ⇒ Object

Same as #stream_insert_to, but the designated stream is automatically set to the current model.



# File 'app/models/concerns/durable_streams/rails/broadcastable.rb', line 117

def stream_insert
  stream_insert_to self
end

#stream_insert_later ⇒ Object

Same as #stream_insert_later_to, but the designated stream is automatically set to the current model.



# File 'app/models/concerns/durable_streams/rails/broadcastable.rb', line 164

def stream_insert_later
  stream_insert_later_to self
end

#stream_insert_later_to(*streamables) ⇒ Object

Same as #stream_insert_to, but runs asynchronously via a DurableStreams::Rails::BroadcastJob.



# File 'app/models/concerns/durable_streams/rails/broadcastable.rb', line 159

def stream_insert_later_to(*streamables)
  DurableStreams.broadcast_event_later_to(*streamables, **stream_event_attributes(operation: :insert)) unless suppressed_streams?
end

#stream_insert_to(*streamables) ⇒ Object

Broadcast a State Protocol insert event to the stream identified by the passed streamables. Returns the txid that can be used for optimistic update confirmation, or nil when broadcasts are suppressed. Example:

# Broadcasts {"type":"message","key":"5","value":{...},"headers":{"operation":"insert"}}
txid = message.stream_insert_to post, :messages
render json: { txid: txid }


# File 'app/models/concerns/durable_streams/rails/broadcastable.rb', line 112

def stream_insert_to(*streamables)
  DurableStreams.broadcast_event_to(*streamables, **stream_event_attributes(operation: :insert)) unless suppressed_streams?
end
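
To make the event shape from the example comment above concrete, here is a minimal sketch that builds a hash with the same fields and serializes it. The state_event helper and the "body" attribute are hypothetical; the real payload is produced by the gem, and only the field names and the nil value for deletes are taken from this page.

```ruby
require "json"

# Hypothetical helper that mirrors the field layout of the documented event.
def state_event(type:, key:, operation:, value: nil)
  {
    "type" => type,
    "key" => key.to_s,  # the documented example shows the key as a string
    "value" => value,   # left as nil for delete events
    "headers" => { "operation" => operation.to_s }
  }
end

insert = state_event(type: "message", key: 5, operation: :insert, value: { "body" => "Hi" })
puts JSON.generate(insert)
# {"type":"message","key":"5","value":{"body":"Hi"},"headers":{"operation":"insert"}}

delete = state_event(type: "message", key: 5, operation: :delete)
puts JSON.generate(delete)
# {"type":"message","key":"5","value":null,"headers":{"operation":"delete"}}
```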

#stream_update ⇒ Object

Same as #stream_update_to, but the designated stream is automatically set to the current model.



# File 'app/models/concerns/durable_streams/rails/broadcastable.rb', line 129

def stream_update
  stream_update_to self
end

#stream_update_later ⇒ Object

Same as #stream_update_later_to, but the designated stream is automatically set to the current model.



# File 'app/models/concerns/durable_streams/rails/broadcastable.rb', line 174

def stream_update_later
  stream_update_later_to self
end

#stream_update_later_to(*streamables) ⇒ Object

Same as #stream_update_to, but runs asynchronously via a DurableStreams::Rails::BroadcastJob.



# File 'app/models/concerns/durable_streams/rails/broadcastable.rb', line 169

def stream_update_later_to(*streamables)
  DurableStreams.broadcast_event_later_to(*streamables, **stream_event_attributes(operation: :update)) unless suppressed_streams?
end

#stream_update_to(*streamables) ⇒ Object

Broadcast a State Protocol update event to the stream identified by the passed streamables. Example:

message.stream_update_to post


# File 'app/models/concerns/durable_streams/rails/broadcastable.rb', line 124

def stream_update_to(*streamables)
  DurableStreams.broadcast_event_to(*streamables, **stream_event_attributes(operation: :update)) unless suppressed_streams?
end

#stream_upsert ⇒ Object

Same as #stream_upsert_to, but the designated stream is automatically set to the current model.



# File 'app/models/concerns/durable_streams/rails/broadcastable.rb', line 141

def stream_upsert
  stream_upsert_to self
end

#stream_upsert_later ⇒ Object

Same as #stream_upsert_later_to, but the designated stream is automatically set to the current model.



# File 'app/models/concerns/durable_streams/rails/broadcastable.rb', line 184

def stream_upsert_later
  stream_upsert_later_to self
end

#stream_upsert_later_to(*streamables) ⇒ Object

Same as #stream_upsert_to, but runs asynchronously via a DurableStreams::Rails::BroadcastJob.



# File 'app/models/concerns/durable_streams/rails/broadcastable.rb', line 179

def stream_upsert_later_to(*streamables)
  DurableStreams.broadcast_event_later_to(*streamables, **stream_event_attributes(operation: :upsert)) unless suppressed_streams?
end

#stream_upsert_to(*streamables) ⇒ Object

Broadcast a State Protocol upsert event to the stream identified by the passed streamables. Example:

message.stream_upsert_to post


# File 'app/models/concerns/durable_streams/rails/broadcastable.rb', line 136

def stream_upsert_to(*streamables)
  DurableStreams.broadcast_event_to(*streamables, **stream_event_attributes(operation: :upsert)) unless suppressed_streams?
end