Class: Pgbus::Streams::Presence
- Inherits: Object
- Defined in: lib/pgbus/streams/presence.rb
Overview
Presence tracking for SSE streams. A “presence member” is an arbitrary identifier (typically a user id) marked as currently subscribed to a given stream. Members live in pgbus_presence_members with a (stream_name, member_id) primary key and a `last_seen_at` timestamp; the table is the source of truth across all Puma workers and Falcon reactors.
Typical usage in a controller:
def show
  @room = Room.find(params[:id])
  Pgbus.stream(@room).presence.join(
    member_id: current_user.id.to_s,
    metadata: { name: current_user.name, avatar: current_user.avatar_url }
  )
end
And when the user leaves:
Pgbus.stream(@room).presence.leave(member_id: current_user.id.to_s)
Reading the current member list:
Pgbus.stream(@room).presence.members
# => [{ "id" => "7", "metadata" => {"name" => ..., "avatar" => ...},
# "joined_at" => "...", "last_seen_at" => "..." }]
The join/leave operations also fire a stream broadcast (via the caller-provided block) so connected clients see the change in real time. The library emits the broadcast through the regular pgbus stream pipeline, which means it’s transactional, replayable, and filterable just like any other broadcast.
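As a rough illustration of that block contract, the sketch below reproduces the block handling visible in the join and leave listings further down: the block receives the member hash, and its return value is broadcast only when it is a non-empty String. FakeStream and broadcast_from_block are hypothetical stand-ins written for this example, not part of pgbus, and the turbo-stream markup and target name are assumptions.

```ruby
require "erb"

# Hypothetical stand-in for a pgbus stream; it only records broadcasts.
class FakeStream
  attr_reader :broadcasts

  def initialize
    @broadcasts = []
  end

  def broadcast(html)
    @broadcasts << html
  end
end

# Mirrors the block handling shown in Presence#join and Presence#leave:
# yield the record, then broadcast only a non-empty String result.
def broadcast_from_block(stream, record)
  return record unless block_given?

  html = yield(record)
  stream.broadcast(html) if html.is_a?(String) && !html.empty?
  record
end

stream = FakeStream.new
record = { "id" => "7", "metadata" => { "name" => "Ada" } }

# The caller decides the HTML; the target id "presence" is an assumption.
broadcast_from_block(stream, record) do |member|
  name = ERB::Util.html_escape(member.dig("metadata", "name"))
  <<~HTML
    <turbo-stream action="append" target="presence">
      <template><li id="member_#{member["id"]}">#{name}</li></template>
    </turbo-stream>
  HTML
end

# Returning nil or an empty string suppresses the broadcast.
broadcast_from_block(stream, record) { |_member| nil }
broadcast_from_block(stream, record) { |_member| "" }
```

Only the first call reaches the stream, so a caller can skip the broadcast for a given member simply by returning nil from the block.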
NOT included in this v1:
- Automatic connection-driven join/leave (the application must call join/leave explicitly).
- Automatic stale-member sweeping (call sweep!(older_than:) on the stream's presence from a cron job or after a heartbeat to expire idle members).
- Built-in DOM events on the <pgbus-stream-source> element (the application's broadcast block decides the HTML).
Instance Method Summary
- #count ⇒ Object
  Returns the count of current members.
- #initialize(stream) ⇒ Presence (constructor)
  A new instance of Presence.
- #join(member_id:, metadata: {}) ⇒ Object
  Adds (or refreshes) a member on this stream.
- #leave(member_id:) ⇒ Object
  Removes a member.
- #members ⇒ Object
  Returns the current list of members on this stream as an array of hashes (id, metadata, joined_at, last_seen_at).
- #sweep!(older_than:) ⇒ Object
  Removes members whose last_seen_at is older than the given cutoff.
- #touch(member_id:) ⇒ Object
  Refreshes the last_seen_at timestamp without re-broadcasting.
Constructor Details
#initialize(stream) ⇒ Presence
Returns a new instance of Presence.
# File 'lib/pgbus/streams/presence.rb', line 46
def initialize(stream)
  @stream = stream
end
Instance Method Details
#count ⇒ Object
Returns the count of current members. Faster than members.size because it doesn’t deserialize metadata.
# File 'lib/pgbus/streams/presence.rb', line 117
def count
  rows = connection.exec_params(<<~SQL, [@stream.name])
    SELECT COUNT(*) AS n
    FROM pgbus_presence_members
    WHERE stream_name = $1
  SQL
  rows.first["n"].to_i
end
#join(member_id:, metadata: {}) ⇒ Object
Adds (or refreshes) a member on this stream. Idempotent: calling join twice with the same member_id updates last_seen_at and metadata without creating a duplicate row. Yields the member hash to the optional block so the caller can render a `<turbo-stream action="append">` to broadcast the join. The block's return value is broadcast only when it is a non-empty String. Returns the member record.
# File 'lib/pgbus/streams/presence.rb', line 55
def join(member_id:, metadata: {})
  member_id = member_id.to_s
  record = upsert_member(member_id, metadata)
  if block_given?
    html = yield(record)
    @stream.broadcast(html) if html.is_a?(String) && !html.empty?
  end
  record
end
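The idempotency described above can be pictured with a small in-memory model. This is only a sketch: InMemoryPresence and its clock parameter are hypothetical, the real implementation upserts into pgbus_presence_members, and the assumption that joined_at is preserved on a repeated join is not confirmed by the listing.

```ruby
# Hypothetical in-memory model of the (stream_name, member_id) primary key.
class InMemoryPresence
  def initialize(stream_name, clock: -> { Time.now })
    @stream_name = stream_name
    @clock = clock
    @rows = {}
  end

  # A second join with the same member_id reuses the existing row.
  def join(member_id:, metadata: {})
    now = @clock.call
    key = [@stream_name, member_id.to_s]
    # Assumption: joined_at is set once and kept on later joins.
    row = (@rows[key] ||= { "id" => member_id.to_s, "joined_at" => now })
    row["metadata"] = metadata
    row["last_seen_at"] = now
    row
  end

  def count
    @rows.size
  end
end

t = Time.at(0)
presence = InMemoryPresence.new("room:42", clock: -> { t })

presence.join(member_id: 7, metadata: { "name" => "Ada" })
t = Time.at(30)
row = presence.join(member_id: "7", metadata: { "name" => "Ada L." })
```

Because member_id is normalized with to_s, the Integer 7 and the String "7" address the same row, which matches the to_s call at the top of the join listing.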
#leave(member_id:) ⇒ Object
Removes a member. Yields the (former) member hash to the optional block so the caller can broadcast a `remove` action. Returns the deleted record, or nil if the member wasn’t present.
# File 'lib/pgbus/streams/presence.rb', line 70
def leave(member_id:)
  member_id = member_id.to_s
  record = delete_member(member_id)
  return nil unless record

  if block_given?
    html = yield(record)
    @stream.broadcast(html) if html.is_a?(String) && !html.empty?
  end
  record
end
#members ⇒ Object
Returns the current list of members on this stream as an array of hashes (id, metadata, joined_at, last_seen_at).
# File 'lib/pgbus/streams/presence.rb', line 98
def members
  rows = connection.exec_params(<<~SQL, [@stream.name])
    SELECT member_id, metadata, joined_at, last_seen_at
    FROM pgbus_presence_members
    WHERE stream_name = $1
    ORDER BY joined_at
  SQL
  rows.to_a.map do |row|
    {
      "id" => row["member_id"],
      "metadata" => (row["metadata"]),
      "joined_at" => row["joined_at"],
      "last_seen_at" => row["last_seen_at"]
    }
  end
end
#sweep!(older_than:) ⇒ Object
Removes members whose last_seen_at is older than the given cutoff. Returns the number of expired rows. Use as a sweeper:
Pgbus.stream("room:42").presence.sweep!(older_than: 60.seconds.ago)
Atomically claims the deletion via DELETE RETURNING so multiple workers running the sweep concurrently won’t double-emit leave events.
# File 'lib/pgbus/streams/presence.rb', line 132
def sweep!(older_than:)
  rows = connection.exec_params(<<~SQL, [@stream.name, older_than])
    DELETE FROM pgbus_presence_members
    WHERE stream_name = $1 AND last_seen_at < $2
    RETURNING member_id, metadata, joined_at, last_seen_at
  SQL
  rows.to_a.size
end
#touch(member_id:) ⇒ Object
Refreshes the last_seen_at timestamp without re-broadcasting. Used as a heartbeat mechanism: clients ping touch periodically to stay in the member list, and a sweeper expires anyone who hasn’t pinged in N seconds.
# File 'lib/pgbus/streams/presence.rb', line 87
def touch(member_id:)
  member_id = member_id.to_s
  connection.exec_params(<<~SQL, [@stream.name, member_id])
    UPDATE pgbus_presence_members
    SET last_seen_at = NOW()
    WHERE stream_name = $1 AND member_id = $2
  SQL
end
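The heartbeat-plus-sweeper pattern can be sketched without a database. The model below is hypothetical: a plain Hash stands in for pgbus_presence_members, the touch and sweep lambdas imitate the UPDATE and DELETE statements shown in the listings, and the 60-second timeout is an arbitrary choice for the example.

```ruby
# Hypothetical model: member_id => last_seen_at for a single stream.
last_seen = { "7" => Time.at(100), "8" => Time.at(100) }

# Like the UPDATE in #touch: refreshes an existing row, never creates one.
touch = lambda do |member_id, now|
  last_seen[member_id] = now if last_seen.key?(member_id)
end

# Like the DELETE in #sweep!: drops rows older than the cutoff and
# returns how many were removed.
sweep = lambda do |older_than|
  expired = last_seen.select { |_id, seen| seen < older_than }.keys
  expired.each { |id| last_seen.delete(id) }
  expired.size
end

touch.call("7", Time.at(150)) # member 7 pings; member 8 stays silent
touch.call("9", Time.at(150)) # unknown member: nothing is inserted

timeout = 60                  # seconds; an assumed value
now = Time.at(170)
removed = sweep.call(now - timeout)
```

With these timestamps the cutoff is 110 seconds, so member 8 (last seen at 100) is swept while member 7 (last seen at 150) remains. Outside Rails, `Time.now - 60` plays the role of `60.seconds.ago`.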