Class: CDC::SolidQueue::PostgresqlStream

Inherits:
Object
Defined in:
lib/cdc/solid_queue/postgresql_stream.rb

Overview

Enumerable stream that normalizes PostgreSQL pgoutput payloads into CDC events.

Constructor Details

#initialize(configuration, client_runner: nil, relation_tracker: Pgoutput::RelationTracker.new, decoder: Pgoutput::Decoder.new, adapter: nil) ⇒ PostgresqlStream

Returns a new instance of PostgresqlStream.

Parameters:

  • configuration (Configuration)
  • client_runner (Object, nil) (defaults to: nil)
  • relation_tracker (Object) (defaults to: Pgoutput::RelationTracker.new)
  • decoder (Object) (defaults to: Pgoutput::Decoder.new)
  • adapter (Object, nil) (defaults to: nil)


# File 'lib/cdc/solid_queue/postgresql_stream.rb', line 17

def initialize(configuration, client_runner: nil, relation_tracker: Pgoutput::RelationTracker.new,
               decoder: Pgoutput::Decoder.new, adapter: nil)
  @configuration = configuration
  @client_runner = client_runner || build_client_runner
  @relation_tracker = relation_tracker
  @decoder = decoder
  @adapter = adapter || build_adapter
  @transport_metadata = nil
end
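
The constructor accepts its collaborators as keyword arguments, which allows test doubles to be injected. A minimal sketch of that pattern follows. Every name in it (SketchTracker, SketchComponent, build_runner) is hypothetical and not part of the library. It illustrates two things: a default expression such as `SketchTracker.new` is evaluated on every call, so instances do not share a tracker, and a `nil` default combined with `||` defers construction until after the configuration has been assigned, which is presumably why `client_runner` and `adapter` default to `nil`.

```ruby
# Hypothetical collaborator standing in for something like a relation tracker.
class SketchTracker; end

# Hypothetical component that mirrors the constructor's shape only.
class SketchComponent
  attr_reader :runner, :tracker

  def initialize(config, runner: nil, tracker: SketchTracker.new)
    @config = config
    # Built lazily so the builder can read @config, which is already set here.
    @runner = runner || build_runner
    @tracker = tracker
  end

  private

  # Assumed builder; the real build_client_runner is not shown in this page.
  def build_runner
    "runner for #{@config}"
  end
end

a = SketchComponent.new("slot_a")
b = SketchComponent.new("slot_b")
injected = SketchComponent.new("slot_c", runner: :test_double)
```

Here `a` and `b` each receive their own tracker, while `injected` keeps the object passed in and never calls the builder.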

Instance Method Details

#each {|event| ... } ⇒ void

This method returns an undefined value.

Stream normalized CDC::Core::ChangeEvent instances.

Yield Parameters:

  • event (CDC::Core::ChangeEvent)


# File 'lib/cdc/solid_queue/postgresql_stream.rb', line 31

def each
  return enum_for(:each) unless block_given?

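  # The second block parameter's name is missing from the extracted listing;
  # `metadata` is assumed here.
  @client_runner.start do |payload, metadata|
    @transport_metadata = metadata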
    event = normalize(payload)
    yield event unless event.nil?
  ensure
    @transport_metadata = nil
  end
end
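
The behavior of #each can be tried in isolation with a stand-in runner. The sketch below is not the library's implementation: FakeClientRunner, SketchStream, the `normalize` body, and the `:lsn` metadata key are all assumptions made for illustration, as is the inclusion of Enumerable. It shows the three visible traits of the method: it returns an Enumerator when called without a block, it skips payloads that normalize to `nil`, and it clears the per-message transport metadata in `ensure`.

```ruby
# Hypothetical runner: yields (payload, metadata) pairs to the given block.
class FakeClientRunner
  def initialize(messages)
    @messages = messages
  end

  def start
    @messages.each { |payload, metadata| yield payload, metadata }
  end
end

# Simplified stream with the same control flow as the documented #each.
class SketchStream
  include Enumerable # assumption: suggested by "Enumerable stream" in the overview

  def initialize(client_runner)
    @client_runner = client_runner
    @transport_metadata = nil
  end

  def each
    return enum_for(:each) unless block_given?

    @client_runner.start do |payload, metadata|
      @transport_metadata = metadata
      event = normalize(payload)
      yield event unless event.nil?
    ensure
      # Runs after every message, even if normalize or the consumer raises.
      @transport_metadata = nil
    end
  end

  private

  # Assumed rule: a nil payload stands for a message that produces no event.
  def normalize(payload)
    return nil if payload.nil?

    { data: payload, lsn: @transport_metadata[:lsn] }
  end
end

messages = [["insert", { lsn: 1 }], [nil, { lsn: 2 }], ["update", { lsn: 3 }]]
stream = SketchStream.new(FakeClientRunner.new(messages))
events = stream.each.to_a
```

Because the sketch includes Enumerable, calls such as `stream.map` or `stream.first(1)` work on top of `each` without further code.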