Module: Tep::Broadcast
- Defined in:
- lib/tep/pg.rb,
lib/tep/broadcast.rb
Class Method Summary collapse
-
.clear ⇒ Object
Drop every subscription.
-
.cross_worker_notify(topic, payload) ⇒ Object
No-op cross-worker hook.
- .deliver_wire_local(wire) ⇒ Object
- .disable_pg_backend ⇒ Object
- .enable_pg_backend(conninfo, channel) ⇒ Object
- .encode_wire(topic, payload) ⇒ Object
- .poll_pg_once(timeout_ms) ⇒ Object
-
.publish(topic, payload) ⇒ Object
Write ‘payload` to every subscribed fd for `topic`.
-
.publish_local_only(topic, payload) ⇒ Object
Same fan-out as #publish but skips the PG NOTIFY step.
-
.subscribe(topic, fd) ⇒ Object
Register a subscription for ‘fd` on `topic`.
-
.subscribe_ws(topic, fd) ⇒ Object
WebSocket-bridged variant of subscribe.
-
.subscriber_count ⇒ Object
Total subscription count across all topics.
-
.subscribers_for(topic) ⇒ Object
Count of subscribers for one topic.
-
.unsubscribe(sub_id) ⇒ Object
Drop the subscription at ‘sub_id`.
-
.unsubscribe_fd(fd) ⇒ Object
Drop every subscription whose fd matches.
Class Method Details
.clear ⇒ Object
Drop every subscription. Used by tests between fixtures, and available to apps that need to fully reset (e.g. during graceful shutdown). Returns the count dropped.
156 157 158 159 160 161 162 163 |
# File 'lib/tep/broadcast.rb', line 156 def self.clear subs = Tep::APP.broadcast_subs n = subs.length while subs.length > 0 subs.delete_at(0) end n end |
.cross_worker_notify(topic, payload) ⇒ Object
No-op cross-worker hook. Overridden by lib/tep/pg.rb when the PG backend is loaded. Keeping the PG NOTIFY out of core is what lets ‘Tep::Broadcast.publish` compile without pulling tep_pg_*/libpq.
1156 1157 1158 1159 1160 1161 1162 1163 |
# File 'lib/tep/pg.rb', line 1156 def self.cross_worker_notify(topic, payload) if Tep::APP.broadcast_pg_enabled != 0 wire = Tep::Broadcast.encode_wire(topic, payload) Tep::APP.broadcast_pg_conn.notify( Tep::APP.broadcast_pg_channel, wire) end 0 end |
.deliver_wire_local(wire) ⇒ Object
1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 |
# File 'lib/tep/pg.rb', line 1206 def self.deliver_wire_local(wire) colon = Tep.str_find(wire, ":", 0) if colon <= 0 return -1 end len_str = wire[0, colon] tlen = len_str.to_i if tlen < 0 || colon + 1 + tlen > wire.length return -1 end topic = wire[colon + 1, tlen] payload = wire[colon + 1 + tlen, wire.length - colon - 1 - tlen] Tep::Broadcast.publish_local_only(topic, payload) end |
.disable_pg_backend ⇒ Object
1179 1180 1181 1182 1183 1184 1185 1186 1187 |
# File 'lib/tep/pg.rb', line 1179 def self.disable_pg_backend if Tep::APP.broadcast_pg_enabled == 0 return 0 end Tep::APP.broadcast_pg_conn.unlisten(Tep::APP.broadcast_pg_channel) Tep::APP.broadcast_pg_conn.finish Tep::APP.set_broadcast_pg_enabled(0) 0 end |
.enable_pg_backend(conninfo, channel) ⇒ Object
1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 |
# File 'lib/tep/pg.rb', line 1165 def self.enable_pg_backend(conninfo, channel) conn = PG::Connection.new(conninfo) if conn.pgh < 0 return -1 end if conn.listen(channel) < 0 return -1 end Tep::APP.set_broadcast_pg_conn(conn) Tep::APP.set_broadcast_pg_channel(channel) Tep::APP.set_broadcast_pg_enabled(1) 0 end |
.encode_wire(topic, payload) ⇒ Object
1202 1203 1204 |
# File 'lib/tep/pg.rb', line 1202 def self.encode_wire(topic, payload) topic.length.to_s + ":" + topic + payload end |
.poll_pg_once(timeout_ms) ⇒ Object
1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 |
# File 'lib/tep/pg.rb', line 1189 def self.poll_pg_once(timeout_ms) if Tep::APP.broadcast_pg_enabled == 0 return -1 end r = Tep::APP.broadcast_pg_conn.poll_notification(timeout_ms) if r != 1 return r end wire = Tep::APP.broadcast_pg_conn.last_notify_payload Tep::Broadcast.deliver_wire_local(wire) 1 end |
.publish(topic, payload) ⇒ Object
Write ‘payload` to every subscribed fd for `topic`. Returns the number of subscriptions matched (NOT the number of successful writes – a closed / bad fd still counts as matched; the underlying sphttp_write_str returns -1 silently on that fd). Apps that need delivery confirmation should track their own ack channel.
When the PG backend is enabled (Tep::Broadcast.enable_pg_backend), publish ALSO NOTIFY’s the configured channel so other workers subscribed via poll_pg_once can deliver to their local subscribers. Match count returned is the LOCAL match count; remote deliveries are best-effort and not counted here.
115 116 117 118 119 120 121 122 123 |
# File 'lib/tep/broadcast.rb', line 115 def self.publish(topic, payload) matched = Tep::Broadcast.publish_local_only(topic, payload) # Cross-worker fan-out is an OPT-IN hook (#216): the core build # has no PG reference, so a non-PG app DCEs the entire libpq # closure. `require "tep/pg"` redefines cross_worker_notify with # the real broadcast_pg_conn.notify (last-definition-wins). Tep::Broadcast.cross_worker_notify(topic, payload) matched end |
.publish_local_only(topic, payload) ⇒ Object
Same fan-out as #publish but skips the PG NOTIFY step. Used internally by poll_pg_once when delivering a cross-worker message that already came in via PG – re-NOTIFY would cause an infinite loop.
Branches on each subscription’s ‘mode`:
* mode 0 -> raw bytes via Sock.sphttp_write_str (default,
for SSE / log fan-out / non-framed consumers).
* mode != 0 -> WebSocket frame via Tep::WebSocket::Driver.send_frame,
using the mode value as the WS opcode (1=TEXT, 2=BINARY).
184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 |
# File 'lib/tep/broadcast.rb', line 184 def self.publish_local_only(topic, payload) subs = Tep::APP.broadcast_subs matched = 0 i = 0 while i < subs.length if subs[i].topic == topic if subs[i].mode == 0 Sock.sphttp_write_str(subs[i].fd, payload) else Tep::WebSocket::Driver.send_frame( subs[i].fd, subs[i].mode, payload) end matched += 1 end i += 1 end matched end |
.subscribe(topic, fd) ⇒ Object
Register a subscription for ‘fd` on `topic`. Returns an opaque sub_id for later unsubscribe. The fd receives raw bytes on publish – suits SSE / log fan-out / anything that doesn’t need WebSocket framing. For WS connections, prefer subscribe_ws.
45 46 47 48 49 50 |
# File 'lib/tep/broadcast.rb', line 45 def self.subscribe(topic, fd) subs = Tep::APP.broadcast_subs sub = Tep::BroadcastSubscription.new(topic, fd, 0) subs.push(sub) subs.length - 1 end |
.subscribe_ws(topic, fd) ⇒ Object
WebSocket-bridged variant of subscribe. The fd is expected to be an established WS connection (typically a Tep::WebSocket::Connection’s #fd). On publish, payload is wrapped in a WS TEXT frame via Tep::WebSocket::Driver before delivery – the peer sees a well-formed WS message, not raw bytes that would close the connection.
Cleanup is automatic: when the WS connection closes, Tep::WebSocket::Connection.dispatch_close runs the user’s on_close handler and then calls unsubscribe_fd(driver.fd), dropping every subscription tied to the closed connection. Apps don’t need to add their own unsubscribe; if they do, the second call just finds 0 matches (harmless).
65 66 67 68 69 70 71 |
# File 'lib/tep/broadcast.rb', line 65 def self.subscribe_ws(topic, fd) subs = Tep::APP.broadcast_subs sub = Tep::BroadcastSubscription.new( topic, fd, Tep::WebSocket::OPCODE_TEXT) subs.push(sub) subs.length - 1 end |
.subscriber_count ⇒ Object
Total subscription count across all topics. Useful for diagnostics and the v1 test surface.
134 135 136 |
# File 'lib/tep/broadcast.rb', line 134 def self.subscriber_count Tep::APP.broadcast_subs.length end |
.subscribers_for(topic) ⇒ Object
Count of subscribers for one topic. O(n) over the registry; acceptable for v1 (n is typically small per worker).
140 141 142 143 144 145 146 147 148 149 150 151 |
# File 'lib/tep/broadcast.rb', line 140 def self.subscribers_for(topic) subs = Tep::APP.broadcast_subs n = 0 i = 0 while i < subs.length if subs[i].topic == topic n += 1 end i += 1 end n end |
.unsubscribe(sub_id) ⇒ Object
Drop the subscription at ‘sub_id`. Note that ids are registry indexes; subsequent drops shift everything past it downward. For multi-sub drop, prefer `unsubscribe_fd`.
76 77 78 79 80 81 82 83 |
# File 'lib/tep/broadcast.rb', line 76 def self.unsubscribe(sub_id) subs = Tep::APP.broadcast_subs if sub_id < 0 || sub_id >= subs.length return 0 end subs.delete_at(sub_id) 0 end |
.unsubscribe_fd(fd) ⇒ Object
Drop every subscription whose fd matches. Returns the count dropped. Used by WS on-close to clean up everything a closing connection had subscribed to. Back-to-front so delete_at indices stay valid mid-loop.
89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/tep/broadcast.rb', line 89 def self.unsubscribe_fd(fd) subs = Tep::APP.broadcast_subs dropped = 0 i = subs.length - 1 while i >= 0 if subs[i].fd == fd subs.delete_at(i) dropped += 1 end i -= 1 end dropped end |