Module: Tep::Presence
Defined in: lib/tep/presence.rb
Class Method Summary
- .clear ⇒ Object
  Drop every entry.
- .clear_status(topic, fd) ⇒ Object
  Reset an entry’s status back to :available / "" / 0.
- .count(topic) ⇒ Object
  Total entries for `topic` (across all kinds).
- .count_agents(topic) ⇒ Object
- .count_filtered(topic, kind_filter) ⇒ Object
  Internal counting helper: `kind_filter` is :both for all entries, otherwise :human or :agent_for to filter.
- .count_global(topic) ⇒ Object
- .count_humans(topic) ⇒ Object
- .diff_topic(topic) ⇒ Object
  Compose the Broadcast topic for diff fan-out on a presence topic.
- .disable_pg_mirror ⇒ Object
- .enable_pg_mirror(conninfo) ⇒ Object
  Opt-in mirror of the local presence registry to a shared PG table (cross-worker visibility).
- .encode_diff(kind, entry) ⇒ Object
  Flat-JSON wire format for a diff event.
- .find_entry(topic, fd) ⇒ Object
  Internal: find the entry matching (topic, fd).
- .heartbeat ⇒ Object
  Refresh this worker’s heartbeat row to the current Unix timestamp.
- .list(topic) ⇒ Object
  All entries for `topic`.
- .list_global(topic) ⇒ Object
  Cross-worker list: SELECT all entries on `topic` regardless of which worker tracked them.
- .mirror_delete(topic, fd) ⇒ Object
  Mirror an untrack to PG.
- .mirror_exec(sql, params) ⇒ Object
  Best-effort mirror write: run an exec_params on the mirror conn and swallow any PG::Error.
- .mirror_insert(entry) ⇒ Object
  Mirror a track to PG.
- .mirror_status(topic, fd, state, note, until_ts) ⇒ Object
  Mirror a status update to PG.
- .prune_stale_workers(ttl_seconds) ⇒ Object
  Prune crashed-worker rows.
- .publish_diff(kind, entry) ⇒ Object
  Publish a diff via Tep::Broadcast.
- .schema_sql ⇒ Object
  CREATE TABLE statement, kept here so apps that provision the schema separately (migration runners, etc.) can grab the canonical DDL.
- .set_status(topic, fd, state, note, until_ts) ⇒ Object
  Set the structured status on an existing entry.
- .sweep_expired_status ⇒ Object
  Sweep entries whose status_until has passed: reset to :available / "" / 0 and emit a “status” diff for each.
- .track(req, topic, fd) ⇒ Object
  Track a presence entry.
- .untrack(topic, fd) ⇒ Object
  Drop the entry for (topic, fd).
- .untrack_by_fd(fd) ⇒ Object
  Drop every entry associated with `fd` (across all topics).
- .worker_schema_sql ⇒ Object
  Heartbeat table: one row per worker that’s mirroring presence right now.
Class Method Details
.clear ⇒ Object
Drop every entry. Used by tests between fixtures and available to apps for graceful-shutdown cleanup. Returns the count dropped. Does NOT emit leave diffs (it’s a registry-management op, not a per-connection event).
# File 'lib/tep/presence.rb', line 195
def self.clear
  entries = Tep::APP.presence_entries
  n = entries.length
  while entries.length > 0
    entries.delete_at(0)
  end
  n
end
.clear_status(topic, fd) ⇒ Object
Reset an entry’s status back to :available / "" / 0.
# File 'lib/tep/presence.rb', line 173
def self.clear_status(topic, fd)
  Tep::Presence.set_status(topic, fd, :available, "", 0)
end
.count(topic) ⇒ Object
Total entries for `topic` (across all kinds).
# File 'lib/tep/presence.rb', line 122
def self.count(topic)
  Tep::Presence.count_filtered(topic, :both)
end
.count_agents(topic) ⇒ Object
# File 'lib/tep/presence.rb', line 130
def self.count_agents(topic)
  Tep::Presence.count_filtered(topic, :agent_for)
end
.count_filtered(topic, kind_filter) ⇒ Object
Internal counting helper: `kind_filter` is :both for all entries, otherwise :human or :agent_for to filter.
# File 'lib/tep/presence.rb', line 136
def self.count_filtered(topic, kind_filter)
  entries = Tep::APP.presence_entries
  n = 0
  i = 0
  while i < entries.length
    if entries[i].topic == topic
      if kind_filter == :both
        n += 1
      elsif entries[i].kind == kind_filter
        n += 1
      end
    end
    i += 1
  end
  n
end
.count_global(topic) ⇒ Object
# File 'lib/tep/presence.rb', line 573
def self.count_global(topic)
  if Tep::APP.presence_pg_enabled == 0
    return 0
  end
  begin
    r = Tep::APP.presence_pg_conn.exec_params(
      "SELECT count(*) FROM tep_presence WHERE topic = $1",
      [topic])
  rescue PG::Error
    return 0
  end
  n = r.getvalue(0, 0).to_i
  r.clear
  n
end
.count_humans(topic) ⇒ Object
# File 'lib/tep/presence.rb', line 126
def self.count_humans(topic)
  Tep::Presence.count_filtered(topic, :human)
end
.diff_topic(topic) ⇒ Object
Compose the Broadcast topic for diff fan-out on a presence topic. WS subscribers register via Tep::Broadcast.subscribe_ws(diff_topic("room:lobby"), ws_fd).
# File 'lib/tep/presence.rb', line 209
def self.diff_topic(topic)
  "presence:" + topic
end
.disable_pg_mirror ⇒ Object
# File 'lib/tep/presence.rb', line 320
def self.disable_pg_mirror
  if Tep::APP.presence_pg_enabled == 0
    return 0
  end
  # Best-effort cleanup -- swallow PG errors (we're tearing the
  # mirror down regardless) and still finish + disable below.
  begin
    r = Tep::APP.presence_pg_conn.exec_params(
      "DELETE FROM tep_presence WHERE worker_id = $1",
      [Tep::APP.presence_pg_worker_id])
    r.clear
    # Remove the heartbeat row so prune_stale_workers doesn't
    # see this worker as live after we're gone.
    r = Tep::APP.presence_pg_conn.exec_params(
      "DELETE FROM tep_presence_worker WHERE worker_id = $1",
      [Tep::APP.presence_pg_worker_id])
    r.clear
  rescue PG::Error
    # swallow -- shutting the mirror down anyway
  end
  Tep::APP.presence_pg_conn.finish
  Tep::APP.set_presence_pg_enabled(0)
  0
end
.enable_pg_mirror(conninfo) ⇒ Object
-- PG mirror (cross-worker visibility) --
Opt-in mirror of the local presence registry to a shared PG table. Each worker’s track/untrack/set_status writes also touch the table; list_global / count_global read across all workers. The local registry stays the fast read path for per-worker queries (list / count); list_global is for the “who’s globally in this room” snapshot that’s typically a one-shot UI render.
Worker ID is PID + boot epoch second so a same-PID restart doesn’t alias a prior worker’s stale rows. On disable_pg_mirror (or clean shutdown), this worker’s rows get DELETE’d. Crashed workers leave stale rows; the heartbeat + prune_stale_workers pair below handles the garbage-collection.
Returns 0 on success, -1 on connect / schema failure.
# File 'lib/tep/presence.rb', line 286
def self.enable_pg_mirror(conninfo)
  conn = PG::Connection.new(conninfo)
  if conn.pgh < 0
    return -1
  end
  # exec raises PG::Error on failure now; degrade gracefully
  # (close + return -1) rather than letting it escape the worker.
  begin
    r = conn.exec(Tep::Presence.schema_sql)
    r.clear
    # Heartbeat table for the prune-stale-workers path (#47).
    r = conn.exec(Tep::Presence.worker_schema_sql)
    r.clear
  rescue PG::Error
    conn.finish
    return -1
  end
  Tep::APP.set_presence_pg_conn(conn)
  worker_id = Sock.sphttp_getpid.to_s + "-" + Time.now.to_i.to_s
  Tep::APP.set_presence_pg_worker_id(worker_id)
  Tep::APP.set_presence_pg_enabled(1)
  # Drop any rows from a prior worker that managed to leave
  # stale entries with this same worker_id (unlikely thanks
  # to the boot-epoch suffix, but defensive). Best-effort.
  Tep::Presence.mirror_exec(
    "DELETE FROM tep_presence WHERE worker_id = $1",
    [worker_id])
  # Register this worker's heartbeat row immediately. Apps
  # refresh it periodically via Tep::Presence.heartbeat;
  # prune_stale_workers deletes rows whose heartbeat is stale.
  Tep::Presence.heartbeat
  0
end
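The worker-ID scheme can be seen in isolation with a plain-CRuby sketch (not tep’s code): `worker_id_for` is a hypothetical helper, and `Process.pid` stands in for the spinel-specific `Sock.sphttp_getpid`.

```ruby
# Hypothetical helper reproducing enable_pg_mirror's "<pid>-<boot epoch second>" format.
def worker_id_for(pid, boot_epoch)
  pid.to_s + "-" + boot_epoch.to_s
end

first  = worker_id_for(4242, 1_700_000_000)
# Same PID reused by a worker restarted six minutes later.
reborn = worker_id_for(4242, 1_700_000_360)
puts first            # prints 4242-1700000000
puts first == reborn  # prints false: the restarted worker gets a distinct ID

# The live value for the current CRuby process.
current = worker_id_for(Process.pid, Time.now.to_i)
```

Two boots of the same PID within one wall-clock second would still collide, which is the case the defensive DELETE in enable_pg_mirror covers.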
.encode_diff(kind, entry) ⇒ Object
Flat-JSON wire format for a diff event. `kind` is one of "join" / "leave" / "status". Tep::Json’s flat-object extractors handle this on the client side (or any JSON-aware peer).
# File 'lib/tep/presence.rb', line 217
def self.encode_diff(kind, entry)
  "{" +
    Tep::Json.encode_pair_str("kind", kind) + "," +
    Tep::Json.encode_pair_str("topic", entry.topic) + "," +
    Tep::Json.encode_pair_str("principal", entry.principal_id) + "," +
    Tep::Json.encode_pair_str("ekind", entry.kind.to_s) + "," +
    Tep::Json.encode_pair_str("agent_id", entry.agent_id) + "," +
    Tep::Json.encode_pair_int("fd", entry.fd) + "," +
    Tep::Json.encode_pair_int("since", entry.since) + "," +
    Tep::Json.encode_pair_str("state", entry.status_state.to_s) + "," +
    Tep::Json.encode_pair_str("note", entry.status_note) + "," +
    Tep::Json.encode_pair_int("until_ts", entry.status_until) +
    "}"
end
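To make the wire shape concrete, here is a CRuby sketch that emits the same ten keys using the standard `json` library instead of `Tep::Json` (a substitution made so the example runs anywhere). `DiffEntry` and `encode_diff_sketch` are hypothetical stand-ins for `Tep::PresenceEntry` and `encode_diff`.

```ruby
require "json"

# Hypothetical stand-in for Tep::PresenceEntry, status fields included.
DiffEntry = Struct.new(:topic, :principal_id, :kind, :agent_id, :fd, :since,
                       :status_state, :status_note, :status_until)

# Same keys, same order, same string/int split as encode_diff above.
def encode_diff_sketch(kind, e)
  JSON.generate({
    "kind" => kind, "topic" => e.topic, "principal" => e.principal_id,
    "ekind" => e.kind.to_s, "agent_id" => e.agent_id, "fd" => e.fd,
    "since" => e.since, "state" => e.status_state.to_s,
    "note" => e.status_note, "until_ts" => e.status_until
  })
end

e = DiffEntry.new("room:lobby", "u1", :human, "", 7, 1_700_000_000,
                  :busy, "in a call", 0)
wire = encode_diff_sketch("status", e)

# What a JSON-aware client would do with the payload.
decoded = JSON.parse(wire)
decoded["kind"]   # => "status"
decoded["state"]  # => "busy"
decoded["fd"]     # => 7
```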
.find_entry(topic, fd) ⇒ Object
Internal: find the entry matching (topic, fd). Returns nil if no match.
# File 'lib/tep/presence.rb', line 179
def self.find_entry(topic, fd)
  entries = Tep::APP.presence_entries
  i = 0
  while i < entries.length
    if entries[i].topic == topic && entries[i].fd == fd
      return entries[i]
    end
    i += 1
  end
  nil
end
.heartbeat ⇒ Object
Refresh this worker’s heartbeat row to the current Unix timestamp. Apps call this periodically (typical: from a before-filter, a Tep::Job tick, or an explicit timer fiber) so prune_stale_workers can tell live workers from crashed ones. No-op when the PG mirror isn’t enabled, or when the mirror was opened on a different process and we’re the post-fork child (worker_id is empty until enable_pg_mirror runs locally).
Returns 1 if the heartbeat row was upserted, 0 if the call short-circuited (mirror disabled or no worker_id).
# File 'lib/tep/presence.rb', line 386
def self.heartbeat
  if Tep::APP.presence_pg_enabled == 0
    return 0
  end
  wid = Tep::APP.presence_pg_worker_id
  if wid.length == 0
    return 0
  end
  begin
    r = Tep::APP.presence_pg_conn.exec_params(
      "INSERT INTO tep_presence_worker (worker_id, last_seen_ts) " +
      "VALUES ($1, $2) " +
      "ON CONFLICT (worker_id) DO UPDATE SET " +
      "  last_seen_ts = EXCLUDED.last_seen_ts",
      [wid, Time.now.to_i.to_s])
    r.clear
  rescue PG::Error
    return 0
  end
  1
end
.list(topic) ⇒ Object
All entries for `topic`. Callers group by principal_id when they want the Phoenix.Presence-style {principal_id => [metas]} shape; tep doesn’t pre-group because spinel’s nested-hash lowering is awkward.
# File 'lib/tep/presence.rb', line 107
def self.list(topic)
  result = [Tep::PresenceEntry.new("", "", :human, "", -1, 0)]
  result.delete_at(0)
  entries = Tep::APP.presence_entries
  i = 0
  while i < entries.length
    if entries[i].topic == topic
      result.push(entries[i])
    end
    i += 1
  end
  result
end
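Because list returns a flat array, the grouping step is left to the caller. A minimal CRuby sketch of that step, with a hypothetical `ListEntry` struct standing in for `Tep::PresenceEntry` (spinel may not lower `group_by` and block arguments the same way):

```ruby
# Hypothetical stand-in for Tep::PresenceEntry.
ListEntry = Struct.new(:topic, :principal_id, :kind, :agent_id, :fd, :since)

# { principal_id => [entries] }; group_by keeps first-seen key order and
# preserves the input order inside each group.
def group_by_principal(entries)
  entries.group_by(&:principal_id)
end

entries = [
  ListEntry.new("room:lobby", "u1", :human, "", 7, 100),
  ListEntry.new("room:lobby", "u1", :agent_for, "bot-a", 9, 105),
  ListEntry.new("room:lobby", "u2", :human, "", 8, 101)
]
grouped = group_by_principal(entries)
grouped.keys             # => ["u1", "u2"]
grouped["u1"].map(&:fd)  # => [7, 9]
```

One principal can appear with several metas (here a human tab and an agent acting for the same principal), which is why the values are arrays.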
.list_global(topic) ⇒ Object
Cross-worker list: SELECT all entries on `topic` regardless of which worker tracked them. Returns an Array of Tep::PresenceEntry built from the PG rows, ordered by since_ts (empty when the mirror is disabled or the query fails). The returned entries are read-only snapshots: mutating them doesn’t write back to PG.
# File 'lib/tep/presence.rb', line 527
def self.list_global(topic)
  result = [Tep::PresenceEntry.new("", "", :human, "", -1, 0)]
  result.delete_at(0)
  if Tep::APP.presence_pg_enabled == 0
    return result
  end
  begin
    r = Tep::APP.presence_pg_conn.exec_params(
      "SELECT principal_id, kind, agent_id, fd, since_ts, " +
      "       status_state, status_note, status_until " +
      "FROM tep_presence WHERE topic = $1 ORDER BY since_ts",
      [topic])
  rescue PG::Error
    return result
  end
  i = 0
  n = r.ntuples
  while i < n
    kind_sym = :human
    if r.getvalue(i, 1) == "agent_for"
      kind_sym = :agent_for
    end
    state_sym = :available
    sstr = r.getvalue(i, 5)
    if sstr == "busy"
      state_sym = :busy
    elsif sstr == "blocked"
      state_sym = :blocked
    end
    e = Tep::PresenceEntry.new(
      topic,
      r.getvalue(i, 0),
      kind_sym,
      r.getvalue(i, 2),
      r.getvalue(i, 3).to_i,
      r.getvalue(i, 4).to_i)
    e.status_state = state_sym
    e.status_note = r.getvalue(i, 6)
    e.status_until = r.getvalue(i, 7).to_i
    result.push(e)
    i += 1
  end
  r.clear
  result
end
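PG hands every column back as a string, so list_global maps kinds and states back to symbols, falling back to :human and :available for anything unrecognized. A CRuby sketch of just that decoding step; `decode_kind`, `decode_state`, and the sample row are hypothetical:

```ruby
# Unknown kinds fall back to :human, as in list_global.
def decode_kind(str)
  str == "agent_for" ? :agent_for : :human
end

# Unknown states fall back to :available, as in list_global.
def decode_state(str)
  case str
  when "busy" then :busy
  when "blocked" then :blocked
  else :available
  end
end

# Column order matches the SELECT above: principal_id, kind, agent_id, fd,
# since_ts, status_state, status_note, status_until (all strings).
row = ["u1", "agent_for", "bot-a", "9", "1700000000", "busy", "deploying", "0"]
decoded = {
  principal_id: row[0], kind: decode_kind(row[1]), agent_id: row[2],
  fd: row[3].to_i, since: row[4].to_i, status_state: decode_state(row[5]),
  status_note: row[6], status_until: row[7].to_i
}
decoded[:kind]          # => :agent_for
decoded[:status_state]  # => :busy
```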
.mirror_delete(topic, fd) ⇒ Object
Mirror an untrack to PG.
# File 'lib/tep/presence.rb', line 500
def self.mirror_delete(topic, fd)
  if Tep::APP.presence_pg_enabled == 0
    return 0
  end
  Tep::Presence.mirror_exec(
    "DELETE FROM tep_presence " +
    "WHERE worker_id = $1 AND topic = $2 AND fd = $3",
    [Tep::APP.presence_pg_worker_id, topic, fd.to_s])
end
.mirror_exec(sql, params) ⇒ Object
Best-effort mirror write: run an exec_params on the mirror conn and swallow any PG::Error. The PG mirror is advisory – local presence is authoritative – so a transient mirror failure must never propagate into the caller’s request now that exec raises (matz/spinel#627 + #1041). Always returns 0.
# File 'lib/tep/presence.rb', line 456
def self.mirror_exec(sql, params)
  begin
    r = Tep::APP.presence_pg_conn.exec_params(sql, params)
    r.clear
  rescue PG::Error
    # swallow -- advisory mirror, local presence is authoritative
  end
  0
end
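The best-effort pattern can be exercised without a database. In this CRuby sketch, `MirrorError` is a hypothetical stand-in for `PG::Error` and `FlakyConn` simulates a connection whose writes fail; result handling (`r.clear`) is omitted. Only the mirror error class is swallowed, so unrelated bugs still surface.

```ruby
# Hypothetical stand-in for PG::Error so the sketch runs without the pg gem.
class MirrorError < StandardError; end

# Simulates a mirror connection whose writes always fail.
class FlakyConn
  def exec_params(_sql, _params)
    raise MirrorError, "connection reset"
  end
end

# Best-effort write in the style of mirror_exec: a MirrorError is swallowed
# so the caller's request carries on; the return value is always 0.
def mirror_exec_sketch(conn, sql, params)
  begin
    conn.exec_params(sql, params)
  rescue MirrorError
    # advisory mirror: local state stays authoritative
  end
  0
end

result = mirror_exec_sketch(FlakyConn.new,
                            "DELETE FROM tep_presence WHERE fd = $1", ["7"])
# result is 0 and no exception escaped
```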
.mirror_insert(entry) ⇒ Object
Mirror a track to PG. Called from track() when the PG mirror is enabled.
# File 'lib/tep/presence.rb', line 468
def self.mirror_insert(entry)
  if Tep::APP.presence_pg_enabled == 0
    return 0
  end
  Tep::Presence.mirror_exec(
    "INSERT INTO tep_presence " +
    "(worker_id, topic, fd, principal_id, kind, agent_id, " +
    " since_ts, status_state, status_note, status_until) " +
    "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) " +
    "ON CONFLICT (worker_id, topic, fd) DO UPDATE SET " +
    "  principal_id = EXCLUDED.principal_id, " +
    "  kind = EXCLUDED.kind, " +
    "  agent_id = EXCLUDED.agent_id, " +
    "  since_ts = EXCLUDED.since_ts, " +
    "  status_state = EXCLUDED.status_state, " +
    "  status_note = EXCLUDED.status_note, " +
    "  status_until = EXCLUDED.status_until",
    [
      Tep::APP.presence_pg_worker_id,
      entry.topic,
      entry.fd.to_s,
      entry.principal_id,
      entry.kind.to_s,
      entry.agent_id,
      entry.since.to_s,
      entry.status_state.to_s,
      entry.status_note,
      entry.status_until.to_s
    ])
end
.mirror_status(topic, fd, state, note, until_ts) ⇒ Object
Mirror a status update.
# File 'lib/tep/presence.rb', line 511
def self.mirror_status(topic, fd, state, note, until_ts)
  if Tep::APP.presence_pg_enabled == 0
    return 0
  end
  Tep::Presence.mirror_exec(
    "UPDATE tep_presence " +
    "SET status_state = $4, status_note = $5, status_until = $6 " +
    "WHERE worker_id = $1 AND topic = $2 AND fd = $3",
    [Tep::APP.presence_pg_worker_id, topic, fd.to_s,
     state.to_s, note, until_ts.to_s])
end
.prune_stale_workers(ttl_seconds) ⇒ Object
Prune crashed-worker rows. Deletes:
1. tep_presence_worker rows whose last_seen_ts is older than
ttl_seconds (the worker's heartbeat is stale).
2. tep_presence rows whose worker_id has no surviving
heartbeat (orphans left by the crashed worker).
Apps call this periodically – the canonical shape is a before-filter on a “/health” route that internal monitoring hits every 30s, or a Tep::Job that fires from a cron-like tick. Returns the number of tep_presence rows deleted.
ttl_seconds should be at least 3x the app’s typical heartbeat interval so a transient slow response doesn’t evict a live worker. Default callers pass 90 (assumes 30s heartbeats).
# File 'lib/tep/presence.rb', line 423
def self.prune_stale_workers(ttl_seconds)
  if Tep::APP.presence_pg_enabled == 0
    return 0
  end
  cutoff = Time.now.to_i - ttl_seconds
  conn = Tep::APP.presence_pg_conn
  begin
    # Drop dead heartbeats first; the second DELETE then walks
    # the worker_id space that's still alive.
    r1 = conn.exec_params(
      "DELETE FROM tep_presence_worker WHERE last_seen_ts < $1",
      [cutoff.to_s])
    r1.clear
    # Now drop presence rows whose worker_id isn't in the live
    # heartbeat table. NOT IN handles both crashed-and-pruned
    # workers and workers that never registered (legacy rows
    # from before this prune feature shipped).
    r2 = conn.exec(
      "DELETE FROM tep_presence " +
      "WHERE worker_id NOT IN (SELECT worker_id FROM tep_presence_worker)")
    n = r2.cmd_tuples
    r2.clear
  rescue PG::Error
    return 0
  end
  n
end
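The two DELETEs can be modelled in memory to see which rows survive. In this CRuby sketch, a hash plays `tep_presence_worker` and an array of hashes plays `tep_presence`; `prune_stale_workers_sketch` and the sample data are hypothetical.

```ruby
# heartbeats: { worker_id => last_seen_ts }   (tep_presence_worker)
# rows:       [{ worker_id:, topic:, fd: }]   (tep_presence)
def prune_stale_workers_sketch(heartbeats, rows, ttl_seconds, now)
  cutoff = now - ttl_seconds
  # Step 1: drop heartbeats older than the cutoff.
  heartbeats.delete_if { |_wid, last_seen| last_seen < cutoff }
  # Step 2: drop presence rows with no surviving heartbeat (the NOT IN
  # subquery) and report how many were removed, like cmd_tuples.
  before = rows.length
  rows.reject! { |r| !heartbeats.key?(r[:worker_id]) }
  before - rows.length
end

heartbeat_interval = 30
ttl = 3 * heartbeat_interval          # 90, following the 3x guidance
now = 1_000                           # cutoff = 910
heartbeats = { "live-1" => 980, "dead-1" => 850 }
rows = [
  { worker_id: "live-1", topic: "room:lobby", fd: 7 },
  { worker_id: "dead-1", topic: "room:lobby", fd: 9 },
  { worker_id: "legacy", topic: "room:lobby", fd: 3 } # never registered
]
removed = prune_stale_workers_sketch(heartbeats, rows, ttl, now)
removed  # => 2 (the crashed worker's row and the legacy row)
```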
.publish_diff(kind, entry) ⇒ Object
Publish a diff via Tep::Broadcast. Subscribers to diff_topic(entry.topic) – typically WS connections via subscribe_ws – receive the encoded JSON payload as their next message. Returns the local-match count from publish (cross-worker delivery counts aren’t tracked here, same as Broadcast.publish’s documented behavior).
# File 'lib/tep/presence.rb', line 238
def self.publish_diff(kind, entry)
  payload = Tep::Presence.encode_diff(kind, entry)
  Tep::Broadcast.publish(
    Tep::Presence.diff_topic(entry.topic), payload)
end
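The fan-out path can be illustrated with a tiny in-process pub/sub. `MiniBroadcast` is a hypothetical stand-in for `Tep::Broadcast` (it is not its API), and arrays stand in for WS connections’ outbound queues; only local delivery is modelled, matching the local-match count described above.

```ruby
# Hypothetical in-process stand-in for Tep::Broadcast: topic => [inboxes].
class MiniBroadcast
  def initialize
    @subs = Hash.new { |h, k| h[k] = [] }
  end

  def subscribe(topic, inbox)
    @subs[topic] << inbox
  end

  # Delivers to every local subscriber and returns how many matched.
  def publish(topic, payload)
    inboxes = @subs.fetch(topic, [])
    inboxes.each { |inbox| inbox << payload }
    inboxes.length
  end
end

# Same composition as Tep::Presence.diff_topic.
def diff_topic(topic)
  "presence:" + topic
end

bus = MiniBroadcast.new
lobby_ws = []  # stands in for one WS connection's outbound queue
bus.subscribe(diff_topic("room:lobby"), lobby_ws)
hits  = bus.publish(diff_topic("room:lobby"), '{"kind":"join"}')  # => 1
misses = bus.publish(diff_topic("room:other"), '{"kind":"join"}') # => 0
```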
.schema_sql ⇒ Object
CREATE TABLE statement, kept here so apps that want to provision the schema separately (migration runners, etc.) can grab the canonical DDL. Idempotent via IF NOT EXISTS.
# File 'lib/tep/presence.rb', line 348
def self.schema_sql
  "CREATE TABLE IF NOT EXISTS tep_presence (" +
    "worker_id TEXT NOT NULL, " +
    "topic TEXT NOT NULL, " +
    "fd INTEGER NOT NULL, " +
    "principal_id TEXT NOT NULL, " +
    "kind TEXT NOT NULL, " +
    "agent_id TEXT NOT NULL, " +
    "since_ts BIGINT NOT NULL, " +
    "status_state TEXT NOT NULL, " +
    "status_note TEXT NOT NULL, " +
    "status_until BIGINT NOT NULL, " +
    "PRIMARY KEY (worker_id, topic, fd)" +
    ")"
end
.set_status(topic, fd, state, note, until_ts) ⇒ Object
Set the structured status on an existing entry. `state` ∈ {:available, :busy, :blocked}; `note` is free text (~140 char soft hint); `until_ts` is Unix epoch seconds (0 = no identity-level expiry). Returns 1 if the entry was found and updated, 0 otherwise. Emits a “status” diff on the topic’s presence channel on update.
# File 'lib/tep/presence.rb', line 159
def self.set_status(topic, fd, state, note, until_ts)
  entry = Tep::Presence.find_entry(topic, fd)
  if entry == nil
    return 0
  end
  entry.status_state = state
  entry.status_note = note
  entry.status_until = until_ts
  Tep::Presence.mirror_status(topic, fd, state, note, until_ts)
  Tep::Presence.publish_diff("status", entry)
  1
end
.sweep_expired_status ⇒ Object
Sweep entries whose status_until has passed: reset to :available / “” / 0 and emit a “status” diff for each. Apps call this periodically (e.g. once per HTTP request, or in a background fiber once Scheduled is reliable). Returns the count of entries reset.
# File 'lib/tep/presence.rb', line 249
def self.sweep_expired_status
  entries = Tep::APP.presence_entries
  now = Time.now.to_i
  swept = 0
  i = 0
  while i < entries.length
    e = entries[i]
    if e.status_until > 0 && e.status_until <= now && e.status_state != :available
      e.status_state = :available
      e.status_note = ""
      e.status_until = 0
      Tep::Presence.publish_diff("status", e)
      swept += 1
    end
    i += 1
  end
  swept
end
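The expiry rule has three conditions: an expiry is set (status_until > 0), it has passed, and the entry isn’t already :available. A CRuby sketch of that rule over a hypothetical `StatusEntry` struct; instead of publishing, it returns the reset entries so a caller could emit one “status” diff per entry.

```ruby
# Hypothetical stand-in for the status fields of Tep::PresenceEntry.
StatusEntry = Struct.new(:topic, :fd, :status_state, :status_note, :status_until)

# Resets expired statuses in place and returns the entries that changed.
def sweep_expired(entries, now)
  swept = []
  entries.each do |e|
    next unless e.status_until > 0 && e.status_until <= now && e.status_state != :available
    e.status_state = :available
    e.status_note = ""
    e.status_until = 0
    swept << e
  end
  swept
end

now = 2_000
entries = [
  StatusEntry.new("room:lobby", 7, :busy, "in a call", 1_990), # expired
  StatusEntry.new("room:lobby", 8, :blocked, "focus", 2_500),  # still active
  StatusEntry.new("room:lobby", 9, :busy, "no expiry", 0)      # 0 = never expires
]
reset = sweep_expired(entries, now)
reset.map(&:fd)  # => [7]
```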
.track(req, topic, fd) ⇒ Object
Track a presence entry. principal_id comes off req.identity; kind is :human or :agent_for depending on the identity’s delegation state. fd is the underlying connection’s socket (typically a WS-accepted fd). Returns 0 on success.
Multiple track() calls for the same (topic, fd) are deduped regardless of principal: the existing entry stays, no second entry is created, and no second “join” diff is emitted. Apps can call it freely from before-filters / reconnect paths without growing the registry.
# File 'lib/tep/presence.rb', line 42
def self.track(req, topic, fd)
  ident = req.identity
  if Tep::Presence.find_entry(topic, fd) != nil
    return 0
  end
  kind = :human
  agent_id = ""
  if ident.agent?
    kind = :agent_for
    agent_id = ident.acting_via.agent_id
  end
  entry = Tep::PresenceEntry.new(
    topic, ident.principal_id, kind, agent_id, fd, Time.now.to_i)
  Tep::APP.presence_entries.push(entry)
  Tep::Presence.mirror_insert(entry)
  Tep::Presence.publish_diff("join", entry)
  0
end
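The dedup key is (topic, fd), so the same socket can sit in several topics but only once per topic. A CRuby sketch with hypothetical names (`TrackedEntry`, `track_sketch`); unlike track, which returns 0 either way, the sketch returns whether a new entry was created so the behavior is visible.

```ruby
# Hypothetical stand-in for the fields track cares about.
TrackedEntry = Struct.new(:topic, :principal_id, :fd)

# Appends an entry unless one already exists for (topic, fd).
def track_sketch(entries, topic, principal_id, fd)
  return false if entries.any? { |e| e.topic == topic && e.fd == fd }
  entries << TrackedEntry.new(topic, principal_id, fd)
  true
end

entries = []
a = track_sketch(entries, "room:lobby", "u1", 7) # => true
b = track_sketch(entries, "room:lobby", "u1", 7) # => false (reconnect path, deduped)
c = track_sketch(entries, "room:other", "u1", 7) # => true (same fd, new topic)
entries.length                                   # => 2
```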
.untrack(topic, fd) ⇒ Object
Drop the entry for (topic, fd). The fd is the unique key within a topic; principal_id isn’t needed. Returns 1 if an entry was removed, 0 if none matched. Emits a “leave” diff on the topic’s presence channel when removal happens.
# File 'lib/tep/presence.rb', line 65
def self.untrack(topic, fd)
  entries = Tep::APP.presence_entries
  i = 0
  while i < entries.length
    if entries[i].topic == topic && entries[i].fd == fd
      e = entries[i]
      entries.delete_at(i)
      Tep::Presence.mirror_delete(topic, fd)
      Tep::Presence.publish_diff("leave", e)
      return 1
    end
    i += 1
  end
  0
end
.untrack_by_fd(fd) ⇒ Object
Drop every entry associated with `fd` (across all topics). Used by the WS close hook to clean up everything a connection had tracked. Returns the count dropped. Emits one “leave” diff per dropped entry, on each entry’s topic’s presence channel.
# File 'lib/tep/presence.rb', line 86
def self.untrack_by_fd(fd)
  entries = Tep::APP.presence_entries
  dropped = 0
  i = entries.length - 1
  while i >= 0
    if entries[i].fd == fd
      e = entries[i]
      entries.delete_at(i)
      Tep::Presence.mirror_delete(e.topic, fd)
      Tep::Presence.publish_diff("leave", e)
      dropped += 1
    end
    i -= 1
  end
  dropped
end
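Walking the array from the end is what makes deleting during the loop safe: delete_at(i) only shifts elements after i, which have already been visited. A CRuby sketch with a hypothetical `FdEntry` struct; it returns the removed entries rather than a count so the order is visible.

```ruby
# Hypothetical stand-in for the fields untrack_by_fd reads.
FdEntry = Struct.new(:topic, :fd)

# Same reverse walk as untrack_by_fd, collecting what was removed.
def untrack_by_fd_sketch(entries, fd)
  removed = []
  i = entries.length - 1
  while i >= 0
    removed << entries.delete_at(i) if entries[i].fd == fd
    i -= 1
  end
  removed
end

entries = [FdEntry.new("a", 7), FdEntry.new("b", 7), FdEntry.new("c", 8)]
removed = untrack_by_fd_sketch(entries, 7)
removed.map(&:topic)  # => ["b", "a"]
entries.map(&:topic)  # => ["c"]
```

A side effect of the reverse walk is that “leave” diffs go out for the most recently tracked entry first.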
.worker_schema_sql ⇒ Object
Heartbeat table – one row per worker that’s mirroring presence right now. Used by prune_stale_workers to identify crashed workers (no heartbeat updates in N seconds) and garbage-collect their orphan tep_presence rows.
# File 'lib/tep/presence.rb', line 368
def self.worker_schema_sql
  "CREATE TABLE IF NOT EXISTS tep_presence_worker (" +
    "worker_id TEXT PRIMARY KEY, " +
    "last_seen_ts BIGINT NOT NULL" +
    ")"
end