Class: Clacky::Channel::Adapters::Feishu::WSClient

Inherits:
Object
  • Object
show all
Defined in:
lib/clacky/server/channel/adapters/feishu/ws_client.rb

Overview

WebSocket client for Feishu long connection mode. Feishu uses protobuf-encoded binary frames (pbbp2.Frame) over WebSocket. Frame fields: SeqID(1), LogID(2), service(3), method(4), headers(5), payloadType(7), payload(8), LogIDNew(9).

method=0 → control (ping/pong/handshake), method=1 → data (event)

Defined Under Namespace

Modules: ProtoFrame

Constant Summary collapse

RECONNECT_DELAY =

seconds

5
READ_TIMEOUT_MULTIPLIER =

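Multiplier used to derive the IO.select timeout of the read loop from the ping interval. The read loop waits (ping interval × this multiplier) seconds without receiving any data before treating the connection as dead and reconnecting.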

2.5

Instance Method Summary collapse

Constructor Details

#initialize(app_id:, app_secret:, domain: DEFAULT_DOMAIN) ⇒ WSClient

Returns a new instance of WSClient.



20
21
22
23
24
25
26
27
28
29
30
# File 'lib/clacky/server/channel/adapters/feishu/ws_client.rb', line 20

# Records the app credentials and resets all connection state.
# Only assigns instance variables; it performs no I/O.
#
# app_id     - Feishu app ID, sent when requesting the WebSocket endpoint.
# app_secret - Feishu app secret, sent together with app_id.
# domain     - base URL the endpoint-discovery request is built from.
def initialize(app_id:, app_secret:, domain: DEFAULT_DOMAIN)
  @app_id = app_id
  @app_secret = app_secret
  @domain = domain
  @running = false
  @ws = nil # NOTE(review): not read by any method visible here; kept in case other code uses it
  @ping_thread = nil
  @ping_interval = 90 # overridden by server config
  @seq_id = 0
  @service_id = 0

  # Per-connection state. Other methods read these before a connection has
  # ever been established (e.g. stop before start), so declare them up front
  # instead of relying on unset instance variables evaluating to nil.
  @ws_socket = nil
  @ws_open = false
  @ws_version = nil
  @incoming = nil
  @on_event = nil
end

Instance Method Details

#connect_and_listen ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/clacky/server/channel/adapters/feishu/ws_client.rb', line 60

# Runs one WebSocket session: fetches the endpoint, opens the transport,
# completes the WebSocket handshake, starts the ping thread and then reads
# and dispatches frames until the session ends.
#
# Returns normally when @running becomes false, when nothing arrives within
# the read timeout, or when the server sends a close frame. I/O errors
# (including EOF) propagate to the caller; start rescues them, logs, and
# sleeps before retrying.
#
# On every exit path the socket is closed, the ping thread is killed and the
# connection state (@ws_open, @ws_socket) is reset.
def connect_and_listen
  socket = nil
  Clacky::Logger.info("[feishu-ws] Fetching WebSocket endpoint...")
  endpoint = fetch_ws_endpoint
  Clacky::Logger.info("[feishu-ws] Connecting to #{endpoint.split("?").first}")

  socket = open_ws_socket(URI.parse(endpoint))
  handshake = complete_ws_handshake(socket, endpoint)

  Clacky::Logger.info("[feishu-ws] WebSocket connected")
  @ws_version = handshake.version
  @ws_socket  = socket
  @ws_open    = true
  @incoming   = WebSocket::Frame::Incoming::Client.new(version: @ws_version)

  start_ping_thread

  # read_timeout is based on the server-provided ping interval so it
  # automatically adapts if Feishu changes the cadence.
  read_timeout = (@ping_interval * READ_TIMEOUT_MULTIPLIER).ceil

  loop do
    break unless @running

    # A TLS socket may already hold decrypted bytes that the underlying file
    # descriptor will never signal again, so only wait when nothing is pending.
    tls_buffered = socket.respond_to?(:pending) && socket.pending.positive?

    # Use IO.select with a timeout to detect silent connection drops
    # (NAT expiry, firewall idle-kill) that never send a TCP FIN/RST.
    unless tls_buffered || IO.select([socket], nil, nil, read_timeout)
      Clacky::Logger.warn("[feishu-ws] read timeout (#{read_timeout}s), reconnecting...")
      return
    end

    # exception: false keeps "no complete TLS record yet" from surfacing as an
    # SSLError subclass, which would otherwise force a needless reconnect.
    data = socket.read_nonblock(4096, exception: false)
    case data
    when :wait_readable
      next
    when :wait_writable
      # The TLS layer needs to write before it can read; wait, then retry.
      IO.select(nil, [socket], nil, read_timeout)
      next
    when nil
      # Peer closed the connection; report it the way a raising read would.
      raise EOFError, "end of file reached"
    end

    @incoming << data
    while (frame = @incoming.next)
      case frame.type
      when :binary
        raw = frame.data
        handle_frame(raw.respond_to?(:b) ? raw.b : raw)
      when :text
        handle_frame(frame.data)
      when :ping
        send_raw_frame(:pong, frame.data)
      when :close
        Clacky::Logger.info("[feishu-ws] WebSocket closed by server, will reconnect")
        return
      end
    end
  end
ensure
  @ws_open = false
  @ws_socket = nil
  socket&.close rescue nil
  @ping_thread&.kill
end

# Internal helper for connect_and_listen: opens the transport for +uri+.
# Returns a plain TCPSocket for ws and a connected SSLSocket for wss.
# The TCP socket is closed again if the TLS setup fails.
def open_ws_socket(uri)
  secure = uri.scheme == "wss"
  port = uri.port || (secure ? 443 : 80)
  tcp = TCPSocket.new(uri.host, port)
  return tcp unless secure

  begin
    require "openssl"
    ssl_context = OpenSSL::SSL::SSLContext.new
    ssl_context.set_params(verify_mode: OpenSSL::SSL::VERIFY_PEER)
    ssl = OpenSSL::SSL::SSLSocket.new(tcp, ssl_context)
    ssl.sync_close = true
    # Sends SNI and gives the certificate check a hostname to verify against;
    # without it the peer certificate is not matched to the host.
    ssl.hostname = uri.host
    ssl.connect
    ssl
  rescue StandardError
    tcp.close rescue nil
    raise
  end
end

# Internal helper for connect_and_listen: performs the WebSocket opening
# handshake on +socket+ and returns the finished handshake object.
# Raises RuntimeError when the server's response is not a valid handshake.
def complete_ws_handshake(socket, endpoint)
  handshake = WebSocket::Handshake::Client.new(url: endpoint)
  socket.write(handshake.to_s)

  # Read until handshake complete
  handshake << socket.readpartial(4096) until handshake.finished?
  raise "WebSocket handshake failed" unless handshake.valid?

  handshake
end

#fetch_ws_endpoint ⇒ Object



142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/clacky/server/channel/adapters/feishu/ws_client.rb', line 142

# Asks Feishu for the WebSocket endpoint of this app.
#
# POSTs the app credentials to "<domain>/callback/ws/endpoint" and, as side
# effects, stores the server-provided ping interval in @ping_interval and the
# service_id query parameter of the returned URL (when present) in @service_id.
#
# Returns the endpoint URL as given in the response.
# Raises RuntimeError when the response code is non-zero or the URL is missing.
def fetch_ws_endpoint
  uri = URI.parse("#{@domain}/callback/ws/endpoint")
  http = Net::HTTP.new(uri.host, uri.port)
  http.use_ssl = uri.scheme == "https"

  request = Net::HTTP::Post.new(uri.path)
  request["Content-Type"] = "application/json"
  request["locale"] = "en"
  request.body = JSON.generate({ AppID: @app_id, AppSecret: @app_secret })

  response = http.request(request)
  data = JSON.parse(response.body)

  if data["code"] != 0
    Clacky::Logger.warn("[feishu-ws] Failed to get endpoint: code=#{data["code"]} msg=#{data["msg"]}")
    raise "Failed to get WebSocket endpoint: #{data['msg']}"
  end

  client_config = data.dig("data", "ClientConfig") || {}
  # The interval drives both the ping sleep and the read timeout, so a missing,
  # zero or negative value must not be used: it would turn both into busy loops.
  advertised_interval = client_config["PingInterval"].to_i
  @ping_interval = advertised_interval.positive? ? advertised_interval : 90

  url = data.dig("data", "URL")
  if url.nil? || url.strip.empty?
    Clacky::Logger.error("[feishu-ws] WebSocket endpoint URL is missing from response. " \
                         "Please verify your Feishu App ID and App Secret are correct.")
    raise "Failed to get WebSocket endpoint: URL is missing (check your Feishu App ID / App Secret)"
  end

  service_id = url[/service_id=(\d+)/, 1]
  @service_id = service_id.to_i if service_id
  url
end

#handle_control_frame(frame, msg_type) ⇒ Object



195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/clacky/server/channel/adapters/feishu/ws_client.rb', line 195

# Reacts to a control frame (method=0) according to its "type" header.
#
# frame    - decoded frame hash; :seq_id, :log_id, :service and :headers are read.
# msg_type - value of the frame's "type" header.
#
# "ping" is answered with a frame echoing the ids and headers but with type
# "pong"; "handshake" only logs the outcome; any other type is ignored.
def handle_control_frame(frame, msg_type)
  if msg_type == "ping"
    # merge builds a new hash, leaving the incoming frame's headers untouched
    pong_headers = frame[:headers].merge("type" => "pong")
    send_frame(
      seq_id: frame[:seq_id],
      log_id: frame[:log_id],
      service: frame[:service],
      method: 0,
      headers: pong_headers
    )
  elsif msg_type == "handshake"
    handshake_headers = frame[:headers]
    if handshake_headers["handshake-status"] == "200"
      Clacky::Logger.info("[feishu-ws] Handshake successful")
    else
      Clacky::Logger.warn("[feishu-ws] Handshake failed: #{handshake_headers['handshake-msg']}")
    end
  end
end

#handle_data_frame(frame, headers) ⇒ Object



215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
# File 'lib/clacky/server/channel/adapters/feishu/ws_client.rb', line 215

# Handles a data frame (method=1): parses the JSON event payload, sends an ACK
# back, then passes the parsed event to the @on_event callback.
#
# frame   - decoded frame hash; :payload, :seq_id, :log_id, :service and
#           :headers are read.
# headers - the frame's headers; only frames whose "type" is "event" are handled.
#
# Frames of another type or without a payload are ignored. A payload that is
# not valid JSON is logged and dropped without sending an ACK.
def handle_data_frame(frame, headers)
  return if headers["type"] != "event"

  raw_payload = frame[:payload]
  return if !raw_payload || raw_payload.empty?

  # The payload arrives as raw bytes; tag it as UTF-8 before parsing.
  event_data = JSON.parse(raw_payload.force_encoding("UTF-8"))

  # Acknowledge with the same ids and headers the server used.
  ack_body = JSON.generate({ code: 200 })
  send_frame(
    seq_id: frame[:seq_id],
    log_id: frame[:log_id],
    service: frame[:service],
    method: 1,
    headers: frame[:headers],
    payload: ack_body
  )

  event_type = event_data.dig("header", "event_type") || event_data["type"]
  Clacky::Logger.info("[feishu-ws] Dispatching event: #{event_type}")
  @on_event&.call(event_data)
rescue JSON::ParserError => e
  Clacky::Logger.warn("[feishu-ws] Failed to parse event payload: #{e.message}")
end

#handle_frame(raw) ⇒ Object

Parse and dispatch a Feishu protobuf binary frame



177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# File 'lib/clacky/server/channel/adapters/feishu/ws_client.rb', line 177

# Decodes one raw Feishu protobuf frame and routes it by its method field:
# 0 goes to handle_control_frame, 1 to handle_data_frame, anything else is
# ignored. Any error raised while decoding or handling is logged as a warning
# and not re-raised.
#
# raw - the frame bytes; converted to a binary-encoded copy when it supports #b.
def handle_frame(raw)
  bytes = raw.respond_to?(:b) ? raw.b : raw
  decoded = ProtoFrame.decode(bytes)

  kind = decoded[:method]
  frame_headers = decoded[:headers] || {}

  if kind == 0
    # control frame
    handle_control_frame(decoded, frame_headers["type"])
  elsif kind == 1
    # data frame (event)
    Clacky::Logger.info("[feishu-ws] Received data frame (type=#{frame_headers["type"]})")
    handle_data_frame(decoded, frame_headers)
  end
rescue => e
  Clacky::Logger.warn("[feishu-ws] Failed to handle frame: #{e.message}")
end

#send_frame(seq_id:, log_id:, service:, method:, headers:, payload: nil) ⇒ Object



241
242
243
244
245
246
247
248
249
250
251
252
253
254
# File 'lib/clacky/server/channel/adapters/feishu/ws_client.rb', line 241

# Encodes a Feishu protobuf frame from the given fields and sends it as a
# binary WebSocket frame.
#
# This is best-effort: any error raised while encoding or writing is logged
# through Clacky::Logger (like the rest of this client) and not re-raised.
#
# seq_id, log_id, service, method, headers, payload - stored under the
# same-named keys of the frame hash handed to ProtoFrame.encode.
def send_frame(seq_id:, log_id:, service:, method:, headers:, payload: nil)
  encoded = ProtoFrame.encode(
    {
      seq_id: seq_id,
      log_id: log_id,
      service: service,
      method: method,
      headers: headers,
      payload: payload
    }
  )
  send_raw_frame(:binary, encoded)
rescue => e
  Clacky::Logger.warn("[feishu-ws] failed to send frame: #{e.message}")
end

#send_raw_frame(type, data) ⇒ Object



256
257
258
259
260
261
262
263
264
# File 'lib/clacky/server/channel/adapters/feishu/ws_client.rb', line 256

# Wraps +data+ in a WebSocket frame of the given +type+ and writes it to the
# current socket. Does nothing (returns nil) when there is no open connection.
#
# type - WebSocket frame type symbol (this file uses :binary, :pong, :close).
# data - frame payload.
#
# Returns whatever the socket's write returns.
def send_raw_frame(type, data)
  # Snapshot the socket: connect_and_listen's cleanup may set @ws_socket to
  # nil on another thread between the check and the write.
  socket = @ws_socket
  return unless socket && @ws_open

  outgoing = WebSocket::Frame::Outgoing::Client.new(
    version: @ws_version || 13,
    data: data,
    type: type
  )
  bytes = outgoing.to_s

  # The ping thread, the read loop (pong/ACK replies) and stop all write to
  # the same socket; serialise the writes so frames cannot interleave.
  # The mutex is created lazily so this method does not rely on initialize.
  (@write_mutex ||= Mutex.new).synchronize { socket.write(bytes) }
end

#start(&on_event) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/clacky/server/channel/adapters/feishu/ws_client.rb', line 32

# Runs the client until stop clears @running: stores the event callback and
# then keeps calling connect_and_listen, reconnecting whenever it returns.
# This call blocks for as long as the client is running.
#
# on_event - block stored in @on_event; invoked with each parsed event.
#
# When a session raises, the error is logged and the loop sleeps
# RECONNECT_DELAY seconds before the next attempt (unless stopped meanwhile).
def start(&on_event)
  @on_event = on_event
  @running = true
  Clacky::Logger.info("[feishu-ws] Starting WebSocket client (app_id=#{@app_id})")

  loop do
    break unless @running

    begin
      connect_and_listen
    rescue => e
      Clacky::Logger.warn("[feishu-ws] Connection error: #{e.message}")
      sleep RECONNECT_DELAY if @running
    end
  end
end

#start_ping_thread ⇒ Object



266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
# File 'lib/clacky/server/channel/adapters/feishu/ws_client.rb', line 266

# (Re)starts the background thread that sends a Feishu ping control frame
# every @ping_interval seconds. A previously started ping thread is killed.
#
# The thread ends when @running is false after a sleep, or when a ping cannot
# be sent; in the latter case it closes the socket to force a reconnect.
def start_ping_thread
  @ping_thread&.kill
  @ping_thread = Thread.new do
    loop do
      sleep @ping_interval
      break unless @running
      begin
        @seq_id += 1
        # Encode and write directly instead of going through send_frame:
        # send_frame rescues and logs every error, so a failed write would
        # never reach the rescue below and the dead connection would linger.
        ping = ProtoFrame.encode(
          {
            seq_id: @seq_id,
            log_id: 0,
            service: @service_id,
            method: 0,
            headers: { "type" => "ping" },
            payload: nil
          }
        )
        send_raw_frame(:binary, ping)
      rescue => e
        Clacky::Logger.warn("[feishu-ws] ping failed (#{e.class}: #{e.message}), forcing reconnect")
        # Close the socket so IO.select in the read loop immediately
        # returns nil / read_nonblock raises IOError, triggering reconnect.
        @ws_socket&.close rescue nil
        break
      end
    end
  end
end

#stop ⇒ Object



47
48
49
50
51
52
# File 'lib/clacky/server/channel/adapters/feishu/ws_client.rb', line 47

# Shuts the client down: clears @running so the loops in this class exit,
# kills the ping thread, then tries to send a WebSocket close frame and to
# close the socket. Both of those last steps are best-effort; errors they
# raise are ignored.
def stop
  @running = false
  @ping_thread&.kill

  begin
    send_raw_frame(:close, "")
  rescue StandardError
    nil
  end

  begin
    @ws_socket&.close
  rescue StandardError
    nil
  end
end