Class: Honker::Database
- Inherits:
-
Object
- Object
- Honker::Database
- Defined in:
- lib/honker.rb
Overview
Database is a Honker handle over a SQLite file with the Honker extension loaded. The constructor bootstraps the schema; safe to open the same path from multiple processes.
Instance Attribute Summary collapse
-
#db ⇒ Object
readonly
Returns the value of attribute db.
Instance Method Summary collapse
- #close ⇒ Object
-
#get_result(job_id) ⇒ Object
Fetch a stored result, or nil if absent or expired.
-
#initialize(path, extension_path:) ⇒ Database
constructor
A new instance of Database.
- #mark_updated ⇒ Object
-
#notify(channel, payload) ⇒ Object
Fire a pg_notify-style pub/sub signal.
-
#notify_tx(tx, channel, payload) ⇒ Object
Fire a notification inside an open transaction.
-
#prune_notifications(older_than_s:) ⇒ Object
Delete notifications older than ‘older_than_s` seconds.
-
#queue(name, visibility_timeout_s: 300, max_attempts: 3) ⇒ Object
Returns a Queue handle for a named queue.
-
#save_result(job_id, value, ttl_s:) ⇒ Object
Persist a job result for later retrieval via ‘get_result`.
-
#scheduler ⇒ Object
Returns the time-trigger Scheduler facade.
-
#stream(name) ⇒ Object
Returns a Stream handle for an append-only ordered log.
-
#sweep_rate_limits(older_than_s:) ⇒ Object
Sweep old rate-limit window rows.
-
#sweep_results ⇒ Object
Drop expired result rows.
-
#transaction ⇒ Object
Run a block inside a SQLite transaction.
-
#try_lock(name, owner:, ttl_s:) ⇒ Object
Try to acquire an advisory lock.
-
#try_rate_limit(name, limit:, per:) ⇒ Object
Fixed-window rate limiter.
- #update_snapshot ⇒ Object
Constructor Details
#initialize(path, extension_path:) ⇒ Database
Returns a new instance of Database.
46 47 48 49 50 51 52 53 54 |
# File 'lib/honker.rb', line 46 def initialize(path, extension_path:) @db = SQLite3::Database.new(path) @local_update_seq = 0 @db.enable_load_extension(true) @db.load_extension(extension_path) @db.enable_load_extension(false) @db.execute_batch(DEFAULT_PRAGMAS) @db.execute("SELECT honker_bootstrap()") end |
Instance Attribute Details
#db ⇒ Object (readonly)
Returns the value of attribute db.
44 45 46 |
# File 'lib/honker.rb', line 44 def db @db end |
Instance Method Details
#close ⇒ Object
56 57 58 |
# File 'lib/honker.rb', line 56 def close @db&.close end |
#get_result(job_id) ⇒ Object
Fetch a stored result, or nil if absent or expired.
171 172 173 174 175 176 |
# File 'lib/honker.rb', line 171 def get_result(job_id) @db.get_first_row( "SELECT honker_result_get(?)", [job_id], )[0] end |
#mark_updated ⇒ Object
60 61 62 |
# File 'lib/honker.rb', line 60 def mark_updated @local_update_seq += 1 end |
#notify(channel, payload) ⇒ Object
Fire a pg_notify-style pub/sub signal. Returns the notification id.
93 94 95 96 |
# File 'lib/honker.rb', line 93 def notify(channel, payload) row = @db.get_first_row("SELECT notify(?, ?)", [channel, JSON.dump(payload)]) row[0] end |
#notify_tx(tx, channel, payload) ⇒ Object
Fire a notification inside an open transaction. The signal lands only when the transaction commits.
100 101 102 103 104 105 106 |
# File 'lib/honker.rb', line 100 def notify_tx(tx, channel, payload) row = tx.query_row( "SELECT notify(?, ?)", [channel, JSON.dump(payload)], ) row[0] end |
#prune_notifications(older_than_s:) ⇒ Object
Delete notifications older than ‘older_than_s` seconds. Returns the number of rows deleted.
185 186 187 188 189 190 191 |
# File 'lib/honker.rb', line 185 def prune_notifications(older_than_s:) @db.execute( "DELETE FROM _honker_notifications WHERE created_at < unixepoch() - ?", [older_than_s], ) @db.changes end |
#queue(name, visibility_timeout_s: 300, max_attempts: 3) ⇒ Object
Returns a Queue handle for a named queue.
visibility_timeout_s: 300 # claim expiry before reclaim
max_attempts: 3 # retries before moving to dead
72 73 74 75 76 77 78 79 |
# File 'lib/honker.rb', line 72 def queue(name, visibility_timeout_s: 300, max_attempts: 3) Queue.new( self, name, visibility_timeout_s: visibility_timeout_s, max_attempts: max_attempts, ) end |
#save_result(job_id, value, ttl_s:) ⇒ Object
Persist a job result for later retrieval via ‘get_result`. `value` is stored verbatim — JSON-encode it yourself if you want to round-trip structured data.
162 163 164 165 166 167 168 |
# File 'lib/honker.rb', line 162 def save_result(job_id, value, ttl_s:) @db.get_first_row( "SELECT honker_result_save(?, ?, ?)", [job_id, value, ttl_s], ) nil end |
#scheduler ⇒ Object
Returns the time-trigger Scheduler facade. Cheap — no allocation beyond the wrapper object.
88 89 90 |
# File 'lib/honker.rb', line 88 def scheduler Scheduler.new(self) end |
#stream(name) ⇒ Object
Returns a Stream handle for an append-only ordered log.
82 83 84 |
# File 'lib/honker.rb', line 82 def stream(name) Stream.new(self, name) end |
#sweep_rate_limits(older_than_s:) ⇒ Object
Sweep old rate-limit window rows. Returns count deleted.
152 153 154 155 156 157 |
# File 'lib/honker.rb', line 152 def sweep_rate_limits(older_than_s:) @db.get_first_row( "SELECT honker_rate_limit_sweep(?)", [older_than_s], )[0] end |
#sweep_results ⇒ Object
Drop expired result rows. Returns count swept.
179 180 181 |
# File 'lib/honker.rb', line 179 def sweep_results @db.get_first_row("SELECT honker_result_sweep()")[0] end |
#transaction ⇒ Object
Run a block inside a SQLite transaction. The block receives a Honker::Transaction; returning normally commits, raising rolls back, and ‘tx.rollback!` rolls back without surfacing an error.
db.transaction do |tx|
tx.execute("INSERT INTO orders ...")
q.enqueue_tx(tx, {order_id: 1})
end
116 117 118 119 120 121 122 123 124 125 126 127 128 |
# File 'lib/honker.rb', line 116 def transaction tx = Transaction.new(@db) begin @db.transaction do yield tx end rescue Transaction::Rollback # Caller used tx.rollback! to abort. The block exited with an # exception so the sqlite3 gem already rolled back — just # swallow the sentinel. nil end end |
#try_lock(name, owner:, ttl_s:) ⇒ Object
Try to acquire an advisory lock. Returns a ‘Lock` handle on success, `nil` if another owner holds it.
132 133 134 135 136 137 138 139 140 |
# File 'lib/honker.rb', line 132 def try_lock(name, owner:, ttl_s:) acquired = @db.get_first_row( "SELECT honker_lock_acquire(?, ?, ?)", [name, owner, ttl_s], )[0] return nil unless acquired == 1 Lock.new(self, name, owner) end |
#try_rate_limit(name, limit:, per:) ⇒ Object
Fixed-window rate limiter. Returns true if this call fits within ‘limit` requests per `per` seconds.
144 145 146 147 148 149 |
# File 'lib/honker.rb', line 144 def try_rate_limit(name, limit:, per:) @db.get_first_row( "SELECT honker_rate_limit_try(?, ?, ?)", [name, limit, per], )[0] == 1 end |
#update_snapshot ⇒ Object
64 65 66 |
# File 'lib/honker.rb', line 64 def update_snapshot @local_update_seq end |