Class: Pgoutput::Client::Stream Private
- Inherits: Object
  - Object
  - Pgoutput::Client::Stream
- Defined in: lib/pgoutput/client/stream.rb
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
Logical replication stream loop.
`Stream` consumes PostgreSQL CopyData payloads from a Connection. It understands the two replication envelope message types used by PostgreSQL’s streaming replication protocol:
- `w`: XLogData, containing logical decoding plugin payload bytes.
- `k`: primary keepalive, optionally requesting immediate feedback.
The stream yields only XLogData plugin payloads. Keepalive messages are handled internally by updating the latest known WAL position and sending standby feedback when requested.
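As an illustration of the two envelope types above, here is a minimal sketch of how such payloads could be split into fields. The field layout follows the PostgreSQL streaming replication protocol; `parse_envelope` and the hash keys it returns are hypothetical names chosen for this example and are not part of `Pgoutput::Client::Stream`.

```ruby
# Hypothetical helper, not part of the documented class.
# Layouts (all integers big-endian):
#   XLogData:  Byte1('w'), Int64 WAL start, Int64 WAL end, Int64 server clock, payload
#   Keepalive: Byte1('k'), Int64 WAL end, Int64 server clock, Byte1 reply-requested
def parse_envelope(copy_data)
  bytes = copy_data.b # binary copy, so offsets are byte offsets

  case bytes[0]
  when "w"
    # "x" skips the type byte; the header is 1 + 8 + 8 + 8 = 25 bytes.
    wal_start, wal_end, clock = bytes.unpack("x Q> Q> q>")
    { type: :xlog_data, wal_start: wal_start, wal_end: wal_end,
      server_clock: clock, payload: bytes.byteslice(25..-1) }
  when "k"
    wal_end, clock, reply = bytes.unpack("x Q> q> C")
    { type: :keepalive, wal_end: wal_end, server_clock: clock,
      reply_requested: reply == 1 }
  else
    raise ArgumentError, "unknown replication message type: #{bytes[0].inspect}"
  end
end
```

In this sketch only the `:xlog_data` result carries a `:payload`, which mirrors the rule that keepalives are handled internally and never yielded.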
Feedback separates receipt from downstream acknowledgment. Received LSN follows the latest WAL position seen from PostgreSQL. Flushed/applied LSN follows #ack, allowing downstream consumers to decide when a WAL position is safe to report as durable.
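The split between received and acknowledged positions can be sketched as a standby status update message. The message layout comes from the PostgreSQL streaming replication protocol; `standby_status_update`, its keyword arguments, and `PG_EPOCH_OFFSET_USEC` are hypothetical names for this example, and the real class may build its feedback differently.

```ruby
# Microseconds between the Unix epoch and 2000-01-01 00:00:00 UTC,
# the epoch used for timestamps in the replication protocol.
PG_EPOCH_OFFSET_USEC = 946_684_800 * 1_000_000

# Hypothetical helper: builds Byte1('r'), Int64 written, Int64 flushed,
# Int64 applied, Int64 client clock, Byte1 reply-requested.
def standby_status_update(received_lsn:, acked_lsn:, reply_requested: false, now: Time.now)
  clock = (now.to_r * 1_000_000).to_i - PG_EPOCH_OFFSET_USEC
  # Written follows what was received; flushed and applied follow the ack.
  ["r", received_lsn, acked_lsn, acked_lsn, clock, reply_requested ? 1 : 0]
    .pack("a Q> Q> Q> q> C")
end
```

Reporting a flushed position that trails the received one tells the server that data arrived but is not yet safe to discard, which is why downstream code controls the second value.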
Instance Attribute Summary
- #acked_lsn ⇒ Integer (readonly, private)
  Latest downstream-acknowledged WAL position used as flushed/applied LSN in standby feedback.
- #last_keepalive_at ⇒ Time? (readonly, private)
  Last time a primary keepalive was observed.
- #latest_lsn ⇒ Integer (readonly, private)
  Latest WAL position observed from XLogData or keepalive messages.
Instance Method Summary
- #ack(lsn) ⇒ Integer (private)
  Mark a WAL position as durably handled by downstream code.
- #initialize(connection:, configuration:, acked_lsn: nil) ⇒ void (constructor, private)
  Build a stream loop.
- #running? ⇒ Boolean (private)
  Whether the stream loop is active.
- #start {|payload, metadata| ... } ⇒ void (private)
  Start the stream loop.
- #stop ⇒ void (private)
  Stop the stream loop after the current iteration.
Constructor Details
#initialize(connection:, configuration:, acked_lsn: nil) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Build a stream loop.
# File 'lib/pgoutput/client/stream.rb', line 48
def initialize(connection:, configuration:, acked_lsn: nil)
  @connection = connection
  @configuration = configuration
  @latest_lsn = LSN.parse(configuration.start_lsn_string)
  @acked_lsn = acked_lsn ? normalize_lsn_value(acked_lsn) : @latest_lsn
  @last_feedback_at = monotonic_time
  @last_keepalive_at = nil
  @running = false
  @stop_requested = false
end
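The constructor turns a textual start LSN into an Integer. The implementation of `LSN.parse` is not shown on this page, so the following is only a sketch of the usual conversion, assuming the standard PostgreSQL text form of two hexadecimal halves separated by a slash. `parse_lsn` and `format_lsn` are hypothetical names.

```ruby
# Hypothetical helpers; the library's own LSN.parse may differ.
# An LSN such as "16/B374D848" is a 64-bit position written as
# high 32 bits "/" low 32 bits, both in hexadecimal.
def parse_lsn(text)
  match = %r{\A(\h{1,8})/(\h{1,8})\z}.match(text)
  raise ArgumentError, "invalid LSN: #{text.inspect}" unless match

  (match[1].to_i(16) << 32) | match[2].to_i(16)
end

def format_lsn(value)
  format("%X/%X", value >> 32, value & 0xFFFF_FFFF)
end

format_lsn(parse_lsn("16/B374D848")) # => "16/B374D848"
```

Keeping LSNs as Integers makes comparisons such as "latest" and "maximum acknowledged" plain numeric operations.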
Instance Attribute Details
#acked_lsn ⇒ Integer (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Latest downstream-acknowledged WAL position used as flushed/applied LSN in standby feedback.
# File 'lib/pgoutput/client/stream.rb', line 34
def acked_lsn
  @acked_lsn
end
#last_keepalive_at ⇒ Time? (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Last time a primary keepalive was observed.
# File 'lib/pgoutput/client/stream.rb', line 39
def last_keepalive_at
  @last_keepalive_at
end
#latest_lsn ⇒ Integer (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Latest WAL position observed from XLogData or keepalive messages.
# File 'lib/pgoutput/client/stream.rb', line 28
def latest_lsn
  @latest_lsn
end
Instance Method Details
#ack(lsn) ⇒ Integer
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Mark a WAL position as durably handled by downstream code.
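The stream never decides sink durability on its own. Downstream code may call this after checkpointing, enqueueing, or otherwise making progress durable. Feedback sent after this call reports the acknowledged LSN as both flushed and applied. The acknowledged position is monotonic: a call with an LSN lower than the current value leaves it unchanged, and the method returns the resulting acknowledged LSN.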
# File 'lib/pgoutput/client/stream.rb', line 118
def ack(lsn)
  parsed = normalize_lsn_value(lsn)
  @acked_lsn = [@acked_lsn, parsed].max
end
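To show what the `max` in the listing above means for callers, here is a small stand-in that imitates only the acknowledgment bookkeeping. `AckTracker` is a hypothetical class for this example; it takes Integer LSNs directly and does not reproduce `normalize_lsn_value`.

```ruby
# Hypothetical stand-in that mirrors only the ack bookkeeping.
class AckTracker
  attr_reader :acked_lsn

  def initialize(start_lsn)
    @acked_lsn = start_lsn
  end

  # Keeps the highest acknowledged position and returns it.
  def ack(lsn)
    @acked_lsn = [@acked_lsn, lsn].max
  end
end

tracker = AckTracker.new(100)
tracker.ack(300) # => 300
tracker.ack(200) # => 300, a late or out-of-order ack does not move backwards
```

A consequence of this design is that downstream workers which finish out of order can each acknowledge their own position without coordinating, provided every position below the acknowledged one is in fact durable.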
#running? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Whether the stream loop is active.
# File 'lib/pgoutput/client/stream.rb', line 105
def running?
  @running
end
#start {|payload, metadata| ... } ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Start the stream loop.
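The method blocks while the stream is running and raises ArgumentError when called without a block. For every XLogData envelope, it yields the raw pgoutput payload and the parsed envelope metadata. When no CopyData payload is currently available, the loop sleeps for 10 ms before checking again. On exit, a final feedback message is sent only if #stop was requested.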
# File 'lib/pgoutput/client/stream.rb', line 74
def start(&block)
  raise ArgumentError, "block required" unless block_given?

  @running = true
  while @running
    copy_data = @connection.get_copy_data
    if copy_data.nil?
      sleep 0.01
      next
    end

    process_copy_data(copy_data, &block)
    send_periodic_feedback
  end
ensure
  send_feedback(reply_requested: false) if @stop_requested
  @running = false
end
#stop ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Stop the stream loop after the current iteration.
# File 'lib/pgoutput/client/stream.rb', line 96
def stop
  @stop_requested = true
  @running = false
  nil
end
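Because #start blocks, #stop has to be called from inside the block or from another thread. The sketch below imitates that interaction with a stand-in class fed from an in-memory array. `DemoStream` is hypothetical: it copies only the start/stop/running? control flow from the listings above and performs no replication I/O or feedback.

```ruby
# Hypothetical stand-in; only the loop control flow is imitated.
class DemoStream
  def initialize(payloads)
    @payloads = payloads.dup
    @running = false
  end

  def running?
    @running
  end

  def start
    raise ArgumentError, "block required" unless block_given?

    @running = true
    while @running
      payload = @payloads.shift
      if payload.nil?
        sleep 0.01 # nothing available, poll again shortly
        next
      end
      yield payload
    end
  ensure
    @running = false
  end

  def stop
    @running = false
    nil
  end
end

stream = DemoStream.new(%w[BEGIN INSERT COMMIT extra])
seen = []
stream.start do |payload|
  seen << payload
  stream.stop if payload == "COMMIT" # the current iteration still completes
end
```

After the loop returns, `seen` holds the three payloads up to and including "COMMIT", and "extra" is never yielded because the loop condition is checked before the next read.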