Module: Tep::Broadcast

Defined in:
lib/tep/broadcast.rb

Class Method Details

.clear ⇒ Object

Drop every subscription. Used by tests between fixtures, and available to apps that need to fully reset (e.g. during graceful shutdown). Returns the count dropped.



# File 'lib/tep/broadcast.rb', line 149

def self.clear
  subs = Tep::APP.broadcast_subs
  n = subs.length
  while subs.length > 0
    subs.delete_at(0)
  end
  n
end

.deliver_wire_local(wire) ⇒ Object



# File 'lib/tep/broadcast.rb', line 221

def self.deliver_wire_local(wire)
  colon = Tep.str_find(wire, ":", 0)
  if colon <= 0
    return -1
  end
  len_str = wire[0, colon]
  tlen    = len_str.to_i
  if tlen < 0 || colon + 1 + tlen > wire.length
    return -1
  end
  topic   = wire[colon + 1, tlen]
  payload = wire[colon + 1 + tlen, wire.length - colon - 1 - tlen]
  Tep::Broadcast.publish_local_only(topic, payload)
end

.disable_pg_backend ⇒ Object



# File 'lib/tep/broadcast.rb', line 184

def self.disable_pg_backend
  if Tep::APP.broadcast_pg_enabled == 0
    return 0
  end
  Tep::APP.broadcast_pg_conn.unlisten(Tep::APP.broadcast_pg_channel)
  Tep::APP.broadcast_pg_conn.finish
  Tep::APP.set_broadcast_pg_enabled(0)
  0
end

.enable_pg_backend(conninfo, channel) ⇒ Object

PG backend (cross-worker pub/sub)

Opens a dedicated PG connection and issues `LISTEN <channel>`. Subsequent publishes also NOTIFY this channel, so other workers listening on the same channel can receive the message via poll_pg_once.

`conninfo` is the libpq connection string. `channel` must be a safe SQL identifier (e.g. "tep_broadcast") because it is placed inside the LISTEN / NOTIFY command unescaped.

Returns 0 on success, -1 on connection or LISTEN failure.



# File 'lib/tep/broadcast.rb', line 170

def self.enable_pg_backend(conninfo, channel)
  conn = PG::Connection.new(conninfo)
  if conn.pgh < 0
    return -1
  end
  if conn.listen(channel) < 0
    return -1
  end
  Tep::APP.set_broadcast_pg_conn(conn)
  Tep::APP.set_broadcast_pg_channel(channel)
  Tep::APP.set_broadcast_pg_enabled(1)
  0
end
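
Because the channel name reaches LISTEN / NOTIFY unescaped, callers may want to validate it first. The following is a minimal sketch of such a check; `safe_channel?` and `SAFE_CHANNEL` are hypothetical names, not part of Tep, and the pattern is deliberately conservative (lowercase unquoted identifiers, at most 63 characters).

```ruby
# Hypothetical helper, not part of Tep: accept only plain, unquoted,
# lowercase identifiers of at most 63 characters before passing a
# channel name to LISTEN / NOTIFY.
SAFE_CHANNEL = /\A[a-z_][a-z0-9_]{0,62}\z/

def safe_channel?(name)
  name.is_a?(String) && SAFE_CHANNEL.match?(name)
end

puts safe_channel?("tep_broadcast")        # plain identifier
puts safe_channel?("x; DROP TABLE users")  # contains spaces and a semicolon
```

An app could call a check like this and refuse to call enable_pg_backend when it fails.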

.encode_wire(topic, payload) ⇒ Object

Wire format: "<topic_byte_length>:<topic><payload>". The topic is length-prefixed so that topics and payloads containing arbitrary characters (commas, colons, embedded quotes, newlines) round-trip unambiguously. Encoded by `publish` when the PG backend is enabled; decoded by `deliver_wire_local`.



# File 'lib/tep/broadcast.rb', line 217

def self.encode_wire(topic, payload)
  topic.length.to_s + ":" + topic + payload
end
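
To illustrate why the length prefix makes decoding unambiguous, here is a standalone round-trip sketch. It is not the Tep source: `decode_wire` is a hypothetical counterpart to deliver_wire_local that returns the parsed pair instead of publishing it, and it assumes String#length and String#[] count in the same unit (characters in CRuby), so encoder and decoder agree on where the topic ends.

```ruby
# Standalone sketch of the length-prefixed wire format; not the Tep source.
def encode_wire(topic, payload)
  topic.length.to_s + ":" + topic + payload
end

# Hypothetical decoder: returns [topic, payload], or nil when malformed.
def decode_wire(wire)
  colon = wire.index(":")
  return nil if colon.nil? || colon <= 0
  tlen = wire[0, colon].to_i
  # Reject a prefix that claims more topic characters than the wire holds.
  return nil if tlen < 0 || colon + 1 + tlen > wire.length
  topic   = wire[colon + 1, tlen]
  payload = wire[colon + 1 + tlen, wire.length]
  [topic, payload]
end

# The topic itself contains a colon and a comma; only the first colon
# (after the length digits) is treated as the delimiter.
wire = encode_wire("chat:room,1", "hello: \"world\"\n")
topic, payload = decode_wire(wire)
puts topic
puts payload
```

Only the first colon is structural; everything after it is sliced by length, so delimiters inside the topic or payload are never interpreted.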

.poll_pg_once(timeout_ms) ⇒ Object

Process one notification from the PG channel: parse the wire format and dispatch to local subscribers as if `publish` had been called locally, but WITHOUT re-NOTIFYing (that would loop). Returns 1 if a notification was processed, 0 on timeout, and -1 on connection error or when the backend is not enabled.



# File 'lib/tep/broadcast.rb', line 199

def self.poll_pg_once(timeout_ms)
  if Tep::APP.broadcast_pg_enabled == 0
    return -1
  end
  r = Tep::APP.broadcast_pg_conn.poll_notification(timeout_ms)
  if r != 1
    return r
  end
  wire = Tep::APP.broadcast_pg_conn.last_notify_payload
  Tep::Broadcast.deliver_wire_local(wire)
  1
end
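
A worker typically calls this in a loop. The sketch below shows one way to act on the three documented return codes; `drain_notifications` is a hypothetical helper, and `poll` is any callable standing in for Tep::Broadcast.poll_pg_once so the example runs without Tep or PostgreSQL.

```ruby
# Sketch of a drain loop over the documented return codes (1, 0, -1).
# `poll` stands in for Tep::Broadcast.poll_pg_once (an assumption).
def drain_notifications(poll, timeout_ms, max = 100)
  processed = 0
  while processed < max
    r = poll.call(timeout_ms)
    return -1 if r < 0   # connection error or backend not enabled
    break if r == 0      # timeout: nothing pending right now
    processed += 1       # r == 1: one notification was delivered
  end
  processed
end

# Stub that reports two notifications, then a timeout.
results = [1, 1, 0]
fake_poll = ->(_timeout_ms) { results.shift || 0 }
puts drain_notifications(fake_poll, 50)
```

The `max` cap is a design choice for the sketch: it keeps a busy channel from starving the rest of the worker's event loop.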

.publish(topic, payload) ⇒ Object

Write `payload` to every fd subscribed to `topic`. Returns the number of subscriptions matched, NOT the number of successful writes: a closed or bad fd still counts as matched, because the underlying sphttp_write_str silently returns -1 on that fd. Apps that need delivery confirmation should track their own ack channel.

When the PG backend is enabled (Tep::Broadcast.enable_pg_backend), publish ALSO sends a NOTIFY on the configured channel so that other workers polling via poll_pg_once can deliver to their local subscribers. The returned match count is the LOCAL match count; remote deliveries are best-effort and not counted here.



# File 'lib/tep/broadcast.rb', line 115

def self.publish(topic, payload)
  matched = Tep::Broadcast.publish_local_only(topic, payload)
  if Tep::APP.broadcast_pg_enabled != 0
    wire = Tep::Broadcast.encode_wire(topic, payload)
    Tep::APP.broadcast_pg_conn.notify(
      Tep::APP.broadcast_pg_channel, wire)
  end
  matched
end

.publish_local_only(topic, payload) ⇒ Object

Same fan-out as `publish` but skips the PG NOTIFY step. Used internally by poll_pg_once (through deliver_wire_local) when delivering a cross-worker message that already arrived via PG; re-NOTIFYing would cause an infinite loop.

Branches on each subscription's `mode`:

* mode 0 -> raw bytes via Sock.sphttp_write_str (default,
  for SSE / log fan-out / non-framed consumers).
* mode != 0 -> WebSocket frame via Tep::WebSocket::Driver.send_frame,
  using the mode value as the WS opcode (1=TEXT, 2=BINARY).


# File 'lib/tep/broadcast.rb', line 246

def self.publish_local_only(topic, payload)
  subs = Tep::APP.broadcast_subs
  matched = 0
  i = 0
  while i < subs.length
    if subs[i].topic == topic
      if subs[i].mode == 0
        Sock.sphttp_write_str(subs[i].fd, payload)
      else
        Tep::WebSocket::Driver.send_frame(
          subs[i].fd, subs[i].mode, payload)
      end
      matched += 1
    end
    i += 1
  end
  matched
end
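
The mode branching can be tried in isolation with the sketch below. It is a stand-in, not the Tep implementation: `FanOutSub` and `fan_out` are hypothetical names, and the two lambdas replace Sock.sphttp_write_str and Tep::WebSocket::Driver.send_frame so that the example records calls instead of writing to sockets.

```ruby
# Standalone sketch of mode-based fan-out with injected writers.
FanOutSub = Struct.new(:topic, :fd, :mode)

def fan_out(subs, topic, payload, write_raw, write_frame)
  matched = 0
  subs.each do |sub|
    next unless sub.topic == topic
    if sub.mode == 0
      write_raw.call(sub.fd, payload)               # raw bytes
    else
      write_frame.call(sub.fd, sub.mode, payload)   # mode doubles as opcode
    end
    matched += 1   # counted even if the write itself would fail
  end
  matched
end

log = []
subs = [FanOutSub.new("news", 7, 0),
        FanOutSub.new("news", 8, 1),
        FanOutSub.new("logs", 9, 0)]
matched = fan_out(subs, "news", "hi",
                  ->(fd, data) { log << [:raw, fd, data] },
                  ->(fd, opcode, data) { log << [:frame, fd, opcode, data] })
puts matched
```

As in the real method, the count reflects matched subscriptions, not confirmed deliveries.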

.subscribe(topic, fd) ⇒ Object

Register a subscription for `fd` on `topic`. Returns a sub_id (the subscription's registry index at the time of the call) for later unsubscribe. The fd receives raw bytes on publish, which suits SSE, log fan-out, or anything else that doesn't need WebSocket framing. For WS connections, prefer subscribe_ws.



# File 'lib/tep/broadcast.rb', line 45

def self.subscribe(topic, fd)
  subs = Tep::APP.broadcast_subs
  sub = Tep::BroadcastSubscription.new(topic, fd, 0)
  subs.push(sub)
  subs.length - 1
end

.subscribe_ws(topic, fd) ⇒ Object

WebSocket-bridged variant of subscribe. The fd is expected to be an established WS connection (typically a Tep::WebSocket::Connection’s #fd). On publish, payload is wrapped in a WS TEXT frame via Tep::WebSocket::Driver before delivery – the peer sees a well-formed WS message, not raw bytes that would close the connection.

Cleanup is automatic: when the WS connection closes, Tep::WebSocket::Connection.dispatch_close runs the user’s on_close handler and then calls unsubscribe_fd(driver.fd), dropping every subscription tied to the closed connection. Apps don’t need to add their own unsubscribe; if they do, the second call just finds 0 matches (harmless).



# File 'lib/tep/broadcast.rb', line 65

def self.subscribe_ws(topic, fd)
  subs = Tep::APP.broadcast_subs
  sub = Tep::BroadcastSubscription.new(
    topic, fd, Tep::WebSocket::OPCODE_TEXT)
  subs.push(sub)
  subs.length - 1
end
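
For readers unfamiliar with what "wrapped in a WS TEXT frame" means on the wire, here is a rough sketch of an unmasked server-to-client frame as laid out in RFC 6455. `ws_frame` is a hypothetical function for illustration only; how Tep::WebSocket::Driver actually builds its frames is not shown in this page.

```ruby
# Illustrative RFC 6455 frame builder (server-to-client, so no masking).
# Not Tep::WebSocket::Driver's code.
def ws_frame(opcode, payload)
  bytes  = payload.b                            # binary copy of the payload
  header = [0x80 | (opcode & 0x0F)].pack("C")   # FIN=1, RSV=0, opcode
  len = bytes.bytesize
  if len < 126
    header << [len].pack("C")                   # 7-bit length
  elsif len <= 0xFFFF
    header << [126, len].pack("Cn")             # 16-bit big-endian length
  else
    header << [127, len].pack("CQ>")            # 64-bit big-endian length
  end
  header + bytes
end

# Opcode 1 is TEXT; "hi" is 2 bytes, so the header is 0x81 0x02.
p ws_frame(1, "hi").bytes
```

A raw `subscribe` on the same fd would skip this header, which is why the peer would reject the bytes.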

.subscriber_count ⇒ Object

Total subscription count across all topics. Useful for diagnostics and the v1 test surface.



# File 'lib/tep/broadcast.rb', line 127

def self.subscriber_count
  Tep::APP.broadcast_subs.length
end

.subscribers_for(topic) ⇒ Object

Count of subscribers for one topic. O(n) over the registry; acceptable for v1 (n is typically small per worker).



# File 'lib/tep/broadcast.rb', line 133

def self.subscribers_for(topic)
  subs = Tep::APP.broadcast_subs
  n = 0
  i = 0
  while i < subs.length
    if subs[i].topic == topic
      n += 1
    end
    i += 1
  end
  n
end

.unsubscribe(sub_id) ⇒ Object

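Drop the subscription at `sub_id`. Ids are registry indexes, so dropping one shifts every later subscription down by one and invalidates the ids previously returned for them. Always returns 0, including when `sub_id` is out of range. To drop several subscriptions, prefer `unsubscribe_fd`.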



# File 'lib/tep/broadcast.rb', line 76

def self.unsubscribe(sub_id)
  subs = Tep::APP.broadcast_subs
  if sub_id < 0 || sub_id >= subs.length
    return 0
  end
  subs.delete_at(sub_id)
  0
end
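
The index-shift caveat is easy to reproduce with a plain array. In this sketch, strings stand in for subscription objects and `registry` is a local array, not Tep::APP.broadcast_subs.

```ruby
# Plain-array illustration of index-based ids going stale.
registry = []
id_a = registry.push("a").length - 1   # 0
id_b = registry.push("b").length - 1   # 1
id_c = registry.push("c").length - 1   # 2

registry.delete_at(id_a)               # like unsubscribe(id_a)

stale_b = registry[id_b]               # id_b now points at "c", not "b"
stale_c = registry[id_c]               # id_c is past the end of the array
p stale_b
p stale_c
```

Dropping the highest id first avoids the shift for the ids still held; unsubscribe_fd sidesteps the problem entirely.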

.unsubscribe_fd(fd) ⇒ Object

Drop every subscription whose fd matches. Returns the count dropped. Used by WS on-close to clean up everything a closing connection had subscribed to. Iterates back to front so that delete_at indices stay valid mid-loop.



# File 'lib/tep/broadcast.rb', line 89

def self.unsubscribe_fd(fd)
  subs = Tep::APP.broadcast_subs
  dropped = 0
  i = subs.length - 1
  while i >= 0
    if subs[i].fd == fd
      subs.delete_at(i)
      dropped += 1
    end
    i -= 1
  end
  dropped
end
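
To see why the loop runs back to front, compare it with a naive forward loop on a plain array of fds. Both functions below are hypothetical illustrations that work on a copy of the input; neither is Tep code.

```ruby
# Forward loop that always advances: after a delete, the next element
# slides into slot i and is skipped.
def drop_forward_naive(fds, fd)
  list = fds.dup
  i = 0
  while i < list.length
    list.delete_at(i) if list[i] == fd
    i += 1
  end
  list
end

# Backward loop: a delete only shifts elements already visited.
def drop_backward(fds, fd)
  list = fds.dup
  i = list.length - 1
  while i >= 0
    list.delete_at(i) if list[i] == fd
    i -= 1
  end
  list
end

fds = [5, 5, 7, 5]
p drop_forward_naive(fds, 5)   # leaves one 5 behind
p drop_backward(fds, 5)        # removes every 5
```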