Module: YrbLite::Sync

Defined in:
lib/yrb_lite/sync.rb

Overview

y-websocket protocol over ActionCable.

Include this module in an ActionCable channel to sync Y.js documents (and awareness/presence) with browser clients. Messages are the standard y-protocols binary messages, base64-encoded in a JSON envelope:

{ "m" => "<base64 bytes>" }              # client -> server
{ "m" => "...", "origin" => "<id>" }     # server -> subscribers

Example:

class DocumentChannel < ApplicationCable::Channel
  include YrbLite::Sync

  on_load { |key| Document.find_by(key: key)&.content }
  on_save { |key, update| Document.find_by(key: key)&.update!(content: update) }

  def subscribed
    sync_for params[:id]
  end

  def receive(data)
    sync_receive(data)
  end

  def unsubscribed
    sync_clear_presence
  end
end

The shared YrbLite::Awareness instances are safe to use from ActionCable’s worker thread pool: the native types are Send + Sync and every operation releases the GVL, so concurrent clients sync in parallel.

Defined Under Namespace

Modules: ClassMethods

Constant Summary collapse

MSG_KIND_DROP =

Validated frame kinds from Awareness#message_kind. A frame only gets a non-DROP kind if it is exactly one well-formed message; anything malformed, truncated, multi-message, or unknown is dropped before it can be processed or relayed.

0
MSG_KIND_SYNC_STEP1 =
1
MSG_KIND_UPDATE =
2
MSG_KIND_AWARENESS =
3
MSG_KIND_AWARENESS_QUERY =
4

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.awareness_for(key, loader = nil) ⇒ Object

Get or create the shared Awareness for a key. Creation (including the on_load callback) is serialized under a mutex so concurrent subscribers can never observe two documents for one key; all subsequent operations run lock-free on the thread-safe native types.



386
387
388
389
390
391
392
393
394
395
396
# File 'lib/yrb_lite/sync.rb', line 386

def awareness_for(key, loader = nil)
  @registry_mutex.synchronize do
    @registry[key] ||= begin
      awareness = YrbLite::Awareness.new
      if loader && (state = loader.call(key))
        awareness.apply_update(state)
      end
      awareness
    end
  end
end

.codecObject

A shared, stateless decoder for the store-backed path. message_kind and update_from_message only read their argument (they don’t touch the instance’s document), so one shared instance is safe across threads.



378
379
380
# File 'lib/yrb_lite/sync.rb', line 378

def codec
  @codec ||= YrbLite::Awareness.new
end

.included(base) ⇒ Object



50
51
52
# File 'lib/yrb_lite/sync.rb', line 50

def self.included(base)
  base.extend(ClassMethods)
end

.lock_for(key) ⇒ Object

Per-document mutex serializing the authoritative record -> apply -> broadcast section, so a document’s audit log is a single total order. Only briefly holds the registry mutex to fetch/create the lock; the durable write itself runs while holding only this per-key lock.



402
403
404
# File 'lib/yrb_lite/sync.rb', line 402

def lock_for(key)
  @registry_mutex.synchronize { @locks[key] ||= Mutex.new }
end

.process_idObject

A stable id for this server process, stamped on every broadcast so other processes know to apply it to their replica and this process knows to skip its own. Survives for the life of the process.



371
372
373
# File 'lib/yrb_lite/sync.rb', line 371

def process_id
  @process_id ||= SecureRandom.hex(8)
end

.registryObject



442
443
444
# File 'lib/yrb_lite/sync.rb', line 442

def registry
  @registry_mutex.synchronize { @registry.dup }
end

.release(key, evictable:) {|awareness| ... } ⇒ Object

Drop a subscriber. When the last one leaves and the document is evictable (there’s an on_load to bring it back, so unloading can’t lose data), persist it via the given block and unload it from memory, so a long-running server doesn’t accumulate every document and lock it has ever seen. Returns true if the document was evicted.

The persist runs outside the registry lock (it may do I/O), and we re-check the subscriber count afterward: if someone reconnected while we were saving, eviction is aborted and the warm document is kept.

Yields:

  • (awareness)


420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
# File 'lib/yrb_lite/sync.rb', line 420

def release(key, evictable:)
  awareness = @registry_mutex.synchronize do
    @subscribers[key] -= 1 if @subscribers[key].positive?
    next nil unless @subscribers[key].zero?

    @subscribers.delete(key)
    evictable ? @registry[key] : nil
  end
  return false unless awareness

  yield awareness if block_given?

  @registry_mutex.synchronize do
    # A subscriber may have returned during the persist above.
    next false unless @subscribers[key].zero?

    @subscribers.delete(key)
    @locks.delete(key)
    !@registry.delete(key).nil?
  end
end

.reset!Object

Clear all documents (useful for testing).



447
448
449
450
451
452
453
# File 'lib/yrb_lite/sync.rb', line 447

def reset!
  @registry_mutex.synchronize do
    @registry = {}
    @locks = {}
    @subscribers = Hash.new(0)
  end
end

.subscribe(key) ⇒ Object

Count a new subscriber for a document.



407
408
409
# File 'lib/yrb_lite/sync.rb', line 407

def subscribe(key)
  @registry_mutex.synchronize { @subscribers[key] += 1 }
end

Instance Method Details

#sync_awarenessObject

The shared Awareness (document + presence) for this channel’s key. Also useful for server-side reads, e.g.:

sync_awareness.encode_state_as_update


198
199
200
# File 'lib/yrb_lite/sync.rb', line 198

def sync_awareness
  Sync.awareness_for(@sync_key, self.class.on_load)
end

#sync_clear_presenceObject

Call from ‘unsubscribed`. Clears the presence states this connection introduced and tells the other subscribers to drop those cursors, so a closed tab or dropped socket doesn’t leave a ghost cursor behind until the client-side timeout reaps it.



169
170
171
172
173
174
175
176
177
# File 'lib/yrb_lite/sync.rb', line 169

def sync_clear_presence
  return if @sync_clients.nil? || @sync_clients.empty?

  removal = sync_awareness.remove_clients(@sync_clients)
  @sync_clients = []
  return if removal.empty?

  sync_distribute(Base64.strict_encode64(removal))
end

#sync_for(key) ⇒ Object

Call from ‘subscribed`. Streams broadcasts for this document and transmits the server’s opening handshake (SyncStep1 + awareness).



102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/yrb_lite/sync.rb', line 102

def sync_for(key)
  @sync_key = key.to_s
  @sync_origin = SecureRandom.hex(8)
  @sync_clients = [] # awareness client IDs seen on this connection

  return sync_for_store_backed if self.class.sync_backend == :store

  Sync.subscribe(@sync_key)
  awareness = sync_awareness

  stream_from sync_stream_name, coder: ActiveSupport::JSON do |payload|
    sync_on_broadcast(payload)
  end

  # Opening handshake: SyncStep1 then the current awareness, each as its
  # own single-message frame, so providers that parse one message per frame
  # (e.g. @y-rb/actioncable) handle both. The client replies SyncStep2 to
  # the SyncStep1, delivering its state to the server.
  sync_transmit(awareness.sync_step1)
  sync_transmit(awareness.encode_awareness_update)
end

#sync_receive(data, key = nil) ⇒ Object

Call from ‘receive`. Applies the client’s message, replies directly when the protocol calls for it, and relays document/awareness changes to the other subscribers.

If an ‘on_change` recorder is registered, document changes take the strict authoritative path (record -> apply -> broadcast, serialized per document); otherwise the fast path is used.



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/yrb_lite/sync.rb', line 131

def sync_receive(data, key = nil)
  # Pass `key` (params[:id]) when your transport doesn't keep the channel
  # instance alive across actions. Under AnyCable each RPC command gets a
  # fresh channel, so instance variables set in `subscribed` are gone here.
  @sync_key = key.to_s if key

  # Accept both envelope keys: "m" (yrb-lite's own clients) and "update"
  # (the @y-rb/actioncable browser provider).
  m = data.is_a?(Hash) ? (data["m"] || data["update"]) : nil
  return unless m.is_a?(String)

  begin
    bytes = Base64.strict_decode64(m)
  rescue ArgumentError
    return # not valid base64; ignore the frame and keep the connection
  end

  return sync_receive_store_backed(m, bytes) if self.class.sync_backend == :store

  awareness = sync_awareness
  kind = awareness.message_kind(bytes)
  # Malformed / truncated / multi-message / unknown frames are dropped
  # before they can be processed or relayed to other clients.
  return if kind == MSG_KIND_DROP

  sync_track_clients(awareness, bytes) if kind == MSG_KIND_AWARENESS

  if kind == MSG_KIND_UPDATE && self.class.on_change
    sync_apply_authoritative(awareness, m, bytes)
  else
    sync_apply_fast(awareness, m, bytes, kind)
  end
end

#sync_unsubscribed(key = nil) ⇒ Object

Call from ‘unsubscribed`. Clears this connection’s presence and, when the last subscriber for the document leaves, persists and unloads it from memory (only when an ‘on_load` is configured to bring it back; otherwise the in-memory document is the only copy and is kept). Prevents a long-running server from accumulating every document it has ever served.



184
185
186
187
188
189
190
191
192
193
# File 'lib/yrb_lite/sync.rb', line 184

def sync_unsubscribed(key = nil)
  @sync_key = key.to_s if key
  return if self.class.sync_backend == :store # nothing cached per process

  sync_clear_presence
  saver = self.class.on_save
  Sync.release(@sync_key, evictable: !self.class.on_load.nil?) do |awareness|
    saver&.call(@sync_key, awareness.encode_state_as_update)
  end
end