Class: Clacky::Channel::Adapters::Weixin::Adapter

Inherits:
Base
  • Object
show all
Defined in:
lib/clacky/server/channel/adapters/weixin/adapter.rb

Overview

Weixin (WeChat iLink) adapter.

Protocol: HTTP long-poll via ilinkai.weixin.qq.com Auth: token obtained from QR login (stored in channels.yml as ‘token`)

Config keys (channels.yml):

token:         [String] bot token from QR login
base_url:      [String] API base URL (default: https://ilinkai.weixin.qq.com)
allowed_users: [Array<String>] optional whitelist of from_user_id values

Event fields yielded to ChannelManager:

platform:      :weixin
chat_id:       String (from_user_id — used for replies)
user_id:       String (from_user_id)
text:          String
files:         Array<Hash>
message_id:    String
timestamp:     Time
chat_type:     :direct
context_token: String (must be echoed in every reply)

Constant Summary collapse

RECONNECT_DELAY =
5
TYPING_KEEPALIVE_INTERVAL =

── Typing keepalive ─────────────────────────────────────────────────sendtyping(status=1) serves dual purpose: maintains typing indicator AND renews the context_token TTL. Official @tencent-weixin/openclaw-weixin npm package uses keepaliveIntervalMs: 5000. We match that exactly.

5
TYPING_TICKET_TTL =

typing_ticket is valid for ~24h; cache and reuse it.

86_400

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Base

#platform_id, #update_message

Constructor Details

#initialize(config) ⇒ Adapter

Returns a new instance of Adapter.



225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 225

def initialize(config)
  @config        = config
  @token         = config[:token].to_s
  @base_url      = config[:base_url] || ApiClient::DEFAULT_BASE_URL
  @allowed_users = Array(config[:allowed_users])
  @running       = false
  @on_message    = nil
  # In-memory store: user_id → context_token (for reply threading)
  @context_tokens = {}
  @ctx_mutex      = Mutex.new
  @api_client     = ApiClient.new(base_url: @base_url, token: @token)
  @send_queue     = SendQueue.new(@api_client)
  # Typing keepalive: user_id → { ticket:, thread:, cached_at: }
  @typing_tickets  = {}
  @typing_mutex    = Mutex.new
  # Active keepalive threads: user_id → Thread
  @keepalive_threads = {}
  @keepalive_mutex   = Mutex.new
end

Class Method Details

.env_keysObject



195
196
197
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 195

def self.env_keys
  %w[IM_WEIXIN_TOKEN IM_WEIXIN_BASE_URL IM_WEIXIN_ALLOWED_USERS]
end

.platform_config(data) ⇒ Object



199
200
201
202
203
204
205
206
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 199

def self.platform_config(data)
  {
    token:         data["IM_WEIXIN_TOKEN"] || data["token"],
    base_url:      data["IM_WEIXIN_BASE_URL"] || data["base_url"] || ApiClient::DEFAULT_BASE_URL,
    allowed_users: (data["IM_WEIXIN_ALLOWED_USERS"] || data["allowed_users"] || "")
                     .then { |v| v.is_a?(Array) ? v : v.to_s.split(",").map(&:strip).reject(&:empty?) }
  }.compact
end

.platform_idObject



191
192
193
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 191

def self.platform_id
  :weixin
end

.set_env_data(data, config) ⇒ Object



208
209
210
211
212
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 208

def self.set_env_data(data, config)
  data["IM_WEIXIN_TOKEN"]         = config[:token]
  data["IM_WEIXIN_BASE_URL"]      = config[:base_url] if config[:base_url]
  data["IM_WEIXIN_ALLOWED_USERS"] = Array(config[:allowed_users]).join(",")
end

.test_connection(fields) ⇒ Object



214
215
216
217
218
219
220
221
222
223
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 214

def self.test_connection(fields)
  token = fields[:token].to_s.strip

  return { ok: false, error: "token is required" } if token.empty?

  # Weixin iLink token is obtained via the QR scan flow and is already
  # confirmed valid by the iLink API before we store it. There is no
  # lightweight ping endpoint, so we just verify the token is present.
  { ok: true, message: "Connected to Weixin iLink" }
end

Instance Method Details

#context_token_user_idsObject

Return all user IDs for which we have a cached context_token. Used by ChannelManager#known_users so callers can enumerate users reachable for proactive messaging.



562
563
564
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 562

def context_token_user_ids
  @ctx_mutex.synchronize { @context_tokens.keys.dup }
end

#detect_image_mime(bytes) ⇒ Object

Detect image MIME type from magic bytes. Falls back to image/jpeg if unknown.



535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 535

def detect_image_mime(bytes)
  return "image/jpeg"  unless bytes && bytes.bytesize >= 4
  head = bytes.byteslice(0, 8).bytes
  if head[0] == 0xFF && head[1] == 0xD8
    "image/jpeg"
  elsif head[0] == 0x89 && head[1] == 0x50 && head[2] == 0x4E && head[3] == 0x47
    "image/png"
  elsif head[0] == 0x47 && head[1] == 0x49 && head[2] == 0x46
    "image/gif"
  elsif head[0] == 0x52 && head[1] == 0x49 && head[2] == 0x46 && head[3] == 0x46
    "image/webp"
  else
    "image/jpeg"
  end
end

#extract_files(item_list) ⇒ Object

Extract file attachments from item_list for inbound messages. Returns array of hashes: { type:, name:, cdn_media: } cdn_media contains { encrypt_query_param:, aes_key: } for potential download. Extract and materialize file attachments from an inbound item_list.

Images are downloaded from CDN and converted to data_url so the agent’s vision pipeline (partition_files → resolve_vision_images) picks them up. Files (PDF, DOCX, etc.) are downloaded to clacky-uploads so the agent’s file processing pipeline (process_path) can parse them. Voice/video are kept as cdn_media metadata only (no local download).

Returns Array of Hashes. Image entries:

{ type: :image, name: String, mime_type: String, data_url: String }

File entries (downloaded):

{ type: :file, name: String, path: String }

Voice/video entries:

{ type: :voice/:video, name: String, cdn_media: Hash }


437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 437

def extract_files(item_list)
  files = []
  item_list.each do |item|
    case item["type"]
    when 2  # IMAGE — download + convert to data_url for agent vision
      img = item["image_item"]
      next unless img
      cdn_media = img["media"]
      next unless cdn_media

      # Protocol: image_item may have a top-level aeskey field that overrides
      # the one inside media. Use image_item.aeskey first, fall back to media.aes_key.
      top_level_aeskey = img["aeskey"]
      effective_cdn_media = if top_level_aeskey && !top_level_aeskey.empty?
                              cdn_media.merge("aes_key" => top_level_aeskey)
                            else
                              cdn_media
                            end

      Clacky::Logger.debug("[WeixinAdapter] image cdn_media: #{effective_cdn_media.to_json}")

      begin
        raw_bytes = @api_client.download_media(effective_cdn_media, ApiClient::MEDIA_TYPE_IMAGE)
        mime_type = detect_image_mime(raw_bytes)
        data_url  = "data:#{mime_type};base64,#{Base64.strict_encode64(raw_bytes)}"
        files << {
          type:      :image,
          name:      "image.jpg",
          mime_type: mime_type,
          data_url:  data_url
        }
      rescue => e
        Clacky::Logger.warn("[WeixinAdapter] Failed to download image: #{e.message}\n#{e.backtrace.first(3).join("\n")}")
      end

    when 3  # VOICE
      v = item["voice_item"]
      next unless v
      files << {
        type:      :voice,
        name:      "voice.amr",
        cdn_media: v["media"]
      }
    when 4  # FILE — download to disk so agent can parse it
      fi = item["file_item"]
      next unless fi
      cdn_media = fi["media"]
      file_name = fi["file_name"].to_s
      file_name = "attachment" if file_name.empty?
      file_md5  = fi["md5"].to_s
      file_len  = fi["len"].to_s

      if cdn_media
        begin
          raw_bytes = @api_client.download_media(cdn_media, ApiClient::MEDIA_TYPE_FILE)
          saved     = Clacky::Utils::FileProcessor.save(body: raw_bytes, filename: file_name)
          Clacky::Logger.info("[WeixinAdapter] file downloaded to #{saved[:path]} (#{raw_bytes.bytesize} bytes)")
          files << {
            type: :file,
            name: saved[:name],
            path: saved[:path],
            md5:  file_md5.empty? ? nil : file_md5,
            len:  file_len.empty? ? nil : file_len
          }
        rescue => e
          Clacky::Logger.warn("[WeixinAdapter] Failed to download file #{file_name}: #{e.message}\n#{e.backtrace.first(3).join("\n")}")
          # Fall back to metadata-only so the agent at least knows a file was attached
          files << {
            type:      :file,
            name:      file_name,
            cdn_media: cdn_media,
            md5:       file_md5.empty? ? nil : file_md5,
            len:       file_len.empty? ? nil : file_len
          }
        end
      else
        files << {
          type: :file,
          name: file_name,
          md5:  file_md5.empty? ? nil : file_md5,
          len:  file_len.empty? ? nil : file_len
        }
      end
    when 5  # VIDEO
      vi = item["video_item"]
      next unless vi
      files << {
        type:      :video,
        name:      "video.mp4",
        cdn_media: vi["media"]
      }
    end
  end
  files
end

#extract_text(item_list) ⇒ Object



395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 395

def extract_text(item_list)
  parts = []
  item_list.each do |item|
    case item["type"]
    when 1  # TEXT
      raw_text = item.dig("text_item", "text").to_s.strip
      ref = item["ref_msg"]
      if ref && !ref.empty?
        ref_parts = []
        ref_parts << ref["title"] if ref["title"] && !ref["title"].empty?
        if (ri = ref["message_item"]) && ri["type"] == 1
          rt = ri.dig("text_item", "text").to_s.strip
          ref_parts << rt unless rt.empty?
        end
        parts << "[引用: #{ref_parts.join(" | ")}]" unless ref_parts.empty?
      end
      parts << raw_text unless raw_text.empty?
    when 3  # VOICE — use transcription if available
      vt = item.dig("voice_item", "text").to_s.strip
      parts << vt unless vt.empty?
    end
  end
  parts.join("\n")
end

#fetch_typing_ticket(user_id, context_token) ⇒ Object

Fetch (or return cached) typing_ticket for user_id. Returns nil on failure — keepalive will just skip without crashing.



610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 610

def fetch_typing_ticket(user_id, context_token)
  @typing_mutex.synchronize do
    entry = @typing_tickets[user_id]
    if entry && (Time.now.to_i - entry[:cached_at]) < TYPING_TICKET_TTL
      return entry[:ticket]
    end
  end

  ticket = @api_client.get_typing_ticket(
    ilink_user_id: user_id,
    context_token: context_token
  )
  return nil if ticket.empty?

  @typing_mutex.synchronize do
    @typing_tickets[user_id] = { ticket: ticket, cached_at: Time.now.to_i }
  end
  ticket
rescue => e
  Clacky::Logger.warn("[WeixinAdapter] getconfig failed for #{user_id}: #{e.message}")
  nil
end

#flush_pending(chat_id) ⇒ Object

Force-flush pending text for a chat_id. Called before sending files or on task completion.



311
312
313
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 311

def flush_pending(chat_id)
  @send_queue.flush(chat_id)
end

#lookup_context_token(user_id) ⇒ Object



555
556
557
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 555

def lookup_context_token(user_id)
  @ctx_mutex.synchronize { @context_tokens[user_id] }
end

#markdown_to_plain(text) ⇒ Object

Strip markdown syntax for WeChat (no markdown rendering).



586
587
588
589
590
591
592
593
594
595
596
597
598
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 586

def markdown_to_plain(text)
  r = text.dup
  r.gsub!(/```[^\n]*\n?([\s\S]*?)```/) { Regexp.last_match(1).strip }
  r.gsub!(/!\[[^\]]*\]\([^)]*\)/, "")
  r.gsub!(/\[([^\]]+)\]\([^)]*\)/, '\\1')
  r.gsub!(/\*\*([^*]+)\*\*/, '\\1')
  r.gsub!(/\*([^*]+)\*/, '\\1')
  r.gsub!(/__([^_]+)__/, '\\1')
  r.gsub!(/_([^_]+)_/, '\\1')
  r.gsub!(/^#+\s+/, "")
  r.gsub!(/^[-*_]{3,}\s*$/, "")
  r.strip
end

#process_message(msg) ⇒ Object



350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 350

def process_message(msg)
  # Only process inbound USER messages (message_type 1 = USER)
  return unless msg["message_type"] == 1

  from_user_id  = msg["from_user_id"].to_s
  context_token = msg["context_token"].to_s
  return if from_user_id.empty? || context_token.empty?

  if @allowed_users.any? && !@allowed_users.include?(from_user_id)
    Clacky::Logger.debug("[WeixinAdapter] ignoring message from #{from_user_id} (not in allowed_users)")
    return
  end

  # Cache context_token — needed when sending replies
  store_context_token(from_user_id, context_token)

  item_list = msg["item_list"] || []
  Clacky::Logger.debug("[WeixinAdapter] item_list raw: #{item_list.to_json}")
  text  = extract_text(item_list)
  files = extract_files(item_list)

  # Require at least some content (text or files)
  return if text.strip.empty? && files.empty?

  event = {
    type:          :message,
    platform:      :weixin,
    chat_id:       from_user_id,
    user_id:       from_user_id,
    text:          text.strip,
    files:         files,
    message_id:    msg["message_id"]&.to_s,
    timestamp:     msg["create_time_ms"] ? Time.at(msg["create_time_ms"] / 1000.0) : Time.now,
    chat_type:     :direct,
    context_token: context_token,
    raw:           msg
  }

  log_parts = []
  log_parts << text.slice(0, 80) unless text.strip.empty?
  log_parts << "#{files.size} file(s)" unless files.empty?
  Clacky::Logger.info("[WeixinAdapter] message from #{from_user_id}: #{log_parts.join(" + ")}")
  @on_message&.call(event)
end

#send_file(chat_id, file_path, name: nil, reply_to: nil) ⇒ Object

Send a file to a user. file_path: local path to the file to send file_name: optional display name (defaults to basename)



318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 318

def send_file(chat_id, file_path, name: nil, reply_to: nil)
  ctoken = lookup_context_token(chat_id)
  unless ctoken
    Clacky::Logger.warn("[WeixinAdapter] send_file: no context_token for #{chat_id}, dropping")
    return { message_id: nil }
  end

  @send_queue.flush(chat_id)

  @api_client.send_file(
    to_user_id:    chat_id,
    file_path:     file_path,
    file_name:     name || File.basename(file_path),
    context_token: ctoken
  )
  { message_id: nil }
rescue => e
  Clacky::Logger.error("[WeixinAdapter] send_file failed for #{chat_id}: #{e.message}")
  { message_id: nil }
end

#send_text(chat_id, text, reply_to: nil) ⇒ Object

Send a plain text reply to a user. The context_token from the inbound message is required by the Weixin protocol. Text is enqueued and sent in batches by the background flusher to avoid rate-limiting.



296
297
298
299
300
301
302
303
304
305
306
307
308
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 296

def send_text(chat_id, text, reply_to: nil)
  ctoken = lookup_context_token(chat_id)
  unless ctoken
    Clacky::Logger.warn("[WeixinAdapter] send_text: no context_token for #{chat_id}, dropping message")
    return { message_id: nil }
  end

  plain = markdown_to_plain(text)
  return { message_id: nil } if plain.empty?

  @send_queue.enqueue(chat_id, plain, ctoken)
  { message_id: nil }
end

#split_message(text, limit: 2000) ⇒ Object

Split text into ≤2000 Unicode character chunks per iLink protocol recommendation. Priority: split at nn, then n, then space, then hard cut.



568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 568

def split_message(text, limit: 2000)
  return [text] if text.chars.length <= limit
  chunks = []
  while text.chars.length > limit
    window = text.chars.first(limit).join
    # Prefer double-newline boundary
    cut = window.rindex("\n\n")
    cut = window.rindex("\n")   if cut.nil?
    cut = window.rindex(" ")    if cut.nil?
    cut = limit                 if cut.nil? || cut.zero?
    chunks << text.chars.first(cut).join.rstrip
    text = text.chars.drop(cut).join.lstrip
  end
  chunks << text unless text.empty?
  chunks
end

#start(&on_message) ⇒ Object



245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 245

def start(&on_message)
  @running    = true
  @on_message = on_message

  get_updates_buf    = ""
  consecutive_errors = 0

  Clacky::Logger.info("[WeixinAdapter] starting long-poll (base_url=#{@base_url})")

  while @running
    begin
      resp = @api_client.get_updates(get_updates_buf: get_updates_buf)

      consecutive_errors = 0
      new_buf = resp["get_updates_buf"].to_s
      get_updates_buf = new_buf unless new_buf.empty?

      (resp["msgs"] || []).each do |msg|
        process_message(msg)
      rescue => e
        Clacky::Logger.warn("[WeixinAdapter] process_message error: #{e.message}")
      end

    rescue ApiClient::TimeoutError
      # Long-poll server-side timeout is expected — just retry
    rescue ApiClient::ApiError => e
      if e.code == ApiClient::SESSION_EXPIRED_ERRCODE
        Clacky::Logger.warn("[WeixinAdapter] Session expired (token may need refresh), backing off 60s")
        sleep 60
      else
        consecutive_errors += 1
        Clacky::Logger.warn("[WeixinAdapter] API error #{e.code}: #{e.message}")
        sleep(consecutive_errors > 3 ? 30 : RECONNECT_DELAY)
      end
    rescue => e
      consecutive_errors += 1
      Clacky::Logger.error("[WeixinAdapter] poll error: #{e.message}")
      break unless @running
      sleep(consecutive_errors > 3 ? 30 : RECONNECT_DELAY)
    end
  end
end

#start_typing_keepalive(user_id, context_token) ⇒ Object

Start a background thread that sends sendtyping(1) every TYPING_KEEPALIVE_INTERVAL. Any existing keepalive for this user is stopped first.



635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 635

def start_typing_keepalive(user_id, context_token)
  stop_typing_keepalive(user_id)

  ticket = fetch_typing_ticket(user_id, context_token)
  unless ticket
    Clacky::Logger.debug("[WeixinAdapter] no typing_ticket for #{user_id}, skipping keepalive")
    return
  end

  thread = Thread.new do
    loop do
      begin
        @api_client.send_typing(
          ilink_user_id: user_id,
          typing_ticket: ticket,
          status:        1
        )
        Clacky::Logger.debug("[WeixinAdapter] typing keepalive sent for #{user_id}")
      rescue => e
        Clacky::Logger.debug("[WeixinAdapter] typing keepalive error: #{e.message}")
      end
      sleep TYPING_KEEPALIVE_INTERVAL
    end
  end

  @keepalive_mutex.synchronize { @keepalive_threads[user_id] = thread }
  Clacky::Logger.debug("[WeixinAdapter] typing keepalive started for #{user_id}")
end

#stopObject



288
289
290
291
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 288

def stop
  @running = false
  @send_queue.stop
end

#stop_typing_keepalive(user_id) ⇒ Object

Stop keepalive thread and send sendtyping(status=2) to cancel “typing” indicator.



665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 665

def stop_typing_keepalive(user_id)
  thread = @keepalive_mutex.synchronize { @keepalive_threads.delete(user_id) }
  return unless thread

  thread.kill
  thread.join(1)

  ticket = @typing_mutex.synchronize { @typing_tickets.dig(user_id, :ticket) }
  if ticket
    begin
      @api_client.send_typing(
        ilink_user_id: user_id,
        typing_ticket: ticket,
        status:        2
      )
    rescue => e
      Clacky::Logger.debug("[WeixinAdapter] stop typing error: #{e.message}")
    end
  end
  Clacky::Logger.debug("[WeixinAdapter] typing keepalive stopped for #{user_id}")
end

#store_context_token(user_id, token) ⇒ Object



551
552
553
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 551

def store_context_token(user_id, token)
  @ctx_mutex.synchronize { @context_tokens[user_id] = token }
end

#supports_message_updates?Boolean

Returns:

  • (Boolean)


345
346
347
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 345

def supports_message_updates?
  false
end

#validate_config(config) ⇒ Object



339
340
341
342
343
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 339

def validate_config(config)
  errors = []
  errors << "token is required" if config[:token].nil? || config[:token].to_s.strip.empty?
  errors
end