Class: Pcrd::Apply::Worker

Inherits:
Object
Defined in:
lib/pcrd/apply/worker.rb

Overview

Drains buffered transactions from the WAL consumer queue and applies them to the target, in its own thread, concurrently with backfill.

This is what makes streaming run alongside the bulk copy instead of after it: the consumer fills a bounded queue, this worker empties it, and the source slot can keep advancing so WAL is not retained for the whole backfill.
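
The bounded-queue handoff described above can be sketched in isolation. This is a minimal illustration, not the library's code: it assumes Ruby's standard SizedQueue as a stand-in for the WAL consumer queue (the actual queue class is not shown on this page), and the :done sentinel and "txn-N" strings are invented for the example.

```ruby
# Assumption: SizedQueue stands in for the WAL consumer queue.
queue   = SizedQueue.new(2) # at most two buffered transactions
applied = []

# Stand-in for the apply worker: empties the queue on its own thread.
worker = Thread.new do
  # Pop until the illustrative :done sentinel arrives.
  while (txn = queue.pop) != :done
    applied << txn
  end
end

# Stand-in for the consumer: push blocks whenever the queue is full,
# so the producer cannot run arbitrarily far ahead of apply.
5.times { |i| queue.push("txn-#{i}") }
queue.push(:done)
worker.join
```

Because push blocks once two items are buffered, memory stays bounded while the worker catches up, and items come out in the order they went in.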

Threading contract:

- The Apply::Engine here MUST use a target connection that is not shared
  with backfill — a Connection::Client wraps a single PG connection and is
  not safe to use from two threads at once.
- on_committed is invoked (on this thread) after each transaction is
  durably applied, with the commit LSN. Wire it to checkpoint + the
  consumer's LSN acknowledgement so WAL is only released after apply.
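
One possible way to wire on_committed, shown as a sketch under stated assumptions: FakeCheckpoint, FakeConsumer, and their save and acknowledge methods are hypothetical stand-ins, since the real checkpoint store and consumer API are not documented on this page. The LSN string is an arbitrary example value.

```ruby
# Hypothetical stand-in for a durable checkpoint store.
class FakeCheckpoint
  attr_reader :lsn

  def save(lsn)
    @lsn = lsn
  end
end

# Hypothetical stand-in for the WAL consumer's acknowledgement API.
class FakeConsumer
  attr_reader :acked

  def acknowledge(lsn)
    @acked = lsn
  end
end

checkpoint = FakeCheckpoint.new
consumer   = FakeConsumer.new
events     = []

# Runs on the worker thread after each transaction is applied.
on_committed = lambda do |commit_lsn|
  checkpoint.save(commit_lsn)       # record progress first
  events << :checkpoint
  consumer.acknowledge(commit_lsn)  # only then let the source release WAL
  events << :ack
end

# Simulate the worker reporting one committed transaction.
on_committed.call("0/16B3748")
```

Saving the checkpoint before acknowledging means a crash between the two steps leaves WAL retained rather than released too early.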

Lifecycle:

start            — launch the background thread
stop             — drain whatever is already queued, then exit and join
failed?/error    — surface a fatal apply error to the supervising thread
last_applied_lsn — most recent commit LSN handed to on_committed
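
The lifecycle above can be exercised with a self-contained stand-in. SketchWorker below copies the public methods shown on this page, but its run_loop is an assumption (the real private loop is not shown here), as are the engine's apply method and the hash-shaped transactions.

```ruby
# Hypothetical stand-in mirroring the documented public interface.
class SketchWorker
  POLL_INTERVAL = 0.05

  attr_reader :error

  def initialize(engine:, queue:, on_committed: nil)
    @engine       = engine
    @queue        = queue
    @on_committed = on_committed
    @stop         = false
    @mutex        = Mutex.new
    @error        = nil
    @last_lsn     = nil
    @thread       = nil
  end

  def start
    @thread = Thread.new { run_loop }
    self
  end

  def stop
    @mutex.synchronize { @stop = true }
    @thread&.join
  end

  def failed?
    @mutex.synchronize { !@error.nil? }
  end

  def last_applied_lsn
    @mutex.synchronize { @last_lsn }
  end

  private

  # Assumed loop: apply until the queue is empty AND stop was requested.
  def run_loop
    loop do
      txn = begin
        @queue.pop(true) # non-blocking; raises ThreadError when empty
      rescue ThreadError
        nil
      end

      if txn.nil?
        break if @mutex.synchronize { @stop }
        sleep POLL_INTERVAL
        next
      end

      lsn = @engine.apply(txn) # assumed engine API returning the commit LSN
      @mutex.synchronize { @last_lsn = lsn }
      @on_committed&.call(lsn)
    end
  rescue StandardError => e
    # Record the failure so the supervising thread can see it.
    @mutex.synchronize { @error = e }
  end
end

# Illustrative engine: "applies" a transaction by returning its LSN.
engine = Object.new
def engine.apply(txn)
  txn[:lsn]
end

queue     = Queue.new
committed = []
worker    = SketchWorker.new(engine: engine, queue: queue,
                             on_committed: ->(lsn) { committed << lsn }).start

queue << { lsn: "0/10" }
queue << { lsn: "0/20" }
worker.stop # drains both queued transactions, then joins
```

In this sketch, stop returns only after everything queued before the call has been applied, so last_applied_lsn and failed? can be read safely from the supervising thread afterwards.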

Constant Summary

POLL_INTERVAL = 0.05

Seconds to wait when the queue is momentarily empty.

Constructor Details

#initialize(engine:, queue:, on_committed: nil) ⇒ Worker

Returns a new instance of Worker.



# File 'lib/pcrd/apply/worker.rb', line 29

def initialize(engine:, queue:, on_committed: nil)
  @engine       = engine
  @queue        = queue
  @on_committed = on_committed
  @stop         = false
  @mutex        = Mutex.new
  @error        = nil
  @last_lsn     = nil
  @thread       = nil
end

Instance Attribute Details

#error ⇒ Object (readonly)

Returns the fatal apply error captured by the worker, or nil if none has occurred.



# File 'lib/pcrd/apply/worker.rb', line 60

def error
  @error
end

Instance Method Details

#failed? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/pcrd/apply/worker.rb', line 52

def failed?
  @mutex.synchronize { !@error.nil? }
end

#last_applied_lsn ⇒ Object



# File 'lib/pcrd/apply/worker.rb', line 56

def last_applied_lsn
  @mutex.synchronize { @last_lsn }
end

#start ⇒ Object



# File 'lib/pcrd/apply/worker.rb', line 40

def start
  @thread = Thread.new { run_loop }
  self
end

#stop ⇒ Object

Signals the worker to finish: it keeps applying until the queue is empty, then exits. Joins the thread before returning.



# File 'lib/pcrd/apply/worker.rb', line 47

def stop
  @mutex.synchronize { @stop = true }
  @thread&.join
end