Module: Tep::Broadcast
Defined in: lib/tep/broadcast.rb
Class Method Summary
- .clear ⇒ Object
  Drop every subscription.
- .deliver_wire_local(wire) ⇒ Object
- .disable_pg_backend ⇒ Object
- .enable_pg_backend(conninfo, channel) ⇒ Object
  PG backend (cross-worker pub/sub): opens a dedicated PG connection and issues `LISTEN <channel>`.
- .encode_wire(topic, payload) ⇒ Object
  Wire format: "<topic_byte_length>:<topic><payload>".
- .poll_pg_once(timeout_ms) ⇒ Object
  Process one notification from the PG channel: parse the wire format, then dispatch to local subscribers as if `publish` had been called locally (but WITHOUT re-NOTIFYing, which would loop).
- .publish(topic, payload) ⇒ Object
  Write `payload` to every subscribed fd for `topic`.
- .publish_local_only(topic, payload) ⇒ Object
  Same fan-out as publish but skips the PG NOTIFY step.
- .subscribe(topic, fd) ⇒ Object
  Register a subscription for `fd` on `topic`.
- .subscribe_ws(topic, fd) ⇒ Object
  WebSocket-bridged variant of subscribe.
- .subscriber_count ⇒ Object
  Total subscription count across all topics.
- .subscribers_for(topic) ⇒ Object
  Count of subscribers for one topic.
- .unsubscribe(sub_id) ⇒ Object
  Drop the subscription at `sub_id`.
- .unsubscribe_fd(fd) ⇒ Object
  Drop every subscription whose fd matches.
Class Method Details
.clear ⇒ Object
Drop every subscription. Used by tests between fixtures, and available to apps that need to fully reset (e.g. during graceful shutdown). Returns the count dropped.
    # File 'lib/tep/broadcast.rb', line 149
    def self.clear
      subs = Tep::APP.broadcast_subs
      n = subs.length
      while subs.length > 0
        subs.delete_at(0)
      end
      n
    end
.deliver_wire_local(wire) ⇒ Object
    # File 'lib/tep/broadcast.rb', line 221
    def self.deliver_wire_local(wire)
      colon = Tep.str_find(wire, ":", 0)
      if colon <= 0
        return -1
      end
      len_str = wire[0, colon]
      tlen = len_str.to_i
      if tlen < 0 || colon + 1 + tlen > wire.length
        return -1
      end
      topic = wire[colon + 1, tlen]
      payload = wire[colon + 1 + tlen, wire.length - colon - 1 - tlen]
      Tep::Broadcast.publish_local_only(topic, payload)
    end
.disable_pg_backend ⇒ Object
    # File 'lib/tep/broadcast.rb', line 184
    def self.disable_pg_backend
      if Tep::APP.broadcast_pg_enabled == 0
        return 0
      end
      Tep::APP.broadcast_pg_conn.unlisten(Tep::APP.broadcast_pg_channel)
      Tep::APP.broadcast_pg_conn.finish
      Tep::APP.set_broadcast_pg_enabled(0)
      0
    end
.enable_pg_backend(conninfo, channel) ⇒ Object
PG backend (cross-worker pub/sub).
Opens a dedicated PG connection and issues `LISTEN <channel>`. Subsequent publishes also NOTIFY this channel, so other workers listening on the same channel can receive the message via poll_pg_once.
`conninfo` is the libpq connect string. `channel` must be a safe SQL identifier (e.g. "tep_broadcast") because it is placed inside the LISTEN / NOTIFY command unescaped.
Returns 0 on success, -1 on connection or LISTEN failure.
    # File 'lib/tep/broadcast.rb', line 170
    def self.enable_pg_backend(conninfo, channel)
      conn = PG::Connection.new(conninfo)
      if conn.pgh < 0
        return -1
      end
      if conn.listen(channel) < 0
        return -1
      end
      Tep::APP.set_broadcast_pg_conn(conn)
      Tep::APP.set_broadcast_pg_channel(channel)
      Tep::APP.set_broadcast_pg_enabled(1)
      0
    end
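Because `channel` reaches LISTEN / NOTIFY unescaped, a caller may want to check it before enabling the backend. The following is a minimal sketch in plain Ruby, not part of Tep: `safe_channel?` and `SAFE_CHANNEL` are hypothetical names, and the pattern (a letter or underscore followed by letters, digits, or underscores, 63 bytes at most) is one assumed reading of "safe SQL identifier". It does not reject SQL reserved words.

```ruby
# Hypothetical helper, not part of Tep::Broadcast.
# Accepts only names shaped like an unquoted identifier: a letter or
# underscore followed by letters, digits, or underscores, 63 bytes at
# most (PostgreSQL's default identifier length limit).
SAFE_CHANNEL = /\A[A-Za-z_][A-Za-z0-9_]{0,62}\z/

def safe_channel?(channel)
  channel.is_a?(String) && SAFE_CHANNEL.match?(channel)
end

puts safe_channel?("tep_broadcast")         # prints true
puts safe_channel?("x; DROP TABLE users")   # prints false
```

An app could call such a check first and skip enable_pg_backend when it fails, rather than relying on the LISTEN error path.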
.encode_wire(topic, payload) ⇒ Object
Wire format: "<topic_byte_length>:<topic><payload>". Length-prefixed so topics and payloads with arbitrary chars (commas, colons, embedded quotes, newlines) round-trip unambiguously. Encoded by `publish` when the PG backend is enabled; decoded by `deliver_wire_local`.
    # File 'lib/tep/broadcast.rb', line 217
    def self.encode_wire(topic, payload)
      topic.length.to_s + ":" + topic + payload
    end
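The round-trip property can be checked with a standalone model of the format. This is a sketch in plain Ruby rather than the Tep implementation: `decode_wire` is a hypothetical name that mirrors the parsing in deliver_wire_local, and it uses `bytesize` / `byteslice` on the assumption that the prefix is meant as a byte count, as the format description says.

```ruby
# Standalone model of "<topic_byte_length>:<topic><payload>".
# Assumption: the prefix counts bytes, so bytesize/byteslice are used.
def encode_wire(topic, payload)
  "#{topic.bytesize}:#{topic}#{payload}"
end

# Hypothetical decoder mirroring deliver_wire_local's checks.
# Returns [topic, payload], or nil when the frame is malformed.
def decode_wire(wire)
  colon = wire.index(":")
  return nil if colon.nil? || colon.zero?

  tlen = wire[0, colon].to_i
  start = colon + 1
  return nil if tlen.negative? || start + tlen > wire.bytesize

  topic = wire.byteslice(start, tlen)
  payload = wire.byteslice(start + tlen, wire.bytesize - start - tlen)
  [topic, payload]
end

# Colons, commas, quotes, and newlines all survive the round trip.
wire = encode_wire("chat:room,1", "hello: \"world\"\n")
p decode_wire(wire)
p decode_wire("99:short")   # prints nil (declared topic length exceeds the frame)
```

Only the first colon is treated as a delimiter; everything after it is sliced by length, which is why colons inside the topic or payload are harmless.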
.poll_pg_once(timeout_ms) ⇒ Object
Process one notification from the PG channel: parse the wire format, then dispatch to local subscribers as if `publish` had been called locally (but WITHOUT re-NOTIFYing, which would loop). Returns 1 if a notification was processed, 0 on timeout, -1 on connection error or if the backend is not enabled.
    # File 'lib/tep/broadcast.rb', line 199
    def self.poll_pg_once(timeout_ms)
      if Tep::APP.broadcast_pg_enabled == 0
        return -1
      end
      r = Tep::APP.broadcast_pg_conn.poll_notification(timeout_ms)
      if r != 1
        return r
      end
      wire = Tep::APP.broadcast_pg_conn.last_notify_payload
      Tep::Broadcast.deliver_wire_local(wire)
      1
    end
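Since each call handles at most one notification, a worker would normally call it in a loop. The sketch below shows one way to act on the 1 / 0 / -1 return codes. `drain_notifications` and its `max` cap are hypothetical, and a stub lambda stands in for the real poller so the example runs without a database; in an app the callable might be `->(ms) { Tep::Broadcast.poll_pg_once(ms) }`.

```ruby
# Drains pending notifications until the poller reports a timeout (0)
# or an error (-1). `poll` is any callable following poll_pg_once's
# return-code contract. Hypothetical helper, not part of Tep.
def drain_notifications(poll, timeout_ms, max = 100)
  processed = 0
  while processed < max
    r = poll.call(timeout_ms)
    return -1 if r < 0   # connection error or backend not enabled
    break if r == 0      # timed out: nothing pending
    processed += 1
  end
  processed
end

# Stub poller standing in for the real backend: two messages, then a timeout.
results = [1, 1, 0]
stub = ->(_ms) { results.shift || 0 }
puts drain_notifications(stub, 50)   # prints 2
```

The `max` cap keeps a busy channel from starving the rest of the worker's event loop; whether that trade-off is wanted depends on the app.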
.publish(topic, payload) ⇒ Object
Write `payload` to every subscribed fd for `topic`. Returns the number of subscriptions matched (NOT the number of successful writes: a closed / bad fd still counts as matched; the underlying sphttp_write_str returns -1 silently on that fd). Apps that need delivery confirmation should track their own ack channel.
When the PG backend is enabled (Tep::Broadcast.enable_pg_backend), publish also issues a NOTIFY on the configured channel so that other workers polling via poll_pg_once can deliver to their local subscribers. The returned match count is the LOCAL match count; remote deliveries are best-effort and not counted here.
    # File 'lib/tep/broadcast.rb', line 115
    def self.publish(topic, payload)
      matched = Tep::Broadcast.publish_local_only(topic, payload)
      if Tep::APP.broadcast_pg_enabled != 0
        wire = Tep::Broadcast.encode_wire(topic, payload)
        Tep::APP.broadcast_pg_conn.notify(
          Tep::APP.broadcast_pg_channel, wire)
      end
      matched
    end
.publish_local_only(topic, payload) ⇒ Object
Same fan-out as publish but skips the PG NOTIFY step. Used internally by poll_pg_once (through deliver_wire_local) when delivering a cross-worker message that already came in via PG; re-NOTIFYing would cause an infinite loop.
Branches on each subscription's `mode`:
* mode 0 -> raw bytes via Sock.sphttp_write_str (default, for SSE / log fan-out / non-framed consumers).
* mode != 0 -> WebSocket frame via Tep::WebSocket::Driver.send_frame, using the mode value as the WS opcode (1=TEXT, 2=BINARY).
    # File 'lib/tep/broadcast.rb', line 246
    def self.publish_local_only(topic, payload)
      subs = Tep::APP.broadcast_subs
      matched = 0
      i = 0
      while i < subs.length
        if subs[i].topic == topic
          if subs[i].mode == 0
            Sock.sphttp_write_str(subs[i].fd, payload)
          else
            Tep::WebSocket::Driver.send_frame(
              subs[i].fd, subs[i].mode, payload)
          end
          matched += 1
        end
        i += 1
      end
      matched
    end
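The mode branch and the "matched, not delivered" return value can be seen in a standalone model of the loop. Everything here is a stand-in for illustration: `Sub` models Tep::BroadcastSubscription, `fan_out` models the method above, and the two lambdas replace Sock.sphttp_write_str and Tep::WebSocket::Driver.send_frame so the sketch runs without sockets.

```ruby
# Stand-in for Tep::BroadcastSubscription (topic, fd, mode).
Sub = Struct.new(:topic, :fd, :mode)

# Model of the fan-out loop with the two writers injected as callables.
def fan_out(subs, topic, payload, write_raw, write_frame)
  matched = 0
  subs.each do |sub|
    next unless sub.topic == topic

    if sub.mode == 0
      write_raw.call(sub.fd, payload)               # raw bytes (SSE, logs)
    else
      write_frame.call(sub.fd, sub.mode, payload)   # mode doubles as the WS opcode
    end
    matched += 1   # counted even when the write itself failed
  end
  matched
end

log = []
raw   = ->(fd, data) { log << [:raw, fd, data]; -1 }   # simulate a failed write
frame = ->(fd, op, data) { log << [:ws, fd, op, data] }

subs = [Sub.new("news", 7, 0), Sub.new("news", 9, 1), Sub.new("other", 7, 0)]
puts fan_out(subs, "news", "hi", raw, frame)   # prints 2
```

The raw writer returns -1 here, yet the count is still 2, which matches the documented behavior that the return value reports matches rather than successful writes.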
.subscribe(topic, fd) ⇒ Object
Register a subscription for `fd` on `topic`. Returns an opaque sub_id for later unsubscribe. The fd receives raw bytes on publish, which suits SSE / log fan-out / anything that doesn't need WebSocket framing. For WS connections, prefer subscribe_ws.
    # File 'lib/tep/broadcast.rb', line 45
    def self.subscribe(topic, fd)
      subs = Tep::APP.broadcast_subs
      sub = Tep::BroadcastSubscription.new(topic, fd, 0)
      subs.push(sub)
      subs.length - 1
    end
.subscribe_ws(topic, fd) ⇒ Object
WebSocket-bridged variant of subscribe. The fd is expected to be an established WS connection (typically a Tep::WebSocket::Connection’s #fd). On publish, payload is wrapped in a WS TEXT frame via Tep::WebSocket::Driver before delivery – the peer sees a well-formed WS message, not raw bytes that would close the connection.
Cleanup is automatic: when the WS connection closes, Tep::WebSocket::Connection.dispatch_close runs the user’s on_close handler and then calls unsubscribe_fd(driver.fd), dropping every subscription tied to the closed connection. Apps don’t need to add their own unsubscribe; if they do, the second call just finds 0 matches (harmless).
    # File 'lib/tep/broadcast.rb', line 65
    def self.subscribe_ws(topic, fd)
      subs = Tep::APP.broadcast_subs
      sub = Tep::BroadcastSubscription.new(
        topic, fd, Tep::WebSocket::OPCODE_TEXT)
      subs.push(sub)
      subs.length - 1
    end
.subscriber_count ⇒ Object
Total subscription count across all topics. Useful for diagnostics and the v1 test surface.
    # File 'lib/tep/broadcast.rb', line 127
    def self.subscriber_count
      Tep::APP.broadcast_subs.length
    end
.subscribers_for(topic) ⇒ Object
Count of subscribers for one topic. O(n) over the registry; acceptable for v1 (n is typically small per worker).
    # File 'lib/tep/broadcast.rb', line 133
    def self.subscribers_for(topic)
      subs = Tep::APP.broadcast_subs
      n = 0
      i = 0
      while i < subs.length
        if subs[i].topic == topic
          n += 1
        end
        i += 1
      end
      n
    end
.unsubscribe(sub_id) ⇒ Object
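Drop the subscription at `sub_id`. Note that ids are registry indexes: dropping one shifts every later subscription down by one, so ids handed out earlier for those later subscriptions no longer point at them. An out-of-range `sub_id` is ignored. Always returns 0. For multi-sub drop, prefer `unsubscribe_fd`.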
    # File 'lib/tep/broadcast.rb', line 76
    def self.unsubscribe(sub_id)
      subs = Tep::APP.broadcast_subs
      if sub_id < 0 || sub_id >= subs.length
        return 0
      end
      subs.delete_at(sub_id)
      0
    end
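The index-shifting hazard is easy to reproduce with a plain array standing in for the registry. This is an illustrative model only; the symbols take the place of subscription objects, and the ids are computed the same way subscribe computes them (length after push, minus one).

```ruby
# Plain-array model of the registry; symbols stand in for subscriptions.
registry = []
id_a = registry.push(:sub_a).length - 1   # 0
id_b = registry.push(:sub_b).length - 1   # 1
id_c = registry.push(:sub_c).length - 1   # 2

registry.delete_at(id_a)   # drop :sub_a; everything after it shifts down

p registry[id_b]   # prints :sub_c (id_b now names a different subscription)
p registry[id_c]   # prints nil (id_c is out of range)
```

This is why a sub_id is only reliable until the next drop at a lower index, and why dropping by fd is the safer route when a connection holds several subscriptions.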
.unsubscribe_fd(fd) ⇒ Object
Drop every subscription whose fd matches. Returns the count dropped. Used by WS on-close to clean up everything a closing connection had subscribed to. Back-to-front so delete_at indices stay valid mid-loop.
    # File 'lib/tep/broadcast.rb', line 89
    def self.unsubscribe_fd(fd)
      subs = Tep::APP.broadcast_subs
      dropped = 0
      i = subs.length - 1
      while i >= 0
        if subs[i].fd == fd
          subs.delete_at(i)
          dropped += 1
        end
        i -= 1
      end
      dropped
    end
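The reason for iterating back-to-front shows up when the same removal is attempted front-to-back. The sketch below works on a plain array of fd numbers; `drop_backward` follows the loop above, while `drop_forward_naive` is a deliberately flawed contrast case written for this example.

```ruby
# Back-to-front removal, as in unsubscribe_fd: deleting index i only
# shifts elements above i, which the loop has already visited.
def drop_backward(fds, target)
  dropped = 0
  i = fds.length - 1
  while i >= 0
    if fds[i] == target
      fds.delete_at(i)
      dropped += 1
    end
    i -= 1
  end
  dropped
end

# Naive front-to-back removal for contrast: after delete_at(i) the next
# element slides into slot i, and i += 1 steps over it unchecked.
def drop_forward_naive(fds, target)
  dropped = 0
  i = 0
  while i < fds.length
    if fds[i] == target
      fds.delete_at(i)
      dropped += 1
    end
    i += 1
  end
  dropped
end

a = [5, 5, 8, 5]
b = a.dup
puts drop_backward(a, 5)        # prints 3
puts drop_forward_naive(b, 5)   # prints 2
p b                             # prints [5, 8]
```

The forward version misses the second of two adjacent matches, leaving a stale subscription behind.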