Class: Tempest::Jetstream::AsyncWebSocketTransport

Inherits:
Object
  • Object
show all
Defined in:
lib/tempest/jetstream/client.rb

Overview

Default WebSocket transport using async-websocket. Loaded lazily so unit tests that inject a stub transport don’t pull in the Async runtime.

Instance Method Summary collapse

Constructor Details

#initializeAsyncWebSocketTransport

Returns a new instance of AsyncWebSocketTransport.



51
52
53
54
55
# File 'lib/tempest/jetstream/client.rb', line 51

def initialize
  require "async"
  require "async/http/endpoint"
  require "async/websocket/client"
end

Instance Method Details

#each_message(url) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/tempest/jetstream/client.rb', line 57

def each_message(url)
  # `finished: false` makes Async::Task call `@promise.suppress_warnings!`
  # so that connect failures (DNS errors after offline wake-from-sleep,
  # TCP resets, TLS handshake errors) don't trigger Console.logger.warn
  # with "Task may have ended with unhandled exception." plus a full
  # backtrace. The exception still propagates via `.wait`; the
  # StreamManager's reconnect loop already handles it cleanly.
  Async(finished: false) do
    endpoint = Async::HTTP::Endpoint.parse(url)
    Async::WebSocket::Client.connect(endpoint) do |connection|
      while (message = connection.read)
        yield message.buffer
      end
    end
  end.wait
end