Class: WebSocketClient

Inherits:
Object
  • Object
show all
Defined in:
lib/hypertube-ruby-sdk/core/web_socket_client/web_socket_client.rb

Constant Summary collapse

MAX_ATTEMPTS =
2

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection_data) ⇒ WebSocketClient

Returns a new instance of WebSocketClient.



121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/hypertube-ruby-sdk/core/web_socket_client/web_socket_client.rb', line 121

def initialize(connection_data)
  url = connection_data.respond_to?(:hostname) ? connection_data.hostname : connection_data.to_s
  @connect_timeout = connection_data.respond_to?(:connect_timeout) ? connection_data.connect_timeout : nil
  @uri = URI.parse(url)
  raise "WebSocket URL must use ws:// or wss://. Got: #{url}" unless %w[ws wss].include?(@uri.scheme)

  @host = @uri.host
  @port = @uri.port || default_port
  @path = (@uri.path.nil? || @uri.path.empty?) ? '/' : @uri.path
  @path += "?#{@uri.query}" if @uri.query

  @send_receive_lock = Mutex.new

  begin
    @socket = open_socket
    @handshake = WebSocket::Handshake::Client.new(url: url)
    perform_handshake
  rescue Errno::ECONNREFUSED, Errno::ETIMEDOUT, Errno::EHOSTUNREACH, SocketError, IOError, SystemCallError => e
    raise "WebSocket connection to #{url} failed: #{e.message}. Check that the server is running, the URL is correct (ws:// or wss://), and that no firewall or proxy is blocking the connection."
  rescue WebSocket::Error => e
    raise "WebSocket connection to #{url} failed: #{e.message}. Check that the server is running and supports WebSocket."
  end

  @incoming = WebSocket::Frame::Incoming::Client.new(version: @handshake.version)
end

Class Method Details

.close(connection_data, suppress_close_frame: false) ⇒ Object

Equivalent to C# Close(uri) suppress_close_frame can emulate “do not send CLOSE” during reconnect handling



51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/hypertube-ruby-sdk/core/web_socket_client/web_socket_client.rb', line 51

def close(connection_data, suppress_close_frame: false)
  key = connection_key(connection_data)
  @clients_lock.synchronize do
    if (client = @clients[key])
      begin
        client.close(suppress_close_frame: suppress_close_frame)
      ensure
        @clients.delete(key)
      end
    end
  end
end

.get_state(connection_data) ⇒ Object

Equivalent to C# GetState(uri, out state) Returns :OPEN, :CLOSED, or nil if no cached client



66
67
68
69
70
71
72
73
# File 'lib/hypertube-ruby-sdk/core/web_socket_client/web_socket_client.rb', line 66

def get_state(connection_data)
  key = connection_key(connection_data)
  @clients_lock.synchronize do
    c = @clients[key]
    return nil unless c
    c.open? ? :OPEN : :CLOSED
  end
end

.send_message(connection_data, message) ⇒ Object

Equivalent to C# SendMessage(Uri, byte[]) message can be String (bytes) or Array<Integer 0..255>

Raises:

  • (ArgumentError)


16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/hypertube-ruby-sdk/core/web_socket_client/web_socket_client.rb', line 16

def send_message(connection_data, message)
  raise ArgumentError, "message cannot be nil" if message.nil?
  key = connection_key(connection_data)

  client = create_or_get_client(connection_data, expected_current: nil)

  last_ex = nil

  MAX_ATTEMPTS.times do |attempt|
    begin
      # Serialize send+receive per connection instance (like _sendReceiveLock)
      client.with_send_receive_lock do
        client.send_bytes(message)
        return client.receive_bytes
      end
    rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ETIMEDOUT, IOError, SystemCallError,
      WebSocket::Error => ex
      last_ex = ex
      break if attempt == MAX_ATTEMPTS - 1

      # Recreate semantics: only replace if the cached instance equals the one that failed
      client = create_or_get_client(connection_data, expected_current: client)
    end
  end

  if last_ex
    e = RuntimeError.new("WebSocket send to #{key} failed after #{MAX_ATTEMPTS} attempts. Last error: #{last_ex.message}")
    e.set_backtrace(last_ex.backtrace)
    raise e
  end
  raise RuntimeError, "WebSocket send to #{key} failed after #{MAX_ATTEMPTS} attempts."
end

Instance Method Details

#close(suppress_close_frame: false) ⇒ Object



218
219
220
221
222
223
224
225
226
# File 'lib/hypertube-ruby-sdk/core/web_socket_client/web_socket_client.rb', line 218

def close(suppress_close_frame: false)
  begin
    send_close_reply unless suppress_close_frame || @socket.closed?
  rescue Errno::EPIPE, Errno::ECONNRESET, IOError, SystemCallError, WebSocket::Error
    # ignore
  ensure
    @socket.close unless @socket.closed?
  end
end

#open?Boolean

Returns:

  • (Boolean)


211
212
213
214
215
216
# File 'lib/hypertube-ruby-sdk/core/web_socket_client/web_socket_client.rb', line 211

def open?
  return false if @socket.closed?
  true
rescue IOError, SystemCallError
  false
end

#receive_bytes(timeout: 5) ⇒ Object

Receive a full message payload as Array<Integer> (bytes) Raises on CLOSE like the C# code (instead of returning nil).



172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/hypertube-ruby-sdk/core/web_socket_client/web_socket_client.rb', line 172

def receive_bytes(timeout: 5)
  deadline = Time.now + timeout
  payload_chunks = []

  loop do
    if (frame = @incoming.next)
      case frame.type
      when :binary, :text, :continuation
        payload_chunks << frame.data
        # Note: if you want to support fragmented continuation properly, you’d track FIN;
        # the websocket gem’s Incoming client typically handles reassembly at this layer.
        return payload_chunks.join.bytes
      when :ping
        send_pong(frame.data)
      when :close
        # Mirror C#: close and throw
        send_close_reply
        raise WebSocket::Error, "WebSocket connection to #{@uri} was closed by the remote endpoint. The server may have shut down or closed the connection."
      end
      next
    end

    remaining = deadline - Time.now
    raise Timeout::Error, "WebSocket receive from #{@uri} timed out. The server did not respond in time." if remaining <= 0

    if IO.select([@socket], nil, nil, [remaining, 0.05].max)
      chunk = @socket.read_nonblock(64 * 1024, exception: false)
      case chunk
      when nil
        raise EOFError, "peer closed"
      when :wait_readable
        next
      else
        @incoming << chunk
      end
    end
  end
end

#send_bytes(message) ⇒ Object

Send bytes (String bytes or Array of bytes)



153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/hypertube-ruby-sdk/core/web_socket_client/web_socket_client.rb', line 153

def send_bytes(message)
  data =
    case message
    when String then message.b
    when Array  then message.pack('C*')
    else raise ArgumentError, "Unsupported message type: #{message.class}"
    end

  frame = WebSocket::Frame::Outgoing::Client.new(
    version: @handshake.version,
    data: data,
    type: :binary
  )
  write_all(frame.to_s)
  true
end

#with_send_receive_lockObject

Serialize send+receive for this connection instance



148
149
150
# File 'lib/hypertube-ruby-sdk/core/web_socket_client/web_socket_client.rb', line 148

def with_send_receive_lock
  @send_receive_lock.synchronize { yield }
end