Class: Clacky::Channel::Adapters::Weixin::Adapter
- Defined in:
- lib/clacky/server/channel/adapters/weixin/adapter.rb
Overview
Weixin (WeChat iLink) adapter.
Protocol: HTTP long-poll via ilinkai.weixin.qq.com Auth: token obtained from QR login (stored in channels.yml as ‘token`)
Config keys (channels.yml):
token: [String] bot token from QR login
base_url: [String] API base URL (default: https://ilinkai.weixin.qq.com)
allowed_users: [Array<String>] optional whitelist of from_user_id values
Event fields yielded to ChannelManager:
platform: :weixin
chat_id: String (from_user_id — used for replies)
user_id: String (from_user_id)
text: String
files: Array<Hash>
message_id: String
timestamp: Time
chat_type: :direct
context_token: String (must be echoed in every reply)
Constant Summary collapse
- RECONNECT_DELAY =
5- TYPING_KEEPALIVE_INTERVAL =
── Typing keepalive ─────────────────────────────────────────────────sendtyping(status=1) serves dual purpose: maintains typing indicator AND renews the context_token TTL. Official @tencent-weixin/openclaw-weixin npm package uses keepaliveIntervalMs: 5000. We match that exactly.
5- TYPING_TICKET_TTL =
typing_ticket is valid for ~24h; cache and reuse it.
86_400
Class Method Summary collapse
- .env_keys ⇒ Object
- .platform_config(data) ⇒ Object
- .platform_id ⇒ Object
- .set_env_data(data, config) ⇒ Object
- .test_connection(fields) ⇒ Object
Instance Method Summary collapse
-
#detect_image_mime(bytes) ⇒ Object
Detect image MIME type from magic bytes.
-
#extract_files(item_list) ⇒ Object
Extract file attachments from item_list for inbound messages.
- #extract_text(item_list) ⇒ Object
-
#fetch_typing_ticket(user_id, context_token) ⇒ Object
Fetch (or return cached) typing_ticket for user_id.
-
#initialize(config) ⇒ Adapter
constructor
A new instance of Adapter.
- #lookup_context_token(user_id) ⇒ Object
-
#markdown_to_plain(text) ⇒ Object
Strip markdown syntax for WeChat (no markdown rendering).
- #process_message(msg) ⇒ Object
-
#send_file(chat_id, file_path, name: nil, reply_to: nil) ⇒ Object
Send a file to a user.
-
#send_text(chat_id, text, reply_to: nil) ⇒ Object
Send a plain text reply to a user.
-
#split_message(text, limit: 2000) ⇒ Object
Split text into ≤2000 Unicode character chunks per iLink protocol recommendation.
- #start(&on_message) ⇒ Object
-
#start_typing_keepalive(user_id, context_token) ⇒ Object
Start a background thread that sends sendtyping(1) every TYPING_KEEPALIVE_INTERVAL.
- #stop ⇒ Object
-
#stop_typing_keepalive(user_id) ⇒ Object
Stop keepalive thread and send sendtyping(status=2) to cancel “typing” indicator.
- #store_context_token(user_id, token) ⇒ Object
- #supports_message_updates? ⇒ Boolean
- #validate_config(config) ⇒ Object
Methods inherited from Base
Constructor Details
#initialize(config) ⇒ Adapter
Returns a new instance of Adapter.
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 68 def initialize(config) @config = config @token = config[:token].to_s @base_url = config[:base_url] || ApiClient::DEFAULT_BASE_URL @allowed_users = Array(config[:allowed_users]) @running = false @on_message = nil # In-memory store: user_id → context_token (for reply threading) @context_tokens = {} @ctx_mutex = Mutex.new @api_client = ApiClient.new(base_url: @base_url, token: @token) # Typing keepalive: user_id → { ticket:, thread:, cached_at: } @typing_tickets = {} @typing_mutex = Mutex.new # Active keepalive threads: user_id → Thread @keepalive_threads = {} @keepalive_mutex = Mutex.new end |
Class Method Details
.env_keys ⇒ Object
38 39 40 |
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 38 def self.env_keys %w[IM_WEIXIN_TOKEN IM_WEIXIN_BASE_URL IM_WEIXIN_ALLOWED_USERS] end |
.platform_config(data) ⇒ Object
42 43 44 45 46 47 48 49 |
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 42 def self.platform_config(data) { token: data["IM_WEIXIN_TOKEN"] || data["token"], base_url: data["IM_WEIXIN_BASE_URL"] || data["base_url"] || ApiClient::DEFAULT_BASE_URL, allowed_users: (data["IM_WEIXIN_ALLOWED_USERS"] || data["allowed_users"] || "") .then { |v| v.is_a?(Array) ? v : v.to_s.split(",").map(&:strip).reject(&:empty?) } }.compact end |
.platform_id ⇒ Object
34 35 36 |
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 34 def self.platform_id :weixin end |
.set_env_data(data, config) ⇒ Object
51 52 53 54 55 |
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 51 def self.set_env_data(data, config) data["IM_WEIXIN_TOKEN"] = config[:token] data["IM_WEIXIN_BASE_URL"] = config[:base_url] if config[:base_url] data["IM_WEIXIN_ALLOWED_USERS"] = Array(config[:allowed_users]).join(",") end |
.test_connection(fields) ⇒ Object
57 58 59 60 61 62 63 64 65 66 |
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 57 def self.test_connection(fields) token = fields[:token].to_s.strip return { ok: false, error: "token is required" } if token.empty? # Weixin iLink token is obtained via the QR scan flow and is already # confirmed valid by the iLink API before we store it. There is no # lightweight ping endpoint, so we just verify the token is present. { ok: true, message: "Connected to Weixin iLink" } end |
Instance Method Details
#detect_image_mime(bytes) ⇒ Object
Detect image MIME type from magic bytes. Falls back to image/jpeg if unknown.
372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 |
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 372 def detect_image_mime(bytes) return "image/jpeg" unless bytes && bytes.bytesize >= 4 head = bytes.byteslice(0, 8).bytes if head[0] == 0xFF && head[1] == 0xD8 "image/jpeg" elsif head[0] == 0x89 && head[1] == 0x50 && head[2] == 0x4E && head[3] == 0x47 "image/png" elsif head[0] == 0x47 && head[1] == 0x49 && head[2] == 0x46 "image/gif" elsif head[0] == 0x52 && head[1] == 0x49 && head[2] == 0x46 && head[3] == 0x46 "image/webp" else "image/jpeg" end end |
#extract_files(item_list) ⇒ Object
Extract file attachments from item_list for inbound messages. Returns array of hashes: { type:, name:, cdn_media: } cdn_media contains { encrypt_query_param:, aes_key: } for potential download. Extract and materialize file attachments from an inbound item_list.
Images are downloaded from CDN and converted to data_url so the agent’s vision pipeline (partition_files → resolve_vision_images) picks them up. Files (PDF, DOCX, etc.) are downloaded to clacky-uploads so the agent’s file processing pipeline (process_path) can parse them. Voice/video are kept as cdn_media metadata only (no local download).
Returns Array of Hashes. Image entries:
{ type: :image, name: String, mime_type: String, data_url: String }
File entries (downloaded):
{ type: :file, name: String, path: String }
Voice/video entries:
{ type: :voice/:video, name: String, cdn_media: Hash }
274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 |
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 274 def extract_files(item_list) files = [] item_list.each do |item| case item["type"] when 2 # IMAGE — download + convert to data_url for agent vision img = item["image_item"] next unless img cdn_media = img["media"] next unless cdn_media # Protocol: image_item may have a top-level aeskey field that overrides # the one inside media. Use image_item.aeskey first, fall back to media.aes_key. top_level_aeskey = img["aeskey"] effective_cdn_media = if top_level_aeskey && !top_level_aeskey.empty? cdn_media.merge("aes_key" => top_level_aeskey) else cdn_media end Clacky::Logger.debug("[WeixinAdapter] image cdn_media: #{effective_cdn_media.to_json}") begin raw_bytes = @api_client.download_media(effective_cdn_media, ApiClient::MEDIA_TYPE_IMAGE) mime_type = detect_image_mime(raw_bytes) data_url = "data:#{mime_type};base64,#{Base64.strict_encode64(raw_bytes)}" files << { type: :image, name: "image.jpg", mime_type: mime_type, data_url: data_url } rescue => e Clacky::Logger.warn("[WeixinAdapter] Failed to download image: #{e.}\n#{e.backtrace.first(3).join("\n")}") end when 3 # VOICE v = item["voice_item"] next unless v files << { type: :voice, name: "voice.amr", cdn_media: v["media"] } when 4 # FILE — download to disk so agent can parse it fi = item["file_item"] next unless fi cdn_media = fi["media"] file_name = fi["file_name"].to_s file_name = "attachment" if file_name.empty? file_md5 = fi["md5"].to_s file_len = fi["len"].to_s if cdn_media begin raw_bytes = @api_client.download_media(cdn_media, ApiClient::MEDIA_TYPE_FILE) saved = Clacky::Utils::FileProcessor.save(body: raw_bytes, filename: file_name) Clacky::Logger.info("[WeixinAdapter] file downloaded to #{saved[:path]} (#{raw_bytes.bytesize} bytes)") files << { type: :file, name: saved[:name], path: saved[:path], md5: file_md5.empty? ? nil : file_md5, len: file_len.empty? ? nil : file_len } rescue => e Clacky::Logger.warn("[WeixinAdapter] Failed to download file #{file_name}: #{e.}\n#{e.backtrace.first(3).join("\n")}") # Fall back to metadata-only so the agent at least knows a file was attached files << { type: :file, name: file_name, cdn_media: cdn_media, md5: file_md5.empty? ? nil : file_md5, len: file_len.empty? ? nil : file_len } end else files << { type: :file, name: file_name, md5: file_md5.empty? ? nil : file_md5, len: file_len.empty? ? nil : file_len } end when 5 # VIDEO vi = item["video_item"] next unless vi files << { type: :video, name: "video.mp4", cdn_media: vi["media"] } end end files end |
#extract_text(item_list) ⇒ Object
232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 |
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 232 def extract_text(item_list) parts = [] item_list.each do |item| case item["type"] when 1 # TEXT raw_text = item.dig("text_item", "text").to_s.strip ref = item["ref_msg"] if ref && !ref.empty? ref_parts = [] ref_parts << ref["title"] if ref["title"] && !ref["title"].empty? if (ri = ref["message_item"]) && ri["type"] == 1 rt = ri.dig("text_item", "text").to_s.strip ref_parts << rt unless rt.empty? end parts << "[引用: #{ref_parts.join(" | ")}]" unless ref_parts.empty? end parts << raw_text unless raw_text.empty? when 3 # VOICE — use transcription if available vt = item.dig("voice_item", "text").to_s.strip parts << vt unless vt.empty? end end parts.join("\n") end |
#fetch_typing_ticket(user_id, context_token) ⇒ Object
Fetch (or return cached) typing_ticket for user_id. Returns nil on failure — keepalive will just skip without crashing.
440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 |
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 440 def fetch_typing_ticket(user_id, context_token) @typing_mutex.synchronize do entry = @typing_tickets[user_id] if entry && (Time.now.to_i - entry[:cached_at]) < TYPING_TICKET_TTL return entry[:ticket] end end ticket = @api_client.get_typing_ticket( ilink_user_id: user_id, context_token: context_token ) return nil if ticket.empty? @typing_mutex.synchronize do @typing_tickets[user_id] = { ticket: ticket, cached_at: Time.now.to_i } end ticket rescue => e Clacky::Logger.warn("[WeixinAdapter] getconfig failed for #{user_id}: #{e.}") nil end |
#lookup_context_token(user_id) ⇒ Object
392 393 394 |
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 392 def lookup_context_token(user_id) @ctx_mutex.synchronize { @context_tokens[user_id] } end |
#markdown_to_plain(text) ⇒ Object
Strip markdown syntax for WeChat (no markdown rendering).
416 417 418 419 420 421 422 423 424 425 426 427 428 |
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 416 def markdown_to_plain(text) r = text.dup r.gsub!(/```[^\n]*\n?([\s\S]*?)```/) { Regexp.last_match(1).strip } r.gsub!(/!\[[^\]]*\]\([^)]*\)/, "") r.gsub!(/\[([^\]]+)\]\([^)]*\)/, '') r.gsub!(/\*\*([^*]+)\*\*/, '') r.gsub!(/\*([^*]+)\*/, '') r.gsub!(/__([^_]+)__/, '') r.gsub!(/_([^_]+)_/, '') r.gsub!(/^#+\s+/, "") r.gsub!(/^[-*_]{3,}\s*$/, "") r.strip end |
#process_message(msg) ⇒ Object
187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 |
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 187 def (msg) # Only process inbound USER messages (message_type 1 = USER) return unless msg["message_type"] == 1 from_user_id = msg["from_user_id"].to_s context_token = msg["context_token"].to_s return if from_user_id.empty? || context_token.empty? if @allowed_users.any? && !@allowed_users.include?(from_user_id) Clacky::Logger.debug("[WeixinAdapter] ignoring message from #{from_user_id} (not in allowed_users)") return end # Cache context_token — needed when sending replies store_context_token(from_user_id, context_token) item_list = msg["item_list"] || [] Clacky::Logger.debug("[WeixinAdapter] item_list raw: #{item_list.to_json}") text = extract_text(item_list) files = extract_files(item_list) # Require at least some content (text or files) return if text.strip.empty? && files.empty? event = { type: :message, platform: :weixin, chat_id: from_user_id, user_id: from_user_id, text: text.strip, files: files, message_id: msg["message_id"]&.to_s, timestamp: msg["create_time_ms"] ? Time.at(msg["create_time_ms"] / 1000.0) : Time.now, chat_type: :direct, context_token: context_token, raw: msg } log_parts = [] log_parts << text.slice(0, 80) unless text.strip.empty? log_parts << "#{files.size} file(s)" unless files.empty? Clacky::Logger.info("[WeixinAdapter] message from #{from_user_id}: #{log_parts.join(" + ")}") @on_message&.call(event) end |
#send_file(chat_id, file_path, name: nil, reply_to: nil) ⇒ Object
Send a file to a user. file_path: local path to the file to send file_name: optional display name (defaults to basename)
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 157 def send_file(chat_id, file_path, name: nil, reply_to: nil) ctoken = lookup_context_token(chat_id) unless ctoken Clacky::Logger.warn("[WeixinAdapter] send_file: no context_token for #{chat_id}, dropping") return { message_id: nil } end @api_client.send_file( to_user_id: chat_id, file_path: file_path, file_name: name || File.basename(file_path), context_token: ctoken ) { message_id: nil } rescue => e Clacky::Logger.error("[WeixinAdapter] send_file failed for #{chat_id}: #{e.}") { message_id: nil } end |
#send_text(chat_id, text, reply_to: nil) ⇒ Object
Send a plain text reply to a user. The context_token from the inbound message is required by the Weixin protocol.
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 136 def send_text(chat_id, text, reply_to: nil) ctoken = lookup_context_token(chat_id) unless ctoken Clacky::Logger.warn("[WeixinAdapter] send_text: no context_token for #{chat_id}, dropping message") return { message_id: nil } end plain = markdown_to_plain(text) (plain).each do |chunk| @api_client.send_text(to_user_id: chat_id, text: chunk, context_token: ctoken) end { message_id: nil } rescue => e Clacky::Logger.error("[WeixinAdapter] send_text failed for #{chat_id} (context_token=#{lookup_context_token(chat_id).to_s.slice(0, 20)}...): #{e.}") { message_id: nil } end |
#split_message(text, limit: 2000) ⇒ Object
Split text into ≤2000 Unicode character chunks per iLink protocol recommendation. Priority: split at nn, then n, then space, then hard cut.
398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 |
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 398 def (text, limit: 2000) return [text] if text.chars.length <= limit chunks = [] while text.chars.length > limit window = text.chars.first(limit).join # Prefer double-newline boundary cut = window.rindex("\n\n") cut = window.rindex("\n") if cut.nil? cut = window.rindex(" ") if cut.nil? cut = limit if cut.nil? || cut.zero? chunks << text.chars.first(cut).join.rstrip text = text.chars.drop(cut).join.lstrip end chunks << text unless text.empty? chunks end |
#start(&on_message) ⇒ Object
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 87 def start(&) @running = true @on_message = get_updates_buf = "" consecutive_errors = 0 Clacky::Logger.info("[WeixinAdapter] starting long-poll (base_url=#{@base_url})") while @running begin resp = @api_client.get_updates(get_updates_buf: get_updates_buf) consecutive_errors = 0 new_buf = resp["get_updates_buf"].to_s get_updates_buf = new_buf unless new_buf.empty? (resp["msgs"] || []).each do |msg| (msg) rescue => e Clacky::Logger.warn("[WeixinAdapter] process_message error: #{e.}") end rescue ApiClient::TimeoutError # Long-poll server-side timeout is expected — just retry rescue ApiClient::ApiError => e if e.code == ApiClient::SESSION_EXPIRED_ERRCODE Clacky::Logger.warn("[WeixinAdapter] Session expired (token may need refresh), backing off 60s") sleep 60 else consecutive_errors += 1 Clacky::Logger.warn("[WeixinAdapter] API error #{e.code}: #{e.}") sleep(consecutive_errors > 3 ? 30 : RECONNECT_DELAY) end rescue => e consecutive_errors += 1 Clacky::Logger.error("[WeixinAdapter] poll error: #{e.}") break unless @running sleep(consecutive_errors > 3 ? 30 : RECONNECT_DELAY) end end end |
#start_typing_keepalive(user_id, context_token) ⇒ Object
Start a background thread that sends sendtyping(1) every TYPING_KEEPALIVE_INTERVAL. Any existing keepalive for this user is stopped first.
465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 |
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 465 def start_typing_keepalive(user_id, context_token) stop_typing_keepalive(user_id) ticket = fetch_typing_ticket(user_id, context_token) unless ticket Clacky::Logger.debug("[WeixinAdapter] no typing_ticket for #{user_id}, skipping keepalive") return end thread = Thread.new do loop do begin @api_client.send_typing( ilink_user_id: user_id, typing_ticket: ticket, status: 1 ) Clacky::Logger.debug("[WeixinAdapter] typing keepalive sent for #{user_id}") rescue => e Clacky::Logger.debug("[WeixinAdapter] typing keepalive error: #{e.}") end sleep TYPING_KEEPALIVE_INTERVAL end end @keepalive_mutex.synchronize { @keepalive_threads[user_id] = thread } Clacky::Logger.debug("[WeixinAdapter] typing keepalive started for #{user_id}") end |
#stop ⇒ Object
130 131 132 |
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 130 def stop @running = false end |
#stop_typing_keepalive(user_id) ⇒ Object
Stop keepalive thread and send sendtyping(status=2) to cancel “typing” indicator.
495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 |
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 495 def stop_typing_keepalive(user_id) thread = @keepalive_mutex.synchronize { @keepalive_threads.delete(user_id) } return unless thread thread.kill thread.join(1) ticket = @typing_mutex.synchronize { @typing_tickets.dig(user_id, :ticket) } if ticket begin @api_client.send_typing( ilink_user_id: user_id, typing_ticket: ticket, status: 2 ) rescue => e Clacky::Logger.debug("[WeixinAdapter] stop typing error: #{e.}") end end Clacky::Logger.debug("[WeixinAdapter] typing keepalive stopped for #{user_id}") end |
#store_context_token(user_id, token) ⇒ Object
388 389 390 |
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 388 def store_context_token(user_id, token) @ctx_mutex.synchronize { @context_tokens[user_id] = token } end |
#supports_message_updates? ⇒ Boolean
182 183 184 |
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 182 def false end |
#validate_config(config) ⇒ Object
176 177 178 179 180 |
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 176 def validate_config(config) errors = [] errors << "token is required" if config[:token].nil? || config[:token].to_s.strip.empty? errors end |