Class: Hyperliquid::WS::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/hyperliquid/ws/client.rb

Overview

Managed WebSocket client for subscribing to real-time data channels

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(testnet: false, max_queue_size: Constants::WS_MAX_QUEUE_SIZE, reconnect: true) ⇒ Client

Returns a new instance of Client.



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/hyperliquid/ws/client.rb', line 12

def initialize(testnet: false, max_queue_size: Constants::WS_MAX_QUEUE_SIZE, reconnect: true)
  base_url = testnet ? Constants::TESTNET_API_URL : Constants::MAINNET_API_URL
  @url = base_url.sub(%r{^https?://}, 'wss://') + Constants::WS_ENDPOINT
  @max_queue_size = max_queue_size
  @reconnect_enabled = reconnect

  @subscriptions = {}      # identifier => [{ id:, callback: }]
  @subscription_msgs = {}  # subscription_id => { subscription:, identifier: }
  @next_id = 0
  @mutex = Mutex.new
  @queue = Queue.new
  @dropped_message_count = 0

  @ws = nil
  @connected = false
  @closing = false
  @dispatch_thread = nil
  @ping_thread = nil
  @pending_subscriptions = []

  @lifecycle_callbacks = {}
  @reconnect_attempts = 0
  @connection_id = 0
end

Instance Attribute Details

#dropped_message_countObject (readonly)

Returns the value of attribute dropped_message_count.



10
11
12
# File 'lib/hyperliquid/ws/client.rb', line 10

def dropped_message_count
  @dropped_message_count
end

Instance Method Details

#closeObject



94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/hyperliquid/ws/client.rb', line 94

def close
  @closing = true
  @connected = false

  @ping_thread&.kill
  @ping_thread = nil

  @queue&.close if @queue.respond_to?(:close)
  @dispatch_thread&.join(5)
  @dispatch_thread = nil

  @ws&.close
  @ws = nil
end

#connectObject



37
38
39
40
41
42
43
44
# File 'lib/hyperliquid/ws/client.rb', line 37

def connect
  @closing = false
  @reconnect_attempts = 0
  establish_connection
  start_dispatch_thread
  start_ping_thread
  self
end

#connected?Boolean

Returns:

  • (Boolean)


109
110
111
# File 'lib/hyperliquid/ws/client.rb', line 109

def connected?
  @connected
end

#on(event, &callback) ⇒ Object



113
114
115
# File 'lib/hyperliquid/ws/client.rb', line 113

def on(event, &callback)
  @lifecycle_callbacks[event] = callback
end

#subscribe(subscription, &callback) ⇒ Object

Raises:

  • (ArgumentError)


46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/hyperliquid/ws/client.rb', line 46

def subscribe(subscription, &callback)
  raise ArgumentError, 'Block required for subscribe' unless block_given?

  identifier = subscription_identifier(subscription)
  sub_id = nil

  @mutex.synchronize do
    sub_id = @next_id
    @next_id += 1

    @subscriptions[identifier] ||= []
    @subscriptions[identifier] << { id: sub_id, callback: callback }
    @subscription_msgs[sub_id] = { subscription: subscription, identifier: identifier }
  end

  if @connected
    send_subscribe(subscription)
  else
    @mutex.synchronize { @pending_subscriptions << subscription }
    connect unless @ws
  end

  sub_id
end

#unsubscribe(subscription_id) ⇒ Object



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/hyperliquid/ws/client.rb', line 71

def unsubscribe(subscription_id)
  sub_msg = nil
  should_send = false

  @mutex.synchronize do
    sub_msg = @subscription_msgs.delete(subscription_id)
    return unless sub_msg

    identifier = sub_msg[:identifier]
    callbacks = @subscriptions[identifier]
    return unless callbacks

    callbacks.reject! { |entry| entry[:id] == subscription_id }

    if callbacks.empty?
      @subscriptions.delete(identifier)
      should_send = true
    end
  end

  send_unsubscribe(sub_msg[:subscription]) if should_send && @connected
end