Class: Clacky::Channel::ChannelManager

Inherits:
Object
  • Object
show all
Defined in:
lib/clacky/server/channel/channel_manager.rb

Overview

ChannelManager starts and supervises IM platform adapter threads. When an inbound message arrives it:

1. Resolves (or auto-creates) a Session bound to this IM identity
2. Retrieves the WebUIController for that session
3. Creates a ChannelUIController and subscribes it to the WebUIController
4. Runs the agent task via run_agent_task (same as HttpServer)
5. Leaves the ChannelUIController subscribed when the task finishes — it is long-lived and is only detached when /bind moves the identity to another session

Thread model: each adapter runs two long-lived threads (read loop + ping). ChannelManager itself is non-blocking — call #start from HttpServer after the WEBrick server has started.

Session binding: the first message from an IM identity automatically creates a new session and binds it. Users can use /bind <session_id> to switch to an existing WebUI session instead. Bindings are stored in the session registry as :channel_keys => Set of channel key strings. WebUI sessions are persisted by HttpServer — channel adds no extra persistence.

Constant Summary collapse

# Matches text that starts with one of the built-in slash commands
# (case-insensitive). The command word must be followed by whitespace or the
# end of the text: a plain \b would also match before "-" or ".", so a skill
# invocation such as "/list-files" would be mistaken for the built-in /list
# and answered with "Unknown command" instead of reaching the agent.
KNOWN_COMMAND =
%r{\A/(new|clear|model|skills|bind|stop|unbind|status|list)(?=\s|\z)}i
# Help text for the built-in chat commands; handle_command sends it verbatim
# in reply to "?", "h" or "help". The squiggly heredoc removes the common
# leading indentation and #strip drops the trailing newline.
COMMAND_HELP =
<<~HELP.strip
  Commands:
    ? / h / help - show this help
    /new / /clear - start a new session
    /model - show current model & available models
    /model <n> - switch to model n
    /skills - list available skills
    /<skill> <args> - invoke a skill directly
    /bind <n|session_id> - switch to a session (use /list to see numbers)
    /unbind - remove binding
    /stop - interrupt current task
    /status - show current binding
    /list - show recent sessions
HELP

Instance Method Summary collapse

Constructor Details

#initialize(session_registry:, session_builder:, run_agent_task:, interrupt_session:, channel_config:, binding_mode: :chat_user) ⇒ ChannelManager

Returns a new instance of ChannelManager.

Parameters:

  • session_registry (Clacky::Server::SessionRegistry)
  • session_builder (Proc)

    (name:, source:) => session_id — from HttpServer; ChannelManager calls it with name: "<platform>-<n>" and source: :channel

  • run_agent_task (Proc)

    (session_id, agent, &task) — from HttpServer

  • interrupt_session (Proc)

    (session_id) — from HttpServer

  • channel_config (Clacky::ChannelConfig)
  • binding_mode (:user | :chat | :chat_user) (defaults to: :chat_user)

    how to map IM identities to sessions. :chat_user (default) — one session per (chat, user) pair. Most natural:

    private chat = that user's session; in a group each
    user has their own session; the same user across
    different groups keeps those contexts separate.
    

    :chat — one session per chat (all users in a group share it). :user — one session per user (merges DMs and all groups).



38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/clacky/server/channel/channel_manager.rb', line 38

# Wire up the manager with the collaborators handed over by the caller.
# Construction only assigns state: the adapter and thread lists start empty
# and @running is false until #start sets it.
#
# @param session_registry [Clacky::Server::SessionRegistry]
# @param session_builder [Proc] called to create a session for a new IM identity
# @param run_agent_task [Proc] called as (session_id, agent, &task)
# @param interrupt_session [Proc] called as (session_id)
# @param channel_config [Clacky::ChannelConfig]
# @param binding_mode [Symbol] :user, :chat or :chat_user (default)
def initialize(
  session_registry:,
  session_builder:,
  run_agent_task:,
  interrupt_session:,
  channel_config:,
  binding_mode: :chat_user
)
  # Collaborators supplied by the caller.
  @registry          = session_registry
  @session_builder   = session_builder
  @run_agent_task    = run_agent_task
  @interrupt_session = interrupt_session

  # Channel configuration.
  @channel_config = channel_config
  @binding_mode   = binding_mode

  # Runtime state.
  @running          = false
  @mutex            = Mutex.new
  @adapters         = []
  @adapter_threads  = []
  @session_counters = Hash.new(0) # platform => count, used for short session names
  @group_buffer     = GroupMessageBuffer.new
end

Instance Method Details

#adapter_for(platform) ⇒ Object?

Return the currently-live adapter for a given platform, or nil if none running. Thread-safe — acquires @mutex to read from @adapters.

Parameters:

  • platform (Symbol, String)

Returns:

  • (Object, nil)


103
104
105
106
# File 'lib/clacky/server/channel/channel_manager.rb', line 103

# Look up the live adapter registered for a platform.
# @adapters is read while holding @mutex.
#
# @param platform [Symbol, String] converted with #to_sym before comparing
# @return [Object, nil] first adapter whose platform_id equals the symbol, or nil
def adapter_for(platform)
  wanted = platform.to_sym
  @mutex.synchronize do
    @adapters.find { |candidate| candidate.platform_id == wanted }
  end
end

#adapter_loop(adapter) ⇒ Object



225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
# File 'lib/clacky/server/channel/channel_manager.rb', line 225

# Run one adapter's receive loop and restart it after a crash.
#
# adapter.start is given a block that logs a one-line summary of each inbound
# event and passes it to route_message. A StandardError raised while routing
# is logged and reported back to the originating chat; the loop keeps going.
# A StandardError that escapes adapter.start itself is logged and, while
# @running is true, the whole method body is retried after a 5 second pause.
#
# NOTE(review): an adapter removed by reload_platform is still retried here if
# its #start raises while @running is true — confirm adapters return cleanly
# from #start after #stop.
#
# @param adapter [Object] adapter responding to platform_id, start and send_text
def adapter_loop(adapter)
  Clacky::Logger.info("[ChannelManager] :#{adapter.platform_id} adapter loop started")
  adapter.start do |event|
    # First line of the text, trimmed to 80 characters, for the log only.
    summary = event[:text].to_s.lines.first.to_s.strip[0, 80]
    summary = "[image]" if summary.empty? && !event[:files].to_a.empty?
    Clacky::Logger.info("[ChannelManager] :#{adapter.platform_id} message from #{event[:user_id]} in #{event[:chat_id]}: #{summary}")
    route_message(adapter, event)
  rescue StandardError => e
    Clacky::Logger.warn("[ChannelManager] Error routing :#{adapter.platform_id} message: #{e.message}\n#{e.backtrace.first(3).join("\n")}")
    # Reporting the error is best-effort. If the reply itself fails, the
    # exception must not escape this block: it would abort adapter.start and
    # force a full adapter restart for what was a per-message failure.
    begin
      adapter.send_text(event[:chat_id], "Error: #{e.message}")
    rescue StandardError => send_error
      Clacky::Logger.warn("[ChannelManager] Could not report routing error to :#{adapter.platform_id} chat: #{send_error.message}")
    end
  end
rescue StandardError => e
  Clacky::Logger.warn("[ChannelManager] :#{adapter.platform_id} adapter crashed: #{e.message}\n#{e.backtrace.first(3).join("\n")}")
  if @running
    Clacky::Logger.info("[ChannelManager] :#{adapter.platform_id} restarting in 5s...")
    sleep 5
    retry
  end
end

#auto_create_session(adapter, event) ⇒ Object



570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
# File 'lib/clacky/server/channel/channel_manager.rb', line 570

# Create a fresh session for the IM identity behind +event+ and bind it.
#
# The session is named "<platform>-<n>", where n is a per-platform counter
# incremented under @mutex. A new ChannelUIController is subscribed to the
# session's :ui (when present) and stored under :channel_ui.
#
# @param adapter [Object] not read here; the controller resolves the adapter
#   through adapter_for each time its lambda is called
# @param event [Hash] inbound event; :platform is read directly, the rest via channel_key
# @return [Object] whatever @session_builder returned as the session id
def auto_create_session(adapter, event)
  binding_key   = channel_key(event)
  platform_name = event[:platform].to_s
  sequence      = @mutex.synchronize { @session_counters[platform_name] += 1 }

  session_id = @session_builder.call(name: "#{platform_name}-#{sequence}", source: :channel)
  bind_key_to_session(binding_key, session_id)

  # The controller is long-lived: it stays subscribed so that everything the
  # session's web UI emits is also forwarded to the IM channel.
  adapter_lookup = -> { adapter_for(event[:platform]) }
  controller     = ChannelUIController.new(event, adapter_lookup)
  @registry.with_session(session_id) do |session|
    session[:ui]&.subscribe_channel(controller)
    session[:channel_ui] = controller
  end

  Clacky::Logger.info("[ChannelManager] Auto-created session #{session_id[0, 8]} for #{binding_key}")
  session_id
end

#bind_key_to_session(key, session_id) ⇒ Object



629
630
631
632
633
634
635
636
637
# File 'lib/clacky/server/channel/channel_manager.rb', line 629

# Make +session_id+ the only listed session that holds channel key +key+.
#
# The key is first removed from the :channel_keys of every session returned
# by @registry.list, then added to the target session's set (created on
# demand).
#
# NOTE(review): @registry.list is called without arguments here while
# restore_channel_bindings passes limit: nil — confirm the default listing
# covers every session that may still hold the key.
#
# @param key [String] channel key as built by channel_key
# @param session_id [Object] id of the session that should own the key
def bind_key_to_session(key, session_id)
  @registry.list.each do |summary|
    @registry.with_session(summary[:id]) do |session|
      held = session[:channel_keys]
      held.delete(key) unless held.nil?
    end
  end

  @registry.with_session(session_id) do |session|
    session[:channel_keys] = Set.new if session[:channel_keys].nil? || session[:channel_keys] == false
    session[:channel_keys].add(key)
  end
end

#channel_key(event) ⇒ Object



653
654
655
656
657
658
659
660
661
# File 'lib/clacky/server/channel/channel_manager.rb', line 653

# Build the binding key for an inbound event according to @binding_mode.
#
#   :chat            => "<platform>:chat:<chat_id>"
#   :user            => "<platform>:user:<user_id>"
#   anything else    => "<platform>:chat:<chat_id>:user:<user_id>"  (:chat_user)
#
# @param event [Hash] reads :platform, :chat_id and :user_id
# @return [String]
def channel_key(event)
  prefix = event[:platform].to_s
  return "#{prefix}:chat:#{event[:chat_id]}" if @binding_mode == :chat
  return "#{prefix}:user:#{event[:user_id]}" if @binding_mode == :user

  "#{prefix}:chat:#{event[:chat_id]}:user:#{event[:user_id]}"
end

#channel_key_from_info(channel_info) ⇒ Object



663
664
665
666
667
668
669
670
671
672
673
# File 'lib/clacky/server/channel/channel_manager.rb', line 663

# Build the binding key from a stored channel_info record, using the same
# key formats as channel_key for the current @binding_mode.
#
# @param channel_info [Hash] reads :platform, :chat_id and :user_id (each converted with #to_s)
# @return [String]
def channel_key_from_info(channel_info)
  platform, chat_id, user_id = %i[platform chat_id user_id].map { |field| channel_info[field].to_s }

  if @binding_mode == :chat
    "#{platform}:chat:#{chat_id}"
  elsif @binding_mode == :user
    "#{platform}:user:#{user_id}"
  else
    # :chat_user, the default
    "#{platform}:chat:#{chat_id}:user:#{user_id}"
  end
end

#channel_ui_for_session(session_id) ⇒ Object

Retrieve the ChannelUIController bound to a session (if any).



592
593
594
595
596
# File 'lib/clacky/server/channel/channel_manager.rb', line 592

# Fetch the object stored under :channel_ui for a session.
#
# @param session_id [Object]
# @return [Object, nil] the stored controller; nil when none is stored or when
#   @registry.with_session does not yield
def channel_ui_for_session(session_id)
  controller = nil
  @registry.with_session(session_id) do |session|
    controller = session[:channel_ui]
  end
  controller
end

#ensure_channel_ui_subscribed(session_id, event) ⇒ Object

Make sure session has a ChannelUIController subscribed to its WebUIController. Needed both at startup (for restored sessions) and after a session is evicted from memory and rebuilt by SessionRegistry#ensure (which drops :ui/:channel_ui).



601
602
603
604
605
606
607
608
609
610
611
612
613
614
# File 'lib/clacky/server/channel/channel_manager.rb', line 601

# Attach a ChannelUIController to a session that has a :ui but no :channel_ui.
#
# The condition is checked twice: once to decide whether a controller needs
# to be built at all, and again inside the second with_session call so that a
# controller stored in the meantime is not overwritten.
#
# @param session_id [Object]
# @param event [Hash] passed to the new controller; :platform is read when the
#   controller looks up its adapter
def ensure_channel_ui_subscribed(session_id, event)
  unattached = ->(session) { session[:ui] && session[:channel_ui].nil? }

  attach_needed = false
  @registry.with_session(session_id) { |session| attach_needed = unattached.call(session) }
  return unless attach_needed

  controller = ChannelUIController.new(event, -> { adapter_for(event[:platform]) })
  @registry.with_session(session_id) do |session|
    if unattached.call(session)
      session[:ui].subscribe_channel(controller)
      session[:channel_ui] = controller
    end
  end
end

#group_history(chat_id) ⇒ Array<Hash>

Return all buffered group chat messages for a given chat.

Parameters:

  • chat_id (String)

Returns:

  • (Array<Hash>)

    each entry has :user_id, :user_name, :text



88
89
90
91
92
# File 'lib/clacky/server/channel/channel_manager.rb', line 88

# Snapshot the buffered group messages for a chat as plain hashes.
# Uses @group_buffer.peek to read the entries.
#
# @param chat_id [String]
# @return [Array<Hash>] one hash per entry with :user_id, :user_name and :text
def group_history(chat_id)
  @group_buffer.peek(chat_id).each_with_object([]) do |entry, history|
    history << {
      user_id:   entry.user_id,
      user_name: entry.user_name,
      text:      entry.text
    }
  end
end

#handle_command(adapter, event, text) ⇒ Object



350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
# File 'lib/clacky/server/channel/channel_manager.rb', line 350

# Execute one built-in chat command and reply through +adapter+.
#
# Recognised inputs:
#   ? / h / help      reply with COMMAND_HELP
#   /new, /clear      create and bind a fresh session
#   /model ...        delegated to handle_model_command
#   /skills           delegated to handle_skills_command
#   /bind <n|id>      bind this identity to an existing session and move the
#                     ChannelUIController over to it
#   /stop             interrupt the bound session's task
#   /unbind           remove this identity's key from every listed session
#   /status           report the bound session, its status and model
#   /list             delegated to list_sessions
# Anything else gets "Unknown command. Type ? for help."
#
# @param adapter [Object] adapter used for replies (send_text)
# @param event [Hash] inbound event; :chat_id and :platform are read here
# @param text [String] the command text
def handle_command(adapter, event, text)
  chat_id = event[:chat_id]
  key     = channel_key(event)

  case text
  when /\A([\?h]|help)\z/i
    adapter.send_text(chat_id, COMMAND_HELP)

  when "/new", "/clear"
    session_id = auto_create_session(adapter, event)
    adapter.send_text(chat_id, "New session `#{session_id[0, 8]}` created.") if session_id

  when /\A\/model\b/i
    handle_model_command(adapter, event, text)

  when /\A\/skills\b/i
    handle_skills_command(adapter, event)

  when /\A\/bind\s+(\S+)\z/i
    arg = Regexp.last_match(1)
    # Support numeric index from /list (1-based)
    session_id = if arg =~ /\A\d+\z/
      recent   = @registry.list.first(5)
      position = arg.to_i
      # Only 1..recent.length is a valid position. Without this check "0"
      # becomes index -1 and silently selects the last listed session, and a
      # very large number makes Array#[] raise RangeError.
      recent[position - 1]&.fetch(:id, nil) if position.between?(1, recent.length)
    else
      arg
    end
    unless session_id && @registry.get(session_id)
      adapter.send_text(chat_id, "Session not found. Use /list to see available sessions.")
      return
    end

    # Detach channel_ui from the old session's web_ui, reattach to the new one.
    old_session_id = resolve_session(event)
    channel_ui = old_session_id ? channel_ui_for_session(old_session_id) : nil

    if channel_ui
      @registry.with_session(old_session_id) { |s| s[:ui]&.unsubscribe_channel(channel_ui); s.delete(:channel_ui) }
    else
      channel_ui = ChannelUIController.new(event, -> { adapter_for(event[:platform]) })
    end

    bind_key_to_session(key, session_id)
    @registry.with_session(session_id) do |s|
      s[:ui]&.subscribe_channel(channel_ui)
      s[:channel_ui] = channel_ui
    end

    Clacky::Logger.info("[ChannelManager] Bound #{key} -> session #{session_id[0, 8]}")
    adapter.send_text(chat_id, "Bound to session `#{session_id[0, 8]}`.")

  when "/stop"
    session_id = resolve_session(event)
    unless session_id
      adapter.send_text(chat_id, "No session bound.")
      return
    end
    @interrupt_session.call(session_id)
    adapter.send_text(chat_id, "Task interrupted.")

  when "/unbind"
    unbound = false
    @registry.list.each do |summary|
      @registry.with_session(summary[:id]) do |s|
        unbound = true if s[:channel_keys]&.delete(key)
      end
    end
    adapter.send_text(chat_id, unbound ? "Unbound." : "No binding found.")

  when "/status"
    session_id = resolve_session(event)
    if session_id
      session = @registry.get(session_id)
      model = session&.dig(:agent)&.current_model_info
      model_name = model&.dig(:model) || "unknown"
      adapter.send_text(chat_id, "Bound to session `#{session_id[0, 8]}` (status: #{session&.dig(:status) || "unknown"}, model: #{model_name})")
    else
      adapter.send_text(chat_id, "No session bound yet. Send any message to auto-create one.")
    end

  when "/list"
    list_sessions(adapter, chat_id)

  else
    adapter.send_text(chat_id, "Unknown command. Type ? for help.")
  end
end

#handle_model_command(adapter, event, text) ⇒ Object



456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
# File 'lib/clacky/server/channel/channel_manager.rb', line 456

# Handle "/model" (show the current model and the numbered list) and
# "/model <n>" (switch to the n-th model) for the bound session.
#
# NOTE(review): the list is numbered from agent.available_models while the
# switch indexes agent.config.models — confirm both collections share the
# same order.
#
# @param adapter [Object] adapter used for replies (send_text)
# @param event [Hash] inbound event; :chat_id is read here
# @param text [String] full command text, starting with "/model"
def handle_model_command(adapter, event, text)
  chat_id = event[:chat_id]
  reply   = ->(message) { adapter.send_text(chat_id, message) }

  session_id = resolve_session(event)
  unless session_id
    reply.call("No session bound. Send any message to auto-create one first.")
    return
  end

  agent = @registry.get(session_id)&.dig(:agent)
  unless agent
    reply.call("Session not ready.")
    return
  end

  arg = text.sub(/\A\/model\s*/i, "").strip

  case arg
  when ""
    # No argument: describe the current model, then list what is available.
    info    = agent.current_model_info
    current = info&.dig(:model) || "unknown"
    sub     = info&.dig(:sub_model)
    card    = info&.dig(:card_model)

    detail =
      if card && sub
        sub == current ? "" : " (#{card} · #{sub})"
      elsif card
        " (#{card})"
      else
        ""
      end
    header = "Current model: #{current}#{detail}"

    names = agent.available_models
    if names.empty?
      reply.call("#{header}\nNo other models available.")
      return
    end

    numbered = names.each.with_index(1).map do |name, number|
      marker = name == current ? " *" : ""
      "#{number}. #{name}#{marker}"
    end
    reply.call("#{header}\n\nSwitch with /model <n>:\n#{numbered.join("\n")}")
  when /\A\d+\z/
    # Numeric argument: 1-based position in the configured model list.
    choice  = arg.to_i
    catalog = agent.config.models
    unless choice.between?(1, catalog.length)
      reply.call("Invalid model number. Use /model to see available models.")
      return
    end

    model_id = catalog[choice - 1]["id"]
    if agent.switch_model_by_id(model_id)
      switched = agent.current_model_info
      reply.call("Switched to #{switched&.dig(:model) || model_id}.")
    else
      reply.call("Failed to switch model.")
    end
  else
    reply.call("Usage: /model to list, /model <n> to switch.")
  end
end

#handle_skills_command(adapter, event) ⇒ Object



515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
# File 'lib/clacky/server/channel/channel_manager.rb', line 515

# Reply with a numbered list of up to ten user-invocable skills of the bound
# session's agent, skipping skills whose source is :default. Descriptions
# longer than 50 characters are cut to 50 and suffixed with "...".
#
# @param adapter [Object] adapter used for replies (send_text)
# @param event [Hash] inbound event; :chat_id is read here
def handle_skills_command(adapter, event)
  chat_id    = event[:chat_id]
  session_id = resolve_session(event)

  unless session_id
    adapter.send_text(chat_id, "No session bound. Send any message to auto-create one first.")
    return
  end

  agent = @registry.get(session_id)&.dig(:agent)
  unless agent
    adapter.send_text(chat_id, "Session not ready.")
    return
  end

  invocable = agent.skill_loader.user_invocable_skills
  listed    = invocable.reject { |skill| skill.source == :default }.first(10)
  if listed.empty?
    adapter.send_text(chat_id, "No skills available.")
    return
  end

  rows = listed.each.with_index(1).map do |skill, number|
    summary = skill.description.to_s.strip
    summary =
      if summary.empty?
        "(no description)"
      elsif summary.length > 50
        "#{summary[0, 50]}..."
      else
        summary
      end
    "#{number}. #{skill.name} - #{summary}"
  end
  adapter.send_text(chat_id, "Skills:\n#{rows.join("\n")}")
end

#known_users(platform) ⇒ Array<String>

Return a list of known user IDs for the given platform. Collected from every message that has been processed since the server started. Weixin stores context_tokens keyed by user_id; feishu/wecom track chat_ids via the session binding table in the registry.

Parameters:

  • platform (Symbol, String)

Returns:

  • (Array<String>)


139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/clacky/server/channel/channel_manager.rb', line 139

# List the ids that can be addressed on a platform.
#
# With no running adapter the result is empty. An adapter that responds to
# context_token_user_ids supplies the list itself. Otherwise the channel keys
# stored in the session registry are scanned: every key starting with
# "<platform>:" has that prefix removed and the remainder (for example
# "chat:OC_ID:user:OU_ID", "user:UID" or "chat:CID") is passed to
# extract_chat_id. Nil results and duplicates are dropped.
#
# @param platform [Symbol, String]
# @return [Array<String>]
def known_users(platform)
  platform_id = platform.to_sym
  adapter     = adapter_for(platform_id)
  return [] unless adapter
  return adapter.context_token_user_ids if adapter.respond_to?(:context_token_user_ids)

  key_prefix = "#{platform_id}:"
  collected = @registry.list.flat_map do |summary|
    from_session = []
    @registry.with_session(summary[:id]) do |session|
      (session[:channel_keys] || []).each do |channel_key_string|
        next unless channel_key_string.start_with?(key_prefix)

        from_session << extract_chat_id(channel_key_string.delete_prefix(key_prefix))
      end
    end
    from_session
  end
  collected.compact.uniq
end

#list_sessions(adapter, chat_id) ⇒ Object



639
640
641
642
643
644
645
646
647
648
649
650
651
# File 'lib/clacky/server/channel/channel_manager.rb', line 639

# Send the first five sessions from the registry listing as a numbered list.
# Each row shows the first eight characters of the id, the name (or
# "(unnamed)"), the status and characters 5..15 of updated_at with "T"
# replaced by a space ("-" when that slice is nil).
#
# @param adapter [Object] adapter used for the reply (send_text)
# @param chat_id [Object] chat to reply to
def list_sessions(adapter, chat_id)
  recent = @registry.list.first(5)
  if recent.empty?
    adapter.send_text(chat_id, "No sessions available.")
    return
  end

  rows = recent.each.with_index(1).map do |summary, position|
    label = summary[:name].to_s.empty? ? "(unnamed)" : summary[:name]
    stamp = summary[:updated_at].to_s[5, 11]&.tr("T", " ") || "-"
    "#{position}. `#{summary[:id][0, 8]}` #{label} (#{summary[:status]}) #{stamp}"
  end
  adapter.send_text(chat_id, "Recent sessions:\n#{rows.join("\n")}\n\nUse `/bind <n>` to switch.")
end

#reload_platform(platform, config) ⇒ Object

Hot-reload a single platform adapter with updated config. Stops the existing adapter (if running), then starts a new one if enabled.

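Parameters:

  • platform (Symbol)

    platform whose adapter should be replaced; compared against each running adapter's platform_id

  • config (Clacky::ChannelConfig)

    updated configuration; must respond to enabled?(platform)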



176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# File 'lib/clacky/server/channel/channel_manager.rb', line 176

# Hot-reload one platform's adapter with an updated configuration.
#
# Any running adapter for the platform is stopped and removed from @adapters
# while holding @mutex. If the new config reports the platform as enabled, the
# config replaces @channel_config and a new adapter is started; otherwise only
# a log line is written.
#
# @param platform [Symbol, String] normalised with #to_sym, like adapter_for,
#   so a String cannot miss the running adapter and cause a duplicate to start
# @param config [Object] configuration responding to enabled?(platform); stored
#   as @channel_config when the platform is enabled
def reload_platform(platform, config)
  # adapter.platform_id is compared against Symbols everywhere else in this class.
  platform = platform.to_sym

  # Stop existing adapter for this platform
  @mutex.synchronize do
    existing = @adapters.find { |a| a.platform_id == platform }
    if existing
      safe_stop_adapter(existing)
      @adapters.delete(existing)
    end
  end

  # Start new adapter if enabled
  if config.enabled?(platform)
    @channel_config = config
    start_adapter(platform)
    Clacky::Logger.info("[ChannelManager] :#{platform} adapter reloaded")
  else
    Clacky::Logger.info("[ChannelManager] :#{platform} disabled — adapter not started")
  end
end

#resolve_session(event) ⇒ Object



547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
# File 'lib/clacky/server/channel/channel_manager.rb', line 547

# Find the session currently bound to the IM identity behind +event+.
#
# For each summary returned by @registry.list, in order:
#   1. fast path — the session's :channel_keys includes this identity's key;
#   2. fallback for summaries whose :source is "channel" — the session is
#      passed to @registry.ensure, the agent's channel_info is converted with
#      channel_key_from_info, and on a match the binding is re-created via
#      bind_key_to_session.
#
# @param event [Hash] inbound event; read through channel_key
# @return [Object, nil] the matching summary's :id; nil when nothing matches or
#   when a StandardError is raised while searching (it is logged, not re-raised)
def resolve_session(event)
  key = channel_key(event)
  @registry.list.each do |summary|
    found = nil
    @registry.with_session(summary[:id]) { |s| found = s[:channel_keys]&.include?(key) }
    return summary[:id] if found

    # Check evicted channel sessions via persisted channel_info
    # NOTE(review): compared against the String "channel", while
    # auto_create_session passes source: :channel (a Symbol) to the session
    # builder — confirm summaries expose :source as a String.
    next unless summary[:source] == "channel"
    # Skip the summary when ensure returns a falsy value.
    next unless @registry.ensure(summary[:id])
    agent = nil
    @registry.with_session(summary[:id]) { |s| agent = s[:agent] }
    next unless agent&.channel_info
    next unless channel_key_from_info(agent.channel_info) == key
    # Re-create the binding so the next lookup takes the fast path above.
    bind_key_to_session(key, summary[:id])
    return summary[:id]
  end
  nil
rescue StandardError => e
  Clacky::Logger.error("[ChannelManager] Session resolve failed: #{e.message}")
  nil
end

#restore_channel_bindingsObject



707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
# File 'lib/clacky/server/channel/channel_manager.rb', line 707

# Rebuild channel-key bindings from the channel_info carried by each session.
# Called from #start before any adapter is started.
#
# For every summary from @registry.list(limit: nil) whose :channel_info has a
# platform, user_id and chat_id: the session is passed to @registry.ensure, a
# ChannelUIController is attached if one is missing, and the key derived from
# the agent's channel_info is bound to the session. Each key is bound at most
# once per run — the first summary in list order wins.
def restore_channel_bindings
  bound_keys = Set.new
  restored_count = 0
  @registry.list(limit: nil).each do |summary|
    info = summary[:channel_info]
    next unless info.is_a?(Hash) && info[:platform] && info[:user_id] && info[:chat_id]

    @registry.ensure(summary[:id])
    agent = nil
    @registry.with_session(summary[:id]) { |s| agent = s[:agent] }
    next unless agent&.channel_info

    # From here on the agent's own channel_info replaces the summary's copy.
    info = agent.channel_info
    next unless info[:platform] && info[:user_id] && info[:chat_id]

    key = channel_key_from_info(info)

    # NOTE(review): this synthetic event carries no :user_id — confirm
    # ChannelUIController does not need it.
    event = { platform: info[:platform], chat_id: info[:chat_id] }
    # Runs before the duplicate-key check below, so every qualifying session
    # gets a channel UI even when its key is claimed by an earlier summary.
    ensure_channel_ui_subscribed(summary[:id], event)

    # Set#add? returns nil when the key is already in the set.
    next unless bound_keys.add?(key)
    bind_key_to_session(key, summary[:id])

    Clacky::Logger.info("[ChannelManager] Restored channel binding #{key} -> session #{summary[:id][0, 8]}")
    restored_count += 1
  end
  Clacky::Logger.info("[ChannelManager] Restored #{restored_count} channel binding(s)") if restored_count > 0
end

#route_message(adapter, event) ⇒ Object



245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
# File 'lib/clacky/server/channel/channel_manager.rb', line 245

# Route one inbound adapter event: buffer it, answer a built-in command, or
# hand it to the bound session's agent.
#
# Steps, in order:
#   - events flagged :observe_only are pushed into @group_buffer and nothing
#     else happens;
#   - events flagged :unsupported get a fixed apology reply;
#   - events with neither text nor files are ignored;
#   - text matching KNOWN_COMMAND or ?/h/help is delegated to handle_command;
#   - otherwise the session is resolved (or auto-created), a running task is
#     interrupted, and the agent is run through @run_agent_task.
#
# @param adapter [Object] adapter that delivered the event; used for replies
#   and, when supported, the typing keepalive
# @param event [Hash] inbound event; keys read here: :observe_only,
#   :unsupported, :text, :files, :chat_id, :user_id, :user_name, :context_token
def route_message(adapter, event)
  if event[:observe_only]
    @group_buffer.push(event[:chat_id], user_id: event[:user_id], user_name: event[:user_name], text: event[:text].to_s)
    return
  end

  if event[:unsupported]
    adapter.send_text(event[:chat_id], "Sorry, this message type is not supported.")
    return
  end

  text  = event[:text]&.strip
  files = event[:files] || []
  return if (text.nil? || text.empty?) && files.empty?

  # Handle built-in commands
  if text&.match?(KNOWN_COMMAND) || text&.match?(/\A([\?h]|help)\z/i)
    handle_command(adapter, event, text)
    return
  end

  session_id = resolve_session(event)
  if session_id
    # Re-assert the binding for the resolved session.
    bind_key_to_session(channel_key(event), session_id)
  else
    session_id = auto_create_session(adapter, event)
  end

  session = @registry.get(session_id)
  unless session
    Clacky::Logger.warn("[ChannelManager] Session #{session_id[0, 8]} not found in registry after create")
    adapter.send_text(event[:chat_id], "Failed to initialize session. Please try again.")
    return
  end

  # Subscriber count is gathered for the log line only.
  sub_count = web_ui_for_session_diag(session_id)
  Clacky::Logger.info("[ChannelManager] Routing to session #{session_id[0, 8]} (status=#{session[:status]}, text=#{text.inspect}, channel_subs=#{sub_count})")

  # If session is running, interrupt it AND wait for the old thread to
  # actually unwind before starting a new task. Without the join, two
  # threads briefly race on the same agent/history and the old thread
  # can land an assistant.tool_calls message that the new thread then
  # ships to the LLM with no matching tool result — DeepSeek (strict
  # OpenAI-compat) rejects this with HTTP 400 "insufficient tool
  # messages following tool_calls message". CLI already waits via
  # join(2); we do the same here so all entrypoints behave alike.
  if session[:status] == :running
    Clacky::Logger.info("[ChannelManager] Session busy, interrupting previous task")
    old_thread = nil
    @registry.with_session(session_id) { |s| old_thread = s[:thread] }
    @interrupt_session.call(session_id)
    if old_thread&.alive?
      old_thread.join(2)
      if old_thread.alive?
        Clacky::Logger.warn("[ChannelManager] previous task did not finish within 2s; continuing anyway (watchdog will escalate)")
      end
    end
  end

  agent  = session[:agent]
  web_ui = session[:ui]

  # Set channel info on the agent so session context includes platform/sender.
  agent.channel_info = extract_channel_info(event) if agent.respond_to?(:channel_info=)

  # Re-attach channel UI if it was dropped (session was evicted from memory and rebuilt by ensure).
  ensure_channel_ui_subscribed(session_id, event)

  # Update reply context so responses thread under the current message.
  channel_ui_for_session(session_id)&.update_message_context(event)

  # Sync the inbound message to WebUI so it shows up in the browser session.
  # source: :channel prevents the message from being echoed back to the IM channel.
  web_ui&.show_user_message(text, source: :channel) unless text.nil? || text.empty?

  # Prepend buffered group history so the agent knows what was discussed
  # before it was @-mentioned. Buffer is cleared atomically on take.
  # WebUI always receives the raw user text — context is agent-only.
  prompt = build_prompt_with_context(event[:chat_id], text)

  # Start typing keepalive BEFORE sending any message.
  # sendmessage cancels the typing indicator in WeChat protocol,
  # so keepalive must be running when "Thinking..." is sent so it
  # immediately re-asserts the typing state after that message.
  chat_id       = event[:chat_id]
  context_token = event[:context_token]
  adapter.start_typing_keepalive(chat_id, context_token) if adapter.respond_to?(:start_typing_keepalive)

  # Acknowledge to the IM channel only — WebUI doesn't need a "Thinking..." noise.
  # NOTE(review): if send_text raises here, the keepalive started above is not
  # stopped by this method — the ensure below only runs inside the agent task.
  adapter.send_text(chat_id, "Thinking...")

  @run_agent_task.call(session_id, agent) do
    begin
      Clacky::Logger.info("[ChannelManager] agent.run START session=#{session_id[0, 8]} text=#{text.inspect}")
      agent.run(prompt, files: files, display_text: text)
      Clacky::Logger.info("[ChannelManager] agent.run END   session=#{session_id[0, 8]} text=#{text.inspect}")
    rescue StandardError => e
      Clacky::Logger.error("[ChannelManager] agent.run RAISED session=#{session_id[0, 8]} #{e.class}: #{e.message}\n#{e.backtrace.first(8).join("\n")}")
      # Logged here, then re-raised so @run_agent_task still sees the failure.
      raise
    ensure
      adapter.stop_typing_keepalive(chat_id) if adapter.respond_to?(:stop_typing_keepalive)
    end
  end
end

#running_platformsArray<Symbol>

Returns platforms currently running.

Returns:

  • (Array<Symbol>)

    platforms currently running



81
82
83
# File 'lib/clacky/server/channel/channel_manager.rb', line 81

# List the platform ids of all registered adapters.
# @adapters is read while holding @mutex.
#
# @return [Array<Symbol>] platform_id of each adapter, in registration order
def running_platforms
  @mutex.synchronize do
    @adapters.map { |adapter| adapter.platform_id }
  end
end

#safe_stop_adapter(adapter) ⇒ Object



736
737
738
739
740
# File 'lib/clacky/server/channel/channel_manager.rb', line 736

# Stop an adapter without letting a StandardError from #stop propagate;
# the failure is logged as a warning instead.
#
# @param adapter [Object] adapter responding to stop and platform_id
def safe_stop_adapter(adapter)
  begin
    adapter.stop
  rescue StandardError => error
    Clacky::Logger.warn("[ChannelManager] Error stopping #{adapter.platform_id}: #{error.message}")
  end
end

#send_to_user(platform, user_id, message) ⇒ Hash?

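Send a message to a user through the platform's running adapter. nil is returned when no adapter is running for the platform or when sending raises. If no token is found the message cannot be delivered and nil is returned.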

For Feishu and WeCom the chat_id / user_id is sufficient — no token needed.

Parameters:

  • platform (Symbol, String)

    e.g. :weixin, :feishu, :wecom

  • user_id (String)

    IM user identifier

  • message (String)

    plain-text (or markdown) message to send

Returns:

  • (Hash, nil)

    adapter result hash, or nil on failure



116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/clacky/server/channel/channel_manager.rb', line 116

# Send a message to a user through the platform's running adapter.
#
# @param platform [Symbol, String] e.g. :weixin, :feishu, :wecom
# @param user_id [String] identifier passed to the adapter's send_text
# @param message [String] text to send
# @return [Object, nil] whatever the adapter's send_text returns; nil when no
#   adapter is running for the platform or when a StandardError is raised
#   (the error is logged, not re-raised)
def send_to_user(platform, user_id, message)
  platform = platform.to_sym
  adapter  = adapter_for(platform)

  unless adapter
    Clacky::Logger.warn("[ChannelManager] send_to_user: no running adapter for :#{platform}")
    return nil
  end

  # Platform and recipient are separated so the log line stays readable
  # (they were previously run together, e.g. ":weixinuser123").
  Clacky::Logger.info("[ChannelManager] send_to_user :#{platform} -> #{user_id}")
  adapter.send_text(user_id, message)
rescue StandardError => e
  Clacky::Logger.error("[ChannelManager] send_to_user failed: #{e.message}")
  nil
end

#startObject

Start all enabled adapters in background threads. Non-blocking.



54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/clacky/server/channel/channel_manager.rb', line 54

# Start an adapter for every platform the channel config reports as enabled.
# With no enabled platform this only logs and returns; otherwise @running is
# set, stored channel bindings are restored, and start_adapter is called for
# each platform in turn.
def start
  platforms = @channel_config.enabled_platforms
  if platforms.empty?
    Clacky::Logger.info("[ChannelManager] No channels configured — skipping")
    return
  end

  Clacky::Logger.info("[ChannelManager] Starting channels: #{platforms.join(", ")}")
  @running = true

  # Bindings are restored before any adapter is started.
  restore_channel_bindings

  platforms.each do |name|
    start_adapter(name)
  end
end

#start_adapter(platform) ⇒ Object



197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
# File 'lib/clacky/server/channel/channel_manager.rb', line 197

# Instantiate, validate and launch the adapter for one platform.
#
# Returns early (after a warning) when no adapter class is registered for the
# platform or when the adapter reports config errors. Otherwise the adapter is
# added to @adapters under @mutex and adapter_loop is run for it in a new
# thread named "channel-<platform>", which is appended to @adapter_threads.
#
# @param platform [Symbol] platform id used for the registry lookup, the
#   config lookup and the thread name
def start_adapter(platform)
  adapter_class = Adapters.find(platform)
  unless adapter_class
    Clacky::Logger.warn("[ChannelManager] No adapter registered for :#{platform} — skipping")
    return
  end

  settings = @channel_config.platform_config(platform)
  Clacky::Logger.info("[ChannelManager] Initializing :#{platform} adapter")
  adapter = adapter_class.new(settings)

  problems = adapter.validate_config(settings)
  if problems.any?
    Clacky::Logger.warn("[ChannelManager] Config errors for :#{platform}: #{problems.join(", ")}")
    return
  end

  @mutex.synchronize do
    @adapters << adapter
  end
  Clacky::Logger.info("[ChannelManager] :#{platform} adapter ready, starting thread")

  worker = Thread.new do
    Thread.current.name = "channel-#{platform}"
    adapter_loop(adapter)
  end
  @adapter_threads << worker
end

#stopObject

Stop all adapters gracefully.



70
71
72
73
74
75
76
77
78
# File 'lib/clacky/server/channel/channel_manager.rb', line 70

# Shut the manager down: clear @running, stop and forget every adapter while
# holding @mutex, then give each adapter thread up to one second to finish
# before dropping the references.
def stop
  @running = false

  @mutex.synchronize do
    @adapters.each do |running_adapter|
      safe_stop_adapter(running_adapter)
    end
    @adapters.clear
  end

  @adapter_threads.each do |worker|
    worker.join(1)
  end
  @adapter_threads.clear
end

#web_ui_for_session_diag(session_id) ⇒ Object



616
617
618
619
620
621
622
623
624
625
626
627
# File 'lib/clacky/server/channel/channel_manager.rb', line 616

# Diagnostic helper: report how many channel subscribers a session's :ui holds.
#
# When the :ui responds to channel_subscribed?, its @channel_subscribers
# instance variable is read directly and its size returned (0 when the
# variable is nil). Any other :ui, including nil, yields -1.
#
# @param session_id [Object]
# @return [Integer, nil] subscriber count, -1 when unavailable, or nil when
#   @registry.with_session does not yield
def web_ui_for_session_diag(session_id)
  count = nil
  @registry.with_session(session_id) do |session|
    web_ui = session[:ui]
    if web_ui.respond_to?(:channel_subscribed?)
      subscribers = web_ui.instance_variable_get(:@channel_subscribers)
      count = subscribers&.size || 0
    else
      count = -1
    end
  end
  count
end