Module: YrbLite::Sync
- Defined in:
- lib/yrb_lite/sync.rb
Overview
y-websocket protocol over ActionCable.
Include this module in an ActionCable channel to sync Y.js documents (and awareness/presence) with browser clients. Messages are the standard y-protocols binary messages, base64-encoded in a JSON envelope:
{ "m" => "<base64 bytes>" } # client -> server
{ "m" => "...", "origin" => "<id>" } # server -> subscribers
Example:
class DocumentChannel < ApplicationCable::Channel
include YrbLite::Sync
on_load { |key| Document.find_by(key: key)&.content }
on_save { |key, update| Document.find_by(key: key)&.update!(content: update) }
def subscribed
sync_for params[:id]
end
def receive(data)
sync_receive(data)
end
def unsubscribed
sync_clear_presence
end
end
The shared YrbLite::Awareness instances are safe to use from ActionCable’s worker thread pool: the native types are Send + Sync and every operation releases the GVL, so concurrent clients sync in parallel.
Defined Under Namespace
Modules: ClassMethods
Constant Summary collapse
- MSG_KIND_DROP =
Validated frame kinds from Awareness#message_kind. A frame only gets a non-DROP kind if it is exactly one well-formed message; anything malformed, truncated, multi-message, or unknown is dropped before it can be processed or relayed.
0- MSG_KIND_SYNC_STEP1 =
1- MSG_KIND_UPDATE =
2- MSG_KIND_AWARENESS =
3- MSG_KIND_AWARENESS_QUERY =
4
Class Method Summary collapse
-
.awareness_for(key, loader = nil) ⇒ Object
Get or create the shared Awareness for a key.
-
.codec ⇒ Object
A shared, stateless decoder for the store-backed path.
- .included(base) ⇒ Object
-
.lock_for(key) ⇒ Object
Per-document mutex serializing the authoritative record -> apply -> broadcast section, so a document’s audit log is a single total order.
-
.process_id ⇒ Object
A stable id for this server process, stamped on every broadcast so other processes know to apply it to their replica and this process knows to skip its own.
- .registry ⇒ Object
-
.release(key, evictable:) {|awareness| ... } ⇒ Object
Drop a subscriber.
-
.reset! ⇒ Object
Clear all documents (useful for testing).
-
.subscribe(key) ⇒ Object
Count a new subscriber for a document.
Instance Method Summary collapse
-
#sync_awareness ⇒ Object
The shared Awareness (document + presence) for this channel’s key.
-
#sync_clear_presence ⇒ Object
Call from ‘unsubscribed`.
-
#sync_for(key) ⇒ Object
Call from ‘subscribed`.
-
#sync_receive(data, key = nil) ⇒ Object
Call from ‘receive`.
-
#sync_unsubscribed(key = nil) ⇒ Object
Call from ‘unsubscribed`.
Class Method Details
.awareness_for(key, loader = nil) ⇒ Object
Get or create the shared Awareness for a key. Creation (including the on_load callback) is serialized under a mutex so concurrent subscribers can never observe two documents for one key; all subsequent operations run lock-free on the thread-safe native types.
386 387 388 389 390 391 392 393 394 395 396 |
# File 'lib/yrb_lite/sync.rb', line 386 def awareness_for(key, loader = nil) @registry_mutex.synchronize do @registry[key] ||= begin awareness = YrbLite::Awareness.new if loader && (state = loader.call(key)) awareness.apply_update(state) end awareness end end end |
.codec ⇒ Object
A shared, stateless decoder for the store-backed path. message_kind and update_from_message only read their argument (they don’t touch the instance’s document), so one shared instance is safe across threads.
378 379 380 |
# File 'lib/yrb_lite/sync.rb', line 378 def codec @codec ||= YrbLite::Awareness.new end |
.included(base) ⇒ Object
50 51 52 |
# File 'lib/yrb_lite/sync.rb', line 50 def self.included(base) base.extend(ClassMethods) end |
.lock_for(key) ⇒ Object
Per-document mutex serializing the authoritative record -> apply -> broadcast section, so a document’s audit log is a single total order. Only briefly holds the registry mutex to fetch/create the lock; the durable write itself runs while holding only this per-key lock.
402 403 404 |
# File 'lib/yrb_lite/sync.rb', line 402 def lock_for(key) @registry_mutex.synchronize { @locks[key] ||= Mutex.new } end |
.process_id ⇒ Object
A stable id for this server process, stamped on every broadcast so other processes know to apply it to their replica and this process knows to skip its own. Survives for the life of the process.
371 372 373 |
# File 'lib/yrb_lite/sync.rb', line 371 def process_id @process_id ||= SecureRandom.hex(8) end |
.registry ⇒ Object
442 443 444 |
# File 'lib/yrb_lite/sync.rb', line 442 def registry @registry_mutex.synchronize { @registry.dup } end |
.release(key, evictable:) {|awareness| ... } ⇒ Object
Drop a subscriber. When the last one leaves and the document is evictable (there’s an on_load to bring it back, so unloading can’t lose data), persist it via the given block and unload it from memory, so a long-running server doesn’t accumulate every document and lock it has ever seen. Returns true if the document was evicted.
The persist runs outside the registry lock (it may do I/O), and we re-check the subscriber count afterward: if someone reconnected while we were saving, eviction is aborted and the warm document is kept.
420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 |
# File 'lib/yrb_lite/sync.rb', line 420 def release(key, evictable:) awareness = @registry_mutex.synchronize do @subscribers[key] -= 1 if @subscribers[key].positive? next nil unless @subscribers[key].zero? @subscribers.delete(key) evictable ? @registry[key] : nil end return false unless awareness yield awareness if block_given? @registry_mutex.synchronize do # A subscriber may have returned during the persist above. next false unless @subscribers[key].zero? @subscribers.delete(key) @locks.delete(key) !@registry.delete(key).nil? end end |
.reset! ⇒ Object
Clear all documents (useful for testing).
447 448 449 450 451 452 453 |
# File 'lib/yrb_lite/sync.rb', line 447 def reset! @registry_mutex.synchronize do @registry = {} @locks = {} @subscribers = Hash.new(0) end end |
.subscribe(key) ⇒ Object
Count a new subscriber for a document.
407 408 409 |
# File 'lib/yrb_lite/sync.rb', line 407 def subscribe(key) @registry_mutex.synchronize { @subscribers[key] += 1 } end |
Instance Method Details
#sync_awareness ⇒ Object
The shared Awareness (document + presence) for this channel’s key. Also useful for server-side reads, e.g.:
sync_awareness.encode_state_as_update
198 199 200 |
# File 'lib/yrb_lite/sync.rb', line 198 def sync_awareness Sync.awareness_for(@sync_key, self.class.on_load) end |
#sync_clear_presence ⇒ Object
Call from ‘unsubscribed`. Clears the presence states this connection introduced and tells the other subscribers to drop those cursors, so a closed tab or dropped socket doesn’t leave a ghost cursor behind until the client-side timeout reaps it.
169 170 171 172 173 174 175 176 177 |
# File 'lib/yrb_lite/sync.rb', line 169 def sync_clear_presence return if @sync_clients.nil? || @sync_clients.empty? removal = sync_awareness.remove_clients(@sync_clients) @sync_clients = [] return if removal.empty? sync_distribute(Base64.strict_encode64(removal)) end |
#sync_for(key) ⇒ Object
Call from ‘subscribed`. Streams broadcasts for this document and transmits the server’s opening handshake (SyncStep1 + awareness).
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/yrb_lite/sync.rb', line 102 def sync_for(key) @sync_key = key.to_s @sync_origin = SecureRandom.hex(8) @sync_clients = [] # awareness client IDs seen on this connection return sync_for_store_backed if self.class.sync_backend == :store Sync.subscribe(@sync_key) awareness = sync_awareness stream_from sync_stream_name, coder: ActiveSupport::JSON do |payload| sync_on_broadcast(payload) end # Opening handshake: SyncStep1 then the current awareness, each as its # own single-message frame, so providers that parse one message per frame # (e.g. @y-rb/actioncable) handle both. The client replies SyncStep2 to # the SyncStep1, delivering its state to the server. sync_transmit(awareness.sync_step1) sync_transmit(awareness.encode_awareness_update) end |
#sync_receive(data, key = nil) ⇒ Object
Call from ‘receive`. Applies the client’s message, replies directly when the protocol calls for it, and relays document/awareness changes to the other subscribers.
If an ‘on_change` recorder is registered, document changes take the strict authoritative path (record -> apply -> broadcast, serialized per document); otherwise the fast path is used.
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
# File 'lib/yrb_lite/sync.rb', line 131 def sync_receive(data, key = nil) # Pass `key` (params[:id]) when your transport doesn't keep the channel # instance alive across actions. Under AnyCable each RPC command gets a # fresh channel, so instance variables set in `subscribed` are gone here. @sync_key = key.to_s if key # Accept both envelope keys: "m" (yrb-lite's own clients) and "update" # (the @y-rb/actioncable browser provider). m = data.is_a?(Hash) ? (data["m"] || data["update"]) : nil return unless m.is_a?(String) begin bytes = Base64.strict_decode64(m) rescue ArgumentError return # not valid base64; ignore the frame and keep the connection end return sync_receive_store_backed(m, bytes) if self.class.sync_backend == :store awareness = sync_awareness kind = awareness.(bytes) # Malformed / truncated / multi-message / unknown frames are dropped # before they can be processed or relayed to other clients. return if kind == MSG_KIND_DROP sync_track_clients(awareness, bytes) if kind == MSG_KIND_AWARENESS if kind == MSG_KIND_UPDATE && self.class.on_change (awareness, m, bytes) else sync_apply_fast(awareness, m, bytes, kind) end end |
#sync_unsubscribed(key = nil) ⇒ Object
Call from ‘unsubscribed`. Clears this connection’s presence and, when the last subscriber for the document leaves, persists and unloads it from memory (only when an ‘on_load` is configured to bring it back; otherwise the in-memory document is the only copy and is kept). Prevents a long-running server from accumulating every document it has ever served.
184 185 186 187 188 189 190 191 192 193 |
# File 'lib/yrb_lite/sync.rb', line 184 def sync_unsubscribed(key = nil) @sync_key = key.to_s if key return if self.class.sync_backend == :store # nothing cached per process sync_clear_presence saver = self.class.on_save Sync.release(@sync_key, evictable: !self.class.on_load.nil?) do |awareness| saver&.call(@sync_key, awareness.encode_state_as_update) end end |