Class: Pgoutput::Client::Stream Private
- Inherits: Object
  - Object
  - Pgoutput::Client::Stream
- Defined in: lib/pgoutput/client/stream.rb
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
Logical replication stream loop.
`Stream` consumes PostgreSQL CopyData payloads from a Connection. It understands the two replication envelope message types used by PostgreSQL’s streaming replication protocol:
- `w`: XLogData, containing logical decoding plugin payload bytes.
- `k`: primary keepalive, optionally requesting immediate feedback.
The stream yields only XLogData plugin payloads. Keepalive messages are handled internally by updating the latest known WAL position and sending standby feedback when requested.
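The two envelopes can be told apart by their first byte. The following is a minimal sketch of that decoding step, based on the message layouts in the PostgreSQL streaming replication protocol documentation. `EnvelopeSketch` and its structs are hypothetical names used for illustration; they are not part of `Pgoutput::Client`, and the class's real parsing code may be organized differently.

```ruby
# Hypothetical helper, not part of Pgoutput::Client. It decodes one CopyData
# payload using the documented PostgreSQL replication message layouts.
module EnvelopeSketch
  XLogData  = Struct.new(:wal_start, :wal_end, :server_clock, :payload)
  Keepalive = Struct.new(:wal_end, :server_clock, :reply_requested)

  XLOG_HEADER_BYTES = 25 # 1 type byte + three 8-byte integers

  def self.decode(copy_data)
    bytes = copy_data.b # treat the input as raw bytes
    case bytes[0]
    when "w"
      # Byte1('w'), Int64 WAL start, Int64 WAL end, Int64 server clock, payload
      wal_start, wal_end, clock = bytes.unpack("xQ>Q>q>")
      payload = bytes.byteslice(XLOG_HEADER_BYTES, bytes.bytesize)
      XLogData.new(wal_start, wal_end, clock, payload)
    when "k"
      # Byte1('k'), Int64 WAL end, Int64 server clock, Byte1 reply-requested flag
      wal_end, clock, reply = bytes.unpack("xQ>q>C")
      Keepalive.new(wal_end, clock, reply == 1)
    else
      raise ArgumentError, "unknown replication message type: #{bytes[0].inspect}"
    end
  end
end

# Build a synthetic XLogData envelope and decode it again.
envelope = ["w", 1, 2, 3].pack("aQ>Q>q>") + "plugin-bytes"
message = EnvelopeSketch.decode(envelope)
puts message.payload
```

In this sketch only an `XLogData` result would be yielded to the caller. A `Keepalive` would update the latest known WAL position and, when `reply_requested` is true, trigger standby feedback. The server clock is transmitted as microseconds since midnight on 2000-01-01.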
Instance Method Summary
- #initialize(connection:, configuration:) ⇒ void (constructor, private): Build a stream loop.
- #start {|payload, metadata| ... } ⇒ void (private): Start the stream loop.
- #stop ⇒ void (private): Stop the stream loop after the current iteration.
Constructor Details
#initialize(connection:, configuration:) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Build a stream loop.
    # File 'lib/pgoutput/client/stream.rb', line 25
    def initialize(connection:, configuration:)
      @connection = connection
      @configuration = configuration
      @latest_lsn = LSN.parse(configuration.start_lsn_string)
      @last_feedback_at = monotonic_time
      @running = false
    end
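The constructor seeds `@latest_lsn` from a textual start position. PostgreSQL prints an LSN as two hexadecimal numbers separated by a slash, the high and low 32 bits of a 64-bit WAL position. The sketch below shows one way such text could be converted to an integer and back. `LsnSketch` is a hypothetical stand-in; the gem's own `LSN.parse` is not shown on this page and may accept or return something different.

```ruby
# Hypothetical stand-in for an LSN helper; the real LSN.parse may differ.
module LsnSketch
  PATTERN = %r{\A(\h{1,8})/(\h{1,8})\z}

  # Convert "HIGH/LOW" hexadecimal text into a 64-bit integer position.
  def self.parse(text)
    match = PATTERN.match(text)
    raise ArgumentError, "invalid LSN: #{text.inspect}" unless match

    (match[1].to_i(16) << 32) | match[2].to_i(16)
  end

  # Convert a 64-bit integer position back into "HIGH/LOW" text.
  def self.to_text(value)
    format("%X/%X", value >> 32, value & 0xFFFF_FFFF)
  end
end

position = LsnSketch.parse("16/B374D848")
puts LsnSketch.to_text(position)
```

Holding the position as an integer makes comparisons trivial, which is convenient when the latest known WAL position only ever needs to move forward.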
Instance Method Details
#start {|payload, metadata| ... } ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Start the stream loop.
The method blocks while the stream is running. For every XLogData envelope, it yields the raw pgoutput payload and the parsed envelope metadata. When no CopyData payload is currently available, the loop skips processing and periodic feedback for that iteration and polls the connection again.
    # File 'lib/pgoutput/client/stream.rb', line 48
    def start(&block)
      raise ArgumentError, "block required" unless block_given?

      @running = true
      while @running
        copy_data = @connection.get_copy_data
        next unless copy_data

        process_copy_data(copy_data, &block)
        send_periodic_feedback
      end
    end
#stop ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Stop the stream loop after the current iteration.
    # File 'lib/pgoutput/client/stream.rb', line 64
    def stop
      @running = false
    end
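Because `#stop` only clears a flag, the loop in `#start` exits the next time it evaluates `while @running`, not immediately. The sketch below reproduces that control flow with stand-in classes so the behavior can be observed in isolation. `QueueConnection` and `LoopSketch` are hypothetical and are not part of `Pgoutput::Client`; envelope processing and feedback are deliberately omitted.

```ruby
# Hypothetical stand-ins; neither class is part of Pgoutput::Client.
class QueueConnection
  def initialize(messages)
    @messages = messages.dup
  end

  # Returns the next payload, or nil when nothing is available.
  def get_copy_data
    @messages.shift
  end
end

class LoopSketch
  def initialize(connection)
    @connection = connection
    @running = false
  end

  # Same shape as Stream#start, minus envelope parsing and feedback.
  def start
    raise ArgumentError, "block required" unless block_given?

    @running = true
    while @running
      copy_data = @connection.get_copy_data
      next unless copy_data

      yield copy_data
    end
  end

  def stop
    @running = false
  end
end

seen = []
loop_sketch = LoopSketch.new(QueueConnection.new(%w[a b c]))
loop_sketch.start do |payload|
  seen << payload
  # The flag is only consulted before the next iteration begins.
  loop_sketch.stop if payload == "b"
end
puts seen.inspect
```

Here the block finishes handling `"b"` before the loop exits, and `"c"` is never read. If a connection's `get_copy_data` were to block, a stop request would take effect only after that call returns.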