Class: Mammoth::DeadLetterStore
- Inherits:
-
Object
- Object
- Mammoth::DeadLetterStore
- Defined in:
- lib/mammoth/dead_letter_store.rb
Overview
Persists failed deliveries in Mammoth’s SQLite dead letter queue.
Instance Attribute Summary collapse
-
#sqlite_store ⇒ Object
readonly
Returns the value of attribute sqlite_store.
Instance Method Summary collapse
-
#count(status: nil) ⇒ Integer
Count dead letters by status.
-
#ignore(id) ⇒ void
Mark a dead letter as ignored.
-
#initialize(sqlite_store) ⇒ DeadLetterStore
constructor
A new instance of DeadLetterStore.
-
#pending(limit: 100) ⇒ Array<Hash>
Fetch pending dead letters.
-
#resolve(id) ⇒ void
Mark a dead letter as resolved.
-
#write(event:, destination_name:, error: nil, retry_count: 0) ⇒ Integer
Store a failed delivery.
Constructor Details
#initialize(sqlite_store) ⇒ DeadLetterStore
Returns a new instance of DeadLetterStore.
12 13 14 |
# File 'lib/mammoth/dead_letter_store.rb', line 12 def initialize(sqlite_store) @sqlite_store = sqlite_store end |
Instance Attribute Details
#sqlite_store ⇒ Object (readonly)
Returns the value of attribute sqlite_store.
9 10 11 |
# File 'lib/mammoth/dead_letter_store.rb', line 9 def sqlite_store @sqlite_store end |
Instance Method Details
#count(status: nil) ⇒ Integer
Count dead letters by status.
79 80 81 82 83 84 85 |
# File 'lib/mammoth/dead_letter_store.rb', line 79 def count(status: nil) if status database.get_first_value("SELECT COUNT(*) FROM dead_letters WHERE status = ?", [status]) else database.get_first_value("SELECT COUNT(*) FROM dead_letters") end end |
#ignore(id) ⇒ void
This method returns an undefined value.
Mark a dead letter as ignored.
99 100 101 |
# File 'lib/mammoth/dead_letter_store.rb', line 99 def ignore(id) update_status(id, "ignored") end |
#pending(limit: 100) ⇒ Array<Hash>
Fetch pending dead letters.
68 69 70 71 72 73 |
# File 'lib/mammoth/dead_letter_store.rb', line 68 def pending(limit: 100) database.execute( "SELECT * FROM dead_letters WHERE status = ? ORDER BY failed_at ASC LIMIT ?", ["pending", limit] ) end |
#resolve(id) ⇒ void
This method returns an undefined value.
Mark a dead letter as resolved.
91 92 93 |
# File 'lib/mammoth/dead_letter_store.rb', line 91 def resolve(id) update_status(id, "resolved") end |
#write(event:, destination_name:, error: nil, retry_count: 0) ⇒ Integer
Store a failed delivery.
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/mammoth/dead_letter_store.rb', line 23 def write(event:, destination_name:, error: nil, retry_count: 0) # rubocop:disable Metrics/MethodLength now = Time.now.utc.iso8601 payload = EventSerializer.call(event) database.execute( <<~SQL, INSERT INTO dead_letters( event_id, source_name, destination_name, operation, namespace, entity, source_position, payload_json, error_class, error_message, retry_count, status, failed_at, updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'pending', ?, ?) SQL [ payload.fetch("event_id"), payload.fetch("source"), destination_name, payload.fetch("operation"), payload["namespace"], payload["entity"], payload["source_position"], JSON.generate(payload), error&.class&.name, error&., retry_count, now, now ] ) database.last_insert_row_id end |