Class: Supabase::Realtime::Sockets::AsyncWebsocket

Inherits:
Object
Includes:
Supabase::Realtime::Socket
Defined in:
lib/supabase/realtime/sockets/async_websocket.rb

Overview

Supabase::Realtime::Socket implementation backed by the async-websocket gem (socketry/async).

Unlike WebsocketClientSimple — which spawns a background OS thread for the read loop — this adapter runs entirely inside the calling fiber’s Async reactor. Pick it when:

- your app already runs on the socketry/async stack (Falcon, or
  supabase-rb's own async REST clients via async-http-faraday), so
  a single reactor owns all I/O;
- you want cooperative concurrency without cross-thread callback
  hops or mutexes on listener state.

require "async"
require "supabase/realtime"
require "supabase/realtime/sockets/async_websocket"

Async do
  socket  = Supabase::Realtime::Sockets::AsyncWebsocket.new(url: ws_url)
  client  = Supabase::Realtime::Client.new(url: ws_url, socket: socket)
  client.connect

  channel = client.channel("realtime:public:users")
  channel.on_postgres_changes("INSERT", schema: "public", table: "users") { |p| puts p }
  channel.subscribe
end

All callbacks (on_open / on_message / on_close / on_error, and every downstream channel listener) run inside the Async reactor on the same fiber tree as the caller — no thread hops, no mutexes required for state owned by the reactor.

Instance Method Summary

Methods included from Supabase::Realtime::Socket

#close_callbacks, #error_callbacks, #message_callbacks, #on_close, #on_error, #on_message, #on_open, #open_callbacks

Constructor Details

#initialize(url:, headers: {}, parent: nil, connector: ::Async::WebSocket::Client) ⇒ AsyncWebsocket

Returns a new instance of AsyncWebsocket.

Parameters:

  • url (String)

    ws(s):// URL including query params

  • headers (Hash) (defaults to: {})

    extra HTTP headers sent on the upgrade request

  • parent (Async::Task, nil) (defaults to: nil)

    reactor task to attach the session to. When nil, #connect falls back to Async::Task.current? at connect time, so callers must either pass parent: or call #connect inside an Async { ... } block.

  • connector (#connect) (defaults to: ::Async::WebSocket::Client)

    dependency-injection seam — defaults to Async::WebSocket::Client. Tests pass a fake that returns a stub connection so no real socket is opened.
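
    As an illustration of the connector seam, here is a minimal sketch of a test double. FakeConnector and FakeConnection are hypothetical names, not part of supabase-rb. The only contract taken from this page is the call shape used in #connect, connect(endpoint, headers:) { |connection| ... }, plus the connection methods the adapter calls (read, write, flush, close). Closing the connection when the block returns is an assumption made for the fake.

```ruby
# Hypothetical test doubles; not part of supabase-rb or async-websocket.
class FakeConnection
  attr_reader :written

  def initialize(incoming)
    @incoming = incoming.dup # frames the fake "server" will deliver
    @written  = []           # frames the code under test sent
    @closed   = false
  end

  # Returns the next queued frame, or nil once drained or closed.
  def read
    return nil if @closed

    @incoming.shift
  end

  def write(payload)
    @written << payload
  end

  # Nothing is buffered in the fake, so flush has nothing to do.
  def flush; end

  def close
    @closed = true
  end

  def closed?
    @closed
  end
end

class FakeConnector
  attr_reader :connection, :endpoint, :headers

  def initialize(incoming: [])
    @connection = FakeConnection.new(incoming)
  end

  # Same call shape as the connector call in #connect.
  def connect(endpoint, headers: [])
    @endpoint = endpoint
    @headers  = headers
    yield @connection
  ensure
    @connection.close # assumption: the session ends when the block returns
  end
end

# Exercise the fake directly, with no network and no reactor involved.
connector = FakeConnector.new(incoming: ['{"event":"phx_reply"}'])
received  = []
connector.connect("wss://example.test/socket", headers: [%w[apikey test]]) do |conn|
  while (frame = conn.read)
    received << frame
  end
  conn.write("ping")
end
puts received.inspect
```

    In a spec, an instance of such a fake would be passed as connector: to the constructor. #connect would still need a parent task or an enclosing Async block, because it schedules the session on the reactor.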



# File 'lib/supabase/realtime/sockets/async_websocket.rb', line 55

def initialize(url:, headers: {}, parent: nil, connector: ::Async::WebSocket::Client)
  @url        = url
  @headers    = headers
  @parent     = parent
  @connector  = connector
  @connection = nil
  @session    = nil
end

Instance Method Details

#close ⇒ Object



# File 'lib/supabase/realtime/sockets/async_websocket.rb', line 99

def close
  conn    = @connection
  session = @session
  @connection = nil
  @session    = nil

  # Closing the connection makes #read return nil → read_loop exits →
  # the session task terminates naturally. This is more reliable than
  # task.stop, which doesn't always interrupt a fiber blocked on a
  # non-IO suspend point (queues, notifications).
  begin
    conn&.close
  rescue StandardError
    # Connection may already be torn down — ignore.
  end

  # Belt-and-braces: if the connection didn't unblock the read, stop
  # the task as a fallback. No-op if it already finished.
  session&.stop
end
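
The shutdown order described in the comments above can be modelled without a reactor. The sketch below is an illustration under stated assumptions: StubConnection is a hypothetical stand-in for the real connection, and read_loop is a guess at the shape of the private read loop, which is not shown on this page. It demonstrates only the first half of #close: once the connection is closed, the next read returns nil and the loop ends on its own.

```ruby
# Hypothetical stand-in for a WebSocket connection.
class StubConnection
  def initialize(frames)
    @frames = frames.dup
    @closed = false
  end

  # nil signals "no more frames", either because the stream is
  # drained or because the connection was closed.
  def read
    return nil if @closed

    @frames.shift
  end

  def close
    @closed = true
  end
end

# Assumed shape of the read loop: keep reading until #read returns nil.
def read_loop(connection)
  seen = []
  while (frame = connection.read)
    seen << frame
    yield frame if block_given?
  end
  seen
end

conn = StubConnection.new(%w[a b c])
# Close from inside a listener: frame "c" is never delivered.
seen = read_loop(conn) { |frame| conn.close if frame == "b" }
puts seen.inspect # prints ["a", "b"]
```

The session&.stop call in #close covers the case this model cannot show: a fiber suspended somewhere other than the read.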

#connect ⇒ Object



# File 'lib/supabase/realtime/sockets/async_websocket.rb', line 64

def connect
  return if connected?

  parent = @parent || ::Async::Task.current?
  unless parent
    raise Errors::RealtimeError,
          "Supabase::Realtime::Sockets::AsyncWebsocket#connect must run inside an Async { ... } block " \
          "(or be constructed with parent:)"
  end

  endpoint = ::Async::HTTP::Endpoint.parse(@url)
  ready    = ::Async::Promise.new

  @session = parent.async do
    @connector.connect(endpoint, headers: header_pairs) do |connection|
      @connection = connection
      fire_open
      ready.resolve(true)

      read_loop(connection)
    end
  rescue => err
    fire_error(err)
    ready.reject(err) unless ready.resolved?
  ensure
    fire_close
  end

  # Cooperative wait: the Promise stores its result, so there is no lost
  # wake-up if the session task resolved it first; otherwise this suspends
  # the calling fiber until it does. After this, callers can rely on
  # connected?.
  ready.wait
  nil
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/supabase/realtime/sockets/async_websocket.rb', line 130

def connected?
  !@connection.nil?
end

#fire_close ⇒ Object



# File 'lib/supabase/realtime/sockets/async_websocket.rb', line 144

def fire_close
  return if @connection.nil? && close_callbacks.empty?

  @connection = nil
  close_callbacks.each(&:call)
end

#fire_error(err) ⇒ Object



# File 'lib/supabase/realtime/sockets/async_websocket.rb', line 151

def fire_error(err)
  error_callbacks.each { |cb| cb.call(err) }
end

#fire_message(payload) ⇒ Object



# File 'lib/supabase/realtime/sockets/async_websocket.rb', line 140

def fire_message(payload)
  message_callbacks.each { |cb| cb.call(payload) }
end
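
To make the fan-out concrete, here is a minimal sketch of how a callback registry of this kind behaves. CallbackRegistry and DemoSocket are hypothetical; the real Supabase::Realtime::Socket mixin may store and register listeners differently. Only fire_message is copied from the listing above.

```ruby
# Simplified, hypothetical stand-in for the Socket mixin's registry.
module CallbackRegistry
  def message_callbacks
    @message_callbacks ||= []
  end

  # Registers a listener; returns self so registrations can be chained.
  def on_message(&block)
    message_callbacks << block
    self
  end

  # Same body as #fire_message above: call listeners in registration order.
  def fire_message(payload)
    message_callbacks.each { |cb| cb.call(payload) }
  end
end

class DemoSocket
  include CallbackRegistry
end

socket = DemoSocket.new
log    = []
socket.on_message { |payload| log << "first:#{payload}" }
socket.on_message { |payload| log << "second:#{payload}" }
socket.fire_message("hello")
puts log.inspect # prints ["first:hello", "second:hello"]
```

Because the fan-out is a plain each with no rescue, an exception raised by one listener propagates to the caller and later listeners are not invoked for that payload.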

#fire_open ⇒ Object

Internal callback fan-outs (public so the read task can reach them).



# File 'lib/supabase/realtime/sockets/async_websocket.rb', line 136

def fire_open
  open_callbacks.each(&:call)
end

#send(payload) ⇒ Object



# File 'lib/supabase/realtime/sockets/async_websocket.rb', line 120

def send(payload)
  conn = @connection
  return unless conn

  # Connection#write auto-wraps UTF-8 strings in a text frame, which is
  # what the Phoenix protocol expects.
  conn.write(payload)
  conn.flush
end
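
As a sketch of what a caller might hand to #send, the example below serializes a Phoenix-style join message to a JSON string and writes it through a recording stub. RecordingConnection, send_frame and phoenix_join are hypothetical helpers; the topic/event/payload/ref object layout is an assumption about the serializer in use, and in practice the Realtime client builds these frames itself.

```ruby
require "json"

# Hypothetical stub that records what would go out on the wire.
class RecordingConnection
  attr_reader :frames, :flushes

  def initialize
    @frames  = []
    @flushes = 0
  end

  def write(payload)
    @frames << payload
  end

  def flush
    @flushes += 1
  end
end

# Same logic as #send above, written over an explicit connection.
def send_frame(conn, payload)
  return unless conn # no connection: silently drop, as #send does

  conn.write(payload)
  conn.flush
end

# Assumed message layout: a JSON object with topic, event, payload, ref.
def phoenix_join(topic, ref)
  JSON.generate(topic: topic, event: "phx_join", payload: {}, ref: ref.to_s)
end

conn = RecordingConnection.new
send_frame(conn, phoenix_join("realtime:public:users", 1))
puts conn.frames.first
```

Note that defining #send on the class shadows Object#send for its instances, so dynamic dispatch on a socket object should go through __send__ or public_send.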