Module: Rage::SSE

Defined in:
lib/rage/sse/sse.rb

Defined Under Namespace

Classes: ConnectionProxy, Message, Stream

Class Method Details

.broadcast(streamable, data) ⇒ Object

Broadcast a message to all clients subscribed to a given stream.

Examples:

Rage::SSE.broadcast("#{current_user.id}-notifications", "You have a new notification!")
Rage::SSE.broadcast([current_user.id, "notifications"], { title: "New Notification", body: "You have a new notification!" })

Parameters:

  • streamable (#id, String, Symbol, Numeric, Array)

    The identifier of the stream to broadcast to.

  • data (String, #to_json, Message)

    The message to broadcast.



# File 'lib/rage/sse/sse.rb', line 75

def self.broadcast(streamable, data)
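  Rage::Telemetry.tracer.span_sse_stream_broadcast(stream: streamable) do
    stream_name = Rage::Internal.stream_name_for(streamable)
    serialized_data = __serialize(data)

    # Deliver through Iodine's cluster-wide pub/sub, but only while the server is running.
    InternalBroadcast.broadcast(stream_name, serialized_data, Iodine::PubSub::CLUSTER) if Iodine.running?
    # Also publish through the pub/sub adapter when one is set (`&.` skips a nil adapter).
    @__adapter&.publish(PUBSUB_BROADCASTER_ID, stream_name, serialized_data)
  end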

  true
end
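
The `data` parameter accepts a string or any object that responds to `#to_json`. The following is a minimal sketch of that rule, assuming strings are sent verbatim and everything else is JSON-encoded, as the `.message` documentation describes for its own `data` field. `serialize_payload` is a hypothetical helper written for illustration; it is not the module's private `__serialize` method and it does not handle `Message` objects.

```ruby
require "json"

# Hypothetical helper, not part of Rage: strings pass through unchanged,
# any other object is serialized with #to_json.
def serialize_payload(data)
  data.is_a?(String) ? data : data.to_json
end

text_payload = serialize_payload("You have a new notification!")
json_payload = serialize_payload({ title: "New Notification" })
```

Under this assumption a subscriber sees plain text for string payloads and a JSON document for hashes, so the client has to know which of the two to expect on a given stream.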

.close_stream(streamable) ⇒ Object

Close an unbounded SSE stream. Unbounded streams will remain open until either the client disconnects or the server explicitly closes them.

Examples:

Rage::SSE.close_stream("#{current_user.id}-notifications")
Rage::SSE.close_stream([current_user.id, "notifications"])

Parameters:

  • streamable (#id, String, Symbol, Numeric, Array)

    The identifier of the stream to close.



# File 'lib/rage/sse/sse.rb', line 58

def self.close_stream(streamable)
  stream_name = Rage::Internal.stream_name_for(streamable)

  # The close signal is sent as a special message over the same two channels that .broadcast uses.
  InternalBroadcast.broadcast(stream_name, CLOSE_STREAM_MSG, Iodine::PubSub::CLUSTER) if Iodine.running?
  @__adapter&.publish(PUBSUB_BROADCASTER_ID, stream_name, CLOSE_STREAM_MSG)

  true
end
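
The source above sends the close signal as a special message over the same channel that regular broadcasts use. The toy model below sketches that idea in plain Ruby, assuming each subscriber is represented by a `Queue` and a sentinel object stands in for `CLOSE_STREAM_MSG`. `ToyHub` and `TOY_CLOSE` are hypothetical names; nothing here reflects how Rage or Iodine manage connections internally.

```ruby
# Toy in-memory model, not Rage's implementation.
# TOY_CLOSE is a sentinel that plays the role of the close message.
TOY_CLOSE = Object.new.freeze

class ToyHub
  def initialize
    @subscribers = Hash.new { |hash, name| hash[name] = [] }
  end

  # Register a subscriber and return the queue it reads from.
  def subscribe(name)
    queue = Queue.new
    @subscribers[name] << queue
    queue
  end

  # Push a message to every subscriber of the named stream.
  def broadcast(name, data)
    @subscribers[name].each { |queue| queue << data }
    true
  end

  # Drop the stream and tell each subscriber to stop reading.
  def close_stream(name)
    @subscribers.delete(name)&.each { |queue| queue << TOY_CLOSE }
    true
  end
end

hub = ToyHub.new
queue = hub.subscribe("42-notifications")
hub.broadcast("42-notifications", "hello")
hub.close_stream("42-notifications")

# The reader keeps consuming until it sees the sentinel.
received = []
while (message = queue.pop) != TOY_CLOSE
  received << message
end
```

In this model the reader loop only ends once the sentinel arrives, which mirrors the documented behavior that an unbounded stream stays open until the server closes it or the client goes away.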

.message(data, id: nil, event: nil, retry: nil) ⇒ Message

A factory method for creating Server-Sent Events.

Examples:

render sse: Rage::SSE.message(current_user.profile, id: current_user.id)

Parameters:

  • data (String, #to_json)

    The data field for the SSE event. If the object provided is not a string, it will be serialized to JSON.

  • id (String, nil) (defaults to: nil)

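    The id field for the SSE event. Clients send the last id they received back in the Last-Event-ID request header when they reconnect, so it can be used to track which messages were delivered.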

  • event (String, nil) (defaults to: nil)

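    The event field for the SSE event. This can be used to define custom event types, which clients handle with a listener registered for that type instead of the default message handler.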

  • retry (Integer, nil) (defaults to: nil)

    The retry field for the SSE event, in milliseconds. It tells the client how long to wait before attempting to reconnect.

Returns:

  • (Message)

    The formatted SSE event.



# File 'lib/rage/sse/sse.rb', line 13

def self.message(data, id: nil, event: nil, retry: nil)
  Message.new(data:, id:, event:, retry:)
end
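
The four fields above correspond to the field names of the `text/event-stream` format. The following is a minimal sketch of how such an event looks on the wire, based on the server-sent events section of the HTML specification. `format_sse_event` and its `retry_ms` keyword are hypothetical and chosen for illustration; this is not how `Message` is implemented.

```ruby
# Hypothetical formatter, not Rage's Message class: builds one event in the
# text/event-stream wire format.
def format_sse_event(data, id: nil, event: nil, retry_ms: nil)
  lines = []
  lines << "id: #{id}" if id
  lines << "event: #{event}" if event
  lines << "retry: #{retry_ms}" if retry_ms
  # Each line of a multi-line payload gets its own "data:" field.
  data.to_s.split("\n", -1).each { |line| lines << "data: #{line}" }
  # A blank line terminates the event.
  lines.join("\n") + "\n\n"
end

wire = format_sse_event("hello", id: "1", event: "ping", retry_ms: 3000)
```

Splitting the payload per line matters because a raw newline inside a single `data:` field would end that field early; the client joins consecutive `data:` fields back together with newlines.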

.stream(streamable) ⇒ Stream

A factory method for creating unbounded SSE streams.

Examples:

render sse: Rage::SSE.stream("#{current_user.id}-notifications")
render sse: Rage::SSE.stream([current_user.id, "notifications"])

Parameters:

  • streamable (#id, String, Symbol, Numeric, Array)

    An object to generate the stream name from.

Returns:

  • (Stream)

    A new SSE stream instance.



# File 'lib/rage/sse/sse.rb', line 25

def self.stream(streamable)
  Stream.new(streamable:)
end