Module: YrbLite::Sync

Defined in:
lib/yrb_lite/sync.rb

Overview

y-websocket protocol over ActionCable.

Include this module in an ActionCable channel to sync Y.js documents (and awareness/presence) with browser clients. Messages are the standard y-protocols binary messages, base64-encoded in a JSON envelope:

{ "m" => "<base64 bytes>" }              # client -> server
{ "m" => "...", "origin" => "<id>" }     # server -> subscribers

Example:

class DocumentChannel < ApplicationCable::Channel
  include YrbLite::Sync

  on_load { |key| Document.find_by(key: key)&.content }
  on_save { |key, update| Document.find_by(key: key)&.update!(content: update) }

  def subscribed
    sync_for params[:id]
  end

  def receive(data)
    sync_receive(data)
  end

  def unsubscribed
    sync_clear_presence
  end
end

The shared YrbLite::Awareness instances are safe to use from ActionCable’s worker thread pool: the native types are Send + Sync and every operation releases the GVL, so concurrent clients sync in parallel.

Defined Under Namespace

Modules: ClassMethods

Constant Summary collapse

MSG_KIND_DROP =

Validated frame kinds from Awareness#message_kind. A frame only gets a non-DROP kind if it is exactly one well-formed message; anything malformed, truncated, multi-message, or unknown is dropped before it can be processed or relayed.

0
MSG_KIND_SYNC_STEP1 =
1
MSG_KIND_UPDATE =
2
MSG_KIND_AWARENESS =
3
MSG_KIND_AWARENESS_QUERY =
4

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.awareness_for(key, loader = nil) ⇒ Object

Get or create the shared Awareness for a key. Creation (including the on_load callback) is serialized under a mutex so concurrent subscribers can never observe two documents for one key; all subsequent operations run lock-free on the thread-safe native types.



489
490
491
492
493
494
495
496
497
498
499
# File 'lib/yrb_lite/sync.rb', line 489

def awareness_for(key, loader = nil)
  @registry_mutex.synchronize do
    @registry[key] ||= begin
      awareness = YrbLite::Awareness.new
      if loader && (state = loader.call(key))
        awareness.apply_update(state)
      end
      awareness
    end
  end
end

.codecObject

A shared, stateless decoder for the store-backed path. message_kind and update_from_message only read their argument (they don’t touch the instance’s document), so one shared instance is safe across threads.



481
482
483
# File 'lib/yrb_lite/sync.rb', line 481

def codec
  @codec ||= YrbLite::Awareness.new
end

.included(base) ⇒ Object



50
51
52
# File 'lib/yrb_lite/sync.rb', line 50

def self.included(base)
  base.extend(ClassMethods)
end

.lock_for(key) ⇒ Object

Per-document mutex serializing the authoritative record -> apply -> broadcast section, so a document’s audit log is a single total order. Only briefly holds the registry mutex to fetch/create the lock; the durable write itself runs while holding only this per-key lock.



505
506
507
# File 'lib/yrb_lite/sync.rb', line 505

def lock_for(key)
  @registry_mutex.synchronize { @locks[key] ||= Mutex.new }
end

.process_idObject

A stable id for this server process, stamped on every broadcast so other processes know to apply it to their replica and this process knows to skip its own. Survives for the life of the process.



474
475
476
# File 'lib/yrb_lite/sync.rb', line 474

def process_id
  @process_id ||= SecureRandom.hex(8)
end

.registryObject



545
546
547
# File 'lib/yrb_lite/sync.rb', line 545

def registry
  @registry_mutex.synchronize { @registry.dup }
end

.release(key, evictable:) {|awareness| ... } ⇒ Object

Drop a subscriber. When the last one leaves and the document is evictable (there’s an on_load to bring it back, so unloading can’t lose data), persist it via the given block and unload it from memory, so a long-running server doesn’t accumulate every document and lock it has ever seen. Returns true if the document was evicted.

The persist runs outside the registry lock (it may do I/O), and we re-check the subscriber count afterward: if someone reconnected while we were saving, eviction is aborted and the warm document is kept.

Yields:

  • (awareness)


523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
# File 'lib/yrb_lite/sync.rb', line 523

def release(key, evictable:)
  awareness = @registry_mutex.synchronize do
    @subscribers[key] -= 1 if @subscribers[key].positive?
    next nil unless @subscribers[key].zero?

    @subscribers.delete(key)
    evictable ? @registry[key] : nil
  end
  return false unless awareness

  yield awareness if block_given?

  @registry_mutex.synchronize do
    # A subscriber may have returned during the persist above.
    next false unless @subscribers[key].zero?

    @subscribers.delete(key)
    @locks.delete(key)
    !@registry.delete(key).nil?
  end
end

.reset!Object

Clear all documents (useful for testing).



550
551
552
553
554
555
556
# File 'lib/yrb_lite/sync.rb', line 550

def reset!
  @registry_mutex.synchronize do
    @registry = {}
    @locks = {}
    @subscribers = Hash.new(0)
  end
end

.subscribe(key) ⇒ Object

Count a new subscriber for a document.



510
511
512
# File 'lib/yrb_lite/sync.rb', line 510

def subscribe(key)
  @registry_mutex.synchronize { @subscribers[key] += 1 }
end

Instance Method Details

#sync_awarenessObject

The shared Awareness (document + presence) for this channel’s key. Also useful for server-side reads, e.g.:

sync_awareness.encode_state_as_update


215
216
217
# File 'lib/yrb_lite/sync.rb', line 215

def sync_awareness
  Sync.awareness_for(@sync_key, self.class.on_load)
end

#sync_clear_presenceObject

Call from ‘unsubscribed`. Clears the presence states this connection introduced and tells the other subscribers to drop those cursors, so a closed tab or dropped socket doesn’t leave a ghost cursor behind until the client-side timeout reaps it.



186
187
188
189
190
191
192
193
194
# File 'lib/yrb_lite/sync.rb', line 186

def sync_clear_presence
  return if @sync_clients.nil? || @sync_clients.empty?

  removal = sync_awareness.remove_clients(@sync_clients)
  @sync_clients = []
  return if removal.empty?

  sync_distribute(Base64.strict_encode64(removal))
end

#sync_dispatch(encoded, bytes) ⇒ Object

Route a decoded frame to the backend/path that handles it and return the outcome symbol (:recorded/:applied/:gap/:noop) used by the reliable- delivery ack. A dropped frame returns nil (never acked).



164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/yrb_lite/sync.rb', line 164

def sync_dispatch(encoded, bytes)
  return sync_receive_store_backed(encoded, bytes) if self.class.sync_backend == :store

  awareness = sync_awareness
  kind = awareness.message_kind(bytes)
  # Malformed / truncated / multi-message / unknown frames are dropped
  # before they can be processed or relayed to other clients.
  return if kind == MSG_KIND_DROP

  sync_track_clients(awareness, bytes) if kind == MSG_KIND_AWARENESS

  if kind == MSG_KIND_UPDATE && self.class.on_change
    sync_apply_authoritative(awareness, encoded, bytes)
  else
    sync_apply_fast(awareness, encoded, bytes, kind)
  end
end

#sync_for(key) ⇒ Object

Call from ‘subscribed`. Streams broadcasts for this document and transmits the server’s opening handshake (SyncStep1 + awareness).



102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/yrb_lite/sync.rb', line 102

def sync_for(key)
  @sync_key = key.to_s
  @sync_origin = SecureRandom.hex(8)
  @sync_clients = [] # awareness client IDs seen on this connection

  return sync_for_store_backed if self.class.sync_backend == :store

  Sync.subscribe(@sync_key)
  awareness = sync_awareness

  stream_from sync_stream_name, coder: ActiveSupport::JSON do |payload|
    sync_on_broadcast(payload)
  end

  # Opening handshake: SyncStep1 then the current awareness, each as its
  # own single-message frame, so providers that parse one message per frame
  # (e.g. @y-rb/actioncable) handle both. The client replies SyncStep2 to
  # the SyncStep1, delivering its state to the server.
  sync_transmit(awareness.sync_step1)
  sync_transmit(awareness.encode_awareness_update)
end

#sync_receive(data, key = nil) ⇒ Object

Call from ‘receive`. Applies the client’s message, replies directly when the protocol calls for it, and relays document/awareness changes to the other subscribers.

If an ‘on_change` recorder is registered, document changes take the strict authoritative path (record -> apply -> broadcast, serialized per document); otherwise the fast path is used.

Reliable delivery (opt-in, client-driven): if the frame carries an “id”, the server replies ‘{ “ack” => id }` once the update has been accepted (recorded in audit mode, applied in fast mode). A causally-gapped update is not acked – it gets a resync instead – so an ack-aware client knows to retransmit until the update lands. Stock clients send no “id”, never get acks, and are completely unaffected.



138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/yrb_lite/sync.rb', line 138

def sync_receive(data, key = nil)
  # Pass `key` (params[:id]) when your transport doesn't keep the channel
  # instance alive across actions. Under AnyCable each RPC command gets a
  # fresh channel, so instance variables set in `subscribed` are gone here.
  @sync_key = key.to_s if key

  # Accept both envelope keys: "m" (yrb-lite's own clients) and "update"
  # (the @y-rb/actioncable browser provider).
  m = data.is_a?(Hash) ? (data["m"] || data["update"]) : nil
  return unless m.is_a?(String)

  # Optional client-supplied id for reliable delivery (see sync_send_ack).
  id = data.is_a?(Hash) ? data["id"] : nil

  begin
    bytes = Base64.strict_decode64(m)
  rescue ArgumentError
    return # not valid base64; ignore the frame and keep the connection
  end

  sync_send_ack(id, sync_dispatch(m, bytes))
end

#sync_unsubscribed(key = nil) ⇒ Object

Call from ‘unsubscribed`. Clears this connection’s presence and, when the last subscriber for the document leaves, persists and unloads it from memory (only when an ‘on_load` is configured to bring it back; otherwise the in-memory document is the only copy and is kept). Prevents a long-running server from accumulating every document it has ever served.



201
202
203
204
205
206
207
208
209
210
# File 'lib/yrb_lite/sync.rb', line 201

def sync_unsubscribed(key = nil)
  @sync_key = key.to_s if key
  return if self.class.sync_backend == :store # nothing cached per process

  sync_clear_presence
  saver = self.class.on_save
  Sync.release(@sync_key, evictable: !self.class.on_load.nil?) do |awareness|
    saver&.call(@sync_key, awareness.encode_state_as_update)
  end
end