Module: YrbLite::ActionCable::Sync
- Defined in:
- lib/yrb_lite/action_cable/sync.rb
Overview
y-websocket protocol over ActionCable.
Include this module in an ActionCable channel to sync Y.js documents (and awareness/presence) with browser clients. Messages are the standard y-protocols binary messages, base64-encoded in a JSON envelope:
{ "update" => "<base64 bytes>", "id" => 42 } # client -> server
{ "update" => "...", "origin" => "<id>" } # server -> subscribers
{ "ack" => 42 } # server -> sender
Example:
class DocumentChannel < ApplicationCable::Channel
include YrbLite::ActionCable::Sync
on_load { |key| Document.find_by(key: key)&.content }
# on_change blocks run in the channel instance's context, so instance
# methods (current_user, params, ...) are available without plumbing:
on_change { |key, update| Document.record!(key, update, by: current_user) }
def subscribed
sync_for params[:id]
end
def receive(data)
sync_receive(data)
end
def unsubscribed
sync_unsubscribed
end
end
The concern is store-backed and fail-closed: every document update is validated against ‘on_load`, recorded through `on_change`, then broadcast. No authoritative document state is kept in ActionCable process memory.
Defined Under Namespace
Modules: ClassMethods
Constant Summary collapse
- MSG_KIND_DROP =
Validated frame kinds from Awareness#message_kind. A frame only gets a non-DROP kind if it is exactly one well-formed message; anything malformed, truncated, multi-message, or unknown is dropped before it can be processed or relayed.
0- MSG_KIND_SYNC_STEP1 =
1- MSG_KIND_UPDATE =
2- MSG_KIND_AWARENESS =
3- MSG_KIND_AWARENESS_QUERY =
4- DEFAULT_MAX_FRAME_BYTES =
Default incoming-frame size cap (decoded bytes). Generous enough for a large initial SyncStep2, small enough to bound a single message’s allocation/parse cost. Override per channel with ‘max_frame_bytes`.
8 * 1024 * 1024
Class Method Summary collapse
-
.codec ⇒ Object
A shared, stateless decoder for the store-backed path.
- .included(base) ⇒ Object
-
.lock_for(key) ⇒ Object
Per-document mutex serializing load -> record -> broadcast within this process.
-
.process_id ⇒ Object
A stable id for this server process, stamped on every broadcast so other processes know to apply it to their replica and this process knows to skip its own.
-
.reset! ⇒ Object
Clear process-local locks and codec (useful for testing).
Instance Method Summary collapse
-
#sync_dispatch(encoded, bytes) ⇒ Object
Route a decoded frame to the backend/path that handles it and return the outcome symbol (:recorded/:applied/:gap/:noop) used by the reliable- delivery ack.
-
#sync_for(key) ⇒ Object
Call from ‘subscribed`.
-
#sync_receive(data, key = nil) ⇒ Object
Call from ‘receive`.
-
#sync_unsubscribed(key = nil) ⇒ Object
Kept as the ActionCable lifecycle hook target.
Class Method Details
.codec ⇒ Object
A shared, stateless decoder for the store-backed path. message_kind and update_from_message only read their argument (they don’t touch the instance’s document), so one shared instance is safe across threads.
321 322 323 |
# File 'lib/yrb_lite/action_cable/sync.rb', line 321 def codec @codec ||= YrbLite::Awareness.new end |
.included(base) ⇒ Object
59 60 61 |
# File 'lib/yrb_lite/action_cable/sync.rb', line 59 def self.included(base) base.extend(ClassMethods) end |
.lock_for(key) ⇒ Object
Per-document mutex serializing load -> record -> broadcast within this process. The durable store remains the cross-process source of truth. Only briefly holds the registry mutex to fetch/create the lock; the durable write itself runs while holding only this per-key lock.
329 330 331 |
# File 'lib/yrb_lite/action_cable/sync.rb', line 329 def lock_for(key) @registry_mutex.synchronize { @locks[key] ||= Mutex.new } end |
.process_id ⇒ Object
A stable id for this server process, stamped on every broadcast so other processes know to apply it to their replica and this process knows to skip its own. Survives for the life of the process.
314 315 316 |
# File 'lib/yrb_lite/action_cable/sync.rb', line 314 def process_id @process_id ||= SecureRandom.hex(8) end |
.reset! ⇒ Object
Clear process-local locks and codec (useful for testing).
334 335 336 337 338 339 |
# File 'lib/yrb_lite/action_cable/sync.rb', line 334 def reset! @registry_mutex.synchronize do @locks = {} @codec = nil end end |
Instance Method Details
#sync_dispatch(encoded, bytes) ⇒ Object
Route a decoded frame to the backend/path that handles it and return the outcome symbol (:recorded/:applied/:gap/:noop) used by the reliable- delivery ack. A dropped frame returns nil (never acked).
154 155 156 |
# File 'lib/yrb_lite/action_cable/sync.rb', line 154 def sync_dispatch(encoded, bytes) sync_receive_store_backed(encoded, bytes) end |
#sync_for(key) ⇒ Object
Call from ‘subscribed`. Streams broadcasts for this document and transmits the server’s opening handshake (SyncStep1 from the store).
104 105 106 107 108 109 110 111 112 |
# File 'lib/yrb_lite/action_cable/sync.rb', line 104 def sync_for(key) @sync_key = key.to_s @sync_origin = SecureRandom.hex(8) sync_require_store_recorder! sync_stream sync_stream_name sync_stream sync_awareness_stream_name, whisper: true if respond_to?(:whispers_to) sync_transmit(sync_load_doc.sync_step1) end |
#sync_receive(data, key = nil) ⇒ Object
Call from ‘receive`. Applies the client’s message, replies directly when the protocol calls for it, and relays document/awareness changes to the other subscribers.
Reliable delivery: document updates carry an “id”, and the server replies ‘{ “ack” => id }` once the update has been durably recorded. A causally-gapped update is not acked – it gets a resync instead – so the client retransmits until the update lands.
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
# File 'lib/yrb_lite/action_cable/sync.rb', line 122 def sync_receive(data, key = nil) # Pass `key` (params[:id]) when your transport doesn't keep the channel # instance alive across actions. Under AnyCable each RPC command gets a # fresh channel, so instance variables set in `subscribed` are gone here. @sync_key = key.to_s if key encoded = data.is_a?(Hash) ? data["update"] : nil return unless encoded.is_a?(String) # Optional client-supplied id for reliable delivery (see sync_send_ack). id = data.is_a?(Hash) ? data["id"] : nil # Frame-size cap: drop oversized frames before decoding (the encoded form # is ~4/3 the decoded size) and again after, so a client can't force large # base64 decodes / native parses / merges. A dropped frame is never acked. cap = self.class.max_frame_bytes return if cap && encoded.bytesize > (cap * 4 / 3) + 4 begin bytes = Base64.strict_decode64(encoded) rescue ArgumentError return # not valid base64; ignore the frame and keep the connection end return if cap && bytes.bytesize > cap sync_send_ack(id, sync_dispatch(encoded, bytes)) end |
#sync_unsubscribed(key = nil) ⇒ Object
Kept as the ActionCable lifecycle hook target. There is no cached document or server-owned presence state to clean up in the store-backed design.
160 161 162 |
# File 'lib/yrb_lite/action_cable/sync.rb', line 160 def sync_unsubscribed(key = nil) @sync_key = key.to_s if key end |