Class: Honker::Database

Inherits:
Object
  • Object
show all
Defined in:
lib/honker.rb

Overview

Database is a Honker handle over a SQLite file with the Honker extension loaded. The constructor bootstraps the schema, and it is safe to open the same path from multiple processes.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(path, extension_path:) ⇒ Database

Returns a new instance of Database.



46
47
48
49
50
51
52
53
54
# File 'lib/honker.rb', line 46

# Opens the SQLite database at +path+, loads the Honker extension, applies
# DEFAULT_PRAGMAS and runs the schema bootstrap.
#
# If any setup step after opening raises, the handle is closed before the
# error propagates: the caller never receives the half-built object, so
# nothing else would be able to close it.
#
# @param path [String] database path handed to SQLite3::Database.new
# @param extension_path [String] path of the loadable Honker extension
# @raise [StandardError] whatever the failing sqlite3 call raised, re-raised
#   unchanged once the handle has been closed
def initialize(path, extension_path:)
  @local_update_seq = 0
  @db = SQLite3::Database.new(path)
  begin
    @db.enable_load_extension(true)
    @db.load_extension(extension_path)
    # Turn extension loading back off once the Honker extension is in.
    @db.enable_load_extension(false)
    @db.execute_batch(DEFAULT_PRAGMAS)
    @db.execute("SELECT honker_bootstrap()")
  rescue StandardError
    @db.close
    raise
  end
end

Instance Attribute Details

#db ⇒ Object (readonly)

Returns the value of attribute db.



44
45
46
# File 'lib/honker.rb', line 44

# Read-only accessor for the handle held in @db (assigned by the
# constructor from SQLite3::Database.new).
#
# @return [Object] the current value of @db
def db
  @db
end

Instance Method Details

#close ⇒ Object



56
57
58
# File 'lib/honker.rb', line 56

# Closes the underlying handle when one is set; does nothing when @db is nil.
#
# @return [Object, nil] whatever the handle's #close returns, or nil when
#   there is no handle
def close
  @db.close unless @db.nil?
end

#get_result(job_id) ⇒ Object

Fetch a stored result, or nil if absent or expired.



171
172
173
174
175
176
# File 'lib/honker.rb', line 171

# Looks up the result stored for +job_id+ through the honker_result_get
# SQL function.
#
# @param job_id [Object] job identifier, bound as the single SQL parameter
# @return [Object, nil] first column of the returned row; documented as nil
#   when the result is absent or expired
def get_result(job_id)
  row = @db.get_first_row("SELECT honker_result_get(?)", [job_id])
  row[0]
end

#mark_updated ⇒ Object



60
61
62
# File 'lib/honker.rb', line 60

# Advances the local update counter by one.
#
# @return [Integer] the counter value after the increment
def mark_updated
  @local_update_seq = @local_update_seq.succ
end

#notify(channel, payload) ⇒ Object

Fire a pg_notify-style pub/sub signal. Returns the notification id.



93
94
95
96
# File 'lib/honker.rb', line 93

# Sends a pub/sub notification on +channel+ by calling the SQL notify()
# function. The payload is serialised with JSON.dump before it is bound.
#
# @param channel [Object] bound as the first SQL parameter
# @param payload [Object] any value JSON.dump accepts
# @return [Object] first column of the returned row (documented as the
#   notification id)
def notify(channel, payload)
  encoded = JSON.dump(payload)
  @db.get_first_row("SELECT notify(?, ?)", [channel, encoded])[0]
end

#notify_tx(tx, channel, payload) ⇒ Object

Fire a notification inside an open transaction. The signal lands only when the transaction commits.



100
101
102
103
104
105
106
# File 'lib/honker.rb', line 100

# Sends a notification through +tx+ instead of the Database's own handle,
# so the call takes part in that open transaction.
#
# @param tx [Object] transaction object responding to #query_row(sql, binds)
# @param channel [Object] bound as the first SQL parameter
# @param payload [Object] any value JSON.dump accepts
# @return [Object] first column of the row returned by tx.query_row
def notify_tx(tx, channel, payload)
  binds = [channel, JSON.dump(payload)]
  tx.query_row("SELECT notify(?, ?)", binds)[0]
end

#prune_notifications(older_than_s:) ⇒ Object

Delete notifications older than `older_than_s` seconds. Returns the number of rows deleted.



185
186
187
188
189
190
191
# File 'lib/honker.rb', line 185

# Deletes rows of _honker_notifications whose created_at is more than
# +older_than_s+ seconds before SQLite's unixepoch().
#
# @param older_than_s [Numeric] age threshold in seconds, bound as the only
#   SQL parameter
# @return [Integer] the handle's #changes value read right after the DELETE
def prune_notifications(older_than_s:)
  sql = "DELETE FROM _honker_notifications WHERE created_at < unixepoch() - ?"
  @db.execute(sql, [older_than_s])
  @db.changes
end

#queue(name, visibility_timeout_s: 300, max_attempts: 3) ⇒ Object

Returns a Queue handle for a named queue.

visibility_timeout_s: 300   # claim expiry before reclaim
max_attempts:         3     # retries before moving to dead


72
73
74
75
76
77
78
79
# File 'lib/honker.rb', line 72

# Builds a Queue handle for the queue called +name+, bound to this database.
#
# @param name [Object] queue name, forwarded to Queue.new
# @param visibility_timeout_s [Integer] claim expiry before reclaim
# @param max_attempts [Integer] retries before moving to dead
# @return [Queue] a new Queue constructed with self, +name+ and both options
def queue(name, visibility_timeout_s: 300, max_attempts: 3)
  options = {
    visibility_timeout_s: visibility_timeout_s,
    max_attempts: max_attempts,
  }
  Queue.new(self, name, **options)
end

#save_result(job_id, value, ttl_s:) ⇒ Object

Persist a job result for later retrieval via `get_result`. `value` is stored verbatim — JSON-encode it yourself if you want to round-trip structured data.



162
163
164
165
166
167
168
# File 'lib/honker.rb', line 162

# Stores +value+ for +job_id+ through the honker_result_save SQL function.
# The value is bound as given; no encoding happens here.
#
# @param job_id [Object] job identifier (first SQL parameter)
# @param value [Object] value to store (second SQL parameter)
# @param ttl_s [Numeric] time-to-live passed as the third SQL parameter
# @return [nil] always nil; the row returned by SQLite is discarded
def save_result(job_id, value, ttl_s:)
  binds = [job_id, value, ttl_s]
  @db.get_first_row("SELECT honker_result_save(?, ?, ?)", binds)
  nil
end

#scheduler ⇒ Object

Returns the time-trigger Scheduler facade. Cheap — no allocation beyond the wrapper object.



88
89
90
# File 'lib/honker.rb', line 88

# Builds a Scheduler wrapping this database. Every call constructs a new
# wrapper; nothing is cached on the Database.
#
# @return [Scheduler] a new Scheduler constructed with self
def scheduler
  Scheduler.new(self)
end

#stream(name) ⇒ Object

Returns a Stream handle for an append-only ordered log.



82
83
84
# File 'lib/honker.rb', line 82

# Builds a Stream handle for the stream called +name+, bound to this
# database. Every call constructs a new handle.
#
# @param name [Object] stream name, forwarded to Stream.new
# @return [Stream] a new Stream constructed with self and +name+
def stream(name)
  Stream.new(self, name)
end

#sweep_rate_limits(older_than_s:) ⇒ Object

Sweep old rate-limit window rows. Returns count deleted.



152
153
154
155
156
157
# File 'lib/honker.rb', line 152

# Removes old rate-limit window rows through the honker_rate_limit_sweep
# SQL function.
#
# @param older_than_s [Numeric] age threshold, bound as the only SQL parameter
# @return [Object] first column of the returned row (documented as the
#   number of rows deleted)
def sweep_rate_limits(older_than_s:)
  row = @db.get_first_row("SELECT honker_rate_limit_sweep(?)", [older_than_s])
  row[0]
end

#sweep_results ⇒ Object

Drop expired result rows. Returns count swept.



179
180
181
# File 'lib/honker.rb', line 179

# Drops expired result rows through the honker_result_sweep SQL function.
#
# @return [Object] first column of the returned row (documented as the
#   number of rows swept)
def sweep_results
  row = @db.get_first_row("SELECT honker_result_sweep()")
  row[0]
end

#transaction ⇒ Object

Run a block inside a SQLite transaction. The block receives a Honker::Transaction; returning normally commits, raising rolls back, and `tx.rollback!` rolls back without surfacing an error.

db.transaction do |tx|
  tx.execute("INSERT INTO orders ...")
  q.enqueue_tx(tx, {order_id: 1})
end


116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/honker.rb', line 116

# Runs the caller's block inside the handle's #transaction, yielding a
# Transaction wrapper built around @db.
#
# @yieldparam tx [Transaction] wrapper constructed from the current handle
# @return [Object, nil] whatever @db.transaction returns for the block, or
#   nil when the block escaped with Transaction::Rollback
# @raise any other exception from the block propagates to the caller
def transaction
  tx = Transaction.new(@db)
  begin
    @db.transaction { yield tx }
  rescue Transaction::Rollback
    # Sentinel raised to abort on purpose. It left the inner block as an
    # exception, so the rollback is already the sqlite3 gem's job; all
    # that remains is to keep the sentinel away from the caller.
    nil
  end
end

#try_lock(name, owner:, ttl_s:) ⇒ Object

Try to acquire an advisory lock. Returns a `Lock` handle on success, `nil` if another owner holds it.



132
133
134
135
136
137
138
139
140
# File 'lib/honker.rb', line 132

# Attempts to take the advisory lock +name+ for +owner+ through the
# honker_lock_acquire SQL function.
#
# @param name [Object] lock name (first SQL parameter)
# @param owner [Object] owner identifier (second SQL parameter)
# @param ttl_s [Numeric] time-to-live passed as the third SQL parameter
# @return [Lock, nil] a Lock built from self, +name+ and +owner+ when the
#   function returned 1; nil for any other value
def try_lock(name, owner:, ttl_s:)
  row = @db.get_first_row(
    "SELECT honker_lock_acquire(?, ?, ?)",
    [name, owner, ttl_s],
  )
  Lock.new(self, name, owner) if row[0] == 1
end

#try_rate_limit(name, limit:, per:) ⇒ Object

Fixed-window rate limiter. Returns true if this call fits within `limit` requests per `per` seconds.



144
145
146
147
148
149
# File 'lib/honker.rb', line 144

# Asks the honker_rate_limit_try SQL function whether this call is allowed.
#
# @param name [Object] limiter name (first SQL parameter)
# @param limit [Integer] allowed requests per window (second SQL parameter)
# @param per [Numeric] window length in seconds (third SQL parameter)
# @return [Boolean] true exactly when the function returned 1
def try_rate_limit(name, limit:, per:)
  row = @db.get_first_row(
    "SELECT honker_rate_limit_try(?, ?, ?)",
    [name, limit, per],
  )
  row[0] == 1
end

#update_snapshot ⇒ Object



64
65
66
# File 'lib/honker.rb', line 64

# Reads the local update counter without modifying it.
#
# @return [Integer] the current value of @local_update_seq (set to 0 by the
#   constructor and incremented by #mark_updated)
def update_snapshot
  @local_update_seq
end