Class: Honker::Database
- Inherits:
-
Object
- Object
- Honker::Database
- Defined in:
- lib/honker.rb
Overview
Database is a Honker handle over a SQLite file with the Honker extension loaded. The constructor bootstraps the schema; safe to open the same path from multiple processes.
Instance Attribute Summary collapse
-
#db ⇒ Object
readonly
Returns the value of attribute db.
Instance Method Summary collapse
- #close ⇒ Object
-
#get_result(job_id) ⇒ Object
Fetch a stored result, or nil if absent or expired.
-
#initialize(path, extension_path:, watcher_backend: nil) ⇒ Database
constructor
A new instance of Database.
- #mark_updated ⇒ Object
-
#notify(channel, payload) ⇒ Object
Fire a pg_notify-style pub/sub signal.
-
#notify_tx(tx, channel, payload) ⇒ Object
Fire a notification inside an open transaction.
-
#outbox(name, delivery, visibility_timeout_s: 60, max_attempts: 5, base_backoff_s: 5) ⇒ Object
Transactional side-effect delivery built on a reserved queue.
-
#prune_notifications(older_than_s:) ⇒ Object
Delete notifications older than ‘older_than_s` seconds.
-
#queue(name, visibility_timeout_s: 300, max_attempts: 3) ⇒ Object
Returns a Queue handle for a named queue.
-
#save_result(job_id, value, ttl_s:) ⇒ Object
Persist a job result for later retrieval via ‘get_result`.
-
#scheduler ⇒ Object
Returns the time-trigger Scheduler facade.
-
#stream(name) ⇒ Object
Returns a Stream handle for an append-only ordered log.
-
#sweep_rate_limits(older_than_s:) ⇒ Object
Sweep old rate-limit window rows.
-
#sweep_results ⇒ Object
Drop expired result rows.
-
#transaction ⇒ Object
Run a block inside a SQLite transaction.
-
#try_lock(name, owner:, ttl_s:) ⇒ Object
Try to acquire an advisory lock.
-
#try_rate_limit(name, limit:, per:) ⇒ Object
Fixed-window rate limiter.
- #update_snapshot ⇒ Object
- #wait_for_update(timeout_s) ⇒ Object
Constructor Details
#initialize(path, extension_path:, watcher_backend: nil) ⇒ Database
Returns a new instance of Database.
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/honker.rb', line 89 def initialize(path, extension_path:, watcher_backend: nil) unless watcher_backend.nil? || watcher_backend.is_a?(String) raise ArgumentError, "unknown watcher backend" end @db = SQLite3::Database.new(path) @local_update_seq = 0 @db.busy_timeout = 5000 @db.execute("PRAGMA mmap_size = 0") @db.enable_load_extension(true) @db.load_extension(extension_path) @db.enable_load_extension(false) @db.execute_batch(DEFAULT_PRAGMAS) @db.execute("SELECT honker_bootstrap()") @watcher = CoreWatcher.new(path, extension_path, watcher_backend) end |
Instance Attribute Details
#db ⇒ Object (readonly)
Returns the value of attribute db.
87 88 89 |
# File 'lib/honker.rb', line 87 def db @db end |
Instance Method Details
#close ⇒ Object
106 107 108 109 |
# File 'lib/honker.rb', line 106 def close @watcher&.close @db&.close end |
#get_result(job_id) ⇒ Object
Fetch a stored result, or nil if absent or expired.
238 239 240 241 242 243 |
# File 'lib/honker.rb', line 238 def get_result(job_id) @db.get_first_row( "SELECT honker_result_get(?)", [job_id], )[0] end |
#mark_updated ⇒ Object
111 112 113 |
# File 'lib/honker.rb', line 111 def mark_updated @local_update_seq += 1 end |
#notify(channel, payload) ⇒ Object
Fire a pg_notify-style pub/sub signal. Returns the notification id.
160 161 162 163 |
# File 'lib/honker.rb', line 160 def notify(channel, payload) row = @db.get_first_row("SELECT notify(?, ?)", [channel, JSON.dump(payload)]) row[0] end |
#notify_tx(tx, channel, payload) ⇒ Object
Fire a notification inside an open transaction. The signal lands only when the transaction commits.
167 168 169 170 171 172 173 |
# File 'lib/honker.rb', line 167 def notify_tx(tx, channel, payload) row = tx.query_row( "SELECT notify(?, ?)", [channel, JSON.dump(payload)], ) row[0] end |
#outbox(name, delivery, visibility_timeout_s: 60, max_attempts: 5, base_backoff_s: 5) ⇒ Object
Transactional side-effect delivery built on a reserved queue.
137 138 139 140 141 142 143 144 145 146 |
# File 'lib/honker.rb', line 137 def outbox(name, delivery, visibility_timeout_s: 60, max_attempts: 5, base_backoff_s: 5) Outbox.new( self, name, delivery, visibility_timeout_s: visibility_timeout_s, max_attempts: max_attempts, base_backoff_s: base_backoff_s, ) end |
#prune_notifications(older_than_s:) ⇒ Object
Delete notifications older than ‘older_than_s` seconds. Returns the number of rows deleted.
252 253 254 255 256 257 258 |
# File 'lib/honker.rb', line 252 def prune_notifications(older_than_s:) @db.execute( "DELETE FROM _honker_notifications WHERE created_at < unixepoch() - ?", [older_than_s], ) @db.changes end |
#queue(name, visibility_timeout_s: 300, max_attempts: 3) ⇒ Object
Returns a Queue handle for a named queue.
visibility_timeout_s: 300 # claim expiry before reclaim
max_attempts: 3 # retries before moving to dead
127 128 129 130 131 132 133 134 |
# File 'lib/honker.rb', line 127 def queue(name, visibility_timeout_s: 300, max_attempts: 3) Queue.new( self, name, visibility_timeout_s: visibility_timeout_s, max_attempts: max_attempts, ) end |
#save_result(job_id, value, ttl_s:) ⇒ Object
Persist a job result for later retrieval via ‘get_result`. `value` is stored verbatim — JSON-encode it yourself if you want to round-trip structured data.
229 230 231 232 233 234 235 |
# File 'lib/honker.rb', line 229 def save_result(job_id, value, ttl_s:) @db.get_first_row( "SELECT honker_result_save(?, ?, ?)", [job_id, value, ttl_s], ) nil end |
#scheduler ⇒ Object
Returns the time-trigger Scheduler facade. Cheap — no allocation beyond the wrapper object.
155 156 157 |
# File 'lib/honker.rb', line 155 def scheduler Scheduler.new(self) end |
#stream(name) ⇒ Object
Returns a Stream handle for an append-only ordered log.
149 150 151 |
# File 'lib/honker.rb', line 149 def stream(name) Stream.new(self, name) end |
#sweep_rate_limits(older_than_s:) ⇒ Object
Sweep old rate-limit window rows. Returns count deleted.
219 220 221 222 223 224 |
# File 'lib/honker.rb', line 219 def sweep_rate_limits(older_than_s:) @db.get_first_row( "SELECT honker_rate_limit_sweep(?)", [older_than_s], )[0] end |
#sweep_results ⇒ Object
Drop expired result rows. Returns count swept.
246 247 248 |
# File 'lib/honker.rb', line 246 def sweep_results @db.get_first_row("SELECT honker_result_sweep()")[0] end |
#transaction ⇒ Object
Run a block inside a SQLite transaction. The block receives a Honker::Transaction; returning normally commits, raising rolls back, and ‘tx.rollback!` rolls back without surfacing an error.
db.transaction do |tx|
tx.execute("INSERT INTO orders ...")
q.enqueue_tx(tx, {order_id: 1})
end
183 184 185 186 187 188 189 190 191 192 193 194 195 |
# File 'lib/honker.rb', line 183 def transaction tx = Transaction.new(@db) begin @db.transaction do yield tx end rescue Transaction::Rollback # Caller used tx.rollback! to abort. The block exited with an # exception so the sqlite3 gem already rolled back — just # swallow the sentinel. nil end end |
#try_lock(name, owner:, ttl_s:) ⇒ Object
Try to acquire an advisory lock. Returns a ‘Lock` handle on success, `nil` if another owner holds it.
199 200 201 202 203 204 205 206 207 |
# File 'lib/honker.rb', line 199 def try_lock(name, owner:, ttl_s:) acquired = @db.get_first_row( "SELECT honker_lock_acquire(?, ?, ?)", [name, owner, ttl_s], )[0] return nil unless acquired == 1 Lock.new(self, name, owner) end |
#try_rate_limit(name, limit:, per:) ⇒ Object
Fixed-window rate limiter. Returns true if this call fits within ‘limit` requests per `per` seconds.
211 212 213 214 215 216 |
# File 'lib/honker.rb', line 211 def try_rate_limit(name, limit:, per:) @db.get_first_row( "SELECT honker_rate_limit_try(?, ?, ?)", [name, limit, per], )[0] == 1 end |
#update_snapshot ⇒ Object
115 116 117 |
# File 'lib/honker.rb', line 115 def update_snapshot @local_update_seq end |
#wait_for_update(timeout_s) ⇒ Object
119 120 121 |
# File 'lib/honker.rb', line 119 def wait_for_update(timeout_s) @watcher.wait(timeout_s) end |