Class: Pgbus::Streams::Presence

Inherits:
Object
Defined in:
lib/pgbus/streams/presence.rb

Overview

Presence tracking for SSE streams. A “presence member” is an arbitrary identifier (typically a user id) marked as currently subscribed to a given stream. Members live in pgbus_presence_members with a (stream_name, member_id) primary key and a `last_seen_at` timestamp; the table is the source of truth across all Puma workers and Falcon reactors.

Typical usage in a controller:

def show
  @room = Room.find(params[:id])
  Pgbus.stream(@room).presence.join(
    member_id: current_user.id.to_s,
    metadata: { name: current_user.name, avatar: current_user.avatar_url }
  )
end

And when the user leaves:

Pgbus.stream(@room).presence.leave(member_id: current_user.id.to_s)

Reading the current member list:

Pgbus.stream(@room).presence.members
  # => [{ "id" => "7", "metadata" => {"name" => ..., "avatar" => ...},
  #      "joined_at" => "...", "last_seen_at" => "..." }]

When called with a block, join and leave also fire a stream broadcast so connected clients see the change in real time: the block receives the member hash and returns the HTML to send, and a nil or empty return value skips the broadcast. The library emits the broadcast through the regular pgbus stream pipeline, which means it’s transactional, replayable, and filterable just like any other broadcast.
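The block contract described above can be sketched without a database. RecordingStream and InMemoryPresence below are hypothetical stand-ins, not pgbus classes; they only mirror the documented rule that the block's return value is broadcast when it is a non-empty String. The `target="members"` attribute and the list-item markup are likewise assumptions for illustration.

```ruby
# Hypothetical stand-ins so the sketch runs without pgbus or PostgreSQL.
class RecordingStream
  attr_reader :broadcasts

  def initialize
    @broadcasts = []
  end

  # Records the payload instead of sending it to clients.
  def broadcast(html)
    @broadcasts << html
  end
end

class InMemoryPresence
  def initialize(stream)
    @stream = stream
    @members = {}
  end

  # Mirrors the documented contract: the block's return value is broadcast
  # only when it is a non-empty String.
  def join(member_id:, metadata: {})
    id = member_id.to_s
    record = { "id" => id, "metadata" => metadata }
    @members[id] = record

    if block_given?
      html = yield(record)
      @stream.broadcast(html) if html.is_a?(String) && !html.empty?
    end

    record
  end
end

stream = RecordingStream.new
presence = InMemoryPresence.new(stream)

# The block renders the HTML that connected clients should receive.
presence.join(member_id: 7, metadata: { "name" => "Ada" }) do |member|
  %(<turbo-stream action="append" target="members"><template><li id="member_#{member["id"]}">#{member["metadata"]["name"]}</li></template></turbo-stream>)
end

# Returning nil (or an empty string) from the block skips the broadcast.
presence.join(member_id: 8) { |_member| nil }
```

With the real class the same block would be passed to Pgbus.stream(@room).presence.join; calling join without a block updates the member row silently.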

NOT included in this v1:

- Automatic connection-driven join/leave (the application must
  call join/leave explicitly).
- Automatic stale-member sweeping (call sweep!(older_than:) on the
  stream's presence object from a cron or after a heartbeat to
  expire idle members).
- Built-in DOM events on the <pgbus-stream-source> element
  (the application's broadcast block decides the HTML).


Constructor Details

#initialize(stream) ⇒ Presence

Returns a new instance of Presence.



# File 'lib/pgbus/streams/presence.rb', line 46

def initialize(stream)
  @stream = stream
end

Instance Method Details

#count ⇒ Object

Returns the count of current members. Faster than members.size because it doesn’t deserialize metadata.



# File 'lib/pgbus/streams/presence.rb', line 117

def count
  rows = connection.exec_params(<<~SQL, [@stream.name])
    SELECT COUNT(*) AS n FROM pgbus_presence_members WHERE stream_name = $1
  SQL
  rows.first["n"].to_i
end

#join(member_id:, metadata: {}) ⇒ Object

Adds (or refreshes) a member on this stream. Idempotent: calling join twice with the same member_id updates last_seen_at and metadata without creating a duplicate row. Yields the member hash to the optional block so the caller can render a `<turbo-stream action="append">` to broadcast the join.



# File 'lib/pgbus/streams/presence.rb', line 55

def join(member_id:, metadata: {})
  member_id = member_id.to_s
  record = upsert_member(member_id, metadata)

  if block_given?
    html = yield(record)
    @stream.broadcast(html) if html.is_a?(String) && !html.empty?
  end

  record
end
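
The upsert_member helper is not shown on this page. As a rough sketch of how an idempotent join could be written against the (stream_name, member_id) primary key, the method below uses PostgreSQL's INSERT ... ON CONFLICT. The method name upsert_member_sketch, the jsonb column type for metadata, and the choice to leave joined_at untouched on conflict are all assumptions, and the real implementation may differ.

```ruby
require "json"

# Hypothetical sketch, not the library's upsert_member.
# connection: any object responding to exec_params(sql, params),
# such as a PG::Connection.
def upsert_member_sketch(connection, stream_name, member_id, metadata)
  # Assumes metadata is stored as jsonb. On conflict only metadata and
  # last_seen_at are rewritten, so joined_at keeps its original value.
  sql = <<~SQL
    INSERT INTO pgbus_presence_members
      (stream_name, member_id, metadata, joined_at, last_seen_at)
    VALUES ($1, $2, $3::jsonb, NOW(), NOW())
    ON CONFLICT (stream_name, member_id)
    DO UPDATE SET metadata = EXCLUDED.metadata, last_seen_at = NOW()
    RETURNING member_id, metadata, joined_at, last_seen_at
  SQL

  params = [stream_name, member_id.to_s, JSON.generate(metadata)]
  connection.exec_params(sql, params).first
end
```

Because the conflict target matches the primary key, a second join for the same member updates the existing row in a single statement rather than racing a separate SELECT and INSERT.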

#leave(member_id:) ⇒ Object

Removes a member. Yields the (former) member hash to the optional block so the caller can broadcast a `remove` action. Returns the deleted record, or nil if the member wasn’t present.



# File 'lib/pgbus/streams/presence.rb', line 70

def leave(member_id:)
  member_id = member_id.to_s
  record = delete_member(member_id)
  return nil unless record

  if block_given?
    html = yield(record)
    @stream.broadcast(html) if html.is_a?(String) && !html.empty?
  end

  record
end

#members ⇒ Object

Returns the current list of members on this stream as an array of hashes (id, metadata, joined_at, last_seen_at).



# File 'lib/pgbus/streams/presence.rb', line 98

def members
  rows = connection.exec_params(<<~SQL, [@stream.name])
    SELECT member_id, metadata, joined_at, last_seen_at
    FROM pgbus_presence_members
    WHERE stream_name = $1
    ORDER BY joined_at
  SQL
  rows.to_a.map do |row|
    {
      "id" => row["member_id"],
      "metadata" => (row["metadata"]),
      "joined_at" => row["joined_at"],
      "last_seen_at" => row["last_seen_at"]
    }
  end
end

#sweep!(older_than:) ⇒ Object

Removes members whose last_seen_at is older than the given cutoff. Returns the number of expired rows. Use as a sweeper:

Pgbus.stream("room:42").presence.sweep!(older_than: 60.seconds.ago)

Atomically claims the deletion via DELETE RETURNING so multiple workers running the sweep concurrently won’t double-emit leave events.



# File 'lib/pgbus/streams/presence.rb', line 132

def sweep!(older_than:)
  rows = connection.exec_params(<<~SQL, [@stream.name, older_than])
    DELETE FROM pgbus_presence_members
    WHERE stream_name = $1 AND last_seen_at < $2
    RETURNING member_id, metadata, joined_at, last_seen_at
  SQL
  rows.to_a.size
end

#touch(member_id:) ⇒ Object

Refreshes the last_seen_at timestamp without re-broadcasting. Used as a heartbeat mechanism: clients ping touch periodically to stay in the member list, and a sweeper expires anyone who hasn’t pinged in N seconds.



# File 'lib/pgbus/streams/presence.rb', line 87

def touch(member_id:)
  member_id = member_id.to_s
  connection.exec_params(<<~SQL, [@stream.name, member_id])
    UPDATE pgbus_presence_members
    SET last_seen_at = NOW()
    WHERE stream_name = $1 AND member_id = $2
  SQL
end
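
The heartbeat-and-sweep lifecycle can be illustrated with a small in-memory model. PresenceModel is a hypothetical class in which a Hash stands in for pgbus_presence_members and timestamps are passed explicitly so the behavior is deterministic. It mirrors two details of the SQL above: touch changes nothing for a member that never joined (the UPDATE matches zero rows), and sweep! removes rows strictly older than the cutoff.

```ruby
# Hypothetical in-memory model of the touch/sweep lifecycle.
class PresenceModel
  def initialize
    @last_seen = {} # member_id => Time of last join or touch
  end

  def join(member_id, now:)
    @last_seen[member_id.to_s] = now
  end

  # Like #touch: refreshes the timestamp only for an existing member.
  def touch(member_id, now:)
    key = member_id.to_s
    @last_seen[key] = now if @last_seen.key?(key)
  end

  # Like #sweep!: drops members last seen before the cutoff and
  # returns how many were removed.
  def sweep!(older_than:)
    before = @last_seen.size
    @last_seen.delete_if { |_id, seen_at| seen_at < older_than }
    before - @last_seen.size
  end

  def member_ids
    @last_seen.keys
  end
end

t0 = Time.at(0)
model = PresenceModel.new
model.join("alice", now: t0)
model.join("bob", now: t0)

model.touch("alice", now: t0 + 45) # alice pings; bob goes quiet
model.touch("carol", now: t0 + 45) # carol never joined, so nothing changes

# Sweeping at t0 + 70 with a 60-second window gives a cutoff of t0 + 10.
expired = model.sweep!(older_than: (t0 + 70) - 60)
```

In this run only bob is expired, because alice's ping at t0 + 45 is newer than the cutoff. The sweep window should be comfortably longer than the client ping interval so that a single delayed heartbeat does not drop an active member.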