Module: Tep::Presence

Defined in:
lib/tep/presence.rb

Class Method Summary

Class Method Details

.clear ⇒ Object

Drop every entry. Used by tests between fixtures and available to apps for graceful-shutdown cleanup. Returns the count dropped. Does NOT emit leave diffs (it’s a registry-management op, not a per-connection event).



# File 'lib/tep/presence.rb', line 195

def self.clear
  entries = Tep::APP.presence_entries
  n = entries.length
  while entries.length > 0
    entries.delete_at(0)
  end
  n
end

.clear_status(topic, fd) ⇒ Object

Reset an entry’s status back to :available / “” / 0.



# File 'lib/tep/presence.rb', line 173

def self.clear_status(topic, fd)
  Tep::Presence.set_status(topic, fd, :available, "", 0)
end

.count(topic) ⇒ Object

Total entries for `topic` (across all kinds).



# File 'lib/tep/presence.rb', line 122

def self.count(topic)
  Tep::Presence.count_filtered(topic, :both)
end

.count_agents(topic) ⇒ Object



# File 'lib/tep/presence.rb', line 130

def self.count_agents(topic)
  Tep::Presence.count_filtered(topic, :agent_for)
end

.count_filtered(topic, kind_filter) ⇒ Object

Internal counting helper: `kind_filter` is :both for all entries, otherwise :human or :agent_for to filter.
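
The filter semantics can be sketched in plain Ruby as follows. This is only an illustration: `KindEntry` and `count_kind` are hypothetical stand-ins (not part of tep), and the block-based `count` call is ordinary Ruby that may not match what spinel can lower.

```ruby
# Stand-in for Tep::PresenceEntry with only the fields this sketch needs (assumption).
KindEntry = Struct.new(:topic, :kind)

# Count entries on `topic`; :both matches every kind, otherwise match the given kind.
def count_kind(entries, topic, kind_filter)
  entries.count do |e|
    e.topic == topic && (kind_filter == :both || e.kind == kind_filter)
  end
end

SAMPLE_ENTRIES = [
  KindEntry.new("room:lobby", :human),
  KindEntry.new("room:lobby", :agent_for),
  KindEntry.new("room:ops", :human)
]

puts count_kind(SAMPLE_ENTRIES, "room:lobby", :both)
puts count_kind(SAMPLE_ENTRIES, "room:lobby", :agent_for)
```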



# File 'lib/tep/presence.rb', line 136

def self.count_filtered(topic, kind_filter)
  entries = Tep::APP.presence_entries
  n = 0
  i = 0
  while i < entries.length
    if entries[i].topic == topic
      if kind_filter == :both
        n += 1
      elsif entries[i].kind == kind_filter
        n += 1
      end
    end
    i += 1
  end
  n
end

.count_global(topic) ⇒ Object



# File 'lib/tep/presence.rb', line 573

def self.count_global(topic)
  if Tep::APP.presence_pg_enabled == 0
    return 0
  end
  begin
    r = Tep::APP.presence_pg_conn.exec_params(
      "SELECT count(*) FROM tep_presence WHERE topic = $1",
      [topic])
  rescue PG::Error
    return 0
  end
  n = r.getvalue(0, 0).to_i
  r.clear
  n
end

.count_humans(topic) ⇒ Object



# File 'lib/tep/presence.rb', line 126

def self.count_humans(topic)
  Tep::Presence.count_filtered(topic, :human)
end

.diff_topic(topic) ⇒ Object

Compose the Broadcast topic for diff fan-out on a presence topic. WS subscribers register via Tep::Broadcast.subscribe_ws(diff_topic("room:lobby"), ws_fd).



# File 'lib/tep/presence.rb', line 209

def self.diff_topic(topic)
  "presence:" + topic
end

.disable_pg_mirror ⇒ Object



# File 'lib/tep/presence.rb', line 320

def self.disable_pg_mirror
  if Tep::APP.presence_pg_enabled == 0
    return 0
  end
  # Best-effort cleanup -- swallow PG errors (we're tearing the
  # mirror down regardless) and still finish + disable below.
  begin
    r = Tep::APP.presence_pg_conn.exec_params(
      "DELETE FROM tep_presence WHERE worker_id = $1",
      [Tep::APP.presence_pg_worker_id])
    r.clear
    # Remove the heartbeat row so prune_stale_workers doesn't
    # see this worker as live after we're gone.
    r = Tep::APP.presence_pg_conn.exec_params(
      "DELETE FROM tep_presence_worker WHERE worker_id = $1",
      [Tep::APP.presence_pg_worker_id])
    r.clear
  rescue PG::Error
    # swallow -- shutting the mirror down anyway
  end
  Tep::APP.presence_pg_conn.finish
  Tep::APP.set_presence_pg_enabled(0)
  0
end

.enable_pg_mirror(conninfo) ⇒ Object

---- PG mirror (cross-worker visibility) ----

Opt-in mirror of the local presence registry to a shared PG table. Each worker’s track/untrack/set_status writes also touch the table; list_global / count_global read across all workers. The local registry stays the fast read path for per-worker queries (list / count); list_global is for the “who’s globally in this room” snapshot that’s typically a one-shot UI render.

The worker ID is the PID plus the boot epoch second, so a same-PID restart doesn’t alias a prior worker’s stale rows. On disable_pg_mirror (or clean shutdown), this worker’s rows are deleted. Crashed workers leave stale rows; the heartbeat + prune_stale_workers pair handles their garbage collection.

Returns 0 on success, -1 on connect / schema failure.
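
To illustrate why the boot-epoch suffix matters, here is a minimal sketch of the worker ID composition. `worker_id_for` is a hypothetical helper; the PID and timestamps are invented, and a plain-Ruby app would presumably pass `Process.pid` and `Time.now.to_i` where the listing uses Sock.sphttp_getpid.

```ruby
# Build a worker ID from a PID and a boot timestamp (both passed in so the
# sketch stays deterministic).
def worker_id_for(pid, boot_epoch)
  pid.to_s + "-" + boot_epoch.to_s
end

# Same PID, restarted a minute later: the IDs differ, so the new worker
# does not alias rows left behind by the old one.
first_boot  = worker_id_for(4242, 1_700_000_000)
second_boot = worker_id_for(4242, 1_700_000_060)

puts first_boot
puts second_boot
puts first_boot == second_boot
```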



# File 'lib/tep/presence.rb', line 286

def self.enable_pg_mirror(conninfo)
  conn = PG::Connection.new(conninfo)
  if conn.pgh < 0
    return -1
  end
  # exec raises PG::Error on failure now; degrade gracefully
  # (close + return -1) rather than letting it escape the worker.
  begin
    r = conn.exec(Tep::Presence.schema_sql)
    r.clear
    # Heartbeat table for the prune-stale-workers path (#47).
    r = conn.exec(Tep::Presence.worker_schema_sql)
    r.clear
  rescue PG::Error
    conn.finish
    return -1
  end
  Tep::APP.set_presence_pg_conn(conn)
  worker_id = Sock.sphttp_getpid.to_s + "-" + Time.now.to_i.to_s
  Tep::APP.set_presence_pg_worker_id(worker_id)
  Tep::APP.set_presence_pg_enabled(1)
  # Drop any rows from a prior worker that managed to leave
  # stale entries with this same worker_id (unlikely thanks
  # to the boot-epoch suffix, but defensive). Best-effort.
  Tep::Presence.mirror_exec(
    "DELETE FROM tep_presence WHERE worker_id = $1",
    [worker_id])
  # Register this worker's heartbeat row immediately. Apps
  # refresh it periodically via Tep::Presence.heartbeat;
  # prune_stale_workers deletes rows whose heartbeat is stale.
  Tep::Presence.heartbeat
  0
end

.encode_diff(kind, entry) ⇒ Object

Flat-JSON wire format for a diff event. `kind` is one of "join" / "leave" / "status". Tep::Json’s flat-object extractors can parse this on the client side, as can any JSON-aware peer.
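
As a rough illustration of the wire format, the sketch below decodes a hand-written payload with Ruby's standard `json` library. The keys mirror those emitted by encode_diff; the sample values (principal, fd, timestamps) are invented, `decode_diff` is a hypothetical helper, and a tep client may well use Tep::Json's extractors instead.

```ruby
require "json"

# Hand-written sample payload; the keys match encode_diff, the values are invented.
SAMPLE_DIFF = '{"kind":"join","topic":"room:lobby","principal":"user-1",' \
              '"ekind":"human","agent_id":"","fd":7,"since":1700000000,' \
              '"state":"available","note":"","until_ts":0}'

# Parse one diff event into a Hash with symbol keys.
def decode_diff(payload)
  JSON.parse(payload, symbolize_names: true)
end

diff = decode_diff(SAMPLE_DIFF)
puts "#{diff[:kind]} #{diff[:principal]} on #{diff[:topic]} (fd #{diff[:fd]})"
```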



# File 'lib/tep/presence.rb', line 217

def self.encode_diff(kind, entry)
  "{" +
    Tep::Json.encode_pair_str("kind", kind) + "," +
    Tep::Json.encode_pair_str("topic", entry.topic) + "," +
    Tep::Json.encode_pair_str("principal", entry.principal_id) + "," +
    Tep::Json.encode_pair_str("ekind", entry.kind.to_s) + "," +
    Tep::Json.encode_pair_str("agent_id", entry.agent_id) + "," +
    Tep::Json.encode_pair_int("fd", entry.fd) + "," +
    Tep::Json.encode_pair_int("since", entry.since) + "," +
    Tep::Json.encode_pair_str("state", entry.status_state.to_s) + "," +
    Tep::Json.encode_pair_str("note", entry.status_note) + "," +
    Tep::Json.encode_pair_int("until_ts", entry.status_until) +
  "}"
end

.find_entry(topic, fd) ⇒ Object

Internal: find the entry matching (topic, fd). Returns nil if no match.



# File 'lib/tep/presence.rb', line 179

def self.find_entry(topic, fd)
  entries = Tep::APP.presence_entries
  i = 0
  while i < entries.length
    if entries[i].topic == topic && entries[i].fd == fd
      return entries[i]
    end
    i += 1
  end
  nil
end

.heartbeat ⇒ Object

Refresh this worker’s heartbeat row to the current Unix timestamp. Apps call this periodically (typical: from a before-filter, a Tep::Job tick, or an explicit timer fiber) so prune_stale_workers can tell live workers from crashed ones. No-op when the PG mirror isn’t enabled, or when the mirror was opened in a different process and we’re the post-fork child (worker_id is empty until enable_pg_mirror runs locally).

Returns 1 if the heartbeat row was upserted, 0 if the call short-circuited (mirror disabled or no worker_id).
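
When the heartbeat is driven from a before-filter, calling it on every request would upsert far more often than needed. One possible approach, sketched below, is a small throttle that reports when the next beat is due. `Throttle` is a hypothetical helper (not part of tep), and the 30-second interval is only an example value.

```ruby
# Tracks when a periodic task last ran and decides whether it is due again.
class Throttle
  def initialize(interval_seconds)
    @interval = interval_seconds
    @last_run = nil
  end

  # Returns true (and records the run) when at least `interval` seconds
  # have elapsed since the last recorded run, or on the first call.
  def due?(now)
    if @last_run.nil? || now - @last_run >= @interval
      @last_run = now
      return true
    end
    false
  end
end

throttle = Throttle.new(30)
[0, 10, 30, 45, 60].each do |now|
  # In an app, this is where Tep::Presence.heartbeat would be called.
  puts "t=#{now}: heartbeat" if throttle.due?(now)
end
```

Passing the clock in as an argument keeps the sketch deterministic; an app would pass `Time.now.to_i`.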



# File 'lib/tep/presence.rb', line 386

def self.heartbeat
  if Tep::APP.presence_pg_enabled == 0
    return 0
  end
  wid = Tep::APP.presence_pg_worker_id
  if wid.length == 0
    return 0
  end
  begin
    r = Tep::APP.presence_pg_conn.exec_params(
      "INSERT INTO tep_presence_worker (worker_id, last_seen_ts) " +
      "VALUES ($1, $2) " +
      "ON CONFLICT (worker_id) DO UPDATE SET " +
      "  last_seen_ts = EXCLUDED.last_seen_ts",
      [wid, Time.now.to_i.to_s])
    r.clear
  rescue PG::Error
    return 0
  end
  1
end

.list(topic) ⇒ Object

All entries for `topic`. Callers group by principal_id when they want the Phoenix.Presence-style {principal_id => [metas]} shape; tep doesn’t pre-group because spinel’s nested-hash lowering is awkward.
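
For callers that do want the grouped shape, a minimal sketch in plain Ruby follows. `ListEntry` is a hypothetical stand-in for Tep::PresenceEntry, `group_by_principal` is an illustrative name, and `group_by` is ordinary Ruby that may not be available under spinel.

```ruby
# Stand-in for Tep::PresenceEntry with only the fields used here (assumption).
ListEntry = Struct.new(:topic, :principal_id, :kind, :fd)

# Group a flat list of entries into { principal_id => [entries] }.
def group_by_principal(entries)
  entries.group_by(&:principal_id)
end

entries = [
  ListEntry.new("room:lobby", "alice", :human, 5),
  ListEntry.new("room:lobby", "alice", :human, 9),
  ListEntry.new("room:lobby", "bob", :agent_for, 11)
]

grouped = group_by_principal(entries)
grouped.each do |principal, metas|
  puts "#{principal}: #{metas.map(&:fd).inspect}"
end
```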



# File 'lib/tep/presence.rb', line 107

def self.list(topic)
  result = [Tep::PresenceEntry.new("", "", :human, "", -1, 0)]
  result.delete_at(0)
  entries = Tep::APP.presence_entries
  i = 0
  while i < entries.length
    if entries[i].topic == topic
      result.push(entries[i])
    end
    i += 1
  end
  result
end

.list_global(topic) ⇒ Object

Cross-worker list: SELECT all entries on `topic` regardless of which worker tracked them. Returns an Array of Tep::PresenceEntry built from the PG rows. The returned entries are read-only snapshots; mutating them doesn’t write back to PG.



# File 'lib/tep/presence.rb', line 527

def self.list_global(topic)
  result = [Tep::PresenceEntry.new("", "", :human, "", -1, 0)]
  result.delete_at(0)
  if Tep::APP.presence_pg_enabled == 0
    return result
  end
  begin
    r = Tep::APP.presence_pg_conn.exec_params(
      "SELECT principal_id, kind, agent_id, fd, since_ts, " +
      "       status_state, status_note, status_until " +
      "FROM tep_presence WHERE topic = $1 ORDER BY since_ts",
      [topic])
  rescue PG::Error
    return result
  end
  i = 0
  n = r.ntuples
  while i < n
    kind_sym = :human
    if r.getvalue(i, 1) == "agent_for"
      kind_sym = :agent_for
    end
    state_sym = :available
    sstr = r.getvalue(i, 5)
    if sstr == "busy"
      state_sym = :busy
    elsif sstr == "blocked"
      state_sym = :blocked
    end
    e = Tep::PresenceEntry.new(
      topic,
      r.getvalue(i, 0),
      kind_sym,
      r.getvalue(i, 2),
      r.getvalue(i, 3).to_i,
      r.getvalue(i, 4).to_i)
    e.status_state = state_sym
    e.status_note  = r.getvalue(i, 6)
    e.status_until = r.getvalue(i, 7).to_i
    result.push(e)
    i += 1
  end
  r.clear
  result
end

.mirror_delete(topic, fd) ⇒ Object

Mirror an untrack to PG.



# File 'lib/tep/presence.rb', line 500

def self.mirror_delete(topic, fd)
  if Tep::APP.presence_pg_enabled == 0
    return 0
  end
  Tep::Presence.mirror_exec(
    "DELETE FROM tep_presence " +
    "WHERE worker_id = $1 AND topic = $2 AND fd = $3",
    [Tep::APP.presence_pg_worker_id, topic, fd.to_s])
end

.mirror_exec(sql, params) ⇒ Object

Best-effort mirror write: run an exec_params on the mirror conn and swallow any PG::Error. The PG mirror is advisory – local presence is authoritative – so a transient mirror failure must never propagate into the caller’s request now that exec raises (matz/spinel#627 + #1041). Always returns 0.
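
The best-effort pattern can be shown in isolation. In the sketch below, `MirrorError` is a hypothetical stand-in for PG::Error and `best_effort` is an illustrative helper: one specific error class is swallowed, anything else still propagates, and the return value is always 0.

```ruby
# Hypothetical error class standing in for PG::Error in this sketch.
class MirrorError < StandardError; end

# Run the block, swallow MirrorError, and always return 0.
def best_effort
  begin
    yield
  rescue MirrorError
    # swallow: the mirror is advisory, the local state is authoritative
  end
  0
end

puts best_effort { :ok }
puts best_effort { raise MirrorError, "connection lost" }
```

Rescuing only the mirror's own error class keeps unrelated bugs (for example an ArgumentError) visible instead of silently hiding them.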



# File 'lib/tep/presence.rb', line 456

def self.mirror_exec(sql, params)
  begin
    r = Tep::APP.presence_pg_conn.exec_params(sql, params)
    r.clear
  rescue PG::Error
    # swallow -- advisory mirror, local presence is authoritative
  end
  0
end

.mirror_insert(entry) ⇒ Object

Mirror a track to PG. Called from track() when the PG mirror is enabled.



# File 'lib/tep/presence.rb', line 468

def self.mirror_insert(entry)
  if Tep::APP.presence_pg_enabled == 0
    return 0
  end
  Tep::Presence.mirror_exec(
    "INSERT INTO tep_presence " +
    "(worker_id, topic, fd, principal_id, kind, agent_id, " +
    " since_ts, status_state, status_note, status_until) " +
    "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) " +
    "ON CONFLICT (worker_id, topic, fd) DO UPDATE SET " +
    "  principal_id = EXCLUDED.principal_id, " +
    "  kind         = EXCLUDED.kind, " +
    "  agent_id     = EXCLUDED.agent_id, " +
    "  since_ts     = EXCLUDED.since_ts, " +
    "  status_state = EXCLUDED.status_state, " +
    "  status_note  = EXCLUDED.status_note, " +
    "  status_until = EXCLUDED.status_until",
    [
      Tep::APP.presence_pg_worker_id,
      entry.topic,
      entry.fd.to_s,
      entry.principal_id,
      entry.kind.to_s,
      entry.agent_id,
      entry.since.to_s,
      entry.status_state.to_s,
      entry.status_note,
      entry.status_until.to_s
    ])
end

.mirror_status(topic, fd, state, note, until_ts) ⇒ Object

Mirror a status update.



# File 'lib/tep/presence.rb', line 511

def self.mirror_status(topic, fd, state, note, until_ts)
  if Tep::APP.presence_pg_enabled == 0
    return 0
  end
  Tep::Presence.mirror_exec(
    "UPDATE tep_presence " +
    "SET status_state = $4, status_note = $5, status_until = $6 " +
    "WHERE worker_id = $1 AND topic = $2 AND fd = $3",
    [Tep::APP.presence_pg_worker_id, topic, fd.to_s,
     state.to_s, note, until_ts.to_s])
end

.prune_stale_workers(ttl_seconds) ⇒ Object

Prune crashed-worker rows. Deletes:

1. tep_presence_worker rows whose last_seen_ts is older than
   ttl_seconds (the worker's heartbeat is stale).
2. tep_presence rows whose worker_id has no surviving
   heartbeat (orphans left by the crashed worker).

Apps call this periodically – the canonical shape is a before-filter on a “/health” route that internal monitoring hits every 30s, or a Tep::Job that fires from a cron-like tick. Returns the number of tep_presence rows deleted.

ttl_seconds should be at least 3x the app’s typical heartbeat interval so a transient slow response doesn’t evict a live worker. Default callers pass 90 (assumes 30s heartbeats).
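
The two-step prune and the 3x TTL rule can be simulated in memory. This is only a sketch: a Hash and an Array stand in for the tep_presence_worker and tep_presence tables, `prune_stale` is a hypothetical helper, and all worker IDs and timestamps are invented.

```ruby
# In-memory stand-ins for the two tables (assumption: no PG involved).
#   workers:  { worker_id => last_seen_ts }
#   presence: [ { worker_id:, topic:, fd: }, ... ]
# Returns the number of presence rows removed.
def prune_stale(workers, presence, ttl_seconds, now)
  cutoff = now - ttl_seconds
  # Step 1: drop heartbeat rows older than the cutoff.
  workers.delete_if { |_id, last_seen| last_seen < cutoff }
  # Step 2: drop presence rows whose worker has no surviving heartbeat.
  before = presence.length
  presence.delete_if { |row| !workers.key?(row[:worker_id]) }
  before - presence.length
end

now = 1_700_000_000
workers = { "101-1699990000" => now - 10, "202-1699990000" => now - 300 }
presence = [
  { worker_id: "101-1699990000", topic: "room:lobby", fd: 5 },
  { worker_id: "202-1699990000", topic: "room:lobby", fd: 6 },
  { worker_id: "202-1699990000", topic: "room:ops",   fd: 7 }
]

heartbeat_interval = 30
deleted = prune_stale(workers, presence, 3 * heartbeat_interval, now)
puts deleted
```

With a 30s heartbeat and a 90s TTL, the worker last seen 10 seconds ago survives, while the one silent for 300 seconds loses both its heartbeat row and its presence rows.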



# File 'lib/tep/presence.rb', line 423

def self.prune_stale_workers(ttl_seconds)
  if Tep::APP.presence_pg_enabled == 0
    return 0
  end
  cutoff = Time.now.to_i - ttl_seconds
  conn = Tep::APP.presence_pg_conn
  begin
    # Drop dead heartbeats first; the second DELETE then walks
    # the worker_id space that's still alive.
    r1 = conn.exec_params(
      "DELETE FROM tep_presence_worker WHERE last_seen_ts < $1",
      [cutoff.to_s])
    r1.clear
    # Now drop presence rows whose worker_id isn't in the live
    # heartbeat table. NOT IN handles both crashed-and-pruned
    # workers and workers that never registered (legacy rows
    # from before this prune feature shipped).
    r2 = conn.exec(
      "DELETE FROM tep_presence " +
      "WHERE worker_id NOT IN (SELECT worker_id FROM tep_presence_worker)")
    n = r2.cmd_tuples
    r2.clear
  rescue PG::Error
    return 0
  end
  n
end

.publish_diff(kind, entry) ⇒ Object

Publish a diff via Tep::Broadcast. Subscribers to diff_topic(entry.topic) – typically WS connections via subscribe_ws – receive the encoded JSON payload as their next message. Returns the local-match count from publish (cross-worker delivery counts aren’t tracked here, same as Broadcast.publish’s documented behavior).



# File 'lib/tep/presence.rb', line 238

def self.publish_diff(kind, entry)
  payload = Tep::Presence.encode_diff(kind, entry)
  Tep::Broadcast.publish(
    Tep::Presence.diff_topic(entry.topic), payload)
end

.schema_sql ⇒ Object

CREATE TABLE statement, kept here so apps that want to provision the schema separately (migration runners, etc.) can grab the canonical DDL. Idempotent via IF NOT EXISTS.



# File 'lib/tep/presence.rb', line 348

def self.schema_sql
  "CREATE TABLE IF NOT EXISTS tep_presence (" +
    "worker_id    TEXT NOT NULL, " +
    "topic        TEXT NOT NULL, " +
    "fd           INTEGER NOT NULL, " +
    "principal_id TEXT NOT NULL, " +
    "kind         TEXT NOT NULL, " +
    "agent_id     TEXT NOT NULL, " +
    "since_ts     BIGINT NOT NULL, " +
    "status_state TEXT NOT NULL, " +
    "status_note  TEXT NOT NULL, " +
    "status_until BIGINT NOT NULL, " +
    "PRIMARY KEY (worker_id, topic, fd)" +
  ")"
end

.set_status(topic, fd, state, note, until_ts) ⇒ Object

Set the structured status on an existing entry. `state` is one of :available, :busy, :blocked; `note` is free text (~140 char soft hint); `until_ts` is Unix epoch seconds (0 = no identity-level expiry). Returns 1 if the entry was found and updated, 0 otherwise. Emits a “status” diff on the topic’s presence channel on update.



# File 'lib/tep/presence.rb', line 159

def self.set_status(topic, fd, state, note, until_ts)
  entry = Tep::Presence.find_entry(topic, fd)
  if entry == nil
    return 0
  end
  entry.status_state = state
  entry.status_note  = note
  entry.status_until = until_ts
  Tep::Presence.mirror_status(topic, fd, state, note, until_ts)
  Tep::Presence.publish_diff("status", entry)
  1
end

.sweep_expired_status ⇒ Object

Sweep entries whose status_until has passed: reset to :available / “” / 0 and emit a “status” diff for each. Apps call this periodically (e.g. once per HTTP request, or in a background fiber once Scheduled is reliable). Returns the count of entries reset.
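
The expiry condition is easy to get subtly wrong (for example by treating 0 as "already expired"), so here is a small sketch of the predicate on its own. `StatusFields` is a hypothetical stand-in for the status fields of Tep::PresenceEntry, `status_expired?` is an illustrative name, and the timestamps are invented.

```ruby
# Stand-in for the status fields of Tep::PresenceEntry (assumption).
StatusFields = Struct.new(:status_state, :status_note, :status_until)

# True when the entry carries a timed, non-default status whose deadline has passed.
# A status_until of 0 means "no expiry" and never counts as expired.
def status_expired?(entry, now)
  entry.status_until > 0 && entry.status_until <= now && entry.status_state != :available
end

now = 1_700_000_000
puts status_expired?(StatusFields.new(:busy, "in a meeting", now - 1), now)  # deadline passed
puts status_expired?(StatusFields.new(:busy, "deep work", 0), now)           # 0 = no expiry
puts status_expired?(StatusFields.new(:blocked, "waiting", now + 60), now)   # deadline in the future
```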



# File 'lib/tep/presence.rb', line 249

def self.sweep_expired_status
  entries = Tep::APP.presence_entries
  now = Time.now.to_i
  swept = 0
  i = 0
  while i < entries.length
    e = entries[i]
    if e.status_until > 0 && e.status_until <= now && e.status_state != :available
      e.status_state = :available
      e.status_note  = ""
      e.status_until = 0
      Tep::Presence.publish_diff("status", e)
      swept += 1
    end
    i += 1
  end
  swept
end

.track(req, topic, fd) ⇒ Object

Track a presence entry. principal_id comes off req.identity; kind is :human or :agent_for depending on the identity’s delegation state. fd is the underlying connection’s socket (typically a WS-accepted fd). Returns 0 on success.

Repeated track() calls for the same (topic, fd) are deduped: the existing entry stays and no second row is created, even if the identity differs, because the lookup keys on (topic, fd) only. Apps can call freely from before-filters / reconnect paths without growing the registry.
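
A minimal sketch of the dedup check, assuming a lookup on (topic, fd) as the find_entry call in the source listing does. `TrackedEntry` and `track_once` are hypothetical names, and the block-based `any?` is plain Ruby rather than the while-loop style the module itself uses.

```ruby
# Stand-in for Tep::PresenceEntry with only the fields used here (assumption).
TrackedEntry = Struct.new(:topic, :principal_id, :fd)

# Add an entry unless one already exists for (topic, fd).
# Returns true when a new entry was added, false when the call was deduped.
def track_once(entries, topic, principal_id, fd)
  return false if entries.any? { |e| e.topic == topic && e.fd == fd }
  entries.push(TrackedEntry.new(topic, principal_id, fd))
  true
end

registry = []
puts track_once(registry, "room:lobby", "alice", 5)  # first call adds the entry
puts track_once(registry, "room:lobby", "alice", 5)  # repeat call is deduped
puts track_once(registry, "room:ops", "alice", 5)    # same fd, different topic
puts registry.length
```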



# File 'lib/tep/presence.rb', line 42

def self.track(req, topic, fd)
  ident = req.identity
  if Tep::Presence.find_entry(topic, fd) != nil
    return 0
  end
  kind = :human
  agent_id = ""
  if ident.agent?
    kind = :agent_for
    agent_id = ident.acting_via.agent_id
  end
  entry = Tep::PresenceEntry.new(
    topic, ident.principal_id, kind, agent_id, fd, Time.now.to_i)
  Tep::APP.presence_entries.push(entry)
  Tep::Presence.mirror_insert(entry)
  Tep::Presence.publish_diff("join", entry)
  0
end

.untrack(topic, fd) ⇒ Object

Drop the entry for (topic, fd). The fd is the unique key within a topic; principal_id isn’t needed. Returns 1 if an entry was removed, 0 if none matched. Emits a “leave” diff on the topic’s presence channel when removal happens.



# File 'lib/tep/presence.rb', line 65

def self.untrack(topic, fd)
  entries = Tep::APP.presence_entries
  i = 0
  while i < entries.length
    if entries[i].topic == topic && entries[i].fd == fd
      e = entries[i]
      entries.delete_at(i)
      Tep::Presence.mirror_delete(topic, fd)
      Tep::Presence.publish_diff("leave", e)
      return 1
    end
    i += 1
  end
  0
end

.untrack_by_fd(fd) ⇒ Object

Drop every entry associated with `fd` (across all topics). Used by the WS close hook to clean up everything a connection had tracked. Returns the count dropped. Emits one “leave” diff per dropped entry, on each entry’s topic’s presence channel.
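
Removing several elements from an array while iterating over it is a classic source of skipped items. The sketch below shows the back-to-front walk in isolation; `FdEntry` and `drop_by_fd` are hypothetical names, and the sample data is invented.

```ruby
# Stand-in for Tep::PresenceEntry with only the fields used here (assumption).
FdEntry = Struct.new(:topic, :fd)

# Remove every entry for `fd`, walking from the end so that delete_at never
# shifts an index that has not been visited yet. Returns the removed entries.
def drop_by_fd(entries, fd)
  removed = []
  i = entries.length - 1
  while i >= 0
    if entries[i].fd == fd
      removed.push(entries.delete_at(i))
    end
    i -= 1
  end
  removed
end

entries = [
  FdEntry.new("room:lobby", 5),
  FdEntry.new("room:ops", 5),
  FdEntry.new("room:lobby", 6)
]
removed = drop_by_fd(entries, 5)
puts removed.length
puts entries.length
```

A forward walk would need to avoid advancing the index after each deletion; iterating backwards sidesteps that bookkeeping.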



# File 'lib/tep/presence.rb', line 86

def self.untrack_by_fd(fd)
  entries = Tep::APP.presence_entries
  dropped = 0
  i = entries.length - 1
  while i >= 0
    if entries[i].fd == fd
      e = entries[i]
      entries.delete_at(i)
      Tep::Presence.mirror_delete(e.topic, fd)
      Tep::Presence.publish_diff("leave", e)
      dropped += 1
    end
    i -= 1
  end
  dropped
end

.worker_schema_sql ⇒ Object

Heartbeat table – one row per worker that’s mirroring presence right now. Used by prune_stale_workers to identify crashed workers (no heartbeat updates in N seconds) and garbage-collect their orphan tep_presence rows.



# File 'lib/tep/presence.rb', line 368

def self.worker_schema_sql
  "CREATE TABLE IF NOT EXISTS tep_presence_worker (" +
    "worker_id    TEXT PRIMARY KEY, " +
    "last_seen_ts BIGINT NOT NULL" +
  ")"
end