Class: Honker::Database
- Inherits:
-
Object
- Object
- Honker::Database
- Defined in:
- lib/honker.rb
Overview
Database is a Honker handle over a SQLite file with the Honker extension loaded. The constructor bootstraps the schema; safe to open the same path from multiple processes.
Instance Attribute Summary collapse
-
#db ⇒ Object
readonly
Returns the value of attribute db.
Instance Method Summary collapse
- #close ⇒ Object
-
#get_result(job_id) ⇒ Object
Fetch a stored result, or nil if absent or expired.
-
#initialize(path, extension_path:) ⇒ Database
constructor
A new instance of Database.
-
#notify(channel, payload) ⇒ Object
Fire a pg_notify-style pub/sub signal.
-
#notify_tx(tx, channel, payload) ⇒ Object
Fire a notification inside an open transaction.
-
#prune_notifications(older_than_s:) ⇒ Object
Delete notifications older than ‘older_than_s` seconds.
-
#queue(name, visibility_timeout_s: 300, max_attempts: 3) ⇒ Object
Returns a Queue handle for a named queue.
-
#save_result(job_id, value, ttl_s:) ⇒ Object
Persist a job result for later retrieval via ‘get_result`.
-
#scheduler ⇒ Object
Returns the cron-style Scheduler facade.
-
#stream(name) ⇒ Object
Returns a Stream handle for an append-only ordered log.
-
#sweep_rate_limits(older_than_s:) ⇒ Object
Sweep old rate-limit window rows.
-
#sweep_results ⇒ Object
Drop expired result rows.
-
#transaction ⇒ Object
Run a block inside a SQLite transaction.
-
#try_lock(name, owner:, ttl_s:) ⇒ Object
Try to acquire an advisory lock.
-
#try_rate_limit(name, limit:, per:) ⇒ Object
Fixed-window rate limiter.
Constructor Details
#initialize(path, extension_path:) ⇒ Database
Returns a new instance of Database.
46 47 48 49 50 51 52 53 |
# File 'lib/honker.rb', line 46 def initialize(path, extension_path:) @db = SQLite3::Database.new(path) @db.enable_load_extension(true) @db.load_extension(extension_path) @db.enable_load_extension(false) @db.execute_batch(DEFAULT_PRAGMAS) @db.execute("SELECT honker_bootstrap()") end |
Instance Attribute Details
#db ⇒ Object (readonly)
Returns the value of attribute db.
44 45 46 |
# File 'lib/honker.rb', line 44 def db @db end |
Instance Method Details
#close ⇒ Object
55 56 57 |
# File 'lib/honker.rb', line 55 def close @db&.close end |
#get_result(job_id) ⇒ Object
Fetch a stored result, or nil if absent or expired.
162 163 164 165 166 167 |
# File 'lib/honker.rb', line 162 def get_result(job_id) @db.get_first_row( "SELECT honker_result_get(?)", [job_id], )[0] end |
#notify(channel, payload) ⇒ Object
Fire a pg_notify-style pub/sub signal. Returns the notification id.
84 85 86 87 |
# File 'lib/honker.rb', line 84 def notify(channel, payload) row = @db.get_first_row("SELECT notify(?, ?)", [channel, JSON.dump(payload)]) row[0] end |
#notify_tx(tx, channel, payload) ⇒ Object
Fire a notification inside an open transaction. The signal lands only when the transaction commits.
91 92 93 94 95 96 97 |
# File 'lib/honker.rb', line 91 def notify_tx(tx, channel, payload) row = tx.query_row( "SELECT notify(?, ?)", [channel, JSON.dump(payload)], ) row[0] end |
#prune_notifications(older_than_s:) ⇒ Object
Delete notifications older than ‘older_than_s` seconds. Returns the number of rows deleted.
176 177 178 179 180 181 182 |
# File 'lib/honker.rb', line 176 def prune_notifications(older_than_s:) @db.execute( "DELETE FROM _honker_notifications WHERE created_at < unixepoch() - ?", [older_than_s], ) @db.changes end |
#queue(name, visibility_timeout_s: 300, max_attempts: 3) ⇒ Object
Returns a Queue handle for a named queue.
visibility_timeout_s: 300 # claim expiry before reclaim
max_attempts: 3 # retries before moving to dead
63 64 65 66 67 68 69 70 |
# File 'lib/honker.rb', line 63 def queue(name, visibility_timeout_s: 300, max_attempts: 3) Queue.new( self, name, visibility_timeout_s: visibility_timeout_s, max_attempts: max_attempts, ) end |
#save_result(job_id, value, ttl_s:) ⇒ Object
Persist a job result for later retrieval via ‘get_result`. `value` is stored verbatim — JSON-encode it yourself if you want to round-trip structured data.
153 154 155 156 157 158 159 |
# File 'lib/honker.rb', line 153 def save_result(job_id, value, ttl_s:) @db.get_first_row( "SELECT honker_result_save(?, ?, ?)", [job_id, value, ttl_s], ) nil end |
#scheduler ⇒ Object
Returns the cron-style Scheduler facade. Cheap — no allocation beyond the wrapper object.
79 80 81 |
# File 'lib/honker.rb', line 79 def scheduler Scheduler.new(self) end |
#stream(name) ⇒ Object
Returns a Stream handle for an append-only ordered log.
73 74 75 |
# File 'lib/honker.rb', line 73 def stream(name) Stream.new(self, name) end |
#sweep_rate_limits(older_than_s:) ⇒ Object
Sweep old rate-limit window rows. Returns count deleted.
143 144 145 146 147 148 |
# File 'lib/honker.rb', line 143 def sweep_rate_limits(older_than_s:) @db.get_first_row( "SELECT honker_rate_limit_sweep(?)", [older_than_s], )[0] end |
#sweep_results ⇒ Object
Drop expired result rows. Returns count swept.
170 171 172 |
# File 'lib/honker.rb', line 170 def sweep_results @db.get_first_row("SELECT honker_result_sweep()")[0] end |
#transaction ⇒ Object
Run a block inside a SQLite transaction. The block receives a Honker::Transaction; returning normally commits, raising rolls back, and ‘tx.rollback!` rolls back without surfacing an error.
db.transaction do |tx|
tx.execute("INSERT INTO orders ...")
q.enqueue_tx(tx, {order_id: 1})
end
107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/honker.rb', line 107 def transaction tx = Transaction.new(@db) begin @db.transaction do yield tx end rescue Transaction::Rollback # Caller used tx.rollback! to abort. The block exited with an # exception so the sqlite3 gem already rolled back — just # swallow the sentinel. nil end end |
#try_lock(name, owner:, ttl_s:) ⇒ Object
Try to acquire an advisory lock. Returns a ‘Lock` handle on success, `nil` if another owner holds it.
123 124 125 126 127 128 129 130 131 |
# File 'lib/honker.rb', line 123 def try_lock(name, owner:, ttl_s:) acquired = @db.get_first_row( "SELECT honker_lock_acquire(?, ?, ?)", [name, owner, ttl_s], )[0] return nil unless acquired == 1 Lock.new(self, name, owner) end |
#try_rate_limit(name, limit:, per:) ⇒ Object
Fixed-window rate limiter. Returns true if this call fits within ‘limit` requests per `per` seconds.
135 136 137 138 139 140 |
# File 'lib/honker.rb', line 135 def try_rate_limit(name, limit:, per:) @db.get_first_row( "SELECT honker_rate_limit_try(?, ?, ?)", [name, limit, per], )[0] == 1 end |