Class: Pgoutput::Client::Runner
- Inherits: Object
- Defined in: lib/pgoutput/client/runner.rb
Overview
High-level logical replication client facade.
`Runner` is the simplest public entry point for consumers that want to stream raw pgoutput payloads without manually managing a replication connection. It owns the connection lifecycle for one logical replication stream:
- Build an immutable Configuration from keyword arguments.
- Open a PostgreSQL replication connection.
- Optionally create the configured replication slot.
- Start logical replication.
- Yield raw pgoutput payload bytes and XLogData metadata.
- Close the connection when streaming exits.
If a live stream loses its connection, the runner retries up to DEFAULT_RECONNECT_ATTEMPTS (3) times, sleeping for a backoff interval between attempts, and resumes from the latest confirmed WAL position. Once the attempts are exhausted, the last transport error is raised from #start. Replay, checkpointing, and deduplication are not handled here; they belong to the downstream CDC runtime and sink layer.
Constant Summary
- DEFAULT_RECONNECT_ATTEMPTS = 3
- DEFAULT_RECONNECT_BACKOFF = 0.5
Instance Attribute Summary
- #configuration ⇒ Configuration (readonly)
  Configuration used by this runner.
- #last_error ⇒ Exception? (readonly)
  Last transport error seen by the runner.
Instance Method Summary
- #ack(lsn) ⇒ Integer
  Mark a WAL position as durably handled by downstream code.
- #connected? ⇒ Boolean
  Whether an active replication stream exists.
- #initialize(**options) ⇒ void (constructor)
  Build a runner from configuration keyword arguments.
- #monitor ⇒ RunnerState
  Return an immutable transport status snapshot.
- #restart {|payload, metadata| ... } ⇒ void
  Stop the active stream and start again with the same block.
- #running? ⇒ Boolean
  Whether the runner is currently inside its streaming loop.
- #start {|payload, metadata| ... } ⇒ void
  Start streaming raw pgoutput payloads.
- #stop ⇒ void
  Request graceful stop.
- #stopped? ⇒ Boolean
  Whether the runner has stopped.
Constructor Details
#initialize(**options) ⇒ void
Build a runner from configuration keyword arguments.
The accepted keywords are the same as Configuration#initialize. The resulting configuration object is immutable and reused for the lifetime of this runner.
# File 'lib/pgoutput/client/runner.rb', line 63
def initialize(**)
  @configuration = Configuration.new(
    ** # : untyped
  )
  @stopped = true
  @running = false
  @stream = nil
  @resume_lsn = configuration.start_lsn
  @acked_lsn = configuration.start_lsn
  @slot_created = false
  @last_error = nil
  @reconnect_attempts = 0
end
Instance Attribute Details
#configuration ⇒ Configuration (readonly)
Configuration used by this runner.
# File 'lib/pgoutput/client/runner.rb', line 46
def configuration
  @configuration
end
#last_error ⇒ Exception? (readonly)
Last transport error seen by the runner.
# File 'lib/pgoutput/client/runner.rb', line 51
def last_error
  @last_error
end
Instance Method Details
#ack(lsn) ⇒ Integer
Mark a WAL position as durably handled by downstream code.
This does not checkpoint or persist anything. It only updates transport feedback state so future standby status updates can distinguish received WAL from downstream-acknowledged WAL.
# File 'lib/pgoutput/client/runner.rb', line 170
def ack(lsn)
  parsed = normalize_lsn_value(lsn)
  @acked_lsn = [@acked_lsn ? normalize_lsn_value(@acked_lsn) : 0, parsed].max
  @stream&.ack(@acked_lsn)
  @acked_lsn
end
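The monotonic update performed by #ack can be illustrated in isolation. The following is a minimal sketch, not the library's code: `parse_lsn` and `AckTracker` are hypothetical names, and the sketch assumes LSNs arrive either as integers or in PostgreSQL's textual `X/Y` hexadecimal form. The real `normalize_lsn_value` is not shown on this page and may behave differently.

```ruby
# Hypothetical helper: convert a PostgreSQL LSN string such as "16/B374D848"
# (or an Integer) into a 64-bit integer. Not the library's normalize_lsn_value.
def parse_lsn(value)
  return value if value.is_a?(Integer)

  high, low = value.to_s.split("/", 2)
  raise ArgumentError, "invalid LSN: #{value.inspect}" unless high && low

  # The part before the slash is the upper 32 bits, the part after the lower 32.
  (Integer(high, 16) << 32) | Integer(low, 16)
end

# Hypothetical tracker that keeps only the highest acknowledged position.
class AckTracker
  attr_reader :acked_lsn

  def initialize(start_lsn = nil)
    @acked_lsn = start_lsn ? parse_lsn(start_lsn) : 0
  end

  # An older LSN never moves the acknowledged position backwards.
  def ack(lsn)
    @acked_lsn = [@acked_lsn, parse_lsn(lsn)].max
  end
end

tracker = AckTracker.new
tracker.ack("0/16B3748")
tracker.ack("0/16B3700") # stale acknowledgement; the position is unchanged
```

Taking the maximum means a late or duplicated acknowledgement from downstream code cannot rewind the feedback position.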
#connected? ⇒ Boolean
Whether an active replication stream exists.
# File 'lib/pgoutput/client/runner.rb', line 158
def connected?
  !@stream.nil?
end
#monitor ⇒ RunnerState
Return an immutable transport status snapshot.
# File 'lib/pgoutput/client/runner.rb', line 180
def monitor
  RunnerState.new(
    running: running?,
    stop_requested: @stopped,
    connected: connected?,
    last_received_lsn: current_lsn_string(@stream&.latest_lsn || @resume_lsn),
    last_feedback_lsn: current_lsn_string(@stream&.acked_lsn || @acked_lsn),
    last_keepalive_at: @stream&.last_keepalive_at,
    last_error: @last_error&.,
    reconnect_attempts: @reconnect_attempts
  )
end
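The snapshot idea behind #monitor can be shown with a small stand-in. In this sketch `TransportSnapshot` and `take_snapshot` are hypothetical; the definition of RunnerState is not shown on this page, so the field list below only mirrors a subset of the keywords passed in #monitor, and the use of a frozen Struct is an assumption about how immutability might be achieved.

```ruby
# Hypothetical stand-in for RunnerState; the real class may be defined differently.
TransportSnapshot = Struct.new(
  :running, :connected, :last_received_lsn, :reconnect_attempts,
  keyword_init: true
)

# Copy the current values into a new struct and freeze it, so later changes
# to the live state cannot leak into a snapshot that was already handed out.
def take_snapshot(state)
  TransportSnapshot.new(**state).freeze
end

live_state = {
  running: true,
  connected: true,
  last_received_lsn: "0/16B3748",
  reconnect_attempts: 0
}
snapshot = take_snapshot(live_state)

# The live state moves on; the snapshot keeps the values it was built with.
live_state[:reconnect_attempts] = 2
```

A caller polling for health can therefore hold on to a snapshot, log it, or compare it with a later one without worrying about it changing underneath them.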
#restart {|payload, metadata| ... } ⇒ void
Stop the active stream and start again with the same block.
# File 'lib/pgoutput/client/runner.rb', line 136
def restart(&block)
  stop
  start(&block)
end
#running? ⇒ Boolean
Whether the runner is currently inside its streaming loop.
# File 'lib/pgoutput/client/runner.rb', line 144
def running?
  @running
end
#start {|payload, metadata| ... } ⇒ void
This method returns an undefined value.
Start streaming raw pgoutput payloads.
This method blocks until the stream stops or the reconnect attempts are exhausted, in which case the last transport error is raised. It raises ArgumentError when called without a block. The yielded payload is the raw logical decoding plugin payload contained inside PostgreSQL's XLogData envelope; callers normally pass this payload to a pgoutput parser.
# File 'lib/pgoutput/client/runner.rb', line 91
def start(&block)
  raise ArgumentError, "block required" unless block

  @stopped = false
  @running = true
  @last_error = nil
  @reconnect_attempts = 0

  loop do
    current_configuration = configuration_for_resume
    case run_stream_cycle(current_configuration, &block)
    when :done
      break
    when :retry
      @reconnect_attempts += 1
      raise @last_error if @reconnect_attempts > DEFAULT_RECONNECT_ATTEMPTS

      sleep(reconnect_backoff_for(@reconnect_attempts))
    end
  end
ensure
  @running = false
  @stopped = true
end
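The retry loop in #start can be exercised without a database by replacing the stream cycle with a lambda. This is only a sketch under stated assumptions: `run_with_retries`, `max_attempts`, and `backoff` are hypothetical names, transport failures are modeled as IOError, and the linear backoff is a guess, because `reconnect_backoff_for` is not shown on this page.

```ruby
# Hypothetical, database-free sketch of the loop in #start.
# `cycle` stands in for run_stream_cycle and must return :done or :retry.
def run_with_retries(cycle, max_attempts: 3, backoff: 0.0)
  attempts = 0
  last_error = nil

  loop do
    outcome = begin
      cycle.call
    rescue IOError => e # assumption: transport failures surface as IOError
      last_error = e
      :retry
    end

    case outcome
    when :done
      break
    when :retry
      attempts += 1
      # Give up once the attempt budget is exceeded and surface the last error.
      raise(last_error || RuntimeError.new("retries exhausted")) if attempts > max_attempts

      sleep(backoff * attempts) # assumption: linear backoff
    end
  end

  attempts
end

# A cycle that loses its connection twice and then finishes cleanly.
calls = 0
flaky = lambda do
  calls += 1
  raise IOError, "connection lost" if calls < 3

  :done
end

attempts_used = run_with_retries(flaky)
```

With the default budget of three attempts, the flaky cycle above recovers on its third call. A cycle that never recovers ends with the last IOError being raised to the caller, which matches the behavior described for #start.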
#stop ⇒ void
This method returns an undefined value.
Request graceful stop.
This method records the caller’s intent to stop and asks the active Stream, if any, to stop after its current iteration.
# File 'lib/pgoutput/client/runner.rb', line 122
def stop
  @stopped = true
  @stream&.stop
  nil
end
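Cooperative stopping of this kind can be sketched with a toy loop. `ToyLoop` is a hypothetical stand-in and not part of the library; it only shows why a stop request takes effect after the current iteration instead of interrupting it. Whether the real Stream may be stopped from inside the yielded block is an assumption here.

```ruby
# Hypothetical stand-in: the loop checks a flag between iterations, so a stop
# request never interrupts the iteration that is currently running.
class ToyLoop
  attr_reader :iterations

  def initialize
    @stop_requested = false
    @iterations = 0
  end

  # Blocks until #stop has been requested.
  def start
    until @stop_requested
      @iterations += 1
      yield @iterations
    end
  end

  # Records the intent to stop; the loop notices it on its next check.
  def stop
    @stop_requested = true
    nil
  end
end

toy = ToyLoop.new
seen = []
toy.start do |n|
  seen << n
  toy.stop if n == 3 # request stop while iteration 3 is still being handled
end
```

Iteration 3 completes normally even though the stop was requested in the middle of it, and no fourth iteration begins.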
#stopped? ⇒ Boolean
Whether the runner has stopped. This is the inverse of #running?, so it is also true before #start has been called.
# File 'lib/pgoutput/client/runner.rb', line 151
def stopped?
  !running?
end