Class: Mammoth::DeliveryWorker

Inherits:
Object
Defined in:
lib/mammoth/delivery_worker.rb

Overview

Delivers normalized events with retry, checkpoint, and dead-letter handling.

DeliveryWorker is Mammoth’s first reliable delivery unit. It intentionally keeps the delivery contract small: attempt webhook delivery, advance the checkpoint after success, and persist the failed event to the dead letter queue after retry exhaustion.

Constant Summary

DEFAULT_SOURCE = "postgresql"

Constructor Details

#initialize(sink:, checkpoint_store:, dead_letter_store:, source_name:, slot_name:, publication_name:, max_attempts:, retry_schedule:, sleeper: Kernel.method(:sleep)) ⇒ DeliveryWorker

Returns a new instance of DeliveryWorker.

Parameters:

  • sink (#deliver)

    destination sink

  • checkpoint_store (Mammoth::CheckpointStore)

    checkpoint persistence

  • dead_letter_store (Mammoth::DeadLetterStore)

    dead letter persistence

  • source_name (String)

    logical source name

  • slot_name (String)

    replication slot name

  • publication_name (String)

    publication name

  • max_attempts (Integer)

    maximum delivery attempts

  • retry_schedule (Array<Integer>)

    retry wait schedule in seconds

  • sleeper (#call) (defaults to: Kernel.method(:sleep))

    sleep strategy, injectable for tests



# File 'lib/mammoth/delivery_worker.rb', line 25

def initialize(sink:, checkpoint_store:, dead_letter_store:, source_name:, slot_name:, publication_name:,
               max_attempts:, retry_schedule:, sleeper: Kernel.method(:sleep))
  @sink = sink
  @checkpoint_store = checkpoint_store
  @dead_letter_store = dead_letter_store
  @source_name = source_name
  @slot_name = slot_name
  @publication_name = publication_name
  @max_attempts = max_attempts
  @retry_schedule = retry_schedule
  @sleeper = sleeper
end
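
The `sleeper` parameter only needs to respond to `#call`, which is what makes it replaceable in tests. The following is a minimal sketch of that idea, not code from Mammoth: `recorded_waits`, `fake_sleeper`, and the schedule values are hypothetical names and numbers chosen for illustration.

```ruby
# Production default from the signature: Kernel#sleep wrapped as a callable.
real_sleeper = Kernel.method(:sleep)

# Hypothetical test double: records the requested waits instead of blocking.
recorded_waits = []
fake_sleeper = ->(seconds) { recorded_waits << seconds }

# Both objects satisfy the documented (#call) contract.
[real_sleeper, fake_sleeper].each do |sleeper|
  raise ArgumentError, "sleeper must respond to #call" unless sleeper.respond_to?(:call)
end

# Invented retry schedule in seconds; only the fake sleeper is invoked here,
# so nothing actually sleeps.
[1, 5, 30].each { |seconds| fake_sleeper.call(seconds) }
```

Passing a lambda like `fake_sleeper` as `sleeper:` should let a test assert on the waits a worker requested without slowing the suite down.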

Instance Attribute Details

#checkpoint_store ⇒ Object (readonly)

Returns the value of attribute checkpoint_store.



# File 'lib/mammoth/delivery_worker.rb', line 13

def checkpoint_store
  @checkpoint_store
end

#dead_letter_store ⇒ Object (readonly)

Returns the value of attribute dead_letter_store.



# File 'lib/mammoth/delivery_worker.rb', line 13

def dead_letter_store
  @dead_letter_store
end

#max_attempts ⇒ Object (readonly)

Returns the value of attribute max_attempts.



# File 'lib/mammoth/delivery_worker.rb', line 13

def max_attempts
  @max_attempts
end

#publication_name ⇒ Object (readonly)

Returns the value of attribute publication_name.



# File 'lib/mammoth/delivery_worker.rb', line 13

def publication_name
  @publication_name
end

#retry_schedule ⇒ Object (readonly)

Returns the value of attribute retry_schedule.



# File 'lib/mammoth/delivery_worker.rb', line 13

def retry_schedule
  @retry_schedule
end

#sink ⇒ Object (readonly)

Returns the value of attribute sink.



# File 'lib/mammoth/delivery_worker.rb', line 13

def sink
  @sink
end

#sleeper ⇒ Object (readonly)

Returns the value of attribute sleeper.



# File 'lib/mammoth/delivery_worker.rb', line 13

def sleeper
  @sleeper
end

#slot_name ⇒ Object (readonly)

Returns the value of attribute slot_name.



# File 'lib/mammoth/delivery_worker.rb', line 13

def slot_name
  @slot_name
end

#source_name ⇒ Object (readonly)

Returns the value of attribute source_name.



# File 'lib/mammoth/delivery_worker.rb', line 13

def source_name
  @source_name
end

Class Method Details

.from_config(config, sink:, checkpoint_store:, dead_letter_store:, sleeper: Kernel.method(:sleep)) ⇒ Mammoth::DeliveryWorker

Build a delivery worker from Mammoth configuration and stores.

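Parameters:

  • config (#dig)

    Mammoth configuration

  • sink (#deliver)

    destination sink

  • checkpoint_store (Mammoth::CheckpointStore)

    checkpoint persistence

  • dead_letter_store (Mammoth::DeadLetterStore)

    dead letter persistence

  • sleeper (#call) (defaults to: Kernel.method(:sleep))

    sleep strategy, injectable for tests

Returns:

  • (Mammoth::DeliveryWorker)

    worker built from the configuration and stores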



# File 'lib/mammoth/delivery_worker.rb', line 46

def self.from_config(config, sink:, checkpoint_store:, dead_letter_store:, sleeper: Kernel.method(:sleep))
  new(
    sink: sink,
    checkpoint_store: checkpoint_store,
    dead_letter_store: dead_letter_store,
    source_name: config.dig("mammoth", "name"),
    slot_name: config.dig("replication", "slot"),
    publication_name: config.dig("replication", "publication"),
    max_attempts: config.dig("retry", "max_attempts"),
    retry_schedule: config.dig("retry", "schedule_seconds"),
    sleeper: sleeper
  )
end
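
The listing above reads five values through `dig`, which implies a nested, string-keyed configuration. The following sketch shows one shape that would satisfy those lookups. The key paths come from the listing; every value (`"orders-db"`, `"mammoth_slot"`, and so on) is invented, and the `settings` and `missing` names are hypothetical helpers, not part of Mammoth.

```ruby
# Hypothetical configuration: key names follow the dig paths used by
# from_config, values are made up for illustration.
config = {
  "mammoth" => { "name" => "orders-db" },
  "replication" => { "slot" => "mammoth_slot", "publication" => "mammoth_pub" },
  "retry" => { "max_attempts" => 3, "schedule_seconds" => [1, 5, 30] }
}

# The same lookups from_config performs, collected into a plain Hash.
settings = {
  source_name: config.dig("mammoth", "name"),
  slot_name: config.dig("replication", "slot"),
  publication_name: config.dig("replication", "publication"),
  max_attempts: config.dig("retry", "max_attempts"),
  retry_schedule: config.dig("retry", "schedule_seconds")
}

# Hash#dig returns nil for an absent key instead of raising, so a missing
# entry would reach the constructor as nil. A pre-flight check can surface it.
missing = settings.select { |_key, value| value.nil? }.keys
raise KeyError, "missing configuration for: #{missing.join(', ')}" unless missing.empty?
```

Because the listing shows no validation of its own, a check like `missing` may be worth running before building a worker from configuration.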

Instance Method Details

#deliver(event) ⇒ Hash

Deliver an event with retry, checkpoint, and DLQ handling.

Parameters:

  • event (Hash, #to_h)

    normalized event

Returns:

  • (Hash)

    delivery summary



# File 'lib/mammoth/delivery_worker.rb', line 64

def deliver(event)
  attempts = 0

  begin
    attempts += 1
    result = sink.deliver(event)
    checkpoint(event)
    result.merge(attempts: attempts)
  rescue DeliveryError => e
    return dead_letter(event, e, attempts) if attempts >= max_attempts

    wait_before_retry(attempts)
    retry
  end
end
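
The control flow above can be exercised in isolation with stand-in objects. The sketch below mirrors the `begin`/`rescue`/`retry` structure of `#deliver` but is not Mammoth's code: `SketchDeliveryError`, `FlakySink`, and `SketchWorker` are hypothetical, and the private helpers that the listing calls (`checkpoint`, `dead_letter`, `wait_before_retry`) are not shown in this page, so their behavior here (recording to arrays, indexing the schedule by attempt, the shape of the dead-letter summary) is assumed.

```ruby
# Stand-ins for illustration only; none of these are Mammoth's real classes.
class SketchDeliveryError < StandardError; end

# Sink that fails a fixed number of times before succeeding.
class FlakySink
  def initialize(failures)
    @failures = failures
  end

  def deliver(event)
    if @failures.positive?
      @failures -= 1
      raise SketchDeliveryError, "temporary failure"
    end
    { status: "delivered", id: event[:id] }
  end
end

class SketchWorker
  attr_reader :checkpoints, :dead_letters, :sleeps

  def initialize(sink:, max_attempts:, retry_schedule:)
    @sink = sink
    @max_attempts = max_attempts
    @retry_schedule = retry_schedule
    @checkpoints = []
    @dead_letters = []
    @sleeps = []
  end

  def deliver(event)
    attempts = 0
    begin
      attempts += 1
      result = @sink.deliver(event)
      @checkpoints << event[:id] # assumed: checkpoint only after success
      result.merge(attempts: attempts)
    rescue SketchDeliveryError => e
      if attempts >= @max_attempts
        # Assumed dead-letter record and summary shape.
        @dead_letters << { event: event, error: e.message, attempts: attempts }
        return { status: "dead_lettered", attempts: attempts }
      end
      # Assumed: schedule indexed by attempt, last entry reused when exhausted.
      @sleeps << (@retry_schedule[attempts - 1] || @retry_schedule.last)
      retry
    end
  end
end

ok = SketchWorker.new(sink: FlakySink.new(2), max_attempts: 3, retry_schedule: [1, 5])
ok_summary = ok.deliver({ id: 1 })

failed = SketchWorker.new(sink: FlakySink.new(5), max_attempts: 3, retry_schedule: [1, 5])
failed_summary = failed.deliver({ id: 2 })
```

In this sketch the first worker succeeds on its third attempt and records one checkpoint, while the second exhausts its attempts and records a dead letter without advancing the checkpoint, which matches the contract stated in the Overview.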