Class: Supabase::Realtime::Sockets::AsyncWebsocket

Inherits:
Object
  • Object
show all
Includes:
Supabase::Realtime::Socket
Defined in:
lib/supabase/realtime/sockets/async_websocket.rb

Overview

Supabase::Realtime::Socket implementation backed by the async-websocket gem (socketry/async).

Unlike WebsocketClientSimple — which spawns a background OS thread for the read loop — this adapter runs entirely inside the calling fiber’s Async reactor. Pick it when:

- your app already runs on the socketry/async stack (Falcon, or
  supabase-rb's own async REST clients via async-http-faraday), so
  a single reactor owns all I/O;
- you want cooperative concurrency without cross-thread callback
  hops or mutexes on listener state.

require "async"
require "supabase/realtime"
require "supabase/realtime/sockets/async_websocket"

Async do
  socket  = Supabase::Realtime::Sockets::AsyncWebsocket.new(url: ws_url)
  client  = Supabase::Realtime::Client.new(url: ws_url, socket: socket)
  client.connect

  channel = client.channel("realtime:public:users")
  channel.on_postgres_changes("INSERT", schema: "public", table: "users") { |p| puts p }
  channel.subscribe
end

All callbacks (on_open / on_message / on_close / on_error, and every downstream channel listener) run inside the Async reactor on the same fiber tree as the caller — no thread hops, no mutexes required for state owned by the reactor.

Instance Method Summary collapse

Methods included from Supabase::Realtime::Socket

#close_callbacks, #error_callbacks, #message_callbacks, #on_close, #on_error, #on_message, #on_open, #open_callbacks

Constructor Details

#initialize(url:, headers: {}, parent: nil, connector: ::Async::WebSocket::Client) ⇒ AsyncWebsocket

Returns a new instance of AsyncWebsocket.

Parameters:

  • url (String)

    ws(s):// URL including query params

  • headers (Hash) (defaults to: {})

    extra HTTP headers sent on the upgrade request

  • parent (Async::Task, nil) (defaults to: nil)

    reactor task to attach the session to. Defaults to Async::Task.current? resolved at connect-time, so callers must be inside an ‘Async { … }` block.

  • connector (#connect) (defaults to: ::Async::WebSocket::Client)

    dependency-injection seam — defaults to Async::WebSocket::Client. Tests pass a fake that returns a stub connection so no real socket is opened.



56
57
58
59
60
61
62
63
# File 'lib/supabase/realtime/sockets/async_websocket.rb', line 56

def initialize(url:, headers: {}, parent: nil, connector: ::Async::WebSocket::Client)
  @url        = url
  @headers    = headers
  @parent     = parent
  @connector  = connector
  @connection = nil
  @session    = nil
end

Instance Method Details

#closeObject



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/supabase/realtime/sockets/async_websocket.rb', line 107

def close
  conn    = @connection
  session = @session
  @connection = nil
  @session    = nil

  # Closing the connection makes #read return nil → read_loop exits →
  # the session task terminates naturally. This is more reliable than
  # task.stop, which doesn't always interrupt a fiber blocked on a
  # non-IO suspend point (queues, notifications).
  begin
    conn&.close
  rescue StandardError
    # Connection may already be torn down — ignore.
  end

  # Belt-and-braces: if the connection didn't unblock the read, stop
  # the task as a fallback. No-op if it already finished.
  session&.stop
end

#connectObject



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/supabase/realtime/sockets/async_websocket.rb', line 65

def connect
  return if connected?

  parent = @parent || ::Async::Task.current?
  unless parent
    raise Errors::RealtimeError,
          "Supabase::Realtime::Sockets::AsyncWebsocket#connect must run inside an Async { ... } block " \
          "(or be constructed with parent:)"
  end

  endpoint = ::Async::HTTP::Endpoint.parse(@url)
  # Async::Variable, not Async::Promise: Promise only exists in newer
  # async releases (Ruby >= 3.2 resolutions), while Variable is
  # available across all async 2.x — and 3.1 resolves async 2.24.
  # Variable has no #reject, so a connect error is resolved as a value
  # and re-raised by the waiting side below.
  ready = ::Async::Variable.new

  @session = parent.async do
    @connector.connect(endpoint, headers: header_pairs) do |connection|
      @connection = connection
      fire_open
      ready.resolve(true)

      read_loop(connection)
    end
  rescue => err
    fire_error(err)
    ready.resolve(err) unless ready.resolved?
  ensure
    fire_close
  end

  # Cooperative wait — Variable buffers the resolution, so this returns
  # immediately whether the session task got there first or not. After
  # this, callers can rely on connected?.
  outcome = ready.wait
  raise outcome if outcome.is_a?(Exception)

  nil
end

#connected?Boolean

Returns:

  • (Boolean)


138
139
140
# File 'lib/supabase/realtime/sockets/async_websocket.rb', line 138

def connected?
  !@connection.nil?
end

#fire_closeObject



152
153
154
155
156
157
# File 'lib/supabase/realtime/sockets/async_websocket.rb', line 152

def fire_close
  return if @connection.nil? && close_callbacks.empty?

  @connection = nil
  close_callbacks.each(&:call)
end

#fire_error(err) ⇒ Object



159
160
161
# File 'lib/supabase/realtime/sockets/async_websocket.rb', line 159

def fire_error(err)
  error_callbacks.each { |cb| cb.call(err) }
end

#fire_message(payload) ⇒ Object



148
149
150
# File 'lib/supabase/realtime/sockets/async_websocket.rb', line 148

def fire_message(payload)
  message_callbacks.each { |cb| cb.call(payload) }
end

#fire_openObject

—– Internal callback fan-outs (public so the read task can reach them) —–



144
145
146
# File 'lib/supabase/realtime/sockets/async_websocket.rb', line 144

def fire_open
  open_callbacks.each(&:call)
end

#send(payload) ⇒ Object



128
129
130
131
132
133
134
135
136
# File 'lib/supabase/realtime/sockets/async_websocket.rb', line 128

def send(payload)
  conn = @connection
  return unless conn

  # Connection#write auto-wraps UTF-8 strings in a text frame, which is
  # what the Phoenix protocol expects.
  conn.write(payload)
  conn.flush
end