Class: Mammoth::CheckpointStore
- Inherits:
- Object
- Object
- Mammoth::CheckpointStore
- Defined in:
- lib/mammoth/checkpoint_store.rb
Overview
Persists source checkpoints in Mammoth’s SQLite operational store.
Instance Attribute Summary
- #sqlite_store ⇒ Object (readonly)
  Returns the value of attribute sqlite_store.
Instance Method Summary
- #count ⇒ Integer
  Count checkpoint rows.
- #fetch(source_name:, slot_name:) ⇒ Hash?
  Fetch a checkpoint row.
- #initialize(sqlite_store) ⇒ CheckpointStore (constructor)
  A new instance of CheckpointStore.
- #write(source_name:, slot_name:, publication_name:, last_lsn:) ⇒ Hash
  Insert or update the last successfully delivered source position.
Constructor Details
#initialize(sqlite_store) ⇒ CheckpointStore
Returns a new instance of CheckpointStore.
# File 'lib/mammoth/checkpoint_store.rb', line 11

def initialize(sqlite_store)
  @sqlite_store = sqlite_store
end
Instance Attribute Details
#sqlite_store ⇒ Object (readonly)
Returns the value of attribute sqlite_store.
# File 'lib/mammoth/checkpoint_store.rb', line 8

def sqlite_store
  @sqlite_store
end
Instance Method Details
#count ⇒ Integer
Count checkpoint rows.
# File 'lib/mammoth/checkpoint_store.rb', line 54

def count
  database.get_first_value("SELECT COUNT(*) FROM checkpoints")
end
#fetch(source_name:, slot_name:) ⇒ Hash?
Fetch the checkpoint row matching source_name and slot_name, or nil if no such row exists.
# File 'lib/mammoth/checkpoint_store.rb', line 44

def fetch(source_name:, slot_name:)
  database.get_first_row(
    "SELECT * FROM checkpoints WHERE source_name = ? AND slot_name = ? LIMIT 1",
    [source_name, slot_name]
  )
end
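Because #fetch is documented as returning Hash?, callers need to handle the case where no checkpoint has been written yet. The following is a minimal sketch of that pattern, not code from Mammoth: resume_lsn and StubCheckpoints are hypothetical names, and the string key "last_lsn" assumes rows come back as Hashes keyed by column name.

```ruby
# Hypothetical helper (not part of Mammoth): returns the stored LSN for a
# source/slot pair, or a default when no checkpoint row exists yet.
# Assumption: rows are Hashes with string column names as keys.
def resume_lsn(checkpoints, source_name:, slot_name:, default: nil)
  row = checkpoints.fetch(source_name: source_name, slot_name: slot_name)
  row ? row["last_lsn"] : default
end

# Hypothetical stand-in that mimics only the #fetch contract (Hash or nil),
# so the sketch runs without a SQLite database.
class StubCheckpoints
  def initialize(rows)
    @rows = rows
  end

  def fetch(source_name:, slot_name:)
    @rows[[source_name, slot_name]]
  end
end

stub = StubCheckpoints.new(
  ["orders_db", "mammoth_slot"] => { "last_lsn" => "0/16B3748" }
)

known = resume_lsn(stub, source_name: "orders_db", slot_name: "mammoth_slot")
missing = resume_lsn(stub, source_name: "orders_db", slot_name: "other_slot", default: "0/0")
```

Here known is the stored position and missing falls back to the supplied default, because the stub returns nil for an unknown slot just as #fetch is documented to.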
#write(source_name:, slot_name:, publication_name:, last_lsn:) ⇒ Hash
Insert or update the last successfully delivered source position for a (source_name, slot_name) pair, then return the stored row.
# File 'lib/mammoth/checkpoint_store.rb', line 22

def write(source_name:, slot_name:, publication_name:, last_lsn:)
  now = Time.now.utc.iso8601
  database.execute(
    <<~SQL,
      INSERT INTO checkpoints(source_name, slot_name, publication_name, last_lsn, updated_at)
      VALUES (?, ?, ?, ?, ?)
      ON CONFLICT(source_name, slot_name) DO UPDATE SET
        publication_name = excluded.publication_name,
        last_lsn = excluded.last_lsn,
        updated_at = excluded.updated_at
    SQL
    [source_name, slot_name, publication_name, last_lsn, now]
  )
  fetch(source_name: source_name, slot_name: slot_name)
end
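The ON CONFLICT(source_name, slot_name) clause means a second write for the same pair updates the existing row instead of adding one. As a rough illustration of those semantics only, the sketch below models the table as a Hash keyed by that pair. InMemoryCheckpoints is a hypothetical class, not part of Mammoth, and it does not touch SQLite; the source, slot, and publication names are made-up sample values.

```ruby
require "time"

# Hypothetical in-memory model of the upsert behaviour; not Mammoth's code.
class InMemoryCheckpoints
  def initialize
    # Keyed by [source_name, slot_name], mirroring the SQL conflict target.
    @rows = {}
  end

  # Inserts a row for a new key, or overwrites the row for an existing key.
  def write(source_name:, slot_name:, publication_name:, last_lsn:)
    @rows[[source_name, slot_name]] = {
      "source_name" => source_name,
      "slot_name" => slot_name,
      "publication_name" => publication_name,
      "last_lsn" => last_lsn,
      "updated_at" => Time.now.utc.iso8601
    }
    fetch(source_name: source_name, slot_name: slot_name)
  end

  # Returns the row Hash for the key, or nil when none exists.
  def fetch(source_name:, slot_name:)
    @rows[[source_name, slot_name]]
  end

  def count
    @rows.size
  end
end

store = InMemoryCheckpoints.new
store.write(source_name: "orders_db", slot_name: "mammoth_slot",
            publication_name: "mammoth_pub", last_lsn: "0/16B3748")
row = store.write(source_name: "orders_db", slot_name: "mammoth_slot",
                  publication_name: "mammoth_pub", last_lsn: "0/16B3800")
```

After the second call the model still holds a single row, and that row carries the later last_lsn, which is the behaviour the SQL upsert is written to produce.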