Class: BSV::Wallet::SolidQueueAdapter

Inherits:
Object
Includes:
BroadcastQueue
Defined in:
lib/bsv/wallet_postgres/solid_queue_adapter.rb

Overview

PostgreSQL-backed asynchronous broadcast queue adapter.

Persists outbound transactions to the wallet_broadcast_jobs table and processes them in a background worker thread. Designed for multi-process deployments where InlineQueue's synchronous broadcast is unacceptable.

The worker uses SELECT … FOR UPDATE SKIP LOCKED so multiple processes can run SolidQueueAdapter against the same database safely — only one worker will claim a given job.
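A claim query of the kind described above might look like the following sketch. The adapter's real query is not shown on this page, so the selected columns, the `$1` placeholder for the stale threshold, and the constant name `CLAIM_SQL` are assumptions made for illustration. Row locks taken by `FOR UPDATE` last only as long as the enclosing transaction, so a statement like this has to run inside one.

```ruby
# Hypothetical claim query (not taken from the adapter's source).
# SKIP LOCKED makes a second worker pass over rows that another
# transaction has already locked, instead of waiting on them.
CLAIM_SQL = <<~SQL
  SELECT txid, beef_hex
  FROM wallet_broadcast_jobs
  WHERE status = 'unsent'
     OR (status = 'sending' AND locked_at < now() - make_interval(secs => $1))
  LIMIT 1
  FOR UPDATE SKIP LOCKED
SQL
```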

Lifecycle

adapter = BSV::Wallet::SolidQueueAdapter.new(
  db: sequel_db,
  storage: postgres_store,
  broadcaster: arc_broadcaster
)
adapter.start   # spawns background worker thread
# ... process transactions ...
adapter.drain   # stop + join (blocks until current poll cycle completes)

Recovery

On restart, the worker’s first poll naturally finds stale sending jobs (those whose locked_at is older than stale_threshold seconds) and re-broadcasts them. No special recovery code is needed.
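The staleness rule can be written as a small predicate. This is a sketch only: `stale_job?` is a hypothetical helper, not part of the adapter, and treating a missing locked_at as "not stale" is an assumption.

```ruby
# Hypothetical helper: true when a 'sending' job's lock is older than
# stale_threshold seconds, which makes it eligible for re-broadcast.
def stale_job?(status:, locked_at:, now: Time.now, stale_threshold: 300)
  return false unless status == 'sending'
  return false if locked_at.nil? # assumption: no lock timestamp, nothing to expire

  (now - locked_at) > stale_threshold
end

now = Time.now
stale_job?(status: 'sending', locked_at: now - 600, now: now) # locked 10 minutes ago
stale_job?(status: 'sending', locked_at: now - 30, now: now)  # locked 30 seconds ago
```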

Drain warning

drain blocks until the current poll cycle completes. If a job is mid-broadcast this may take several seconds. Jobs enqueued after @running = false but before the worker thread exits will remain unsent until the next start.
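The stop-then-join behaviour can be tried in isolation with a stand-in class. The sketch below copies the start, stop, and drain bodies shown later on this page; `SketchWorker`, `poll_once`, and the shape of `worker_loop` are assumptions, since the real worker loop is not documented here.

```ruby
# Minimal stand-in for the adapter's thread handling. poll_once is a
# hypothetical placeholder for "claim and broadcast pending jobs".
class SketchWorker
  attr_reader :polls

  def initialize(poll_interval: 0.01)
    @poll_interval = poll_interval
    @mutex = Mutex.new
    @running = false
    @worker_thread = nil
    @polls = 0
  end

  def start
    @mutex.synchronize do
      return if @running || @worker_thread&.alive?

      @running = true
    end
    @worker_thread = Thread.new { worker_loop }
  end

  # Non-blocking: only flips the flag the loop checks between cycles.
  def stop
    @mutex.synchronize { @running = false }
  end

  # Blocking: waits for the in-progress cycle to finish.
  def drain
    stop
    @worker_thread&.join
  end

  def alive?
    @worker_thread&.alive? || false
  end

  private

  def running?
    @mutex.synchronize { @running }
  end

  def worker_loop
    while running?
      poll_once
      sleep @poll_interval
    end
  end

  def poll_once
    @polls += 1
  end
end

worker = SketchWorker.new
worker.start
sleep 0.05
worker.drain # returns once the worker thread has exited
```

Because the flag is only read at the top of each cycle, work that arrives after stop is left for the next start, which matches the warning above.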

Constant Summary

DEFAULT_POLL_INTERVAL = 8

Default number of seconds between poll cycles.

STALE_THRESHOLD = 300

Default number of seconds before a sending job is considered stale and eligible for retry. Configurable for testability.

MAX_ATTEMPTS = 5

Maximum number of broadcast attempts before a job is abandoned. After this many failures the job remains failed permanently.
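
One way to read the MAX_ATTEMPTS rule is as a simple bound on a per-job failure counter. The sketch below assumes such a counter exists (called `attempts` here); neither the counter name nor the `retry_allowed?` helper appears in the documented API.

```ruby
MAX_ATTEMPTS = 5 # mirrors the documented constant

# Hypothetical helper; `attempts` is an assumed count of failed broadcasts.
def retry_allowed?(attempts, max_attempts: MAX_ATTEMPTS)
  attempts < max_attempts
end

(0..5).map { |n| retry_allowed?(n) } # only the final entry is false
```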


Constructor Details

#initialize(db:, storage:, broadcaster:, poll_interval: DEFAULT_POLL_INTERVAL, stale_threshold: STALE_THRESHOLD) ⇒ SolidQueueAdapter

Returns a new instance of SolidQueueAdapter.

Parameters:

  • db (Sequel::Database)

    a Sequel database handle (shared with PostgresStore)

  • storage (StorageAdapter)

    wallet storage adapter (must not be MemoryStore)

  • broadcaster (#broadcast)

    broadcaster object

  • poll_interval (Integer) (defaults to: DEFAULT_POLL_INTERVAL)

    seconds between worker poll cycles

  • stale_threshold (Integer) (defaults to: STALE_THRESHOLD)

    seconds before a sending job is retried

Raises:

  • (ArgumentError)

    if storage is a BSV::Wallet::MemoryStore

  • (ArgumentError)

    if broadcaster is nil



# File 'lib/bsv/wallet_postgres/solid_queue_adapter.rb', line 61

def initialize(db:, storage:, broadcaster:, poll_interval: DEFAULT_POLL_INTERVAL, stale_threshold: STALE_THRESHOLD)
  if storage.is_a?(BSV::Wallet::MemoryStore)
    raise ArgumentError, 'SolidQueueAdapter requires a persistent storage adapter — MemoryStore is not supported'
  end
  raise ArgumentError, 'SolidQueueAdapter requires a broadcaster' if broadcaster.nil?

  @db             = db
  @storage        = storage
  @broadcaster    = broadcaster
  @poll_interval  = poll_interval
  @stale_threshold = stale_threshold

  @mutex         = Mutex.new
  @running       = false
  @worker_thread = nil
end

Instance Method Details

#async? ⇒ Boolean

Returns true — this adapter executes broadcast asynchronously.

Returns:

  • (Boolean)


# File 'lib/bsv/wallet_postgres/solid_queue_adapter.rb', line 81

def async?
  true
end

#broadcast_enabled? ⇒ Boolean

Returns true. SolidQueueAdapter requires a broadcaster at construction time (ArgumentError is raised if nil is passed), so broadcast is always available.

Returns:

  • (Boolean)


# File 'lib/bsv/wallet_postgres/solid_queue_adapter.rb', line 90

def broadcast_enabled?
  !@broadcaster.nil?
end

#drain ⇒ void

This method returns an undefined value.

Stops the worker and blocks until the current poll cycle completes.

Safe to call when start has not been called (@worker_thread is nil).



# File 'lib/bsv/wallet_postgres/solid_queue_adapter.rb', line 170

def drain
  stop
  @worker_thread&.join
end

#enqueue(payload) ⇒ Hash

Persists a transaction to the broadcast job queue and returns immediately.

Inserts a row into wallet_broadcast_jobs with status unsent. If a row already exists for the same txid (e.g. after a crash and restart), the UniqueConstraintViolation is rescued and the existing status is returned instead.

Parameters:

  • payload (Hash)

    broadcast payload (see BroadcastQueue module docs)

Returns:

  • (Hash)

    { txid: String, broadcast_status: 'sending' }, or the existing job's status in place of 'sending' when a row for the same txid already exists



# File 'lib/bsv/wallet_postgres/solid_queue_adapter.rb', line 103

def enqueue(payload)
  txid             = payload[:txid]
  beef_binary      = payload[:beef_binary]
  input_outpoints  = payload[:input_outpoints]
  change_outpoints = payload[:change_outpoints]
  fund_ref         = payload[:fund_ref]

  row = {
    txid: txid,
    beef_hex: beef_binary.unpack1('H*'),
    input_outpoints: input_outpoints ? Sequel.pg_array(input_outpoints, :text) : nil,
    change_outpoints: change_outpoints ? Sequel.pg_array(change_outpoints, :text) : nil,
    fund_ref: fund_ref,
    status: 'unsent'
  }

  begin
    @db[:wallet_broadcast_jobs].insert(row)
  rescue Sequel::UniqueConstraintViolation
    existing_status = @db[:wallet_broadcast_jobs].where(txid: txid).get(:status)
    return { txid: txid, broadcast_status: existing_status || 'sending' }
  end

  { txid: txid, broadcast_status: 'sending' }
end
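
The duplicate-txid path can be exercised without a database. The sketch below replaces the jobs table with an in-memory hash; `SketchJobs`, `DuplicateTxid`, and `sketch_enqueue` are hypothetical names that stand in for the Sequel dataset, Sequel::UniqueConstraintViolation, and #enqueue respectively.

```ruby
# In-memory stand-in for the wallet_broadcast_jobs table, keyed by txid.
class SketchJobs
  DuplicateTxid = Class.new(StandardError)

  def initialize
    @rows = {}
  end

  # Mimics a unique constraint on txid.
  def insert(row)
    raise DuplicateTxid, row[:txid] if @rows.key?(row[:txid])

    @rows[row[:txid]] = row
  end

  def status_for(txid)
    @rows.dig(txid, :status)
  end

  def update_status(txid, status)
    @rows.fetch(txid)[:status] = status
  end
end

# Same control flow as #enqueue: insert, or report the existing status.
def sketch_enqueue(jobs, txid)
  jobs.insert({ txid: txid, status: 'unsent' })
  { txid: txid, broadcast_status: 'sending' }
rescue SketchJobs::DuplicateTxid
  { txid: txid, broadcast_status: jobs.status_for(txid) || 'sending' }
end

jobs  = SketchJobs.new
txid  = 'ab' * 32
first = sketch_enqueue(jobs, txid)   # fresh insert
jobs.update_status(txid, 'failed')   # pretend the worker recorded a failure
again = sketch_enqueue(jobs, txid)   # duplicate: reports the stored status
```

Note that a fresh insert reports 'sending' to the caller even though the stored row starts as 'unsent'; only the duplicate path echoes what is in the table.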

#start ⇒ void

This method returns an undefined value.

Spawns the background worker thread.

Safe to call multiple times — returns immediately if already running. The check-and-set is atomic under the mutex to prevent two concurrent start calls from spawning duplicate worker threads.



# File 'lib/bsv/wallet_postgres/solid_queue_adapter.rb', line 147

def start
  @mutex.synchronize do
    return if @running || @worker_thread&.alive?

    @running = true
  end
  @worker_thread = Thread.new { worker_loop }
end

#status(txid) ⇒ String?

Returns the broadcast status for a previously enqueued transaction.

Reads directly from the jobs table — the authoritative source of truth for async broadcast status.

Parameters:

  • txid (String)

    hex transaction identifier

Returns:

  • (String, nil)

    status string or nil if no job exists



# File 'lib/bsv/wallet_postgres/solid_queue_adapter.rb', line 136

def status(txid)
  @db[:wallet_broadcast_jobs].where(txid: txid).get(:status)
end
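
Since enqueue returns before the broadcast happens, callers that need the outcome have to poll #status. The helper below is a sketch of that pattern: `wait_for_broadcast` and `PENDING_STATUSES` are hypothetical, and treating every status other than unsent and sending as settled is an assumption (a failed job may still be retried by the worker).

```ruby
# Statuses this page describes as not yet broadcast.
PENDING_STATUSES = %w[unsent sending].freeze

# Hypothetical helper: polls queue.status(txid) until the job leaves the
# pending statuses or the timeout elapses. Returns the last status seen,
# or nil if no job exists for the txid.
def wait_for_broadcast(queue, txid, timeout: 30, interval: 0.5)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  loop do
    status = queue.status(txid)
    return status unless PENDING_STATUSES.include?(status)
    return status if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline

    sleep interval
  end
end
```

Any object that responds to status(txid) works as the queue argument, so the helper can be tested with a stub in place of the adapter.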

#stop ⇒ void

This method returns an undefined value.

Signals the worker to stop after the current poll cycle.

Non-blocking — returns immediately without waiting for the thread.



# File 'lib/bsv/wallet_postgres/solid_queue_adapter.rb', line 161

def stop
  @mutex.synchronize { @running = false }
end