Class: Pcrd::Replication::Consumer
- Inherits:
-
Object
- Object
- Pcrd::Replication::Consumer
- Defined in:
- lib/pcrd/replication/consumer.rb
Overview
Streams WAL messages from a pgoutput logical replication slot and buffers complete transactions onto a Thread::Queue for the apply engine.
Protocol (inside each raw message from the server):
0x77 ('w') — XLogData: 1 type + 8 wal_start + 8 wal_end + 8 ts + payload
0x6B ('k') — Primary keepalive: 1 type + 8 wal_end + 8 ts + 1 reply_flag
The consumer must respond to keepalives with a StandbyStatusUpdate (‘r’) within wal_sender_timeout (default 60s) or the server drops the connection.
Thread model:
start — launches a background thread that drives the stream loop
stop — signals the thread to exit cleanly after the current poll
queue — Thread::Queue; pop from the apply engine side
advance_lsn — call from apply engine after each applied transaction
Defined Under Namespace
Classes: Transaction
Constant Summary collapse
- XLOG_DATA =
0x77- KEEPALIVE =
0x6B- PG_EPOCH_OFFSET_US =
946_684_800 * 1_000_000
- KEEPALIVE_INTERVAL =
seconds between proactive keepalives to server
10- WAIT_TIMEOUT =
max seconds per wait_readable; limits stop latency
1- DEFAULT_MAX_QUEUE =
Backpressure cap. The queue holds at most this many buffered transactions; once full, the stream loop stops reading new WAL (the server’s flow control kicks in) until the apply side drains it. This bounds memory during a long backfill instead of letting the queue grow without limit. Tuned via :max_queue.
10_000- FULL_QUEUE_BACKOFF =
seconds to wait between retries when full
0.05
Instance Attribute Summary collapse
-
#last_error ⇒ Object
readonly
Returns the value of attribute last_error.
-
#parser ⇒ Object
readonly
Returns the value of attribute parser.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
Instance Method Summary collapse
-
#advance_lsn(lsn_string) ⇒ Object
Called by the apply engine after a transaction has been applied.
-
#failed? ⇒ Boolean
True if the streaming thread exited because of an error.
-
#initialize(repl_conn:, parser:, slot_name:, pub_name:, start_lsn: "0/0", max_queue: DEFAULT_MAX_QUEUE) ⇒ Consumer
constructor
A new instance of Consumer.
-
#last_received_lsn ⇒ Object
Commit LSN of the most recent transaction buffered onto the queue, for observability (“how far has streaming read?”).
-
#queue_depth ⇒ Object
Number of buffered transactions waiting to be applied (backpressure gauge).
-
#start ⇒ Object
Opens the replication connection and starts the background thread.
-
#stop ⇒ Object
Signals the consumer to stop after the current poll cycle.
- #stopped? ⇒ Boolean
Constructor Details
#initialize(repl_conn:, parser:, slot_name:, pub_name:, start_lsn: "0/0", max_queue: DEFAULT_MAX_QUEUE) ⇒ Consumer
Returns a new instance of Consumer.
38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/pcrd/replication/consumer.rb', line 38 def initialize(repl_conn:, parser:, slot_name:, pub_name:, start_lsn: "0/0", max_queue: DEFAULT_MAX_QUEUE) @repl = repl_conn @parser = parser @slot_name = slot_name @pub_name = pub_name @start_lsn = start_lsn @queue = SizedQueue.new(max_queue) @stop = false @mutex = Mutex.new @conf_lsn = 0 # last applied LSN (int64); advanced by apply engine @last_received_lsn = nil # commit LSN of the most recently buffered txn @thread = nil end |
Instance Attribute Details
#last_error ⇒ Object (readonly)
Returns the value of attribute last_error.
53 54 55 |
# File 'lib/pcrd/replication/consumer.rb', line 53 def last_error @last_error end |
#parser ⇒ Object (readonly)
Returns the value of attribute parser.
53 54 55 |
# File 'lib/pcrd/replication/consumer.rb', line 53 def parser @parser end |
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
53 54 55 |
# File 'lib/pcrd/replication/consumer.rb', line 53 def queue @queue end |
Instance Method Details
#advance_lsn(lsn_string) ⇒ Object
Called by the apply engine after a transaction has been applied. Updates the LSN we report back to the server (WAL reclaim point).
94 95 96 97 |
# File 'lib/pcrd/replication/consumer.rb', line 94 def advance_lsn(lsn_string) int = lsn_to_int(lsn_string) @mutex.synchronize { @conf_lsn = [@conf_lsn, int].max } end |
#failed? ⇒ Boolean
True if the streaming thread exited because of an error. The apply side polls this when the queue drains empty so a dead consumer surfaces as a failure instead of looking like “caught up and idle”.
88 89 90 |
# File 'lib/pcrd/replication/consumer.rb', line 88 def failed? @mutex.synchronize { !@last_error.nil? } end |
#last_received_lsn ⇒ Object
Commit LSN of the most recent transaction buffered onto the queue, for observability (“how far has streaming read?”). nil until the first txn.
57 58 59 |
# File 'lib/pcrd/replication/consumer.rb', line 57 def last_received_lsn @mutex.synchronize { @last_received_lsn } end |
#queue_depth ⇒ Object
Number of buffered transactions waiting to be applied (backpressure gauge).
62 63 64 |
# File 'lib/pcrd/replication/consumer.rb', line 62 def queue_depth @queue.size end |
#start ⇒ Object
Opens the replication connection and starts the background thread.
67 68 69 70 71 72 |
# File 'lib/pcrd/replication/consumer.rb', line 67 def start @repl.open @repl.start_replication(slot_name: @slot_name, pub_name: @pub_name, start_lsn: @start_lsn) @thread = Thread.new { stream_loop } self end |
#stop ⇒ Object
Signals the consumer to stop after the current poll cycle.
75 76 77 78 79 |
# File 'lib/pcrd/replication/consumer.rb', line 75 def stop @mutex.synchronize { @stop = true } @thread&.join(5) @repl.close end |
#stopped? ⇒ Boolean
81 82 83 |
# File 'lib/pcrd/replication/consumer.rb', line 81 def stopped? @mutex.synchronize { @stop } end |