Module: Pgbus::Client::ReadAfter

Included in:
Pgbus::Client
Defined in:
lib/pgbus/client/read_after.rb

Overview

Non-consuming peek across PGMQ live (`q_`) and archive (`a_`) tables. Used exclusively by `Pgbus::Web::Streamer` for SSE replay — workers continue to use `read_batch` (claim semantics). The two read paths are disjoint.

The cursor is the highest `msg_id` the client has already seen. Replay returns every message whose `msg_id` is strictly greater than the cursor, ordered by `msg_id` ASC and capped by `limit`.

Defined Under Namespace

Classes: Envelope

Constant Summary collapse

DEFAULT_LIMIT =
500

Instance Method Summary collapse

Instance Method Details

#read_after(stream_name, after_id:, limit: DEFAULT_LIMIT) ⇒ Object



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/pgbus/client/read_after.rb', line 16

# Replays messages of a stream that come after a client's cursor.
#
# The query text comes from +build_read_after_sql+ and is executed with two
# bind parameters: the cursor and the row cap, both coerced with +to_i+.
# Every returned row is converted with +build_envelope+.
#
# @param stream_name [Object] stream identifier, passed through +sanitized_queue+
# @param after_id [#to_i] cursor; bound as the first query parameter
# @param limit [#to_i] maximum number of rows; bound as the second query parameter
# @return [Array] one +build_envelope+ result per row (presumably Envelope
#   instances — confirm against build_envelope); an empty array when
#   +missing_stream_queue?+ recognises the raised error
# @raise [StandardError] any error +missing_stream_queue?+ does not recognise
def read_after(stream_name, after_id:, limit: DEFAULT_LIMIT)
  queue = sanitized_queue(stream_name)
  query = build_read_after_sql(queue)

  records = synchronized do
    with_raw_connection do |connection|
      binds = [after_id.to_i, limit.to_i]
      result = connection.exec_params(query, binds)
      result.to_a
    end
  end

  records.map { |record| build_envelope(record) }
rescue StandardError => error
  # A stream whose queue is reported missing is treated as having nothing to
  # replay; every other failure propagates unchanged.
  return [] if missing_stream_queue?(error, queue)

  raise
end

#stream_current_msg_id(stream_name) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/pgbus/client/read_after.rb', line 33

# Reports the highest msg_id currently present in a stream's live queue table.
#
# Only the live +pgmq.q_<queue>+ table is consulted; the SQL itself falls
# back to 0 via COALESCE when MAX(msg_id) is NULL.
#
# @param stream_name [Object] stream identifier, passed through +sanitized_queue+
# @return [Integer] the query's +max+ column coerced with +to_i+; 0 when
#   +missing_stream_queue?+ recognises the raised error
# @raise [StandardError] any error +missing_stream_queue?+ does not recognise
def stream_current_msg_id(stream_name)
  queue = sanitized_queue(stream_name)
  query = "SELECT COALESCE(MAX(msg_id), 0) AS max FROM pgmq.q_#{queue}"

  synchronized do
    with_raw_connection do |connection|
      row = connection.exec(query).first
      row.fetch("max").to_i
    end
  end
rescue StandardError => error
  # A queue reported as missing is answered with 0; anything else is re-raised.
  return 0 if missing_stream_queue?(error, queue)

  raise
end

#stream_oldest_msg_id(stream_name) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/pgbus/client/read_after.rb', line 47

# Finds the smallest msg_id still stored for a stream, looking at both the
# live (+pgmq.q_<queue>+) and archive (+pgmq.a_<queue>+) tables.
#
# The two per-table minimums are combined with SQL LEAST.
#
# @param stream_name [Object] stream identifier, passed through +sanitized_queue+
# @return [Integer, nil] the +least+ column coerced with +to_i+; nil when the
#   query yields NULL (presumably when both tables are empty — confirm
#   against PostgreSQL's LEAST semantics) or when +missing_stream_queue?+
#   recognises the raised error
# @raise [StandardError] any error +missing_stream_queue?+ does not recognise
def stream_oldest_msg_id(stream_name)
  sanitized = sanitized_queue(stream_name)
  query = <<~SQL
    SELECT LEAST(
      (SELECT MIN(msg_id) FROM pgmq.q_#{sanitized}),
      (SELECT MIN(msg_id) FROM pgmq.a_#{sanitized})
    ) AS least
  SQL

  synchronized do
    with_raw_connection do |connection|
      oldest = connection.exec(query).first.fetch("least")
      # Keep a NULL result as nil instead of coercing it.
      oldest.nil? ? nil : oldest.to_i
    end
  end
rescue StandardError => error
  # A queue reported as missing has no oldest message; anything else is re-raised.
  return nil if missing_stream_queue?(error, sanitized)

  raise
end