Class: Honker::Database

Inherits:
Object
  • Object
show all
Defined in:
lib/honker.rb

Overview

Database is a Honker handle over a SQLite file with the Honker extension loaded. The constructor bootstraps the schema; safe to open the same path from multiple processes.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(path, extension_path:) ⇒ Database

Returns a new instance of Database.



46
47
48
49
50
51
52
53
# File 'lib/honker.rb', line 46

def initialize(path, extension_path:)
  @db = SQLite3::Database.new(path)
  @db.enable_load_extension(true)
  @db.load_extension(extension_path)
  @db.enable_load_extension(false)
  @db.execute_batch(DEFAULT_PRAGMAS)
  @db.execute("SELECT honker_bootstrap()")
end

Instance Attribute Details

#dbObject (readonly)

Returns the value of attribute db.



44
45
46
# File 'lib/honker.rb', line 44

def db
  @db
end

Instance Method Details

#closeObject



55
56
57
# File 'lib/honker.rb', line 55

def close
  @db&.close
end

#get_result(job_id) ⇒ Object

Fetch a stored result, or nil if absent or expired.



162
163
164
165
166
167
# File 'lib/honker.rb', line 162

def get_result(job_id)
  @db.get_first_row(
    "SELECT honker_result_get(?)",
    [job_id],
  )[0]
end

#notify(channel, payload) ⇒ Object

Fire a pg_notify-style pub/sub signal. Returns the notification id.



84
85
86
87
# File 'lib/honker.rb', line 84

def notify(channel, payload)
  row = @db.get_first_row("SELECT notify(?, ?)", [channel, JSON.dump(payload)])
  row[0]
end

#notify_tx(tx, channel, payload) ⇒ Object

Fire a notification inside an open transaction. The signal lands only when the transaction commits.



91
92
93
94
95
96
97
# File 'lib/honker.rb', line 91

def notify_tx(tx, channel, payload)
  row = tx.query_row(
    "SELECT notify(?, ?)",
    [channel, JSON.dump(payload)],
  )
  row[0]
end

#prune_notifications(older_than_s:) ⇒ Object

Delete notifications older than ‘older_than_s` seconds. Returns the number of rows deleted.



176
177
178
179
180
181
182
# File 'lib/honker.rb', line 176

def prune_notifications(older_than_s:)
  @db.execute(
    "DELETE FROM _honker_notifications WHERE created_at < unixepoch() - ?",
    [older_than_s],
  )
  @db.changes
end

#queue(name, visibility_timeout_s: 300, max_attempts: 3) ⇒ Object

Returns a Queue handle for a named queue.

visibility_timeout_s: 300   # claim expiry before reclaim
max_attempts:         3     # retries before moving to dead


63
64
65
66
67
68
69
70
# File 'lib/honker.rb', line 63

def queue(name, visibility_timeout_s: 300, max_attempts: 3)
  Queue.new(
    self,
    name,
    visibility_timeout_s: visibility_timeout_s,
    max_attempts: max_attempts,
  )
end

#save_result(job_id, value, ttl_s:) ⇒ Object

Persist a job result for later retrieval via ‘get_result`. `value` is stored verbatim — JSON-encode it yourself if you want to round-trip structured data.



153
154
155
156
157
158
159
# File 'lib/honker.rb', line 153

def save_result(job_id, value, ttl_s:)
  @db.get_first_row(
    "SELECT honker_result_save(?, ?, ?)",
    [job_id, value, ttl_s],
  )
  nil
end

#schedulerObject

Returns the cron-style Scheduler facade. Cheap — no allocation beyond the wrapper object.



79
80
81
# File 'lib/honker.rb', line 79

def scheduler
  Scheduler.new(self)
end

#stream(name) ⇒ Object

Returns a Stream handle for an append-only ordered log.



73
74
75
# File 'lib/honker.rb', line 73

def stream(name)
  Stream.new(self, name)
end

#sweep_rate_limits(older_than_s:) ⇒ Object

Sweep old rate-limit window rows. Returns count deleted.



143
144
145
146
147
148
# File 'lib/honker.rb', line 143

def sweep_rate_limits(older_than_s:)
  @db.get_first_row(
    "SELECT honker_rate_limit_sweep(?)",
    [older_than_s],
  )[0]
end

#sweep_resultsObject

Drop expired result rows. Returns count swept.



170
171
172
# File 'lib/honker.rb', line 170

def sweep_results
  @db.get_first_row("SELECT honker_result_sweep()")[0]
end

#transactionObject

Run a block inside a SQLite transaction. The block receives a Honker::Transaction; returning normally commits, raising rolls back, and ‘tx.rollback!` rolls back without surfacing an error.

db.transaction do |tx|
  tx.execute("INSERT INTO orders ...")
  q.enqueue_tx(tx, {order_id: 1})
end


107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/honker.rb', line 107

def transaction
  tx = Transaction.new(@db)
  begin
    @db.transaction do
      yield tx
    end
  rescue Transaction::Rollback
    # Caller used tx.rollback! to abort. The block exited with an
    # exception so the sqlite3 gem already rolled back — just
    # swallow the sentinel.
    nil
  end
end

#try_lock(name, owner:, ttl_s:) ⇒ Object

Try to acquire an advisory lock. Returns a ‘Lock` handle on success, `nil` if another owner holds it.



123
124
125
126
127
128
129
130
131
# File 'lib/honker.rb', line 123

def try_lock(name, owner:, ttl_s:)
  acquired = @db.get_first_row(
    "SELECT honker_lock_acquire(?, ?, ?)",
    [name, owner, ttl_s],
  )[0]
  return nil unless acquired == 1

  Lock.new(self, name, owner)
end

#try_rate_limit(name, limit:, per:) ⇒ Object

Fixed-window rate limiter. Returns true if this call fits within ‘limit` requests per `per` seconds.



135
136
137
138
139
140
# File 'lib/honker.rb', line 135

def try_rate_limit(name, limit:, per:)
  @db.get_first_row(
    "SELECT honker_rate_limit_try(?, ?, ?)",
    [name, limit, per],
  )[0] == 1
end