Class: Pcrd::Apply::Worker
- Inherits:
-
Object
- Object
- Pcrd::Apply::Worker
- Defined in:
- lib/pcrd/apply/worker.rb
Overview
Drains buffered transactions from the WAL consumer queue and applies them to the target, in its own thread, concurrently with backfill.
This is what makes streaming run alongside the bulk copy instead of after it: the consumer fills a bounded queue, this worker empties it, and the source slot can keep advancing so WAL is not retained for the whole backfill.
Threading contract:
- The Apply::Engine here MUST use a target connection that is not shared
with backfill — a Connection::Client wraps a single PG connection and is
not safe to use from two threads at once.
- on_committed is invoked (on this thread) after each transaction is
durably applied, with the commit LSN. Wire it to checkpoint + the
consumer's LSN acknowledgement so WAL is only released after apply.
Lifecycle:
start — launch the background thread
stop — drain whatever is already queued, then exit and join
failed?/error — surface a fatal apply error to the supervising thread
last_applied_lsn — most recent commit LSN handed to on_committed
Constant Summary collapse
- POLL_INTERVAL =
seconds to wait when the queue is momentarily empty
0.05
Instance Attribute Summary collapse
-
#error ⇒ Object
readonly
Returns the value of attribute error.
Instance Method Summary collapse
- #failed? ⇒ Boolean
-
#initialize(engine:, queue:, on_committed: nil) ⇒ Worker
constructor
A new instance of Worker.
- #last_applied_lsn ⇒ Object
- #start ⇒ Object
-
#stop ⇒ Object
Signals the worker to finish: it keeps applying until the queue is empty, then exits.
Constructor Details
#initialize(engine:, queue:, on_committed: nil) ⇒ Worker
Returns a new instance of Worker.
29 30 31 32 33 34 35 36 37 38 |
# File 'lib/pcrd/apply/worker.rb', line 29 def initialize(engine:, queue:, on_committed: nil) @engine = engine @queue = queue @on_committed = on_committed @stop = false @mutex = Mutex.new @error = nil @last_lsn = nil @thread = nil end |
Instance Attribute Details
#error ⇒ Object (readonly)
Returns the value of attribute error.
60 61 62 |
# File 'lib/pcrd/apply/worker.rb', line 60 def error @error end |
Instance Method Details
#failed? ⇒ Boolean
52 53 54 |
# File 'lib/pcrd/apply/worker.rb', line 52 def failed? @mutex.synchronize { !@error.nil? } end |
#last_applied_lsn ⇒ Object
56 57 58 |
# File 'lib/pcrd/apply/worker.rb', line 56 def last_applied_lsn @mutex.synchronize { @last_lsn } end |
#start ⇒ Object
40 41 42 43 |
# File 'lib/pcrd/apply/worker.rb', line 40 def start @thread = Thread.new { run_loop } self end |
#stop ⇒ Object
Signals the worker to finish: it keeps applying until the queue is empty, then exits. Joins the thread before returning.
47 48 49 50 |
# File 'lib/pcrd/apply/worker.rb', line 47 def stop @mutex.synchronize { @stop = true } @thread&.join end |