Class: BSV::Wallet::Postgres::BroadcastQueue

Inherits:
Object
  • Object
show all
Includes:
Interface::BroadcastQueue
Defined in:
lib/bsv/wallet/postgres/broadcast_queue.rb

Overview

Broadcast lifecycle manager backed by the broadcasts table.

Owns network communication for transaction broadcasts. Uses Services for push/fetch operations and delegates column updates to Broadcast#write!.

Instance Method Summary collapse

Constructor Details

#initialize(db: nil, services: nil) ⇒ BroadcastQueue

Returns a new instance of BroadcastQueue.



14
15
16
17
# File 'lib/bsv/wallet/postgres/broadcast_queue.rb', line 14

def initialize(db: nil, services: nil)
  @db = db || BSV::Wallet::Postgres.db
  @services = services
end

Instance Method Details

#handle_event(event) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/bsv/wallet/postgres/broadcast_queue.rb', line 47

def handle_event(event)
  BSV::Primitives::Hex.validate_wtxid!(event[:wtxid], name: 'handle_event wtxid')
  BSV.logger&.debug { "[BroadcastQueue] handle_event: dtxid=#{event[:wtxid].reverse.unpack1('H*')} status=#{event[:tx_status]}" }
  action = Action.first(wtxid: Sequel.blob(event[:wtxid]))
  return unless action

  broadcast = Broadcast.first(action_id: action.id)
  broadcast ||= Broadcast.create(action_id: action.id)

  broadcast.update(
    tx_status:     event[:tx_status],
    arc_status:    event[:status],
    block_hash:    event[:block_hash] ? Sequel.blob(event[:block_hash]) : nil,
    block_height:  event[:block_height],
    merkle_path:   event[:merkle_path] ? Sequel.blob(event[:merkle_path]) : nil,
    extra_info:    event[:extra_info],
    competing_txs: event[:competing_txs] ? Sequel.pg_array(event[:competing_txs]) : nil
  )

  {
    action_id:    action.id,
    tx_status:    broadcast.tx_status,
    block_hash:   broadcast.block_hash,
    block_height: broadcast.block_height,
    merkle_path:  broadcast.merkle_path
  }
end

#process_pending(limit: 100) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/bsv/wallet/postgres/broadcast_queue.rb', line 30

def process_pending(limit: 100)
  stale = Broadcast
    .where { broadcast_at < Time.now - Broadcast::FETCH_STALENESS }
    .where(Sequel.|({ tx_status: nil }, Sequel.~(tx_status: Broadcast::TERMINAL_STATUSES)))
    .limit(limit)
    .all

  stale.filter_map do |broadcast|
    next unless broadcast.action&.wtxid && @services

    result = @services.fetch!(broadcast)
    next unless result.http_success?

    broadcast_to_hash(broadcast.reload)
  end
end

#status(action_id:) ⇒ Object



75
76
77
78
79
80
# File 'lib/bsv/wallet/postgres/broadcast_queue.rb', line 75

def status(action_id:)
  broadcast = Broadcast.first(action_id: action_id)
  return unless broadcast

  broadcast_to_hash(broadcast)
end

#submit(action_id:, raw_tx:, immediate: false) ⇒ Object



19
20
21
22
23
24
25
26
27
28
# File 'lib/bsv/wallet/postgres/broadcast_queue.rb', line 19

def submit(action_id:, raw_tx:, immediate: false)
  BSV.logger&.debug { "[BroadcastQueue] submit: action_id=#{action_id} immediate=#{immediate}" }
  broadcast = Broadcast.create(action_id: action_id)

  if immediate && @services
    @services.push!(broadcast)
  end

  broadcast_to_hash(broadcast.reload)
end