Class: Solrengine::Sdp::Broadcaster

Inherits:
Object
  • Object
show all
Defined in:
lib/solrengine/sdp/broadcaster.rb

Overview

Turns a chain-change signal (“this wallet’s account changed”) into the app’s configured broadcasts — or into silence. The WebSocket notification is only a doorbell: it carries no data. The broadcaster re-fetches everything displayed from the authoritative source (SDP) and broadcasts only when every fetch succeeded, so screens never regress from good content to an “unavailable” state mid-session; the last good content simply stays.

The app supplies broadcast targets — via Solrengine::Sdp.configure or the targets: argument — as an ordered array of hashes:

config.broadcast_targets = [
  { name: :activity,
    fetch:  ->(user) { ActivityFeed.entries_for(user) },
    render: ->(user, entries) {
      Turbo::StreamsChannel.broadcast_update_to(
        user.realtime_stream, target: "activity",
        partial: "activity/feed", locals: { entries: entries })
    } },
  { name: :balance, fetch: ..., render: ... }
]

The ENGINE owns the doorbell invariants; the lambdas own the app specifics:

* All-or-nothing: every target's fetch runs first, in configured
  order. Any fetch raising — or returning :unavailable — means NO
  renders at all this attempt (nil is valid data; signal "I could not
  fetch" with :unavailable or an exception).
* Consumed doorbells retry: a WebSocket notification never re-fires,
  so a transient SDP hiccup would otherwise permanently miss the
  update. The WHOLE cycle (fetch + render) retries up to
  configuration.broadcast_retries attempts, sleeping
  broadcast_retry_delay seconds between attempts (zero it in tests).
  Safe to sleep: solrengine-realtime invokes subscribers on a
  dedicated per-wallet broadcast thread.
* Priority order: renders run in the configured array order — put
  fast, money-bearing regions first, decorative ones last.
* Request-context-free: lambdas run in a watcher process, outside any
  HTTP request. Current.user, session, and request-thread locals are
  nil; partials need explicit locals.

USD enrichment inside fetch lambdas: Solrengine::Sdp.usd_value_for(balance) — SDP’s usd_value when present, Jupiter-derived when solrengine-tokens is available, nil otherwise. Price failures never fail a fetch.

Turbo deliberately never appears in this class: render lambdas do the actual broadcasting, so the engine core carries no turbo-rails dependency and the invariants are testable without it.

Constant Summary collapse

TARGET_KEYS =
%i[name fetch render].freeze

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(wallet_address, targets: nil) ⇒ Broadcaster

Returns a new instance of Broadcaster.



61
62
63
64
# File 'lib/solrengine/sdp/broadcaster.rb', line 61

def initialize(wallet_address, targets: nil)
  @wallet_address = wallet_address
  @targets = validate_targets(targets || configuration.broadcast_targets)
end

Class Method Details

.call(wallet_address, targets: nil) ⇒ Object



57
58
59
# File 'lib/solrengine/sdp/broadcaster.rb', line 57

def self.call(wallet_address, targets: nil)
  new(wallet_address, targets: targets).call
end

Instance Method Details

#callObject

Resolves the wallet owner and runs the broadcast cycle. Unknown or not-yet-ready wallets (the fee payer, external counterparties) are a silent no-op — not ours to broadcast. Returns true when a cycle completed, false when retries were exhausted, nil on no-op.



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/solrengine/sdp/broadcaster.rb', line 70

def call
  if @targets.empty?
    logger&.info(
      "[Solrengine::Sdp::Broadcaster] No broadcast_targets configured — nothing to broadcast. " \
      "Set config.broadcast_targets in a Solrengine::Sdp.configure block to enable realtime updates."
    )
    return
  end

  # This executes on a long-lived per-wallet broadcast thread
  # (solrengine-realtime), NOT inside an HTTP request. Rails only
  # auto-releases AR connections at request boundaries, so a bare
  # find_by here permanently holds a connection on the thread.  With
  # a default pool of 5 that means the sixth wallet's broadcast
  # raises ConnectionTimeoutError — rescued by the realtime registry
  # and silently dropped. with_connection returns the lease to the
  # pool as soon as the block exits, before the fetch/render/sleep
  # cycle starts. App-provided fetch lambdas may also touch the DB;
  # that is the app's concern and is intentionally out of this scope.
  user = ActiveRecord::Base.connection_pool.with_connection do
    configuration.user_model.wallet_ready.find_by(wallet_address: @wallet_address)
  end
  return unless user

  attempts = configuration.broadcast_retries
  attempts.times do |attempt|
    return true if attempt_broadcast(user)

    sleep configuration.broadcast_retry_delay unless attempt == attempts - 1
  end

  logger&.warn(
    "[Solrengine::Sdp::Broadcaster] Giving up on #{@wallet_address}: " \
    "SDP unavailable — keeping last good content"
  )
  false
end