Module: Pgbus::Client::ReadAfter
- Included in:
- Pgbus::Client
- Defined in:
- lib/pgbus/client/read_after.rb
Overview
Non-consuming peek across PGMQ live (‘q_`) and archive (`a_`) tables. Used exclusively by `Pgbus::Web::Streamer` for SSE replay — workers continue to use `read_batch` (claim semantics). The two read paths are disjoint.
The cursor is the highest msg_id the client has already seen. Replay returns everything strictly greater, ordered by msg_id ASC, capped by ‘limit`.
Defined Under Namespace
Classes: Envelope
Constant Summary collapse
- DEFAULT_LIMIT =
500
Instance Method Summary collapse
- #read_after(stream_name, after_id:, limit: DEFAULT_LIMIT) ⇒ Object
- #stream_current_msg_id(stream_name) ⇒ Object
- #stream_oldest_msg_id(stream_name) ⇒ Object
Instance Method Details
#read_after(stream_name, after_id:, limit: DEFAULT_LIMIT) ⇒ Object
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/pgbus/client/read_after.rb', line 16 def read_after(stream_name, after_id:, limit: DEFAULT_LIMIT) sanitized = sanitized_queue(stream_name) sql = build_read_after_sql(sanitized) rows = synchronized do with_raw_connection do |conn| conn.exec_params(sql, [after_id.to_i, limit.to_i]).to_a end end rows.map { |row| build_envelope(row) } rescue StandardError => e raise unless missing_stream_queue?(e, sanitized) [] end |
#stream_current_msg_id(stream_name) ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/pgbus/client/read_after.rb', line 33 def stream_current_msg_id(stream_name) sanitized = sanitized_queue(stream_name) sql = "SELECT COALESCE(MAX(msg_id), 0) AS max FROM pgmq.q_#{sanitized}" synchronized do with_raw_connection do |conn| conn.exec(sql).first.fetch("max").to_i end end rescue StandardError => e raise unless missing_stream_queue?(e, sanitized) 0 end |
#stream_oldest_msg_id(stream_name) ⇒ Object
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/pgbus/client/read_after.rb', line 47 def stream_oldest_msg_id(stream_name) sanitized = sanitized_queue(stream_name) sql = <<~SQL SELECT LEAST( (SELECT MIN(msg_id) FROM pgmq.q_#{sanitized}), (SELECT MIN(msg_id) FROM pgmq.a_#{sanitized}) ) AS least SQL synchronized do with_raw_connection do |conn| value = conn.exec(sql).first.fetch("least") value&.to_i end end rescue StandardError => e raise unless missing_stream_queue?(e, sanitized) nil end |