Module: Tep::Presence
Defined in: lib/tep/pg.rb, lib/tep/presence.rb
Class Method Summary
- .clear ⇒ Object
  Drop every entry.
- .clear_status(topic, fd) ⇒ Object
  Reset an entry's status back to :available / "" / 0.
- .count(topic) ⇒ Object
  Total entries for `topic` (across all kinds).
- .count_agents(topic) ⇒ Object
- .count_filtered(topic, kind_filter) ⇒ Object
  Internal counting helper: `kind_filter` is :both for all entries, otherwise :human or :agent_for to filter.
- .count_global(topic) ⇒ Object
- .count_humans(topic) ⇒ Object
- .diff_topic(topic) ⇒ Object
  Compose the Broadcast topic for diff fan-out on a presence topic.
- .disable_pg_mirror ⇒ Object
- .enable_pg_mirror(conninfo) ⇒ Object
- .encode_diff(kind, entry) ⇒ Object
  Flat-JSON wire format for a diff event.
- .find_entry(topic, fd) ⇒ Object
  Internal: find the entry matching (topic, fd).
- .heartbeat ⇒ Object
- .list(topic) ⇒ Object
  All entries for `topic`.
- .list_global(topic) ⇒ Object
- .mirror_delete(topic, fd) ⇒ Object
- .mirror_exec(sql, params) ⇒ Object
- .mirror_insert(entry) ⇒ Object
  ---- PG mirror (cross-worker visibility) ----.
- .mirror_status(topic, fd, state, note, until_ts) ⇒ Object
- .prune_stale_workers(ttl_seconds) ⇒ Object
- .publish_diff(kind, entry) ⇒ Object
  Publish a diff via Tep::Broadcast.
- .schema_sql ⇒ Object
- .set_status(topic, fd, state, note, until_ts) ⇒ Object
  Set the structured status on an existing entry.
- .sweep_expired_status ⇒ Object
  Sweep entries whose status_until has passed: reset to :available / "" / 0 and emit a "status" diff for each.
- .track(req, topic, fd) ⇒ Object
  Track a presence entry.
- .untrack(topic, fd) ⇒ Object
  Drop the entry for (topic, fd).
- .untrack_by_fd(fd) ⇒ Object
  Drop every entry associated with `fd` (across all topics).
- .worker_schema_sql ⇒ Object
Class Method Details
.clear ⇒ Object
Drop every entry. Used by tests between fixtures and available to apps for graceful-shutdown cleanup. Returns the count dropped. Does NOT emit leave diffs (it’s a registry-management op, not a per-connection event).
    # File 'lib/tep/presence.rb', line 195
    def self.clear
      entries = Tep::APP.presence_entries
      n = entries.length
      while entries.length > 0
        entries.delete_at(0)
      end
      n
    end
.clear_status(topic, fd) ⇒ Object
Reset an entry’s status back to :available / “” / 0.
    # File 'lib/tep/presence.rb', line 173
    def self.clear_status(topic, fd)
      Tep::Presence.set_status(topic, fd, :available, "", 0)
    end
.count(topic) ⇒ Object
Total entries for `topic` (across all kinds).
    # File 'lib/tep/presence.rb', line 122
    def self.count(topic)
      Tep::Presence.count_filtered(topic, :both)
    end
.count_agents(topic) ⇒ Object
    # File 'lib/tep/presence.rb', line 130
    def self.count_agents(topic)
      Tep::Presence.count_filtered(topic, :agent_for)
    end
.count_filtered(topic, kind_filter) ⇒ Object
Internal counting helper: `kind_filter` is :both for all entries, otherwise :human or :agent_for to filter.
    # File 'lib/tep/presence.rb', line 136
    def self.count_filtered(topic, kind_filter)
      entries = Tep::APP.presence_entries
      n = 0
      i = 0
      while i < entries.length
        if entries[i].topic == topic
          if kind_filter == :both
            n += 1
          elsif entries[i].kind == kind_filter
            n += 1
          end
        end
        i += 1
      end
      n
    end
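The kind-filter rule can be tried outside tep with a small stand-in. The sketch below assumes a hypothetical `Entry` struct carrying only `topic` and `kind` (it is not Tep::PresenceEntry), passes the entry list in as an argument, and uses Array#count in place of the explicit while loop.

```ruby
# Hypothetical stand-in for a presence entry; not Tep::PresenceEntry.
Entry = Struct.new(:topic, :kind)

# Same rule as count_filtered: :both counts every entry on the topic,
# any other value counts only entries whose kind equals it.
def count_filtered(entries, topic, kind_filter)
  entries.count do |e|
    e.topic == topic && (kind_filter == :both || e.kind == kind_filter)
  end
end

entries = [
  Entry.new("room:lobby", :human),
  Entry.new("room:lobby", :agent_for),
  Entry.new("room:lobby", :human),
  Entry.new("room:other", :human)
]

total  = count_filtered(entries, "room:lobby", :both)
humans = count_filtered(entries, "room:lobby", :human)
agents = count_filtered(entries, "room:lobby", :agent_for)
puts "total=#{total} humans=#{humans} agents=#{agents}"
```

count, count_humans and count_agents correspond to the three calls at the bottom.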
.count_global(topic) ⇒ Object
    # File 'lib/tep/pg.rb', line 1465
    def self.count_global(topic)
      if Tep::APP.presence_pg_enabled == 0
        return 0
      end
      begin
        r = Tep::APP.presence_pg_conn.exec_params(
          "SELECT count(*) FROM tep_presence WHERE topic = $1",
          [topic])
      rescue PG::Error
        return 0
      end
      n = r.getvalue(0, 0).to_i
      r.clear
      n
    end
.count_humans(topic) ⇒ Object
    # File 'lib/tep/presence.rb', line 126
    def self.count_humans(topic)
      Tep::Presence.count_filtered(topic, :human)
    end
.diff_topic(topic) ⇒ Object
Compose the Broadcast topic for diff fan-out on a presence topic. WS subscribers register via Tep::Broadcast.subscribe_ws(diff_topic("room:lobby"), ws_fd).
    # File 'lib/tep/presence.rb', line 209
    def self.diff_topic(topic)
      "presence:" + topic
    end
.disable_pg_mirror ⇒ Object
    # File 'lib/tep/pg.rb', line 1258
    def self.disable_pg_mirror
      if Tep::APP.presence_pg_enabled == 0
        return 0
      end
      # Best-effort cleanup -- swallow PG errors (we're tearing the
      # mirror down regardless) and still finish + disable below.
      begin
        r = Tep::APP.presence_pg_conn.exec_params(
          "DELETE FROM tep_presence WHERE worker_id = $1",
          [Tep::APP.presence_pg_worker_id])
        r.clear
        # Remove the heartbeat row so prune_stale_workers doesn't
        # see this worker as live after we're gone.
        r = Tep::APP.presence_pg_conn.exec_params(
          "DELETE FROM tep_presence_worker WHERE worker_id = $1",
          [Tep::APP.presence_pg_worker_id])
        r.clear
      rescue PG::Error
        # swallow -- shutting the mirror down anyway
      end
      Tep::APP.presence_pg_conn.finish
      Tep::APP.set_presence_pg_enabled(0)
      0
    end
.enable_pg_mirror(conninfo) ⇒ Object
    # File 'lib/tep/pg.rb', line 1224
    def self.enable_pg_mirror(conninfo)
      conn = PG::Connection.new(conninfo)
      if conn.pgh < 0
        return -1
      end
      # exec raises PG::Error on failure now; degrade gracefully
      # (close + return -1) rather than letting it escape the worker.
      begin
        r = conn.exec(Tep::Presence.schema_sql)
        r.clear
        # Heartbeat table for the prune-stale-workers path (#47).
        r = conn.exec(Tep::Presence.worker_schema_sql)
        r.clear
      rescue PG::Error
        conn.finish
        return -1
      end
      Tep::APP.set_presence_pg_conn(conn)
      worker_id = Sock.sphttp_getpid.to_s + "-" + Time.now.to_i.to_s
      Tep::APP.set_presence_pg_worker_id(worker_id)
      Tep::APP.set_presence_pg_enabled(1)
      # Drop any rows from a prior worker that managed to leave
      # stale entries with this same worker_id (unlikely thanks
      # to the boot-epoch suffix, but defensive). Best-effort.
      Tep::Presence.mirror_exec(
        "DELETE FROM tep_presence WHERE worker_id = $1",
        [worker_id])
      # Register this worker's heartbeat row immediately. Apps
      # refresh it periodically via Tep::Presence.heartbeat;
      # prune_stale_workers deletes rows whose heartbeat is stale.
      Tep::Presence.heartbeat
      0
    end
.encode_diff(kind, entry) ⇒ Object
Flat-JSON wire format for a diff event. `kind` is one of "join" / "leave" / "status". SpinelKit::Json's flat-object extractors handle this on the client side (or any JSON-aware peer).
    # File 'lib/tep/presence.rb', line 217
    def self.encode_diff(kind, entry)
      "{" +
        SpinelKit::Json.encode_pair_str("kind", kind) + "," +
        SpinelKit::Json.encode_pair_str("topic", entry.topic) + "," +
        SpinelKit::Json.encode_pair_str("principal", entry.principal_id) + "," +
        SpinelKit::Json.encode_pair_str("ekind", entry.kind.to_s) + "," +
        SpinelKit::Json.encode_pair_str("agent_id", entry.agent_id) + "," +
        SpinelKit::Json.encode_pair_int("fd", entry.fd) + "," +
        SpinelKit::Json.encode_pair_int("since", entry.since) + "," +
        SpinelKit::Json.encode_pair_str("state", entry.status_state.to_s) + "," +
        SpinelKit::Json.encode_pair_str("note", entry.status_note) + "," +
        SpinelKit::Json.encode_pair_int("until_ts", entry.status_until) +
        "}"
    end
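A peer that does not use SpinelKit::Json can read the payload with any JSON parser. The sketch below uses Ruby's standard `json` library on a hand-written sample. The ten field names follow encode_diff above, but the values are invented, `describe_diff` is a hypothetical helper, and the exact byte layout assumes encode_pair_str and encode_pair_int emit ordinary `"key":"value"` and `"key":number` pairs.

```ruby
require "json"

# Sample payload with the ten fields encode_diff emits; values are made up.
payload = '{"kind":"join","topic":"room:lobby","principal":"user-1",' \
          '"ekind":"human","agent_id":"","fd":7,"since":1700000000,' \
          '"state":"available","note":"","until_ts":0}'

# Hypothetical client-side handler: turn a diff into a one-line summary.
def describe_diff(json_text)
  diff = JSON.parse(json_text)
  case diff["kind"]
  when "join"   then "#{diff["principal"]} joined #{diff["topic"]}"
  when "leave"  then "#{diff["principal"]} left #{diff["topic"]}"
  when "status" then "#{diff["principal"]} is now #{diff["state"]}"
  else "unknown diff kind: #{diff["kind"]}"
  end
end

puts describe_diff(payload)
```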
.find_entry(topic, fd) ⇒ Object
Internal: find the entry matching (topic, fd). Returns nil if no match.
    # File 'lib/tep/presence.rb', line 179
    def self.find_entry(topic, fd)
      entries = Tep::APP.presence_entries
      i = 0
      while i < entries.length
        if entries[i].topic == topic && entries[i].fd == fd
          return entries[i]
        end
        i += 1
      end
      nil
    end
.heartbeat ⇒ Object
    # File 'lib/tep/pg.rb', line 1306
    def self.heartbeat
      if Tep::APP.presence_pg_enabled == 0
        return 0
      end
      wid = Tep::APP.presence_pg_worker_id
      if wid.length == 0
        return 0
      end
      begin
        r = Tep::APP.presence_pg_conn.exec_params(
          "INSERT INTO tep_presence_worker (worker_id, last_seen_ts) " +
          "VALUES ($1, $2) " +
          "ON CONFLICT (worker_id) DO UPDATE SET " +
          " last_seen_ts = EXCLUDED.last_seen_ts",
          [wid, Time.now.to_i.to_s])
        r.clear
      rescue PG::Error
        return 0
      end
      1
    end
.list(topic) ⇒ Object
All entries for `topic`. Callers group by principal_id themselves when they want the Phoenix.Presence-style principal_id => [metas] shape; tep doesn't pre-group because spinel's nested-hash lowering is awkward.
    # File 'lib/tep/presence.rb', line 107
    def self.list(topic)
      result = [Tep::PresenceEntry.new("", "", :human, "", -1, 0)]
      result.delete_at(0)
      entries = Tep::APP.presence_entries
      i = 0
      while i < entries.length
        if entries[i].topic == topic
          result.push(entries[i])
        end
        i += 1
      end
      result
    end
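Grouping on the caller's side is short in plain Ruby. The sketch below assumes a hypothetical `Meta` struct with only the fields it needs, standing in for whatever list returns; whether the same code lowers cleanly under spinel is not something this page confirms.

```ruby
# Hypothetical stand-in exposing only the fields used here.
Meta = Struct.new(:principal_id, :fd, :kind)

# Group a flat entry list into a principal_id => [entries] hash.
def group_by_principal(entries)
  entries.group_by(&:principal_id)
end

flat = [
  Meta.new("alice", 5, :human),
  Meta.new("bob", 6, :human),
  Meta.new("alice", 9, :agent_for)
]

grouped = group_by_principal(flat)
grouped.each { |pid, metas| puts "#{pid}: fds #{metas.map(&:fd).inspect}" }
```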
.list_global(topic) ⇒ Object
    # File 'lib/tep/pg.rb', line 1419
    def self.list_global(topic)
      result = [Tep::PresenceEntry.new("", "", :human, "", -1, 0)]
      result.delete_at(0)
      if Tep::APP.presence_pg_enabled == 0
        return result
      end
      begin
        r = Tep::APP.presence_pg_conn.exec_params(
          "SELECT principal_id, kind, agent_id, fd, since_ts, " +
          " status_state, status_note, status_until " +
          "FROM tep_presence WHERE topic = $1 ORDER BY since_ts",
          [topic])
      rescue PG::Error
        return result
      end
      i = 0
      n = r.ntuples
      while i < n
        kind_sym = :human
        if r.getvalue(i, 1) == "agent_for"
          kind_sym = :agent_for
        end
        state_sym = :available
        sstr = r.getvalue(i, 5)
        if sstr == "busy"
          state_sym = :busy
        elsif sstr == "blocked"
          state_sym = :blocked
        end
        e = Tep::PresenceEntry.new(
          topic,
          r.getvalue(i, 0),
          kind_sym,
          r.getvalue(i, 2),
          r.getvalue(i, 3).to_i,
          r.getvalue(i, 4).to_i)
        e.status_state = state_sym
        e.status_note = r.getvalue(i, 6)
        e.status_until = r.getvalue(i, 7).to_i
        result.push(e)
        i += 1
      end
      r.clear
      result
    end
.mirror_delete(topic, fd) ⇒ Object
    # File 'lib/tep/pg.rb', line 1397
    def self.mirror_delete(topic, fd)
      if Tep::APP.presence_pg_enabled == 0
        return 0
      end
      Tep::Presence.mirror_exec(
        "DELETE FROM tep_presence " +
        "WHERE worker_id = $1 AND topic = $2 AND fd = $3",
        [Tep::APP.presence_pg_worker_id, topic, fd.to_s])
    end
.mirror_exec(sql, params) ⇒ Object
    # File 'lib/tep/pg.rb', line 1356
    def self.mirror_exec(sql, params)
      begin
        r = Tep::APP.presence_pg_conn.exec_params(sql, params)
        r.clear
      rescue PG::Error
        # swallow -- advisory mirror, local presence is authoritative
      end
      0
    end
.mirror_insert(entry) ⇒ Object
---- PG mirror (cross-worker visibility) ----
The presence PG mirror is OPT-IN (#216). enable_pg_mirror / disable_pg_mirror / heartbeat / prune_stale_workers / mirror_exec / list_global / count_global / the schema DDL, and the REAL bodies of the mirror_* hooks below, live in lib/tep/pg.rb and only compile into apps that `require "tep/pg"`. Core Presence keeps no PG reference, so a non-PG app DCEs the libpq closure entirely.
track / untrack / set_status call these mirror_* hooks unconditionally; the core definitions are no-ops. `require "tep/pg"` redefines them (last-definition-wins) with the real exec_params writes.
    # File 'lib/tep/presence.rb', line 281
    def self.mirror_insert(entry)
      if Tep::APP.presence_pg_enabled == 0
        return 0
      end
      Tep::Presence.mirror_exec(
        "INSERT INTO tep_presence " +
        "(worker_id, topic, fd, principal_id, kind, agent_id, " +
        " since_ts, status_state, status_note, status_until) " +
        "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) " +
        "ON CONFLICT (worker_id, topic, fd) DO UPDATE SET " +
        " principal_id = EXCLUDED.principal_id, " +
        " kind = EXCLUDED.kind, " +
        " agent_id = EXCLUDED.agent_id, " +
        " since_ts = EXCLUDED.since_ts, " +
        " status_state = EXCLUDED.status_state, " +
        " status_note = EXCLUDED.status_note, " +
        " status_until = EXCLUDED.status_until",
        [
          Tep::APP.presence_pg_worker_id,
          entry.topic,
          entry.fd.to_s,
          entry.principal_id,
          entry.kind.to_s,
          entry.agent_id,
          entry.since.to_s,
          entry.status_state.to_s,
          entry.status_note,
          entry.status_until.to_s
        ])
    end
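The no-op-then-redefine arrangement relies on Ruby's rule that reopening a module and defining a method again replaces the earlier definition. A minimal sketch with a hypothetical `Demo` module (not tep code, and with both definitions in one file rather than split across a `require`):

```ruby
# Hypothetical module standing in for the core definitions.
module Demo
  # Core version: a no-op hook that always returns 0.
  def self.mirror_insert(entry)
    0
  end

  # Core code calls the hook unconditionally.
  def self.track(entry)
    mirror_insert(entry)
  end
end

before = Demo.track("e1")

# A later file (the analogue of requiring the PG add-on) reopens the
# module and redefines the hook; the last definition wins.
module Demo
  def self.mirror_insert(entry)
    "mirrored #{entry}"
  end
end

after = Demo.track("e1")
puts before.inspect
puts after.inspect
```

Callers such as `Demo.track` need no change: method lookup happens at call time, so they pick up whichever definition was loaded last.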
.mirror_status(topic, fd, state, note, until_ts) ⇒ Object
    # File 'lib/tep/pg.rb', line 1407
    def self.mirror_status(topic, fd, state, note, until_ts)
      if Tep::APP.presence_pg_enabled == 0
        return 0
      end
      Tep::Presence.mirror_exec(
        "UPDATE tep_presence " +
        "SET status_state = $4, status_note = $5, status_until = $6 " +
        "WHERE worker_id = $1 AND topic = $2 AND fd = $3",
        [Tep::APP.presence_pg_worker_id, topic, fd.to_s,
         state.to_s, note, until_ts.to_s])
    end
.prune_stale_workers(ttl_seconds) ⇒ Object
    # File 'lib/tep/pg.rb', line 1328
    def self.prune_stale_workers(ttl_seconds)
      if Tep::APP.presence_pg_enabled == 0
        return 0
      end
      cutoff = Time.now.to_i - ttl_seconds
      conn = Tep::APP.presence_pg_conn
      begin
        # Drop dead heartbeats first; the second DELETE then walks
        # the worker_id space that's still alive.
        r1 = conn.exec_params(
          "DELETE FROM tep_presence_worker WHERE last_seen_ts < $1",
          [cutoff.to_s])
        r1.clear
        # Now drop presence rows whose worker_id isn't in the live
        # heartbeat table. NOT IN handles both crashed-and-pruned
        # workers and workers that never registered (legacy rows
        # from before this prune feature shipped).
        r2 = conn.exec(
          "DELETE FROM tep_presence " +
          "WHERE worker_id NOT IN (SELECT worker_id FROM tep_presence_worker)")
        n = r2.cmd_tuples
        r2.clear
      rescue PG::Error
        return 0
      end
      n
    end
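The two-step prune can be followed without a database. The sketch below models the two tables as plain Ruby data (a hash for tep_presence_worker, an array of hashes for tep_presence) and takes `now` as an argument. `prune_stale` and the sample worker ids are hypothetical; only the ordering of the two deletes is taken from the source above.

```ruby
# In-memory model of the two tables; plain Ruby data, no PG involved.
# workers: { worker_id => last_seen_ts }
# rows:    [{ worker_id:, topic:, fd: }, ...]
def prune_stale(workers, rows, now, ttl_seconds)
  cutoff = now - ttl_seconds
  # Step 1: drop heartbeat rows older than the cutoff.
  workers.delete_if { |_id, last_seen| last_seen < cutoff }
  # Step 2: drop presence rows whose worker has no heartbeat row left.
  before = rows.length
  rows.reject! { |row| !workers.key?(row[:worker_id]) }
  before - rows.length
end

workers = { "w-live" => 1_000, "w-dead" => 900 }
rows = [
  { worker_id: "w-live", topic: "room:lobby", fd: 4 },
  { worker_id: "w-dead", topic: "room:lobby", fd: 5 },
  { worker_id: "w-legacy", topic: "room:lobby", fd: 6 } # never registered
]

removed = prune_stale(workers, rows, 1_010, 60)
puts "removed=#{removed} remaining=#{rows.map { |r| r[:worker_id] }.inspect}"
```

As in the SQL version, the return value counts presence rows removed, not heartbeat rows.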
.publish_diff(kind, entry) ⇒ Object
Publish a diff via Tep::Broadcast. Subscribers to diff_topic(entry.topic) – typically WS connections via subscribe_ws – receive the encoded JSON payload as their next message. Returns the local-match count from publish (cross-worker delivery counts aren’t tracked here, same as Broadcast.publish’s documented behavior).
    # File 'lib/tep/presence.rb', line 238
    def self.publish_diff(kind, entry)
      payload = Tep::Presence.encode_diff(kind, entry)
      Tep::Broadcast.publish(
        Tep::Presence.diff_topic(entry.topic), payload)
    end
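The "local-match count" return value is easier to see with a toy bus. `ToyBus` below is a hypothetical in-process stand-in, not Tep::Broadcast, and its block-based `subscribe` is an assumption made for the sketch; only the "presence:" prefix rule is copied from diff_topic.

```ruby
# Toy in-process bus; a hypothetical stand-in, not Tep::Broadcast.
class ToyBus
  def initialize
    @subs = Hash.new { |h, k| h[k] = [] }
  end

  # Register a handler block under a topic.
  def subscribe(topic, &handler)
    @subs[topic] << handler
  end

  # Deliver to local subscribers and return how many matched.
  def publish(topic, payload)
    return 0 unless @subs.key?(topic)
    @subs[topic].each { |h| h.call(payload) }
    @subs[topic].length
  end
end

# Same composition rule as diff_topic.
def diff_topic(topic)
  "presence:" + topic
end

bus = ToyBus.new
received = []
bus.subscribe(diff_topic("room:lobby")) { |msg| received << msg }
matched = bus.publish(diff_topic("room:lobby"), '{"kind":"join"}')
missed  = bus.publish(diff_topic("room:empty"), '{"kind":"join"}')
puts "matched=#{matched} missed=#{missed} received=#{received.length}"
```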
.schema_sql ⇒ Object
    # File 'lib/tep/pg.rb', line 1283
    def self.schema_sql
      "CREATE TABLE IF NOT EXISTS tep_presence (" +
        "worker_id TEXT NOT NULL, " +
        "topic TEXT NOT NULL, " +
        "fd INTEGER NOT NULL, " +
        "principal_id TEXT NOT NULL, " +
        "kind TEXT NOT NULL, " +
        "agent_id TEXT NOT NULL, " +
        "since_ts BIGINT NOT NULL, " +
        "status_state TEXT NOT NULL, " +
        "status_note TEXT NOT NULL, " +
        "status_until BIGINT NOT NULL, " +
        "PRIMARY KEY (worker_id, topic, fd)" +
        ")"
    end
.set_status(topic, fd, state, note, until_ts) ⇒ Object
Set the structured status on an existing entry. `state` is one of :available, :busy, :blocked; `note` is free text (~140 char soft hint); `until_ts` is unix epoch seconds (0 = no identity-level expiry). Returns 1 if the entry was found and updated, 0 otherwise. Emits a "status" diff on the topic's presence channel on update.
    # File 'lib/tep/presence.rb', line 159
    def self.set_status(topic, fd, state, note, until_ts)
      entry = Tep::Presence.find_entry(topic, fd)
      if entry == nil
        return 0
      end
      entry.status_state = state
      entry.status_note = note
      entry.status_until = until_ts
      Tep::Presence.mirror_status(topic, fd, state, note, until_ts)
      Tep::Presence.publish_diff("status", entry)
      1
    end
.sweep_expired_status ⇒ Object
Sweep entries whose status_until has passed: reset to :available / “” / 0 and emit a “status” diff for each. Apps call this periodically (e.g. once per HTTP request, or in a background fiber once Scheduled is reliable). Returns the count of entries reset.
    # File 'lib/tep/presence.rb', line 249
    def self.sweep_expired_status
      entries = Tep::APP.presence_entries
      now = Time.now.to_i
      swept = 0
      i = 0
      while i < entries.length
        e = entries[i]
        if e.status_until > 0 && e.status_until <= now && e.status_state != :available
          e.status_state = :available
          e.status_note = ""
          e.status_until = 0
          Tep::Presence.publish_diff("status", e)
          swept += 1
        end
        i += 1
      end
      swept
    end
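The expiry rule has three conditions, and a sketch with an injected clock shows which entries each one excludes. `StatusEntry` and `sweep_expired` are hypothetical stand-ins; unlike sweep_expired_status, the sketch returns the reset entries instead of a count and publishes nothing.

```ruby
# Hypothetical stand-in for the status fields of a presence entry.
StatusEntry = Struct.new(:fd, :status_state, :status_note, :status_until)

# Reset every entry whose deadline has passed; `now` is injected so the
# rule can be exercised without the clock. Returns the reset entries.
def sweep_expired(entries, now)
  expired = entries.select do |e|
    e.status_until > 0 && e.status_until <= now && e.status_state != :available
  end
  expired.each do |e|
    e.status_state = :available
    e.status_note = ""
    e.status_until = 0
  end
  expired
end

entries = [
  StatusEntry.new(1, :busy, "in a call", 100),   # deadline passed
  StatusEntry.new(2, :blocked, "waiting", 500),  # deadline in the future
  StatusEntry.new(3, :busy, "heads down", 0)     # 0 means no expiry
]

swept = sweep_expired(entries, 200)
puts "swept fds: #{swept.map(&:fd).inspect}"
```

A second sweep at the same time resets nothing, because the reset entries now have status_until == 0.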
.track(req, topic, fd) ⇒ Object
Track a presence entry. principal_id comes off req.identity; kind is :human or :agent_for depending on the identity’s delegation state. fd is the underlying connection’s socket (typically a WS-accepted fd). Returns 0 on success.
Repeated track() calls for the same (topic, fd) are deduped: the existing entry stays, and no second row is created. The check is find_entry(topic, fd), so the principal is not compared. Apps can call track freely from before-filters / reconnect paths without growing the registry.
    # File 'lib/tep/presence.rb', line 42
    def self.track(req, topic, fd)
      ident = req.identity
      if Tep::Presence.find_entry(topic, fd) != nil
        return 0
      end
      kind = :human
      agent_id = ""
      if ident.agent?
        kind = :agent_for
        agent_id = ident.acting_via.agent_id
      end
      entry = Tep::PresenceEntry.new(
        topic, ident.principal_id, kind, agent_id, fd, Time.now.to_i)
      Tep::APP.presence_entries.push(entry)
      Tep::Presence.mirror_insert(entry)
      Tep::Presence.publish_diff("join", entry)
      0
    end
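The dedup behavior can be reproduced with a plain array. In the sketch below `Tracked` and `track_once` are hypothetical names, the registry is passed in explicitly, and the return value is a boolean rather than tep's 0; the only thing taken from the source is that the duplicate check looks at topic and fd.

```ruby
# Hypothetical stand-in for a tracked entry.
Tracked = Struct.new(:topic, :principal_id, :fd)

# Append only when no entry already holds (topic, fd).
# Returns true when a new entry was added, false on a duplicate.
def track_once(registry, topic, principal_id, fd)
  return false if registry.any? { |e| e.topic == topic && e.fd == fd }
  registry.push(Tracked.new(topic, principal_id, fd))
  true
end

registry = []
first  = track_once(registry, "room:lobby", "alice", 7)
repeat = track_once(registry, "room:lobby", "alice", 7)  # reconnect path
other  = track_once(registry, "room:side", "alice", 7)   # same fd, new topic
puts "first=#{first} repeat=#{repeat} other=#{other} size=#{registry.length}"
```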
.untrack(topic, fd) ⇒ Object
Drop the entry for (topic, fd). The fd is the unique key within a topic; principal_id isn’t needed. Returns 1 if an entry was removed, 0 if none matched. Emits a “leave” diff on the topic’s presence channel when removal happens.
    # File 'lib/tep/presence.rb', line 65
    def self.untrack(topic, fd)
      entries = Tep::APP.presence_entries
      i = 0
      while i < entries.length
        if entries[i].topic == topic && entries[i].fd == fd
          e = entries[i]
          entries.delete_at(i)
          Tep::Presence.mirror_delete(topic, fd)
          Tep::Presence.publish_diff("leave", e)
          return 1
        end
        i += 1
      end
      0
    end
.untrack_by_fd(fd) ⇒ Object
Drop every entry associated with `fd` (across all topics). Used by the WS close hook to clean up everything a connection had tracked. Returns the count dropped. Emits one "leave" diff per dropped entry, on each entry's topic's presence channel.
    # File 'lib/tep/presence.rb', line 86
    def self.untrack_by_fd(fd)
      entries = Tep::APP.presence_entries
      dropped = 0
      i = entries.length - 1
      while i >= 0
        if entries[i].fd == fd
          e = entries[i]
          entries.delete_at(i)
          Tep::Presence.mirror_delete(e.topic, fd)
          Tep::Presence.publish_diff("leave", e)
          dropped += 1
        end
        i -= 1
      end
      dropped
    end
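untrack_by_fd walks the array from the last index down while deleting. The sketch below isolates that pattern on a plain integer array. `delete_all_backward` is a hypothetical helper, and the stated reason for the direction is an inference from how delete_at shifts elements, not something this page spells out.

```ruby
# Remove every element equal to `target`, walking from the end so that
# delete_at never shifts an element that has not been visited yet.
def delete_all_backward(items, target)
  dropped = 0
  i = items.length - 1
  while i >= 0
    if items[i] == target
      items.delete_at(i)
      dropped += 1
    end
    i -= 1
  end
  dropped
end

fds = [7, 7, 3, 7]
dropped = delete_all_backward(fds, 7)
puts "dropped=#{dropped} left=#{fds.inspect}"
```

A forward walk that always increments the index after delete_at would skip the element that slides into the freed slot, which matters here because adjacent matches (the two leading 7s) are possible.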
.worker_schema_sql ⇒ Object
    # File 'lib/tep/pg.rb', line 1299
    def self.worker_schema_sql
      "CREATE TABLE IF NOT EXISTS tep_presence_worker (" +
        "worker_id TEXT PRIMARY KEY, " +
        "last_seen_ts BIGINT NOT NULL" +
        ")"
    end