Class: BSV::Wallet::SolidQueueAdapter
- Inherits: Object
- Includes: BroadcastQueue
- Defined in: lib/bsv/wallet_postgres/solid_queue_adapter.rb
Overview
PostgreSQL-backed asynchronous broadcast queue adapter.
Persists outbound transactions to the wallet_broadcast_jobs table and processes them in a background worker thread. Designed for multi-process deployments where InlineQueue's synchronous broadcast is unacceptable.
The worker uses SELECT … FOR UPDATE SKIP LOCKED so multiple processes can run SolidQueueAdapter against the same database safely — only one worker will claim a given job.
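The claim guarantee can be illustrated without a database. The sketch below is an in-memory analogue only: JobTable and its claim method are hypothetical names, and a Ruby Mutex stands in for PostgreSQL row locks. It shows the property that matters, which is that each job is handed to exactly one worker even when several workers poll concurrently.

```ruby
# In-memory analogue of "claim one job, skipping rows another worker holds".
# JobTable is hypothetical; the real adapter relies on PostgreSQL row locks
# (SELECT ... FOR UPDATE SKIP LOCKED), not on a Ruby Mutex.
class JobTable
  def initialize(txids)
    @mutex = Mutex.new
    @jobs = txids.map { |txid| { txid: txid, status: 'unsent', claimed_by: nil } }
  end

  # Atomically claims the first unclaimed job for worker_id, or returns nil
  # when nothing is left to claim.
  def claim(worker_id)
    @mutex.synchronize do
      job = @jobs.find { |j| j[:status] == 'unsent' }
      return nil unless job

      job[:status] = 'sending'
      job[:claimed_by] = worker_id
      job.dup
    end
  end
end

table = JobTable.new(%w[tx1 tx2 tx3 tx4 tx5 tx6])
claims = Queue.new

# Three workers race to claim jobs from the same table.
workers = 3.times.map do |worker_id|
  Thread.new do
    while (job = table.claim(worker_id))
      claims << job[:txid]
    end
  end
end
workers.each(&:join)

claimed = []
claimed << claims.pop until claims.empty?
```

After the threads finish, claimed holds every txid once, regardless of which worker picked it up.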
Lifecycle
adapter = BSV::Wallet::SolidQueueAdapter.new(
  db: sequel_db,
  storage: postgres_store,
  broadcaster: arc_broadcaster
)
adapter.start # spawns background worker thread
# ... process transactions ...
adapter.drain # stop + join (blocks until current poll cycle completes)
Recovery
On restart, the worker’s first poll naturally finds stale sending jobs (those whose locked_at is older than stale_threshold seconds) and re-broadcasts them. No special recovery code is needed.
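The staleness rule described above can be written as a small predicate. This is a sketch under assumptions: stale_sending_job? is a hypothetical helper, jobs are modelled as plain hashes, and treating a missing locked_at as "not stale" is a choice made here rather than something the source states. The adapter itself presumably expresses the same condition in its polling query.

```ruby
# Hypothetical predicate mirroring the documented recovery rule: a job in the
# 'sending' state whose lock is older than the threshold is eligible for retry.
STALE_THRESHOLD_SECONDS = 300 # same value as the documented default

def stale_sending_job?(job, now: Time.now, stale_threshold: STALE_THRESHOLD_SECONDS)
  return false unless job[:status] == 'sending'
  return false if job[:locked_at].nil? # assumption: unlocked rows are not stale

  (now - job[:locked_at]) > stale_threshold
end

now = Time.at(1_000_000)
fresh_job  = { txid: 'aa', status: 'sending', locked_at: now - 10 }
stale_job  = { txid: 'bb', status: 'sending', locked_at: now - 301 }
unsent_job = { txid: 'cc', status: 'unsent',  locked_at: nil }

retryable = [fresh_job, stale_job, unsent_job].select { |job| stale_sending_job?(job, now: now) }
```

Only the job locked 301 seconds ago is selected; the fresh lock and the unsent row are left alone.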
Drain warning
drain blocks until the current poll cycle completes. If a job is mid-broadcast this may take several seconds. Jobs enqueued after @running = false but before the worker thread exits will remain unsent until the next start.
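To make the stop and drain semantics concrete, here is a toy worker with the same start/stop/drain shape. ToyWorker is hypothetical, and its loop body (a counter and a short sleep) is only a stand-in for a real poll cycle; the adapter's actual worker loop is not shown in this page.

```ruby
# Toy worker with the same lifecycle shape as the adapter.
# ToyWorker and its loop body are illustrative assumptions.
class ToyWorker
  attr_reader :cycles

  def initialize(poll_interval: 0.01)
    @poll_interval = poll_interval
    @mutex = Mutex.new
    @running = false
    @worker_thread = nil
    @cycles = 0
  end

  # Idempotent: a second call while running returns without spawning a thread.
  def start
    @mutex.synchronize do
      return if @running || @worker_thread&.alive?

      @running = true
    end
    @worker_thread = Thread.new { worker_loop }
  end

  # Non-blocking: only flips the flag.
  def stop
    @mutex.synchronize { @running = false }
  end

  # Blocking: waits for the loop to notice the flag and exit.
  def drain
    stop
    @worker_thread&.join
  end

  def alive?
    @worker_thread&.alive? || false
  end

  private

  def running?
    @mutex.synchronize { @running }
  end

  def worker_loop
    while running?
      @cycles += 1 # stand-in for one poll cycle
      sleep @poll_interval
    end
  end
end

worker = ToyWorker.new
worker.drain   # safe before start: there is no thread to join
worker.start
worker.start   # no-op, the worker is already running
sleep 0.05
worker.drain   # returns only after the loop has exited
```

Because the loop checks the flag once per cycle, drain can take up to one full cycle to return, which is the delay the warning above describes.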
Constant Summary
- DEFAULT_POLL_INTERVAL = 8
  Default number of seconds between poll cycles.
- STALE_THRESHOLD = 300
  Default number of seconds before a sending job is considered stale and eligible for retry. Configurable for testability.
- MAX_ATTEMPTS = 5
  Maximum number of broadcast attempts before a job is abandoned. After this many failures the job remains failed permanently.
Instance Method Summary
- #async? ⇒ Boolean
  Returns true; this adapter executes broadcast asynchronously.
- #broadcast_enabled? ⇒ Boolean
  Returns true. SolidQueueAdapter requires a broadcaster at construction time (ArgumentError is raised if nil is passed), so broadcast is always available.
- #drain ⇒ void
  Stops the worker and blocks until the current poll cycle completes.
- #enqueue(payload) ⇒ Hash
  Persists a transaction to the broadcast job queue and returns immediately.
- #initialize(db:, storage:, broadcaster:, poll_interval: DEFAULT_POLL_INTERVAL, stale_threshold: STALE_THRESHOLD) ⇒ SolidQueueAdapter (constructor)
  A new instance of SolidQueueAdapter.
- #start ⇒ void
  Spawns the background worker thread.
- #status(txid) ⇒ String?
  Returns the broadcast status for a previously enqueued transaction.
- #stop ⇒ void
  Signals the worker to stop after the current poll cycle.
Constructor Details
#initialize(db:, storage:, broadcaster:, poll_interval: DEFAULT_POLL_INTERVAL, stale_threshold: STALE_THRESHOLD) ⇒ SolidQueueAdapter
Returns a new instance of SolidQueueAdapter.
# File 'lib/bsv/wallet_postgres/solid_queue_adapter.rb', line 61
def initialize(db:, storage:, broadcaster:, poll_interval: DEFAULT_POLL_INTERVAL, stale_threshold: STALE_THRESHOLD)
  if storage.is_a?(BSV::Wallet::MemoryStore)
    raise ArgumentError,
          'SolidQueueAdapter requires a persistent storage adapter — MemoryStore is not supported'
  end
  raise ArgumentError, 'SolidQueueAdapter requires a broadcaster' if broadcaster.nil?

  @db = db
  @storage = storage
  @broadcaster = broadcaster
  @poll_interval = poll_interval
  @stale_threshold = stale_threshold
  @mutex = Mutex.new
  @running = false
  @worker_thread = nil
end
Instance Method Details
#async? ⇒ Boolean
Returns true — this adapter executes broadcast asynchronously.
# File 'lib/bsv/wallet_postgres/solid_queue_adapter.rb', line 81
def async?
  true
end
#broadcast_enabled? ⇒ Boolean
Returns true — SolidQueueAdapter requires a broadcaster at construction time (ArgumentError is raised if nil is passed), so broadcast is always available.
# File 'lib/bsv/wallet_postgres/solid_queue_adapter.rb', line 90
def broadcast_enabled?
  !@broadcaster.nil?
end
#drain ⇒ void
This method returns an undefined value.
Stops the worker and blocks until the current poll cycle completes.
Safe to call when start has not been called (@worker_thread is nil).
# File 'lib/bsv/wallet_postgres/solid_queue_adapter.rb', line 170
def drain
  stop
  @worker_thread&.join
end
#enqueue(payload) ⇒ Hash
Persists a transaction to the broadcast job queue and returns immediately.
Inserts a row into wallet_broadcast_jobs with status unsent. If a row already exists for the same txid (e.g. after a crash and restart), the UniqueConstraintViolation is rescued and the existing status is returned instead.
# File 'lib/bsv/wallet_postgres/solid_queue_adapter.rb', line 103
def enqueue(payload)
  txid = payload[:txid]
  beef_binary = payload[:beef_binary]
  input_outpoints = payload[:input_outpoints]
  change_outpoints = payload[:change_outpoints]
  fund_ref = payload[:fund_ref]

  row = {
    txid: txid,
    beef_hex: beef_binary.unpack1('H*'),
    input_outpoints: input_outpoints ? Sequel.pg_array(input_outpoints, :text) : nil,
    change_outpoints: change_outpoints ? Sequel.pg_array(change_outpoints, :text) : nil,
    fund_ref: fund_ref,
    status: 'unsent'
  }

  begin
    @db[:wallet_broadcast_jobs].insert(row)
  rescue Sequel::UniqueConstraintViolation
    existing_status = @db[:wallet_broadcast_jobs].where(txid: txid).get(:status)
    return { txid: txid, broadcast_status: existing_status || 'sending' }
  end

  { txid: txid, broadcast_status: 'sending' }
end
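The insert-or-return-existing behaviour can be tried out with an in-memory stand-in. Everything named here (ToyJobStore, DuplicateTxid, toy_enqueue) is hypothetical and exists only for illustration; the real method depends on a database unique constraint and Sequel::UniqueConstraintViolation. The sketch also shows what unpack1('H*') does to the binary payload.

```ruby
# In-memory stand-in for the insert-or-return-existing pattern.
# All names below are illustrative, not part of the library.
class DuplicateTxid < StandardError; end

class ToyJobStore
  def initialize
    @rows = {}
  end

  # Mimics a unique constraint on txid.
  def insert(row)
    raise DuplicateTxid, row[:txid] if @rows.key?(row[:txid])

    @rows[row[:txid]] = row
  end

  def fetch(txid)
    @rows[txid]
  end

  def status_for(txid)
    @rows.dig(txid, :status)
  end
end

def toy_enqueue(store, txid:, beef_binary:)
  # 'H*' renders every byte as two lowercase hex digits, high nibble first.
  store.insert({ txid: txid, beef_hex: beef_binary.unpack1('H*'), status: 'unsent' })
  { txid: txid, broadcast_status: 'sending' }
rescue DuplicateTxid
  # The row already exists: report its stored status instead of failing.
  { txid: txid, broadcast_status: store.status_for(txid) || 'sending' }
end

store = ToyJobStore.new
payload = "\x01\x02\xff".b

first  = toy_enqueue(store, txid: 'abc', beef_binary: payload)
second = toy_enqueue(store, txid: 'abc', beef_binary: payload)
```

Note that, as in the method above, a first-time enqueue reports 'sending' while the stored row says 'unsent', so a duplicate enqueue made before the worker claims the job returns 'unsent'.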
#start ⇒ void
This method returns an undefined value.
Spawns the background worker thread.
Safe to call multiple times — returns immediately if already running. The check-and-set is atomic under the mutex to prevent two concurrent start calls from spawning duplicate worker threads.
# File 'lib/bsv/wallet_postgres/solid_queue_adapter.rb', line 147
def start
  @mutex.synchronize do
    return if @running || @worker_thread&.alive?

    @running = true
  end
  @worker_thread = Thread.new { worker_loop }
end
#status(txid) ⇒ String?
Returns the broadcast status for a previously enqueued transaction.
Reads directly from the jobs table — the authoritative source of truth for async broadcast status.
# File 'lib/bsv/wallet_postgres/solid_queue_adapter.rb', line 136
def status(txid)
  @db[:wallet_broadcast_jobs].where(txid: txid).get(:status)
end
#stop ⇒ void
This method returns an undefined value.
Signals the worker to stop after the current poll cycle.
Non-blocking — returns immediately without waiting for the thread.
# File 'lib/bsv/wallet_postgres/solid_queue_adapter.rb', line 161
def stop
  @mutex.synchronize { @running = false }
end