Class: Honker::Database

Inherits:
Object
  • Object
show all
Defined in:
lib/honker.rb

Overview

Database is a Honker handle over a SQLite file with the Honker extension loaded. The constructor bootstraps the schema; safe to open the same path from multiple processes.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(path, extension_path:, watcher_backend: nil) ⇒ Database

Returns a new instance of Database.



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/honker.rb', line 89

# Opens the SQLite file at +path+, loads the Honker extension into the
# connection, applies DEFAULT_PRAGMAS, runs the schema bootstrap and finally
# creates the CoreWatcher used by #wait_for_update.
#
# If any step after opening the file fails, the freshly opened connection is
# closed before the error is re-raised, so a failed constructor does not leave
# an open SQLite handle behind.
#
# @param path [String] path of the SQLite database file
# @param extension_path [String] path of the Honker loadable extension
# @param watcher_backend [String, nil] backend name forwarded to CoreWatcher
# @raise [ArgumentError] if +watcher_backend+ is neither nil nor a String
def initialize(path, extension_path:, watcher_backend: nil)
  unless watcher_backend.nil? || watcher_backend.is_a?(String)
    raise ArgumentError, "unknown watcher backend"
  end

  @local_update_seq = 0
  @db = SQLite3::Database.new(path)
  begin
    @db.busy_timeout = 5000
    @db.execute("PRAGMA mmap_size = 0")
    # Extension loading is switched on only for the single load call and
    # switched off again immediately afterwards.
    @db.enable_load_extension(true)
    @db.load_extension(extension_path)
    @db.enable_load_extension(false)
    @db.execute_batch(DEFAULT_PRAGMAS)
    @db.execute("SELECT honker_bootstrap()")
    @watcher = CoreWatcher.new(path, extension_path, watcher_backend)
  rescue StandardError
    # Setup failed part-way: release the connection instead of leaking it.
    begin
      @db.close
    rescue StandardError
      # Best effort only; the original failure is the one worth reporting.
    end
    raise
  end
end

Instance Attribute Details

#db ⇒ Object (readonly)

Returns the value of attribute db.



87
88
89
# File 'lib/honker.rb', line 87

# Read-only access to the connection object created in the constructor.
#
# @return [Object] the value held in @db
def db
  @db
end

Instance Method Details

#close ⇒ Object



106
107
108
109
# File 'lib/honker.rb', line 106

# Closes the watcher first and then the database connection. Either handle
# may be nil, in which case it is skipped.
#
# The database is closed from an +ensure+ clause so that an error raised
# while closing the watcher cannot leave the SQLite handle open; that error
# still propagates to the caller afterwards.
#
# @return [Object, nil] whatever closing the database returned, or nil when
#   there is no database handle
def close
  db_result = nil
  begin
    @watcher&.close
  ensure
    # Runs even when the watcher raised.
    db_result = @db&.close
  end
  db_result
end

#get_result(job_id) ⇒ Object

Fetch a stored result, or nil if absent or expired.



238
239
240
241
242
243
# File 'lib/honker.rb', line 238

# Looks up the stored result for +job_id+ through the honker_result_get SQL
# function.
#
# @param job_id [Object] identifier bound as the single query parameter
# @return [Object] first column of the row returned by the query
def get_result(job_id)
  row = @db.get_first_row("SELECT honker_result_get(?)", [job_id])
  row[0]
end

#mark_updated ⇒ Object



111
112
113
# File 'lib/honker.rb', line 111

# Advances the in-process update counter by one.
#
# @return [Integer] the counter value after the increment
def mark_updated
  @local_update_seq = @local_update_seq + 1
end

#notify(channel, payload) ⇒ Object

Fire a pg_notify-style pub/sub signal. Returns the notification id.



160
161
162
163
# File 'lib/honker.rb', line 160

# Publishes +payload+ on +channel+ through the SQL notify function.
#
# @param channel [String] channel name bound as the first parameter
# @param payload [Object] value serialised with JSON.dump before binding
# @return [Object] first column of the returned row (the notification id)
def notify(channel, payload)
  encoded = JSON.dump(payload)
  @db.get_first_row("SELECT notify(?, ?)", [channel, encoded])[0]
end

#notify_tx(tx, channel, payload) ⇒ Object

Fire a notification inside an open transaction. The signal lands only when the transaction commits.



167
168
169
170
171
172
173
# File 'lib/honker.rb', line 167

# Publishes +payload+ on +channel+ using the given transaction handle
# instead of the connection itself.
#
# @param tx [#query_row] transaction handle the query is issued through
# @param channel [String] channel name bound as the first parameter
# @param payload [Object] value serialised with JSON.dump before binding
# @return [Object] first column of the returned row
def notify_tx(tx, channel, payload)
  binds = [channel, JSON.dump(payload)]
  tx.query_row("SELECT notify(?, ?)", binds)[0]
end

#outbox(name, delivery, visibility_timeout_s: 60, max_attempts: 5, base_backoff_s: 5) ⇒ Object

Transactional side-effect delivery built on a reserved queue.



137
138
139
140
141
142
143
144
145
146
# File 'lib/honker.rb', line 137

# Builds an Outbox bound to this database.
#
# @param name [Object] passed through to Outbox.new
# @param delivery [Object] passed through to Outbox.new
# @param visibility_timeout_s [Integer] forwarded keyword, default 60
# @param max_attempts [Integer] forwarded keyword, default 5
# @param base_backoff_s [Integer] forwarded keyword, default 5
# @return [Outbox]
def outbox(name, delivery, visibility_timeout_s: 60, max_attempts: 5, base_backoff_s: 5)
  options = {
    visibility_timeout_s: visibility_timeout_s,
    max_attempts: max_attempts,
    base_backoff_s: base_backoff_s,
  }
  Outbox.new(self, name, delivery, **options)
end

#prune_notifications(older_than_s:) ⇒ Object

Delete notifications older than `older_than_s` seconds. Returns the number of rows deleted.



252
253
254
255
256
257
258
# File 'lib/honker.rb', line 252

# Deletes rows from _honker_notifications whose created_at is more than
# +older_than_s+ seconds before the current unix time.
#
# @param older_than_s [Numeric] age threshold bound into the DELETE
# @return [Integer] the connection's change count read right after the DELETE
def prune_notifications(older_than_s:)
  sql = "DELETE FROM _honker_notifications WHERE created_at < unixepoch() - ?"
  @db.execute(sql, [older_than_s])
  @db.changes
end

#queue(name, visibility_timeout_s: 300, max_attempts: 3) ⇒ Object

Returns a Queue handle for a named queue.

visibility_timeout_s: 300   # claim expiry before reclaim
max_attempts:         3     # retries before moving to dead


127
128
129
130
131
132
133
134
# File 'lib/honker.rb', line 127

# Builds a Queue handle for the named queue, bound to this database.
#
# @param name [Object] passed through to Queue.new
# @param visibility_timeout_s [Integer] forwarded keyword, default 300
# @param max_attempts [Integer] forwarded keyword, default 3
# @return [Queue]
def queue(name, visibility_timeout_s: 300, max_attempts: 3)
  options = {
    visibility_timeout_s: visibility_timeout_s,
    max_attempts: max_attempts,
  }
  Queue.new(self, name, **options)
end

#save_result(job_id, value, ttl_s:) ⇒ Object

Persist a job result for later retrieval via `get_result`. `value` is stored verbatim — JSON-encode it yourself if you want to round-trip structured data.



229
230
231
232
233
234
235
# File 'lib/honker.rb', line 229

# Stores +value+ under +job_id+ through the honker_result_save SQL function.
# The value is bound as given, with no encoding applied here.
#
# @param job_id [Object] identifier bound as the first parameter
# @param value [Object] value bound as the second parameter
# @param ttl_s [Numeric] time-to-live bound as the third parameter
# @return [nil] always nil; the query result is discarded
def save_result(job_id, value, ttl_s:)
  binds = [job_id, value, ttl_s]
  @db.get_first_row("SELECT honker_result_save(?, ?, ?)", binds)
  nil
end

#scheduler ⇒ Object

Returns the time-trigger Scheduler facade. Cheap — no allocation beyond the wrapper object.



155
156
157
# File 'lib/honker.rb', line 155

# Wraps this database in a new Scheduler. Nothing is cached: every call
# constructs a fresh wrapper.
#
# @return [Scheduler]
def scheduler
  Scheduler.new(self)
end

#stream(name) ⇒ Object

Returns a Stream handle for an append-only ordered log.



149
150
151
# File 'lib/honker.rb', line 149

# Builds a Stream handle for +name+, bound to this database.
#
# @param name [Object] passed through to Stream.new
# @return [Stream]
def stream(name)
  Stream.new(self, name)
end

#sweep_rate_limits(older_than_s:) ⇒ Object

Sweep old rate-limit window rows. Returns count deleted.



219
220
221
222
223
224
# File 'lib/honker.rb', line 219

# Runs the honker_rate_limit_sweep SQL function with the given age threshold.
#
# @param older_than_s [Numeric] threshold bound as the single parameter
# @return [Object] first column of the returned row (the swept-row count)
def sweep_rate_limits(older_than_s:)
  row = @db.get_first_row("SELECT honker_rate_limit_sweep(?)", [older_than_s])
  row[0]
end

#sweep_results ⇒ Object

Drop expired result rows. Returns count swept.



246
247
248
# File 'lib/honker.rb', line 246

# Runs the honker_result_sweep SQL function.
#
# @return [Object] first column of the returned row (the swept-row count)
def sweep_results
  row = @db.get_first_row("SELECT honker_result_sweep()")
  row[0]
end

#transaction ⇒ Object

Run a block inside a SQLite transaction. The block receives a Honker::Transaction; returning normally commits, raising rolls back, and `tx.rollback!` rolls back without surfacing an error.

db.transaction do |tx|
  tx.execute("INSERT INTO orders ...")
  q.enqueue_tx(tx, {order_id: 1})
end


183
184
185
186
187
188
189
190
191
192
193
194
195
# File 'lib/honker.rb', line 183

# Yields a Transaction wrapper from inside the connection's own
# +transaction+ block.
#
# @yieldparam tx [Transaction] wrapper built around the connection
# @return [Object, nil] whatever the connection's +transaction+ call returns,
#   or nil when the block aborted with Transaction::Rollback
def transaction
  handle = Transaction.new(@db)
  begin
    @db.transaction { yield handle }
  rescue Transaction::Rollback
    # Sentinel for a caller-requested abort. The exception already left the
    # connection's transaction block, which is what undoes the work, so the
    # only thing left to do here is keep it from reaching the caller.
    nil
  end
end

#try_lock(name, owner:, ttl_s:) ⇒ Object

Try to acquire an advisory lock. Returns a `Lock` handle on success, `nil` if another owner holds it.



199
200
201
202
203
204
205
206
207
# File 'lib/honker.rb', line 199

# Attempts to take the named lock through the honker_lock_acquire SQL
# function.
#
# @param name [Object] lock name bound as the first parameter
# @param owner [Object] owner identifier bound as the second parameter
# @param ttl_s [Numeric] time-to-live bound as the third parameter
# @return [Lock, nil] a Lock for (name, owner) when the function returned 1,
#   otherwise nil
def try_lock(name, owner:, ttl_s:)
  row = @db.get_first_row("SELECT honker_lock_acquire(?, ?, ?)", [name, owner, ttl_s])
  row[0] == 1 ? Lock.new(self, name, owner) : nil
end

#try_rate_limit(name, limit:, per:) ⇒ Object

Fixed-window rate limiter. Returns true if this call fits within `limit` requests per `per` seconds.



211
212
213
214
215
216
# File 'lib/honker.rb', line 211

# Asks the honker_rate_limit_try SQL function whether this call is allowed.
#
# @param name [Object] limiter name bound as the first parameter
# @param limit [Integer] bound as the second parameter
# @param per [Numeric] bound as the third parameter
# @return [Boolean] true only when the function returned 1
def try_rate_limit(name, limit:, per:)
  row = @db.get_first_row("SELECT honker_rate_limit_try(?, ?, ?)", [name, limit, per])
  row[0] == 1
end

#update_snapshot ⇒ Object



115
116
117
# File 'lib/honker.rb', line 115

# Reads the in-process update counter (the value that #mark_updated
# increments) without changing it.
#
# @return [Integer] current counter value
def update_snapshot
  @local_update_seq
end

#wait_for_update(timeout_s) ⇒ Object



119
120
121
# File 'lib/honker.rb', line 119

# Delegates to the watcher's +wait+ with the given timeout.
#
# @param timeout_s [Numeric] timeout handed to the watcher unchanged
#   (presumably seconds, going by the name; confirm against CoreWatcher)
# @return [Object] whatever the watcher's +wait+ returns
def wait_for_update(timeout_s)
  @watcher.wait(timeout_s)
end