Module: Tep::Presence

Defined in:
lib/tep/pg.rb,
lib/tep/presence.rb

Class Method Summary collapse

Class Method Details

.clearObject

Drop every entry. Used by tests between fixtures and available to apps for graceful-shutdown cleanup. Returns the count dropped. Does NOT emit leave diffs (it’s a registry-management op, not a per-connection event).



195
196
197
198
199
200
201
202
# File 'lib/tep/presence.rb', line 195

def self.clear
  entries = Tep::APP.presence_entries
  n = entries.length
  while entries.length > 0
    entries.delete_at(0)
  end
  n
end

.clear_status(topic, fd) ⇒ Object

Reset an entry’s status back to :available / “” / 0.



173
174
175
# File 'lib/tep/presence.rb', line 173

def self.clear_status(topic, fd)
  Tep::Presence.set_status(topic, fd, :available, "", 0)
end

.count(topic) ⇒ Object

Total entries for ‘topic` (across all kinds).



122
123
124
# File 'lib/tep/presence.rb', line 122

def self.count(topic)
  Tep::Presence.count_filtered(topic, :both)
end

.count_agents(topic) ⇒ Object



130
131
132
# File 'lib/tep/presence.rb', line 130

def self.count_agents(topic)
  Tep::Presence.count_filtered(topic, :agent_for)
end

.count_filtered(topic, kind_filter) ⇒ Object

Internal counting helper: ‘kind_filter` is :both for all entries, otherwise :human or :agent_for to filter.



136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/tep/presence.rb', line 136

def self.count_filtered(topic, kind_filter)
  entries = Tep::APP.presence_entries
  n = 0
  i = 0
  while i < entries.length
    if entries[i].topic == topic
      if kind_filter == :both
        n += 1
      elsif entries[i].kind == kind_filter
        n += 1
      end
    end
    i += 1
  end
  n
end

.count_global(topic) ⇒ Object



1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
# File 'lib/tep/pg.rb', line 1465

def self.count_global(topic)
  if Tep::APP.presence_pg_enabled == 0
    return 0
  end
  begin
    r = Tep::APP.presence_pg_conn.exec_params(
      "SELECT count(*) FROM tep_presence WHERE topic = $1",
      [topic])
  rescue PG::Error
    return 0
  end
  n = r.getvalue(0, 0).to_i
  r.clear
  n
end

.count_humans(topic) ⇒ Object



126
127
128
# File 'lib/tep/presence.rb', line 126

def self.count_humans(topic)
  Tep::Presence.count_filtered(topic, :human)
end

.diff_topic(topic) ⇒ Object

Compose the Broadcast topic for diff fan-out on a presence topic. WS subscribers register via Tep::Broadcast.subscribe_ws(diff_topic(“room:lobby”), ws_fd).



209
210
211
# File 'lib/tep/presence.rb', line 209

def self.diff_topic(topic)
  "presence:" + topic
end

.disable_pg_mirrorObject



1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
# File 'lib/tep/pg.rb', line 1258

def self.disable_pg_mirror
  if Tep::APP.presence_pg_enabled == 0
    return 0
  end
  # Best-effort cleanup -- swallow PG errors (we're tearing the
  # mirror down regardless) and still finish + disable below.
  begin
    r = Tep::APP.presence_pg_conn.exec_params(
      "DELETE FROM tep_presence WHERE worker_id = $1",
      [Tep::APP.presence_pg_worker_id])
    r.clear
    # Remove the heartbeat row so prune_stale_workers doesn't
    # see this worker as live after we're gone.
    r = Tep::APP.presence_pg_conn.exec_params(
      "DELETE FROM tep_presence_worker WHERE worker_id = $1",
      [Tep::APP.presence_pg_worker_id])
    r.clear
  rescue PG::Error
    # swallow -- shutting the mirror down anyway
  end
  Tep::APP.presence_pg_conn.finish
  Tep::APP.set_presence_pg_enabled(0)
  0
end

.enable_pg_mirror(conninfo) ⇒ Object



1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
# File 'lib/tep/pg.rb', line 1224

def self.enable_pg_mirror(conninfo)
  conn = PG::Connection.new(conninfo)
  if conn.pgh < 0
    return -1
  end
  # exec raises PG::Error on failure now; degrade gracefully
  # (close + return -1) rather than letting it escape the worker.
  begin
    r = conn.exec(Tep::Presence.schema_sql)
    r.clear
    # Heartbeat table for the prune-stale-workers path (#47).
    r = conn.exec(Tep::Presence.worker_schema_sql)
    r.clear
  rescue PG::Error
    conn.finish
    return -1
  end
  Tep::APP.set_presence_pg_conn(conn)
  worker_id = Sock.sphttp_getpid.to_s + "-" + Time.now.to_i.to_s
  Tep::APP.set_presence_pg_worker_id(worker_id)
  Tep::APP.set_presence_pg_enabled(1)
  # Drop any rows from a prior worker that managed to leave
  # stale entries with this same worker_id (unlikely thanks
  # to the boot-epoch suffix, but defensive). Best-effort.
  Tep::Presence.mirror_exec(
    "DELETE FROM tep_presence WHERE worker_id = $1",
    [worker_id])
  # Register this worker's heartbeat row immediately. Apps
  # refresh it periodically via Tep::Presence.heartbeat;
  # prune_stale_workers deletes rows whose heartbeat is stale.
  Tep::Presence.heartbeat
  0
end

.encode_diff(kind, entry) ⇒ Object

Flat-JSON wire format for a diff event. ‘kind` is one of “join” / “leave” / “status”. SpinelKit::Json’s flat-object extractors handle this on the client side (or any JSON-aware peer).



217
218
219
220
221
222
223
224
225
226
227
228
229
230
# File 'lib/tep/presence.rb', line 217

def self.encode_diff(kind, entry)
  "{" +
    SpinelKit::Json.encode_pair_str("kind", kind) + "," +
    SpinelKit::Json.encode_pair_str("topic", entry.topic) + "," +
    SpinelKit::Json.encode_pair_str("principal", entry.principal_id) + "," +
    SpinelKit::Json.encode_pair_str("ekind", entry.kind.to_s) + "," +
    SpinelKit::Json.encode_pair_str("agent_id", entry.agent_id) + "," +
    SpinelKit::Json.encode_pair_int("fd", entry.fd) + "," +
    SpinelKit::Json.encode_pair_int("since", entry.since) + "," +
    SpinelKit::Json.encode_pair_str("state", entry.status_state.to_s) + "," +
    SpinelKit::Json.encode_pair_str("note", entry.status_note) + "," +
    SpinelKit::Json.encode_pair_int("until_ts", entry.status_until) +
  "}"
end

.find_entry(topic, fd) ⇒ Object

Internal: find the entry matching (topic, fd). Returns nil if no match.



179
180
181
182
183
184
185
186
187
188
189
# File 'lib/tep/presence.rb', line 179

def self.find_entry(topic, fd)
  entries = Tep::APP.presence_entries
  i = 0
  while i < entries.length
    if entries[i].topic == topic && entries[i].fd == fd
      return entries[i]
    end
    i += 1
  end
  nil
end

.heartbeatObject



1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
# File 'lib/tep/pg.rb', line 1306

def self.heartbeat
  if Tep::APP.presence_pg_enabled == 0
    return 0
  end
  wid = Tep::APP.presence_pg_worker_id
  if wid.length == 0
    return 0
  end
  begin
    r = Tep::APP.presence_pg_conn.exec_params(
      "INSERT INTO tep_presence_worker (worker_id, last_seen_ts) " +
      "VALUES ($1, $2) " +
      "ON CONFLICT (worker_id) DO UPDATE SET " +
      "  last_seen_ts = EXCLUDED.last_seen_ts",
      [wid, Time.now.to_i.to_s])
    r.clear
  rescue PG::Error
    return 0
  end
  1
end

.list(topic) ⇒ Object

All entries for ‘topic`. Caller groups by principal_id when they want the Phoenix.Presence-style => [metas] shape; tep doesn’t pre-group because spinel’s nested-hash lowering is awkward.



107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/tep/presence.rb', line 107

def self.list(topic)
  result = [Tep::PresenceEntry.new("", "", :human, "", -1, 0)]
  result.delete_at(0)
  entries = Tep::APP.presence_entries
  i = 0
  while i < entries.length
    if entries[i].topic == topic
      result.push(entries[i])
    end
    i += 1
  end
  result
end

.list_global(topic) ⇒ Object



1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
# File 'lib/tep/pg.rb', line 1419

def self.list_global(topic)
  result = [Tep::PresenceEntry.new("", "", :human, "", -1, 0)]
  result.delete_at(0)
  if Tep::APP.presence_pg_enabled == 0
    return result
  end
  begin
    r = Tep::APP.presence_pg_conn.exec_params(
      "SELECT principal_id, kind, agent_id, fd, since_ts, " +
      "       status_state, status_note, status_until " +
      "FROM tep_presence WHERE topic = $1 ORDER BY since_ts",
      [topic])
  rescue PG::Error
    return result
  end
  i = 0
  n = r.ntuples
  while i < n
    kind_sym = :human
    if r.getvalue(i, 1) == "agent_for"
      kind_sym = :agent_for
    end
    state_sym = :available
    sstr = r.getvalue(i, 5)
    if sstr == "busy"
      state_sym = :busy
    elsif sstr == "blocked"
      state_sym = :blocked
    end
    e = Tep::PresenceEntry.new(
      topic,
      r.getvalue(i, 0),
      kind_sym,
      r.getvalue(i, 2),
      r.getvalue(i, 3).to_i,
      r.getvalue(i, 4).to_i)
    e.status_state = state_sym
    e.status_note  = r.getvalue(i, 6)
    e.status_until = r.getvalue(i, 7).to_i
    result.push(e)
    i += 1
  end
  r.clear
  result
end

.mirror_delete(topic, fd) ⇒ Object



1397
1398
1399
1400
1401
1402
1403
1404
1405
# File 'lib/tep/pg.rb', line 1397

def self.mirror_delete(topic, fd)
  if Tep::APP.presence_pg_enabled == 0
    return 0
  end
  Tep::Presence.mirror_exec(
    "DELETE FROM tep_presence " +
    "WHERE worker_id = $1 AND topic = $2 AND fd = $3",
    [Tep::APP.presence_pg_worker_id, topic, fd.to_s])
end

.mirror_exec(sql, params) ⇒ Object



1356
1357
1358
1359
1360
1361
1362
1363
1364
# File 'lib/tep/pg.rb', line 1356

def self.mirror_exec(sql, params)
  begin
    r = Tep::APP.presence_pg_conn.exec_params(sql, params)
    r.clear
  rescue PG::Error
    # swallow -- advisory mirror, local presence is authoritative
  end
  0
end

.mirror_insert(entry) ⇒ Object

—- PG mirror (cross-worker visibility) —-

The presence PG mirror is OPT-IN (#216). enable_pg_mirror / disable_pg_mirror / heartbeat / prune_stale_workers / mirror_exec / list_global / count_global / the schema DDL, and the REAL bodies of the mirror_* hooks below, live in lib/tep/pg.rb and only compile into apps that ‘require “tep/pg”`. Core Presence keeps no PG reference, so a non-PG app DCEs the libpq closure entirely.

track / untrack / set_status call these mirror_* hooks unconditionally; the core definitions are no-ops. ‘require “tep/pg”` redefines them (last-definition-wins) with the real exec_params writes.



281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
# File 'lib/tep/presence.rb', line 281

def self.mirror_insert(entry)
  if Tep::APP.presence_pg_enabled == 0
    return 0
  end
  Tep::Presence.mirror_exec(
    "INSERT INTO tep_presence " +
    "(worker_id, topic, fd, principal_id, kind, agent_id, " +
    " since_ts, status_state, status_note, status_until) " +
    "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) " +
    "ON CONFLICT (worker_id, topic, fd) DO UPDATE SET " +
    "  principal_id = EXCLUDED.principal_id, " +
    "  kind         = EXCLUDED.kind, " +
    "  agent_id     = EXCLUDED.agent_id, " +
    "  since_ts     = EXCLUDED.since_ts, " +
    "  status_state = EXCLUDED.status_state, " +
    "  status_note  = EXCLUDED.status_note, " +
    "  status_until = EXCLUDED.status_until",
    [
      Tep::APP.presence_pg_worker_id,
      entry.topic,
      entry.fd.to_s,
      entry.principal_id,
      entry.kind.to_s,
      entry.agent_id,
      entry.since.to_s,
      entry.status_state.to_s,
      entry.status_note,
      entry.status_until.to_s
    ])
end

.mirror_status(topic, fd, state, note, until_ts) ⇒ Object



1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
# File 'lib/tep/pg.rb', line 1407

def self.mirror_status(topic, fd, state, note, until_ts)
  if Tep::APP.presence_pg_enabled == 0
    return 0
  end
  Tep::Presence.mirror_exec(
    "UPDATE tep_presence " +
    "SET status_state = $4, status_note = $5, status_until = $6 " +
    "WHERE worker_id = $1 AND topic = $2 AND fd = $3",
    [Tep::APP.presence_pg_worker_id, topic, fd.to_s,
     state.to_s, note, until_ts.to_s])
end

.prune_stale_workers(ttl_seconds) ⇒ Object



1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
# File 'lib/tep/pg.rb', line 1328

def self.prune_stale_workers(ttl_seconds)
  if Tep::APP.presence_pg_enabled == 0
    return 0
  end
  cutoff = Time.now.to_i - ttl_seconds
  conn = Tep::APP.presence_pg_conn
  begin
    # Drop dead heartbeats first; the second DELETE then walks
    # the worker_id space that's still alive.
    r1 = conn.exec_params(
      "DELETE FROM tep_presence_worker WHERE last_seen_ts < $1",
      [cutoff.to_s])
    r1.clear
    # Now drop presence rows whose worker_id isn't in the live
    # heartbeat table. NOT IN handles both crashed-and-pruned
    # workers and workers that never registered (legacy rows
    # from before this prune feature shipped).
    r2 = conn.exec(
      "DELETE FROM tep_presence " +
      "WHERE worker_id NOT IN (SELECT worker_id FROM tep_presence_worker)")
    n = r2.cmd_tuples
    r2.clear
  rescue PG::Error
    return 0
  end
  n
end

.publish_diff(kind, entry) ⇒ Object

Publish a diff via Tep::Broadcast. Subscribers to diff_topic(entry.topic) – typically WS connections via subscribe_ws – receive the encoded JSON payload as their next message. Returns the local-match count from publish (cross-worker delivery counts aren’t tracked here, same as Broadcast.publish’s documented behavior).



238
239
240
241
242
# File 'lib/tep/presence.rb', line 238

def self.publish_diff(kind, entry)
  payload = Tep::Presence.encode_diff(kind, entry)
  Tep::Broadcast.publish(
    Tep::Presence.diff_topic(entry.topic), payload)
end

.schema_sqlObject



1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
# File 'lib/tep/pg.rb', line 1283

def self.schema_sql
  "CREATE TABLE IF NOT EXISTS tep_presence (" +
    "worker_id    TEXT NOT NULL, " +
    "topic        TEXT NOT NULL, " +
    "fd           INTEGER NOT NULL, " +
    "principal_id TEXT NOT NULL, " +
    "kind         TEXT NOT NULL, " +
    "agent_id     TEXT NOT NULL, " +
    "since_ts     BIGINT NOT NULL, " +
    "status_state TEXT NOT NULL, " +
    "status_note  TEXT NOT NULL, " +
    "status_until BIGINT NOT NULL, " +
    "PRIMARY KEY (worker_id, topic, fd)" +
  ")"
end

.set_status(topic, fd, state, note, until_ts) ⇒ Object

Set the structured status on an existing entry. ‘state` ∈:busy, :blocked; `note` is free text (~140 char soft hint); `until_ts` is unix epoch seconds (0 = no identity-level expiry). Returns 1 if the entry was found and updated, 0 otherwise. Emits a “status” diff on the topic’s presence channel on update.



159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/tep/presence.rb', line 159

def self.set_status(topic, fd, state, note, until_ts)
  entry = Tep::Presence.find_entry(topic, fd)
  if entry == nil
    return 0
  end
  entry.status_state = state
  entry.status_note  = note
  entry.status_until = until_ts
  Tep::Presence.mirror_status(topic, fd, state, note, until_ts)
  Tep::Presence.publish_diff("status", entry)
  1
end

.sweep_expired_statusObject

Sweep entries whose status_until has passed: reset to :available / “” / 0 and emit a “status” diff for each. Apps call this periodically (e.g. once per HTTP request, or in a background fiber once Scheduled is reliable). Returns the count of entries reset.



249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
# File 'lib/tep/presence.rb', line 249

def self.sweep_expired_status
  entries = Tep::APP.presence_entries
  now = Time.now.to_i
  swept = 0
  i = 0
  while i < entries.length
    e = entries[i]
    if e.status_until > 0 && e.status_until <= now && e.status_state != :available
      e.status_state = :available
      e.status_note  = ""
      e.status_until = 0
      Tep::Presence.publish_diff("status", e)
      swept += 1
    end
    i += 1
  end
  swept
end

.track(req, topic, fd) ⇒ Object

Track a presence entry. principal_id comes off req.identity; kind is :human or :agent_for depending on the identity’s delegation state. fd is the underlying connection’s socket (typically a WS-accepted fd). Returns 0 on success.

Multiple track() calls for the same (principal, topic, fd) are deduped: the existing entry stays, no second row is created. Apps can call freely from before-filters / reconnect paths without growing the registry.



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/tep/presence.rb', line 42

def self.track(req, topic, fd)
  ident = req.identity
  if Tep::Presence.find_entry(topic, fd) != nil
    return 0
  end
  kind = :human
  agent_id = ""
  if ident.agent?
    kind = :agent_for
    agent_id = ident.acting_via.agent_id
  end
  entry = Tep::PresenceEntry.new(
    topic, ident.principal_id, kind, agent_id, fd, Time.now.to_i)
  Tep::APP.presence_entries.push(entry)
  Tep::Presence.mirror_insert(entry)
  Tep::Presence.publish_diff("join", entry)
  0
end

.untrack(topic, fd) ⇒ Object

Drop the entry for (topic, fd). The fd is the unique key within a topic; principal_id isn’t needed. Returns 1 if an entry was removed, 0 if none matched. Emits a “leave” diff on the topic’s presence channel when removal happens.



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/tep/presence.rb', line 65

def self.untrack(topic, fd)
  entries = Tep::APP.presence_entries
  i = 0
  while i < entries.length
    if entries[i].topic == topic && entries[i].fd == fd
      e = entries[i]
      entries.delete_at(i)
      Tep::Presence.mirror_delete(topic, fd)
      Tep::Presence.publish_diff("leave", e)
      return 1
    end
    i += 1
  end
  0
end

.untrack_by_fd(fd) ⇒ Object

Drop every entry associated with ‘fd` (across all topics). Used by the WS close hook to clean up everything a connection had tracked. Returns the count dropped. Emits one “leave” diff per dropped entry, on each entry’s topic’s presence channel.



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/tep/presence.rb', line 86

def self.untrack_by_fd(fd)
  entries = Tep::APP.presence_entries
  dropped = 0
  i = entries.length - 1
  while i >= 0
    if entries[i].fd == fd
      e = entries[i]
      entries.delete_at(i)
      Tep::Presence.mirror_delete(e.topic, fd)
      Tep::Presence.publish_diff("leave", e)
      dropped += 1
    end
    i -= 1
  end
  dropped
end

.worker_schema_sqlObject



1299
1300
1301
1302
1303
1304
# File 'lib/tep/pg.rb', line 1299

def self.worker_schema_sql
  "CREATE TABLE IF NOT EXISTS tep_presence_worker (" +
    "worker_id    TEXT PRIMARY KEY, " +
    "last_seen_ts BIGINT NOT NULL" +
  ")"
end