Class: BSV::Wallet::PostgresStore

Inherits:
Object
  • Object
show all
Includes:
Interface::Store
Defined in:
lib/bsv/wallet_postgres/postgres_store.rb

Overview

PostgreSQL-backed storage adapter for BSV::Wallet.

Implements the full Store interface against a Sequel Database object. Survives process restarts, scales to multiple instances, and is thread-safe via Sequel’s connection pool.

Design notes

  • JSONB is the source of truth. Every row stores the full record hash in a data jsonb column; dedicated indexed columns (basket, tags, labels, certifier, …) exist only to make queries fast. Reads return the jsonb blob so adding fields to bsv-wallet’s record hashes does not require a schema change.

  • Outputs upsert on outpoint (unique); certificates upsert on the composite unique (type, serial_number, certifier). Proofs and transactions upsert on their txid primary key. Actions are append-only — the interface has no natural key for actions.

  • Pagination is ordered by insertion (+id ASC+) to match MemoryStore.

  • This class is thread-safe because Sequel is — the adapter itself holds no mutable state beyond the injected database handle.

Examples:

Quickstart

require 'bsv-wallet-postgres'

db = Sequel.connect(ENV['DATABASE_URL'])
BSV::Wallet::PostgresStore.migrate!(db)

store  = BSV::Wallet::PostgresStore.new(db)
wallet = BSV::Wallet::Client.new(key, storage: store)

Bringing your own migration runner

# Copy lib/bsv/wallet_postgres/migrations/001_create_wallet_tables.rb
# into your own db/migrate directory, then run your framework's
# migrator as normal. `migrate!` is a convenience — not a requirement.

Constant Summary collapse

MIGRATIONS_DIR =
File.expand_path('migrations', __dir__)

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(db) ⇒ PostgresStore

Returns a new instance of PostgresStore.

Parameters:

  • db (Sequel::Database)

    a Sequel database handle. The caller owns connection lifecycle, pool sizing, and migrations.



79
80
81
82
83
# File 'lib/bsv/wallet_postgres/postgres_store.rb', line 79

def initialize(db)
  @db = db
  @db.extension :pg_array
  @db.extension :pg_json
end

Instance Attribute Details

#dbSequel::Database (readonly)

Returns the underlying database handle.

Returns:

  • (Sequel::Database)

    the underlying database handle



86
87
88
# File 'lib/bsv/wallet_postgres/postgres_store.rb', line 86

def db
  @db
end

Class Method Details

.migrate!(db) ⇒ void

This method returns an undefined value.

Run the shipped wallet schema migrations against db.

Uses Sequel’s migrator so every schema change ships as a numbered migration file and the database tracks which ones have been applied. Safe to call repeatedly.

Consumers who prefer their own migration framework can copy the migration file(s) out of lib/bsv/wallet_postgres/migrations/ instead of calling this helper.

Parameters:

  • db (Sequel::Database)


63
64
65
# File 'lib/bsv/wallet_postgres/postgres_store.rb', line 63

def self.migrate!(db)
  Sequel::Migrator.run(db, MIGRATIONS_DIR)
end

Instance Method Details

#count_actions(query) ⇒ Object



101
102
103
# File 'lib/bsv/wallet_postgres/postgres_store.rb', line 101

def count_actions(query)
  filter_actions(@db[:wallet_actions], query).count
end

#count_certificates(query) ⇒ Object



345
346
347
# File 'lib/bsv/wallet_postgres/postgres_store.rb', line 345

def count_certificates(query)
  filter_certificates(@db[:wallet_certificates], query).count
end

#count_outputs(query) ⇒ Object



151
152
153
# File 'lib/bsv/wallet_postgres/postgres_store.rb', line 151

def count_outputs(query)
  filter_outputs(@db[:wallet_outputs], query).count
end

#delete_action(txid) ⇒ Object



122
123
124
# File 'lib/bsv/wallet_postgres/postgres_store.rb', line 122

def delete_action(txid)
  @db[:wallet_actions].where(txid: txid).delete.positive?
end

#delete_certificate(type:, serial_number:, certifier:) ⇒ Object



349
350
351
352
353
354
# File 'lib/bsv/wallet_postgres/postgres_store.rb', line 349

def delete_certificate(type:, serial_number:, certifier:)
  @db[:wallet_certificates]
    .where(type: type, serial_number: serial_number, certifier: certifier)
    .delete
    .positive?
end

#delete_output(outpoint) ⇒ Object



155
156
157
# File 'lib/bsv/wallet_postgres/postgres_store.rb', line 155

def delete_output(outpoint)
  @db[:wallet_outputs].where(outpoint: outpoint).delete.positive?
end

#find_actions(query) ⇒ Object



96
97
98
99
# File 'lib/bsv/wallet_postgres/postgres_store.rb', line 96

def find_actions(query)
  ds = filter_actions(@db[:wallet_actions], query)
  paginate(ds, query).map { |r| symbolise_keys(r[:data]) }
end

#find_certificates(query) ⇒ Object



340
341
342
343
# File 'lib/bsv/wallet_postgres/postgres_store.rb', line 340

def find_certificates(query)
  ds = filter_certificates(@db[:wallet_certificates], query)
  paginate(ds, query).map { |r| symbolise_keys(r[:data]) }
end

#find_outputs(query) ⇒ Object



146
147
148
149
# File 'lib/bsv/wallet_postgres/postgres_store.rb', line 146

def find_outputs(query)
  ds = filter_outputs(@db[:wallet_outputs], query)
  paginate(ds, query).map { |r| symbolise_keys(r[:data]) }
end

#find_proof(txid) ⇒ Object



364
365
366
# File 'lib/bsv/wallet_postgres/postgres_store.rb', line 364

def find_proof(txid)
  @db[:wallet_proofs].where(txid: txid).get(:bump_hex)
end

#find_setting(key) ⇒ Object



388
389
390
# File 'lib/bsv/wallet_postgres/postgres_store.rb', line 388

def find_setting(key)
  @db[:wallet_settings].where(key: key).get(:value)
end

#find_spendable_outputs(basket: nil, min_satoshis: nil, sort_order: :desc) ⇒ Array<Hash>

Returns outputs whose effective state is :spendable.

Legacy rows with state = NULL are treated as spendable when the spendable boolean is true (or absent), matching MemoryStore’s effective_state logic.

Parameters:

  • basket (String, nil) (defaults to: nil)

    restrict to this basket when provided

  • min_satoshis (Integer, nil) (defaults to: nil)

    exclude outputs below this value

  • sort_order (Symbol) (defaults to: :desc)

    :asc or :desc (default :desc, largest first)

Returns:

  • (Array<Hash>)


169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/bsv/wallet_postgres/postgres_store.rb', line 169

def find_spendable_outputs(basket: nil, min_satoshis: nil, sort_order: :desc)
  ds = @db[:wallet_outputs]
       .where(Sequel.lit('(state = ? OR (state IS NULL AND spendable = TRUE))', 'spendable'))
  ds = ds.where(basket: basket) if basket
  if min_satoshis
    ds = ds.where(
      Sequel.lit('COALESCE(satoshis, (data->>?)::bigint, 0) >= ?', 'satoshis', min_satoshis)
    )
  end
  satoshis_expr = Sequel.lit('COALESCE(satoshis, (data->>?)::bigint, 0)', 'satoshis')
  ds = ds.order(sort_order == :asc ? Sequel.asc(satoshis_expr) : Sequel.desc(satoshis_expr))
  ds.all.map { |r| symbolise_keys(r[:data]) }
end

#find_transaction(txid) ⇒ Object



376
377
378
# File 'lib/bsv/wallet_postgres/postgres_store.rb', line 376

def find_transaction(txid)
  @db[:wallet_transactions].where(txid: txid).get(:tx_hex)
end

#lock_utxos(outpoints, reference:, no_send: false) ⇒ Array<String>

Atomically marks a set of outpoints as :pending.

Uses UPDATE … WHERE state = ‘spendable’ … RETURNING outpoint so that the check-and-set is atomic at the database level. A concurrent caller that wins the race will have already changed the state to ‘pending’, so the second caller’s WHERE clause will not match and will return nothing. No explicit row-level locking is needed — the UPDATE itself takes the lock.

Legacy rows with state = NULL AND spendable = TRUE are also eligible.

Parameters:

  • outpoints (Array<String>)

    outpoint identifiers to lock

  • reference (String)

    caller-supplied pending reference

  • no_send (Boolean) (defaults to: false)

    true if this is a no_send lock

Returns:

  • (Array<String>)

    outpoints that were actually locked



273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
# File 'lib/bsv/wallet_postgres/postgres_store.rb', line 273

def lock_utxos(outpoints, reference:, no_send: false)
  return [] if outpoints.empty?

  rows = @db[:wallet_outputs]
         .where(outpoint: outpoints)
         .where(Sequel.lit('(state = ? OR (state IS NULL AND spendable = TRUE))', 'spendable'))
         .returning(:outpoint)
         .update(
           state: 'pending',
           spendable: false,
           pending_since: Sequel.lit('NOW()'),
           pending_reference: reference,
           no_send: no_send ? true : false,
           data: Sequel.lit(
             "data || jsonb_build_object('state', 'pending', 'pending_since', NOW()::text, " \
             "'pending_reference', ?, 'no_send', ?)",
             reference, no_send ? true : false
           )
         )

  rows.map { |r| r[:outpoint] }
end

#release_stale_pending!(timeout: 300) ⇒ Integer

Releases stale pending locks back to :spendable.

Any output in :pending state whose pending_since is older than timeout seconds is reset to spendable and its pending metadata is cleared. Outputs with no_send = true are exempt and remain pending. Outputs with pending_since = NULL are also skipped — they are treated as freshly locked (NULL means “just acquired but no timestamp yet”).

Parameters:

  • timeout (Integer) (defaults to: 300)

    age in seconds before a lock is considered stale (default 300)

Returns:

  • (Integer)

    number of outputs released



306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
# File 'lib/bsv/wallet_postgres/postgres_store.rb', line 306

def release_stale_pending!(timeout: 300)
  rows = @db[:wallet_outputs]
         .where(state: 'pending')
         .where(Sequel.lit('no_send IS NOT TRUE'))
         .where(Sequel.lit('pending_since IS NOT NULL'))
         .where(Sequel.lit('pending_since < (NOW() - INTERVAL ?)', "#{timeout} seconds"))
         .returning(:outpoint)
         .update(
           state: 'spendable',
           spendable: true,
           pending_since: nil,
           pending_reference: nil,
           no_send: false,
           data: Sequel.lit(
             "(data - 'pending_since' - 'pending_reference' - 'no_send') || jsonb_build_object('state', 'spendable')"
           )
         )

  rows.length
end

#store_action(action_data) ⇒ Object

— Actions —



90
91
92
93
94
# File 'lib/bsv/wallet_postgres/postgres_store.rb', line 90

def store_action(action_data)
  row = action_row(action_data)
  @db[:wallet_actions].insert(row)
  action_data
end

#store_certificate(cert_data) ⇒ Object

— Certificates —



329
330
331
332
333
334
335
336
337
338
# File 'lib/bsv/wallet_postgres/postgres_store.rb', line 329

def store_certificate(cert_data)
  row = certificate_row(cert_data)
  @db[:wallet_certificates]
    .insert_conflict(
      target: %i[type serial_number certifier],
      update: { subject: row[:subject], data: row[:data] }
    )
    .insert(row)
  cert_data
end

#store_output(output_data) ⇒ Object

— Outputs —



128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/bsv/wallet_postgres/postgres_store.rb', line 128

def store_output(output_data)
  row = output_row(output_data)
  @db[:wallet_outputs]
    .insert_conflict(
      target: :outpoint,
      update: {
        basket: row[:basket],
        tags: row[:tags],
        spendable: row[:spendable],
        state: row[:state],
        satoshis: row[:satoshis],
        data: row[:data]
      }
    )
    .insert(row)
  output_data
end

#store_proof(txid, bump_hex) ⇒ Object

— Proofs —



358
359
360
361
362
# File 'lib/bsv/wallet_postgres/postgres_store.rb', line 358

def store_proof(txid, bump_hex)
  @db[:wallet_proofs]
    .insert_conflict(target: :txid, update: { bump_hex: bump_hex })
    .insert(txid: txid, bump_hex: bump_hex)
end

#store_setting(key, value) ⇒ Object

— Settings —



382
383
384
385
386
# File 'lib/bsv/wallet_postgres/postgres_store.rb', line 382

def store_setting(key, value)
  @db[:wallet_settings]
    .insert_conflict(target: :key, update: { value: value })
    .insert(key: key, value: value)
end

#store_transaction(txid, tx_hex) ⇒ Object

— Transactions —



370
371
372
373
374
# File 'lib/bsv/wallet_postgres/postgres_store.rb', line 370

def store_transaction(txid, tx_hex)
  @db[:wallet_transactions]
    .insert_conflict(target: :txid, update: { tx_hex: tx_hex })
    .insert(txid: txid, tx_hex: tx_hex)
end

#update_action_status(txid, new_status) ⇒ Object

Raises:

  • (WalletError)


105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/bsv/wallet_postgres/postgres_store.rb', line 105

def update_action_status(txid, new_status)
  # Fetch by txid first, then update by primary key so only exactly one
  # row is targeted. The unique index on txid makes this unambiguous, but
  # scoping to the id column makes the intent explicit and is safe even
  # on databases where the migration has not yet been applied.
  row = @db[:wallet_actions].where(txid: txid).first
  raise WalletError, "Action not found: #{txid}" unless row

  @db[:wallet_actions].where(id: row[:id]).update(
    data: Sequel.lit(
      "data || jsonb_build_object('status', ?)",
      new_status
    )
  )
  symbolise_keys(@db[:wallet_actions].where(id: row[:id]).first[:data])
end

#update_output_basket(outpoint, new_basket) ⇒ Hash

Moves an output from one basket to another (metadata-only).

Updates both the basket column and the JSONB data blob. Does not affect the output’s state or pending metadata.

Parameters:

  • outpoint (String)

    the outpoint identifier

  • new_basket (String)

    the destination basket name

Returns:

  • (Hash)

    the updated output hash

Raises:

  • (BSV::Wallet::WalletError)

    if the outpoint is not found



248
249
250
251
252
253
254
255
256
257
# File 'lib/bsv/wallet_postgres/postgres_store.rb', line 248

def update_output_basket(outpoint, new_basket)
  ds = @db[:wallet_outputs].where(outpoint: outpoint)
  rows_updated = ds.update(
    basket: new_basket,
    data: Sequel.lit("data || jsonb_build_object('basket', ?)", new_basket)
  )
  raise WalletError, "Output not found: #{outpoint}" if rows_updated.zero?

  symbolise_keys(ds.first[:data])
end

#update_output_state(outpoint, new_state, pending_reference: nil, no_send: nil) ⇒ Hash

Transitions the state of an existing output.

When new_state is :pending, sets pending_since, pending_reference, and no_send, and merges those values into the JSONB data blob.

When transitioning away from :pending, clears the pending metadata columns and removes the corresponding keys from the JSONB blob.

Parameters:

  • outpoint (String)

    the outpoint identifier

  • new_state (Symbol)

    :spendable, :pending, or :spent

  • pending_reference (String, nil) (defaults to: nil)

    caller-supplied label for a pending lock

  • no_send (Boolean, nil) (defaults to: nil)

    true if the lock belongs to a no_send transaction

Returns:

  • (Hash)

    the updated output hash

Raises:

  • (BSV::Wallet::WalletError)

    if the outpoint is not found



197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
# File 'lib/bsv/wallet_postgres/postgres_store.rb', line 197

def update_output_state(outpoint, new_state, pending_reference: nil, no_send: nil)
  state_str = new_state.to_s

  # Keep legacy spendable boolean in sync so filter_outputs and other
  # queries that haven't migrated to the state column still work.
  spendable_bool = new_state == :spendable

  if new_state == :pending
    updates = {
      state: state_str,
      spendable: spendable_bool,
      pending_since: Sequel.lit('NOW()'),
      pending_reference: pending_reference,
      no_send: no_send ? true : false,
      data: Sequel.lit(
        "data || jsonb_build_object('state', ?, 'pending_since', NOW()::text, 'pending_reference', ?, 'no_send', ?)",
        state_str, pending_reference, no_send ? true : false
      )
    }
  else
    updates = {
      state: state_str,
      spendable: spendable_bool,
      pending_since: nil,
      pending_reference: nil,
      no_send: false
    }
    # Remove pending keys from JSONB blob, update state
    updates[:data] = Sequel.lit(
      "(data - 'pending_since' - 'pending_reference' - 'no_send') || jsonb_build_object('state', ?)",
      state_str
    )
  end

  ds = @db[:wallet_outputs].where(outpoint: outpoint)
  rows_updated = ds.update(updates)
  raise WalletError, "Output not found: #{outpoint}" if rows_updated.zero?

  row = ds.first
  symbolise_keys(row[:data])
end