Module: Y::ActionCable::Sync
- Defined in:
- lib/y/action_cable/sync.rb
Overview
y-websocket protocol over ActionCable.
Include this module in an ActionCable channel to sync Y.js documents (and awareness/presence) with browser clients. Messages are the standard y-protocols binary messages, base64-encoded in a JSON envelope:
{ "update" => "<base64 bytes>", "id" => 42 } # client -> server
{ "update" => "<base64 bytes>" } # server -> subscribers
{ "ack" => 42 } # server -> sender
Example:
class DocumentChannel < ApplicationCable::Channel
include Y::ActionCable::Sync
on_load { |key| Document.find_by(key: key)&.content }
# on_change runs in the channel instance's context, so instance methods
# (current_user, params, ...) are available:
on_change { |key, update| Document.record!(key, update, by: current_user) }
def subscribed
sync_subscribed params[:id]
end
def receive(data)
sync_receive(data)
end
end
There is no unsubscribe hook: the server keeps no per-connection document or presence state, so a disconnect needs no server-side cleanup.
The concern is store-backed and fail-closed: every document update is
validated against on_load, recorded through on_change, then broadcast.
No authoritative document state is kept in ActionCable process memory.
Defined Under Namespace
Modules: ClassMethods
Constant Summary collapse
- MSG_KIND_SYNC_STEP1 =
Frame kinds we act on, from Y.message_kind. Its other codes (0 for a drop: malformed/truncated/multi-message/unknown, and 4 for an awareness query) fall through to a no-op in the dispatch below.
1- MSG_KIND_UPDATE =
2- MSG_KIND_AWARENESS =
3- DEFAULT_MAX_FRAME_BYTES =
Default incoming-frame size cap (decoded bytes). Generous enough for a large initial SyncStep2, small enough to bound a single message's allocation/parse cost. Override per channel with
max_frame_bytes. 8 * 1024 * 1024
Class Method Summary collapse
Instance Method Summary collapse
-
#sync_receive(data, key = nil) ⇒ Object
Call from
receive. -
#sync_subscribed(key) ⇒ Object
Call from
subscribed.
Class Method Details
.included(base) ⇒ Object
54 55 56 |
# File 'lib/y/action_cable/sync.rb', line 54 def self.included(base) base.extend(ClassMethods) end |
Instance Method Details
#sync_receive(data, key = nil) ⇒ Object
Call from receive. Applies the client's message, replies directly
when the protocol calls for it, and relays document/awareness changes
to the other subscribers.
Reliable delivery: document updates carry an "id", and the server replies
{ "ack" => id } once the update has been durably recorded. A
causally-gapped update is not acked; it gets a resync instead, so the
client retransmits until the update lands.
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
# File 'lib/y/action_cable/sync.rb', line 118 def sync_receive(data, key = nil) # Pass `key` (params[:id]) when your transport doesn't keep the channel # instance alive across actions. Under AnyCable each RPC command gets a # fresh channel, so instance variables set in `subscribed` are gone here. @sync_key = key.to_s if key encoded = data.is_a?(Hash) ? data["update"] : nil return unless encoded.is_a?(String) # Optional client-supplied id for reliable delivery (see sync_send_ack). # data is known to be a Hash here (encoded came from it above). id = data["id"] # Frame-size cap: drop oversized frames before decoding (the encoded form # is ~4/3 the decoded size) and again after, so a client can't force large # base64 decodes / native parses / merges. A dropped frame is never acked, # and there is no protocol NACK, so a legitimate oversized update is # retransmitted indefinitely. Log the drop so it is at least findable. cap = self.class.max_frame_bytes if cap && encoded.bytesize > (cap * 4 / 3) + 4 sync_log_drop(:warn, "encoded #{encoded.bytesize}B exceeds max_frame_bytes #{cap}B", id) return end begin bytes = Base64.strict_decode64(encoded) rescue ArgumentError sync_log_drop(:debug, "not valid base64", id) # garbage or a probe, rarely a real client return # ignore the frame and keep the connection end if cap && bytes.bytesize > cap sync_log_drop(:warn, "decoded #{bytes.bytesize}B exceeds max_frame_bytes #{cap}B", id) return end sync_send_ack(id, sync_handle_frame(encoded, bytes)) end |
#sync_subscribed(key) ⇒ Object
Call from subscribed. Streams broadcasts for this document and
transmits the server's opening handshake (SyncStep1 from the store).
98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/y/action_cable/sync.rb', line 98 def sync_subscribed(key) @sync_key = key.to_s sync_validate_required_hooks! # The document stream is never whisper-enabled; under AnyCable we also # subscribe an awareness stream with `whisper: true`, scoping the client-to- # client path to ephemeral presence rather than the durable document stream. stream_from sync_stream_name stream_from sync_awareness_stream_name, whisper: true if respond_to?(:whispers_to) sync_transmit(sync_load_doc.sync_step1) end |