Class: Pgoutput::Client::Runner
- Inherits:
-
Object
- Object
- Pgoutput::Client::Runner
- Defined in:
- lib/pgoutput/client/runner.rb
Overview
High-level logical replication client facade.
‘Runner` is the simplest public entry point for consumers that want to stream raw pgoutput payloads without manually managing a replication connection. It owns the connection lifecycle for one logical replication stream:
-
Build an immutable Configuration from keyword arguments.
-
Open a PostgreSQL replication connection.
-
Optionally create the configured replication slot.
-
Start logical replication.
-
Yield raw pgoutput payload bytes and XLogData metadata.
-
Close the connection when streaming exits.
If a live stream loses its connection, the runner retries a small number of times with a backoff and resumes from the latest confirmed WAL position. Replay, checkpointing, and deduplication are not owned here; those concerns belong to the downstream CDC runtime and sink layer.
Constant Summary collapse
- DEFAULT_RECONNECT_ATTEMPTS =
Default number of reconnect attempts after a previously healthy stream fails. The default is intentionally large enough to survive ordinary PostgreSQL restart windows.
30- DEFAULT_RECONNECT_BACKOFF =
Base reconnect backoff, in seconds. Attempt ‘n` sleeps for `n * DEFAULT_RECONNECT_BACKOFF`.
0.5
Instance Attribute Summary collapse
-
#configuration ⇒ Configuration
readonly
Configuration used by this runner.
-
#last_error ⇒ Exception?
readonly
Last transport error seen by the runner.
Instance Method Summary collapse
-
#ack(lsn) ⇒ Integer
Mark a WAL position as durably handled by downstream code.
-
#connected? ⇒ Boolean
Whether an active replication stream exists.
-
#initialize(**options) ⇒ void
constructor
Build a runner from configuration keyword arguments.
-
#monitor ⇒ RunnerState
Return an immutable transport status snapshot.
-
#restart {|payload, metadata| ... } ⇒ void
Stop the active stream and start again with the same block.
-
#running? ⇒ Boolean
Whether the runner is currently inside its streaming loop.
-
#start {|payload, metadata| ... } ⇒ void
Start streaming raw pgoutput payloads.
-
#stop ⇒ void
Request graceful stop.
-
#stopped? ⇒ Boolean
Whether the runner has stopped.
Constructor Details
#initialize(**options) ⇒ void
Build a runner from configuration keyword arguments.
The accepted keywords are the same as Configuration#initialize. The resulting configuration object is immutable and reused for the lifetime of this runner.
73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/pgoutput/client/runner.rb', line 73 def initialize(**) @configuration = Configuration.new( ** # : untyped ) @stopped = true @running = false @stream = nil @resume_lsn = configuration.start_lsn @acked_lsn = configuration.start_lsn @slot_created = false @connected_once = false @last_error = nil @reconnect_attempts = 0 end |
Instance Attribute Details
#configuration ⇒ Configuration (readonly)
Configuration used by this runner.
56 57 58 |
# File 'lib/pgoutput/client/runner.rb', line 56 def configuration @configuration end |
#last_error ⇒ Exception? (readonly)
Last transport error seen by the runner.
61 62 63 |
# File 'lib/pgoutput/client/runner.rb', line 61 def last_error @last_error end |
Instance Method Details
#ack(lsn) ⇒ Integer
Mark a WAL position as durably handled by downstream code.
This does not checkpoint or persist anything. It only updates transport feedback state so future standby status updates can distinguish received WAL from downstream-acknowledged WAL.
181 182 183 184 185 186 |
# File 'lib/pgoutput/client/runner.rb', line 181 def ack(lsn) parsed = normalize_lsn_value(lsn) @acked_lsn = [@acked_lsn ? normalize_lsn_value(@acked_lsn) : 0, parsed].max @stream&.ack(@acked_lsn) @acked_lsn end |
#connected? ⇒ Boolean
Whether an active replication stream exists.
169 170 171 |
# File 'lib/pgoutput/client/runner.rb', line 169 def connected? !@stream.nil? end |
#monitor ⇒ RunnerState
Return an immutable transport status snapshot.
191 192 193 194 195 196 197 198 199 200 201 202 |
# File 'lib/pgoutput/client/runner.rb', line 191 def monitor RunnerState.new( running: running?, stop_requested: @stopped, connected: connected?, last_received_lsn: current_lsn_string(@stream&.latest_lsn || @resume_lsn), last_feedback_lsn: current_lsn_string(@stream&.acked_lsn || @acked_lsn), last_keepalive_at: @stream&.last_keepalive_at, last_error: @last_error&., reconnect_attempts: @reconnect_attempts ) end |
#restart {|payload, metadata| ... } ⇒ void
147 148 149 150 |
# File 'lib/pgoutput/client/runner.rb', line 147 def restart(&block) stop start(&block) end |
#running? ⇒ Boolean
Whether the runner is currently inside its streaming loop.
155 156 157 |
# File 'lib/pgoutput/client/runner.rb', line 155 def running? @running end |
#start {|payload, metadata| ... } ⇒ void
This method returns an undefined value.
Start streaming raw pgoutput payloads.
This method blocks until the stream stops or the underlying connection raises an error. The yielded payload is the raw logical decoding plugin payload contained inside PostgreSQL’s XLogData envelope; callers normally pass this payload to a pgoutput parser.
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/pgoutput/client/runner.rb', line 102 def start(&block) raise ArgumentError, "block required" unless block @stopped = false @running = true @last_error = nil @reconnect_attempts = 0 loop do current_configuration = configuration_for_resume case run_stream_cycle(current_configuration, &block) when :done break when :retry @reconnect_attempts += 1 raise @last_error if @reconnect_attempts > DEFAULT_RECONNECT_ATTEMPTS sleep(reconnect_backoff_for(@reconnect_attempts)) end end ensure @running = false @stopped = true end |
#stop ⇒ void
This method returns an undefined value.
Request graceful stop.
This method records the caller’s intent to stop and asks the active Stream, if any, to stop after its current iteration.
133 134 135 136 137 |
# File 'lib/pgoutput/client/runner.rb', line 133 def stop @stopped = true @stream&.stop nil end |
#stopped? ⇒ Boolean
Whether the runner has stopped.
162 163 164 |
# File 'lib/pgoutput/client/runner.rb', line 162 def stopped? !running? end |