Class: Hypertube::Core::WebSocketClient::WebSocketClient

Inherits:
Object
  • Object
show all
Defined in:
lib/hypertube-ruby-sdk/core/web_socket_client/web_socket_client.rb

Constant Summary collapse

# Upper bound on send+receive attempts made by send_message before it gives up.
MAX_ATTEMPTS =
2
# Default value of the timeout: keyword of receive_bytes, in seconds.
DEFAULT_TIMEOUT_SECONDS =
60
# Exception classes that send_message rescues in order to mark the client
# closed and retry. The Errno::* entries are subclasses of SystemCallError and
# are listed explicitly for readability.
TRANSIENT_ERRORS =
[
  Errno::EPIPE,
  Errno::ECONNRESET,
  Errno::ETIMEDOUT,
  IOError,
  SystemCallError,
  Timeout::Error,
  OpenSSL::SSL::SSLError,
  WebSocket::Error
].freeze

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection_data) ⇒ WebSocketClient

Returns a new instance of WebSocketClient.



138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/hypertube-ruby-sdk/core/web_socket_client/web_socket_client.rb', line 138

# Parses the target URL, opens the socket and performs the WebSocket opening
# handshake.
#
# The URL is taken from +connection_data.hostname+ when the object responds to
# it, otherwise from +connection_data.to_s+. A +connect_timeout+ is picked up
# the same way when available.
#
# If anything fails after the socket has been opened, the socket is closed
# before the error propagates so that a failed construction does not leak a
# file descriptor.
#
# @param connection_data [#hostname, #to_s] connection description or plain URL
# @raise [RuntimeError] if the scheme is not ws/wss, or if connecting or the
#   handshake fails
def initialize(connection_data)
  @connection_data = connection_data
  @closed = false
  url = connection_data.respond_to?(:hostname) ? connection_data.hostname : connection_data.to_s
  @connect_timeout = connection_data.respond_to?(:connect_timeout) ? connection_data.connect_timeout : nil
  @uri = URI.parse(url)
  raise "WebSocket URL must use ws:// or wss://. Got: #{url}" unless %w[ws wss].include?(@uri.scheme)

  @host = @uri.host
  @port = @uri.port || default_port
  @path = (@uri.path.nil? || @uri.path.empty?) ? '/' : @uri.path
  @path += "?#{@uri.query}" if @uri.query

  @send_receive_lock = Mutex.new

  connected = false
  begin
    @socket = open_socket
    @handshake = WebSocket::Handshake::Client.new(url: url, headers: request_headers)
    perform_handshake
    connected = true
  rescue Errno::ECONNREFUSED, Errno::ETIMEDOUT, Errno::EHOSTUNREACH, SocketError, IOError, SystemCallError,
         OpenSSL::SSL::SSLError => e
    mark_closed
    raise "WebSocket connection to #{url} failed: #{e.message}. Check that the server is running, the URL is correct (ws:// or wss://), and that no firewall or proxy is blocking the connection."
  rescue WebSocket::Error => e
    mark_closed
    raise "WebSocket connection to #{url} failed: #{e.message}. Check that the server is running and supports WebSocket."
  ensure
    # Runs for every failure (including error classes not rescued above):
    # release the half-open socket instead of leaving it to the GC.
    unless connected
      begin
        @socket.close if @socket && !@socket.closed?
      rescue IOError, SystemCallError, OpenSSL::SSL::SSLError
        # Best-effort cleanup; the connection error being raised is the one that matters.
      end
    end
  end

  @incoming = WebSocket::Frame::Incoming::Client.new(version: @handshake.version)
end

Class Method Details

.close(connection_data, suppress_close_frame: false) ⇒ Object

Equivalent to C# Close(uri). Passing suppress_close_frame: true emulates “do not send CLOSE” during reconnect handling.



66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/hypertube-ruby-sdk/core/web_socket_client/web_socket_client.rb', line 66

# Closes the cached client for +connection_data+ (if one exists) and removes it
# from the cache. The cache entry is removed even when closing raises.
#
# @param connection_data [Object] value passed to connection_key to find the entry
# @param suppress_close_frame [Boolean] forwarded to the client's #close
# @return [Object, nil] the result of the client's #close, or nil when no client is cached
def close(connection_data, suppress_close_frame: false)
  cache_key = connection_key(connection_data)

  @clients_lock.synchronize do
    cached = @clients[cache_key]
    next unless cached

    begin
      cached.close(suppress_close_frame: suppress_close_frame)
    ensure
      @clients.delete(cache_key)
    end
  end
end

.get_state(connection_data) ⇒ Object

Equivalent to C# GetState(uri, out state). Returns :OPEN or :CLOSED, or nil if there is no cached client.



81
82
83
84
85
86
87
88
# File 'lib/hypertube-ruby-sdk/core/web_socket_client/web_socket_client.rb', line 81

# Reports the state of the cached client for +connection_data+.
#
# @param connection_data [Object] value passed to connection_key to find the entry
# @return [Symbol, nil] :OPEN or :CLOSED according to the client's #open?,
#   or nil when no client is cached
def get_state(connection_data)
  cache_key = connection_key(connection_data)

  @clients_lock.synchronize do
    cached = @clients[cache_key]
    if cached
      cached.open? ? :OPEN : :CLOSED
    end
  end
end

.send_message(connection_data, message) ⇒ Object

Equivalent to C# SendMessage(Uri, byte[]). The message can be a String (raw bytes) or an Array<Integer> with values 0..255.

Raises:

  • (ArgumentError)


31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/hypertube-ruby-sdk/core/web_socket_client/web_socket_client.rb', line 31

# Sends +message+ over the cached (or newly created) client for
# +connection_data+ and returns the reply from receive_bytes.
#
# Send and receive run under the client's send/receive lock. When one of
# TRANSIENT_ERRORS is raised, the client is marked closed and, unless this was
# the last of MAX_ATTEMPTS attempts, a client is obtained again and the
# exchange is retried.
#
# @param connection_data [Object] identifies the connection
# @param message [String, Array<Integer>] payload handed to the client's send_bytes
# @return [Object] whatever the client's receive_bytes returns
# @raise [ArgumentError] if +message+ is nil
# @raise [RuntimeError] when every attempt failed; the last transient error is
#   available through #cause and supplies the backtrace
def send_message(connection_data, message)
  raise ArgumentError, "message cannot be nil" if message.nil?

  key = connection_key(connection_data)
  client = create_or_get_client(connection_data, expected_current: nil)
  last_ex = nil

  MAX_ATTEMPTS.times do |attempt|
    begin
      # Serialize send+receive per connection instance (like _sendReceiveLock)
      client.with_send_receive_lock do
        client.send_bytes(message)
        return client.receive_bytes
      end
    rescue *TRANSIENT_ERRORS => ex
      last_ex = ex
      client.mark_closed
      break if attempt == MAX_ATTEMPTS - 1

      # Recreate semantics: only replace if the cached instance equals the one that failed
      client = create_or_get_client(connection_data, expected_current: client)
    end
  end

  failure = "WebSocket send to #{key} failed after #{MAX_ATTEMPTS} attempts."
  raise failure unless last_ex

  # We are outside the rescue clause here, so the cause has to be attached
  # explicitly; otherwise the original exception object would be lost.
  raise RuntimeError, "#{failure} Last error: #{last_ex.message}", last_ex.backtrace, cause: last_ex
end

Instance Method Details

#close(suppress_close_frame: false) ⇒ Object



245
246
247
248
249
250
251
252
253
254
# File 'lib/hypertube-ruby-sdk/core/web_socket_client/web_socket_client.rb', line 245

# Shuts this connection down: optionally sends a close frame, then marks the
# client closed and closes the socket.
#
# Errors raised while sending the close frame (the classes listed in the
# rescue clause) are swallowed; the flag and socket cleanup always run.
#
# @param suppress_close_frame [Boolean] when true, no close frame is sent
# @return [Object, nil] the result of send_close_reply, or nil when it was
#   skipped or failed with one of the rescued errors
def close(suppress_close_frame: false)
  wants_close_frame = !suppress_close_frame
  send_close_reply if wants_close_frame && !@socket.closed?
rescue Errno::EPIPE, Errno::ECONNRESET, IOError, SystemCallError, OpenSSL::SSL::SSLError, WebSocket::Error
  # Deliberately ignored: the peer may already be gone.
  nil
ensure
  mark_closed
  @socket.close unless @socket.closed?
end

#mark_closed ⇒ Object



174
175
176
# File 'lib/hypertube-ruby-sdk/core/web_socket_client/web_socket_client.rb', line 174

# Flags this client as closed by setting @closed, so #open? returns false from
# now on. Only the flag changes; the socket itself is not touched here.
#
# @return [true]
def mark_closed
  @closed = true
end

#open? ⇒ Boolean

Returns:

  • (Boolean)


237
238
239
240
241
242
243
# File 'lib/hypertube-ruby-sdk/core/web_socket_client/web_socket_client.rb', line 237

# Tells whether this client is still usable: it has not been marked closed and
# its socket does not report itself closed. Socket errors raised while asking
# are treated as "not open".
#
# @return [Boolean]
def open?
  !(@closed || @socket.closed?)
rescue IOError, SystemCallError, OpenSSL::SSL::SSLError
  false
end

#receive_bytes(timeout: DEFAULT_TIMEOUT_SECONDS) ⇒ Object

Receives a full message payload as an Array<Integer> of bytes. Raises on CLOSE, like the C# code, instead of returning nil.



198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
# File 'lib/hypertube-ruby-sdk/core/web_socket_client/web_socket_client.rb', line 198

# Reads from the socket until one complete data frame is available and returns
# its payload as an array of byte values.
#
# Ping frames are answered with send_pong and skipped. A close frame is
# answered with send_close_reply and then reported as an error.
#
# The socket is read first and only waited on when the read reports that it
# would block. This matters for TLS sockets, which can hold decrypted data that
# IO.select on the underlying descriptor does not see, and which can also ask
# to wait for writability (:wait_writable) in the middle of a read.
#
# @param timeout [Numeric] overall time budget in seconds, measured on the
#   monotonic clock so wall-clock adjustments cannot shorten or extend it
# @return [Array<Integer>] payload bytes of the first binary/text/continuation frame
# @raise [WebSocket::Error] if the remote endpoint sent a close frame
# @raise [Timeout::Error] if no frame arrived within +timeout+
# @raise [EOFError] if the peer closed the connection
def receive_bytes(timeout: DEFAULT_TIMEOUT_SECONDS)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout

  loop do
    if (frame = @incoming.next)
      case frame.type
      when :binary, :text, :continuation
        # The websocket gem's Incoming client typically reassembles fragments,
        # so one frame is treated as the complete message.
        return frame.data.to_s.bytes
      when :ping
        send_pong(frame.data)
      when :close
        # Mirror C#: close and throw
        send_close_reply
        raise WebSocket::Error, "WebSocket connection to #{@uri} was closed by the remote endpoint. The server may have shut down or closed the connection."
      end
      next
    end

    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    raise Timeout::Error, "WebSocket receive from #{@uri} timed out. The server did not respond in time." if remaining <= 0

    wait = [remaining, 0.05].max
    chunk = @socket.read_nonblock(64 * 1024, exception: false)
    case chunk
    when nil
      raise EOFError, "peer closed"
    when :wait_readable
      IO.select([@socket], nil, nil, wait)
    when :wait_writable
      # TLS sockets may need to write before they can read; never feed the
      # marker symbol to the frame parser.
      IO.select(nil, [@socket], nil, wait)
    else
      @incoming << chunk
    end
  end
end

#send_bytes(message) ⇒ Object

Send bytes (String bytes or Array of bytes)



179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# File 'lib/hypertube-ruby-sdk/core/web_socket_client/web_socket_client.rb', line 179

# Wraps +message+ in a single binary WebSocket frame and writes it out with
# write_all.
#
# @param message [String, Array<Integer>] a String is sent as its raw bytes;
#   an Array is packed as unsigned 8-bit values
# @return [true]
# @raise [ArgumentError] if +message+ is neither a String nor an Array
def send_bytes(message)
  payload =
    if message.is_a?(String)
      message.b
    elsif message.is_a?(Array)
      message.pack('C*')
    else
      raise ArgumentError, "Unsupported message type: #{message.class}"
    end

  outgoing = WebSocket::Frame::Outgoing::Client.new(version: @handshake.version, data: payload, type: :binary)
  write_all(outgoing.to_s)
  true
end

#with_send_receive_lock ⇒ Object

Serialize send+receive for this connection instance



170
171
172
# File 'lib/hypertube-ruby-sdk/core/web_socket_client/web_socket_client.rb', line 170

# Runs the given block while holding this connection's send/receive mutex, so
# that a send and its matching receive are not interleaved with another pair.
#
# @yield the work to perform under the lock
# @return [Object] the block's result
def with_send_receive_lock
  @send_receive_lock.synchronize do
    yield
  end
end