Class: Kaal::Dispatch::DatabaseEngine
- Defined in:
- lib/kaal/dispatch/database_engine.rb
Overview
Sequel-backed dispatch registry stored in kaal_dispatches.
Class Method Summary collapse
Instance Method Summary collapse
- #cleanup(recovery_window: 86_400) ⇒ Object
- #find_by_key(key) ⇒ Object
- #find_by_node(node_id) ⇒ Object
- #find_by_status(status) ⇒ Object
- #find_dispatch(key, fire_time) ⇒ Object
- #initialize(database:, namespace: nil) ⇒ DatabaseEngine (constructor): returns a new instance of DatabaseEngine.
- #log_dispatch(key, fire_time, node_id, status = 'dispatched') ⇒ Object
Methods inherited from Registry
Constructor Details
#initialize(database:, namespace: nil) ⇒ DatabaseEngine
Returns a new instance of DatabaseEngine.
14 15 16 17 18 |
# File 'lib/kaal/dispatch/database_engine.rb', line 14
# Sets up the engine: runs the parent initializer, remembers the namespace
# and wraps +database+ in a Kaal::Persistence::Database.
#
# @param database [Object] passed straight to Kaal::Persistence::Database.new
# @param namespace [Object, nil] stored unchanged in @namespace (nil by default)
def initialize(database:, namespace: nil)
  super()
  @namespace = namespace
  @database = Kaal::Persistence::Database.new(database)
end
Class Method Details
.normalize_row(row, namespace: nil) ⇒ Object
71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/kaal/dispatch/database_engine.rb', line 71
# Converts a stored row into the dispatch hash handed back to callers.
#
# @param row [#[], nil] record read from storage; a falsy value yields nil
# @param namespace [Object, nil] forwarded to strip_namespace for the :key value
# @return [Hash, nil] hash with :key, :fire_time, :dispatched_at, :node_id
#   and :status (in that order), or nil when there is no row
def self.normalize_row(row, namespace: nil)
  return unless row

  bare_key = strip_namespace(row[:key], namespace:)
  # Every column other than :key is copied through untouched.
  passthrough = %i[fire_time dispatched_at node_id status].to_h { |column| [column, row[column]] }
  { key: bare_key }.merge(passthrough)
end
.strip_namespace(key, namespace:) ⇒ Object
83 84 85 86 87 88 |
# File 'lib/kaal/dispatch/database_engine.rb', line 83
# Removes a leading "<namespace>:" prefix from +key+.
#
# @param key [String, nil] stored key; nil is returned unchanged
# @param namespace [#to_s, nil] namespace whose prefix is stripped; a blank
#   namespace (nil or empty after to_s) means nothing is stripped
# @return [String, nil] the key without the namespace prefix
def self.strip_namespace(key, namespace:)
  return key if key.nil? || namespace.to_s.empty?

  # delete_prefix leaves the string content unchanged when the prefix is
  # absent, so no separate start_with? check is needed. Only one leading
  # occurrence of the prefix is removed.
  key.delete_prefix("#{namespace}:")
end
Instance Method Details
#cleanup(recovery_window: 86_400) ⇒ Object
66 67 68 69 |
# File 'lib/kaal/dispatch/database_engine.rb', line 66
# Deletes records whose fire_time is older than the recovery window.
#
# @param recovery_window [Numeric] subtracted from the current UTC time to
#   get the cutoff (default 86_400)
# @return [Object] whatever the dataset's #delete returns
def cleanup(recovery_window: 86_400)
  threshold = Time.now.utc - recovery_window
  expired = cleanup_dataset.where { fire_time < threshold }
  expired.delete
end
#find_by_key(key) ⇒ Object
54 55 56 |
# File 'lib/kaal/dispatch/database_engine.rb', line 54
# Looks up records by key, after passing the key through namespaced_key.
#
# @param key [Object] caller-facing key
# @return [Object] result of query for the namespaced key
def find_by_key(key)
  storage_key = namespaced_key(key)
  query(key: storage_key)
end
#find_by_node(node_id) ⇒ Object
58 59 60 |
# File 'lib/kaal/dispatch/database_engine.rb', line 58
# Looks up records by the node_id column.
#
# @param node_id [Object] value matched against :node_id
# @return [Object] result of query
def find_by_node(node_id)
  query(node_id:)
end
#find_by_status(status) ⇒ Object
62 63 64 |
# File 'lib/kaal/dispatch/database_engine.rb', line 62
# Looks up records by the status column.
#
# @param status [Object] value matched against :status
# @return [Object] result of query
def find_by_status(status)
  query(status:)
end
#find_dispatch(key, fire_time) ⇒ Object
50 51 52 |
# File 'lib/kaal/dispatch/database_engine.rb', line 50
# Fetches the first record matching a key and fire time.
#
# @param key [Object] caller-facing key; passed through namespaced_key for the lookup
# @param fire_time [Object] value matched against :fire_time
# @return [Hash, nil] the row as shaped by normalize_row, or nil when no row matches
def find_dispatch(key, fire_time)
  matching = dataset.where(key: namespaced_key(key), fire_time:)
  self.class.normalize_row(matching.first, namespace: @namespace)
end
#log_dispatch(key, fire_time, node_id, status = 'dispatched') ⇒ Object
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/kaal/dispatch/database_engine.rb', line 20
# Records a dispatch for (key, fire_time), overwriting dispatched_at, node_id
# and status when a record for that pair already exists.
#
# @param key [Object] caller-facing key; stored after passing through namespaced_key
# @param fire_time [Object] fire time of the dispatch
# @param node_id [Object] identifier written to :node_id
# @param status [String] written to :status (default 'dispatched')
# @return [Object] result of find_dispatch(key, fire_time) after the write
def log_dispatch(key, fire_time, node_id, status = 'dispatched')
  now = Time.now.utc
  storage_key = namespaced_key(key)
  update_values = { dispatched_at: now, node_id: node_id, status: status }
  attributes = { key: storage_key, fire_time: fire_time, **update_values }
  dispatches_dataset = dataset

  # Ask the dataset whether it supports upserts instead of rescuing
  # NoMethodError, so unrelated NoMethodErrors are never part of control flow.
  if dispatches_dataset.respond_to?(:insert_conflict)
    dispatches_dataset
      .insert_conflict(target: %i[key fire_time], update: update_values)
      .insert(attributes)
  else
    begin
      dispatches_dataset.insert(attributes)
    rescue ::Sequel::UniqueConstraintViolation
      # The record already exists: refresh its mutable columns instead.
      dispatches_dataset.where(key: storage_key, fire_time: fire_time).update(update_values)
    end
  end

  find_dispatch(key, fire_time)
end