Class: Mammoth::DeliveryWorker
- Inherits:
- Object (Object → Mammoth::DeliveryWorker)
- Defined in:
- lib/mammoth/delivery_worker.rb
Overview
Delivers normalized events with retry, checkpoint, and dead-letter handling.
DeliveryWorker is Mammoth’s first reliable delivery unit. It intentionally keeps the delivery contract small: attempt webhook delivery, advance the checkpoint after a successful delivery, and persist the failed event to the dead-letter queue (DLQ) once retries are exhausted.
Constant Summary
- DEFAULT_SOURCE = "postgresql"
Instance Attribute Summary
- #checkpoint_store ⇒ Object (readonly)
  Returns the value of attribute checkpoint_store.
- #dead_letter_store ⇒ Object (readonly)
  Returns the value of attribute dead_letter_store.
- #max_attempts ⇒ Object (readonly)
  Returns the value of attribute max_attempts.
- #publication_name ⇒ Object (readonly)
  Returns the value of attribute publication_name.
- #retry_schedule ⇒ Object (readonly)
  Returns the value of attribute retry_schedule.
- #sink ⇒ Object (readonly)
  Returns the value of attribute sink.
- #sleeper ⇒ Object (readonly)
  Returns the value of attribute sleeper.
- #slot_name ⇒ Object (readonly)
  Returns the value of attribute slot_name.
- #source_name ⇒ Object (readonly)
  Returns the value of attribute source_name.
Class Method Summary
- .from_config(config, sink:, checkpoint_store:, dead_letter_store:, sleeper: Kernel.method(:sleep)) ⇒ Mammoth::DeliveryWorker
  Build a delivery worker from Mammoth configuration and stores.
Instance Method Summary
- #deliver(event) ⇒ Hash
  Deliver an event with retry, checkpoint, and DLQ handling.
- #initialize(sink:, checkpoint_store:, dead_letter_store:, source_name:, slot_name:, publication_name:, max_attempts:, retry_schedule:, sleeper: Kernel.method(:sleep)) ⇒ DeliveryWorker (constructor)
  A new instance of DeliveryWorker.
Constructor Details
#initialize(sink:, checkpoint_store:, dead_letter_store:, source_name:, slot_name:, publication_name:, max_attempts:, retry_schedule:, sleeper: Kernel.method(:sleep)) ⇒ DeliveryWorker
Returns a new instance of DeliveryWorker.
# File 'lib/mammoth/delivery_worker.rb', line 25
def initialize(sink:, checkpoint_store:, dead_letter_store:, source_name:, slot_name:, publication_name:, max_attempts:,
               retry_schedule:, sleeper: Kernel.method(:sleep))
  @sink = sink
  @checkpoint_store = checkpoint_store
  @dead_letter_store = dead_letter_store
  @source_name = source_name
  @slot_name = slot_name
  @publication_name = publication_name
  @max_attempts = max_attempts
  @retry_schedule = retry_schedule
  @sleeper = sleeper
end
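The sleeper: keyword defaults to Kernel.method(:sleep), which suggests the pause between attempts is injectable. The sketch below shows why a Method object and a lambda are interchangeable here. It assumes the worker invokes the sleeper as sleeper.call(seconds), which this page does not confirm; recording_sleeper and the delay values are hypothetical.

```ruby
# The default sleeper is a Method object; a lambda is a drop-in replacement
# because both respond to #call.
default_sleeper = Kernel.method(:sleep)

# Hypothetical test double: records requested delays instead of sleeping.
recorded_delays = []
recording_sleeper = ->(seconds) { recorded_delays << seconds }

# Assumption: the worker calls sleeper.call(seconds) before each retry.
[1, 5, 30].each { |seconds| recording_sleeper.call(seconds) }

puts default_sleeper.respond_to?(:call)
puts recorded_delays.inspect
```

Passing a recording double like this as sleeper: would let a test exercise the retry path without real delays.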
Instance Attribute Details
#checkpoint_store ⇒ Object (readonly)
Returns the value of attribute checkpoint_store.
# File 'lib/mammoth/delivery_worker.rb', line 13
def checkpoint_store
  @checkpoint_store
end
#dead_letter_store ⇒ Object (readonly)
Returns the value of attribute dead_letter_store.
# File 'lib/mammoth/delivery_worker.rb', line 13
def dead_letter_store
  @dead_letter_store
end
#max_attempts ⇒ Object (readonly)
Returns the value of attribute max_attempts.
# File 'lib/mammoth/delivery_worker.rb', line 13
def max_attempts
  @max_attempts
end
#publication_name ⇒ Object (readonly)
Returns the value of attribute publication_name.
# File 'lib/mammoth/delivery_worker.rb', line 13
def publication_name
  @publication_name
end
#retry_schedule ⇒ Object (readonly)
Returns the value of attribute retry_schedule.
# File 'lib/mammoth/delivery_worker.rb', line 13
def retry_schedule
  @retry_schedule
end
#sink ⇒ Object (readonly)
Returns the value of attribute sink.
# File 'lib/mammoth/delivery_worker.rb', line 13
def sink
  @sink
end
#sleeper ⇒ Object (readonly)
Returns the value of attribute sleeper.
# File 'lib/mammoth/delivery_worker.rb', line 13
def sleeper
  @sleeper
end
#slot_name ⇒ Object (readonly)
Returns the value of attribute slot_name.
# File 'lib/mammoth/delivery_worker.rb', line 13
def slot_name
  @slot_name
end
#source_name ⇒ Object (readonly)
Returns the value of attribute source_name.
# File 'lib/mammoth/delivery_worker.rb', line 13
def source_name
  @source_name
end
Class Method Details
.from_config(config, sink:, checkpoint_store:, dead_letter_store:, sleeper: Kernel.method(:sleep)) ⇒ Mammoth::DeliveryWorker
Build a delivery worker from Mammoth configuration and stores.
# File 'lib/mammoth/delivery_worker.rb', line 46
def self.from_config(config, sink:, checkpoint_store:, dead_letter_store:, sleeper: Kernel.method(:sleep))
  new(
    sink: sink,
    checkpoint_store: checkpoint_store,
    dead_letter_store: dead_letter_store,
    source_name: config.dig("mammoth", "name"),
    slot_name: config.dig("replication", "slot"),
    publication_name: config.dig("replication", "publication"),
    max_attempts: config.dig("retry", "max_attempts"),
    retry_schedule: config.dig("retry", "schedule_seconds"),
    sleeper: sleeper
  )
end
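The config.dig calls above imply a nested, string-keyed configuration. The following sketch shows one shape that would satisfy them. Only the key paths come from from_config; every value (names, attempt count, schedule) is a made-up placeholder, and the real configuration may carry other keys.

```ruby
# Hypothetical configuration; only the key paths are taken from from_config.
config = {
  "mammoth" => { "name" => "orders-db" },
  "replication" => { "slot" => "mammoth_slot", "publication" => "mammoth_pub" },
  "retry" => { "max_attempts" => 3, "schedule_seconds" => [1, 5, 30] }
}

# The same lookups from_config performs before calling new.
worker_options = {
  source_name: config.dig("mammoth", "name"),
  slot_name: config.dig("replication", "slot"),
  publication_name: config.dig("replication", "publication"),
  max_attempts: config.dig("retry", "max_attempts"),
  retry_schedule: config.dig("retry", "schedule_seconds")
}

# Hash#dig returns nil for a missing path instead of raising.
missing = {}.dig("retry", "max_attempts")

p worker_options
p missing
```

Because Hash#dig returns nil for an absent path, a config without a retry section would leave max_attempts as nil, and the attempts >= max_attempts comparison in #deliver would then raise, unless the configuration is validated elsewhere before it reaches this method.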
Instance Method Details
#deliver(event) ⇒ Hash
Deliver an event with retry, checkpoint, and DLQ handling.
# File 'lib/mammoth/delivery_worker.rb', line 64
def deliver(event)
  attempts = 0

  begin
    attempts += 1
    result = sink.deliver(event)
    checkpoint(event)
    result.merge(attempts: attempts)
  rescue DeliveryError => e
    return dead_letter(event, e, attempts) if attempts >= max_attempts

    wait_before_retry(attempts)
    retry
  end
end
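The begin/rescue/retry flow above can be exercised in isolation with a stand-in. In this sketch, SketchWorker and FlakySink are hypothetical, DeliveryError is redefined locally, and the checkpoint, dead-letter, and wait steps are simplified guesses, since the real private helpers and the shape of their return values are not shown on this page.

```ruby
# Local stand-in for Mammoth's DeliveryError (the real class is not shown here).
class DeliveryError < StandardError; end

# Hypothetical sink that fails a fixed number of times before succeeding.
class FlakySink
  def initialize(failures)
    @failures = failures
  end

  def deliver(event)
    if @failures.positive?
      @failures -= 1
      raise DeliveryError, "webhook unavailable"
    end
    { status: :delivered, id: event[:id] }
  end
end

# Mirrors the control flow of #deliver; the bookkeeping is a guess.
class SketchWorker
  attr_reader :checkpoints, :dead_letters, :delays

  def initialize(sink:, max_attempts:, retry_schedule:)
    @sink = sink
    @max_attempts = max_attempts
    @retry_schedule = retry_schedule
    @checkpoints = []
    @dead_letters = []
    @delays = []
  end

  def deliver(event)
    attempts = 0
    begin
      attempts += 1
      result = @sink.deliver(event)
      @checkpoints << event[:id] # stands in for checkpoint(event)
      result.merge(attempts: attempts)
    rescue DeliveryError => e
      return dead_letter(event, e, attempts) if attempts >= @max_attempts

      @delays << @retry_schedule[attempts - 1] # stands in for wait_before_retry
      retry
    end
  end

  private

  # Assumed return shape; the real dead_letter result is not documented here.
  def dead_letter(event, error, attempts)
    @dead_letters << event[:id]
    { status: :dead_lettered, error: error.message, attempts: attempts }
  end
end

recovered = SketchWorker.new(sink: FlakySink.new(2), max_attempts: 3, retry_schedule: [1, 5])
recovered_result = recovered.deliver({ id: 1 })
p recovered_result

exhausted = SketchWorker.new(sink: FlakySink.new(5), max_attempts: 3, retry_schedule: [1, 5])
exhausted_result = exhausted.deliver({ id: 2 })
p exhausted_result
```

The first worker succeeds on its third attempt and records a checkpoint. The second reaches max_attempts, records no checkpoint, and hands the event to the dead-letter path. Since attempts is initialized outside the begin block, retry re-runs the block without resetting the counter.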