Class: BSV::Wallet::Postgres::BroadcastQueue
- Inherits:
-
Object
- Object
- BSV::Wallet::Postgres::BroadcastQueue
- Includes:
- Interface::BroadcastQueue
- Defined in:
- lib/bsv/wallet/postgres/broadcast_queue.rb
Overview
Broadcast lifecycle manager backed by the broadcasts table.
Owns network communication for transaction broadcasts. Uses Services for push/fetch operations and delegates column updates to Broadcast#write!.
Instance Method Summary collapse
- #handle_event(event) ⇒ Object
-
#initialize(db: nil, services: nil) ⇒ BroadcastQueue
constructor
A new instance of BroadcastQueue.
- #process_pending(limit: 100) ⇒ Object
- #status(action_id:) ⇒ Object
- #submit(action_id:, raw_tx:, immediate: false) ⇒ Object
Constructor Details
#initialize(db: nil, services: nil) ⇒ BroadcastQueue
Returns a new instance of BroadcastQueue.
14 15 16 17 |
# File 'lib/bsv/wallet/postgres/broadcast_queue.rb', line 14 def initialize(db: nil, services: nil) @db = db || BSV::Wallet::Postgres.db @services = services end |
Instance Method Details
#handle_event(event) ⇒ Object
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/bsv/wallet/postgres/broadcast_queue.rb', line 47 def handle_event(event) BSV::Primitives::Hex.validate_wtxid!(event[:wtxid], name: 'handle_event wtxid') BSV.logger&.debug { "[BroadcastQueue] handle_event: dtxid=#{event[:wtxid].reverse.unpack1('H*')} status=#{event[:tx_status]}" } action = Action.first(wtxid: Sequel.blob(event[:wtxid])) return unless action broadcast = Broadcast.first(action_id: action.id) broadcast ||= Broadcast.create(action_id: action.id) broadcast.update( tx_status: event[:tx_status], arc_status: event[:status], block_hash: event[:block_hash] ? Sequel.blob(event[:block_hash]) : nil, block_height: event[:block_height], merkle_path: event[:merkle_path] ? Sequel.blob(event[:merkle_path]) : nil, extra_info: event[:extra_info], competing_txs: event[:competing_txs] ? Sequel.pg_array(event[:competing_txs]) : nil ) { action_id: action.id, tx_status: broadcast.tx_status, block_hash: broadcast.block_hash, block_height: broadcast.block_height, merkle_path: broadcast.merkle_path } end |
#process_pending(limit: 100) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/bsv/wallet/postgres/broadcast_queue.rb', line 30 def process_pending(limit: 100) stale = Broadcast .where { broadcast_at < Time.now - Broadcast::FETCH_STALENESS } .where(Sequel.|({ tx_status: nil }, Sequel.~(tx_status: Broadcast::TERMINAL_STATUSES))) .limit(limit) .all stale.filter_map do |broadcast| next unless broadcast.action&.wtxid && @services result = @services.fetch!(broadcast) next unless result.http_success? broadcast_to_hash(broadcast.reload) end end |
#status(action_id:) ⇒ Object
75 76 77 78 79 80 |
# File 'lib/bsv/wallet/postgres/broadcast_queue.rb', line 75 def status(action_id:) broadcast = Broadcast.first(action_id: action_id) return unless broadcast broadcast_to_hash(broadcast) end |
#submit(action_id:, raw_tx:, immediate: false) ⇒ Object
19 20 21 22 23 24 25 26 27 28 |
# File 'lib/bsv/wallet/postgres/broadcast_queue.rb', line 19 def submit(action_id:, raw_tx:, immediate: false) BSV.logger&.debug { "[BroadcastQueue] submit: action_id=#{action_id} immediate=#{immediate}" } broadcast = Broadcast.create(action_id: action_id) if immediate && @services @services.push!(broadcast) end broadcast_to_hash(broadcast.reload) end |