Class: Hypertube::Core::WebSocketClient::WebSocketClient

Inherits:
Object
  • Object
show all
Defined in:
lib/hypertube-ruby-sdk/core/web_socket_client/web_socket_client.rb

Constant Summary collapse

# Upper bound on the send/receive attempts .send_message makes before giving up.
MAX_ATTEMPTS =
2
# Connect timeout in seconds. Not referenced by the code shown on this page —
# presumably consumed where the socket is opened (TODO confirm).
DEFAULT_CONNECT_TIMEOUT_SECONDS =
120
# Response timeout in seconds that #initialize falls back to when connection_data
# does not respond to :response_timeout.
DEFAULT_RESPONSE_TIMEOUT_SECONDS =
180
# Exception classes that .send_message rescues: the failing client is marked closed and,
# unless it was the last attempt, a replacement client is requested before retrying.
# NOTE(review): Errno::EPIPE, Errno::ECONNRESET and Errno::ETIMEDOUT are subclasses of
# SystemCallError, so the explicit entries are already covered by that one.
TRANSIENT_ERRORS =
[
  Errno::EPIPE,
  Errno::ECONNRESET,
  Errno::ETIMEDOUT,
  IOError,
  SystemCallError,
  Timeout::Error,
  OpenSSL::SSL::SSLError,
  WebSocket::Error
].freeze

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection_data) ⇒ WebSocketClient

Returns a new instance of WebSocketClient.



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/hypertube-ruby-sdk/core/web_socket_client/web_socket_client.rb', line 139

# Opens the socket for +connection_data+ and performs the WebSocket opening handshake.
#
# The URL is read from +connection_data.hostname+ when that method exists, otherwise from
# +connection_data.to_s+. The response timeout is read from +connection_data.response_timeout+
# when that method exists, otherwise DEFAULT_RESPONSE_TIMEOUT_SECONDS is used.
#
# @param connection_data [#hostname, #to_s] connection description or plain URL
# @raise [RuntimeError] if the URL scheme is not ws/wss, or if connecting or the handshake fails
def initialize(connection_data)
  @connection_data = connection_data
  @closed = false
  url = connection_data.respond_to?(:hostname) ? connection_data.hostname : connection_data.to_s
  @response_timeout =
    connection_data.respond_to?(:response_timeout) ? connection_data.response_timeout : DEFAULT_RESPONSE_TIMEOUT_SECONDS
  @uri = URI.parse(url)
  raise "WebSocket URL must use ws:// or wss://. Got: #{url}" unless %w[ws wss].include?(@uri.scheme)

  @host = @uri.host
  @port = @uri.port || default_port
  # The request path always starts with "/" and carries the query string when one is present.
  @path = (@uri.path.nil? || @uri.path.empty?) ? '/' : @uri.path
  @path += "?#{@uri.query}" if @uri.query

  @send_receive_lock = Mutex.new

  handshake_done = false
  begin
    @socket = open_socket
    @handshake = WebSocket::Handshake::Client.new(url: url, headers: request_headers)
    perform_handshake
    handshake_done = true
  rescue Errno::ECONNREFUSED, Errno::ETIMEDOUT, Errno::EHOSTUNREACH, SocketError, IOError, SystemCallError,
         OpenSSL::SSL::SSLError => e
    mark_closed
    raise "WebSocket connection to #{url} failed: #{e.message}. Check that the server is running, the URL is correct (ws:// or wss://), and that no firewall or proxy is blocking the connection."
  rescue WebSocket::Error => e
    mark_closed
    raise "WebSocket connection to #{url} failed: #{e.message}. Check that the server is running and supports WebSocket."
  ensure
    # When construction fails the caller never receives this instance, so nothing else could
    # ever close the socket. Release it here instead of leaking the descriptor.
    unless handshake_done
      begin
        @socket.close if @socket && !@socket.closed?
      rescue IOError, SystemCallError, OpenSSL::SSL::SSLError
        # Best effort: the original failure is the error worth reporting.
      end
    end
  end

  @incoming = WebSocket::Frame::Incoming::Client.new(version: @handshake.version)
end

Class Method Details

.close(connection_data, suppress_close_frame: false) ⇒ Object

Equivalent to C# Close(uri). Pass suppress_close_frame: true to emulate "do not send CLOSE" during reconnect handling.



67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/hypertube-ruby-sdk/core/web_socket_client/web_socket_client.rb', line 67

# Closes the cached client for +connection_data+, if there is one, and evicts it from the cache.
#
# The cache entry is removed even when closing the client raises; the error still propagates.
#
# @param connection_data [Object] value passed to connection_key to find the cache entry
# @param suppress_close_frame [Boolean] forwarded unchanged to the client's #close
# @return [Object, nil] the client's #close result, or nil when nothing was cached
def close(connection_data, suppress_close_frame: false)
  cache_key = connection_key(connection_data)

  @clients_lock.synchronize do
    cached = @clients[cache_key]
    next unless cached

    begin
      cached.close(suppress_close_frame: suppress_close_frame)
    ensure
      @clients.delete(cache_key)
    end
  end
end

.get_state(connection_data) ⇒ Object

Equivalent to C# GetState(uri, out state). Returns :OPEN or :CLOSED for a cached client, or nil if no client is cached.



82
83
84
85
86
87
88
89
# File 'lib/hypertube-ruby-sdk/core/web_socket_client/web_socket_client.rb', line 82

# Reports the state of the cached client for +connection_data+.
#
# @param connection_data [Object] value passed to connection_key to find the cache entry
# @return [Symbol, nil] :OPEN when the cached client answers true to #open?, :CLOSED when it
#   does not, and nil when no client is cached
def get_state(connection_data)
  cache_key = connection_key(connection_data)

  @clients_lock.synchronize do
    cached = @clients[cache_key]
    next nil unless cached
    next :OPEN if cached.open?

    :CLOSED
  end
end

.send_message(connection_data, message) ⇒ Object

Equivalent to C# SendMessage(Uri, byte[]). message can be a String (raw bytes) or an Array<Integer 0..255>.

Raises:

  • (ArgumentError)


32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/hypertube-ruby-sdk/core/web_socket_client/web_socket_client.rb', line 32

# Sends +message+ through the client for +connection_data+ and returns the reply.
#
# At most MAX_ATTEMPTS send/receive round trips are made. When a round trip fails with one of
# TRANSIENT_ERRORS the client is marked closed and, unless that was the final attempt, a
# replacement client is requested before trying again. Other errors propagate immediately.
#
# @param connection_data [Object] passed to connection_key and create_or_get_client
# @param message [String, Array<Integer>] payload handed to the client's #send_bytes
# @return [Object] whatever the client's #receive_bytes returns
# @raise [ArgumentError] if +message+ is nil
# @raise [RuntimeError] if every attempt failed; carries the last error's message and backtrace
def send_message(connection_data, message)
  raise ArgumentError, "message cannot be nil" if message.nil?

  cache_key = connection_key(connection_data)
  active_client = create_or_get_client(connection_data, expected_current: nil)
  failure = nil

  1.upto(MAX_ATTEMPTS) do |attempt_number|
    begin
      # Hold the per-connection lock for the whole exchange so replies cannot interleave.
      active_client.with_send_receive_lock do
        active_client.send_bytes(message)
        return active_client.receive_bytes
      end
    rescue *TRANSIENT_ERRORS => error
      failure = error
      active_client.mark_closed
      break if attempt_number == MAX_ATTEMPTS

      # Hand over the failed instance: replacement is meant to happen only if it is still
      # the cached one.
      active_client = create_or_get_client(connection_data, expected_current: active_client)
    end
  end

  if failure.nil?
    raise RuntimeError, "WebSocket send to #{cache_key} failed after #{MAX_ATTEMPTS} attempts."
  end

  wrapped = RuntimeError.new("WebSocket send to #{cache_key} failed after #{MAX_ATTEMPTS} attempts. Last error: #{failure.message}")
  wrapped.set_backtrace(failure.backtrace)
  raise wrapped
end

Instance Method Details

#close(suppress_close_frame: false) ⇒ Object



249
250
251
252
253
254
255
256
257
258
# File 'lib/hypertube-ruby-sdk/core/web_socket_client/web_socket_client.rb', line 249

# Closes this connection.
#
# A close reply is sent first unless +suppress_close_frame+ is set or the socket is already
# closed. The listed I/O and protocol errors raised while doing so are deliberately ignored.
# In every case the client is then marked closed and the socket is closed if still open.
#
# @param suppress_close_frame [Boolean] when true, no close frame is sent
# @return [Object, nil] result of send_close_reply when it ran without error, otherwise nil
def close(suppress_close_frame: false)
  notify_peer = !suppress_close_frame && !@socket.closed?
  send_close_reply if notify_peer
rescue Errno::EPIPE, Errno::ECONNRESET, IOError, SystemCallError, OpenSSL::SSL::SSLError, WebSocket::Error
  # Closing is best effort; a failed goodbye must not prevent the cleanup below.
  nil
ensure
  mark_closed
  @socket.close unless @socket.closed?
end

#mark_closed ⇒ Object



176
177
178
# File 'lib/hypertube-ruby-sdk/core/web_socket_client/web_socket_client.rb', line 176

# Flags this client as closed by setting @closed; #open? returns false once the flag is set.
# Only the flag changes here — the socket itself is not touched.
def mark_closed
  @closed = true
end

#open? ⇒ Boolean

Returns:

  • (Boolean)


241
242
243
244
245
246
247
# File 'lib/hypertube-ruby-sdk/core/web_socket_client/web_socket_client.rb', line 241

# Tells whether this client is still usable.
#
# The closed flag is checked first, so the socket is only consulted when the flag is unset.
# An I/O, system-call, or SSL error raised while asking the socket counts as "not open".
#
# @return [Boolean] true only when the client is not marked closed and its socket is not closed
def open?
  !(@closed || @socket.closed?)
rescue IOError, SystemCallError, OpenSSL::SSL::SSLError
  false
end

#receive_bytes(timeout: nil) ⇒ Object

Receives a full message payload as an Array of bytes. Raises on CLOSE, like the C# code, instead of returning nil.



200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
# File 'lib/hypertube-ruby-sdk/core/web_socket_client/web_socket_client.rb', line 200

# Reads from the socket until one data frame is available and returns its payload.
#
# Ping frames are answered with a pong and reading continues; frame types not handled in the
# +case+ below are skipped. A close frame is answered and then reported as an error rather
# than returning nil.
#
# @param timeout [Numeric, nil] overall wait in seconds; nil falls back to @response_timeout.
#   When the effective value is nil or negative there is no deadline.
# @return [Array<Integer>] payload bytes of the first binary, text, or continuation frame
# @raise [Timeout::Error] if no data frame arrived before the deadline
# @raise [WebSocket::Error] if the remote endpoint sent a close frame
# @raise [EOFError] if the socket reached end-of-file
def receive_bytes(timeout: nil)
  timeout = @response_timeout if timeout.nil?
  # Track the deadline on the monotonic clock so wall-clock adjustments cannot stretch or
  # shorten the wait.
  deadline = timeout && timeout >= 0 ? Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout : nil

  loop do
    if (frame = @incoming.next)
      case frame.type
      when :binary, :text, :continuation
        # Returns on the first data frame. NOTE(review): this relies on the frame parser
        # handing back reassembled messages; FIN/fragment tracking is not done here — confirm.
        return frame.data.to_s.bytes
      when :ping
        send_pong(frame.data)
      when :close
        # Answer the close, then surface it as an error.
        send_close_reply
        raise WebSocket::Error, "WebSocket connection to #{@uri} was closed by the remote endpoint. The server may have shut down or closed the connection."
      end
      next
    end

    remaining = deadline ? deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC) : nil
    raise Timeout::Error, "WebSocket receive from #{@uri} timed out. The server did not respond in time." if remaining && remaining <= 0

    # Wait at least 50 ms per round so a nearly expired deadline does not become a busy loop.
    wait_timeout = remaining ? [remaining, 0.05].max : nil
    next unless IO.select([@socket], nil, nil, wait_timeout)

    chunk = @socket.read_nonblock(64 * 1024, exception: false)
    case chunk
    when nil
      raise EOFError, "peer closed"
    when :wait_readable
      next
    when :wait_writable
      # A TLS socket may need to write before it can read. Wait for writability and retry;
      # the Symbol must never be fed to the frame parser.
      IO.select(nil, [@socket], nil, wait_timeout)
    else
      @incoming << chunk
    end
  end
end

#send_bytes(message) ⇒ Object

Send bytes (String bytes or Array of bytes)



181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# File 'lib/hypertube-ruby-sdk/core/web_socket_client/web_socket_client.rb', line 181

# Wraps +message+ in a single binary WebSocket frame and writes it out via write_all.
#
# @param message [String, Array<Integer>] a String is sent as its raw bytes; an Array is
#   packed as unsigned 8-bit values
# @return [true]
# @raise [ArgumentError] if +message+ is neither a String nor an Array
def send_bytes(message)
  payload =
    if message.is_a?(String)
      message.b
    elsif message.is_a?(Array)
      message.pack('C*')
    else
      raise ArgumentError, "Unsupported message type: #{message.class}"
    end

  outgoing = WebSocket::Frame::Outgoing::Client.new(version: @handshake.version, data: payload, type: :binary)
  write_all(outgoing.to_s)
  true
end

#with_send_receive_lock ⇒ Object

Serialize send+receive for this connection instance



172
173
174
# File 'lib/hypertube-ruby-sdk/core/web_socket_client/web_socket_client.rb', line 172

# Runs the given block while holding this connection's @send_receive_lock mutex, so that
# only one caller at a time executes inside it.
#
# @yield the work to perform while the lock is held
# @return [Object] the block's result
def with_send_receive_lock
  @send_receive_lock.synchronize do
    yield
  end
end