Module: YrbLite::ActionCable::Sync
- Defined in:
- lib/yrb_lite/action_cable/sync.rb
Overview
y-websocket protocol over ActionCable.
Include this module in an ActionCable channel to sync Y.js documents (and awareness/presence) with browser clients. Messages are the standard y-protocols binary messages, base64-encoded in a JSON envelope:
{ "m" => "<base64 bytes>" } # client -> server
{ "m" => "...", "origin" => "<id>" } # server -> subscribers
Example:
class DocumentChannel < ApplicationCable::Channel
include YrbLite::ActionCable::Sync
on_load { |key| Document.find_by(key: key)&.content }
on_save { |key, update| Document.find_by(key: key)&.update!(content: update) }
# on_change blocks run in the channel instance's context, so instance
# methods (current_user, params, ...) are available without plumbing:
on_change { |key, update| Document.record!(key, update, by: current_user) }
def subscribed
sync_for params[:id]
end
def receive(data)
sync_receive(data)
end
def unsubscribed
sync_clear_presence
end
end
The shared YrbLite::Awareness instances are safe to use from ActionCable’s worker thread pool: the native types are Send + Sync and every operation releases the GVL, so concurrent clients sync in parallel.
Defined Under Namespace
Modules: ClassMethods
Constant Summary collapse
- MSG_KIND_DROP =
Validated frame kinds from Awareness#message_kind. A frame only gets a non-DROP kind if it is exactly one well-formed message; anything malformed, truncated, multi-message, or unknown is dropped before it can be processed or relayed.
0- MSG_KIND_SYNC_STEP1 =
1- MSG_KIND_UPDATE =
2- MSG_KIND_AWARENESS =
3- MSG_KIND_AWARENESS_QUERY =
4
Class Method Summary collapse
-
.awareness_for(key, loader = nil) ⇒ Object
Get or create the shared Awareness for a key.
-
.codec ⇒ Object
A shared, stateless decoder for the store-backed path.
- .included(base) ⇒ Object
-
.lock_for(key) ⇒ Object
Per-document mutex serializing the authoritative record -> apply -> broadcast section, so a document’s audit log is a single total order.
-
.process_id ⇒ Object
A stable id for this server process, stamped on every broadcast so other processes know to apply it to their replica and this process knows to skip its own.
- .registry ⇒ Object
-
.release(key, evictable:) {|awareness| ... } ⇒ Object
Drop a subscriber.
-
.reset! ⇒ Object
Clear all documents (useful for testing).
-
.subscribe(key) ⇒ Object
Count a new subscriber for a document.
Instance Method Summary collapse
-
#sync_awareness ⇒ Object
The shared Awareness (document + presence) for this channel’s key.
-
#sync_clear_presence ⇒ Object
Call from ‘unsubscribed`.
-
#sync_dispatch(encoded, bytes) ⇒ Object
Route a decoded frame to the backend/path that handles it and return the outcome symbol (:recorded/:applied/:gap/:noop) used by the reliable- delivery ack.
-
#sync_for(key) ⇒ Object
Call from ‘subscribed`.
-
#sync_receive(data, key = nil) ⇒ Object
Call from ‘receive`.
-
#sync_unsubscribed(key = nil) ⇒ Object
Call from ‘unsubscribed`.
Class Method Details
.awareness_for(key, loader = nil) ⇒ Object
Get or create the shared Awareness for a key. Creation (including the on_load callback) is serialized under a mutex so concurrent subscribers can never observe two documents for one key; all subsequent operations run lock-free on the thread-safe native types.
511 512 513 514 515 516 517 518 519 520 521 |
# File 'lib/yrb_lite/action_cable/sync.rb', line 511 def awareness_for(key, loader = nil) @registry_mutex.synchronize do @registry[key] ||= begin awareness = YrbLite::Awareness.new if loader && (state = loader.call(key)) awareness.apply_update(state) end awareness end end end |
.codec ⇒ Object
A shared, stateless decoder for the store-backed path. message_kind and update_from_message only read their argument (they don’t touch the instance’s document), so one shared instance is safe across threads.
503 504 505 |
# File 'lib/yrb_lite/action_cable/sync.rb', line 503 def codec @codec ||= YrbLite::Awareness.new end |
.included(base) ⇒ Object
55 56 57 |
# File 'lib/yrb_lite/action_cable/sync.rb', line 55 def self.included(base) base.extend(ClassMethods) end |
.lock_for(key) ⇒ Object
Per-document mutex serializing the authoritative record -> apply -> broadcast section, so a document’s audit log is a single total order. Only briefly holds the registry mutex to fetch/create the lock; the durable write itself runs while holding only this per-key lock.
527 528 529 |
# File 'lib/yrb_lite/action_cable/sync.rb', line 527 def lock_for(key) @registry_mutex.synchronize { @locks[key] ||= Mutex.new } end |
.process_id ⇒ Object
A stable id for this server process, stamped on every broadcast so other processes know to apply it to their replica and this process knows to skip its own. Survives for the life of the process.
496 497 498 |
# File 'lib/yrb_lite/action_cable/sync.rb', line 496 def process_id @process_id ||= SecureRandom.hex(8) end |
.registry ⇒ Object
567 568 569 |
# File 'lib/yrb_lite/action_cable/sync.rb', line 567 def registry @registry_mutex.synchronize { @registry.dup } end |
.release(key, evictable:) {|awareness| ... } ⇒ Object
Drop a subscriber. When the last one leaves and the document is evictable (there’s an on_load to bring it back, so unloading can’t lose data), persist it via the given block and unload it from memory, so a long-running server doesn’t accumulate every document and lock it has ever seen. Returns true if the document was evicted.
The persist runs outside the registry lock (it may do I/O), and we re-check the subscriber count afterward: if someone reconnected while we were saving, eviction is aborted and the warm document is kept.
545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 |
# File 'lib/yrb_lite/action_cable/sync.rb', line 545 def release(key, evictable:) awareness = @registry_mutex.synchronize do @subscribers[key] -= 1 if @subscribers[key].positive? next nil unless @subscribers[key].zero? @subscribers.delete(key) evictable ? @registry[key] : nil end return false unless awareness yield awareness if block_given? @registry_mutex.synchronize do # A subscriber may have returned during the persist above. next false unless @subscribers[key].zero? @subscribers.delete(key) @locks.delete(key) !@registry.delete(key).nil? end end |
.reset! ⇒ Object
Clear all documents (useful for testing).
572 573 574 575 576 577 578 |
# File 'lib/yrb_lite/action_cable/sync.rb', line 572 def reset! @registry_mutex.synchronize do @registry = {} @locks = {} @subscribers = Hash.new(0) end end |
.subscribe(key) ⇒ Object
Count a new subscriber for a document.
532 533 534 |
# File 'lib/yrb_lite/action_cable/sync.rb', line 532 def subscribe(key) @registry_mutex.synchronize { @subscribers[key] += 1 } end |
Instance Method Details
#sync_awareness ⇒ Object
The shared Awareness (document + presence) for this channel’s key. Also useful for server-side reads, e.g.:
sync_awareness.encode_state_as_update
227 228 229 |
# File 'lib/yrb_lite/action_cable/sync.rb', line 227 def sync_awareness Sync.awareness_for(@sync_key, self.class.on_load) end |
#sync_clear_presence ⇒ Object
Call from ‘unsubscribed`. Clears the presence states this connection introduced and tells the other subscribers to drop those cursors, so a closed tab or dropped socket doesn’t leave a ghost cursor behind until the client-side timeout reaps it.
198 199 200 201 202 203 204 205 206 |
# File 'lib/yrb_lite/action_cable/sync.rb', line 198 def sync_clear_presence return if @sync_clients.nil? || @sync_clients.empty? removal = sync_awareness.remove_clients(@sync_clients) @sync_clients = [] return if removal.empty? sync_distribute(Base64.strict_encode64(removal)) end |
#sync_dispatch(encoded, bytes) ⇒ Object
Route a decoded frame to the backend/path that handles it and return the outcome symbol (:recorded/:applied/:gap/:noop) used by the reliable- delivery ack. A dropped frame returns nil (never acked).
176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 |
# File 'lib/yrb_lite/action_cable/sync.rb', line 176 def sync_dispatch(encoded, bytes) return sync_receive_store_backed(encoded, bytes) if self.class.sync_backend == :store awareness = sync_awareness kind = awareness.(bytes) # Malformed / truncated / multi-message / unknown frames are dropped # before they can be processed or relayed to other clients. return if kind == MSG_KIND_DROP sync_track_clients(awareness, bytes) if kind == MSG_KIND_AWARENESS if kind == MSG_KIND_UPDATE && self.class.on_change (awareness, encoded, bytes) else sync_apply_fast(awareness, encoded, bytes, kind) end end |
#sync_for(key) ⇒ Object
Call from ‘subscribed`. Streams broadcasts for this document and transmits the server’s opening handshake (SyncStep1 + awareness).
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/yrb_lite/action_cable/sync.rb', line 114 def sync_for(key) @sync_key = key.to_s @sync_origin = SecureRandom.hex(8) @sync_clients = [] # awareness client IDs seen on this connection return sync_for_store_backed if self.class.sync_backend == :store Sync.subscribe(@sync_key) awareness = sync_awareness stream_from sync_stream_name, coder: ActiveSupport::JSON do |payload| sync_on_broadcast(payload) end # Opening handshake: SyncStep1 then the current awareness, each as its # own single-message frame, so providers that parse one message per frame # (e.g. @y-rb/actioncable) handle both. The client replies SyncStep2 to # the SyncStep1, delivering its state to the server. sync_transmit(awareness.sync_step1) sync_transmit(awareness.encode_awareness_update) end |
#sync_receive(data, key = nil) ⇒ Object
Call from ‘receive`. Applies the client’s message, replies directly when the protocol calls for it, and relays document/awareness changes to the other subscribers.
If an ‘on_change` recorder is registered, document changes take the strict authoritative path (record -> apply -> broadcast, serialized per document); otherwise the fast path is used.
Reliable delivery (opt-in, client-driven): if the frame carries an “id”, the server replies ‘{ “ack” => id }` once the update has been accepted (recorded in audit mode, applied in fast mode). A causally-gapped update is not acked – it gets a resync instead – so an ack-aware client knows to retransmit until the update lands. Stock clients send no “id”, never get acks, and are completely unaffected.
150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 |
# File 'lib/yrb_lite/action_cable/sync.rb', line 150 def sync_receive(data, key = nil) # Pass `key` (params[:id]) when your transport doesn't keep the channel # instance alive across actions. Under AnyCable each RPC command gets a # fresh channel, so instance variables set in `subscribed` are gone here. @sync_key = key.to_s if key # Accept both envelope keys: "m" (yrb-lite's own clients) and "update" # (the @y-rb/actioncable browser provider). m = data.is_a?(Hash) ? (data["m"] || data["update"]) : nil return unless m.is_a?(String) # Optional client-supplied id for reliable delivery (see sync_send_ack). id = data.is_a?(Hash) ? data["id"] : nil begin bytes = Base64.strict_decode64(m) rescue ArgumentError return # not valid base64; ignore the frame and keep the connection end sync_send_ack(id, sync_dispatch(m, bytes)) end |
#sync_unsubscribed(key = nil) ⇒ Object
Call from ‘unsubscribed`. Clears this connection’s presence and, when the last subscriber for the document leaves, persists and unloads it from memory (only when an ‘on_load` is configured to bring it back; otherwise the in-memory document is the only copy and is kept). Prevents a long-running server from accumulating every document it has ever served.
213 214 215 216 217 218 219 220 221 222 |
# File 'lib/yrb_lite/action_cable/sync.rb', line 213 def sync_unsubscribed(key = nil) @sync_key = key.to_s if key return if self.class.sync_backend == :store # nothing cached per process sync_clear_presence saver = self.class.on_save Sync.release(@sync_key, evictable: !self.class.on_load.nil?) do |awareness| saver&.call(@sync_key, awareness.encode_state_as_update) end end |