Class: Hyperliquid::WebsocketManager
- Inherits:
-
Object
- Object
- Hyperliquid::WebsocketManager
- Defined in:
- lib/hyperliquid/websocket_manager.rb
Constant Summary collapse
- PING_INTERVAL =
seconds
50
Class Method Summary collapse
-
.subscription_to_identifier(subscription) ⇒ Object
Maps a subscription request to its identifier string.
-
.ws_msg_to_identifier(ws_msg) ⇒ Object
Maps an incoming WS message to its identifier string.
Instance Method Summary collapse
-
#initialize(base_url) ⇒ WebsocketManager
constructor
A new instance of WebsocketManager.
- #start ⇒ Object
- #stop ⇒ Object
- #subscribe(subscription, callback, subscription_id: nil) ⇒ Object
- #unsubscribe(subscription, subscription_id) ⇒ Object
Constructor Details
#initialize(base_url) ⇒ WebsocketManager
Returns a new instance of WebsocketManager.
12 13 14 15 16 17 18 19 20 21 22 23 |
# File 'lib/hyperliquid/websocket_manager.rb', line 12 def initialize(base_url) @subscription_id_counter = 0 @ws_ready = false @queued_subscriptions = [] @active_subscriptions = Hash.new { |h, k| h[k] = [] } @mutex = Mutex.new @stop_event = false ws_url = "ws#{base_url[4..]}/ws" @ws_url = ws_url @ws = nil end |
Class Method Details
.subscription_to_identifier(subscription) ⇒ Object
Maps a subscription request to its identifier string.
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/hyperliquid/websocket_manager.rb', line 83 def self.subscription_to_identifier(subscription) type = subscription["type"] case type when "allMids" "allMids" when "l2Book" "l2Book:#{subscription["coin"].downcase}" when "trades" "trades:#{subscription["coin"].downcase}" when "userEvents" "userEvents" when "userFills" "userFills:#{subscription["user"].downcase}" when "candle" "candle:#{subscription["coin"].downcase},#{subscription["interval"]}" when "orderUpdates" "orderUpdates" when "userFundings" "userFundings:#{subscription["user"].downcase}" when "userNonFundingLedgerUpdates" "userNonFundingLedgerUpdates:#{subscription["user"].downcase}" when "webData2" "webData2:#{subscription["user"].downcase}" when "bbo" "bbo:#{subscription["coin"].downcase}" when "activeAssetCtx" "activeAssetCtx:#{subscription["coin"].downcase}" when "activeAssetData" "activeAssetData:#{subscription["coin"].downcase},#{subscription["user"].downcase}" end end |
.ws_msg_to_identifier(ws_msg) ⇒ Object
Maps an incoming WS message to its identifier string.
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
# File 'lib/hyperliquid/websocket_manager.rb', line 116 def self.ws_msg_to_identifier(ws_msg) channel = ws_msg["channel"] case channel when "pong" "pong" when "allMids" "allMids" when "l2Book" "l2Book:#{ws_msg["data"]["coin"].downcase}" when "trades" trades = ws_msg["data"] return nil if trades.empty? "trades:#{trades[0]["coin"].downcase}" when "user" "userEvents" when "userFills" "userFills:#{ws_msg["data"]["user"].downcase}" when "candle" "candle:#{ws_msg["data"]["s"].downcase},#{ws_msg["data"]["i"]}" when "orderUpdates" "orderUpdates" when "userFundings" "userFundings:#{ws_msg["data"]["user"].downcase}" when "userNonFundingLedgerUpdates" "userNonFundingLedgerUpdates:#{ws_msg["data"]["user"].downcase}" when "webData2" "webData2:#{ws_msg["data"]["user"].downcase}" when "bbo" "bbo:#{ws_msg["data"]["coin"].downcase}" when "activeAssetCtx", "activeSpotAssetCtx" "activeAssetCtx:#{ws_msg["data"]["coin"].downcase}" when "activeAssetData" "activeAssetData:#{ws_msg["data"]["coin"].downcase},#{ws_msg["data"]["user"].downcase}" end end |
Instance Method Details
#start ⇒ Object
25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/hyperliquid/websocket_manager.rb', line 25 def start manager = self @ws = WebSocket::Client::Simple.connect(@ws_url) @ws.on :message do |msg| manager.send(:handle_message, msg.data) end @ws.on :open do manager.send(:handle_open) end @ping_thread = Thread.new { send_ping } end |
#stop ⇒ Object
40 41 42 43 44 |
# File 'lib/hyperliquid/websocket_manager.rb', line 40 def stop @mutex.synchronize { @stop_event = true } @ws&.close @ping_thread&.join end |
#subscribe(subscription, callback, subscription_id: nil) ⇒ Object
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/hyperliquid/websocket_manager.rb', line 46 def subscribe(subscription, callback, subscription_id: nil) @mutex.synchronize do if subscription_id.nil? @subscription_id_counter += 1 subscription_id = @subscription_id_counter end if @ws_ready identifier = self.class.subscription_to_identifier(subscription) if %w[userEvents orderUpdates].include?(identifier) && !@active_subscriptions[identifier].empty? raise NotImplementedError, "Cannot subscribe to #{identifier} multiple times" end @active_subscriptions[identifier] << ActiveSubscription.new(callback, subscription_id) @ws.send(JSON.generate({ "method" => "subscribe", "subscription" => subscription })) else @queued_subscriptions << [subscription, ActiveSubscription.new(callback, subscription_id)] end subscription_id end end |
#unsubscribe(subscription, subscription_id) ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/hyperliquid/websocket_manager.rb', line 69 def unsubscribe(subscription, subscription_id) @mutex.synchronize do raise NotImplementedError, "Can't unsubscribe before websocket connected" unless @ws_ready identifier = self.class.subscription_to_identifier(subscription) active = @active_subscriptions[identifier] new_active = active.reject { |s| s.subscription_id == subscription_id } @ws.send(JSON.generate({ "method" => "unsubscribe", "subscription" => subscription })) if new_active.empty? @active_subscriptions[identifier] = new_active active.length != new_active.length end end |