Module: YrbLite::ActionCable::Sync

Defined in:
lib/yrb_lite/action_cable/sync.rb

Overview

y-websocket protocol over ActionCable.

Include this module in an ActionCable channel to sync Y.js documents (and awareness/presence) with browser clients. Messages are the standard y-protocols binary messages, base64-encoded in a JSON envelope:

{ "update" => "<base64 bytes>", "id" => 42 } # client -> server
{ "update" => "<base64 bytes>" }             # server -> subscribers
{ "ack" => 42 }                              # server -> sender
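
As an illustration of the envelope above, the following sketch wraps raw bytes in the JSON form and unwraps them again. The helper names `build_envelope` and `decode_envelope` are hypothetical and not part of YrbLite, and the sample bytes are a stand-in rather than a real y-protocols message.

```ruby
require "base64"
require "json"

# Hypothetical helper (not part of YrbLite): wrap raw bytes in the envelope.
# The "id" key is only added when the sender wants an ack.
def build_envelope(bytes, id: nil)
  envelope = { "update" => Base64.strict_encode64(bytes) }
  envelope["id"] = id unless id.nil?
  envelope
end

# Hypothetical helper: recover the raw bytes, or nil for a malformed envelope.
def decode_envelope(data)
  encoded = data.is_a?(Hash) ? data["update"] : nil
  return nil unless encoded.is_a?(String)

  Base64.strict_decode64(encoded)
rescue ArgumentError
  nil # not valid base64
end

frame  = "\x00\x01\x02".b                       # stand-in bytes only
wire   = JSON.generate(build_envelope(frame, id: 42))
parsed = JSON.parse(wire)                       # what the receiver sees
bytes  = decode_envelope(parsed)
```

Strict encoding is used in the sketch because `sync_receive` decodes with `Base64.strict_decode64`, which rejects line breaks and missing padding.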

Example:

class DocumentChannel < ApplicationCable::Channel
  include YrbLite::ActionCable::Sync

  on_load { |key| Document.find_by(key: key)&.content }
  # on_change runs in the channel instance's context, so instance methods
  # (current_user, params, ...) are available:
  on_change { |key, update| Document.record!(key, update, by: current_user) }

  def subscribed
    sync_subscribed params[:id]
  end

  def receive(data)
    sync_receive(data)
  end
end

There is no unsubscribe hook: the server keeps no per-connection document or presence state, so a disconnect needs no server-side cleanup.

The concern is store-backed and fail-closed: every document update is validated against `on_load`, recorded through `on_change`, then broadcast. No authoritative document state is kept in ActionCable process memory.

Defined Under Namespace

Modules: ClassMethods

Constant Summary

Frame kinds we act on, from YrbLite.message_kind. Its other codes (0 for a drop: malformed/truncated/multi-message/unknown, and 4 for an awareness query) fall through to a no-op in the dispatch below.

MSG_KIND_SYNC_STEP1 = 1
MSG_KIND_UPDATE = 2
MSG_KIND_AWARENESS = 3

Default incoming-frame size cap (decoded bytes). Generous enough for a large initial SyncStep2, small enough to bound a single message's allocation/parse cost. Override per channel with `max_frame_bytes`.

DEFAULT_MAX_FRAME_BYTES = 8 * 1024 * 1024
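
The cap is expressed in decoded bytes, while the first check in `sync_receive` runs on the base64 string. The sketch below works through that arithmetic. The names `base64_length` and `encoded_limit` are hypothetical; only the expression `(cap * 4 / 3) + 4` is taken from the source.

```ruby
require "base64"

# Exact length of strict (padded) base64 output for n input bytes.
def base64_length(n)
  ((n + 2) / 3) * 4
end

# The pre-decode bound as written in sync_receive.
def encoded_limit(cap)
  (cap * 4 / 3) + 4
end

cap = 8 * 1024 * 1024 # same value as DEFAULT_MAX_FRAME_BYTES

# The length formula agrees with what Base64 actually produces.
sample_length = Base64.strict_encode64("a" * 1000).bytesize

# A frame of exactly `cap` decoded bytes stays under the encoded bound,
# so the pre-decode check never rejects a frame the post-decode check allows.
fits = base64_length(cap) <= encoded_limit(cap)
```

Under these assumptions the pre-decode check is a cheap coarse filter, and the post-decode comparison against `cap` is the one that enforces the limit exactly.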

Class Method Summary

Instance Method Summary

Class Method Details

.included(base) ⇒ Object



# File 'lib/yrb_lite/action_cable/sync.rb', line 54

def self.included(base)
  base.extend(ClassMethods)
end

Instance Method Details

#sync_receive(data, key = nil) ⇒ Object

Call from `receive`. Applies the client's message, replies directly when the protocol calls for it, and relays document/awareness changes to the other subscribers.

Reliable delivery: document updates carry an "id", and the server replies `{ "ack" => id }` once the update has been durably recorded. A causally-gapped update is not acked; it gets a resync instead, so the client retransmits until the update lands.
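
The sender's side of this ack contract could be tracked roughly as follows. This is a minimal sketch, written in Ruby for consistency with the rest of the page. The `PendingUpdates` class and its methods are hypothetical and are not part of YrbLite or of any client library; real browser clients would implement the equivalent in JavaScript.

```ruby
# Hypothetical sender-side bookkeeping for the ack protocol described above.
class PendingUpdates
  def initialize
    @next_id = 0
    @pending = {} # id => base64-encoded update still awaiting an ack
  end

  # Register an update and return the envelope to send to the server.
  def enqueue(encoded)
    id = (@next_id += 1)
    @pending[id] = encoded
    { "update" => encoded, "id" => id }
  end

  # Handle a server message: an ack means the update was durably recorded.
  def handle(message)
    return unless message.is_a?(Hash) && message.key?("ack")

    @pending.delete(message["ack"])
  end

  # Envelopes to send again, for example after a resync or a reconnect.
  def retransmit
    @pending.map { |id, encoded| { "update" => encoded, "id" => id } }
  end

  def empty?
    @pending.empty?
  end
end

queue  = PendingUpdates.new
first  = queue.enqueue("AAEC")
second = queue.enqueue("AwQF")
queue.handle({ "ack" => first["id"] }) # the server confirmed the first update
resend = queue.retransmit              # only the second update remains
```

Because an unacked update stays in the pending set, a dropped or gapped frame is simply sent again with the same id until the server confirms it.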



# File 'lib/yrb_lite/action_cable/sync.rb', line 118

def sync_receive(data, key = nil)
  # Pass `key` (params[:id]) when your transport doesn't keep the channel
  # instance alive across actions. Under AnyCable each RPC command gets a
  # fresh channel, so instance variables set in `subscribed` are gone here.
  @sync_key = key.to_s if key

  encoded = data.is_a?(Hash) ? data["update"] : nil
  return unless encoded.is_a?(String)

  # Optional client-supplied id for reliable delivery (see sync_send_ack).
  # data is known to be a Hash here (encoded came from it above).
  id = data["id"]

  # Frame-size cap: drop oversized frames before decoding (the encoded form
  # is ~4/3 the decoded size) and again after, so a client can't force large
  # base64 decodes / native parses / merges. A dropped frame is never acked,
  # and there is no protocol NACK, so a legitimate oversized update is
  # retransmitted indefinitely. Log the drop so it is at least findable.
  cap = self.class.max_frame_bytes
  if cap && encoded.bytesize > (cap * 4 / 3) + 4
    sync_log_drop(:warn, "encoded #{encoded.bytesize}B exceeds max_frame_bytes #{cap}B", id)
    return
  end

  begin
    bytes = Base64.strict_decode64(encoded)
  rescue ArgumentError
    sync_log_drop(:debug, "not valid base64", id) # garbage or a probe, rarely a real client
    return # ignore the frame and keep the connection
  end

  if cap && bytes.bytesize > cap
    sync_log_drop(:warn, "decoded #{bytes.bytesize}B exceeds max_frame_bytes #{cap}B", id)
    return
  end

  sync_send_ack(id, sync_handle_frame(encoded, bytes))
end

#sync_subscribed(key) ⇒ Object

Call from `subscribed`. Streams broadcasts for this document and transmits the server's opening handshake (SyncStep1 from the store).



# File 'lib/yrb_lite/action_cable/sync.rb', line 98

def sync_subscribed(key)
  @sync_key = key.to_s
  sync_validate_required_hooks!

  # The document stream is never whisper-enabled; under AnyCable we also
  # subscribe an awareness stream with `whisper: true`, scoping the client-to-
  # client path to ephemeral presence rather than the durable document stream.
  stream_from sync_stream_name
  stream_from sync_awareness_stream_name, whisper: true if respond_to?(:whispers_to)
  sync_transmit(sync_load_doc.sync_step1)
end