Class: Kaal::Dispatch::DatabaseEngine

Inherits:
Registry
  • Object
show all
Defined in:
lib/kaal/dispatch/database_engine.rb

Overview

Sequel-backed dispatch registry that persists dispatch records in the kaal_dispatches table.

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Registry

#dispatched?

Constructor Details

#initialize(database:, namespace: nil) ⇒ DatabaseEngine

Returns a new instance of DatabaseEngine.



14
15
16
17
18
# File 'lib/kaal/dispatch/database_engine.rb', line 14

# Builds a registry bound to the given database handle.
#
# @param database the raw handle; it is wrapped in Kaal::Persistence::Database
# @param namespace [String, nil] optional namespace kept on the instance
#   (it is later handed to normalize_row when rows are read back)
def initialize(database:, namespace: nil)
  super()
  @namespace = namespace
  @database = Kaal::Persistence::Database.new(database)
end

Class Method Details

.normalize_row(row, namespace: nil) ⇒ Object



71
72
73
74
75
76
77
78
79
80
81
# File 'lib/kaal/dispatch/database_engine.rb', line 71

# Converts a stored row into the registry's plain-hash representation.
#
# Only the columns :key, :fire_time, :dispatched_at, :node_id and :status are
# carried over; the key is passed through strip_namespace first.
#
# @param row [Hash, nil] row as read from the dataset
# @param namespace [String, nil] namespace forwarded to strip_namespace
# @return [Hash, nil] the normalized hash, or nil when +row+ is nil/false
def self.normalize_row(row, namespace: nil)
  return unless row

  normalized = { key: strip_namespace(row[:key], namespace: namespace) }
  %i[fire_time dispatched_at node_id status].each_with_object(normalized) do |column, result|
    result[column] = row[column]
  end
end

.strip_namespace(key, namespace:) ⇒ Object



83
84
85
86
87
88
# File 'lib/kaal/dispatch/database_engine.rb', line 83

# Removes a leading "<namespace>:" from +key+ when present.
#
# @param key [String] stored key
# @param namespace [#to_s, nil] namespace; nil or an empty string disables stripping
# @return [String] the key without its namespace prefix, or +key+ itself when
#   no namespace is given or the key does not start with the prefix
def self.strip_namespace(key, namespace:)
  scope = namespace.to_s
  return key if scope.empty?

  marker = "#{scope}:"
  return key unless key.start_with?(marker)

  key.delete_prefix(marker)
end

Instance Method Details

#cleanup(recovery_window: 86_400) ⇒ Object



66
67
68
69
# File 'lib/kaal/dispatch/database_engine.rb', line 66

# Deletes dispatch rows whose fire_time is older than the recovery window.
#
# @param recovery_window [Numeric] age in seconds subtracted from the current
#   UTC time to obtain the cutoff (default 86_400, i.e. 24 hours)
# @return whatever the dataset's +delete+ call returns
def cleanup(recovery_window: 86_400)
  threshold = Time.now.utc - recovery_window
  expired = cleanup_dataset.where { |row| row.fire_time < threshold }
  expired.delete
end

#find_by_key(key) ⇒ Object



54
55
56
# File 'lib/kaal/dispatch/database_engine.rb', line 54

# Looks up dispatch records by key.
#
# The key is run through namespaced_key before being handed to +query+.
#
# @param key the caller-facing dispatch key
# @return the result of +query+ filtered on the namespaced key
def find_by_key(key)
  storage_key = namespaced_key(key)
  query(key: storage_key)
end

#find_by_node(node_id) ⇒ Object



58
59
60
# File 'lib/kaal/dispatch/database_engine.rb', line 58

# Looks up dispatch records by the node that logged them.
#
# @param node_id value matched against the node_id column
# @return the result of +query+ filtered on node_id
def find_by_node(node_id)
  query(node_id:)
end

#find_by_status(status) ⇒ Object



62
63
64
# File 'lib/kaal/dispatch/database_engine.rb', line 62

# Looks up dispatch records by status.
#
# @param status value matched against the status column
# @return the result of +query+ filtered on status
def find_by_status(status)
  query(status:)
end

#find_dispatch(key, fire_time) ⇒ Object



50
51
52
# File 'lib/kaal/dispatch/database_engine.rb', line 50

# Fetches the single dispatch record for a key / fire-time pair.
#
# @param key the caller-facing dispatch key (namespaced before the lookup)
# @param fire_time value matched against the fire_time column
# @return the first matching row passed through normalize_row with this
#   instance's namespace (normalize_row yields nil when nothing matches)
def find_dispatch(key, fire_time)
  matches = dataset.where(key: namespaced_key(key), fire_time: fire_time)
  record = matches.first
  self.class.normalize_row(record, namespace: @namespace)
end

#log_dispatch(key, fire_time, node_id, status = 'dispatched') ⇒ Object



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/kaal/dispatch/database_engine.rb', line 20

# Records a dispatch for +key+ at +fire_time+, overwriting an existing record
# for the same (key, fire_time) pair.
#
# When the dataset supports +insert_conflict+ a single upsert statement is
# issued. Otherwise a plain insert is attempted and, on a unique-constraint
# violation, the existing row is updated instead.
#
# @param key the caller-facing dispatch key (namespaced before storage)
# @param fire_time the scheduled fire time identifying this dispatch
# @param node_id identifier of the node performing the dispatch
# @param status [String] status to store (default 'dispatched')
# @return the record as returned by find_dispatch after the write
def log_dispatch(key, fire_time, node_id, status = 'dispatched')
  now = Time.now.utc
  storage_key = namespaced_key(key)
  # Columns that are refreshed when the (key, fire_time) row already exists.
  update_values = { dispatched_at: now, node_id: node_id, status: status }
  attributes = { key: storage_key, fire_time: fire_time, **update_values }
  dispatches_dataset = dataset

  # Feature-detect upsert support instead of rescuing NoMethodError, so that
  # exceptions are not used for control flow.
  if dispatches_dataset.respond_to?(:insert_conflict)
    dispatches_dataset
      .insert_conflict(target: %i[key fire_time], update: update_values)
      .insert(attributes)
  else
    begin
      dispatches_dataset.insert(attributes)
    rescue ::Sequel::UniqueConstraintViolation
      # The row already exists: fall back to updating it in place.
      dispatches_dataset.where(key: storage_key, fire_time: fire_time).update(update_values)
    end
  end

  find_dispatch(key, fire_time)
end