Class: BSV::Wallet::PostgresStore
- Inherits: Object
  - Object
  - BSV::Wallet::PostgresStore
- Includes: StorageAdapter
- Defined in: lib/bsv/wallet_postgres/postgres_store.rb
Overview
PostgreSQL-backed storage adapter for BSV::Wallet.
Implements the full StorageAdapter interface against a Sequel Database object. Survives process restarts, scales to multiple instances, and is thread-safe via Sequel’s connection pool.
Design notes
- JSONB is the source of truth. Every row stores the full record hash in a data jsonb column; dedicated indexed columns (basket, tags, labels, certifier, …) exist only to make queries fast. Reads return the jsonb blob, so adding fields to bsv-wallet’s record hashes does not require a schema change.
- Outputs upsert on outpoint (unique); certificates upsert on the composite unique (type, serial_number, certifier). Proofs and transactions upsert on their txid primary key. Actions are append-only because the interface has no natural key for actions.
- Pagination is ordered by insertion (id ASC) to match MemoryStore.
- This class is thread-safe because Sequel is: the adapter itself holds no mutable state beyond the injected database handle.
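The first design note can be illustrated without a database. The following is a minimal sketch, not the adapter's code: `build_output_row`, `read_output_row`, and the `INDEXED_COLUMNS` list are hypothetical names chosen for this example, and the sample record is made up. It shows why a field with no dedicated column still survives a round trip.

```ruby
require 'json'

# Hypothetical column list for illustration; the real schema is defined
# by the shipped migrations and may differ.
INDEXED_COLUMNS = %i[outpoint basket tags].freeze

# Hypothetical helper: copy selected fields into indexed columns while
# :data keeps the complete record as JSON text.
def build_output_row(record)
  row = INDEXED_COLUMNS.to_h { |col| [col, record[col]] }
  row[:data] = JSON.generate(record)
  row
end

# Reads decode the blob, so fields without a dedicated column survive.
def read_output_row(row)
  JSON.parse(row[:data], symbolize_names: true)
end

record = { outpoint: 'tx1.0', basket: 'default', tags: ['change'], memo: 'new field' }
row = build_output_row(record)
restored = read_output_row(row)
```

Here `memo` has no column of its own, yet `restored` still contains it because reads come from the blob.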
Constant Summary
- MIGRATIONS_DIR = File.expand_path('migrations', __dir__)
Instance Attribute Summary
- #db ⇒ Sequel::Database (readonly)
  The underlying database handle.
Class Method Summary
- .migrate!(db) ⇒ void
  Run the shipped wallet schema migrations against db.
Instance Method Summary
- #count_actions(query) ⇒ Object
- #count_certificates(query) ⇒ Object
- #count_outputs(query) ⇒ Object
- #delete_action(txid) ⇒ Object
- #delete_certificate(type:, serial_number:, certifier:) ⇒ Object
- #delete_output(outpoint) ⇒ Object
- #find_actions(query) ⇒ Object
- #find_certificates(query) ⇒ Object
- #find_outputs(query) ⇒ Object
- #find_proof(txid) ⇒ Object
- #find_setting(key) ⇒ Object
- #find_spendable_outputs(basket: nil, min_satoshis: nil, sort_order: :desc) ⇒ Array<Hash>
  Returns outputs whose effective state is :spendable.
- #find_transaction(txid) ⇒ Object
- #initialize(db) ⇒ PostgresStore (constructor)
  A new instance of PostgresStore.
- #lock_utxos(outpoints, reference:, no_send: false) ⇒ Array<String>
  Atomically marks a set of outpoints as :pending.
- #release_stale_pending!(timeout: 300) ⇒ Integer
  Releases stale pending locks back to :spendable.
- #store_action(action_data) ⇒ Object
  --- Actions ---
- #store_certificate(cert_data) ⇒ Object
  --- Certificates ---
- #store_output(output_data) ⇒ Object
  --- Outputs ---
- #store_proof(txid, bump_hex) ⇒ Object
  --- Proofs ---
- #store_setting(key, value) ⇒ Object
  --- Settings ---
- #store_transaction(txid, tx_hex) ⇒ Object
  --- Transactions ---
- #update_action_status(txid, new_status) ⇒ Object
- #update_output_state(outpoint, new_state, pending_reference: nil, no_send: nil) ⇒ Hash
  Transitions the state of an existing output.
Constructor Details
#initialize(db) ⇒ PostgresStore
Returns a new instance of PostgresStore.
    # File 'lib/bsv/wallet_postgres/postgres_store.rb', line 79
    def initialize(db)
      @db = db
      @db.extension :pg_array
      @db.extension :pg_json
    end
Instance Attribute Details
#db ⇒ Sequel::Database (readonly)
Returns the underlying database handle.
    # File 'lib/bsv/wallet_postgres/postgres_store.rb', line 86
    def db
      @db
    end
Class Method Details
.migrate!(db) ⇒ void
This method returns an undefined value.
Run the shipped wallet schema migrations against db.
Uses Sequel’s migrator so every schema change ships as a numbered migration file and the database tracks which ones have been applied. Safe to call repeatedly.
Consumers who prefer their own migration framework can copy the migration file(s) out of lib/bsv/wallet_postgres/migrations/ instead of calling this helper.
    # File 'lib/bsv/wallet_postgres/postgres_store.rb', line 63
    def self.migrate!(db)
      Sequel::Migrator.run(db, MIGRATIONS_DIR)
    end
Instance Method Details
#count_actions(query) ⇒ Object
    # File 'lib/bsv/wallet_postgres/postgres_store.rb', line 101
    def count_actions(query)
      filter_actions(@db[:wallet_actions], query).count
    end
#count_certificates(query) ⇒ Object
    # File 'lib/bsv/wallet_postgres/postgres_store.rb', line 325
    def count_certificates(query)
      filter_certificates(@db[:wallet_certificates], query).count
    end
#count_outputs(query) ⇒ Object
    # File 'lib/bsv/wallet_postgres/postgres_store.rb', line 151
    def count_outputs(query)
      filter_outputs(@db[:wallet_outputs], query).count
    end
#delete_action(txid) ⇒ Object
    # File 'lib/bsv/wallet_postgres/postgres_store.rb', line 122
    def delete_action(txid)
      @db[:wallet_actions].where(txid: txid).delete.positive?
    end
#delete_certificate(type:, serial_number:, certifier:) ⇒ Object
    # File 'lib/bsv/wallet_postgres/postgres_store.rb', line 329
    def delete_certificate(type:, serial_number:, certifier:)
      @db[:wallet_certificates]
        .where(type: type, serial_number: serial_number, certifier: certifier)
        .delete
        .positive?
    end
#delete_output(outpoint) ⇒ Object
    # File 'lib/bsv/wallet_postgres/postgres_store.rb', line 155
    def delete_output(outpoint)
      @db[:wallet_outputs].where(outpoint: outpoint).delete.positive?
    end
#find_actions(query) ⇒ Object
    # File 'lib/bsv/wallet_postgres/postgres_store.rb', line 96
    def find_actions(query)
      ds = filter_actions(@db[:wallet_actions], query)
      paginate(ds, query).map { |r| symbolise_keys(r[:data]) }
    end
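The finder methods rely on two private helpers, paginate and symbolise_keys, whose bodies are not shown on this page. The sketch below is one plausible reading of what they do, under stated assumptions: the `_sketch` method names are hypothetical, the `:offset` and `:limit` query keys are assumed, and whether the real helper symbolises nested keys is not confirmed by the source.

```ruby
# Hypothetical stand-in for symbolise_keys: convert string keys (as they
# come back from a decoded JSON blob) to symbols, recursing into nested
# hashes and arrays.
def symbolise_keys_sketch(value)
  case value
  when Hash  then value.to_h { |k, v| [k.to_sym, symbolise_keys_sketch(v)] }
  when Array then value.map { |v| symbolise_keys_sketch(v) }
  else value
  end
end

# Hypothetical stand-in for paginate: insertion order (id ascending),
# then offset and limit. The query key names are assumptions.
def paginate_sketch(rows, query)
  ordered = rows.sort_by { |r| r[:id] }
  ordered.drop(query.fetch(:offset, 0)).first(query.fetch(:limit, ordered.size))
end

rows = [
  { id: 3, data: { 'txid' => 'cc', 'labels' => ['x'] } },
  { id: 1, data: { 'txid' => 'aa', 'labels' => [] } },
  { id: 2, data: { 'txid' => 'bb', 'inputs' => [{ 'vout' => 0 }] } }
]
page = paginate_sketch(rows, { offset: 1, limit: 2 })
       .map { |r| symbolise_keys_sketch(r[:data]) }
```

With an offset of 1 and a limit of 2, the page holds the records with ids 2 and 3, in that order.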
#find_certificates(query) ⇒ Object
    # File 'lib/bsv/wallet_postgres/postgres_store.rb', line 320
    def find_certificates(query)
      ds = filter_certificates(@db[:wallet_certificates], query)
      paginate(ds, query).map { |r| symbolise_keys(r[:data]) }
    end
#find_outputs(query) ⇒ Object
    # File 'lib/bsv/wallet_postgres/postgres_store.rb', line 146
    def find_outputs(query)
      ds = filter_outputs(@db[:wallet_outputs], query)
      paginate(ds, query).map { |r| symbolise_keys(r[:data]) }
    end
#find_proof(txid) ⇒ Object
    # File 'lib/bsv/wallet_postgres/postgres_store.rb', line 344
    def find_proof(txid)
      @db[:wallet_proofs].where(txid: txid).get(:bump_hex)
    end
#find_setting(key) ⇒ Object
    # File 'lib/bsv/wallet_postgres/postgres_store.rb', line 368
    def find_setting(key)
      @db[:wallet_settings].where(key: key).get(:value)
    end
#find_spendable_outputs(basket: nil, min_satoshis: nil, sort_order: :desc) ⇒ Array<Hash>
Returns outputs whose effective state is :spendable.
Legacy rows with state = NULL are treated as spendable when the spendable boolean is true (or absent), matching MemoryStore’s effective_state logic.
    # File 'lib/bsv/wallet_postgres/postgres_store.rb', line 169
    def find_spendable_outputs(basket: nil, min_satoshis: nil, sort_order: :desc)
      ds = @db[:wallet_outputs]
           .where(Sequel.lit('(state = ? OR (state IS NULL AND spendable = TRUE))', 'spendable'))
      ds = ds.where(basket: basket) if basket
      if min_satoshis
        ds = ds.where(
          Sequel.lit('COALESCE(satoshis, (data->>?)::bigint, 0) >= ?', 'satoshis', min_satoshis)
        )
      end
      satoshis_expr = Sequel.lit('COALESCE(satoshis, (data->>?)::bigint, 0)', 'satoshis')
      ds = ds.order(sort_order == :asc ? Sequel.asc(satoshis_expr) : Sequel.desc(satoshis_expr))
      ds.all.map { |r| symbolise_keys(r[:data]) }
    end
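The WHERE and ORDER BY logic above can be read as plain Ruby over row hashes. This is an illustrative model only, not MemoryStore's effective_state implementation; the helper names and sample rows are made up. It follows the SQL literally, so a row whose spendable column is NULL is not selected. Whether such rows can exist depends on the column default in the schema, which is not shown here.

```ruby
# state = 'spendable' OR (state IS NULL AND spendable = TRUE)
def effectively_spendable?(row)
  row[:state] == 'spendable' || (row[:state].nil? && row[:spendable] == true)
end

# COALESCE(satoshis, (data->>'satoshis')::bigint, 0)
def effective_satoshis(row)
  row[:satoshis] || row.dig(:data, 'satoshis')&.to_i || 0
end

def spendable_rows(rows, min_satoshis: nil, sort_order: :desc)
  picked = rows.select { |r| effectively_spendable?(r) }
  picked = picked.select { |r| effective_satoshis(r) >= min_satoshis } if min_satoshis
  sorted = picked.sort_by { |r| effective_satoshis(r) }
  sort_order == :asc ? sorted : sorted.reverse
end

rows = [
  { outpoint: 'a.0', state: 'spendable', spendable: true,  satoshis: 500,  data: {} },
  { outpoint: 'b.0', state: nil,         spendable: true,  satoshis: nil,  data: { 'satoshis' => '900' } },
  { outpoint: 'c.0', state: 'pending',   spendable: false, satoshis: 2000, data: {} },
  { outpoint: 'd.0', state: nil,         spendable: nil,   satoshis: 100,  data: {} }
]
result = spendable_rows(rows, min_satoshis: 200).map { |r| r[:outpoint] }
```

The legacy row `b.0` qualifies through the NULL-state branch and takes its amount from the blob, while `c.0` is excluded for being pending and `d.0` for having a NULL spendable flag.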
#find_transaction(txid) ⇒ Object
    # File 'lib/bsv/wallet_postgres/postgres_store.rb', line 356
    def find_transaction(txid)
      @db[:wallet_transactions].where(txid: txid).get(:tx_hex)
    end
#lock_utxos(outpoints, reference:, no_send: false) ⇒ Array<String>
Atomically marks a set of outpoints as :pending.
Uses UPDATE … WHERE state = 'spendable' … RETURNING outpoint so that the check-and-set is atomic at the database level. A concurrent caller that wins the race will have already changed the state to 'pending', so the second caller’s WHERE clause will not match and will return nothing. No explicit row-level locking is needed; the UPDATE itself takes the lock.
Legacy rows with state = NULL AND spendable = TRUE are also eligible.
    # File 'lib/bsv/wallet_postgres/postgres_store.rb', line 253
    def lock_utxos(outpoints, reference:, no_send: false)
      return [] if outpoints.empty?

      rows = @db[:wallet_outputs]
             .where(outpoint: outpoints)
             .where(Sequel.lit('(state = ? OR (state IS NULL AND spendable = TRUE))', 'spendable'))
             .returning(:outpoint)
             .update(
               state: 'pending',
               spendable: false,
               pending_since: Sequel.lit('NOW()'),
               pending_reference: reference,
               no_send: no_send ? true : false,
               data: Sequel.lit(
                 "data || jsonb_build_object('state', 'pending', 'pending_since', NOW()::text, " \
                 "'pending_reference', ?, 'no_send', ?)",
                 reference, no_send ? true : false
               )
             )

      rows.map { |r| r[:outpoint] }
    end
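The check-and-set behaviour described above can be modelled in memory. In this sketch a Mutex stands in for the row lock that PostgreSQL takes during the UPDATE; `TinyOutputTable` and the caller references are hypothetical and exist only for illustration. The point it demonstrates is that each outpoint is handed to at most one caller, however many race for it.

```ruby
# In-memory stand-in for the wallet_outputs table (hypothetical class).
class TinyOutputTable
  def initialize(outpoints)
    @rows = outpoints.to_h { |op| [op, { state: 'spendable' }] }
    @mutex = Mutex.new
  end

  # Check and set inside one critical section: only rows that are still
  # spendable are flipped to pending, and only those are returned.
  def lock_utxos(outpoints, reference:)
    @mutex.synchronize do
      outpoints.select do |op|
        row = @rows[op]
        next false unless row && row[:state] == 'spendable'

        row[:state] = 'pending'
        row[:pending_reference] = reference
        true
      end
    end
  end
end

table = TinyOutputTable.new(%w[a.0 b.0 c.0])
threads = Array.new(4) do |i|
  Thread.new { table.lock_utxos(%w[a.0 b.0 c.0], reference: "caller-#{i}") }
end
results = threads.map(&:value)
locked = results.flatten
```

A caller should treat the returned array, not the requested list, as the set of outpoints it actually holds.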
#release_stale_pending!(timeout: 300) ⇒ Integer
Releases stale pending locks back to :spendable.
Any output in :pending state whose pending_since is older than timeout seconds is reset to spendable and its pending metadata is cleared. Outputs with no_send = true are exempt and remain pending. Outputs with pending_since = NULL are also skipped — they are treated as freshly locked (NULL means “just acquired but no timestamp yet”).
    # File 'lib/bsv/wallet_postgres/postgres_store.rb', line 286
    def release_stale_pending!(timeout: 300)
      rows = @db[:wallet_outputs]
             .where(state: 'pending')
             .where(Sequel.lit('no_send IS NOT TRUE'))
             .where(Sequel.lit('pending_since IS NOT NULL'))
             .where(Sequel.lit('pending_since < (NOW() - INTERVAL ?)', "#{timeout} seconds"))
             .returning(:outpoint)
             .update(
               state: 'spendable',
               spendable: true,
               pending_since: nil,
               pending_reference: nil,
               no_send: false,
               data: Sequel.lit(
                 "(data - 'pending_since' - 'pending_reference' - 'no_send') || jsonb_build_object('state', 'spendable')"
               )
             )

      rows.length
    end
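The four WHERE conditions above decide which rows count as stale. A plain-Ruby rendering of that predicate is sketched below; `stale_pending?` is a hypothetical name, the rows are sample data, and `now` is passed in explicitly (rather than read from the clock) only to keep the example deterministic.

```ruby
# Mirrors: state = 'pending' AND no_send IS NOT TRUE
#          AND pending_since IS NOT NULL
#          AND pending_since < NOW() - INTERVAL '<timeout> seconds'
def stale_pending?(row, now:, timeout: 300)
  row[:state] == 'pending' &&
    row[:no_send] != true &&       # NULL and false both pass
    !row[:pending_since].nil? &&   # rows without a timestamp are skipped
    row[:pending_since] < now - timeout
end

now = Time.utc(2024, 1, 1, 12, 0, 0)
rows = [
  { outpoint: 'a.0', state: 'pending',   pending_since: now - 600, no_send: false },
  { outpoint: 'b.0', state: 'pending',   pending_since: now - 600, no_send: true },
  { outpoint: 'c.0', state: 'pending',   pending_since: nil,       no_send: false },
  { outpoint: 'd.0', state: 'pending',   pending_since: now - 10,  no_send: nil },
  { outpoint: 'e.0', state: 'spendable', pending_since: nil,       no_send: false }
]
released = rows.select { |r| stale_pending?(r, now: now) }.map { |r| r[:outpoint] }
```

Only `a.0` is released: `b.0` is exempt through no_send, `c.0` has no timestamp, `d.0` is still within the timeout, and `e.0` is not pending.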
#store_action(action_data) ⇒ Object
--- Actions ---
    # File 'lib/bsv/wallet_postgres/postgres_store.rb', line 90
    def store_action(action_data)
      row = action_row(action_data)
      @db[:wallet_actions].insert(row)
      action_data
    end
#store_certificate(cert_data) ⇒ Object
--- Certificates ---
    # File 'lib/bsv/wallet_postgres/postgres_store.rb', line 309
    def store_certificate(cert_data)
      row = certificate_row(cert_data)
      @db[:wallet_certificates]
        .insert_conflict(
          target: %i[type serial_number certifier],
          update: { subject: row[:subject], data: row[:data] }
        )
        .insert(row)
      cert_data
    end
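The upsert above corresponds to INSERT … ON CONFLICT (type, serial_number, certifier) DO UPDATE. Its effect can be sketched with an in-memory table keyed by the same composite; `TinyCertificateTable` and the sample values are hypothetical and are not part of the adapter.

```ruby
# Hypothetical in-memory table keyed by (type, serial_number, certifier).
class TinyCertificateTable
  def initialize
    @rows = {}
  end

  # A new key inserts the row; an existing key has only :subject and
  # :data replaced, matching the update: hash passed to insert_conflict.
  def upsert(row)
    key = row.values_at(:type, :serial_number, :certifier)
    @rows[key] =
      if @rows.key?(key)
        @rows[key].merge(subject: row[:subject], data: row[:data])
      else
        row.dup
      end
  end

  def count
    @rows.size
  end

  def find(type:, serial_number:, certifier:)
    @rows[[type, serial_number, certifier]]
  end
end

table = TinyCertificateTable.new
table.upsert({ type: 't1', serial_number: 's1', certifier: 'c1', subject: 'alice', data: { 'v' => 1 } })
table.upsert({ type: 't1', serial_number: 's1', certifier: 'c1', subject: 'alice2', data: { 'v' => 2 } })
table.upsert({ type: 't1', serial_number: 's2', certifier: 'c1', subject: 'bob', data: { 'v' => 1 } })
```

Storing the same certificate twice therefore leaves one row holding the latest subject and data, which is why store_certificate is safe to call repeatedly.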
#store_output(output_data) ⇒ Object
--- Outputs ---
    # File 'lib/bsv/wallet_postgres/postgres_store.rb', line 128
    def store_output(output_data)
      row = output_row(output_data)
      @db[:wallet_outputs]
        .insert_conflict(
          target: :outpoint,
          update: {
            basket: row[:basket],
            tags: row[:tags],
            spendable: row[:spendable],
            state: row[:state],
            satoshis: row[:satoshis],
            data: row[:data]
          }
        )
        .insert(row)
      output_data
    end
#store_proof(txid, bump_hex) ⇒ Object
--- Proofs ---
    # File 'lib/bsv/wallet_postgres/postgres_store.rb', line 338
    def store_proof(txid, bump_hex)
      @db[:wallet_proofs]
        .insert_conflict(target: :txid, update: { bump_hex: bump_hex })
        .insert(txid: txid, bump_hex: bump_hex)
    end
#store_setting(key, value) ⇒ Object
--- Settings ---
    # File 'lib/bsv/wallet_postgres/postgres_store.rb', line 362
    def store_setting(key, value)
      @db[:wallet_settings]
        .insert_conflict(target: :key, update: { value: value })
        .insert(key: key, value: value)
    end
#store_transaction(txid, tx_hex) ⇒ Object
--- Transactions ---
    # File 'lib/bsv/wallet_postgres/postgres_store.rb', line 350
    def store_transaction(txid, tx_hex)
      @db[:wallet_transactions]
        .insert_conflict(target: :txid, update: { tx_hex: tx_hex })
        .insert(txid: txid, tx_hex: tx_hex)
    end
#update_action_status(txid, new_status) ⇒ Object
    # File 'lib/bsv/wallet_postgres/postgres_store.rb', line 105
    def update_action_status(txid, new_status)
      # Fetch by txid first, then update by primary key so only exactly one
      # row is targeted. The unique index on txid makes this unambiguous, but
      # scoping to the id column makes the intent explicit and is safe even
      # on databases where the migration has not yet been applied.
      row = @db[:wallet_actions].where(txid: txid).first
      raise WalletError, "Action not found: #{txid}" unless row

      @db[:wallet_actions].where(id: row[:id]).update(
        data: Sequel.lit(
          "data || jsonb_build_object('status', ?)",
          new_status
        )
      )
      symbolise_keys(@db[:wallet_actions].where(id: row[:id]).first[:data])
    end
#update_output_state(outpoint, new_state, pending_reference: nil, no_send: nil) ⇒ Hash
Transitions the state of an existing output.
When new_state is :pending, sets pending_since, pending_reference, and no_send, and merges those values into the JSONB data blob.
When transitioning away from :pending, clears the pending metadata columns and removes the corresponding keys from the JSONB blob.
    # File 'lib/bsv/wallet_postgres/postgres_store.rb', line 197
    def update_output_state(outpoint, new_state, pending_reference: nil, no_send: nil)
      state_str = new_state.to_s
      # Keep legacy spendable boolean in sync so filter_outputs and other
      # queries that haven't migrated to the state column still work.
      spendable_bool = new_state == :spendable

      if new_state == :pending
        updates = {
          state: state_str,
          spendable: spendable_bool,
          pending_since: Sequel.lit('NOW()'),
          pending_reference: pending_reference,
          no_send: no_send ? true : false,
          data: Sequel.lit(
            "data || jsonb_build_object('state', ?, 'pending_since', NOW()::text, 'pending_reference', ?, 'no_send', ?)",
            state_str, pending_reference, no_send ? true : false
          )
        }
      else
        updates = {
          state: state_str,
          spendable: spendable_bool,
          pending_since: nil,
          pending_reference: nil,
          no_send: false
        }
        # Remove pending keys from JSONB blob, update state
        updates[:data] = Sequel.lit(
          "(data - 'pending_since' - 'pending_reference' - 'no_send') || jsonb_build_object('state', ?)",
          state_str
        )
      end

      ds = @db[:wallet_outputs].where(outpoint: outpoint)
      rows_updated = ds.update(updates)
      raise WalletError, "Output not found: #{outpoint}" if rows_updated.zero?

      row = ds.first
      symbolise_keys(row[:data])
    end
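The two JSONB expressions used above have close Ruby analogues: the `||` operator behaves like Hash#merge (right-hand keys win), and `data - 'key'` removes a top-level key. The sketch below models both transitions on a plain hash; `enter_pending`, `leave_pending`, and the sample values are hypothetical and only illustrate the blob manipulation, not the column updates.

```ruby
PENDING_KEYS = %w[pending_since pending_reference no_send].freeze

# data || jsonb_build_object('state', ..., 'pending_since', ..., ...)
def enter_pending(data, reference:, no_send:, now:)
  data.merge(
    'state' => 'pending',
    'pending_since' => now.to_s,
    'pending_reference' => reference,
    'no_send' => no_send ? true : false
  )
end

# (data - 'pending_since' - 'pending_reference' - 'no_send')
#   || jsonb_build_object('state', ...)
def leave_pending(data, new_state)
  data.reject { |key, _| PENDING_KEYS.include?(key) }
      .merge('state' => new_state.to_s)
end

original = { 'outpoint' => 'a.0', 'satoshis' => 1000, 'state' => 'spendable' }
pending  = enter_pending(original, reference: 'draft-1', no_send: nil, now: Time.utc(2024, 1, 1))
restored = leave_pending(pending, :spendable)
```

Entering and then leaving the pending state returns the blob to its original shape, and a nil no_send is stored as false, matching the `no_send ? true : false` coercion in the method.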