Class: Clacky::Channel::ChannelManager

Inherits:
Object
  • Object
show all
Defined in:
lib/clacky/server/channel/channel_manager.rb

Overview

ChannelManager starts and supervises IM platform adapter threads. When an inbound message arrives it:

1. Resolves (or auto-creates) a Session bound to this IM identity
2. Retrieves the WebUIController for that session
3. Creates a ChannelUIController and subscribes it to the WebUIController
4. Runs the agent task via run_agent_task (same as HttpServer)
5. Unsubscribes the ChannelUIController when the task finishes

Thread model: each adapter runs two long-lived threads (read loop + ping). ChannelManager itself is non-blocking — call #start from HttpServer after the WEBrick server has started.

Session binding: the first message from an IM identity automatically creates a new session and binds it. Users can use /bind <session_id> to switch to an existing WebUI session instead. Bindings are stored in the session registry as :channel_keys => Set of channel key strings. WebUI sessions are persisted by HttpServer — channel adds no extra persistence.

Constant Summary collapse

KNOWN_COMMAND =
%r{\A/(new|clear|model|skills|bind|stop|unbind|status|list)\b}i
COMMAND_HELP =
<<~HELP.strip
  Commands:
    ? / h / help - show this help
    /new / /clear - start a new session
    /model - show current model & available models
    /model <n> - switch to model n
    /skills - list available skills
    /<skill> <args> - invoke a skill directly
    /bind <n|session_id> - switch to a session (use /list to see numbers)
    /unbind - remove binding
    /stop - interrupt current task
    /status - show current binding
    /list - show recent sessions
HELP

Instance Method Summary collapse

Constructor Details

#initialize(session_registry:, session_builder:, run_agent_task:, interrupt_session:, channel_config:, binding_mode: :chat_user) ⇒ ChannelManager

Returns a new instance of ChannelManager.

Parameters:

  • session_registry (Clacky::Server::SessionRegistry)
  • session_builder (Proc)

    (name:, working_dir:) => session_id — from HttpServer

  • run_agent_task (Proc)

    (session_id, agent, &task) — from HttpServer

  • interrupt_session (Proc)

    (session_id) — from HttpServer

  • channel_config (Clacky::ChannelConfig)
  • binding_mode (:user | :chat | :chat_user) (defaults to: :chat_user)

    how to map IM identities to sessions. :chat_user (default) — one session per (chat, user) pair. Most natural:

    private chat = that user's session; in a group each
    user has their own session; the same user across
    different groups keeps those contexts separate.
    

    :chat — one session per chat (all users in a group share it). :user — one session per user (merges DMs and all groups).



37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/clacky/server/channel/channel_manager.rb', line 37

def initialize(session_registry:, session_builder:, run_agent_task:, interrupt_session:, channel_config:, binding_mode: :chat_user)
  @registry          = session_registry
  @session_builder   = session_builder
  @run_agent_task    = run_agent_task
  @interrupt_session = interrupt_session
  @channel_config    = channel_config
  @binding_mode      = binding_mode
  @adapters          = []
  @adapter_threads   = []
  @running           = false
  @mutex             = Mutex.new
  @session_counters  = Hash.new(0)  # platform => count, for short session names
end

Instance Method Details

#adapter_for(platform) ⇒ Object?

Return the currently-live adapter for a given platform, or nil if none running. Thread-safe — acquires @mutex to read from @adapters.

Parameters:

  • platform (Symbol, String)

Returns:

  • (Object, nil)


92
93
94
95
# File 'lib/clacky/server/channel/channel_manager.rb', line 92

def adapter_for(platform)
  platform = platform.to_sym
  @mutex.synchronize { @adapters.find { |a| a.platform_id == platform } }
end

#adapter_loop(adapter) ⇒ Object



214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
# File 'lib/clacky/server/channel/channel_manager.rb', line 214

def adapter_loop(adapter)
  Clacky::Logger.info("[ChannelManager] :#{adapter.platform_id} adapter loop started")
  adapter.start do |event|
    summary = event[:text].to_s.lines.first.to_s.strip[0, 80]
    summary = "[image]" if summary.empty? && !event[:files].to_a.empty?
    Clacky::Logger.info("[ChannelManager] :#{adapter.platform_id} message from #{event[:user_id]} in #{event[:chat_id]}: #{summary}")
    route_message(adapter, event)
  rescue StandardError => e
    Clacky::Logger.warn("[ChannelManager] Error routing :#{adapter.platform_id} message: #{e.message}\n#{e.backtrace.first(3).join("\n")}")
    adapter.send_text(event[:chat_id], "Error: #{e.message}")
  end
rescue StandardError => e
  Clacky::Logger.warn("[ChannelManager] :#{adapter.platform_id} adapter crashed: #{e.message}\n#{e.backtrace.first(3).join("\n")}")
  if @running
    Clacky::Logger.info("[ChannelManager] :#{adapter.platform_id} restarting in 5s...")
    sleep 5
    retry
  end
end

#auto_create_session(adapter, event) ⇒ Object



531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
# File 'lib/clacky/server/channel/channel_manager.rb', line 531

def auto_create_session(adapter, event)
  key      = channel_key(event)
  platform = event[:platform].to_s
  count    = @mutex.synchronize { @session_counters[platform] += 1 }
  name     = "#{platform}-#{count}"
  session_id = @session_builder.call(name: name, source: :channel)
  bind_key_to_session(key, session_id)

  # Create a long-lived ChannelUIController for this session and subscribe it
  # to the session's WebUIController. It stays for the session's full lifetime
  # so all events (agent output, errors, status) flow through web_ui → channel_ui.
  channel_ui = ChannelUIController.new(event, -> { adapter_for(event[:platform]) })
  @registry.with_session(session_id) do |s|
    s[:ui]&.subscribe_channel(channel_ui)
    s[:channel_ui] = channel_ui
  end

  Clacky::Logger.info("[ChannelManager] Auto-created session #{session_id[0, 8]} for #{key}")
  session_id
end

#bind_key_to_session(key, session_id) ⇒ Object



590
591
592
593
594
595
596
597
598
# File 'lib/clacky/server/channel/channel_manager.rb', line 590

def bind_key_to_session(key, session_id)
  @registry.list.each do |summary|
    @registry.with_session(summary[:id]) { |s| s[:channel_keys]&.delete(key) }
  end
  @registry.with_session(session_id) do |s|
    s[:channel_keys] ||= Set.new
    s[:channel_keys].add(key)
  end
end

#channel_key(event) ⇒ Object



614
615
616
617
618
619
620
621
622
# File 'lib/clacky/server/channel/channel_manager.rb', line 614

def channel_key(event)
  platform = event[:platform].to_s
  case @binding_mode
  when :chat      then "#{platform}:chat:#{event[:chat_id]}"
  when :user      then "#{platform}:user:#{event[:user_id]}"
  else # :chat_user (default)
    "#{platform}:chat:#{event[:chat_id]}:user:#{event[:user_id]}"
  end
end

#channel_key_from_info(channel_info) ⇒ Object



624
625
626
627
628
629
630
631
632
633
634
# File 'lib/clacky/server/channel/channel_manager.rb', line 624

def channel_key_from_info(channel_info)
  platform = channel_info[:platform].to_s
  chat_id  = channel_info[:chat_id].to_s
  user_id  = channel_info[:user_id].to_s
  case @binding_mode
  when :chat      then "#{platform}:chat:#{chat_id}"
  when :user      then "#{platform}:user:#{user_id}"
  else # :chat_user (default)
    "#{platform}:chat:#{chat_id}:user:#{user_id}"
  end
end

#channel_ui_for_session(session_id) ⇒ Object

Retrieve the ChannelUIController bound to a session (if any).



553
554
555
556
557
# File 'lib/clacky/server/channel/channel_manager.rb', line 553

def channel_ui_for_session(session_id)
  result = nil
  @registry.with_session(session_id) { |s| result = s[:channel_ui] }
  result
end

#ensure_channel_ui_subscribed(session_id, event) ⇒ Object

Make sure session has a ChannelUIController subscribed to its WebUIController. Needed both at startup (for restored sessions) and after a session is evicted from memory and rebuilt by SessionRegistry#ensure (which drops :ui/:channel_ui).



562
563
564
565
566
567
568
569
570
571
572
573
574
575
# File 'lib/clacky/server/channel/channel_manager.rb', line 562

def ensure_channel_ui_subscribed(session_id, event)
  needs_attach = false
  @registry.with_session(session_id) do |s|
    needs_attach = s[:ui] && s[:channel_ui].nil?
  end
  return unless needs_attach

  channel_ui = ChannelUIController.new(event, -> { adapter_for(event[:platform]) })
  @registry.with_session(session_id) do |s|
    next unless s[:ui] && s[:channel_ui].nil?
    s[:ui].subscribe_channel(channel_ui)
    s[:channel_ui] = channel_ui
  end
end

#handle_command(adapter, event, text) ⇒ Object



311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
# File 'lib/clacky/server/channel/channel_manager.rb', line 311

def handle_command(adapter, event, text)
  chat_id = event[:chat_id]
  key     = channel_key(event)

  case text
  when /\A([\?h]|help)\z/i
    adapter.send_text(chat_id, COMMAND_HELP)

  when "/new", "/clear"
    session_id = auto_create_session(adapter, event)
    adapter.send_text(chat_id, "New session `#{session_id[0, 8]}` created.") if session_id

  when /\A\/model\b/i
    handle_model_command(adapter, event, text)

  when /\A\/skills\b/i
    handle_skills_command(adapter, event)

  when /\A\/bind\s+(\S+)\z/i
    arg = Regexp.last_match(1)
    # Support numeric index from /list (1-based)
    session_id = if arg =~ /\A\d+\z/
      recent = @registry.list.first(5)
      idx = arg.to_i - 1
      recent[idx]&.fetch(:id, nil)
    else
      arg
    end
    unless session_id && @registry.get(session_id)
      adapter.send_text(chat_id, "Session not found. Use /list to see available sessions.")
      return
    end

    # Detach channel_ui from the old session's web_ui, reattach to the new one.
    old_session_id = resolve_session(event)
    channel_ui = old_session_id ? channel_ui_for_session(old_session_id) : nil

    if channel_ui
      @registry.with_session(old_session_id) { |s| s[:ui]&.unsubscribe_channel(channel_ui); s.delete(:channel_ui) }
    else
      channel_ui = ChannelUIController.new(event, -> { adapter_for(event[:platform]) })
    end

    bind_key_to_session(key, session_id)
    @registry.with_session(session_id) do |s|
      s[:ui]&.subscribe_channel(channel_ui)
      s[:channel_ui] = channel_ui
    end

    Clacky::Logger.info("[ChannelManager] Bound #{key} -> session #{session_id[0, 8]}")
    adapter.send_text(chat_id, "Bound to session `#{session_id[0, 8]}`.")

  when "/stop"
    session_id = resolve_session(event)
    unless session_id
      adapter.send_text(chat_id, "No session bound.")
      return
    end
    @interrupt_session.call(session_id)
    adapter.send_text(chat_id, "Task interrupted.")

  when "/unbind"
    unbound = false
    @registry.list.each do |summary|
      @registry.with_session(summary[:id]) do |s|
        unbound = true if s[:channel_keys]&.delete(key)
      end
    end
    adapter.send_text(chat_id, unbound ? "Unbound." : "No binding found.")

  when "/status"
    session_id = resolve_session(event)
    if session_id
      session = @registry.get(session_id)
      model = session&.dig(:agent)&.current_model_info
      model_name = model&.dig(:model) || "unknown"
      adapter.send_text(chat_id, "Bound to session `#{session_id[0, 8]}` (status: #{session&.dig(:status) || "unknown"}, model: #{model_name})")
    else
      adapter.send_text(chat_id, "No session bound yet. Send any message to auto-create one.")
    end

  when "/list"
    list_sessions(adapter, chat_id)

  else
    adapter.send_text(chat_id, "Unknown command. Type ? for help.")
  end
end

#handle_model_command(adapter, event, text) ⇒ Object



417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
# File 'lib/clacky/server/channel/channel_manager.rb', line 417

def handle_model_command(adapter, event, text)
  chat_id   = event[:chat_id]
  session_id = resolve_session(event)

  unless session_id
    adapter.send_text(chat_id, "No session bound. Send any message to auto-create one first.")
    return
  end

  session = @registry.get(session_id)
  agent = session&.dig(:agent)
  unless agent
    adapter.send_text(chat_id, "Session not ready.")
    return
  end

  arg = text.sub(/\A\/model\s*/i, "").strip

  if arg.empty?
    # Show current model and available list
    info = agent.current_model_info
    current = info&.dig(:model) || "unknown"
    sub     = info&.dig(:sub_model)
    card    = info&.dig(:card_model)
    header  = "Current model: #{current}"
    header += " (#{card} · #{sub})" if card && sub && sub != current
    header += " (#{card})" if card && !sub

    models = agent.available_models
    if models.empty?
      adapter.send_text(chat_id, "#{header}\nNo other models available.")
      return
    end

    lines = models.each_with_index.map do |name, i|
      marker = name == current ? " *" : ""
      "#{i + 1}. #{name}#{marker}"
    end
    adapter.send_text(chat_id, "#{header}\n\nSwitch with /model <n>:\n#{lines.join("\n")}")
  elsif arg =~ /\A\d+\z/
    idx = arg.to_i - 1
    models = agent.config.models
    if idx < 0 || idx >= models.length
      adapter.send_text(chat_id, "Invalid model number. Use /model to see available models.")
      return
    end

    model_id = models[idx]["id"]
    if agent.switch_model_by_id(model_id)
      new_info = agent.current_model_info
      adapter.send_text(chat_id, "Switched to #{new_info&.dig(:model) || model_id}.")
    else
      adapter.send_text(chat_id, "Failed to switch model.")
    end
  else
    adapter.send_text(chat_id, "Usage: /model to list, /model <n> to switch.")
  end
end

#handle_skills_command(adapter, event) ⇒ Object



476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
# File 'lib/clacky/server/channel/channel_manager.rb', line 476

def handle_skills_command(adapter, event)
  chat_id    = event[:chat_id]
  session_id = resolve_session(event)

  unless session_id
    adapter.send_text(chat_id, "No session bound. Send any message to auto-create one first.")
    return
  end

  session = @registry.get(session_id)
  agent = session&.dig(:agent)
  unless agent
    adapter.send_text(chat_id, "Session not ready.")
    return
  end

  skills = agent.skill_loader.user_invocable_skills
    .reject { |s| s.source == :default }
    .first(10)
  if skills.empty?
    adapter.send_text(chat_id, "No skills available.")
    return
  end

  lines = skills.each_with_index.map do |s, i|
    desc = s.description.to_s.strip
    desc = desc.empty? ? "(no description)" : desc.length > 50 ? "#{desc[0..49]}..." : desc
    "#{i + 1}. #{s.name} - #{desc}"
  end
  adapter.send_text(chat_id, "Skills:\n#{lines.join("\n")}")
end

#known_users(platform) ⇒ Array<String>

Return a list of known user IDs for the given platform. Collected from every message that has been processed since the server started. Weixin stores context_tokens keyed by user_id; feishu/wecom track chat_ids via the session binding table in the registry.

Parameters:

  • platform (Symbol, String)

Returns:

  • (Array<String>)


128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/clacky/server/channel/channel_manager.rb', line 128

def known_users(platform)
  platform = platform.to_sym
  adapter  = adapter_for(platform)
  return [] unless adapter

  # Weixin adapter exposes @context_tokens whose keys are user_ids
  if adapter.respond_to?(:context_token_user_ids)
    return adapter.context_token_user_ids
  end

  # Fallback: scan session registry for channel_keys matching this platform.
  # Key formats depend on binding_mode:
  #   :user       → "platform:user:USER_ID"
  #   :chat       → "platform:chat:CHAT_ID"
  #   :chat_user  → "platform:chat:CHAT_ID:user:USER_ID"
  #
  # For send_text we need the chat_id (Feishu/WeCom use chat_id as the
  # receive_id for outbound messages), so we extract the chat portion.
  prefix = "#{platform}:"
  ids = []
  @registry.list.each do |summary|
    @registry.with_session(summary[:id]) do |s|
      (s[:channel_keys] || []).each do |key|
        next unless key.start_with?(prefix)

        remainder = key.sub(prefix, "") # e.g. "chat:OC_ID:user:OU_ID" or "user:UID" or "chat:CID"
        ids << extract_chat_id(remainder)
      end
    end
  end
  ids.compact.uniq
end

#list_sessions(adapter, chat_id) ⇒ Object



600
601
602
603
604
605
606
607
608
609
610
611
612
# File 'lib/clacky/server/channel/channel_manager.rb', line 600

def list_sessions(adapter, chat_id)
  sessions = @registry.list.first(5)
  if sessions.empty?
    adapter.send_text(chat_id, "No sessions available.")
    return
  end
  lines = sessions.each_with_index.map do |s, i|
    name = s[:name].to_s.empty? ? "(unnamed)" : s[:name]
    time = s[:updated_at].to_s[5, 11]&.tr("T", " ") || "-"
    "#{i + 1}. `#{s[:id][0, 8]}` #{name} (#{s[:status]}) #{time}"
  end
  adapter.send_text(chat_id, "Recent sessions:\n#{lines.join("\n")}\n\nUse `/bind <n>` to switch.")
end

#reload_platform(platform, config) ⇒ Object

Hot-reload a single platform adapter with updated config. Stops the existing adapter (if running), then starts a new one if enabled.

Parameters:



165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/clacky/server/channel/channel_manager.rb', line 165

def reload_platform(platform, config)
  # Stop existing adapter for this platform
  @mutex.synchronize do
    existing = @adapters.find { |a| a.platform_id == platform }
    if existing
      safe_stop_adapter(existing)
      @adapters.delete(existing)
    end
  end

  # Start new adapter if enabled
  if config.enabled?(platform)
    @channel_config = config
    start_adapter(platform)
    Clacky::Logger.info("[ChannelManager] :#{platform} adapter reloaded")
  else
    Clacky::Logger.info("[ChannelManager] :#{platform} disabled — adapter not started")
  end
end

#resolve_session(event) ⇒ Object



508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
# File 'lib/clacky/server/channel/channel_manager.rb', line 508

def resolve_session(event)
  key = channel_key(event)
  @registry.list.each do |summary|
    found = nil
    @registry.with_session(summary[:id]) { |s| found = s[:channel_keys]&.include?(key) }
    return summary[:id] if found

    # Check evicted channel sessions via persisted channel_info
    next unless summary[:source] == "channel"
    next unless @registry.ensure(summary[:id])
    agent = nil
    @registry.with_session(summary[:id]) { |s| agent = s[:agent] }
    next unless agent&.channel_info
    next unless channel_key_from_info(agent.channel_info) == key
    bind_key_to_session(key, summary[:id])
    return summary[:id]
  end
  nil
rescue StandardError => e
  Clacky::Logger.error("[ChannelManager] Session resolve failed: #{e.message}")
  nil
end

#restore_channel_bindingsObject



668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
# File 'lib/clacky/server/channel/channel_manager.rb', line 668

def restore_channel_bindings
  bound_keys = Set.new
  restored_count = 0
  @registry.list(limit: nil).each do |summary|
    @registry.ensure(summary[:id])
    agent = nil
    @registry.with_session(summary[:id]) { |s| agent = s[:agent] }
    next unless agent&.channel_info

    info = agent.channel_info
    next unless info[:platform] && info[:user_id] && info[:chat_id]

    key = channel_key_from_info(info)

    event = { platform: info[:platform], chat_id: info[:chat_id] }
    ensure_channel_ui_subscribed(summary[:id], event)

    next unless bound_keys.add?(key)
    bind_key_to_session(key, summary[:id])

    Clacky::Logger.info("[ChannelManager] Restored channel binding #{key} -> session #{summary[:id][0, 8]}")
    restored_count += 1
  end
  Clacky::Logger.info("[ChannelManager] Restored #{restored_count} channel binding(s)") if restored_count > 0
end

#route_message(adapter, event) ⇒ Object



234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
# File 'lib/clacky/server/channel/channel_manager.rb', line 234

def route_message(adapter, event)
  text  = event[:text]&.strip
  files = event[:files] || []
  return if (text.nil? || text.empty?) && files.empty?

  # Handle built-in commands
  if text&.match?(KNOWN_COMMAND) || text&.match?(/\A([\?h]|help)\z/i)
    handle_command(adapter, event, text)
    return
  end

  session_id = resolve_session(event)
  if session_id
    bind_key_to_session(channel_key(event), session_id)
  else
    session_id = auto_create_session(adapter, event)
  end

  session = @registry.get(session_id)
  unless session
    Clacky::Logger.warn("[ChannelManager] Session #{session_id[0, 8]} not found in registry after create")
    adapter.send_text(event[:chat_id], "Failed to initialize session. Please try again.")
    return
  end

  sub_count = web_ui_for_session_diag(session_id)
  Clacky::Logger.info("[ChannelManager] Routing to session #{session_id[0, 8]} (status=#{session[:status]}, text=#{text.inspect}, channel_subs=#{sub_count})")

  # If session is running, interrupt it automatically (mimics CLI behavior)
  if session[:status] == :running
    Clacky::Logger.info("[ChannelManager] Session busy, interrupting previous task")
    @interrupt_session.call(session_id)
    # Wait briefly for the thread to catch the interrupt and update status
    sleep 0.1
  end

  agent  = session[:agent]
  web_ui = session[:ui]

  # Set channel info on the agent so session context includes platform/sender.
  agent.channel_info = extract_channel_info(event) if agent.respond_to?(:channel_info=)

  # Re-attach channel UI if it was dropped (session was evicted from memory and rebuilt by ensure).
  ensure_channel_ui_subscribed(session_id, event)

  # Update reply context so responses thread under the current message.
  channel_ui_for_session(session_id)&.update_message_context(event)

  # Sync the inbound message to WebUI so it shows up in the browser session.
  # source: :channel prevents the message from being echoed back to the IM channel.
  web_ui&.show_user_message(text, source: :channel) unless text.nil? || text.empty?

  # Start typing keepalive BEFORE sending any message.
  # sendmessage cancels the typing indicator in WeChat protocol,
  # so keepalive must be running when "Thinking..." is sent so it
  # immediately re-asserts the typing state after that message.
  chat_id       = event[:chat_id]
  context_token = event[:context_token]
  adapter.start_typing_keepalive(chat_id, context_token) if adapter.respond_to?(:start_typing_keepalive)

  # Acknowledge to the IM channel only — WebUI doesn't need a "Thinking..." noise.
  adapter.send_text(chat_id, "Thinking...")

  @run_agent_task.call(session_id, agent) do
    begin
      Clacky::Logger.info("[ChannelManager] agent.run START session=#{session_id[0, 8]} text=#{text.inspect}")
      agent.run(text, files: files)
      Clacky::Logger.info("[ChannelManager] agent.run END   session=#{session_id[0, 8]} text=#{text.inspect}")
    rescue StandardError => e
      Clacky::Logger.error("[ChannelManager] agent.run RAISED session=#{session_id[0, 8]} #{e.class}: #{e.message}\n#{e.backtrace.first(8).join("\n")}")
      raise
    ensure
      adapter.stop_typing_keepalive(chat_id) if adapter.respond_to?(:stop_typing_keepalive)
    end
  end
end

#running_platformsArray<Symbol>

Returns platforms currently running.

Returns:

  • (Array<Symbol>)

    platforms currently running



79
80
81
# File 'lib/clacky/server/channel/channel_manager.rb', line 79

def running_platforms
  @mutex.synchronize { @adapters.map(&:platform_id) }
end

#safe_stop_adapter(adapter) ⇒ Object



694
695
696
697
698
# File 'lib/clacky/server/channel/channel_manager.rb', line 694

def safe_stop_adapter(adapter)
  adapter.stop
rescue StandardError => e
  Clacky::Logger.warn("[ChannelManager] Error stopping #{adapter.platform_id}: #{e.message}")
end

#send_to_user(platform, user_id, message) ⇒ Hash?

If no token is found the message cannot be delivered and nil is returned.

For Feishu and WeCom the chat_id / user_id is sufficient — no token needed.

Parameters:

  • platform (Symbol, String)

    e.g. :weixin, :feishu, :wecom

  • user_id (String)

    IM user identifier

  • message (String)

    plain-text (or markdown) message to send

Returns:

  • (Hash, nil)

    adapter result hash, or nil on failure



105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/clacky/server/channel/channel_manager.rb', line 105

def send_to_user(platform, user_id, message)
  platform = platform.to_sym
  adapter  = adapter_for(platform)

  unless adapter
    Clacky::Logger.warn("[ChannelManager] send_to_user: no running adapter for :#{platform}")
    return nil
  end

  Clacky::Logger.info("[ChannelManager] send_to_user :#{platform}#{user_id}")
  adapter.send_text(user_id, message)
rescue StandardError => e
  Clacky::Logger.error("[ChannelManager] send_to_user failed: #{e.message}")
  nil
end

#startObject

Start all enabled adapters in background threads. Non-blocking.



52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/clacky/server/channel/channel_manager.rb', line 52

def start
  enabled_platforms = @channel_config.enabled_platforms
  if enabled_platforms.empty?
    Clacky::Logger.info("[ChannelManager] No channels configured — skipping")
    return
  end

  Clacky::Logger.info("[ChannelManager] Starting channels: #{enabled_platforms.join(", ")}")
  @running = true

  restore_channel_bindings

  enabled_platforms.each { |platform| start_adapter(platform) }
end

#start_adapter(platform) ⇒ Object



186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
# File 'lib/clacky/server/channel/channel_manager.rb', line 186

def start_adapter(platform)
  klass = Adapters.find(platform)
  unless klass
    Clacky::Logger.warn("[ChannelManager] No adapter registered for :#{platform} — skipping")
    return
  end

  raw_config = @channel_config.platform_config(platform)
  Clacky::Logger.info("[ChannelManager] Initializing :#{platform} adapter")
  adapter = klass.new(raw_config)

  errors = adapter.validate_config(raw_config)
  if errors.any?
    Clacky::Logger.warn("[ChannelManager] Config errors for :#{platform}: #{errors.join(", ")}")
    return
  end

  @mutex.synchronize { @adapters << adapter }
  Clacky::Logger.info("[ChannelManager] :#{platform} adapter ready, starting thread")

  thread = Thread.new do
    Thread.current.name = "channel-#{platform}"
    adapter_loop(adapter)
  end

  @adapter_threads << thread
end

#stopObject

Stop all adapters gracefully.



68
69
70
71
72
73
74
75
76
# File 'lib/clacky/server/channel/channel_manager.rb', line 68

def stop
  @running = false
  @mutex.synchronize do
    @adapters.each { |adapter| safe_stop_adapter(adapter) }
    @adapters.clear
  end
  @adapter_threads.each { |t| t.join(1) }
  @adapter_threads.clear
end

#web_ui_for_session_diag(session_id) ⇒ Object



577
578
579
580
581
582
583
584
585
586
587
588
# File 'lib/clacky/server/channel/channel_manager.rb', line 577

def web_ui_for_session_diag(session_id)
  result = nil
  @registry.with_session(session_id) do |s|
    ui = s[:ui]
    result = if ui.respond_to?(:channel_subscribed?)
      ui.instance_variable_get(:@channel_subscribers)&.size || 0
    else
      -1
    end
  end
  result
end