Class: Clacky::Channel::Adapters::Weixin::Adapter
- Defined in:
- lib/clacky/server/channel/adapters/weixin/adapter.rb
Overview
Weixin (WeChat iLink) adapter.
Protocol: HTTP long-poll via ilinkai.weixin.qq.com
Auth: token obtained from QR login (stored in channels.yml as `token`)
Config keys (channels.yml):
token: [String] bot token from QR login
base_url: [String] API base URL (default: https://ilinkai.weixin.qq.com)
allowed_users: [Array<String>] optional whitelist of from_user_id values
Event fields yielded to ChannelManager:
platform: :weixin
chat_id: String (from_user_id — used for replies)
user_id: String (from_user_id)
text: String
files: Array<Hash>
message_id: String
timestamp: Time
chat_type: :direct
context_token: String (must be echoed in every reply)
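As a rough illustration of the event shape listed above, the sketch below builds one event hash by hand and summarizes it. The describe_event helper and all sample values are hypothetical and not part of the adapter.

```ruby
# Hypothetical consumer of an adapter event; field names follow the list above.
def describe_event(event)
  kinds = event[:files].map { |f| f[:type] }.tally
  attachments = kinds.map { |type, count| "#{count} #{type}" }.join(", ")
  summary = "[#{event[:platform]}/#{event[:chat_type]}] #{event[:user_id]}: #{event[:text]}"
  attachments.empty? ? summary : "#{summary} (#{attachments})"
end

# Sample values are made up for the example.
event = {
  platform: :weixin,
  chat_id: "user-123",        # same value as user_id; used for replies
  user_id: "user-123",
  text: "hello",
  files: [{ type: :image, name: "image.jpg" }],
  message_id: "42",
  timestamp: Time.at(1_700_000_000),
  chat_type: :direct,
  context_token: "ctx-abc"    # must be echoed in every reply
}

puts describe_event(event)
```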
Constant Summary
- RECONNECT_DELAY = 5
- TYPING_KEEPALIVE_INTERVAL = 5
  Typing keepalive: sendtyping(status=1) serves a dual purpose. It maintains the typing indicator and renews the context_token TTL. The official @tencent-weixin/openclaw-weixin npm package uses keepaliveIntervalMs: 5000; this adapter matches that interval (5 seconds).
- TYPING_TICKET_TTL = 86_400
  typing_ticket is valid for ~24h; cache and reuse it.
Class Method Summary
- .env_keys ⇒ Object
- .platform_config(data) ⇒ Object
- .platform_id ⇒ Object
- .set_env_data(data, config) ⇒ Object
- .test_connection(fields) ⇒ Object
Instance Method Summary
- #context_token_user_ids ⇒ Object
  Return all user IDs for which we have a cached context_token.
- #detect_image_mime(bytes) ⇒ Object
  Detect image MIME type from magic bytes.
- #extract_files(item_list) ⇒ Object
  Extract file attachments from item_list for inbound messages.
- #extract_text(item_list) ⇒ Object
- #fetch_typing_ticket(user_id, context_token) ⇒ Object
  Fetch (or return cached) typing_ticket for user_id.
- #flush_pending(chat_id) ⇒ Object
  Force-flush pending text for a chat_id.
- #initialize(config) ⇒ Adapter (constructor)
  A new instance of Adapter.
- #lookup_context_token(user_id) ⇒ Object
- #markdown_to_plain(text) ⇒ Object
  Strip markdown syntax for WeChat (no markdown rendering).
- #process_message(msg) ⇒ Object
- #send_file(chat_id, file_path, name: nil, reply_to: nil) ⇒ Object
  Send a file to a user.
- #send_text(chat_id, text, reply_to: nil) ⇒ Object
  Send a plain text reply to a user.
- #split_message(text, limit: 2000) ⇒ Object
  Split text into ≤2000 Unicode character chunks per iLink protocol recommendation.
- #start(&on_message) ⇒ Object
- #start_typing_keepalive(user_id, context_token) ⇒ Object
  Start a background thread that sends sendtyping(1) every TYPING_KEEPALIVE_INTERVAL.
- #stop ⇒ Object
- #stop_typing_keepalive(user_id) ⇒ Object
  Stop keepalive thread and send sendtyping(status=2) to cancel the "typing" indicator.
- #store_context_token(user_id, token) ⇒ Object
- #supports_message_updates? ⇒ Boolean
- #validate_config(config) ⇒ Object
Methods inherited from Base
Constructor Details
#initialize(config) ⇒ Adapter
Returns a new instance of Adapter.
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 225
def initialize(config)
  @config = config
  @token = config[:token].to_s
  @base_url = config[:base_url] || ApiClient::DEFAULT_BASE_URL
  @allowed_users = Array(config[:allowed_users])
  @running = false
  @on_message = nil
  # In-memory store: user_id → context_token (for reply threading)
  @context_tokens = {}
  @ctx_mutex = Mutex.new
  @api_client = ApiClient.new(base_url: @base_url, token: @token)
  @send_queue = SendQueue.new(@api_client)
  # Typing keepalive: user_id → { ticket:, thread:, cached_at: }
  @typing_tickets = {}
  @typing_mutex = Mutex.new
  # Active keepalive threads: user_id → Thread
  @keepalive_threads = {}
  @keepalive_mutex = Mutex.new
end
Class Method Details
.env_keys ⇒ Object
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 195
def self.env_keys
  %w[IM_WEIXIN_TOKEN IM_WEIXIN_BASE_URL IM_WEIXIN_ALLOWED_USERS]
end
.platform_config(data) ⇒ Object
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 199
def self.platform_config(data)
  {
    token: data["IM_WEIXIN_TOKEN"] || data["token"],
    base_url: data["IM_WEIXIN_BASE_URL"] || data["base_url"] || ApiClient::DEFAULT_BASE_URL,
    allowed_users: (data["IM_WEIXIN_ALLOWED_USERS"] || data["allowed_users"] || "")
      .then { |v| v.is_a?(Array) ? v : v.to_s.split(",").map(&:strip).reject(&:empty?) }
  }.compact
end
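The allowed_users handling in platform_config accepts either an Array or a comma-separated String. A minimal standalone sketch of that normalization follows; the helper name normalize_allowed_users is an assumption made for this example.

```ruby
# Standalone copy of the allowed_users normalization, for illustration only.
def normalize_allowed_users(value)
  return value if value.is_a?(Array)

  # "a, b,,c " -> ["a", "b", "c"]; nil and "" both become []
  value.to_s.split(",").map(&:strip).reject(&:empty?)
end

p normalize_allowed_users("alice, bob,,carol ")  # => ["alice", "bob", "carol"]
p normalize_allowed_users(%w[alice bob])         # => ["alice", "bob"]
p normalize_allowed_users(nil)                   # => []
```

Because the expression falls back to an empty string, allowed_users always comes out as an Array, so the trailing compact only ever removes a nil token.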
.platform_id ⇒ Object
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 191
def self.platform_id
  :weixin
end
.set_env_data(data, config) ⇒ Object
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 208
def self.set_env_data(data, config)
  data["IM_WEIXIN_TOKEN"] = config[:token]
  data["IM_WEIXIN_BASE_URL"] = config[:base_url] if config[:base_url]
  data["IM_WEIXIN_ALLOWED_USERS"] = Array(config[:allowed_users]).join(",")
end
.test_connection(fields) ⇒ Object
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 214
def self.test_connection(fields)
  token = fields[:token].to_s.strip
  return { ok: false, error: "token is required" } if token.empty?

  # Weixin iLink token is obtained via the QR scan flow and is already
  # confirmed valid by the iLink API before we store it. There is no
  # lightweight ping endpoint, so we just verify the token is present.
  { ok: true, message: "Connected to Weixin iLink" }
end
Instance Method Details
#context_token_user_ids ⇒ Object
Return all user IDs for which we have a cached context_token. Used by ChannelManager#known_users so callers can enumerate users reachable for proactive messaging.
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 562
def context_token_user_ids
  @ctx_mutex.synchronize { @context_tokens.keys.dup }
end
#detect_image_mime(bytes) ⇒ Object
Detect image MIME type from magic bytes. Falls back to image/jpeg if unknown.
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 535
def detect_image_mime(bytes)
  return "image/jpeg" unless bytes && bytes.bytesize >= 4

  head = bytes.byteslice(0, 8).bytes
  if head[0] == 0xFF && head[1] == 0xD8
    "image/jpeg"
  elsif head[0] == 0x89 && head[1] == 0x50 && head[2] == 0x4E && head[3] == 0x47
    "image/png"
  elsif head[0] == 0x47 && head[1] == 0x49 && head[2] == 0x46
    "image/gif"
  elsif head[0] == 0x52 && head[1] == 0x49 && head[2] == 0x46 && head[3] == 0x46
    "image/webp"
  else
    "image/jpeg"
  end
end
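The "RIFF" prefix checked above is shared by other RIFF containers such as WAV and AVI; a WebP file additionally carries "WEBP" at byte offset 8. The sketch below is a hypothetical stricter variant (sniff_image_mime and to_data_url are names invented here, not adapter methods) that also shows how the data URL used by extract_files can be assembled.

```ruby
# Hypothetical stricter variant of the magic-byte check, for illustration only.
def sniff_image_mime(bytes)
  return "image/jpeg" unless bytes && bytes.bytesize >= 4

  head = bytes.byteslice(0, 12).bytes
  if head[0, 2] == [0xFF, 0xD8]
    "image/jpeg"
  elsif head[0, 4] == [0x89, 0x50, 0x4E, 0x47]
    "image/png"
  elsif head[0, 3] == "GIF".bytes
    "image/gif"
  elsif head[0, 4] == "RIFF".bytes && head[8, 4] == "WEBP".bytes
    "image/webp"
  else
    "image/jpeg" # same fallback as the adapter
  end
end

# pack("m0") produces unwrapped Base64, the same encoding as Base64.strict_encode64.
def to_data_url(bytes)
  "data:#{sniff_image_mime(bytes)};base64,#{[bytes].pack("m0")}"
end

puts sniff_image_mime("RIFF\x00\x00\x00\x00WEBP".b)
puts sniff_image_mime("RIFF\x00\x00\x00\x00WAVE".b)
puts to_data_url("GIF89a".b)
```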
#extract_files(item_list) ⇒ Object
Extract and materialize file attachments from an inbound item_list.
Images are downloaded from the CDN and converted to a data_url so the agent's vision pipeline (partition_files → resolve_vision_images) picks them up. Files (PDF, DOCX, etc.) are downloaded to clacky-uploads so the agent's file processing pipeline (process_path) can parse them. Voice and video are kept as cdn_media metadata only (no local download); cdn_media contains { encrypt_query_param:, aes_key: } for a potential later download.
Returns an Array of Hashes.
Image entries:
{ type: :image, name: String, mime_type: String, data_url: String }
File entries (downloaded):
{ type: :file, name: String, path: String, md5: String or nil, len: String or nil }
File entries (download failed or no media present):
{ type: :file, name: String, md5: String or nil, len: String or nil }, plus cdn_media: Hash when the download was attempted and failed
Voice/video entries:
{ type: :voice or :video, name: String, cdn_media: Hash }
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 437
def extract_files(item_list)
  files = []
  item_list.each do |item|
    case item["type"]
    when 2 # IMAGE — download + convert to data_url for agent vision
      img = item["image_item"]
      next unless img

      cdn_media = img["media"]
      next unless cdn_media

      # Protocol: image_item may have a top-level aeskey field that overrides
      # the one inside media. Use image_item.aeskey first, fall back to media.aes_key.
      top_level_aeskey = img["aeskey"]
      effective_cdn_media = if top_level_aeskey && !top_level_aeskey.empty?
        cdn_media.merge("aes_key" => top_level_aeskey)
      else
        cdn_media
      end

      Clacky::Logger.debug("[WeixinAdapter] image cdn_media: #{effective_cdn_media.to_json}")
      begin
        raw_bytes = @api_client.download_media(effective_cdn_media, ApiClient::MEDIA_TYPE_IMAGE)
        mime_type = detect_image_mime(raw_bytes)
        data_url = "data:#{mime_type};base64,#{Base64.strict_encode64(raw_bytes)}"
        files << {
          type: :image,
          name: "image.jpg",
          mime_type: mime_type,
          data_url: data_url
        }
      rescue => e
        Clacky::Logger.warn("[WeixinAdapter] Failed to download image: #{e.message}\n#{e.backtrace.first(3).join("\n")}")
      end
    when 3 # VOICE
      v = item["voice_item"]
      next unless v
      files << { type: :voice, name: "voice.amr", cdn_media: v["media"] }
    when 4 # FILE — download to disk so agent can parse it
      fi = item["file_item"]
      next unless fi

      cdn_media = fi["media"]
      file_name = fi["file_name"].to_s
      file_name = "attachment" if file_name.empty?
      file_md5 = fi["md5"].to_s
      file_len = fi["len"].to_s

      if cdn_media
        begin
          raw_bytes = @api_client.download_media(cdn_media, ApiClient::MEDIA_TYPE_FILE)
          saved = Clacky::Utils::FileProcessor.save(body: raw_bytes, filename: file_name)
          Clacky::Logger.info("[WeixinAdapter] file downloaded to #{saved[:path]} (#{raw_bytes.bytesize} bytes)")
          files << {
            type: :file,
            name: saved[:name],
            path: saved[:path],
            md5: file_md5.empty? ? nil : file_md5,
            len: file_len.empty? ? nil : file_len
          }
        rescue => e
          Clacky::Logger.warn("[WeixinAdapter] Failed to download file #{file_name}: #{e.message}\n#{e.backtrace.first(3).join("\n")}")
          # Fall back to metadata-only so the agent at least knows a file was attached
          files << {
            type: :file,
            name: file_name,
            cdn_media: cdn_media,
            md5: file_md5.empty? ? nil : file_md5,
            len: file_len.empty? ? nil : file_len
          }
        end
      else
        files << {
          type: :file,
          name: file_name,
          md5: file_md5.empty? ? nil : file_md5,
          len: file_len.empty? ? nil : file_len
        }
      end
    when 5 # VIDEO
      vi = item["video_item"]
      next unless vi
      files << { type: :video, name: "video.mp4", cdn_media: vi["media"] }
    end
  end
  files
end
#extract_text(item_list) ⇒ Object
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 395
def extract_text(item_list)
  parts = []
  item_list.each do |item|
    case item["type"]
    when 1 # TEXT
      raw_text = item.dig("text_item", "text").to_s.strip
      ref = item["ref_msg"]
      if ref && !ref.empty?
        ref_parts = []
        ref_parts << ref["title"] if ref["title"] && !ref["title"].empty?
        if (ri = ref["message_item"]) && ri["type"] == 1
          rt = ri.dig("text_item", "text").to_s.strip
          ref_parts << rt unless rt.empty?
        end
        parts << "[引用: #{ref_parts.join(" | ")}]" unless ref_parts.empty? # "引用" means "quote"
      end
      parts << raw_text unless raw_text.empty?
    when 3 # VOICE — use transcription if available
      vt = item.dig("voice_item", "text").to_s.strip
      parts << vt unless vt.empty?
    end
  end
  parts.join("\n")
end
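To make the quoting behavior concrete, here is a standalone copy of the extraction logic applied to a hand-written item_list. The payload keys come from the method above; the sample values, and the assumption that a real iLink payload looks exactly like this, are illustrative only.

```ruby
# Standalone copy of the text-extraction logic, for illustration only.
def extract_text(item_list)
  parts = []
  item_list.each do |item|
    case item["type"]
    when 1 # TEXT, optionally quoting an earlier message
      ref = item["ref_msg"]
      if ref && !ref.empty?
        ref_parts = []
        ref_parts << ref["title"] if ref["title"] && !ref["title"].empty?
        quoted = ref["message_item"]
        if quoted && quoted["type"] == 1
          quoted_text = quoted.dig("text_item", "text").to_s.strip
          ref_parts << quoted_text unless quoted_text.empty?
        end
        # The quote line is emitted before the user's own text.
        parts << "[引用: #{ref_parts.join(" | ")}]" unless ref_parts.empty?
      end
      raw_text = item.dig("text_item", "text").to_s.strip
      parts << raw_text unless raw_text.empty?
    when 3 # VOICE: only the transcription, when present
      transcript = item.dig("voice_item", "text").to_s.strip
      parts << transcript unless transcript.empty?
    end
  end
  parts.join("\n")
end

# Made-up sample payload.
sample = [
  { "type" => 1,
    "text_item" => { "text" => " What does this mean? " },
    "ref_msg" => { "title" => "Alice",
                   "message_item" => { "type" => 1, "text_item" => { "text" => "Ship it Friday" } } } },
  { "type" => 3, "voice_item" => { "text" => "voice transcript" } }
]

puts extract_text(sample)
```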
#fetch_typing_ticket(user_id, context_token) ⇒ Object
Fetch (or return cached) typing_ticket for user_id. Returns nil on failure — keepalive will just skip without crashing.
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 610
def fetch_typing_ticket(user_id, context_token)
  @typing_mutex.synchronize do
    entry = @typing_tickets[user_id]
    if entry && (Time.now.to_i - entry[:cached_at]) < TYPING_TICKET_TTL
      return entry[:ticket]
    end
  end

  ticket = @api_client.get_typing_ticket(
    ilink_user_id: user_id,
    context_token: context_token
  )
  return nil if ticket.empty?

  @typing_mutex.synchronize do
    @typing_tickets[user_id] = { ticket: ticket, cached_at: Time.now.to_i }
  end
  ticket
rescue => e
  Clacky::Logger.warn("[WeixinAdapter] getconfig failed for #{user_id}: #{e.message}")
  nil
end
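The caching pattern in fetch_typing_ticket (check under a lock, perform the slow call outside the lock, store under the lock) can be sketched in isolation. TtlCache is a hypothetical class written for this example, and the injectable now: argument exists only to make the expiry easy to demonstrate.

```ruby
# Minimal TTL cache sketch (hypothetical class, not part of the adapter).
class TtlCache
  def initialize(ttl_seconds)
    @ttl = ttl_seconds
    @entries = {}
    @mutex = Mutex.new
  end

  # Returns the cached value while it is fresh; otherwise calls the block,
  # stores a non-nil result, and returns it.
  def fetch(key, now: Time.now.to_i)
    cached = @mutex.synchronize do
      entry = @entries[key]
      entry[:value] if entry && (now - entry[:cached_at]) < @ttl
    end
    return cached if cached

    value = yield # the slow call runs outside the lock
    @mutex.synchronize { @entries[key] = { value: value, cached_at: now } } if value
    value
  end
end

cache = TtlCache.new(86_400)
puts cache.fetch("user-1", now: 0)      { "ticket-1" } # block runs, result is cached
puts cache.fetch("user-1", now: 100)    { "ticket-2" } # still fresh, block is skipped
puts cache.fetch("user-1", now: 86_400) { "ticket-3" } # TTL elapsed, block runs again
```

As in the adapter, two threads that miss at the same time may both perform the remote call; the last writer wins, which is harmless for an idempotent lookup.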
#flush_pending(chat_id) ⇒ Object
Force-flush pending text for a chat_id. Called before sending files or on task completion.
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 311
def flush_pending(chat_id)
  @send_queue.flush(chat_id)
end
#lookup_context_token(user_id) ⇒ Object
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 555
def lookup_context_token(user_id)
  @ctx_mutex.synchronize { @context_tokens[user_id] }
end
#markdown_to_plain(text) ⇒ Object
Strip markdown syntax for WeChat (no markdown rendering).
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 586
def markdown_to_plain(text)
  r = text.dup
  r.gsub!(/```[^\n]*\n?([\s\S]*?)```/) { Regexp.last_match(1).strip }
  r.gsub!(/!\[[^\]]*\]\([^)]*\)/, "")
  r.gsub!(/\[([^\]]+)\]\([^)]*\)/, '\\1')
  r.gsub!(/\*\*([^*]+)\*\*/, '\\1')
  r.gsub!(/\*([^*]+)\*/, '\\1')
  r.gsub!(/__([^_]+)__/, '\\1')
  r.gsub!(/_([^_]+)_/, '\\1')
  r.gsub!(/^#+\s+/, "")
  r.gsub!(/^[-*_]{3,}\s*$/, "")
  r.strip
end
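A short worked example helps show what the substitutions do in sequence. The block below is a standalone copy of the method with two made-up inputs; the second one illustrates a side effect of the single-underscore rule on snake_case identifiers, which may or may not matter for a given deployment.

```ruby
# Standalone copy of markdown_to_plain, for illustration only.
def markdown_to_plain(text)
  r = text.dup
  r.gsub!(/```[^\n]*\n?([\s\S]*?)```/) { Regexp.last_match(1).strip } # keep fenced code, drop fences
  r.gsub!(/!\[[^\]]*\]\([^)]*\)/, "")      # images are removed entirely
  r.gsub!(/\[([^\]]+)\]\([^)]*\)/, '\\1')  # links keep only their label
  r.gsub!(/\*\*([^*]+)\*\*/, '\\1')        # **bold**
  r.gsub!(/\*([^*]+)\*/, '\\1')            # *italic*
  r.gsub!(/__([^_]+)__/, '\\1')            # __bold__
  r.gsub!(/_([^_]+)_/, '\\1')              # _italic_
  r.gsub!(/^#+\s+/, "")                    # heading markers
  r.gsub!(/^[-*_]{3,}\s*$/, "")            # horizontal rules
  r.strip
end

markdown = "## Title\n**bold** and [link](https://example.com)\n```ruby\nputs 1\n```"
puts markdown_to_plain(markdown)
# Title
# bold and link
# puts 1

# Two snake_case names on one line are treated as an _italic_ span:
puts markdown_to_plain("use send_text then send_file")
# use sendtext then sendfile
```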
#process_message(msg) ⇒ Object
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 350
def process_message(msg)
  # Only process inbound USER messages (message_type 1 = USER)
  return unless msg["message_type"] == 1

  from_user_id = msg["from_user_id"].to_s
  context_token = msg["context_token"].to_s
  return if from_user_id.empty? || context_token.empty?

  if @allowed_users.any? && !@allowed_users.include?(from_user_id)
    Clacky::Logger.debug("[WeixinAdapter] ignoring message from #{from_user_id} (not in allowed_users)")
    return
  end

  # Cache context_token — needed when sending replies
  store_context_token(from_user_id, context_token)

  item_list = msg["item_list"] || []
  Clacky::Logger.debug("[WeixinAdapter] item_list raw: #{item_list.to_json}")
  text = extract_text(item_list)
  files = extract_files(item_list)

  # Require at least some content (text or files)
  return if text.strip.empty? && files.empty?

  event = {
    type: :message,
    platform: :weixin,
    chat_id: from_user_id,
    user_id: from_user_id,
    text: text.strip,
    files: files,
    message_id: msg["message_id"]&.to_s,
    timestamp: msg["create_time_ms"] ? Time.at(msg["create_time_ms"] / 1000.0) : Time.now,
    chat_type: :direct,
    context_token: context_token,
    raw: msg
  }

  log_parts = []
  log_parts << text.slice(0, 80) unless text.strip.empty?
  log_parts << "#{files.size} file(s)" unless files.empty?
  Clacky::Logger.info("[WeixinAdapter] message from #{from_user_id}: #{log_parts.join(" + ")}")
  @on_message&.call(event)
end
#send_file(chat_id, file_path, name: nil, reply_to: nil) ⇒ Object
Send a file to a user.
file_path: local path to the file to send
name: optional display name (defaults to the basename of file_path)
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 318
def send_file(chat_id, file_path, name: nil, reply_to: nil)
  ctoken = lookup_context_token(chat_id)
  unless ctoken
    Clacky::Logger.warn("[WeixinAdapter] send_file: no context_token for #{chat_id}, dropping")
    return { message_id: nil }
  end

  @send_queue.flush(chat_id)
  @api_client.send_file(
    to_user_id: chat_id,
    file_path: file_path,
    file_name: name || File.basename(file_path),
    context_token: ctoken
  )
  { message_id: nil }
rescue => e
  Clacky::Logger.error("[WeixinAdapter] send_file failed for #{chat_id}: #{e.message}")
  { message_id: nil }
end
#send_text(chat_id, text, reply_to: nil) ⇒ Object
Send a plain text reply to a user. The context_token from the inbound message is required by the Weixin protocol. Text is enqueued and sent in batches by the background flusher to avoid rate-limiting.
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 296
def send_text(chat_id, text, reply_to: nil)
  ctoken = lookup_context_token(chat_id)
  unless ctoken
    Clacky::Logger.warn("[WeixinAdapter] send_text: no context_token for #{chat_id}, dropping message")
    return { message_id: nil }
  end

  plain = markdown_to_plain(text)
  return { message_id: nil } if plain.empty?

  @send_queue.enqueue(chat_id, plain, ctoken)
  { message_id: nil }
end
#split_message(text, limit: 2000) ⇒ Object
Split text into ≤2000 Unicode character chunks per iLink protocol recommendation. Priority: split at \n\n, then \n, then space, then hard cut.
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 568
def split_message(text, limit: 2000)
  return [text] if text.chars.length <= limit

  chunks = []
  while text.chars.length > limit
    window = text.chars.first(limit).join
    # Prefer double-newline boundary
    cut = window.rindex("\n\n")
    cut = window.rindex("\n") if cut.nil?
    cut = window.rindex(" ") if cut.nil?
    cut = limit if cut.nil? || cut.zero?
    chunks << text.chars.first(cut).join.rstrip
    text = text.chars.drop(cut).join.lstrip
  end
  chunks << text unless text.empty?
  chunks
end
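The boundary priority is easiest to see with a small limit. The block below is a standalone copy of the method run against a made-up string with limit: 12 instead of the default 2000.

```ruby
# Standalone copy of split_message, for illustration only.
def split_message(text, limit: 2000)
  return [text] if text.chars.length <= limit

  chunks = []
  while text.chars.length > limit
    window = text.chars.first(limit).join
    cut = window.rindex("\n\n")               # 1. paragraph break
    cut = window.rindex("\n") if cut.nil?     # 2. line break
    cut = window.rindex(" ") if cut.nil?      # 3. word boundary
    cut = limit if cut.nil? || cut.zero?      # 4. hard cut
    chunks << text.chars.first(cut).join.rstrip
    text = text.chars.drop(cut).join.lstrip
  end
  chunks << text unless text.empty?
  chunks
end

p split_message("alpha beta\n\ngamma delta epsilon", limit: 12)
# => ["alpha beta", "gamma delta", "epsilon"]

p split_message("abcdefghij", limit: 4)
# => ["abcd", "efgh", "ij"]
```

The first call splits once at the paragraph break and once at a space; the second has no usable boundary and falls through to hard cuts.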
#start(&on_message) ⇒ Object
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 245
def start(&on_message)
  @running = true
  @on_message = on_message

  get_updates_buf = ""
  consecutive_errors = 0

  Clacky::Logger.info("[WeixinAdapter] starting long-poll (base_url=#{@base_url})")

  while @running
    begin
      resp = @api_client.get_updates(get_updates_buf: get_updates_buf)
      consecutive_errors = 0

      new_buf = resp["get_updates_buf"].to_s
      get_updates_buf = new_buf unless new_buf.empty?

      (resp["msgs"] || []).each do |msg|
        process_message(msg)
      rescue => e
        Clacky::Logger.warn("[WeixinAdapter] process_message error: #{e.message}")
      end
    rescue ApiClient::TimeoutError
      # Long-poll server-side timeout is expected — just retry
    rescue ApiClient::ApiError => e
      if e.code == ApiClient::SESSION_EXPIRED_ERRCODE
        Clacky::Logger.warn("[WeixinAdapter] Session expired (token may need refresh), backing off 60s")
        sleep 60
      else
        consecutive_errors += 1
        Clacky::Logger.warn("[WeixinAdapter] API error #{e.code}: #{e.message}")
        sleep(consecutive_errors > 3 ? 30 : RECONNECT_DELAY)
      end
    rescue => e
      consecutive_errors += 1
      Clacky::Logger.error("[WeixinAdapter] poll error: #{e.message}")
      break unless @running
      sleep(consecutive_errors > 3 ? 30 : RECONNECT_DELAY)
    end
  end
end
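The retry policy in the poll loop can be summarized as a pure function from a sequence of poll outcomes to sleep durations. The sketch below is a simplification written for this page: backoff_schedule and the outcome symbols are hypothetical, and the delays (5, 30, 60 seconds) are taken from the listing above.

```ruby
# Hypothetical model of the poll loop's backoff decisions; not adapter code.
def backoff_schedule(outcomes, base: 5, long: 30, session_expired: 60)
  consecutive_errors = 0
  outcomes.map do |outcome|
    case outcome
    when :ok
      consecutive_errors = 0 # any successful poll resets the counter
      0
    when :timeout
      0                      # long-poll timeout is expected: retry immediately
    when :session_expired
      session_expired        # fixed back-off, counter untouched
    else
      consecutive_errors += 1
      consecutive_errors > 3 ? long : base
    end
  end
end

p backoff_schedule(%i[error error error error ok error timeout session_expired])
# => [5, 5, 5, 30, 0, 5, 0, 60]
```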
#start_typing_keepalive(user_id, context_token) ⇒ Object
Start a background thread that sends sendtyping(1) every TYPING_KEEPALIVE_INTERVAL. Any existing keepalive for this user is stopped first.
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 635
def start_typing_keepalive(user_id, context_token)
  stop_typing_keepalive(user_id)

  ticket = fetch_typing_ticket(user_id, context_token)
  unless ticket
    Clacky::Logger.debug("[WeixinAdapter] no typing_ticket for #{user_id}, skipping keepalive")
    return
  end

  thread = Thread.new do
    loop do
      begin
        @api_client.send_typing(
          ilink_user_id: user_id,
          typing_ticket: ticket,
          status: 1
        )
        Clacky::Logger.debug("[WeixinAdapter] typing keepalive sent for #{user_id}")
      rescue => e
        Clacky::Logger.debug("[WeixinAdapter] typing keepalive error: #{e.message}")
      end
      sleep TYPING_KEEPALIVE_INTERVAL
    end
  end

  @keepalive_mutex.synchronize { @keepalive_threads[user_id] = thread }
  Clacky::Logger.debug("[WeixinAdapter] typing keepalive started for #{user_id}")
end
#stop ⇒ Object
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 288
def stop
  @running = false
  @send_queue.stop
end
#stop_typing_keepalive(user_id) ⇒ Object
Stop keepalive thread and send sendtyping(status=2) to cancel “typing” indicator.
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 665
def stop_typing_keepalive(user_id)
  thread = @keepalive_mutex.synchronize { @keepalive_threads.delete(user_id) }
  return unless thread

  thread.kill
  thread.join(1)

  ticket = @typing_mutex.synchronize { @typing_tickets.dig(user_id, :ticket) }
  if ticket
    begin
      @api_client.send_typing(
        ilink_user_id: user_id,
        typing_ticket: ticket,
        status: 2
      )
    rescue => e
      Clacky::Logger.debug("[WeixinAdapter] stop typing error: #{e.message}")
    end
  end
  Clacky::Logger.debug("[WeixinAdapter] typing keepalive stopped for #{user_id}")
end
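The per-user keepalive lifecycle (stop any existing thread, start a new one, later kill and join it) can be sketched without the iLink client. Keepalive is a hypothetical class for this example; the tick block stands in for the sendtyping(status=1) call, and the short interval is chosen only so the demo finishes quickly.

```ruby
# Hypothetical keepalive registry; not part of the adapter.
class Keepalive
  def initialize(interval)
    @interval = interval
    @threads = {}
    @mutex = Mutex.new
  end

  # Replaces any running keepalive for key with a new background loop.
  def start(key, &tick)
    stop(key)
    thread = Thread.new do
      loop do
        begin
          tick.call
        rescue => e
          warn "keepalive error: #{e.message}" # one failed tick must not end the loop
        end
        sleep @interval
      end
    end
    @mutex.synchronize { @threads[key] = thread }
  end

  # Returns true if a thread was running for key and has been stopped.
  def stop(key)
    thread = @mutex.synchronize { @threads.delete(key) }
    return false unless thread

    thread.kill
    thread.join(1)
    true
  end
end

ticks = Queue.new
keepalive = Keepalive.new(0.01)
keepalive.start("user-1") { ticks << :tick }
sleep 0.1
keepalive.stop("user-1")
puts "ticks sent: #{ticks.size}"
```

Removing the thread from the registry under the mutex before killing it means a concurrent stop for the same key finds nothing and returns early, which mirrors the early return in stop_typing_keepalive.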
#store_context_token(user_id, token) ⇒ Object
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 551
def store_context_token(user_id, token)
  @ctx_mutex.synchronize { @context_tokens[user_id] = token }
end
#supports_message_updates? ⇒ Boolean
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 345
def supports_message_updates?
  false
end
#validate_config(config) ⇒ Object
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 339
def validate_config(config)
  errors = []
  errors << "token is required" if config[:token].nil? || config[:token].to_s.strip.empty?
  errors
end