Class: Hyperliquid::WS::Client
- Inherits: Object
- Inheritance hierarchy: Object → Hyperliquid::WS::Client
- Defined in: lib/hyperliquid/ws/client.rb
Overview
Managed WebSocket client for subscribing to real-time data channels
Instance Attribute Summary
- #dropped_message_count ⇒ Object (readonly): Returns the value of attribute dropped_message_count.
Instance Method Summary
- #close ⇒ Object
- #connect ⇒ Object
- #connected? ⇒ Boolean
- #initialize(testnet: false, max_queue_size: Constants::WS_MAX_QUEUE_SIZE, reconnect: true) ⇒ Client (constructor): Returns a new instance of Client.
- #on(event, &callback) ⇒ Object
- #subscribe(subscription, &callback) ⇒ Object
- #unsubscribe(subscription_id) ⇒ Object
Constructor Details
#initialize(testnet: false, max_queue_size: Constants::WS_MAX_QUEUE_SIZE, reconnect: true) ⇒ Client
Returns a new instance of Client.
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/hyperliquid/ws/client.rb', line 12

# Prepares a client without opening a connection (the socket stays nil until
# a connection is established).
#
# The WebSocket URL is derived from the testnet or mainnet API base URL by
# swapping its http(s) scheme for wss and appending the WS endpoint path.
#
# @param testnet [Boolean] use the testnet base URL instead of mainnet
# @param max_queue_size [Integer] stored as the queue size limit
#   (NOTE(review): enforcement happens outside this method — confirm where)
# @param reconnect [Boolean] stored as the reconnect-enabled flag
def initialize(testnet: false, max_queue_size: Constants::WS_MAX_QUEUE_SIZE, reconnect: true)
  # Endpoint
  api_base = testnet ? Constants::TESTNET_API_URL : Constants::MAINNET_API_URL
  ws_base = api_base.sub(%r{^https?://}, 'wss://')
  @url = ws_base + Constants::WS_ENDPOINT

  # Options
  @max_queue_size = max_queue_size
  @reconnect_enabled = reconnect

  # Subscription bookkeeping
  @subscriptions = {}      # identifier => [{ id:, callback: }]
  @subscription_msgs = {}  # subscription_id => { subscription:, identifier: }
  @pending_subscriptions = []
  @lifecycle_callbacks = {}
  @next_id = 0

  # Synchronization and message hand-off
  @mutex = Mutex.new
  @queue = Queue.new
  @dropped_message_count = 0

  # Connection state
  @ws = nil
  @connected = false
  @closing = false
  @reconnect_attempts = 0
  @connection_id = 0

  # Background threads (none running yet)
  @dispatch_thread = nil
  @ping_thread = nil
end
Instance Attribute Details
#dropped_message_count ⇒ Object (readonly)
Returns the value of attribute dropped_message_count.
10 11 12 |
# File 'lib/hyperliquid/ws/client.rb', line 10

# Reader for the dropped-message counter (initialized to 0 by the constructor).
#
# @return [Object] the current value of @dropped_message_count
def dropped_message_count
  @dropped_message_count
end
Instance Method Details
#close ⇒ Object
94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/hyperliquid/ws/client.rb', line 94

# Shuts the client down: flags it as closing and disconnected, kills the ping
# thread, closes the message queue, waits up to 5 seconds for the dispatch
# thread to finish, and finally closes and drops the socket.
#
# @return [nil]
def close
  @closing = true
  @connected = false

  @ping_thread&.kill
  @ping_thread = nil

  # nil does not respond to :close, so this guard also covers a missing queue.
  @queue.close if @queue.respond_to?(:close)

  # Thread#join raises ThreadError when a thread tries to join itself. Skip
  # the wait if close is running on the dispatch thread so that the socket
  # below is still closed instead of the shutdown aborting halfway.
  dispatcher = @dispatch_thread
  dispatcher.join(5) if dispatcher && dispatcher != Thread.current
  @dispatch_thread = nil

  @ws&.close
  @ws = nil
end
#connect ⇒ Object
37 38 39 40 41 42 43 44 |
# File 'lib/hyperliquid/ws/client.rb', line 37

# Clears the closing flag and the reconnect-attempt counter, then establishes
# the connection and starts the dispatch and ping threads, in that order.
#
# @return [Client] self, so calls can be chained
def connect
  tap do
    @reconnect_attempts = 0
    @closing = false

    establish_connection
    start_dispatch_thread
    start_ping_thread
  end
end
#connected? ⇒ Boolean
109 110 111 |
# File 'lib/hyperliquid/ws/client.rb', line 109

# Reports the client's connected flag.
#
# @return [Boolean] the current value of @connected
def connected?
  @connected
end
#on(event, &callback) ⇒ Object
113 114 115 |
# File 'lib/hyperliquid/ws/client.rb', line 113

# Registers the handler for a lifecycle event, replacing any handler that was
# previously stored under the same event key.
#
# @param event [Object] key identifying the lifecycle event
# @yield block stored as the handler for +event+
# @return [Proc, nil] the stored block (nil when no block is given)
def on(event, &callback)
  @lifecycle_callbacks.store(event, callback)
end
#subscribe(subscription, &callback) ⇒ Object
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/hyperliquid/ws/client.rb', line 46

# Registers a callback for a subscription and requests the data stream.
#
# The callback is filed under the identifier computed for the subscription
# and assigned a fresh numeric id. When the client is connected the subscribe
# request is sent right away; otherwise it is queued as pending and a
# connection is started if no socket exists yet.
#
# @param subscription [Object] subscription descriptor, passed to
#   subscription_identifier and send_subscribe
# @yield block stored as the callback for this subscription
# @return [Integer] id to hand to #unsubscribe later
# @raise [ArgumentError] if no block is given
def subscribe(subscription, &callback)
  raise ArgumentError, 'Block required for subscribe' unless block_given?

  identifier = subscription_identifier(subscription)

  # Allocate the id and record the subscriber in one critical section.
  sub_id = @mutex.synchronize do
    assigned = @next_id
    @next_id = assigned + 1
    (@subscriptions[identifier] ||= []) << { id: assigned, callback: callback }
    @subscription_msgs[assigned] = { subscription: subscription, identifier: identifier }
    assigned
  end

  unless @connected
    # Not connected yet: park the request and connect if there is no socket.
    @mutex.synchronize { @pending_subscriptions << subscription }
    connect unless @ws
    return sub_id
  end

  send_subscribe(subscription)
  sub_id
end
#unsubscribe(subscription_id) ⇒ Object
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/hyperliquid/ws/client.rb', line 71

# Removes the callback registered under the given subscription id.
#
# When that callback was the last one for its identifier, the identifier is
# dropped, any still-pending subscribe request for the same subscription is
# discarded, and (if connected) an unsubscribe request is sent.
#
# @param subscription_id [Integer] id previously returned by #subscribe
# @return [Object, nil] the result of send_unsubscribe when a request was
#   sent, otherwise nil (including for an unknown id)
def unsubscribe(subscription_id)
  sub_msg = nil
  should_send = false

  @mutex.synchronize do
    sub_msg = @subscription_msgs.delete(subscription_id)
    return unless sub_msg

    identifier = sub_msg[:identifier]
    callbacks = @subscriptions[identifier]
    return unless callbacks

    callbacks.reject! { |entry| entry[:id] == subscription_id }
    if callbacks.empty?
      @subscriptions.delete(identifier)
      # subscribe queues requests here while disconnected; without this a
      # subscription that no longer has listeners could still be sent later.
      @pending_subscriptions.delete(sub_msg[:subscription])
      should_send = true
    end
  end

  send_unsubscribe(sub_msg[:subscription]) if should_send && @connected
end