Class: Pcrd::Replication::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/pcrd/replication/consumer.rb

Overview

Streams WAL messages from a pgoutput logical replication slot and buffers complete transactions onto a Thread::Queue for the apply engine.

Protocol (inside each raw message from the server):

0x77 ('w') — XLogData: 1 type + 8 wal_start + 8 wal_end + 8 ts + payload
0x6B ('k') — Primary keepalive: 1 type + 8 wal_end + 8 ts + 1 reply_flag

The consumer must respond to keepalives with a StandbyStatusUpdate (‘r’) within wal_sender_timeout (default 60s) or the server drops the connection.

Thread model:

start        launches a background thread that drives the stream loop
stop         signals the thread to exit cleanly after the current poll
queue        Thread::Queue; pop from the apply engine side
advance_lsn  call from apply engine after each applied transaction

Defined Under Namespace

Classes: Transaction

Constant Summary collapse

XLOG_DATA =
0x77
KEEPALIVE =
0x6B
PG_EPOCH_OFFSET_US =
946_684_800 * 1_000_000
KEEPALIVE_INTERVAL =

seconds between proactive keepalives to server

10
WAIT_TIMEOUT =

max seconds per wait_readable; limits stop latency

1
DEFAULT_MAX_QUEUE =

Backpressure cap. The queue holds at most this many buffered transactions; once full, the stream loop stops reading new WAL (the server’s flow control kicks in) until the apply side drains it. This bounds memory during a long backfill instead of letting the queue grow without limit. Tuned via :max_queue.

10_000
FULL_QUEUE_BACKOFF =

seconds to wait between retries when full

0.05

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(repl_conn:, parser:, slot_name:, pub_name:, start_lsn: "0/0", max_queue: DEFAULT_MAX_QUEUE) ⇒ Consumer

Returns a new instance of Consumer.



38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/pcrd/replication/consumer.rb', line 38

def initialize(repl_conn:, parser:, slot_name:, pub_name:, start_lsn: "0/0",
               max_queue: DEFAULT_MAX_QUEUE)
  @repl      = repl_conn
  @parser    = parser
  @slot_name = slot_name
  @pub_name  = pub_name
  @start_lsn = start_lsn
  @queue     = SizedQueue.new(max_queue)
  @stop      = false
  @mutex     = Mutex.new
  @conf_lsn  = 0   # last applied LSN (int64); advanced by apply engine
  @last_received_lsn = nil # commit LSN of the most recently buffered txn
  @thread    = nil
end

Instance Attribute Details

#last_errorObject (readonly)

Returns the value of attribute last_error.



53
54
55
# File 'lib/pcrd/replication/consumer.rb', line 53

def last_error
  @last_error
end

#parserObject (readonly)

Returns the value of attribute parser.



53
54
55
# File 'lib/pcrd/replication/consumer.rb', line 53

def parser
  @parser
end

#queueObject (readonly)

Returns the value of attribute queue.



53
54
55
# File 'lib/pcrd/replication/consumer.rb', line 53

def queue
  @queue
end

Instance Method Details

#advance_lsn(lsn_string) ⇒ Object

Called by the apply engine after a transaction has been applied. Updates the LSN we report back to the server (WAL reclaim point).



94
95
96
97
# File 'lib/pcrd/replication/consumer.rb', line 94

def advance_lsn(lsn_string)
  int = lsn_to_int(lsn_string)
  @mutex.synchronize { @conf_lsn = [@conf_lsn, int].max }
end

#failed?Boolean

True if the streaming thread exited because of an error. The apply side polls this when the queue drains empty so a dead consumer surfaces as a failure instead of looking like “caught up and idle”.

Returns:

  • (Boolean)


88
89
90
# File 'lib/pcrd/replication/consumer.rb', line 88

def failed?
  @mutex.synchronize { !@last_error.nil? }
end

#last_received_lsnObject

Commit LSN of the most recent transaction buffered onto the queue, for observability (“how far has streaming read?”). nil until the first txn.



57
58
59
# File 'lib/pcrd/replication/consumer.rb', line 57

def last_received_lsn
  @mutex.synchronize { @last_received_lsn }
end

#queue_depthObject

Number of buffered transactions waiting to be applied (backpressure gauge).



62
63
64
# File 'lib/pcrd/replication/consumer.rb', line 62

def queue_depth
  @queue.size
end

#startObject

Opens the replication connection and starts the background thread.



67
68
69
70
71
72
# File 'lib/pcrd/replication/consumer.rb', line 67

def start
  @repl.open
  @repl.start_replication(slot_name: @slot_name, pub_name: @pub_name, start_lsn: @start_lsn)
  @thread = Thread.new { stream_loop }
  self
end

#stopObject

Signals the consumer to stop after the current poll cycle.



75
76
77
78
79
# File 'lib/pcrd/replication/consumer.rb', line 75

def stop
  @mutex.synchronize { @stop = true }
  @thread&.join(5)
  @repl.close
end

#stopped?Boolean

Returns:

  • (Boolean)


81
82
83
# File 'lib/pcrd/replication/consumer.rb', line 81

def stopped?
  @mutex.synchronize { @stop }
end