Class: Pgoutput::Client::Runner

Inherits:
Object
Defined in:
lib/pgoutput/client/runner.rb

Overview

High-level logical replication client facade.

Runner is the simplest public entry point for consumers that want to stream raw pgoutput payloads without manually managing a replication connection. It owns the connection lifecycle for one logical replication stream:

  1. Build an immutable Configuration from keyword arguments.

  2. Open a PostgreSQL replication connection.

  3. Optionally create the configured replication slot.

  4. Start logical replication.

  5. Yield raw pgoutput payload bytes and XLogData metadata.

  6. Close the connection when streaming exits.

If a live stream loses its connection, the runner retries up to DEFAULT_RECONNECT_ATTEMPTS (3) times, sleeping for a backoff interval before each attempt, and resumes from the latest confirmed WAL position. Replay, checkpointing, and deduplication are not owned here; those concerns belong to the downstream CDC runtime and sink layer.

Examples:

Stream raw pgoutput messages

runner = Pgoutput::Client::Runner.new(
  database_url: "postgres://localhost/app",
  slot_name: "cdc_slot",
  publication_names: ["app_publication"]
)

runner.start do |payload, metadata|
  puts "received #{payload.bytesize} bytes at #{metadata.wal_end_lsn}"
end

Constant Summary

DEFAULT_RECONNECT_ATTEMPTS = 3
DEFAULT_RECONNECT_BACKOFF = 0.5

Constructor Details

#initialize(**options) ⇒ void

Build a runner from configuration keyword arguments.

The accepted keywords are the same as Configuration#initialize. The resulting configuration object is immutable and reused for the lifetime of this runner.

Parameters:

  • options (Hash)

    keyword arguments accepted by Configuration#initialize

Raises:



# File 'lib/pgoutput/client/runner.rb', line 63

def initialize(**options)
  @configuration = Configuration.new(
    **options # : untyped
  )
  @stopped = true
  @running = false
  @stream = nil
  @resume_lsn = configuration.start_lsn
  @acked_lsn = configuration.start_lsn
  @slot_created = false
  @last_error = nil
  @reconnect_attempts = 0
end

Instance Attribute Details

#configuration ⇒ Configuration (readonly)

Configuration used by this runner.

Returns:

  • (Configuration)

# File 'lib/pgoutput/client/runner.rb', line 46

def configuration
  @configuration
end

#last_error ⇒ Exception? (readonly)

Last transport error seen by the runner.

Returns:

  • (Exception, nil)


# File 'lib/pgoutput/client/runner.rb', line 51

def last_error
  @last_error
end

Instance Method Details

#ack(lsn) ⇒ Integer

Mark a WAL position as durably handled by downstream code.

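This does not checkpoint or persist anything. It only updates transport feedback state so future standby status updates can distinguish received WAL from downstream-acknowledged WAL. The acknowledged position never moves backwards: acknowledging an LSN lower than the current one leaves the current value in place.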

Parameters:

  • lsn (String, Integer)

    WAL position acknowledged by downstream code

Returns:

  • (Integer)

    normalized acknowledged WAL position



# File 'lib/pgoutput/client/runner.rb', line 170

def ack(lsn)
  parsed = normalize_lsn_value(lsn)
  @acked_lsn = [@acked_lsn ? normalize_lsn_value(@acked_lsn) : 0, parsed].max
  @stream&.ack(@acked_lsn)
  @acked_lsn
end
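
The monotonic behavior of #ack can be illustrated with a small standalone sketch. It does not use the gem: lsn_to_integer, integer_to_lsn, and AckTracker are hypothetical names, and the parsing assumes PostgreSQL's textual LSN format (two hexadecimal numbers separated by a slash). The private normalize_lsn_value helper is not shown on this page and may accept other forms.

```ruby
# Hypothetical helper: accept an Integer or a textual LSN such as "16/B374D848".
# The text form is the high and low 32 bits of the WAL position in hexadecimal.
def lsn_to_integer(lsn)
  return lsn if lsn.is_a?(Integer)

  high, low = lsn.split("/", 2)
  (Integer(high, 16) << 32) | Integer(low, 16)
end

# Hypothetical helper: render an Integer WAL position back into text form.
def integer_to_lsn(value)
  format("%X/%X", value >> 32, value & 0xFFFFFFFF)
end

# Hypothetical stand-in for the acknowledgement state kept by the runner.
class AckTracker
  attr_reader :acked_lsn

  def initialize(start_lsn = nil)
    @acked_lsn = start_lsn ? lsn_to_integer(start_lsn) : 0
  end

  # Keep the highest position seen so far, as the max call in #ack does.
  def ack(lsn)
    @acked_lsn = [@acked_lsn, lsn_to_integer(lsn)].max
  end
end

tracker = AckTracker.new
tracker.ack("0/16B3748") # advances the acknowledged position
tracker.ack("0/10")      # lower than the current value, so nothing changes
```

Because a stale acknowledgement is ignored rather than rejected, downstream code that confirms work out of order does not need to sort its acknowledgements first.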

#connected? ⇒ Boolean

Whether an active replication stream exists.

Returns:

  • (Boolean)


# File 'lib/pgoutput/client/runner.rb', line 158

def connected?
  !@stream.nil?
end

#monitor ⇒ RunnerState

Return an immutable transport status snapshot.

Returns:

  • (RunnerState)

# File 'lib/pgoutput/client/runner.rb', line 180

def monitor
  RunnerState.new(
    running: running?,
    stop_requested: @stopped,
    connected: connected?,
    last_received_lsn: current_lsn_string(@stream&.latest_lsn || @resume_lsn),
    last_feedback_lsn: current_lsn_string(@stream&.acked_lsn || @acked_lsn),
    last_keepalive_at: @stream&.last_keepalive_at,
    last_error: @last_error&.message,
    reconnect_attempts: @reconnect_attempts
  )
end
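
One way a supervisor might use such a snapshot is to turn it into a list of health problems. The sketch below is standalone and does not use the gem: SnapshotSketch is a hypothetical stand-in that only mirrors the keywords passed to RunnerState.new above, health_problems and its keepalive_timeout default are illustrative, and last_keepalive_at is assumed to be a Time.

```ruby
# Hypothetical stand-in with the same field names as the keywords shown above;
# the real RunnerState class is defined by the gem and may differ.
SnapshotSketch = Struct.new(
  :running, :stop_requested, :connected, :last_received_lsn,
  :last_feedback_lsn, :last_keepalive_at, :last_error, :reconnect_attempts,
  keyword_init: true
)

# Returns human-readable problems; an empty list means the stream looks healthy.
def health_problems(state, now: Time.now, keepalive_timeout: 30)
  problems = []
  problems << "runner is not running" unless state.running
  problems << "no active stream" unless state.connected
  if state.reconnect_attempts.positive?
    problems << "reconnecting (attempt #{state.reconnect_attempts})"
  end
  # Assumes last_keepalive_at is a Time, so subtraction yields seconds.
  if state.last_keepalive_at && now - state.last_keepalive_at > keepalive_timeout
    problems << "no keepalive for more than #{keepalive_timeout}s"
  end
  problems << "last error: #{state.last_error}" if state.last_error
  problems
end
```

With the real runner, the argument would presumably be the value returned by #monitor, polled from a thread other than the one blocked in #start.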

#restart {|payload, metadata| ... } ⇒ void

This method returns an undefined value.

Stop the active stream and start again with the same block.

The runner is synchronous, so this helper is primarily useful for supervisors that call it instead of manually calling #stop followed by #start.

Yields:

  • (payload, metadata)

    called once for each XLogData payload



# File 'lib/pgoutput/client/runner.rb', line 136

def restart(&block)
  stop
  start(&block)
end

#running? ⇒ Boolean

Whether the runner is currently inside its streaming loop.

Returns:

  • (Boolean)


# File 'lib/pgoutput/client/runner.rb', line 144

def running?
  @running
end

#start {|payload, metadata| ... } ⇒ void

This method returns an undefined value.

Start streaming raw pgoutput payloads.

This method blocks until the stream stops or an error is raised. When a stream cycle asks for a retry, the runner reconnects up to DEFAULT_RECONNECT_ATTEMPTS times before raising the last recorded error. The yielded payload is the raw logical decoding plugin payload contained inside PostgreSQL’s XLogData envelope; callers normally pass this payload to a pgoutput parser.

Yields:

  • (payload, metadata)

    called once for each XLogData payload

Yield Parameters:

  • payload (String)

    frozen raw pgoutput payload bytes

  • metadata (XLogData)

    WAL envelope metadata for the payload

Raises:

  • (ArgumentError)

    if no block is provided

  • (ConnectionError)

    if a PostgreSQL connection or command fails

  • (ProtocolError)

    if an invalid replication message is received



# File 'lib/pgoutput/client/runner.rb', line 91

def start(&block)
  raise ArgumentError, "block required" unless block

  @stopped = false
  @running = true
  @last_error = nil
  @reconnect_attempts = 0

  loop do
    current_configuration = configuration_for_resume
    case run_stream_cycle(current_configuration, &block)
    when :done
      break
    when :retry
      @reconnect_attempts += 1
      raise @last_error if @reconnect_attempts > DEFAULT_RECONNECT_ATTEMPTS

      sleep(reconnect_backoff_for(@reconnect_attempts))
    end
  end
ensure
  @running = false
  @stopped = true
end
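
The retry budget in the loop above can be isolated in a standalone sketch. It does not use the gem: run_with_reconnects, backoff_for, and the injectable sleeper are hypothetical, and the linear backoff is an assumption, since the private reconnect_backoff_for helper is not shown on this page.

```ruby
MAX_RECONNECTS = 3 # mirrors DEFAULT_RECONNECT_ATTEMPTS
BASE_BACKOFF = 0.5 # mirrors DEFAULT_RECONNECT_BACKOFF

# Assumed linear backoff; the runner's real formula may differ.
def backoff_for(attempt)
  BASE_BACKOFF * attempt
end

# Calls the block once per stream cycle until it returns :done.
# Each :retry consumes one reconnect; the budget check happens before sleeping.
# Returns the number of reconnects that were needed.
def run_with_reconnects(sleeper: ->(seconds) { sleep(seconds) })
  attempts = 0
  loop do
    case yield
    when :done
      return attempts
    when :retry
      attempts += 1
      raise "reconnect budget exhausted" if attempts > MAX_RECONNECTS

      sleeper.call(backoff_for(attempts))
    end
  end
end

# Two lost connections followed by a clean exit; pauses are recorded, not slept.
outcomes = %i[retry retry done]
pauses = []
reconnects = run_with_reconnects(sleeper: ->(s) { pauses << s }) { outcomes.shift }
```

Because the counter is compared after it is incremented, a budget of three reconnects lets the stream cycle run up to four times before the error propagates.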

#stop ⇒ void

This method returns an undefined value.

Request graceful stop.

This method records the caller’s intent to stop and asks the active Stream, if any, to stop after its current iteration.



# File 'lib/pgoutput/client/runner.rb', line 122

def stop
  @stopped = true
  @stream&.stop
  nil
end
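
The cooperative nature of a stop request can be shown with a standalone sketch. ReplayRunner is a hypothetical stand-in that replays an in-memory list instead of reading from PostgreSQL; it only imitates the stop flag being checked once per iteration. Whether the real Runner supports calling #stop from inside the block is not confirmed by this page.

```ruby
# Hypothetical stand-in for a runner with a cooperative stop flag.
class ReplayRunner
  def initialize(payloads)
    @payloads = payloads
    @stopped = true
  end

  # Yields payloads until the list is exhausted or a stop has been requested.
  def start
    raise ArgumentError, "block required" unless block_given?

    @stopped = false
    @payloads.each do |payload|
      break if @stopped # the flag is only checked between iterations

      yield payload
    end
  ensure
    @stopped = true
  end

  # Records the intent to stop; the current iteration still finishes.
  def stop
    @stopped = true
    nil
  end
end

runner = ReplayRunner.new(%w[a b c d e])
handled = []
runner.start do |payload|
  handled << payload
  runner.stop if handled.size == 3 # request a stop while a payload is in flight
end
```

The third payload is still handled in full because the flag is consulted only before the next payload is yielded, which is the sense in which the stop is graceful.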

#stopped? ⇒ Boolean

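Whether the runner is outside its streaming loop. This is the negation of #running?, so it is true for a runner that has never been started, and it stays false after #stop is called until the streaming loop actually exits.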

Returns:

  • (Boolean)


# File 'lib/pgoutput/client/runner.rb', line 151

def stopped?
  !running?
end