Module: Pgbus::Client::ReadAfter
- Included in: Pgbus::Client
- Defined in: lib/pgbus/client/read_after.rb
Overview
Non-consuming peek across PGMQ live (q_) and archive (a_) tables. Used
exclusively by Pgbus::Web::Streamer for SSE replay — workers continue to
use read_batch (claim semantics). The two read paths are disjoint.
The cursor is the highest msg_id the client has already seen. Replay returns
everything strictly greater, ordered by msg_id ASC, capped by limit.
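
The cursor contract above can be illustrated with a small in-memory sketch. This is not the library's implementation (the real query runs in PostgreSQL against the q_ and a_ tables); `Message` and `replay_after` are hypothetical names used only for illustration.

```ruby
# In-memory stand-in for the replay query; hypothetical, for illustration only.
Message = Struct.new(:msg_id, :payload)

def replay_after(messages, after_id:, limit: 500)
  messages
    .select { |m| m.msg_id > after_id } # strictly greater than the cursor
    .sort_by(&:msg_id)                  # ordered by msg_id ASC
    .first(limit)                       # capped by limit
end

live    = [Message.new(5, "e"), Message.new(4, "d")]
archive = [Message.new(2, "b"), Message.new(3, "c"), Message.new(1, "a")]

page = replay_after(live + archive, after_id: 2, limit: 2)
page.map(&:msg_id)             # => [3, 4]
next_cursor = page.last.msg_id # => 4, the highest msg_id now seen
```

Because the comparison is strict, passing the last msg_id of one page as the next `after_id` neither repeats nor skips a message.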
Defined Under Namespace
Classes: Envelope
Constant Summary
- DEFAULT_LIMIT = 500
Instance Method Summary
- #read_after(stream_name, after_id:, limit: DEFAULT_LIMIT) ⇒ Object
- #stream_current_msg_id(stream_name) ⇒ Object
- #stream_oldest_msg_id(stream_name) ⇒ Object
Instance Method Details
#read_after(stream_name, after_id:, limit: DEFAULT_LIMIT) ⇒ Object
    # File 'lib/pgbus/client/read_after.rb', line 16

    def read_after(stream_name, after_id:, limit: DEFAULT_LIMIT)
      sanitized = sanitized_queue(stream_name)
      sql = build_read_after_sql(sanitized)

      rows = synchronized do
        with_raw_connection do |conn|
          conn.exec_params(sql, [after_id.to_i, limit.to_i]).to_a
        end
      end

      rows.map { |row| build_envelope(row) }
    rescue StandardError => e
      raise unless missing_stream_queue?(e, sanitized)

      []
    end
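
A caller that needs more than one page might drive #read_after in a loop, advancing the cursor to the last msg_id of each page. The sketch below is a guess at such a caller, not code from Pgbus: `FakeClient`, `FakeEnvelope`, and `replay_all` are hypothetical, and it assumes the returned envelopes expose a `msg_id` reader, which this page does not confirm.

```ruby
# Assumed envelope shape; the real Envelope class's fields are not shown here.
FakeEnvelope = Struct.new(:msg_id, :payload)

# Hypothetical stand-in for a client; only the read_after signature
# (stream_name, after_id:, limit:) is taken from the documentation.
class FakeClient
  def initialize(ids)
    @ids = ids.sort
  end

  def read_after(_stream_name, after_id:, limit: 500)
    @ids.select { |id| id > after_id.to_i }
        .first(limit.to_i)
        .map { |id| FakeEnvelope.new(id, "payload-#{id}") }
  end
end

# Pages through a stream until a short or empty page signals the end.
def replay_all(client, stream_name, after_id:, page_size: 500)
  cursor = after_id
  seen = []
  loop do
    page = client.read_after(stream_name, after_id: cursor, limit: page_size)
    break if page.empty?

    seen.concat(page)
    cursor = page.last.msg_id        # highest msg_id seen so far
    break if page.size < page_size   # short page: nothing more to fetch
  end
  seen
end

client = FakeClient.new((1..7).to_a)
replay_all(client, "chat", after_id: 2, page_size: 3).map(&:msg_id)
# => [3, 4, 5, 6, 7]
```

Note that a missing queue yields an empty array from #read_after rather than an exception, so a loop of this shape ends cleanly for streams that do not exist yet.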
#stream_current_msg_id(stream_name) ⇒ Object
    # File 'lib/pgbus/client/read_after.rb', line 33

    def stream_current_msg_id(stream_name)
      sanitized = sanitized_queue(stream_name)
      sql = "SELECT COALESCE(MAX(msg_id), 0) AS max FROM pgmq.q_#{sanitized}"
      synchronized do
        with_raw_connection do |conn|
          conn.exec(sql).first.fetch("max").to_i
        end
      end
    rescue StandardError => e
      raise unless missing_stream_queue?(e, sanitized)

      0
    end
#stream_oldest_msg_id(stream_name) ⇒ Object
    # File 'lib/pgbus/client/read_after.rb', line 47

    def stream_oldest_msg_id(stream_name)
      sanitized = sanitized_queue(stream_name)
      sql = <<~SQL
        SELECT LEAST(
          (SELECT MIN(msg_id) FROM pgmq.q_#{sanitized}),
          (SELECT MIN(msg_id) FROM pgmq.a_#{sanitized})
        ) AS least
      SQL

      synchronized do
        with_raw_connection do |conn|
          value = conn.exec(sql).first.fetch("least")
          value&.to_i
        end
      end
    rescue StandardError => e
      raise unless missing_stream_queue?(e, sanitized)

      nil
    end
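
The nil return follows from how PostgreSQL's LEAST treats NULL: NULL arguments are ignored, and the result is NULL only when every argument is NULL, that is, when both the live and archive tables are empty. The sketch below mirrors that rule in plain Ruby. `oldest_of` and `replay_gap?` are hypothetical helpers, and the gap check is only one possible way a caller could use the value, not something this page documents.

```ruby
# Mirrors PostgreSQL LEAST over two MIN(msg_id) results:
# nil (NULL) arguments are ignored; nil is returned only if both are nil.
def oldest_of(live_min, archive_min)
  [live_min, archive_min].compact.min
end

# Hypothetical check: could messages after the client's cursor have been
# dropped before replay? msg_id sequences may contain natural gaps, so a
# true result is a hint, not proof of loss.
def replay_gap?(cursor, oldest)
  return false if oldest.nil? # nothing retained, nothing to compare against

  cursor + 1 < oldest
end

oldest_of(12, 3)     # => 3   (archive holds the oldest message)
oldest_of(12, nil)   # => 12  (archive empty, live table only)
oldest_of(nil, nil)  # => nil (stream has no retained messages)

replay_gap?(5, 6)    # => false, message 6 directly follows the cursor
replay_gap?(5, 9)    # => true, ids 6..8 are no longer retained
```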