Class: Clacky::Channel::ChannelManager

Inherits:
Object
  • Object
show all
Defined in:
lib/clacky/server/channel/channel_manager.rb

Overview

ChannelManager starts and supervises IM platform adapter threads. When an inbound message arrives it:

1. Resolves (or auto-creates) a Session bound to this IM identity
2. Retrieves the WebUIController for that session
3. Creates a ChannelUIController and subscribes it to the WebUIController
4. Runs the agent task via run_agent_task (same as HttpServer)
5. Unsubscribes the ChannelUIController when the task finishes

Thread model: each adapter runs two long-lived threads (read loop + ping). ChannelManager itself is non-blocking — call #start from HttpServer after the WEBrick server has started.

Session binding: the first message from an IM identity automatically creates a new session and binds it. Users can use /bind <session_id> to switch to an existing WebUI session instead. Bindings are stored in the session registry as :channel_keys => Set of channel key strings. WebUI sessions are persisted by HttpServer — channel adds no extra persistence.

Constant Summary collapse

KNOWN_COMMAND =
%r{\A/(new|clear|model|skills|bind|stop|unbind|status|list)\b}i
COMMAND_HELP =
<<~HELP.strip
  Commands:
    ? / h / help - show this help
    /new / /clear - start a new session
    /model - show current model & available models
    /model <n> - switch to model n
    /skills - list available skills
    /<skill> <args> - invoke a skill directly
    /bind <n|session_id> - switch to a session (use /list to see numbers)
    /unbind - remove binding
    /stop - interrupt current task
    /status - show current binding
    /list - show recent sessions
HELP

Instance Method Summary collapse

Constructor Details

#initialize(session_registry:, session_builder:, run_agent_task:, interrupt_session:, channel_config:, binding_mode: :chat_user) ⇒ ChannelManager

Returns a new instance of ChannelManager.

Parameters:

  • session_registry (Clacky::Server::SessionRegistry)
  • session_builder (Proc)

    (name:, working_dir:, source:) => session_id — from HttpServer

  • run_agent_task (Proc)

    (session_id, agent, &task) — from HttpServer

  • interrupt_session (Proc)

    (session_id) — from HttpServer

  • channel_config (Clacky::ChannelConfig)
  • binding_mode (:user | :chat | :chat_user) (defaults to: :chat_user)

    how to map IM identities to sessions. :chat_user (default) — one session per (chat, user) pair. Most natural:

    private chat = that user's session; in a group each
    user has their own session; the same user across
    different groups keeps those contexts separate.
    

    :chat — one session per chat (all users in a group share it). :user — one session per user (merges DMs and all groups).



37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/clacky/server/channel/channel_manager.rb', line 37

# Stores the injected collaborators and sets up empty runtime state.
#
# @param session_registry [Clacky::Server::SessionRegistry]
# @param session_builder [Proc] called to create a session; returns its id
# @param run_agent_task [Proc] called as (session_id, agent, &task)
# @param interrupt_session [Proc] called as (session_id)
# @param channel_config [Clacky::ChannelConfig]
# @param binding_mode [Symbol] :user, :chat or :chat_user (the default)
def initialize(session_registry:, session_builder:, run_agent_task:, interrupt_session:, channel_config:, binding_mode: :chat_user)
  # Runtime state: nothing is running yet.
  @running         = false
  @mutex           = Mutex.new
  @adapters        = []
  @adapter_threads = []
  # platform => count, used to build short session names
  @session_counters = Hash.new(0)

  # Configuration.
  @channel_config = channel_config
  @binding_mode   = binding_mode

  # Collaborators handed in by the caller.
  @registry          = session_registry
  @session_builder   = session_builder
  @run_agent_task    = run_agent_task
  @interrupt_session = interrupt_session
end

Instance Method Details

#adapter_for(platform) ⇒ Object?

Return the currently-live adapter for a given platform, or nil if none running. Thread-safe — acquires @mutex to read from @adapters.

Parameters:

  • platform (Symbol, String)

Returns:

  • (Object, nil)


89
90
91
92
# File 'lib/clacky/server/channel/channel_manager.rb', line 89

# Look up the adapter currently registered for +platform+.
# @adapters is read while holding @mutex.
#
# @param platform [Symbol, String] converted with #to_sym before comparing
# @return [Object, nil] the first adapter whose platform_id matches, else nil
def adapter_for(platform)
  wanted = platform.to_sym
  @mutex.synchronize do
    @adapters.find { |candidate| candidate.platform_id == wanted }
  end
end

#adapter_loop(adapter) ⇒ Object



211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
# File 'lib/clacky/server/channel/channel_manager.rb', line 211

# Run one adapter's receive loop, routing every inbound event and restarting
# the adapter after a crash for as long as the manager is running.
#
# Per-message failures are logged and reported back to the originating chat;
# they never terminate the loop. A failure of adapter.start itself is logged
# and, while @running is true, retried after a 5 second pause.
#
# @param adapter [Object] adapter responding to platform_id, start and send_text
def adapter_loop(adapter)
  Clacky::Logger.info("[ChannelManager] :#{adapter.platform_id} adapter loop started")
  adapter.start do |event|
    # Log only the first line of the message, capped at 80 characters.
    summary = event[:text].to_s.lines.first.to_s.strip[0, 80]
    summary = "[image]" if summary.empty? && !event[:files].to_a.empty?
    Clacky::Logger.info("[ChannelManager] :#{adapter.platform_id} message from #{event[:user_id]} in #{event[:chat_id]}: #{summary}")
    route_message(adapter, event)
  rescue StandardError => e
    Clacky::Logger.warn("[ChannelManager] Error routing :#{adapter.platform_id} message: #{e.message}\n#{e.backtrace.to_a.first(3).join("\n")}")
    begin
      adapter.send_text(event[:chat_id], "Error: #{e.message}")
    rescue StandardError => notify_error
      # Reporting the error is best-effort: a failed reply must not escape the
      # block and be mistaken for an adapter crash.
      Clacky::Logger.warn("[ChannelManager] Could not report error to :#{adapter.platform_id} chat: #{notify_error.message}")
    end
  end
rescue StandardError => e
  Clacky::Logger.warn("[ChannelManager] :#{adapter.platform_id} adapter crashed: #{e.message}\n#{e.backtrace.to_a.first(3).join("\n")}")
  if @running
    Clacky::Logger.info("[ChannelManager] :#{adapter.platform_id} restarting in 5s...")
    sleep 5
    # @running may have been cleared while we slept; do not restart an
    # adapter after the manager was stopped.
    retry if @running
  end
end

#auto_create_session(adapter, event) ⇒ Object



509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
# File 'lib/clacky/server/channel/channel_manager.rb', line 509

# Create a fresh session for the IM identity behind +event+, bind the
# identity's channel key to it and attach a ChannelUIController.
#
# If the identity was already bound to another session (e.g. the user sent
# /new), that session's ChannelUIController is unsubscribed first, mirroring
# what /bind does, so the old session stops forwarding output to this chat.
#
# @param adapter [Object] the adapter the event arrived on (not used directly;
#   the controller looks the live adapter up lazily via adapter_for)
# @param event [Hash] inbound event; :platform is read here
# @return [Object] the id returned by @session_builder
def auto_create_session(adapter, event)
  key      = channel_key(event)
  platform = event[:platform].to_s

  # Must be resolved before rebinding: afterwards the key points at the new session.
  previous_id = resolve_session(event)

  count      = @mutex.synchronize { @session_counters[platform] += 1 }
  name       = "#{platform}-#{count}"
  session_id = @session_builder.call(name: name, working_dir: Dir.home, source: :channel)

  # Detach the controller left on the previously bound session, if any.
  if previous_id
    @registry.with_session(previous_id) do |s|
      stale_ui = s[:channel_ui]
      s[:ui]&.unsubscribe_channel(stale_ui) if stale_ui
      s.delete(:channel_ui)
    end
  end

  bind_key_to_session(key, session_id)

  # Create a long-lived ChannelUIController for this session and subscribe it
  # to the session's WebUIController, so events flow web_ui -> channel_ui.
  channel_ui = ChannelUIController.new(event, -> { adapter_for(event[:platform]) })
  @registry.with_session(session_id) do |s|
    s[:ui]&.subscribe_channel(channel_ui)
    s[:channel_ui] = channel_ui
  end

  Clacky::Logger.info("[ChannelManager] Auto-created session #{session_id[0, 8]} for #{key}")
  session_id
end

#bind_key_to_session(key, session_id) ⇒ Object



550
551
552
553
554
555
556
557
558
# File 'lib/clacky/server/channel/channel_manager.rb', line 550

# Make +session_id+ the only session whose :channel_keys contains +key+.
#
# @param key [String] channel key
# @param session_id [Object] id of the session to bind the key to
def bind_key_to_session(key, session_id)
  # Drop the key from every session that currently holds it.
  @registry.list.each do |summary|
    @registry.with_session(summary[:id]) do |session|
      session[:channel_keys]&.delete(key)
    end
  end

  # Attach it to the target, creating the key set on first use.
  @registry.with_session(session_id) do |session|
    keys = (session[:channel_keys] ||= Set.new)
    keys.add(key)
  end
end

#channel_key(event) ⇒ Object



574
575
576
577
578
579
580
581
582
# File 'lib/clacky/server/channel/channel_manager.rb', line 574

# Build the channel key for +event+ according to @binding_mode.
#
#   :chat           -> "platform:chat:CHAT_ID"
#   :user           -> "platform:user:USER_ID"
#   anything else   -> "platform:chat:CHAT_ID:user:USER_ID" (:chat_user default)
#
# @param event [Hash] reads :platform, :chat_id and :user_id
# @return [String]
def channel_key(event)
  prefix = event[:platform].to_s
  return "#{prefix}:chat:#{event[:chat_id]}" if @binding_mode == :chat
  return "#{prefix}:user:#{event[:user_id]}" if @binding_mode == :user

  "#{prefix}:chat:#{event[:chat_id]}:user:#{event[:user_id]}"
end

#channel_ui_for_session(session_id) ⇒ Object

Retrieve the ChannelUIController bound to a session (if any).



531
532
533
534
535
# File 'lib/clacky/server/channel/channel_manager.rb', line 531

# Fetch the value stored under :channel_ui for a session.
#
# @param session_id [Object]
# @return [Object, nil] the stored controller, or nil when none was captured
def channel_ui_for_session(session_id)
  channel_ui = nil
  @registry.with_session(session_id) do |session|
    channel_ui = session[:channel_ui]
  end
  channel_ui
end

#handle_command(adapter, event, text) ⇒ Object



299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
# File 'lib/clacky/server/channel/channel_manager.rb', line 299

# Dispatch a built-in chat command and reply on the originating chat.
#
# All slash commands are matched case-insensitively, consistent with
# KNOWN_COMMAND and the /model, /skills and /bind branches.
#
# @param adapter [Object] adapter used to send the reply (send_text)
# @param event [Hash] inbound event; :chat_id and :platform are read here
# @param text [String] the command text
def handle_command(adapter, event, text)
  chat_id = event[:chat_id]
  key     = channel_key(event)

  case text
  when /\A([\?h]|help)\z/i
    adapter.send_text(chat_id, COMMAND_HELP)

  when /\A\/(new|clear)\z/i
    session_id = auto_create_session(adapter, event)
    adapter.send_text(chat_id, "New session `#{session_id[0, 8]}` created.") if session_id

  when /\A\/model\b/i
    handle_model_command(adapter, event, text)

  when /\A\/skills\b/i
    handle_skills_command(adapter, event)

  when /\A\/bind\s+(\S+)\z/i
    arg = Regexp.last_match(1)
    # A purely numeric argument is a 1-based index into the /list output.
    session_id = if arg.match?(/\A\d+\z/)
      idx = arg.to_i - 1
      # "/bind 0" would give index -1, which Array#[] resolves to the LAST
      # entry; treat it as not found instead.
      idx >= 0 ? @registry.list.first(5)[idx]&.fetch(:id, nil) : nil
    else
      arg
    end
    unless session_id && @registry.get(session_id)
      adapter.send_text(chat_id, "Session not found. Use /list to see available sessions.")
      return
    end

    # Detach channel_ui from the old session's web_ui, reattach to the new one.
    old_session_id = resolve_session(event)
    channel_ui = old_session_id ? channel_ui_for_session(old_session_id) : nil

    if channel_ui
      @registry.with_session(old_session_id) do |s|
        s[:ui]&.unsubscribe_channel(channel_ui)
        s.delete(:channel_ui)
      end
    else
      channel_ui = ChannelUIController.new(event, -> { adapter_for(event[:platform]) })
    end

    bind_key_to_session(key, session_id)
    @registry.with_session(session_id) do |s|
      s[:ui]&.subscribe_channel(channel_ui)
      s[:channel_ui] = channel_ui
    end

    Clacky::Logger.info("[ChannelManager] Bound #{key} -> session #{session_id[0, 8]}")
    adapter.send_text(chat_id, "Bound to session `#{session_id[0, 8]}`.")

  when /\A\/stop\z/i
    session_id = resolve_session(event)
    unless session_id
      adapter.send_text(chat_id, "No session bound.")
      return
    end
    @interrupt_session.call(session_id)
    adapter.send_text(chat_id, "Task interrupted.")

  when /\A\/unbind\z/i
    # Remove the key from every session that holds it.
    unbound = false
    @registry.list.each do |summary|
      @registry.with_session(summary[:id]) do |s|
        unbound = true if s[:channel_keys]&.delete(key)
      end
    end
    adapter.send_text(chat_id, unbound ? "Unbound." : "No binding found.")

  when /\A\/status\z/i
    session_id = resolve_session(event)
    if session_id
      session = @registry.get(session_id)
      model = session&.dig(:agent)&.current_model_info
      model_name = model&.dig(:model) || "unknown"
      adapter.send_text(chat_id, "Bound to session `#{session_id[0, 8]}` (status: #{session&.dig(:status) || "unknown"}, model: #{model_name})")
    else
      adapter.send_text(chat_id, "No session bound yet. Send any message to auto-create one.")
    end

  when /\A\/list\z/i
    list_sessions(adapter, chat_id)

  else
    adapter.send_text(chat_id, "Unknown command. Type ? for help.")
  end
end

#handle_model_command(adapter, event, text) ⇒ Object



405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
# File 'lib/clacky/server/channel/channel_manager.rb', line 405

# Handle "/model": with no argument, show the current model and the list of
# available ones; with a number, switch to that entry; otherwise print usage.
#
# @param adapter [Object] adapter used to reply (send_text)
# @param event [Hash] inbound event; :chat_id is read here
# @param text [String] full command text starting with "/model"
def handle_model_command(adapter, event, text)
  chat_id    = event[:chat_id]
  session_id = resolve_session(event)

  unless session_id
    adapter.send_text(chat_id, "No session bound. Send any message to auto-create one first.")
    return
  end

  agent = @registry.get(session_id)&.dig(:agent)
  unless agent
    adapter.send_text(chat_id, "Session not ready.")
    return
  end

  requested = text.sub(/\A\/model\s*/i, "").strip

  case requested
  when ""
    # Listing mode: header with the current model, then a numbered menu.
    info    = agent.current_model_info
    current = info&.dig(:model) || "unknown"
    sub     = info&.dig(:sub_model)
    card    = info&.dig(:card_model)

    detail =
      if card && sub
        sub != current ? " (#{card} · #{sub})" : ""
      elsif card
        " (#{card})"
      else
        ""
      end
    header = "Current model: #{current}#{detail}"

    models = agent.available_models
    if models.empty?
      adapter.send_text(chat_id, "#{header}\nNo other models available.")
      return
    end

    menu = models.each_with_index.map do |name, position|
      "#{position + 1}. #{name}#{name == current ? " *" : ""}"
    end
    adapter.send_text(chat_id, "#{header}\n\nSwitch with /model <n>:\n#{menu.join("\n")}")

  when /\A\d+\z/
    # Switch mode: the number is a 1-based index into agent.config.models.
    position   = requested.to_i - 1
    configured = agent.config.models
    unless position >= 0 && position < configured.length
      adapter.send_text(chat_id, "Invalid model number. Use /model to see available models.")
      return
    end

    model_id = configured[position]["id"]
    if agent.switch_model_by_id(model_id)
      updated = agent.current_model_info
      adapter.send_text(chat_id, "Switched to #{updated&.dig(:model) || model_id}.")
    else
      adapter.send_text(chat_id, "Failed to switch model.")
    end

  else
    adapter.send_text(chat_id, "Usage: /model to list, /model <n> to switch.")
  end
end

#handle_skills_command(adapter, event) ⇒ Object



464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
# File 'lib/clacky/server/channel/channel_manager.rb', line 464

# Handle "/skills": reply with up to ten user-invocable skills whose source
# is not :default, each with a description truncated to 50 characters.
#
# @param adapter [Object] adapter used to reply (send_text)
# @param event [Hash] inbound event; :chat_id is read here
def handle_skills_command(adapter, event)
  chat_id    = event[:chat_id]
  session_id = resolve_session(event)

  unless session_id
    adapter.send_text(chat_id, "No session bound. Send any message to auto-create one first.")
    return
  end

  agent = @registry.get(session_id)&.dig(:agent)
  unless agent
    adapter.send_text(chat_id, "Session not ready.")
    return
  end

  invocable = agent.skill_loader.user_invocable_skills
  shown     = invocable.reject { |skill| skill.source == :default }.first(10)
  if shown.empty?
    adapter.send_text(chat_id, "No skills available.")
    return
  end

  rows = shown.each_with_index.map do |skill, position|
    blurb = skill.description.to_s.strip
    blurb =
      if blurb.empty?
        "(no description)"
      elsif blurb.length > 50
        "#{blurb[0..49]}..."
      else
        blurb
      end
    "#{position + 1}. #{skill.name} - #{blurb}"
  end
  adapter.send_text(chat_id, "Skills:\n#{rows.join("\n")}")
end

#known_users(platform) ⇒ Array<String>

Return a list of known user IDs for the given platform. Collected from every message that has been processed since the server started. Weixin stores context_tokens keyed by user_id; feishu/wecom track chat_ids via the session binding table in the registry.

Parameters:

  • platform (Symbol, String)

Returns:

  • (Array<String>)


125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/clacky/server/channel/channel_manager.rb', line 125

# List the identifiers known for +platform+.
#
# Returns [] when no adapter is running. Adapters that respond to
# context_token_user_ids supply the list themselves. Otherwise every
# session's :channel_keys is scanned for keys starting with "platform:" and
# the remainder of each key is handed to extract_chat_id.
#
# Key formats depend on the binding mode:
#   :user       -> "platform:user:USER_ID"
#   :chat       -> "platform:chat:CHAT_ID"
#   :chat_user  -> "platform:chat:CHAT_ID:user:USER_ID"
#
# @param platform [Symbol, String]
# @return [Array] unique, non-nil identifiers in discovery order
def known_users(platform)
  platform_sym = platform.to_sym
  adapter      = adapter_for(platform_sym)
  return [] unless adapter
  return adapter.context_token_user_ids if adapter.respond_to?(:context_token_user_ids)

  key_prefix = "#{platform_sym}:"
  collected  = []
  @registry.list.each do |summary|
    @registry.with_session(summary[:id]) do |session|
      bound_keys = session[:channel_keys] || []
      bound_keys.each do |bound_key|
        next unless bound_key.start_with?(key_prefix)

        # e.g. "chat:OC_ID:user:OU_ID", "user:UID" or "chat:CID"
        collected << extract_chat_id(bound_key.sub(key_prefix, ""))
      end
    end
  end
  collected.compact.uniq
end

#list_sessions(adapter, chat_id) ⇒ Object



560
561
562
563
564
565
566
567
568
569
570
571
572
# File 'lib/clacky/server/channel/channel_manager.rb', line 560

# Reply with a numbered list of the first five sessions from the registry.
#
# @param adapter [Object] adapter used to reply (send_text)
# @param chat_id [Object] destination chat
def list_sessions(adapter, chat_id)
  recent = @registry.list.first(5)
  if recent.empty?
    adapter.send_text(chat_id, "No sessions available.")
    return
  end

  rows = recent.each_with_index.map do |entry, position|
    label = entry[:name].to_s.empty? ? "(unnamed)" : entry[:name]
    # Characters 5..15 of the timestamp string, with "T" shown as a space.
    stamp = entry[:updated_at].to_s[5, 11]&.tr("T", " ") || "-"
    "#{position + 1}. `#{entry[:id][0, 8]}` #{label} (#{entry[:status]}) #{stamp}"
  end
  adapter.send_text(chat_id, "Recent sessions:\n#{rows.join("\n")}\n\nUse `/bind <n>` to switch.")
end

#reload_platform(platform, config) ⇒ Object

Hot-reload a single platform adapter with updated config. Stops the existing adapter (if running), then starts a new one if enabled.

Parameters:



162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/clacky/server/channel/channel_manager.rb', line 162

# Hot-reload one platform's adapter with an updated config: stop and remove
# the running adapter (if any), adopt +config+, then start a new adapter when
# the platform is enabled in it.
#
# @param platform [Symbol, String] normalised with #to_sym, like adapter_for
# @param config [Object] new channel config; must respond to enabled?
def reload_platform(platform, config)
  # Adapters are matched by Symbol platform_id elsewhere in this class; without
  # normalising, a String argument would never match and a duplicate adapter
  # would be started next to the old one.
  platform = platform.to_sym

  # Stop existing adapter for this platform
  @mutex.synchronize do
    existing = @adapters.find { |a| a.platform_id == platform }
    if existing
      safe_stop_adapter(existing)
      @adapters.delete(existing)
    end
  end

  # Always adopt the new config, so a platform that was just disabled is not
  # still enabled in the config this manager holds.
  @channel_config = config

  # Start new adapter if enabled
  if config.enabled?(platform)
    start_adapter(platform)
    Clacky::Logger.info("[ChannelManager] :#{platform} adapter reloaded")
  else
    Clacky::Logger.info("[ChannelManager] :#{platform} disabled — adapter not started")
  end
end

#resolve_session(event) ⇒ Object



496
497
498
499
500
501
502
503
504
505
506
507
# File 'lib/clacky/server/channel/channel_manager.rb', line 496

# Find the first session (in registry list order) whose :channel_keys
# contains the channel key of +event+.
#
# @param event [Hash] inbound event
# @return [Object, nil] the session id, or nil when nothing is bound or when
#   the lookup raises a StandardError (which is logged)
def resolve_session(event)
  wanted = channel_key(event)
  match = @registry.list.find do |summary|
    bound = nil
    @registry.with_session(summary[:id]) do |session|
      bound = session[:channel_keys]&.include?(wanted)
    end
    bound
  end
  match ? match[:id] : nil
rescue StandardError => error
  Clacky::Logger.error("[ChannelManager] Session resolve failed: #{error.message}")
  nil
end

#route_message(adapter, event) ⇒ Object



231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
# File 'lib/clacky/server/channel/channel_manager.rb', line 231

# Route one inbound IM event: run it as a built-in command, or hand it to the
# agent of the session bound to the sender (creating a session when none is
# bound yet).
#
# @param adapter [Object] adapter the event arrived on; used for replies and
#   the optional typing keepalive
# @param event [Hash] reads :text, :files, :chat_id and :context_token
# @return [Object, nil] nil for ignored events, commands and failed session
#   lookups; otherwise whatever @run_agent_task returns
def route_message(adapter, event)
  text  = event[:text]&.strip
  files = event[:files] || []
  return if (text.nil? || text.empty?) && files.empty?

  # Handle built-in commands
  if text&.match?(KNOWN_COMMAND) || text&.match?(/\A([\?h]|help)\z/i)
    handle_command(adapter, event, text)
    return
  end

  session_id = resolve_session(event) || auto_create_session(adapter, event)

  # A nil id (creation failed) is reported the same way as a missing session
  # instead of crashing while formatting the log line.
  session = session_id && @registry.get(session_id)
  unless session
    Clacky::Logger.warn("[ChannelManager] Session #{session_id.to_s[0, 8]} not found in registry after create")
    adapter.send_text(event[:chat_id], "Failed to initialize session. Please try again.")
    return
  end

  sub_count = web_ui_for_session_diag(session_id)
  Clacky::Logger.info("[ChannelManager] Routing to session #{session_id[0, 8]} (status=#{session[:status]}, text=#{text.inspect}, channel_subs=#{sub_count})")

  # If session is running, interrupt it automatically (mimics CLI behavior)
  if session[:status] == :running
    Clacky::Logger.info("[ChannelManager] Session busy, interrupting previous task")
    @interrupt_session.call(session_id)
    # Wait briefly for the thread to catch the interrupt and update status
    sleep 0.1
  end

  agent  = session[:agent]
  web_ui = session[:ui]

  # Update reply context so responses thread under the current message.
  channel_ui_for_session(session_id)&.update_message_context(event)

  # Sync the inbound message to WebUI so it shows up in the browser session.
  # source: :channel prevents the message from being echoed back to the IM channel.
  web_ui&.show_user_message(text, source: :channel) unless text.nil? || text.empty?

  # Start typing keepalive BEFORE sending any message.
  # sendmessage cancels the typing indicator in WeChat protocol,
  # so keepalive must be running when "Thinking..." is sent so it
  # immediately re-asserts the typing state after that message.
  chat_id       = event[:chat_id]
  context_token = event[:context_token]
  adapter.start_typing_keepalive(chat_id, context_token) if adapter.respond_to?(:start_typing_keepalive)

  # Set once the task block begins; from then on its own ensure owns the
  # keepalive cleanup.
  task_started = false
  begin
    # Acknowledge to the IM channel only — WebUI doesn't need a "Thinking..." noise.
    adapter.send_text(chat_id, "Thinking...")

    @run_agent_task.call(session_id, agent) do
      task_started = true
      begin
        Clacky::Logger.info("[ChannelManager] agent.run START session=#{session_id[0, 8]} text=#{text.inspect}")
        agent.run(text, files: files)
        Clacky::Logger.info("[ChannelManager] agent.run END   session=#{session_id[0, 8]} text=#{text.inspect}")
      rescue StandardError => e
        Clacky::Logger.error("[ChannelManager] agent.run RAISED session=#{session_id[0, 8]} #{e.class}: #{e.message}\n#{e.backtrace.first(8).join("\n")}")
        raise
      ensure
        adapter.stop_typing_keepalive(chat_id) if adapter.respond_to?(:stop_typing_keepalive)
      end
    end
  rescue StandardError
    # If the acknowledgement or task hand-off failed before the task block
    # ran, nothing else would ever stop the keepalive started above.
    adapter.stop_typing_keepalive(chat_id) if !task_started && adapter.respond_to?(:stop_typing_keepalive)
    raise
  end
end

#running_platforms ⇒ Array<Symbol>

Returns platforms currently running.

Returns:

  • (Array<Symbol>)

    platforms currently running



76
77
78
# File 'lib/clacky/server/channel/channel_manager.rb', line 76

# Platform ids of all adapters currently registered, read under @mutex.
#
# @return [Array] one platform_id per adapter, in registration order
def running_platforms
  @mutex.synchronize do
    @adapters.map { |adapter| adapter.platform_id }
  end
end

#safe_stop_adapter(adapter) ⇒ Object



607
608
609
610
611
# File 'lib/clacky/server/channel/channel_manager.rb', line 607

# Stop an adapter, logging (rather than propagating) any StandardError it raises.
#
# @param adapter [Object] adapter responding to stop and platform_id
def safe_stop_adapter(adapter)
  begin
    adapter.stop
  rescue StandardError => error
    Clacky::Logger.warn("[ChannelManager] Error stopping #{adapter.platform_id}: #{error.message}")
  end
end

#send_to_user(platform, user_id, message) ⇒ Hash?

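Send a message to a user through the running adapter for the given platform. Returns nil when no adapter is running for that platform or when sending raises an error. If no token is found the message cannot be delivered and nil is returned.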

For Feishu and WeCom the chat_id / user_id is sufficient — no token needed.

Parameters:

  • platform (Symbol, String)

    e.g. :weixin, :feishu, :wecom

  • user_id (String)

    IM user identifier

  • message (String)

    plain-text (or markdown) message to send

Returns:

  • (Hash, nil)

    adapter result hash, or nil on failure



102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/clacky/server/channel/channel_manager.rb', line 102

# Send +message+ to +user_id+ through the running adapter for +platform+.
#
# @param platform [Symbol, String] e.g. :weixin, :feishu, :wecom
# @param user_id [String] identifier passed to the adapter's send_text
# @param message [String] text to send
# @return [Object, nil] whatever the adapter's send_text returns; nil when no
#   adapter is running for the platform or when a StandardError is raised
def send_to_user(platform, user_id, message)
  platform = platform.to_sym
  adapter  = adapter_for(platform)

  unless adapter
    Clacky::Logger.warn("[ChannelManager] send_to_user: no running adapter for :#{platform}")
    return nil
  end

  # Keep platform and recipient visibly separated in the log line.
  Clacky::Logger.info("[ChannelManager] send_to_user :#{platform} -> #{user_id}")
  adapter.send_text(user_id, message)
rescue StandardError => e
  Clacky::Logger.error("[ChannelManager] send_to_user failed: #{e.message}")
  nil
end

#start ⇒ Object

Start all enabled adapters in background threads. Non-blocking.



52
53
54
55
56
57
58
59
60
61
62
# File 'lib/clacky/server/channel/channel_manager.rb', line 52

# Start one adapter per enabled platform. Does nothing (and leaves @running
# untouched) when the config lists no enabled platforms.
def start
  platforms = @channel_config.enabled_platforms
  if platforms.empty?
    Clacky::Logger.info("[ChannelManager] No channels configured — skipping")
    return
  end

  Clacky::Logger.info("[ChannelManager] Starting channels: #{platforms.join(", ")}")
  @running = true
  platforms.each do |platform|
    start_adapter(platform)
  end
end

#start_adapter(platform) ⇒ Object



183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/clacky/server/channel/channel_manager.rb', line 183

# Build, validate and launch the adapter for +platform+ in its own thread.
# Skips (with a warning) when no adapter class is registered or when the
# adapter reports config errors.
#
# @param platform [Object] platform identifier passed to Adapters.find and
#   to the channel config
def start_adapter(platform)
  adapter_class = Adapters.find(platform)
  unless adapter_class
    Clacky::Logger.warn("[ChannelManager] No adapter registered for :#{platform} — skipping")
    return
  end

  settings = @channel_config.platform_config(platform)
  Clacky::Logger.info("[ChannelManager] Initializing :#{platform} adapter")
  adapter = adapter_class.new(settings)

  problems = adapter.validate_config(settings)
  if problems.any?
    Clacky::Logger.warn("[ChannelManager] Config errors for :#{platform}: #{problems.join(", ")}")
    return
  end

  # Register the adapter before its loop starts.
  @mutex.synchronize do
    @adapters << adapter
  end
  Clacky::Logger.info("[ChannelManager] :#{platform} adapter ready, starting thread")

  worker = Thread.new do
    Thread.current.name = "channel-#{platform}"
    adapter_loop(adapter)
  end

  @adapter_threads << worker
end

#stop ⇒ Object

Stop all adapters gracefully.



65
66
67
68
69
70
71
72
73
# File 'lib/clacky/server/channel/channel_manager.rb', line 65

# Stop every adapter and forget its thread.
#
# @running is cleared first. Each adapter is then stopped via
# safe_stop_adapter while holding @mutex, and each adapter thread is joined
# for at most one second before the thread list is emptied.
def stop
  @running = false

  @mutex.synchronize do
    @adapters.each do |adapter|
      safe_stop_adapter(adapter)
    end
    @adapters.clear
  end

  @adapter_threads.each do |thread|
    thread.join(1)
  end
  @adapter_threads.clear
end

#web_ui_for_session_diag(session_id) ⇒ Object



537
538
539
540
541
542
543
544
545
546
547
548
# File 'lib/clacky/server/channel/channel_manager.rb', line 537

# Diagnostic helper: number of channel subscribers on a session's web UI.
#
# @param session_id [Object]
# @return [Integer, nil] size of the UI's @channel_subscribers (0 when that
#   variable is nil), -1 when the UI does not respond to channel_subscribed?,
#   or nil when the registry never yields the session
def web_ui_for_session_diag(session_id)
  subscriber_count = nil
  @registry.with_session(session_id) do |session|
    web_ui = session[:ui]
    if web_ui.respond_to?(:channel_subscribed?)
      # Peeks at the UI's internal subscriber list; used for logging only.
      subscribers = web_ui.instance_variable_get(:@channel_subscribers)
      subscriber_count = subscribers&.size || 0
    else
      subscriber_count = -1
    end
  end
  subscriber_count
end