Class: Pgoutput::Decoder

Inherits:
Object
  • Object
show all
Defined in:
lib/pgoutput/decoder.rb,
lib/pgoutput/decoder/errors.rb,
lib/pgoutput/decoder/events.rb,
lib/pgoutput/decoder/version.rb,
lib/pgoutput/decoder/row_builder.rb,
lib/pgoutput/decoder/type_registry.rb,
lib/pgoutput/decoder/value_decoder.rb,
lib/pgoutput/decoder/relation_cache.rb

Overview

Stateful high-level decoder for pgoutput-parser protocol messages.

Decoder accepts immutable protocol messages from pgoutput-parser and returns immutable, Ractor-shareable row-change events. The decoder maintains relation and active transaction context, so one instance should be used per logical replication stream.

Defined Under Namespace

Modules: Events Classes: Error, RelationCache, RowBuilder, TransactionStateError, TypeRegistry, UnknownRelationError, UnsupportedMessageError, ValueDecodeError, ValueDecoder

Constant Summary collapse

VERSION =

Gem version.

"0.1.0"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(type_registry: TypeRegistry.default) ⇒ void

Parameters:

  • type_registry (TypeRegistry) (defaults to: TypeRegistry.default)

    immutable OID decoder registry.



32
33
34
35
36
37
38
39
# File 'lib/pgoutput/decoder.rb', line 32

def initialize(type_registry: TypeRegistry.default)
  @type_registry = type_registry
  @relations = RelationCache.new
  @row_builder = RowBuilder.new(type_registry: type_registry)
  @current_transaction_id = nil
  @current_final_lsn = nil
  @current_commit_timestamp = nil
end

Instance Attribute Details

#type_registryTypeRegistry (readonly)

Returns:



28
29
30
# File 'lib/pgoutput/decoder.rb', line 28

def type_registry
  @type_registry
end

Instance Method Details

#decode(message) ⇒ Events::Begin, ...

Decode one pgoutput-parser protocol message.

Parameters:

  • message (Object)

    protocol message from pgoutput-parser.

Returns:

Raises:



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/pgoutput/decoder.rb', line 47

def decode(message)
  case message
  when parser_messages::Begin
    decode_begin(message)
  when parser_messages::Relation
    @relations.store(message)
    nil
  when parser_messages::Insert
    decode_insert(message)
  when parser_messages::Update
    decode_update(message)
  when parser_messages::Delete
    decode_delete(message)
  when parser_messages::Commit
    decode_commit(message)
  else
    raise UnsupportedMessageError, "unsupported message: #{message.class}"
  end
end