Module: Tep::Broadcast

Defined in:
lib/tep/pg.rb,
lib/tep/broadcast.rb

Class Method Details

.clear ⇒ Object

Drop every subscription. Used by tests between fixtures, and available to apps that need to fully reset (e.g. during graceful shutdown). Returns the count dropped.



# File 'lib/tep/broadcast.rb', line 156

def self.clear
  subs = Tep::APP.broadcast_subs
  n = subs.length
  while subs.length > 0
    subs.delete_at(0)
  end
  n
end

.cross_worker_notify(topic, payload) ⇒ Object

No-op cross-worker hook in the core build. lib/tep/pg.rb overrides it when the PG backend is loaded; that override is the definition shown here. Keeping the PG NOTIFY out of core is what lets `Tep::Broadcast.publish` compile without pulling in tep_pg_*/libpq.



# File 'lib/tep/pg.rb', line 1156

def self.cross_worker_notify(topic, payload)
  if Tep::APP.broadcast_pg_enabled != 0
    wire = Tep::Broadcast.encode_wire(topic, payload)
    Tep::APP.broadcast_pg_conn.notify(
      Tep::APP.broadcast_pg_channel, wire)
  end
  0
end

.deliver_wire_local(wire) ⇒ Object

Parse a wire string in the form produced by encode_wire (topic length, a colon, the topic, then the payload) and hand the result to publish_local_only. Returns -1 if the length prefix is missing or inconsistent with the string; otherwise returns the local match count.



# File 'lib/tep/pg.rb', line 1206

def self.deliver_wire_local(wire)
  colon = Tep.str_find(wire, ":", 0)
  if colon <= 0
    return -1
  end
  len_str = wire[0, colon]
  tlen    = len_str.to_i
  if tlen < 0 || colon + 1 + tlen > wire.length
    return -1
  end
  topic   = wire[colon + 1, tlen]
  payload = wire[colon + 1 + tlen, wire.length - colon - 1 - tlen]
  Tep::Broadcast.publish_local_only(topic, payload)
end

.disable_pg_backend ⇒ Object



# File 'lib/tep/pg.rb', line 1179

def self.disable_pg_backend
  if Tep::APP.broadcast_pg_enabled == 0
    return 0
  end
  Tep::APP.broadcast_pg_conn.unlisten(Tep::APP.broadcast_pg_channel)
  Tep::APP.broadcast_pg_conn.finish
  Tep::APP.set_broadcast_pg_enabled(0)
  0
end

.enable_pg_backend(conninfo, channel) ⇒ Object



# File 'lib/tep/pg.rb', line 1165

def self.enable_pg_backend(conninfo, channel)
  conn = PG::Connection.new(conninfo)
  if conn.pgh < 0
    return -1
  end
  if conn.listen(channel) < 0
    return -1
  end
  Tep::APP.set_broadcast_pg_conn(conn)
  Tep::APP.set_broadcast_pg_channel(channel)
  Tep::APP.set_broadcast_pg_enabled(1)
  0
end

.encode_wire(topic, payload) ⇒ Object

Build the wire string carried in the PG NOTIFY payload: the topic length in decimal, a colon, the topic, then the payload.



# File 'lib/tep/pg.rb', line 1202

def self.encode_wire(topic, payload)
  topic.length.to_s + ":" + topic + payload
end
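
The framing used by encode_wire and parsed by deliver_wire_local can be tried in isolation. The following is a minimal sketch, not the library's code: `decode_wire` is a hypothetical helper that follows the same parsing steps as deliver_wire_local, with `String#index` standing in for `Tep.str_find` and `nil` standing in for the -1 error return.

```ruby
# Stand-alone sketch of the "<topic length>:<topic><payload>" framing.
# encode_wire mirrors the listing above; decode_wire is hypothetical.
def encode_wire(topic, payload)
  topic.length.to_s + ":" + topic + payload
end

def decode_wire(wire)
  colon = wire.index(":")                  # assumption: replaces Tep.str_find
  return nil if colon.nil? || colon <= 0   # no colon, or empty length prefix
  tlen = wire[0, colon].to_i
  # Reject a length prefix that runs past the end of the string.
  return nil if tlen < 0 || colon + 1 + tlen > wire.length
  topic   = wire[colon + 1, tlen]
  payload = wire[colon + 1 + tlen, wire.length - colon - 1 - tlen]
  [topic, payload]
end

# A topic containing ":" survives because only the first colon is a delimiter;
# everything after it is sliced by length.
wire = encode_wire("chat:lobby", "hello")
topic, payload = decode_wire(wire)
```

Because the topic is length-prefixed, neither the topic nor the payload needs escaping.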

.poll_pg_once(timeout_ms) ⇒ Object

Poll the PG connection once for a notification, passing timeout_ms to poll_notification. Returns -1 if the PG backend is not enabled. If poll_notification returns anything other than 1, that value is returned unchanged; otherwise the received payload is handed to deliver_wire_local and 1 is returned.



# File 'lib/tep/pg.rb', line 1189

def self.poll_pg_once(timeout_ms)
  if Tep::APP.broadcast_pg_enabled == 0
    return -1
  end
  r = Tep::APP.broadcast_pg_conn.poll_notification(timeout_ms)
  if r != 1
    return r
  end
  wire = Tep::APP.broadcast_pg_conn.last_notify_payload
  Tep::Broadcast.deliver_wire_local(wire)
  1
end

.publish(topic, payload) ⇒ Object

Write `payload` to every subscribed fd for `topic`. Returns the number of subscriptions matched, NOT the number of successful writes: a closed or bad fd still counts as matched, because the underlying sphttp_write_str fails silently with -1 on that fd. Apps that need delivery confirmation should track their own ack channel.

When the PG backend is enabled (Tep::Broadcast.enable_pg_backend), publish ALSO issues a NOTIFY on the configured channel so that other workers polling via poll_pg_once can deliver to their local subscribers. The returned match count is the LOCAL match count; remote deliveries are best-effort and not counted here.



# File 'lib/tep/broadcast.rb', line 115

def self.publish(topic, payload)
  matched = Tep::Broadcast.publish_local_only(topic, payload)
  # Cross-worker fan-out is an OPT-IN hook (#216): the core build
  # has no PG reference, so a non-PG app DCEs the entire libpq
  # closure. `require "tep/pg"` redefines cross_worker_notify with
  # the real broadcast_pg_conn.notify (last-definition-wins).
  Tep::Broadcast.cross_worker_notify(topic, payload)
  matched
end
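
The opt-in hook relies on Ruby's ordinary method redefinition: reopening a module and defining a method again replaces the earlier body. A minimal sketch of that pattern follows; `DemoBroadcast` and its `notified` log are hypothetical stand-ins, not part of Tep.

```ruby
# Hypothetical stand-in module illustrating last-definition-wins.
module DemoBroadcast
  @notified = []

  def self.notified
    @notified
  end

  # "Core" definition: a no-op hook.
  def self.cross_worker_notify(topic, payload)
    0
  end

  def self.publish(topic, payload)
    cross_worker_notify(topic, payload)
    :published
  end
end

DemoBroadcast.publish("news", "first")
count_before = DemoBroadcast.notified.length

# An optional file loaded later reopens the module and replaces the hook.
module DemoBroadcast
  def self.cross_worker_notify(topic, payload)
    @notified << [topic, payload]
    0
  end
end

DemoBroadcast.publish("news", "second")
count_after = DemoBroadcast.notified.length
```

The caller (`publish`) is untouched; only the hook body changes once the optional file has been loaded.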

.publish_local_only(topic, payload) ⇒ Object

Same fan-out as .publish but skips the PG NOTIFY step. Used internally by poll_pg_once (through deliver_wire_local) when delivering a cross-worker message that already arrived via PG; issuing another NOTIFY there would cause an infinite loop.

Branches on each subscription’s `mode`:

* mode 0 -> raw bytes via Sock.sphttp_write_str (default,
  for SSE / log fan-out / non-framed consumers).
* mode != 0 -> WebSocket frame via Tep::WebSocket::Driver.send_frame,
  using the mode value as the WS opcode (1=TEXT, 2=BINARY).


# File 'lib/tep/broadcast.rb', line 184

def self.publish_local_only(topic, payload)
  subs = Tep::APP.broadcast_subs
  matched = 0
  i = 0
  while i < subs.length
    if subs[i].topic == topic
      if subs[i].mode == 0
        Sock.sphttp_write_str(subs[i].fd, payload)
      else
        Tep::WebSocket::Driver.send_frame(
          subs[i].fd, subs[i].mode, payload)
      end
      matched += 1
    end
    i += 1
  end
  matched
end
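
The mode dispatch and the "matched, not delivered" return value can be illustrated without sockets. This is a sketch under stated assumptions: `Sub` and `publish_sketch` are hypothetical, and the `log` array stands in for the real writers (Sock.sphttp_write_str and Tep::WebSocket::Driver.send_frame).

```ruby
# Hypothetical subscription record with the same three fields used above.
Sub = Struct.new(:topic, :fd, :mode)

# Records what would be written instead of touching any fd.
def publish_sketch(subs, topic, payload, log)
  matched = 0
  subs.each do |s|
    next unless s.topic == topic
    if s.mode == 0
      log << [:raw, s.fd, payload]               # raw bytes path
    else
      log << [:ws_frame, s.fd, s.mode, payload]  # mode doubles as WS opcode
    end
    matched += 1  # counted whether or not a real write would succeed
  end
  matched
end

subs = [
  Sub.new("news", 7, 0),   # raw subscriber
  Sub.new("news", 9, 1),   # WebSocket TEXT subscriber
  Sub.new("logs", 11, 0)   # different topic, not matched
]
log = []
matched = publish_sketch(subs, "news", "hi", log)
```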

.subscribe(topic, fd) ⇒ Object

Register a subscription for `fd` on `topic`. Returns a sub_id (the registry index at the time of the call) for a later unsubscribe. The fd receives raw bytes on publish, which suits SSE, log fan-out, or anything else that doesn’t need WebSocket framing. For WS connections, prefer subscribe_ws.



# File 'lib/tep/broadcast.rb', line 45

def self.subscribe(topic, fd)
  subs = Tep::APP.broadcast_subs
  sub = Tep::BroadcastSubscription.new(topic, fd, 0)
  subs.push(sub)
  subs.length - 1
end

.subscribe_ws(topic, fd) ⇒ Object

WebSocket-bridged variant of subscribe. The fd is expected to be an established WS connection (typically a Tep::WebSocket::Connection’s #fd). On publish, payload is wrapped in a WS TEXT frame via Tep::WebSocket::Driver before delivery – the peer sees a well-formed WS message, not raw bytes that would close the connection.

Cleanup is automatic: when the WS connection closes, Tep::WebSocket::Connection.dispatch_close runs the user’s on_close handler and then calls unsubscribe_fd(driver.fd), dropping every subscription tied to the closed connection. Apps don’t need to add their own unsubscribe; if they do, the second call just finds 0 matches (harmless).



# File 'lib/tep/broadcast.rb', line 65

def self.subscribe_ws(topic, fd)
  subs = Tep::APP.broadcast_subs
  sub = Tep::BroadcastSubscription.new(
    topic, fd, Tep::WebSocket::OPCODE_TEXT)
  subs.push(sub)
  subs.length - 1
end

.subscriber_count ⇒ Object

Total subscription count across all topics. Useful for diagnostics and the v1 test surface.



# File 'lib/tep/broadcast.rb', line 134

def self.subscriber_count
  Tep::APP.broadcast_subs.length
end

.subscribers_for(topic) ⇒ Object

Count of subscribers for one topic. O(n) over the registry; acceptable for v1 (n is typically small per worker).



# File 'lib/tep/broadcast.rb', line 140

def self.subscribers_for(topic)
  subs = Tep::APP.broadcast_subs
  n = 0
  i = 0
  while i < subs.length
    if subs[i].topic == topic
      n += 1
    end
    i += 1
  end
  n
end

.unsubscribe(sub_id) ⇒ Object

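Drop the subscription at `sub_id`. Note that ids are registry indexes: deleting an entry shifts every later entry down by one, so ids handed out earlier for those entries no longer point at them. To drop several subscriptions, prefer `unsubscribe_fd`. Always returns 0, including when `sub_id` is out of range.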



# File 'lib/tep/broadcast.rb', line 76

def self.unsubscribe(sub_id)
  subs = Tep::APP.broadcast_subs
  if sub_id < 0 || sub_id >= subs.length
    return 0
  end
  subs.delete_at(sub_id)
  0
end
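
The index-shift caveat is easy to reproduce with a plain Array. The sketch below uses a hypothetical `registry` of strings in place of Tep::APP.broadcast_subs, and issues ids the same way subscribe does (length minus one after the push).

```ruby
# Hypothetical registry; ids are issued as "index at push time".
registry = []
id_a = registry.push("a").length - 1
id_b = registry.push("b").length - 1
id_c = registry.push("c").length - 1

# Dropping the first entry shifts the remaining entries down by one.
registry.delete_at(id_a)

stale_b = registry[id_b]  # id_b now points at the entry that was "c"
stale_c = registry[id_c]  # id_c is past the end of the array
```

After the first drop, calling unsubscribe with `id_b` would remove a different subscription than the one it was issued for.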

.unsubscribe_fd(fd) ⇒ Object

Drop every subscription whose fd matches. Returns the count dropped. Used by WS on-close to clean up everything a closing connection had subscribed to. Iterates back to front so that delete_at indices stay valid mid-loop.



# File 'lib/tep/broadcast.rb', line 89

def self.unsubscribe_fd(fd)
  subs = Tep::APP.broadcast_subs
  dropped = 0
  i = subs.length - 1
  while i >= 0
    if subs[i].fd == fd
      subs.delete_at(i)
      dropped += 1
    end
    i -= 1
  end
  dropped
end
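
Why the loop runs back to front can be shown by comparing it with a naive front-to-back version. Both helpers below are hypothetical and operate on a plain array of fds rather than subscription objects.

```ruby
# Naive version: after delete_at(i) the next element slides into slot i,
# and the unconditional i += 1 then steps over it.
def drop_front_to_back(fds, fd)
  dropped = 0
  i = 0
  while i < fds.length
    if fds[i] == fd
      fds.delete_at(i)
      dropped += 1
    end
    i += 1
  end
  dropped
end

# Same shape as unsubscribe_fd: deletions only move elements at indices
# greater than i, which have already been visited.
def drop_back_to_front(fds, fd)
  dropped = 0
  i = fds.length - 1
  while i >= 0
    if fds[i] == fd
      fds.delete_at(i)
      dropped += 1
    end
    i -= 1
  end
  dropped
end

naive = [5, 5, 7, 5]
naive_dropped = drop_front_to_back(naive, 5)

safe = [5, 5, 7, 5]
safe_dropped = drop_back_to_front(safe, 5)
```

With two adjacent matches, the naive loop leaves one of them behind, while the back-to-front loop removes all three.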