Class: Supabase::Realtime::Sockets::AsyncWebsocket
- Inherits:
- Object
- Includes:
- Supabase::Realtime::Socket
- Defined in:
- lib/supabase/realtime/sockets/async_websocket.rb
Overview
Supabase::Realtime::Socket implementation backed by the async-websocket gem (socketry/async).
Unlike WebsocketClientSimple — which spawns a background OS thread for the read loop — this adapter runs entirely inside the calling fiber’s Async reactor. Pick it when:
- your app already runs on the socketry/async stack (Falcon, or supabase-rb's own async REST clients via async-http-faraday), so a single reactor owns all I/O;
- you want cooperative concurrency without cross-thread callback hops or mutexes on listener state.
require "async"
require "supabase/realtime"
require "supabase/realtime/sockets/async_websocket"

Async do
  socket = Supabase::Realtime::Sockets::AsyncWebsocket.new(url: ws_url)
  client = Supabase::Realtime::Client.new(url: ws_url, socket: socket)
  client.connect
  channel = client.channel("realtime:public:users")
  channel.on_postgres_changes("INSERT", schema: "public", table: "users") { |p| puts p }
  channel.subscribe
end
All callbacks (on_open / on_message / on_close / on_error, and every downstream channel listener) run inside the Async reactor on the same fiber tree as the caller — no thread hops, no mutexes required for state owned by the reactor.
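The callback fan-out described above can be sketched in plain Ruby. This is only an illustration: `CallbackRegistry` and `FakeSocket` are hypothetical stand-ins, and the real Supabase::Realtime::Socket mixin (whose source is not shown on this page) may store and register listeners differently. The sketch assumes the `on_*` methods take a block and the `*_callbacks` readers return arrays.

```ruby
# Hypothetical stand-in for the Supabase::Realtime::Socket mixin.
# Assumption: on_* methods append a block to the matching *_callbacks array.
module CallbackRegistry
  def open_callbacks
    @open_callbacks ||= []
  end

  def message_callbacks
    @message_callbacks ||= []
  end

  def error_callbacks
    @error_callbacks ||= []
  end

  def close_callbacks
    @close_callbacks ||= []
  end

  def on_open(&block)
    open_callbacks << block
  end

  def on_message(&block)
    message_callbacks << block
  end

  def on_error(&block)
    error_callbacks << block
  end

  def on_close(&block)
    close_callbacks << block
  end
end

# Hypothetical adapter that only implements the fan-out side.
class FakeSocket
  include CallbackRegistry

  def fire_open
    open_callbacks.each(&:call)
  end

  def fire_message(payload)
    message_callbacks.each { |cb| cb.call(payload) }
  end

  def fire_error(err)
    error_callbacks.each { |cb| cb.call(err) }
  end

  def fire_close
    close_callbacks.each(&:call)
  end
end

events = []
socket = FakeSocket.new
socket.on_open { events << :open }
socket.on_message { |payload| events << [:message, payload] }
socket.on_error { |err| events << [:error, err.message] }
socket.on_close { events << :close }

# Every listener runs synchronously on the caller's stack, in registration order.
socket.fire_open
socket.fire_message("hello")
socket.fire_error(RuntimeError.new("boom"))
socket.fire_close
```

Because each listener is invoked directly by the code that fires the event, no queue or lock sits between the event source and the listener, which is the property the adapter relies on inside a single reactor.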
Instance Method Summary
#close_callbacks, #error_callbacks, #message_callbacks, #on_close, #on_error, #on_message, #on_open, #open_callbacks
Constructor Details
#initialize(url:, headers: {}, parent: nil, connector: ::Async::WebSocket::Client) ⇒ AsyncWebsocket
Returns a new instance of AsyncWebsocket.
# File 'lib/supabase/realtime/sockets/async_websocket.rb', line 55
def initialize(url:, headers: {}, parent: nil, connector: ::Async::WebSocket::Client)
  @url = url
  @headers = headers
  @parent = parent
  @connector = connector
  @connection = nil
  @session = nil
end
Instance Method Details
#close ⇒ Object
# File 'lib/supabase/realtime/sockets/async_websocket.rb', line 99
def close
  conn = @connection
  session = @session
  @connection = nil
  @session = nil
  begin
    conn&.close
  rescue StandardError
    # Best-effort close: ignore errors raised by an already-broken connection.
  end
  session&.stop
end
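The teardown order in #close (snapshot the handle, clear the instance state, then close best-effort) makes repeated calls harmless. A minimal sketch of that pattern follows; `FlakyConnection` and `Holder` are hypothetical names invented for the illustration and are not part of the library.

```ruby
# Hypothetical connection whose close always fails, to exercise the rescue path.
class FlakyConnection
  attr_reader :close_calls

  def initialize
    @close_calls = 0
  end

  def close
    @close_calls += 1
    raise IOError, "peer already gone"
  end
end

# Hypothetical owner that mirrors the snapshot-then-clear order used by #close.
class Holder
  def initialize(connection)
    @connection = connection
  end

  def connected?
    !@connection.nil?
  end

  def close
    conn = @connection  # snapshot the handle first
    @connection = nil   # clear state so a second call sees nothing to close
    begin
      conn&.close
    rescue StandardError
      # Best-effort: a failing close must not leak out of teardown.
    end
    nil
  end
end

conn = FlakyConnection.new
holder = Holder.new(conn)
holder.close
holder.close # second call is a no-op: the handle was already cleared
```

Clearing the instance variable before calling close also means that any callback triggered during teardown already observes the object as disconnected.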
#connect ⇒ Object
# File 'lib/supabase/realtime/sockets/async_websocket.rb', line 64
def connect
  return if connected?
  parent = @parent || ::Async::Task.current?
  unless parent
    raise Errors::RealtimeError,
          "Supabase::Realtime::Sockets::AsyncWebsocket#connect must run inside an Async { ... } block " \
          "(or be constructed with parent:)"
  end
  endpoint = ::Async::HTTP::Endpoint.parse(@url)
  ready = ::Async::Promise.new
  @session = parent.async do
    @connector.connect(endpoint, headers: @headers) do |connection|
      @connection = connection
      fire_open
      ready.resolve(true)
      read_loop(connection)
    end
  rescue => err
    fire_error(err)
    ready.reject(err) unless ready.resolved?
  ensure
    fire_close
  end
  ready.wait
  nil
end
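#connect starts a background session and then blocks on `ready` until the handshake has either succeeded or failed. The sketch below shows that readiness handshake using only the standard library. It deliberately swaps in a thread and a `Queue` for the Async task and promise used by the adapter, so it illustrates the control flow, not the fiber-based implementation. `OneShot` and `open_session` are hypothetical names.

```ruby
# Hypothetical one-shot result holder standing in for the promise in #connect.
# Assumption: settled by a single producer and waited on once.
class OneShot
  def initialize
    @queue = Queue.new
    @settled = false
  end

  def resolved?
    @settled
  end

  def resolve(value)
    settle([:ok, value])
  end

  def reject(error)
    settle([:error, error])
  end

  # Blocks until the outcome is known; re-raises a rejection in the caller.
  def wait
    status, value = @queue.pop
    raise value if status == :error

    value
  end

  private

  def settle(outcome)
    return if @settled

    @settled = true
    @queue << outcome
  end
end

# Hypothetical session: a thread here, where the adapter uses parent.async.
def open_session(ready, fail_with: nil)
  Thread.new do
    raise fail_with if fail_with # simulate a failed handshake

    ready.resolve(true) # handshake done, the waiting caller may proceed
    # A read loop would keep running here until the connection ends.
  rescue StandardError => e
    ready.reject(e) unless ready.resolved?
  end
end

ok = OneShot.new
open_session(ok).join
result = ok.wait

failed = OneShot.new
open_session(failed, fail_with: IOError.new("handshake refused")).join
error =
  begin
    failed.wait
  rescue IOError => e
    e
  end
```

The `unless ready.resolved?` guard matters for errors that occur after a successful handshake: the caller has already been released, so a later failure is reported through the error callbacks only.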
#connected? ⇒ Boolean
# File 'lib/supabase/realtime/sockets/async_websocket.rb', line 130
def connected?
  !@connection.nil?
end
#fire_close ⇒ Object
# File 'lib/supabase/realtime/sockets/async_websocket.rb', line 144
def fire_close
  return if @connection.nil? && close_callbacks.empty?
  @connection = nil
  close_callbacks.each(&:call)
end
#fire_error(err) ⇒ Object
# File 'lib/supabase/realtime/sockets/async_websocket.rb', line 151
def fire_error(err)
  error_callbacks.each { |cb| cb.call(err) }
end
#fire_message(payload) ⇒ Object
# File 'lib/supabase/realtime/sockets/async_websocket.rb', line 140
def fire_message(payload)
  message_callbacks.each { |cb| cb.call(payload) }
end
#fire_open ⇒ Object
Internal callback fan-outs (public so the read task can reach them).
# File 'lib/supabase/realtime/sockets/async_websocket.rb', line 136
def fire_open
  open_callbacks.each(&:call)
end
#send(payload) ⇒ Object
# File 'lib/supabase/realtime/sockets/async_websocket.rb', line 120
def send(payload)
  conn = @connection
  return unless conn
  conn.write(payload)
  conn.flush
end
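#send silently does nothing while disconnected, and otherwise writes the payload and flushes it immediately. The following sketch reproduces that behaviour against a recording stand-in. `RecordingConnection` and `Sender` are hypothetical; only `write` and `flush` are taken from the method body above.

```ruby
# Hypothetical connection that records what it is asked to do.
class RecordingConnection
  attr_reader :written, :flushes

  def initialize
    @written = []
    @flushes = 0
  end

  def write(payload)
    @written << payload
  end

  def flush
    @flushes += 1
  end
end

# Hypothetical owner mirroring the guard-then-write-then-flush shape of #send.
class Sender
  attr_writer :connection

  def initialize
    @connection = nil
  end

  def send(payload)
    conn = @connection
    return unless conn # not connected: drop the payload, raise nothing

    conn.write(payload)
    conn.flush
  end
end

sender = Sender.new
dropped = sender.send("before connect") # returns nil, nothing is written

conn = RecordingConnection.new
sender.connection = conn
sender.send("first")
sender.send("second")

# Defining #send shadows Object#send, so dynamic dispatch on such an object
# should go through __send__ (or public_send) instead.
sender.__send__(:send, "third")
```

Since a payload sent before the connection opens is dropped, not queued, callers that need delivery guarantees would have to check `connected?` or buffer on their own side.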