Class: Supabase::Realtime::Sockets::AsyncWebsocket
- Inherits:
- Object
- Includes:
- Supabase::Realtime::Socket
- Defined in:
- lib/supabase/realtime/sockets/async_websocket.rb
Overview
Supabase::Realtime::Socket implementation backed by the async-websocket gem (socketry/async).
Unlike WebsocketClientSimple — which spawns a background OS thread for the read loop — this adapter runs entirely inside the calling fiber’s Async reactor. Pick it when:
- your app already runs on the socketry/async stack (Falcon, or
supabase-rb's own async REST clients via async-http-faraday), so
a single reactor owns all I/O;
- you want cooperative concurrency without cross-thread callback
hops or mutexes on listener state.
require "async"
require "supabase/realtime"
require "supabase/realtime/sockets/async_websocket"

Async do
  socket = Supabase::Realtime::Sockets::AsyncWebsocket.new(url: ws_url)
  client = Supabase::Realtime::Client.new(url: ws_url, socket: socket)
  client.connect
  channel = client.channel("realtime:public:users")
  channel.on_postgres_changes("INSERT", schema: "public", table: "users") { |p| puts p }
  channel.subscribe
end
All callbacks (on_open / on_message / on_close / on_error, and every downstream channel listener) run inside the Async reactor on the same fiber tree as the caller — no thread hops, no mutexes required for state owned by the reactor.
Instance Method Summary
Methods included from Supabase::Realtime::Socket
#close_callbacks, #error_callbacks, #message_callbacks, #on_close, #on_error, #on_message, #on_open, #open_callbacks
Constructor Details
#initialize(url:, headers: {}, parent: nil, connector: ::Async::WebSocket::Client) ⇒ AsyncWebsocket
Returns a new instance of AsyncWebsocket.
# File 'lib/supabase/realtime/sockets/async_websocket.rb', line 56
def initialize(url:, headers: {}, parent: nil, connector: ::Async::WebSocket::Client)
  @url = url
  @headers =
  @parent = parent
  @connector = connector
  @connection = nil
  @session = nil
end
Instance Method Details
#close ⇒ Object
# File 'lib/supabase/realtime/sockets/async_websocket.rb', line 107
def close
  conn = @connection
  session = @session
  @connection = nil
  @session = nil
  begin
    conn&.close
  rescue StandardError
  end
  session&.stop
end
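The ordering in #close (snapshot the references, clear the instance state, then tear down) can be tried out in isolation. The following is a minimal sketch using only the standard library; ToyConnection, ToySession and ToyCloser are hypothetical stand-ins invented for illustration, not classes from the gem.

```ruby
# Hypothetical stand-in for the WebSocket connection; its close may raise.
class ToyConnection
  attr_reader :close_calls

  def initialize(fail_on_close: false)
    @fail_on_close = fail_on_close
    @close_calls = 0
  end

  def close
    @close_calls += 1
    raise IOError, "stream already closed" if @fail_on_close
  end
end

# Hypothetical stand-in for the background task returned by parent.async.
class ToySession
  attr_reader :stopped

  def initialize
    @stopped = false
  end

  def stop
    @stopped = true
  end
end

# Mirrors the shape of #close shown above (assumption: same ordering only).
class ToyCloser
  def initialize(connection, session)
    @connection = connection
    @session = session
  end

  def connected?
    !@connection.nil?
  end

  def close
    conn = @connection
    session = @session
    # State is cleared first, so anything that runs during teardown
    # already observes the object as disconnected.
    @connection = nil
    @session = nil
    begin
      conn&.close
    rescue StandardError
      # Best effort: a failing close must not prevent stopping the session.
    end
    session&.stop
  end
end

connection = ToyConnection.new(fail_on_close: true)
session = ToySession.new
closer = ToyCloser.new(connection, session)

closer.close
closer.close # second call is a no-op: both references are already nil
```

Because the references are cleared before teardown, the second call finds nothing to close, and the error raised by the first close never reaches the caller.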
#connect ⇒ Object
# File 'lib/supabase/realtime/sockets/async_websocket.rb', line 65
def connect
  return if connected?
  parent = @parent || ::Async::Task.current?
  unless parent
    raise Errors::RealtimeError,
          "Supabase::Realtime::Sockets::AsyncWebsocket#connect must run inside an Async { ... } block " \
          "(or be constructed with parent:)"
  end
  endpoint = ::Async::HTTP::Endpoint.parse(@url)
  ready = ::Async::Variable.new
  @session = parent.async do
    @connector.connect(endpoint, headers: ) do |connection|
      @connection = connection
      fire_open
      ready.resolve(true)
      read_loop(connection)
    end
  rescue => err
    fire_error(err)
    ready.resolve(err) unless ready.resolved?
  ensure
    fire_close
  end
  outcome = ready.wait
  raise outcome if outcome.is_a?(Exception)
  nil
end
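In #connect, the caller waits on a one-shot variable that the background task resolves either with true (after the open callbacks have fired) or with the exception that aborted the handshake. The sketch below reproduces only that handshake so it can run without the async gems. It swaps in a stdlib Thread and Queue for the Async task and Async::Variable, which is an analogy and not how the adapter schedules work. ToyVariable, toy_connect and both connector lambdas are hypothetical names.

```ruby
# One-shot result slot. Stdlib stand-in for Async::Variable; only
# resolve / resolved? / wait are mirrored here.
class ToyVariable
  def initialize
    @queue = Queue.new
    @resolved = false
  end

  def resolved?
    @resolved
  end

  def resolve(value)
    @resolved = true
    @queue << value
  end

  # Blocks the caller until resolve has been called.
  def wait
    @queue.pop
  end
end

# Same control flow as #connect: start background work, wait for the
# outcome, and re-raise it in the caller if it is an exception.
def toy_connect(connector)
  ready = ToyVariable.new
  Thread.new do
    connector.call do
      ready.resolve(true)
      # The real adapter would enter its read loop here.
    end
  rescue => err
    ready.resolve(err) unless ready.resolved?
  end
  outcome = ready.wait
  raise outcome if outcome.is_a?(Exception)
  nil
end

# Hypothetical connectors: one completes the handshake, one fails before it.
good_connector = ->(&on_open) { on_open.call }
bad_connector = ->(&_on_open) { raise IOError, "handshake refused" }

toy_connect(good_connector) # returns nil once "open" has been signalled

begin
  toy_connect(bad_connector)
rescue IOError => e
  puts "connect failed: #{e.message}"
end
```

The `unless ready.resolved?` guard matters for errors raised after the handshake: by then the caller has already returned, so a late failure is reported through the error callbacks only and is not raised from connect.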
#connected? ⇒ Boolean
# File 'lib/supabase/realtime/sockets/async_websocket.rb', line 138
def connected?
  !@connection.nil?
end
#fire_close ⇒ Object
# File 'lib/supabase/realtime/sockets/async_websocket.rb', line 152
def fire_close
  return if @connection.nil? && close_callbacks.empty?
  @connection = nil
  close_callbacks.each(&:call)
end
#fire_error(err) ⇒ Object
# File 'lib/supabase/realtime/sockets/async_websocket.rb', line 159
def fire_error(err)
  error_callbacks.each { |cb| cb.call(err) }
end
#fire_message(payload) ⇒ Object
# File 'lib/supabase/realtime/sockets/async_websocket.rb', line 148
def fire_message(payload)
  message_callbacks.each { |cb| cb.call(payload) }
end
#fire_open ⇒ Object
Internal callback fan-outs (public so the read task can reach them).
# File 'lib/supabase/realtime/sockets/async_websocket.rb', line 144
def fire_open
  open_callbacks.each(&:call)
end
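The four fire_* methods all follow the same fan-out shape: iterate the registered callbacks in order and call each one synchronously. A minimal sketch of that shape follows. ToyCallbackRegistry is a hypothetical stand-in for the registry that Supabase::Realtime::Socket contributes (its real implementation is not shown on this page), and ToySocket is likewise invented for illustration.

```ruby
# Hypothetical registry mirroring the method names listed under
# "Instance Method Summary"; the storage strategy is an assumption.
module ToyCallbackRegistry
  def open_callbacks
    @open_callbacks ||= []
  end

  def message_callbacks
    @message_callbacks ||= []
  end

  def error_callbacks
    @error_callbacks ||= []
  end

  def on_open(&block)
    open_callbacks << block
  end

  def on_message(&block)
    message_callbacks << block
  end

  def on_error(&block)
    error_callbacks << block
  end
end

class ToySocket
  include ToyCallbackRegistry

  # Same bodies as the fan-out methods documented above.
  def fire_open
    open_callbacks.each(&:call)
  end

  def fire_message(payload)
    message_callbacks.each { |cb| cb.call(payload) }
  end

  def fire_error(err)
    error_callbacks.each { |cb| cb.call(err) }
  end
end

events = []
socket = ToySocket.new
socket.on_open { events << :open }
socket.on_message { |payload| events << [:first, payload] }
socket.on_message { |payload| events << [:second, payload] }
socket.on_error { |err| events << [:error, err.message] }

socket.fire_open
socket.fire_message("hello")
socket.fire_error(RuntimeError.new("boom"))
```

Listeners run in registration order and in the caller's own execution context, which is why plain state such as the `events` array needs no lock in this sketch.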
#send(payload) ⇒ Object
# File 'lib/supabase/realtime/sockets/async_websocket.rb', line 128
def send(payload)
  conn = @connection
  return unless conn
  conn.write(payload)
  conn.flush
end
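Two properties of #send are easy to miss when reading the listing: it silently returns nil when there is no connection, and, because it is named send, it shadows Ruby's built-in Object#send on instances of the class. The sketch below demonstrates both with hypothetical stand-ins (ToySender, RecordingConnection); dynamic dispatch on such an object can still go through __send__ or public_send.

```ruby
# Hypothetical connection that records what was written and flushed.
class RecordingConnection
  attr_reader :written, :flushes

  def initialize
    @written = []
    @flushes = 0
  end

  def write(payload)
    @written << payload
  end

  def flush
    @flushes += 1
  end
end

# Mirrors the shape of #send shown above.
class ToySender
  attr_accessor :connection

  def send(payload)
    conn = @connection
    return unless conn
    conn.write(payload)
    conn.flush
  end
end

sender = ToySender.new

# Not connected yet: the payload is dropped and nil is returned.
dropped = sender.send("ignored while disconnected")

recorder = RecordingConnection.new
sender.connection = recorder
sender.send("heartbeat")

# Object#send is shadowed, so reflective calls use __send__ instead.
sender.__send__(:send, "via __send__")
```

Callers that need to know whether a payload was dropped can check connected? before calling send, since the nil return alone does not distinguish the two cases reliably.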