Class: CDC::SolidQueue::Runner

Inherits:
Object
  • Object
show all
Defined in:
lib/cdc/solid_queue/runner.rb

Overview

Minimal ingestion runner boundary.

The runner accepts any stream object that yields PostgreSQL-derived CDC events. This keeps the class testable while production code can supply a pgoutput-client backed stream.

Instance Method Summary collapse

Constructor Details

#initialize(stream:, enqueuer:) ⇒ Runner

Returns a new instance of Runner.

Parameters:

Raises:

  • (ArgumentError)


13
14
15
16
17
18
# File 'lib/cdc/solid_queue/runner.rb', line 13

def initialize(stream:, enqueuer:)
  raise ArgumentError, 'stream must respond to #each' unless stream.respond_to?(:each)

  @stream = stream
  @enqueuer = enqueuer
end

Instance Method Details

#startInteger

Start reading events and enqueueing jobs.

Returns:

  • (Integer)

    number of enqueued events



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/cdc/solid_queue/runner.rb', line 23

def start
  # @type var batch: Array[untyped]
  batch = []
  count = 0

  @stream.each do |event|
    batch << event
    next unless batch.length >= @enqueuer.configuration.batch_size

    count += flush_batch(batch)
    batch = []
  end

  count + flush_batch(batch)
end