Class: Octo::Server::HttpServer
- Inherits:
-
Object
- Object
- Octo::Server::HttpServer
- Defined in:
- lib/octo/server/http_server.rb
Overview
HttpServer runs an embedded WEBrick HTTP server with WebSocket support.
Routes:
GET /ws → WebSocket upgrade (all real-time communication)
* /api/* → JSON REST API (sessions, tasks, schedules)
GET /** → static files served from lib/octo/web/ directory
Defined Under Namespace
Classes: WebSocketConnection
Constant Summary collapse
- WEB_ROOT =
File.("../web", __dir__)
- DEFAULT_SOUL_MD =
Default SOUL.md written when the user skips the onboard conversation. A richer version is created by the Agent during the soul_setup phase.
<<~MD.freeze # Octo — Agent Soul You are Octo, a friendly and capable AI coding assistant and technical co-founder. You are sharp, concise, and proactive. You speak plainly and avoid unnecessary formality. You love helping people ship great software. ## Personality - Warm and encouraging, but direct and honest - Think step-by-step before acting; explain your reasoning briefly - Prefer doing over talking — use tools, write code, ship results - Adapt your language and tone to match the user's style ## Strengths - Full-stack software development (Ruby, Python, JS, and more) - Architectural thinking and code review - Debugging tricky problems with patience and creativity - Breaking big goals into small, executable steps MD
- DEFAULT_SOUL_MD_ZH =
Default SOUL.md for Chinese-language users.
<<~MD.freeze # Octo — 助手灵魂 你是 Octo,一位友好、能干的 AI 编程助手和技术联合创始人。 你思维敏锐、言简意赅、主动积极。你说话直接,不喜欢过度客套。 你热爱帮助用户打造优秀的软件产品。 **重要:始终用中文回复用户。** ## 性格特点 - 热情鼓励,但直接诚实 - 行动前先思考;简要说明你的推理过程 - 重行动而非空谈 —— 善用工具,写代码,交付结果 - 根据用户的风格调整语气和表达方式 ## 核心能力 - 全栈软件开发(Ruby、Python、JS 等) - 架构设计与代码审查 - 耐心细致地调试复杂问题 - 将大目标拆解为可执行的小步骤 MD
- PROFILE_USER_AGENTS_DIR =
── Profile API (USER.md / SOUL.md) ──────────────────────────────
User can override the built-in defaults by writing their own ~/.octo/agents/USER.md and ~/.octo/agents/SOUL.md. These endpoints let the Web UI read and edit those files.
File.("~/.octo/agents").freeze
- PROFILE_DEFAULT_AGENTS_DIR =
File.("../../default_agents", __dir__).freeze
- PROFILE_MAX_BYTES =
Hard limit; prevents runaway content.
50_000- MEMORIES_DIR =
── Memories API (~/.octo/memories/*.md) ───────────────────────
Long-term memories are plain Markdown files with YAML frontmatter stored under ~/.octo/memories/. These endpoints let the user inspect, edit, create, and delete them from the Web UI.
File.("~/.octo/memories").freeze
- MEMORY_MAX_BYTES =
50_000
Instance Method Summary collapse
- #_dispatch_rest(req, res) ⇒ Object
-
#api_add_model(req, res) ⇒ Object
POST /api/config/models Body: { model, base_url, api_key, anthropic_format, type? } Creates a new model entry, returns { ok:true, id, index } so the frontend can record the new id without reloading the whole list.
-
#api_benchmark_session_models(session_id, _req, res) ⇒ Object
POST /api/sessions/:id/benchmark.
-
#api_browser_configure(req, res) ⇒ Object
POST /api/browser/configure Called by browser-setup skill to write browser.yml and hot-reload the daemon.
-
#api_browser_reload(res) ⇒ Object
POST /api/browser/reload Called by browser-setup skill after writing browser.yml.
-
#api_browser_status(res) ⇒ Object
GET /api/browser/status Returns real daemon liveness from BrowserManager (not just yml read).
-
#api_browser_toggle(res) ⇒ Object
POST /api/browser/toggle.
- #api_change_session_working_dir(session_id, req, res) ⇒ Object
-
#api_create_cron_task(req, res) ⇒ Object
POST /api/cron-tasks — create task file + schedule in one step Body: { name, content, cron, enabled? }.
- #api_create_session(req, res) ⇒ Object
-
#api_delete_channel(platform, res) ⇒ Object
DELETE /api/channels/:platform Disables the platform (keeps credentials, sets enabled: false).
-
#api_delete_cron_task(name, res) ⇒ Object
DELETE /api/cron-tasks/:name — remove task file + schedule.
-
#api_delete_model(id, res) ⇒ Object
DELETE /api/config/models/:id.
- #api_delete_session(session_id, res) ⇒ Object
-
#api_export_session(session_id, res) ⇒ Object
Export a session bundle as a .zip download containing: - session.json (always) - chunk-*.md (0..N archived conversation chunks) - logs/octo-YYYY-MM-DD.log (today’s logger file, if present) Useful for debugging — user clicks “download” in the WebUI status bar and we can ask them to attach the zip to a bug report.
-
#api_file_action(req, res) ⇒ Object
POST /api/file-action Unified file action endpoint — open locally or download.
-
#api_get_config(res) ⇒ Object
GET /api/config — return current model configurations.
-
#api_get_version(res) ⇒ Object
GET /api/version Returns current version and latest version from RubyGems (cached for 1 hour).
-
#api_list_channel_users(platform, res) ⇒ Object
GET /api/channels/:platform/users Returns the list of known user IDs for the given platform.
- #api_list_channels(res) ⇒ Object
-
#api_list_cron_tasks(res) ⇒ Object
GET /api/cron-tasks.
-
#api_list_providers(res) ⇒ Object
GET /api/providers — return built-in provider presets for quick setup.
-
#api_list_sessions(req, res) ⇒ Object
── REST API ──────────────────────────────────────────────────────────────.
-
#api_list_skills(res) ⇒ Object
GET /api/skills — list all loaded skills with metadata.
-
#api_onboard_complete(req, res) ⇒ Object
POST /api/onboard/complete Called after key setup is done (soul_setup is optional/skipped).
-
#api_onboard_skip_soul(req, res) ⇒ Object
POST /api/onboard/skip-soul Writes a minimal SOUL.md so the soul_setup phase is not re-triggered on the next server start when the user chooses to skip the conversation.
-
#api_onboard_status(res) ⇒ Object
GET /api/onboard/status Phase “key_setup” → no API key configured yet Phase “soul_setup” → key configured, but ~/.octo/agents/SOUL.md missing needs_onboard: false → fully set up.
- #api_rename_session(session_id, req, res) ⇒ Object
-
#api_restart(req, res) ⇒ Object
POST /api/restart Re-execs the current process so the newly installed gem version is loaded.
-
#api_run_cron_task(name, res) ⇒ Object
POST /api/cron-tasks/:name/run — execute immediately.
-
#api_save_channel(platform, req, res) ⇒ Object
POST /api/channels/:platform Body: { fields… } (platform-specific credential fields) Saves credentials and optionally (re)starts the adapter.
-
#api_send_channel_message(platform, req, res) ⇒ Object
POST /api/channels/:platform/send Proactively send a message to a user via the given IM platform.
-
#api_serve_local_image(req, res) ⇒ Object
GET /api/local-image?path=file:///path/to/image.png GET /api/local-image?path=/path/to/image.png.
-
#api_session_messages(session_id, req, res) ⇒ Object
GET /api/sessions/:id/messages?limit=20&before=1709123456.789 Replays conversation history for a session via the agent’s replay_history method.
-
#api_session_skills(session_id, res) ⇒ Object
GET /api/sessions/:id/skills — list user-invocable skills for a session, filtered by the session’s agent profile.
-
#api_set_default_model(id, res) ⇒ Object
POST /api/config/models/:id/default Makes the identified model the new “default” (global initial model for new sessions AND current model for this server instance).
- #api_switch_session_model(session_id, req, res) ⇒ Object
-
#api_switch_session_reasoning_effort(session_id, req, res) ⇒ Object
PATCH /api/sessions/:id/reasoning_effort Body: { “reasoning_effort”: “off” | “low” | “medium” | “high” }.
-
#api_test_channel(platform, req, res) ⇒ Object
POST /api/channels/:platform/test Body: { fields… } (credentials to test — NOT saved) Tests connectivity using the provided credentials without persisting.
-
#api_test_config(req, res) ⇒ Object
POST /api/config/test — test connection for a single model config Body: { model, base_url, api_key, anthropic_format }.
-
#api_toggle_channel(platform, req, res) ⇒ Object
PATCH /api/channels/:platform/enabled Body: { enabled: true|false } Toggles the platform on/off without touching credentials.
-
#api_toggle_skill(name, req, res) ⇒ Object
PATCH /api/skills/:name/toggle — enable or disable a skill Body: { enabled: true/false }.
-
#api_tool_browser(req, res) ⇒ Object
GET /api/channels Returns current config and running status for all supported platforms.
-
#api_update_cron_task(name, req, res) ⇒ Object
PATCH /api/cron-tasks/:name — update content and/or cron/enabled Body: { content?, cron?, enabled? }.
-
#api_update_model(id, req, res) ⇒ Object
PATCH /api/config/models/:id Body: any subset of { model, base_url, api_key, anthropic_format, type } Rules (the whole reason we moved off bulk save): - Missing key → field untouched - api_key with “****” (masked display value) → IGNORED (never overwrites) - api_key empty string → IGNORED (defensive; treat as “not changed”) - api_key real non-masked value → stored - type=“default” transparently clears the marker on other models - Unknown id → 404.
-
#api_upgrade_version(req, res) ⇒ Object
POST /api/version/upgrade Upgrades octo in a background thread, streaming output via WebSocket broadcast.
-
#api_upload_file(req, res) ⇒ Object
POST /api/upload Accepts a multipart/form-data file upload (field name: “file”).
-
#broadcast(session_id, event) ⇒ Object
Broadcast an event to all clients subscribed to a session.
-
#broadcast_all(event) ⇒ Object
Broadcast an event to every connected client (regardless of session subscription).
-
#broadcast_session_update(session_id) ⇒ Object
Broadcast a session_update event to all clients so they can patch their local session list without needing a full session_list refresh.
-
#build_session(name:, working_dir:, permission_mode: :confirm_all, profile: "general", source: :manual, model_id: nil) ⇒ Object
Create a session in the registry and wire up Agent + WebUIController.
-
#build_session_from_data(session_data, permission_mode: :confirm_all) ⇒ Object
Restore a persisted session from saved session_data (from SessionManager).
-
#create_default_session ⇒ Object
Auto-restore persisted sessions (or create a fresh default) when the server starts.
-
#default_working_dir ⇒ Object
── Helpers ───────────────────────────────────────────────────────────────.
- #deliver_confirmation(session_id, conf_id, result) ⇒ Object
-
#dispatch(req, res) ⇒ Object
── Router ────────────────────────────────────────────────────────────────.
-
#handle_user_message(session_id, content, files = []) ⇒ Object
── Session actions ───────────────────────────────────────────────────────.
-
#handle_websocket(req, res) ⇒ Object
Hijacks the TCP socket from WEBrick and upgrades it to WebSocket.
-
#initialize(host: "127.0.0.1", port: 8888, agent_config:, client_factory:, sessions_dir: nil, socket: nil, master_pid: nil) ⇒ HttpServer
constructor
A new instance of HttpServer.
-
#interrupt_session(session_id) ⇒ Object
Interrupt a running agent session.
- #json_response(res, status, data) ⇒ Object
-
#mask_api_key(key) ⇒ Object
Mask API key for display: show first 8 + last 4 chars, middle replaced with **** Mask an api_key for safe display / transport to the browser.
- #not_found(res) ⇒ Object
- #on_ws_close(conn) ⇒ Object
- #on_ws_message(conn, raw) ⇒ Object
- #on_ws_open(conn) ⇒ Object
- #parse_json_body(req) ⇒ Object
-
#serve_file_download(res, path) ⇒ Object
Stream a file to the client as a download.
- #start ⇒ Object
-
#start_pending_task(session_id) ⇒ Object
Start the pending task for a session.
-
#subscribe(session_id, conn) ⇒ Object
── WebSocket subscription management ─────────────────────────────────────.
- #unsubscribe(conn) ⇒ Object
- #unsubscribe_all(session_id) ⇒ Object
-
#websocket_upgrade?(req) ⇒ Boolean
── WebSocket ─────────────────────────────────────────────────────────────.
Constructor Details
#initialize(host: "127.0.0.1", port: 8888, agent_config:, client_factory:, sessions_dir: nil, socket: nil, master_pid: nil) ⇒ HttpServer
Returns a new instance of HttpServer.
171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 |
# File 'lib/octo/server/http_server.rb', line 171 def initialize(host: "127.0.0.1", port: 8888, agent_config:, client_factory:, sessions_dir: nil, socket: nil, master_pid: nil) @host = host @port = port @agent_config = agent_config @client_factory = client_factory # callable: -> { Octo::Client.new(...) } @inherited_socket = socket # TCPServer socket passed from Master (nil = standalone mode) @master_pid = master_pid # Master PID so we can send USR1 on upgrade/restart # Capture the absolute path of the entry script and original ARGV at startup, # so api_restart can re-exec the correct binary even if cwd changes later. @restart_script = File.($0) @restart_argv = ARGV.dup @session_manager = Octo::SessionManager.new(sessions_dir: sessions_dir) @registry = SessionRegistry.new( session_manager: @session_manager, session_restorer: method(:build_session_from_data), agent_config: @agent_config ) @ws_clients = {} # session_id => [WebSocketConnection, ...] @all_ws_conns = [] # every connected WS client, regardless of session subscription @ws_mutex = Mutex.new # Version cache: { latest: "x.y.z", checked_at: Time } @version_cache = nil @version_mutex = Mutex.new @scheduler = Scheduler.new( session_registry: @registry, session_builder: method(:build_session), task_runner: method(:run_agent_task) ) @channel_manager = Octo::Channel::ChannelManager.new( session_registry: @registry, session_builder: method(:build_session), run_agent_task: method(:run_agent_task), interrupt_session: method(:interrupt_session), channel_config: Octo::ChannelConfig.load ) @browser_manager = Octo::BrowserManager.instance @skill_loader = Octo::SkillLoader.new(working_dir: nil) # Access key authentication: # - localhost (127.0.0.1 / ::1) is always trusted; auth is skipped entirely. # - Any other bind address requires OCTO_ACCESS_KEY env var. @localhost_only = local_host?(@host) @access_key = @localhost_only ? nil : resolve_access_key @auth_failures = {} @auth_failures_mutex = Mutex.new if @localhost_only Octo::Logger.info("[HttpServer] Localhost mode — authentication disabled") else Octo::Logger.info("[HttpServer] Public mode — access key authentication ENABLED") end end |
Instance Method Details
#_dispatch_rest(req, res) ⇒ Object
368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 |
# File 'lib/octo/server/http_server.rb', line 368 def _dispatch_rest(req, res) path = req.path method = req.request_method case [method, path] when ["GET", "/api/sessions"] then api_list_sessions(req, res) when ["POST", "/api/sessions"] then api_create_session(req, res) when ["GET", "/api/cron-tasks"] then api_list_cron_tasks(res) when ["POST", "/api/cron-tasks"] then api_create_cron_task(req, res) when ["GET", "/api/skills"] then api_list_skills(res) when ["GET", "/api/config"] then api_get_config(res) when ["POST", "/api/config/models"] then api_add_model(req, res) when ["POST", "/api/config/test"] then api_test_config(req, res) when ["GET", "/api/providers"] then api_list_providers(res) when ["GET", "/api/onboard/status"] then api_onboard_status(res) when ["GET", "/api/browser/status"] then api_browser_status(res) when ["POST", "/api/browser/configure"] then api_browser_configure(req, res) when ["POST", "/api/browser/reload"] then api_browser_reload(res) when ["POST", "/api/browser/toggle"] then api_browser_toggle(res) when ["POST", "/api/onboard/complete"] then api_onboard_complete(req, res) when ["POST", "/api/onboard/skip-soul"] then api_onboard_skip_soul(req, res) when ["GET", "/api/trash"] then api_trash(req, res) when ["POST", "/api/trash/restore"] then api_trash_restore(req, res) when ["DELETE", "/api/trash"] then api_trash_delete(req, res) when ["GET", "/api/profile"] then api_profile_get(res) when ["PUT", "/api/profile"] then api_profile_put(req, res) when ["GET", "/api/memories"] then api_memories_list(res) when ["POST", "/api/memories"] then api_memories_create(req, res) when ["GET", "/api/channels"] then api_list_channels(res) when ["POST", "/api/tool/browser"] then api_tool_browser(req, res) when ["POST", "/api/upload"] then api_upload_file(req, res) when ["POST", "/api/file-action"] then api_file_action(req, res) when ["GET", "/api/local-image"] then api_serve_local_image(req, res) when ["GET", "/api/version"] then api_get_version(res) when ["POST", "/api/version/upgrade"] then api_upgrade_version(req, res) when ["POST", "/api/restart"] then api_restart(req, res) when ["PATCH", "/api/sessions/:id/model"] then api_switch_session_model(req, res) when ["PATCH", "/api/sessions/:id/working_dir"] then api_change_session_working_dir(req, res) else if method == "POST" && path.match?(%r{^/api/channels/[^/]+/send$}) platform = path.sub("/api/channels/", "").sub("/send", "") (platform, req, res) elsif method == "GET" && path.match?(%r{^/api/channels/[^/]+/users$}) platform = path.sub("/api/channels/", "").sub("/users", "") api_list_channel_users(platform, res) elsif method == "POST" && path.match?(%r{^/api/channels/[^/]+/test$}) platform = path.sub("/api/channels/", "").sub("/test", "") api_test_channel(platform, req, res) elsif method == "PATCH" && path.match?(%r{^/api/channels/[^/]+/enabled$}) platform = path.sub("/api/channels/", "").sub("/enabled", "") api_toggle_channel(platform, req, res) elsif method == "POST" && path.start_with?("/api/channels/") platform = path.sub("/api/channels/", "") api_save_channel(platform, req, res) elsif method == "DELETE" && path.start_with?("/api/channels/") platform = path.sub("/api/channels/", "") api_delete_channel(platform, res) elsif method == "GET" && path.match?(%r{^/api/sessions/[^/]+/skills$}) session_id = path.sub("/api/sessions/", "").sub("/skills", "") api_session_skills(session_id, res) elsif method == "GET" && path.match?(%r{^/api/sessions/[^/]+/export$}) session_id = path.sub("/api/sessions/", "").sub("/export", "") api_export_session(session_id, res) elsif method == "GET" && path.match?(%r{^/api/sessions/[^/]+/messages$}) session_id = path.sub("/api/sessions/", "").sub("/messages", "") (session_id, req, res) elsif method == "PATCH" && path.match?(%r{^/api/sessions/[^/]+$}) session_id = path.sub("/api/sessions/", "") api_rename_session(session_id, req, res) elsif method == "PATCH" && path.match?(%r{^/api/sessions/[^/]+/model$}) session_id = path.sub("/api/sessions/", "").sub("/model", "") api_switch_session_model(session_id, req, res) elsif method == "PATCH" && path.match?(%r{^/api/sessions/[^/]+/reasoning_effort$}) session_id = path.sub("/api/sessions/", "").sub("/reasoning_effort", "") api_switch_session_reasoning_effort(session_id, req, res) elsif method == "POST" && path.match?(%r{^/api/sessions/[^/]+/benchmark$}) session_id = path.sub("/api/sessions/", "").sub("/benchmark", "") api_benchmark_session_models(session_id, req, res) elsif method == "PATCH" && path.match?(%r{^/api/sessions/[^/]+/working_dir$}) session_id = path.sub("/api/sessions/", "").sub("/working_dir", "") api_change_session_working_dir(session_id, req, res) elsif method == "DELETE" && path.start_with?("/api/sessions/") session_id = path.sub("/api/sessions/", "") api_delete_session(session_id, res) elsif method == "POST" && path.match?(%r{^/api/config/models/[^/]+/default$}) id = path.sub("/api/config/models/", "").sub("/default", "") api_set_default_model(id, res) elsif method == "PATCH" && path.match?(%r{^/api/config/models/[^/]+$}) id = path.sub("/api/config/models/", "") api_update_model(id, req, res) elsif method == "DELETE" && path.match?(%r{^/api/config/models/[^/]+$}) id = path.sub("/api/config/models/", "") api_delete_model(id, res) elsif method == "POST" && path.match?(%r{^/api/cron-tasks/[^/]+/run$}) name = URI.decode_www_form_component(path.sub("/api/cron-tasks/", "").sub("/run", "")) api_run_cron_task(name, res) elsif method == "PATCH" && path.match?(%r{^/api/cron-tasks/[^/]+$}) name = URI.decode_www_form_component(path.sub("/api/cron-tasks/", "")) api_update_cron_task(name, req, res) elsif method == "DELETE" && path.match?(%r{^/api/cron-tasks/[^/]+$}) name = URI.decode_www_form_component(path.sub("/api/cron-tasks/", "")) api_delete_cron_task(name, res) elsif method == "PATCH" && path.match?(%r{^/api/skills/[^/]+/toggle$}) name = URI.decode_www_form_component(path.sub("/api/skills/", "").sub("/toggle", "")) api_toggle_skill(name, req, res) elsif method == "GET" && path.match?(%r{^/api/memories/[^/]+$}) filename = URI.decode_www_form_component(path.sub("/api/memories/", "")) api_memories_get(filename, res) elsif method == "PUT" && path.match?(%r{^/api/memories/[^/]+$}) filename = URI.decode_www_form_component(path.sub("/api/memories/", "")) api_memories_update(filename, req, res) elsif method == "DELETE" && path.match?(%r{^/api/memories/[^/]+$}) filename = URI.decode_www_form_component(path.sub("/api/memories/", "")) api_memories_delete(filename, res) else not_found(res) end end end |
#api_add_model(req, res) ⇒ Object
POST /api/config/models Body: { model, base_url, api_key, anthropic_format, type? } Creates a new model entry, returns { ok:true, id, index } so the frontend can record the new id without reloading the whole list.
2110 2111 2112 2113 2114 2115 2116 2117 2118 2119 2120 2121 2122 2123 2124 2125 2126 2127 2128 2129 2130 2131 2132 2133 2134 2135 2136 2137 2138 2139 2140 2141 2142 2143 2144 2145 2146 2147 2148 2149 2150 2151 2152 2153 2154 2155 2156 2157 2158 2159 2160 |
# File 'lib/octo/server/http_server.rb', line 2110 def api_add_model(req, res) body = parse_json_body(req) return json_response(res, 400, { error: "Invalid JSON" }) unless body model = body["model"].to_s.strip base_url = body["base_url"].to_s.strip api_key = body["api_key"].to_s # Masked placeholders are never a valid api_key on creation — # a new model MUST come with a real key. if api_key.empty? || api_key.include?("****") return json_response(res, 422, { error: "api_key is required" }) end entry = { "id" => SecureRandom.uuid, "model" => model, "base_url" => base_url, "api_key" => api_key, "anthropic_format" => body["anthropic_format"] || false } type = body["type"].to_s unless type.empty? # Preserve the single-slot "default" invariant. if type == "default" @agent_config.models.each { |m| m.delete("type") if m["type"] == "default" } end entry["type"] = type end @agent_config.models << entry # If this is the only model and no default marker exists yet, # adopt it as the default so downstream lookups resolve cleanly. if @agent_config.models.none? { |m| m["type"] == "default" } entry["type"] = "default" @agent_config.current_model_id = entry["id"] @agent_config.current_model_index = @agent_config.models.length - 1 elsif type == "default" # Re-anchor current_* to the newly-defaulted entry. @agent_config.current_model_id = entry["id"] @agent_config.current_model_index = @agent_config.models.length - 1 end @agent_config.save json_response(res, 200, { ok: true, id: entry["id"], index: @agent_config.models.length - 1 }) rescue => e json_response(res, 422, { error: e. }) end |
#api_benchmark_session_models(session_id, _req, res) ⇒ Object
POST /api/sessions/:id/benchmark
Speed-test every configured model in one shot so the user can pick the fastest available model for this session. We send a minimal one-token request to each model *in parallel* (one thread per model) and measure total HTTP duration — for non-streaming calls this equals the user’s perceived time-to-first-token, so the field is named ‘ttft_ms` for forward-compatibility with a future streaming implementation.
Cost note: each request is ‘max_tokens: 1` + a 2-byte prompt, so the total cost across a dozen models is well under one cent.
Response shape:
{
ok: true,
results: [
{ model_id: "...", model: "...", ttft_ms: 812, ok: true },
{ model_id: "...", model: "...", ok: false, error: "timeout" },
...
]
}
2463 2464 2465 2466 2467 2468 2469 2470 2471 2472 2473 2474 2475 2476 2477 2478 2479 2480 2481 2482 2483 2484 2485 2486 2487 2488 2489 2490 2491 2492 2493 |
# File 'lib/octo/server/http_server.rb', line 2463 def api_benchmark_session_models(session_id, _req, res) return json_response(res, 404, { error: "Session not found" }) unless @registry.ensure(session_id) # Snapshot the models list — @agent_config.models is a shared reference # that the user might mutate from the settings panel during the test; # a shallow dup is enough since we only read string fields below. models = Array(@agent_config.models).dup return json_response(res, 200, { ok: true, results: [] }) if models.empty? # Kick off one thread per model. We deliberately cap per-request wall # time inside each thread via a Faraday timeout so a single dead model # can't block the response. The outer join uses a generous ceiling # (timeout + small buffer) as a last-resort safety net. per_model_timeout = 15 threads = models.map do |m| Thread.new do Thread.current.report_on_exception = false benchmark_single_model(m, per_model_timeout) end end results = threads.map do |t| t.join(per_model_timeout + 3) t.value rescue { ok: false, error: "thread failed" } end json_response(res, 200, { ok: true, results: results }) rescue => e Octo::Logger.error("[benchmark] #{e.class}: #{e.}", error: e) json_response(res, 500, { error: e. }) end |
#api_browser_configure(req, res) ⇒ Object
POST /api/browser/configure Called by browser-setup skill to write browser.yml and hot-reload the daemon. Body: { chrome_version: “146” }
608 609 610 611 612 613 614 615 616 617 |
# File 'lib/octo/server/http_server.rb', line 608 def api_browser_configure(req, res) body = JSON.parse(req.body.to_s) rescue {} chrome_version = body["chrome_version"].to_s.strip return json_response(res, 422, { ok: false, error: "chrome_version is required" }) if chrome_version.empty? @browser_manager.configure(chrome_version: chrome_version) json_response(res, 200, { ok: true }) rescue StandardError => e json_response(res, 500, { ok: false, error: e. }) end |
#api_browser_reload(res) ⇒ Object
POST /api/browser/reload Called by browser-setup skill after writing browser.yml. Hot-reloads the MCP daemon with the new configuration.
622 623 624 625 626 627 |
# File 'lib/octo/server/http_server.rb', line 622 def api_browser_reload(res) @browser_manager.reload json_response(res, 200, { ok: true }) rescue StandardError => e json_response(res, 500, { ok: false, error: e. }) end |
#api_browser_status(res) ⇒ Object
GET /api/browser/status Returns real daemon liveness from BrowserManager (not just yml read).
601 602 603 |
# File 'lib/octo/server/http_server.rb', line 601 def api_browser_status(res) json_response(res, 200, @browser_manager.status) end |
#api_browser_toggle(res) ⇒ Object
POST /api/browser/toggle
630 631 632 633 634 635 |
# File 'lib/octo/server/http_server.rb', line 630 def api_browser_toggle(res) enabled = @browser_manager.toggle json_response(res, 200, { ok: true, enabled: enabled }) rescue StandardError => e json_response(res, 500, { ok: false, error: e. }) end |
#api_change_session_working_dir(session_id, req, res) ⇒ Object
2533 2534 2535 2536 2537 2538 2539 2540 2541 2542 2543 2544 2545 2546 2547 2548 2549 2550 2551 2552 2553 2554 2555 2556 2557 2558 2559 2560 2561 2562 2563 |
# File 'lib/octo/server/http_server.rb', line 2533 def api_change_session_working_dir(session_id, req, res) body = parse_json_body(req) new_dir = body["working_dir"].to_s.strip return json_response(res, 400, { error: "working_dir is required" }) if new_dir.empty? return json_response(res, 404, { error: "Session not found" }) unless @registry.ensure(session_id) # Expand ~ to home directory = File.(new_dir) # Validate directory exists unless Dir.exist?() return json_response(res, 400, { error: "Directory does not exist: #{}" }) end agent = nil @registry.with_session(session_id) { |s| agent = s[:agent] } # Change the agent's working directory agent.change_working_dir() # Persist the change @session_manager.save(agent.to_session_data) # Broadcast update to all clients broadcast_session_update(session_id) json_response(res, 200, { ok: true, working_dir: }) rescue => e json_response(res, 500, { error: e. }) end |
#api_create_cron_task(req, res) ⇒ Object
POST /api/cron-tasks — create task file + schedule in one step Body: { name, content, cron, enabled? }
1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 |
# File 'lib/octo/server/http_server.rb', line 1444 def api_create_cron_task(req, res) body = parse_json_body(req) name = body["name"].to_s.strip content = body["content"].to_s cron = body["cron"].to_s.strip enabled = body.key?("enabled") ? body["enabled"] : true return json_response(res, 422, { error: "name is required" }) if name.empty? return json_response(res, 422, { error: "content is required" }) if content.empty? return json_response(res, 422, { error: "cron is required" }) if cron.empty? fields = cron.strip.split(/\s+/) unless fields.size == 5 return json_response(res, 422, { error: "cron must have 5 fields (min hour dom month dow)" }) end @scheduler.create_cron_task(name: name, content: content, cron: cron, enabled: enabled) json_response(res, 201, { ok: true, name: name }) end |
#api_create_session(req, res) ⇒ Object
519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 |
# File 'lib/octo/server/http_server.rb', line 519 def api_create_session(req, res) body = parse_json_body(req) name = body["name"] return json_response(res, 400, { error: "name is required" }) if name.nil? || name.strip.empty? # Optional agent_profile; defaults to "general" if omitted or invalid profile = body["agent_profile"].to_s.strip profile = "general" if profile.empty? # Optional source; defaults to :manual. Accept "system" for skill-launched sessions # (e.g. /onboard, /browser-setup, /channel-manager). raw_source = body["source"].to_s.strip source = %w[manual cron channel setup].include?(raw_source) ? raw_source.to_sym : :manual raw_dir = body["working_dir"].to_s.strip working_dir = raw_dir.empty? ? default_working_dir : File.(raw_dir) # Optional model override — passed as a stable model id (matches the # id returned by GET /api/config). Name-based override was removed: # a bare model name can't disambiguate between entries from different # providers (e.g. "deepseek-v4-pro" on DeepSeek direct vs its dsk-* # alias on Octo/Bedrock), and mutating current_model["model"] # kept the wrong api_key / base_url / api format, producing # "unknown model" errors at the provider. model_id_override = body["model_id"].to_s.strip model_id_override = nil if model_id_override.empty? if model_id_override && !@agent_config.models.any? { |m| m["id"] == model_id_override } return json_response(res, 400, { error: "Model not found in configuration" }) end # Create working directory if it doesn't exist # Allow multiple sessions in the same directory FileUtils.mkdir_p(working_dir) session_id = build_session(name: name, working_dir: working_dir, profile: profile, source: source, model_id: model_id_override) broadcast_session_update(session_id) json_response(res, 201, { session: @registry.session_summary(session_id) }) end |
#api_delete_channel(platform, res) ⇒ Object
DELETE /api/channels/:platform Disables the platform (keeps credentials, sets enabled: false).
1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 |
# File 'lib/octo/server/http_server.rb', line 1328 def api_delete_channel(platform, res) platform = platform.to_sym config = Octo::ChannelConfig.load config.disable_platform(platform) config.save @channel_manager.reload_platform(platform, config) json_response(res, 200, { ok: true }) rescue StandardError => e json_response(res, 422, { ok: false, error: e. }) end |
#api_delete_cron_task(name, res) ⇒ Object
DELETE /api/cron-tasks/:name — remove task file + schedule
1483 1484 1485 1486 1487 1488 1489 |
# File 'lib/octo/server/http_server.rb', line 1483 def api_delete_cron_task(name, res) if @scheduler.delete_cron_task(name) json_response(res, 200, { ok: true }) else json_response(res, 404, { error: "Cron task not found: #{name}" }) end end |
#api_delete_model(id, res) ⇒ Object
DELETE /api/config/models/:id
2223 2224 2225 2226 2227 2228 2229 2230 2231 2232 2233 2234 2235 2236 2237 2238 2239 2240 2241 2242 2243 2244 2245 2246 2247 2248 2249 |
# File 'lib/octo/server/http_server.rb', line 2223 def api_delete_model(id, res) models = @agent_config.models return json_response(res, 404, { error: "model not found" }) unless models.any? { |m| m["id"] == id } return json_response(res, 422, { error: "cannot delete the last model" }) if models.length <= 1 index = models.find_index { |m| m["id"] == id } removed = models.delete_at(index) # Re-anchor current_* if we just deleted the active model. if @agent_config.current_model_id == removed["id"] new_default = models.find { |m| m["type"] == "default" } || models.first # If the removed model was the default, promote the new current # model so the config always has exactly one default entry. if removed["type"] == "default" && new_default && new_default["type"] != "default" new_default["type"] = "default" end @agent_config.current_model_id = new_default["id"] @agent_config.current_model_index = models.find_index { |m| m["id"] == new_default["id"] } || 0 elsif @agent_config.current_model_index >= models.length @agent_config.current_model_index = models.length - 1 end @agent_config.save json_response(res, 200, { ok: true }) rescue => e json_response(res, 422, { error: e. }) end |
#api_delete_session(session_id, res) ⇒ Object
2565 2566 2567 2568 2569 2570 2571 2572 2573 2574 2575 2576 2577 2578 2579 2580 2581 2582 2583 2584 2585 2586 2587 2588 2589 2590 2591 2592 2593 2594 |
# File 'lib/octo/server/http_server.rb', line 2565 def api_delete_session(session_id, res) # A session exists if it's either in the runtime registry OR on disk. # Old sessions that were never restored into memory this server run # (e.g. shown via "load more" in the WebUI list) are disk-only — we # must still be able to delete them. Previously this endpoint only # consulted @registry and returned 404 for disk-only sessions, # causing the "can't delete old sessions" bug. in_registry = @registry.exist?(session_id) on_disk = !@session_manager.load(session_id).nil? unless in_registry || on_disk return json_response(res, 404, { error: "Session not found" }) end # Registry delete is best-effort — only meaningful when the session # is actually live (cancels idle timer, interrupts the agent thread). # For disk-only sessions this is a no-op and returns false, which is # fine and no longer blocks the disk cleanup below. @registry.delete(session_id) if in_registry # Always physically remove the persisted session file (+ chunks). @session_manager.delete(session_id) if on_disk # Notify any still-connected clients (mainly matters when the # session was live, but harmless otherwise). broadcast(session_id, { type: "session_deleted", session_id: session_id }) unsubscribe_all(session_id) json_response(res, 200, { ok: true }) end |
#api_export_session(session_id, res) ⇒ Object
Export a session bundle as a .zip download containing:
- session.json (always)
- chunk-*.md (0..N archived conversation chunks)
- logs/octo-YYYY-MM-DD.log (today's logger file, if present)
Useful for debugging — user clicks “download” in the WebUI status bar and we can ask them to attach the zip to a bug report.
2602 2603 2604 2605 2606 2607 2608 2609 2610 2611 2612 2613 2614 2615 2616 2617 2618 2619 2620 2621 2622 2623 2624 2625 2626 2627 2628 2629 2630 2631 2632 2633 2634 2635 2636 2637 2638 2639 2640 2641 2642 |
# File 'lib/octo/server/http_server.rb', line 2602 def api_export_session(session_id, res) bundle = @session_manager.files_for(session_id) unless bundle return json_response(res, 404, { error: "Session not found" }) end require "zip" short_id = bundle[:session][:session_id].to_s[0..7] # Build the zip entirely in memory — session files are small (< few MB). buffer = Zip::OutputStream.write_buffer do |zos| zos.put_next_entry("session.json") zos.write(File.binread(bundle[:json_path])) bundle[:chunks].each do |chunk_path| # Preserve original chunk filename so the ordering (chunk-1.md, chunk-2.md, ...) is clear. zos.put_next_entry(File.basename(chunk_path)) zos.write(File.binread(chunk_path)) end log_path = Octo::Logger.current_log_file if log_path && File.exist?(log_path) zos.put_next_entry("logs/#{File.basename(log_path)}") zos.write(File.binread(log_path)) end end buffer.rewind data = buffer.read filename = "octo-session-#{short_id}.zip" res.status = 200 res.content_type = "application/zip" res["Content-Disposition"] = %(attachment; filename="#{filename}") res["Access-Control-Allow-Origin"] = "*" # Force a fresh copy each time — debugging sessions get new chunks over time. res["Cache-Control"] = "no-store" res.body = data rescue => e Octo::Logger.error("Session export failed: #{e.}") if defined?(Octo::Logger) json_response(res, 500, { error: "Export failed: #{e.}" }) end |
#api_file_action(req, res) ⇒ Object
POST /api/file-action Unified file action endpoint — open locally or download. Body: { path: String, action: “open” | “download” }
open: opens the file with the OS default handler (local deployments).
download: returns the file as a download (remote deployments).
1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 |
# File 'lib/octo/server/http_server.rb', line 1206 def api_file_action(req, res) body = parse_json_body(req) path = body["path"] action = body["action"] || "open" return json_response(res, 400, { error: "path is required" }) unless path && !path.empty? # Expand ~ to the user's home directory (e.g. "~/Desktop/file.pdf"). # Ruby's File.exist? does NOT automatically expand ~ — that's a shell feature. path = File.(path) # On WSL the file may be specified as a Windows path (e.g. "C:/Users/…"). # Convert it to the Linux-side path so File.exist? works. linux_path = Utils::EnvironmentDetector.win_to_linux_path(path) return json_response(res, 404, { error: "file not found" }) unless File.exist?(linux_path) case action when "open" result = Utils::EnvironmentDetector.open_file(linux_path) return json_response(res, 501, { error: "unsupported OS" }) if result.nil? json_response(res, 200, { ok: true }) when "download" serve_file_download(res, linux_path) else json_response(res, 400, { error: "invalid action. Must be 'open' or 'download'" }) end rescue => e json_response(res, 500, { ok: false, error: e. }) end |
#api_get_config(res) ⇒ Object
GET /api/config — return current model configurations
2071 2072 2073 2074 2075 2076 2077 2078 2079 2080 2081 2082 2083 2084 2085 2086 2087 2088 2089 2090 |
# File 'lib/octo/server/http_server.rb', line 2071 def api_get_config(res) models = @agent_config.models.map.with_index do |m, i| { id: m["id"], # Stable runtime id — use this for switching index: i, model: m["model"], base_url: m["base_url"], api_key_masked: mask_api_key(m["api_key"]), anthropic_format: m["anthropic_format"] || false, type: m["type"] } end # Filter out auto-injected models (like lite) from UI display models.reject! { |m| @agent_config.models[m[:index]]["auto_injected"] } json_response(res, 200, { models: models, current_index: @agent_config.current_model_index, current_id: @agent_config.current_model&.dig("id") }) end |
#api_get_version(res) ⇒ Object
GET /api/version Returns current version and latest version from RubyGems (cached for 1 hour).
665 666 667 668 669 670 671 672 673 674 675 |
# File 'lib/octo/server/http_server.rb', line 665 def api_get_version(res) current = Octo::VERSION latest = fetch_latest_version_cached json_response(res, 200, { current: current, latest: latest, needs_update: latest ? version_older?(current, latest) : false, launcher: ENV["OCTO_LAUNCHER"] || "cli", cli_command: "octo" }) end |
#api_list_channel_users(platform, res) ⇒ Object
GET /api/channels/:platform/users Returns the list of known user IDs for the given platform. These are users who have sent at least one message to the bot in this server session.
For Weixin: returns users with a cached context_token (required for proactive messaging). For Feishu / WeCom: returns user IDs extracted from channel session bindings.
Response:
200 { users: ["uid1", "uid2", ...] }
1172 1173 1174 1175 1176 1177 1178 |
# File 'lib/octo/server/http_server.rb', line 1172 def api_list_channel_users(platform, res) platform = platform.to_sym users = @channel_manager.known_users(platform) json_response(res, 200, { platform: platform, users: users }) rescue StandardError => e json_response(res, 500, { ok: false, error: e. }) end |
#api_list_channels(res) ⇒ Object
1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 |
# File 'lib/octo/server/http_server.rb', line 1089 def api_list_channels(res) config = Octo::ChannelConfig.load running = @channel_manager.running_platforms platforms = Octo::Channel::Adapters.all.map do |klass| platform = klass.platform_id raw = config.instance_variable_get(:@channels)[platform.to_s] || {} { platform: platform, enabled: !!raw["enabled"], running: running.include?(platform), has_config: !config.platform_config(platform).nil? }.merge(platform_safe_fields(platform, config)) end json_response(res, 200, { channels: platforms }) end |
#api_list_cron_tasks(res) ⇒ Object
GET /api/cron-tasks
1438 1439 1440 |
# File 'lib/octo/server/http_server.rb', line 1438 def api_list_cron_tasks(res) json_response(res, 200, { cron_tasks: @scheduler.list_cron_tasks }) end |
#api_list_providers(res) ⇒ Object
GET /api/providers — return built-in provider presets for quick setup
2299 2300 2301 2302 2303 2304 2305 2306 2307 2308 2309 2310 2311 2312 2313 2314 2315 |
# File 'lib/octo/server/http_server.rb', line 2299 def api_list_providers(res) providers = Octo::Providers::PRESETS.map do |id, preset| { id: id, name: preset["name"], base_url: preset["base_url"], default_model: preset["default_model"], models: preset["models"] || [], # Frontend uses this to render a Base URL dropdown (regional / # provider variants) when present. Absent for single-endpoint # providers — UI renders a plain text input in that case. endpoint_variants: preset["endpoint_variants"], website_url: preset["website_url"] } end json_response(res, 200, { providers: providers }) end |
#api_list_sessions(req, res) ⇒ Object
── REST API ──────────────────────────────────────────────────────────────
493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 |
# File 'lib/octo/server/http_server.rb', line 493 def api_list_sessions(req, res) query = URI.decode_www_form(req.query_string.to_s).to_h limit = [query["limit"].to_i.then { |n| n > 0 ? n : 20 }, 50].min before = query["before"].to_s.strip.then { |v| v.empty? ? nil : v } q = query["q"].to_s.strip.then { |v| v.empty? ? nil : v } date = query["date"].to_s.strip.then { |v| v.empty? ? nil : v } type = query["type"].to_s.strip.then { |v| v.empty? ? nil : v } # Backward-compat: ?source=<x> and ?profile=coding → type type ||= query["profile"].to_s.strip.then { |v| v.empty? ? nil : v } type ||= query["source"].to_s.strip.then { |v| v.empty? ? nil : v } # Fetch one extra NON-PINNED row to detect has_more without a separate count query. # `registry.list` always returns ALL matching pinned rows first (on the # first page; `before` == nil), followed by non-pinned rows up to `limit+1`. # So has_more is determined by whether the non-pinned section overflowed. sessions = @registry.list(limit: limit + 1, before: before, q: q, date: date, type: type) # Split pinned vs non-pinned to apply has_more only to the non-pinned tail. pinned_part, non_pinned_part = sessions.partition { |s| s[:pinned] } has_more = non_pinned_part.size > limit non_pinned_part = non_pinned_part.first(limit) sessions = pinned_part + non_pinned_part json_response(res, 200, { sessions: sessions, has_more: has_more, cron_count: @registry.cron_count }) end |
#api_list_skills(res) ⇒ Object
GET /api/skills — list all loaded skills with metadata
1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 |
# File 'lib/octo/server/http_server.rb', line 1513 def api_list_skills(res) @skill_loader.load_all # refresh from disk on each request skills = @skill_loader.all_skills.map do |skill| source = @skill_loader.loaded_from[skill.identifier] # Compute local modification time of SKILL.md for "has local changes" indicator skill_md_path = File.join(skill.directory.to_s, "SKILL.md") local_modified_at = File.exist?(skill_md_path) ? File.mtime(skill_md_path).utc.iso8601 : nil entry = { name: skill.identifier, name_zh: skill.name_zh, description: skill.context_description, description_zh: skill.description_zh, source: source, enabled: !skill.disabled?, invalid: skill.invalid?, warnings: skill.warnings, local_modified_at: local_modified_at } entry[:invalid_reason] = skill.invalid_reason if skill.invalid? entry end json_response(res, 200, { skills: skills }) end |
#api_onboard_complete(req, res) ⇒ Object
POST /api/onboard/complete Called after key setup is done (soul_setup is optional/skipped). Creates the default session if none exists yet, returns it.
640 641 642 643 644 |
# File 'lib/octo/server/http_server.rb', line 640 def api_onboard_complete(req, res) create_default_session if @registry.list(limit: 1).empty? first_session = @registry.list(limit: 1).first json_response(res, 200, { ok: true, session: first_session }) end |
#api_onboard_skip_soul(req, res) ⇒ Object
POST /api/onboard/skip-soul Writes a minimal SOUL.md so the soul_setup phase is not re-triggered on the next server start when the user chooses to skip the conversation.
649 650 651 652 653 654 655 656 657 658 659 660 661 |
# File 'lib/octo/server/http_server.rb', line 649 def api_onboard_skip_soul(req, res) body = parse_json_body(req) lang = body["lang"].to_s.strip soul_content = lang == "zh" ? DEFAULT_SOUL_MD_ZH : DEFAULT_SOUL_MD agents_dir = File.("~/.octo/agents") FileUtils.mkdir_p(agents_dir) soul_path = File.join(agents_dir, "SOUL.md") unless File.exist?(soul_path) File.write(soul_path, soul_content) end json_response(res, 200, { ok: true }) end |
#api_onboard_status(res) ⇒ Object
GET /api/onboard/status Phase “key_setup” → no API key configured yet Phase “soul_setup” → key configured, but ~/.octo/agents/SOUL.md missing needs_onboard: false → fully set up
591 592 593 594 595 596 597 |
# File 'lib/octo/server/http_server.rb', line 591 def api_onboard_status(res) if !@agent_config.models_configured? json_response(res, 200, { needs_onboard: true, phase: "key_setup" }) else json_response(res, 200, { needs_onboard: false }) end end |
#api_rename_session(session_id, req, res) ⇒ Object
2347 2348 2349 2350 2351 2352 2353 2354 2355 2356 2357 2358 2359 2360 2361 2362 2363 2364 2365 2366 2367 2368 2369 2370 2371 2372 2373 2374 2375 2376 2377 2378 2379 2380 2381 2382 |
# File 'lib/octo/server/http_server.rb', line 2347 def api_rename_session(session_id, req, res) body = parse_json_body(req) new_name = body["name"]&.to_s&.strip pinned = body["pinned"] return json_response(res, 404, { error: "Session not found" }) unless @registry.ensure(session_id) agent = nil @registry.with_session(session_id) { |s| agent = s[:agent] } # Update name if provided if new_name && !new_name.empty? agent.rename(new_name) end # Update pinned status if provided if !pinned.nil? agent.pinned = pinned end # Save session data @session_manager.save(agent.to_session_data) # Broadcast update event update_data = { type: "session_updated", session_id: session_id } update_data[:name] = new_name if new_name && !new_name.empty? update_data[:pinned] = pinned unless pinned.nil? broadcast(session_id, update_data) response_data = { ok: true } response_data[:name] = new_name if new_name && !new_name.empty? response_data[:pinned] = pinned unless pinned.nil? json_response(res, 200, response_data) rescue => e json_response(res, 500, { error: e. }) end |
#api_restart(req, res) ⇒ Object
POST /api/restart Re-execs the current process so the newly installed gem version is loaded. Uses the absolute script path captured at startup to avoid relative-path issues. Responds 200 first, then waits briefly for WEBrick to flush the response before exec.
946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 |
# File 'lib/octo/server/http_server.rb', line 946 def api_restart(req, res) json_response(res, 200, { ok: true, message: "Restarting…" }) Thread.new do sleep 0.5 # Let WEBrick flush the HTTP response if @master_pid # Worker mode: tell master to hot-restart. Master will TERM us after the # new worker boots; our trap("TERM") then runs shutdown_proc, which detaches # the inherited listen socket before WEBrick shutdown. Do NOT exit(0) here — # that bypasses trap handlers and lets the OS close(fd) on a socket shared # with master+new worker, corrupting the listener on Linux/WSL. Octo::Logger.info("[Restart] Sending USR1 to master (PID=#{@master_pid})") begin Process.kill("USR1", @master_pid) rescue Errno::ESRCH Octo::Logger.warn("[Restart] Master PID=#{@master_pid} not found, falling back to exec.") standalone_exec_restart end else # Standalone mode (no master): fall back to the original exec approach. standalone_exec_restart end end end |
#api_run_cron_task(name, res) ⇒ Object
POST /api/cron-tasks/:name/run — execute immediately
1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 |
# File 'lib/octo/server/http_server.rb', line 1492 def api_run_cron_task(name, res) unless @scheduler.list_tasks.include?(name) return json_response(res, 404, { error: "Cron task not found: #{name}" }) end prompt = @scheduler.read_task(name) session_name = "▶ #{name} #{Time.now.strftime("%H:%M")}" working_dir = File.("~/octo_workspace") FileUtils.mkdir_p(working_dir) session_id = build_session(name: session_name, working_dir: working_dir, permission_mode: :auto_approve) @registry.update(session_id, pending_task: prompt, pending_working_dir: working_dir) json_response(res, 202, { ok: true, session: @registry.session_summary(session_id) }) rescue => e json_response(res, 422, { error: e. }) end |
#api_save_channel(platform, req, res) ⇒ Object
POST /api/channels/:platform Body: { fields… } (platform-specific credential fields) Saves credentials and optionally (re)starts the adapter.
1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 |
# File 'lib/octo/server/http_server.rb', line 1290 def api_save_channel(platform, req, res) platform = platform.to_sym body = parse_json_body(req) config = Octo::ChannelConfig.load fields = body.transform_keys(&:to_sym).reject { |k, _| k == :platform } fields = fields.transform_values { |v| v.is_a?(String) ? v.strip : v } # Record when the token was last updated so clients can detect re-login fields[:token_updated_at] = Time.now.to_i if platform == :weixin && fields.key?(:token) fields[:token_updated_at] = Time.now.to_i if platform == :discord && fields.key?(:bot_token) # Validate credentials against live API before persisting. # Merge with existing config so partial updates (e.g. allowed_users only) still validate correctly. klass = Octo::Channel::Adapters.find(platform) if klass && klass.respond_to?(:test_connection) existing = config.platform_config(platform) || {} merged = existing.merge(fields) result = klass.test_connection(merged) unless result[:ok] json_response(res, 422, { ok: false, error: result[:error] || "Credential validation failed" }) return end end config.set_platform(platform, **fields) config.save # Hot-reload: stop existing adapter for this platform (if running) and restart @channel_manager.reload_platform(platform, config) json_response(res, 200, { ok: true }) rescue StandardError => e json_response(res, 422, { ok: false, error: e. }) end |
#api_send_channel_message(platform, req, res) ⇒ Object
POST /api/channels/:platform/send Proactively send a message to a user via the given IM platform.
Body:
{ "message": "hello", # required
"user_id": "some_user_id" } # optional — defaults to most-recently active user
Response:
200 { ok: true }
400 { ok: false, error: "..." } — missing/invalid params or platform not running
503 { ok: false, error: "..." } — no known users (nobody has messaged the bot yet)
Constraints:
- The platform adapter must be running (channel must be enabled + connected).
- For Weixin (iLink protocol), a context_token is required per message. This is
automatically looked up from the in-memory cache populated by inbound messages.
If no token exists for the target user (i.e. the user has never messaged the bot
in this server session), the message cannot be delivered.
1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 |
# File 'lib/octo/server/http_server.rb', line 1125 def (platform, req, res) platform = platform.to_sym body = parse_json_body(req) = body["message"].to_s.strip if .empty? json_response(res, 400, { ok: false, error: "message is required" }) return end # Resolve target user_id user_id = body["user_id"].to_s.strip if user_id.empty? # Default to the most-recently active user for this platform known = @channel_manager.known_users(platform) if known.empty? json_response(res, 503, { ok: false, error: "No known users for :#{platform}. The user must send a message to the bot first." }) return end user_id = known.last end result = @channel_manager.send_to_user(platform, user_id, ) if result.nil? json_response(res, 400, { ok: false, error: "Failed to send message. The :#{platform} adapter may not be running, or no context_token is available for user #{user_id}." }) else json_response(res, 200, { ok: true, platform: platform, user_id: user_id }) end rescue StandardError => e json_response(res, 500, { ok: false, error: e. }) end |
#api_serve_local_image(req, res) ⇒ Object
GET /api/local-image?path=file:///path/to/image.png GET /api/local-image?path=/path/to/image.png
Serves a local image file with the correct Content-Type. Used by the Web UI to render local images that would otherwise be blocked by the browser’s security policy (file:// from http:// origin).
1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 |
# File 'lib/octo/server/http_server.rb', line 1257 def api_serve_local_image(req, res) raw_path = URI.decode_www_form(req.query_string.to_s).to_h["path"].to_s return json_response(res, 400, { error: "path is required" }) if raw_path.empty? # Strip file:// prefix if present path = raw_path.sub(%r{\Afile://}, "") path = CGI.unescape(path) path = File.(path) # On WSL the file may be specified as a Windows path (e.g. "C:/Users/…"). # Convert it to the Linux-side path so File.exist? works. path = Utils::EnvironmentDetector.win_to_linux_path(path) # Security: only serve image files ext = File.extname(path).downcase unless Utils::FileProcessor::LOCAL_IMAGE_EXTENSIONS.include?(ext) return json_response(res, 403, { error: "not an image file" }) end return json_response(res, 404, { error: "file not found" }) unless File.exist?(path) mime = Utils::FileProcessor::MIME_TYPES[ext] || "application/octet-stream" res.status = 200 res["Content-Type"] = mime res["Cache-Control"] = "private, max-age=3600" res.body = File.binread(path) rescue => e json_response(res, 500, { error: e. }) end |
#api_session_messages(session_id, req, res) ⇒ Object
GET /api/sessions/:id/messages?limit=20&before=1709123456.789 Replays conversation history for a session via the agent’s replay_history method. Returns a list of UI events (same format as WS events) for the frontend to render.
2320 2321 2322 2323 2324 2325 2326 2327 2328 2329 2330 2331 2332 2333 2334 2335 2336 2337 2338 2339 2340 2341 2342 2343 2344 2345 |
# File 'lib/octo/server/http_server.rb', line 2320 def (session_id, req, res) unless @registry.ensure(session_id) Octo::Logger.warn("[messages] registry.ensure failed", session_id: session_id) return json_response(res, 404, { error: "Session not found" }) end # Parse query params query = URI.decode_www_form(req.query_string.to_s).to_h limit = [query["limit"].to_i.then { |n| n > 0 ? n : 20 }, 100].min before = query["before"]&.to_f agent = nil @registry.with_session(session_id) { |s| agent = s[:agent] } unless agent Octo::Logger.warn("[messages] agent is nil", session_id: session_id) return json_response(res, 200, { events: [], has_more: false }) end # Collect events emitted by replay_history via a lightweight collector UI collected = [] collector = HistoryCollector.new(session_id, collected) result = agent.replay_history(collector, limit: limit, before: before) json_response(res, 200, { events: collected, has_more: result[:has_more] }) end |
#api_session_skills(session_id, res) ⇒ Object
GET /api/sessions/:id/skills — list user-invocable skills for a session, filtered by the session’s agent profile. Used by the frontend slash-command autocomplete so only skills valid for the current profile are suggested.
1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 |
# File 'lib/octo/server/http_server.rb', line 1543 def api_session_skills(session_id, res) unless @registry.ensure(session_id) json_response(res, 404, { error: "Session not found" }) return end session = @registry.get(session_id) unless session json_response(res, 404, { error: "Session not found" }) return end agent = session[:agent] unless agent json_response(res, 404, { error: "Agent not found" }) return end agent.skill_loader.load_all profile = agent.agent_profile skills = agent.skill_loader.user_invocable_skills skills = skills.select { |s| s.allowed_for_agent?(profile.name) } if profile loader = agent.skill_loader loaded_from = loader.loaded_from skill_data = skills.map do |skill| source_type = loaded_from[skill.identifier] { name: skill.identifier, name_zh: skill.name_zh, description: skill.description || skill.context_description, description_zh: skill.description_zh, source_type: source_type } end json_response(res, 200, { skills: skill_data }) end |
#api_set_default_model(id, res) ⇒ Object
POST /api/config/models/:id/default Makes the identified model the new “default” (global initial model for new sessions AND current model for this server instance).
2254 2255 2256 2257 2258 2259 2260 2261 2262 2263 2264 |
# File 'lib/octo/server/http_server.rb', line 2254 def api_set_default_model(id, res) ok = @agent_config.set_default_model_by_id(id) return json_response(res, 404, { error: "model not found" }) unless ok @agent_config.current_model_id = id @agent_config.current_model_index = @agent_config.models.find_index { |m| m["id"] == id } || 0 @agent_config.save json_response(res, 200, { ok: true }) rescue => e json_response(res, 422, { error: e. }) end |
#api_switch_session_model(session_id, req, res) ⇒ Object
2384 2385 2386 2387 2388 2389 2390 2391 2392 2393 2394 2395 2396 2397 2398 2399 2400 2401 2402 2403 2404 2405 2406 2407 2408 2409 2410 2411 2412 2413 2414 2415 2416 2417 2418 2419 2420 |
# File 'lib/octo/server/http_server.rb', line 2384 def api_switch_session_model(session_id, req, res) body = parse_json_body(req) model_id = body["model_id"].to_s.strip return json_response(res, 400, { error: "model_id is required" }) if model_id.empty? return json_response(res, 404, { error: "Session not found" }) unless @registry.ensure(session_id) agent = nil @registry.with_session(session_id) { |s| agent = s[:agent] } # With Plan B (shared @models reference), every session's AgentConfig # points at the same @models array as the global @agent_config. So # resolving the model by stable id here and in agent.switch_model_by_id # will always agree — no more index divergence after add/delete. target_model = @agent_config.models.find { |m| m["id"] == model_id } if target_model.nil? return json_response(res, 400, { error: "Model not found in configuration" }) end # Switch to the model by id (unified interface with CLI) # Handles: config.switch_model_by_id + client rebuild + message_compressor rebuild success = agent.switch_model_by_id(model_id) unless success return json_response(res, 500, { error: "Failed to switch model" }) end # Persist the change (saves to session file, NOT global config.yml) @session_manager.save(agent.to_session_data) # Broadcast update to all clients broadcast_session_update(session_id) json_response(res, 200, { ok: true, model_id: model_id, model: target_model["model"] }) rescue => e json_response(res, 500, { error: e. }) end |
#api_switch_session_reasoning_effort(session_id, req, res) ⇒ Object
PATCH /api/sessions/:id/reasoning_effort Body: { “reasoning_effort”: “off” | “low” | “medium” | “high” }
2424 2425 2426 2427 2428 2429 2430 2431 2432 2433 2434 2435 2436 2437 2438 2439 2440 |
# File 'lib/octo/server/http_server.rb', line 2424 def api_switch_session_reasoning_effort(session_id, req, res) body = parse_json_body(req) raw = body["reasoning_effort"] return json_response(res, 404, { error: "Session not found" }) unless @registry.ensure(session_id) agent = nil @registry.with_session(session_id) { |s| agent = s[:agent] } return json_response(res, 404, { error: "Session not found" }) unless agent agent.reasoning_effort = raw @session_manager.save(agent.to_session_data) broadcast_session_update(session_id) json_response(res, 200, { ok: true, reasoning_effort: agent.reasoning_effort }) rescue => e json_response(res, 500, { error: e. }) end |
#api_test_channel(platform, req, res) ⇒ Object
POST /api/channels/:platform/test Body: { fields… } (credentials to test — NOT saved) Tests connectivity using the provided credentials without persisting.
1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 |
# File 'lib/octo/server/http_server.rb', line 1372 def api_test_channel(platform, req, res) platform = platform.to_sym body = parse_json_body(req) fields = body.transform_keys(&:to_sym).reject { |k, _| k == :platform } klass = Octo::Channel::Adapters.find(platform) unless klass json_response(res, 404, { ok: false, error: "Unknown platform: #{platform}" }) return end result = klass.test_connection(fields) json_response(res, 200, result) rescue StandardError => e json_response(res, 200, { ok: false, error: e. }) end |
#api_test_config(req, res) ⇒ Object
POST /api/config/test — test connection for a single model config Body: { model, base_url, api_key, anthropic_format }
2268 2269 2270 2271 2272 2273 2274 2275 2276 2277 2278 2279 2280 2281 2282 2283 2284 2285 2286 2287 2288 2289 2290 2291 2292 2293 2294 2295 2296 |
# File 'lib/octo/server/http_server.rb', line 2268 def api_test_config(req, res) body = parse_json_body(req) return json_response(res, 400, { error: "Invalid JSON" }) unless body api_key = body["api_key"].to_s # If masked, use the stored key from the matching model (by index or current) if api_key.include?("****") idx = body["index"]&.to_i || @agent_config.current_model_index api_key = @agent_config.models.dig(idx, "api_key").to_s end begin model = body["model"].to_s test_client = Octo::Client.new( api_key, base_url: body["base_url"].to_s, model: model, anthropic_format: body["anthropic_format"] || false ) result = test_client.test_connection(model: model) if result[:success] json_response(res, 200, { ok: true, message: "Connected successfully" }) else json_response(res, 200, { ok: false, message: result[:error].to_s }) end rescue => e json_response(res, 200, { ok: false, message: e. }) end end |
#api_toggle_channel(platform, req, res) ⇒ Object
PATCH /api/channels/:platform/enabled Body: { enabled: true|false } Toggles the platform on/off without touching credentials. Enabling requires the platform to already be configured.
1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 |
# File 'lib/octo/server/http_server.rb', line 1345 def api_toggle_channel(platform, req, res) platform = platform.to_sym enabled = parse_json_body(req)["enabled"] == true config = Octo::ChannelConfig.load if enabled unless config.platform_config(platform) json_response(res, 422, { ok: false, error: "Platform is not configured yet" }) return end config.enable_platform(platform) else config.disable_platform(platform) end config.save @channel_manager.reload_platform(platform, config) json_response(res, 200, { ok: true, enabled: config.enabled?(platform) }) rescue StandardError => e json_response(res, 422, { ok: false, error: e. }) end |
#api_toggle_skill(name, req, res) ⇒ Object
PATCH /api/skills/:name/toggle — enable or disable a skill Body: { enabled: true/false }
1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 |
# File 'lib/octo/server/http_server.rb', line 1585 def api_toggle_skill(name, req, res) body = parse_json_body(req) enabled = body["enabled"] if enabled.nil? json_response(res, 422, { error: "enabled field required" }) return end skill = @skill_loader.toggle_skill(name, enabled: enabled) json_response(res, 200, { ok: true, name: skill.identifier, enabled: !skill.disabled? }) rescue Octo::AgentError => e json_response(res, 422, { error: e. }) end |
#api_tool_browser(req, res) ⇒ Object
GET /api/channels Returns current config and running status for all supported platforms. POST /api/tool/browser Executes a browser tool action via the shared BrowserManager daemon. Used by skill scripts (e.g. feishu_setup.rb) to reuse the server’s existing Chrome connection without spawning a second MCP daemon.
Request body: JSON with same params as the browser tool
{ "action": "snapshot", "interactive": true, ... }
Response: JSON result from the browser tool
1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 |
# File 'lib/octo/server/http_server.rb', line 1076 def api_tool_browser(req, res) params = parse_json_body(req) action = params["action"] return json_response(res, 400, { error: "action is required" }) if action.nil? || action.empty? tool = Octo::Tools::Browser.new result = tool.execute(**params.transform_keys(&:to_sym)) json_response(res, 200, result) rescue StandardError => e json_response(res, 500, { error: e. }) end |
#api_update_cron_task(name, req, res) ⇒ Object
PATCH /api/cron-tasks/:name — update content and/or cron/enabled Body: { content?, cron?, enabled? }
1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 |
# File 'lib/octo/server/http_server.rb', line 1466 def api_update_cron_task(name, req, res) body = parse_json_body(req) content = body["content"] cron = body["cron"]&.to_s&.strip enabled = body["enabled"] if cron && cron.split(/\s+/).size != 5 return json_response(res, 422, { error: "cron must have 5 fields (min hour dom month dow)" }) end @scheduler.update_cron_task(name, content: content, cron: cron, enabled: enabled) json_response(res, 200, { ok: true, name: name }) rescue => e json_response(res, 404, { error: e. }) end |
#api_update_model(id, req, res) ⇒ Object
PATCH /api/config/models/:id Body: any subset of { model, base_url, api_key, anthropic_format, type } Rules (the whole reason we moved off bulk save):
- Missing key → field untouched
- api_key with "****" (masked display value) → IGNORED (never overwrites)
- api_key empty string → IGNORED (defensive; treat as "not changed")
- api_key real non-masked value → stored
- type="default" transparently clears the marker on other models
- Unknown id → 404
2171 2172 2173 2174 2175 2176 2177 2178 2179 2180 2181 2182 2183 2184 2185 2186 2187 2188 2189 2190 2191 2192 2193 2194 2195 2196 2197 2198 2199 2200 2201 2202 2203 2204 2205 2206 2207 2208 2209 2210 2211 2212 2213 2214 2215 2216 2217 2218 2219 2220 |
# File 'lib/octo/server/http_server.rb', line 2171 def api_update_model(id, req, res) body = parse_json_body(req) return json_response(res, 400, { error: "Invalid JSON" }) unless body target = @agent_config.models.find { |m| m["id"] == id } return json_response(res, 404, { error: "model not found" }) unless target if body.key?("model") v = body["model"].to_s.strip target["model"] = v unless v.empty? end if body.key?("base_url") v = body["base_url"].to_s.strip target["base_url"] = v unless v.empty? end if body.key?("anthropic_format") target["anthropic_format"] = !!body["anthropic_format"] end if body.key?("api_key") new_key = body["api_key"].to_s # Only store a real, unmasked, non-empty value. This is the # single place the "api_key disappeared" bug can no longer # happen — there is no path that writes "" into api_key. if !new_key.empty? && !new_key.include?("****") target["api_key"] = new_key end end if body.key?("type") new_type = body["type"] new_type = nil if new_type.is_a?(String) && new_type.strip.empty? if new_type == "default" @agent_config.models.each do |m| next if m["id"] == id m.delete("type") if m["type"] == "default" end target["type"] = "default" @agent_config.current_model_id = target["id"] @agent_config.current_model_index = @agent_config.models.find_index { |m| m["id"] == id } || 0 elsif new_type.nil? target.delete("type") else target["type"] = new_type end end @agent_config.save json_response(res, 200, { ok: true }) rescue => e json_response(res, 422, { error: e. }) end |
#api_upgrade_version(req, res) ⇒ Object
POST /api/version/upgrade Upgrades octo in a background thread, streaming output via WebSocket broadcast. If the user’s gem source is the official RubyGems, use ‘gem update`. Otherwise (e.g. Aliyun mirror) download the .gem from OSS CDN to bypass mirror lag.
681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 |
# File 'lib/octo/server/http_server.rb', line 681 def api_upgrade_version(req, res) json_response(res, 202, { ok: true, message: "Upgrade started" }) Thread.new do begin if official_gem_source? upgrade_via_gem_update else upgrade_via_oss_cdn end rescue StandardError => e Octo::Logger.error("[Upgrade] Exception: #{e.class}: #{e.}\n#{e.backtrace.first(5).join("\n")}") broadcast_all(type: "upgrade_log", line: "\n✗ Error during upgrade: #{e.}\n") broadcast_all(type: "upgrade_complete", success: false) end end end |
#api_upload_file(req, res) ⇒ Object
POST /api/upload Accepts a multipart/form-data file upload (field name: “file”). Runs the file through FileProcessor: saves original + generates structured preview (Markdown) for Office/ZIP files so the agent can read them directly.
1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 |
# File 'lib/octo/server/http_server.rb', line 1184 def api_upload_file(req, res) upload = parse_multipart_upload(req, "file") unless upload json_response(res, 400, { ok: false, error: "No file field found in multipart body" }) return end saved = Octo::Utils::FileProcessor.save( body: upload[:data], filename: upload[:filename].to_s ) json_response(res, 200, { ok: true, name: saved[:name], path: saved[:path] }) rescue => e json_response(res, 500, { ok: false, error: e. }) end |
#broadcast(session_id, event) ⇒ Object
Broadcast an event to all clients subscribed to a session. Dead connections (broken pipe / closed socket / deadline exceeded) are removed automatically. Connections already marked closed are skipped upfront so one sluggish client can’t delay delivery to healthy ones.
3140 3141 3142 3143 3144 3145 3146 3147 3148 3149 3150 3151 3152 3153 3154 3155 3156 |
# File 'lib/octo/server/http_server.rb', line 3140 def broadcast(session_id, event) clients = @ws_mutex.synchronize { (@ws_clients[session_id] || []).dup } dead = [] clients.each do |conn| if conn.closed? dead << conn next end dead << conn unless conn.send_json(event) end return if dead.empty? @ws_mutex.synchronize do (@ws_clients[session_id] || []).reject! { |conn| dead.include?(conn) } @all_ws_conns.reject! { |conn| dead.include?(conn) } end end |
#broadcast_all(event) ⇒ Object
Broadcast an event to every connected client (regardless of session subscription). Dead connections are removed automatically.
3160 3161 3162 3163 3164 3165 3166 3167 3168 3169 3170 3171 3172 3173 3174 3175 3176 |
# File 'lib/octo/server/http_server.rb', line 3160 def broadcast_all(event) clients = @ws_mutex.synchronize { @all_ws_conns.dup } dead = [] clients.each do |conn| if conn.closed? dead << conn next end dead << conn unless conn.send_json(event) end return if dead.empty? @ws_mutex.synchronize do @all_ws_conns.reject! { |conn| dead.include?(conn) } @ws_clients.each_value { |list| list.reject! { |conn| dead.include?(conn) } } end end |
#broadcast_session_update(session_id) ⇒ Object
Broadcast a session_update event to all clients so they can patch their local session list without needing a full session_list refresh.
3180 3181 3182 3183 3184 3185 |
# File 'lib/octo/server/http_server.rb', line 3180 def broadcast_session_update(session_id) session = @registry.snapshot(session_id) return unless session broadcast_all(type: "session_update", session: session) end |
#build_session(name:, working_dir:, permission_mode: :confirm_all, profile: "general", source: :manual, model_id: nil) ⇒ Object
Create a session in the registry and wire up Agent + WebUIController. Returns the new session_id. Build a new agent session.
3200 3201 3202 3203 3204 3205 3206 3207 3208 3209 3210 3211 3212 3213 3214 3215 3216 3217 3218 3219 3220 3221 3222 3223 3224 3225 3226 3227 3228 3229 3230 3231 3232 3233 3234 3235 3236 3237 3238 3239 3240 3241 3242 3243 3244 3245 3246 3247 3248 3249 3250 |
# File 'lib/octo/server/http_server.rb', line 3200 def build_session(name:, working_dir:, permission_mode: :confirm_all, profile: "general", source: :manual, model_id: nil) session_id = Octo::SessionManager.generate_id @registry.create(session_id: session_id) config = @agent_config.deep_copy config. = # Apply model override BEFORE creating the client — otherwise the # client is built from the default model entry and may route through # the wrong provider (e.g. sending a deepseek-v4-pro request to the # Bedrock-format Octo endpoint, which replies "unknown model"). # # We use switch_model_by_id (not a name-based rewrite of # current_model["model"]) because: # 1. Ids uniquely identify an entry across providers; names can # collide between entries (deepseek vs dsk-deepseek aliases). # 2. switch_model_by_id only flips per-session @current_model_id # in the dup'd config — it never mutates the shared @models # array (see AgentConfig#deep_copy's shared-ref contract). # A name rewrite would have leaked into every live session # AND corrupted the on-disk config at next save. config.switch_model_by_id(model_id) if model_id # Build client from the (possibly overridden) config so api format # detection (Bedrock vs OpenAI vs Anthropic) uses the correct model. client = Octo::Client.new( config.api_key, base_url: config.base_url, model: config.model_name, anthropic_format: config.anthropic_format? ) broadcaster = method(:broadcast) ui = WebUIController.new(session_id, broadcaster) agent = Octo::Agent.new(client, config, working_dir: working_dir, ui: ui, profile: profile, session_id: session_id, source: source) agent.rename(name) unless name.nil? || name.empty? idle_timer = build_idle_timer(session_id, agent) @registry.with_session(session_id) do |s| s[:agent] = agent s[:ui] = ui s[:idle_timer] = idle_timer end # Persist an initial snapshot so the session is immediately visible in registry.list # (which reads from disk). Without this, new sessions only appear after their first task. @session_manager.save(agent.to_session_data) session_id end |
#build_session_from_data(session_data, permission_mode: :confirm_all) ⇒ Object
Restore a persisted session from saved session_data (from SessionManager). The agent keeps its original session_id so the frontend URL hash stays valid across server restarts.
3255 3256 3257 3258 3259 3260 3261 3262 3263 3264 3265 3266 3267 3268 3269 3270 3271 3272 3273 3274 3275 3276 3277 3278 3279 3280 3281 |
# File 'lib/octo/server/http_server.rb', line 3255 def build_session_from_data(session_data, permission_mode: :confirm_all) original_id = session_data[:session_id] client = @client_factory.call config = @agent_config.deep_copy config. = broadcaster = method(:broadcast) ui = WebUIController.new(original_id, broadcaster) # Restore the agent profile from the persisted session; fall back to "general" # for sessions saved before the agent_profile field was introduced. profile = session_data[:agent_profile].to_s profile = "general" if profile.empty? agent = Octo::Agent.from_session(client, config, session_data, ui: ui, profile: profile) idle_timer = build_idle_timer(original_id, agent) # Register session atomically with a fully-built agent so no concurrent # caller ever sees agent=nil for this session. The duplicate-restore guard # is handled upstream by SessionRegistry#ensure via @restoring. @registry.create(session_id: original_id) @registry.with_session(original_id) do |s| s[:agent] = agent s[:ui] = ui s[:idle_timer] = idle_timer end original_id end |
#create_default_session ⇒ Object
Auto-restore persisted sessions (or create a fresh default) when the server starts. Skipped when no API key is configured (onboard flow will handle it).
Strategy: load the most recent sessions from ~/.octo/sessions/ for the current working directory and restore them into @registry so their IDs are stable across restarts (frontend hash stays valid). If no persisted sessions exist, fall back to creating a new default session.
566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 |
# File 'lib/octo/server/http_server.rb', line 566 def create_default_session return unless @agent_config.models_configured? # Restore up to 5 sessions per source type from disk into the registry. @registry.restore_from_disk(n: 5) # Recover any orphaned .jsonl incremental logs from crashed sessions # and merge them back into their parent session .json files. recovered = @session_manager.recover_jsonl_sessions Octo::Logger.info("http_server.recovered_jsonl_sessions", count: recovered) if recovered > 0 # If nothing was restored (no persisted sessions), create a fresh default. unless @registry.list(limit: 1).any? working_dir = default_working_dir FileUtils.mkdir_p(working_dir) unless Dir.exist?(working_dir) build_session(name: "Session 1", working_dir: working_dir) end end |
#default_working_dir ⇒ Object
── Helpers ───────────────────────────────────────────────────────────────
3189 3190 3191 |
# File 'lib/octo/server/http_server.rb', line 3189 def default_working_dir @agent_config&.default_working_dir || File.("~/octo_workspace") end |
#deliver_confirmation(session_id, conf_id, result) ⇒ Object
2858 2859 2860 2861 2862 |
# File 'lib/octo/server/http_server.rb', line 2858 def deliver_confirmation(session_id, conf_id, result) ui = nil @registry.with_session(session_id) { |s| ui = s[:ui] } ui&.deliver_confirmation(conf_id, result) end |
#dispatch(req, res) ⇒ Object
── Router ────────────────────────────────────────────────────────────────
337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 |
# File 'lib/octo/server/http_server.rb', line 337 def dispatch(req, res) path = req.path method = req.request_method # Access key guard (skip for WebSocket upgrades) return unless check_access_key(req, res) # WebSocket upgrade — no timeout applied (long-lived connection) if websocket_upgrade?(req) handle_websocket(req, res) return end # Wrap all REST handlers in a timeout so a hung handler (e.g. infinite # recursion in chunk parsing) returns a proper 503 instead of an empty 200. timeout_sec = if path == "/api/tool/browser" 30 else 10 end Timeout.timeout(timeout_sec) do _dispatch_rest(req, res) end rescue Timeout::Error Octo::Logger.warn("[HTTP 503] #{method} #{path} timed out after #{timeout_sec}s") json_response(res, 503, { error: "Request timed out" }) rescue => e Octo::Logger.warn("[HTTP 500] #{e.class}: #{e.}\n#{e.backtrace.first(5).join("\n")}") json_response(res, 500, { error: e. }) end |
#handle_user_message(session_id, content, files = []) ⇒ Object
── Session actions ───────────────────────────────────────────────────────
2820 2821 2822 2823 2824 2825 2826 2827 2828 2829 2830 2831 2832 2833 2834 2835 2836 2837 2838 2839 2840 2841 2842 2843 2844 2845 2846 2847 2848 2849 2850 2851 2852 2853 2854 2855 2856 |
# File 'lib/octo/server/http_server.rb', line 2820 def (session_id, content, files = []) return unless @registry.exist?(session_id) agent = nil @registry.with_session(session_id) { |s| agent = s[:agent] } return unless agent # Auto-name the session from the first user message (before agent starts running). # Skip if the name looks like it was set by the user (not a system-generated "Session N"). if agent.history.empty? && agent.name.match?(/\ASession \d+\z/) auto_name = content.gsub(/\s+/, " ").strip[0, 30] auto_name += "…" if content.strip.length > 30 agent.rename(auto_name) broadcast(session_id, { type: "session_renamed", session_id: session_id, name: auto_name }) end # The frontend always renders a ghost bubble on send. The real # bubble is rendered by the agent when it drains the inbox — # this avoids duplicate bubbles for idle agents. # Enqueue — files (if any) are processed eagerly on this HTTP thread # inside enqueue_user_message, so the inbox carries a fully-formed # payload by the time it lands. The agent decides whether to drain # in an in-flight run or spawn a fresh drain-only one. decision = agent.(content, files: files) case decision when :running, :spawn_pending # Existing or imminent run will drain the inbox at its next # iteration. Nothing more to do here. return when :spawn # Agent was idle and we won the right to spawn. Kick off a # drain-only run; it will pick up our queued message (and any # other items queued concurrently) at iteration top. run_agent_task(session_id, agent) { agent.run } end end |
#handle_websocket(req, res) ⇒ Object
Hijacks the TCP socket from WEBrick and upgrades it to WebSocket.
2651 2652 2653 2654 2655 2656 2657 2658 2659 2660 2661 2662 2663 2664 2665 2666 2667 2668 2669 2670 2671 2672 2673 2674 2675 2676 2677 2678 2679 2680 2681 2682 2683 2684 2685 2686 2687 2688 2689 2690 2691 2692 2693 2694 2695 2696 2697 2698 2699 2700 2701 2702 2703 2704 2705 2706 2707 2708 2709 |
# File 'lib/octo/server/http_server.rb', line 2651 def handle_websocket(req, res) socket = req.instance_variable_get(:@socket) # Server handshake — parse the upgrade request handshake = WebSocket::Handshake::Server.new handshake << build_handshake_request(req) unless handshake.finished? && handshake.valid? Octo::Logger.warn("WebSocket handshake invalid") return end # Send the 101 Switching Protocols response socket.write(handshake.to_s) version = handshake.version incoming = WebSocket::Frame::Incoming::Server.new(version: version) conn = WebSocketConnection.new(socket, version) on_ws_open(conn) begin buf = String.new("", encoding: "BINARY") loop do chunk = socket.read_nonblock(4096, buf, exception: false) case chunk when :wait_readable IO.select([socket], nil, nil, 30) when nil break # EOF else incoming << chunk.dup while (frame = incoming.next) case frame.type when :text (conn, frame.data) when :binary (conn, frame.data) when :ping conn.send_raw(:pong, frame.data) when :close conn.send_raw(:close, "") break end end end end rescue IOError, Errno::ECONNRESET, Errno::EPIPE, Errno::EBADF # Client disconnected or socket became invalid ensure on_ws_close(conn) socket.close rescue nil end # Tell WEBrick not to send any response (we handled everything) res.instance_variable_set(:@header, {}) res.status = -1 rescue => e Octo::Logger.error("WebSocket handler error: #{e.class}: #{e.}") end |
#interrupt_session(session_id) ⇒ Object
Interrupt a running agent session.
Thread#raise alone is not reliable enough in practice — it’s best-effort against blocked syscalls (socket writes, OpenSSL read, ConditionVariable#wait with a held mutex) and we’ve seen sessions that stay “running” forever even after multiple interrupt attempts.
Strategy: three-tier escalation in a background watchdog Thread so the HTTP handler returns immediately.
Tier 1 (t=0): Thread#raise(AgentInterrupted).
Unblocks most pure-Ruby waits and Faraday reads.
Handles the common case.
Tier 2 (t=3): force-close this session's WebSocket connections
so any send_raw stuck on socket write wakes up.
Try Thread#raise again (idempotent).
Tier 3 (t=8): Thread#kill — last resort. Leaks any held
resources but frees the session so the user can
move on.
Each transition is logged so that when users report “stuck sessions” we can see in the log whether tier 2/3 ever had to fire — that’s our signal to dig deeper on the underlying block.
2903 2904 2905 2906 2907 2908 2909 2910 2911 2912 2913 2914 2915 2916 2917 2918 2919 2920 2921 2922 2923 2924 2925 2926 2927 2928 |
# File 'lib/octo/server/http_server.rb', line 2903 def interrupt_session(session_id) thread = nil agent = nil @registry.with_session(session_id) do |s| s[:idle_timer]&.cancel thread = s[:thread] agent = s[:agent] if thread&.alive? Octo::Logger.info("[interrupt] session=#{session_id} tier=1 raise") begin thread.raise(Octo::AgentInterrupted, "Interrupted by user") rescue ThreadError => e Octo::Logger.warn("[interrupt] tier=1 raise failed: #{e.}") end end end # Also set @discard_threshold + raise into Agent's tracked run thread. # This covers drain-only inbox runs that may bypass session[:thread]. agent&.interrupt_current_run! return unless thread&.alive? start_interrupt_watchdog(session_id, thread) end |
#json_response(res, status, data) ⇒ Object
3312 3313 3314 3315 3316 3317 |
# File 'lib/octo/server/http_server.rb', line 3312 def json_response(res, status, data) res.status = status res.content_type = "application/json; charset=utf-8" res["Access-Control-Allow-Origin"] = "*" res.body = JSON.generate(data) end |
#mask_api_key(key) ⇒ Object
Mask API key for display: show first 8 + last 4 chars, middle replaced with **** Mask an api_key for safe display / transport to the browser.
Contract: the returned string MUST contain “****” so callers (incl. the frontend) can reliably detect “this is a display placeholder, not a real key” and refuse to treat it as input. The old behaviour of returning the raw value for short keys was a correctness bug —it leaked short keys in plaintext to GET /api/config, and it let short masked values slip past the frontend’s mask-detection.
3303 3304 3305 3306 3307 3308 3309 3310 |
# File 'lib/octo/server/http_server.rb', line 3303 def mask_api_key(key) return "" if key.nil? || key.empty? if key.length <= 12 # Very short key — show the first char only, redact the rest. return "#{key[0]}****" end "#{key[0..7]}****#{key[-4..]}" end |
#not_found(res) ⇒ Object
3375 3376 3377 3378 |
# File 'lib/octo/server/http_server.rb', line 3375 def not_found(res) res.status = 404 res.body = "Not Found" end |
#on_ws_close(conn) ⇒ Object
2813 2814 2815 2816 |
# File 'lib/octo/server/http_server.rb', line 2813 def on_ws_close(conn) @ws_mutex.synchronize { @all_ws_conns.delete(conn) } unsubscribe(conn) end |
#on_ws_message(conn, raw) ⇒ Object
2724 2725 2726 2727 2728 2729 2730 2731 2732 2733 2734 2735 2736 2737 2738 2739 2740 2741 2742 2743 2744 2745 2746 2747 2748 2749 2750 2751 2752 2753 2754 2755 2756 2757 2758 2759 2760 2761 2762 2763 2764 2765 2766 2767 2768 2769 2770 2771 2772 2773 2774 2775 2776 2777 2778 2779 2780 2781 2782 2783 2784 2785 2786 2787 2788 2789 2790 2791 2792 2793 2794 2795 2796 2797 2798 2799 2800 2801 2802 2803 2804 2805 2806 2807 2808 2809 2810 2811 |
# File 'lib/octo/server/http_server.rb', line 2724 def (conn, raw) msg = JSON.parse(raw) unless msg.is_a?(Hash) Octo::Logger.warn("[on_ws_message] Ignoring non-Hash message: #{raw[0,200].inspect}") return end type = msg["type"] case type when "subscribe" session_id = msg["session_id"] if @registry.ensure(session_id) conn.session_id = session_id subscribe(session_id, conn) conn.send_json(type: "subscribed", session_id: session_id) # Push a fresh snapshot so a reconnecting tab sees the true current # status (it may have missed session_update events while offline). if (snap = @registry.snapshot(session_id)) conn.send_json(type: "session_update", session: snap) end # If a shell command is still running, replay progress + buffered stdout # to the newly subscribed tab so it sees the live state it may have missed. @registry.with_session(session_id) { |s| s[:ui]&.replay_live_state } # Replay inbox queue status AND pending message content so the # reconnected tab sees both the count hint AND the pending # ghost bubbles for messages still waiting to be drained. @registry.with_session(session_id) do |s| if (agent = s[:agent]) pending = agent. if pending > 0 s[:ui]&.(pending: pending) conn.send_json({ type: "pending_user_messages", session_id: session_id, messages: agent. }) end end end # Push the current background-task badge so a refreshed tab doesn't # lose track of in-flight async tasks. _push_background_tasks_snapshot(session_id, conn) else conn.send_json(type: "error", message: "Session not found: #{session_id}") end when "message" session_id = msg["session_id"] || conn.session_id # Merge legacy images array into files as { data_url:, name:, mime_type: } entries raw_images = (msg["images"] || []).map do |data_url| { "data_url" => data_url, "name" => "image.jpg", "mime_type" => "image/jpeg" } end (session_id, msg["content"].to_s, (msg["files"] || []) + raw_images) when "confirmation" session_id = msg["session_id"] || conn.session_id deliver_confirmation(session_id, msg["id"], msg["result"]) when "interrupt" session_id = msg["session_id"] || conn.session_id interrupt_session(session_id) when "list_sessions" # Initial load: newest 20 sessions regardless of source/profile. # Single unified query — frontend shows all in one time-sorted list. page = @registry.list(limit: 21) has_more = page.size > 20 all_sessions = page.first(20) conn.send_json(type: "session_list", sessions: all_sessions, has_more: has_more, cron_count: @registry.cron_count) when "run_task" # Client sends this after subscribing to guarantee it's ready to receive # broadcasts before the agent starts executing. session_id = msg["session_id"] || conn.session_id start_pending_task(session_id) when "ping" conn.send_json(type: "pong") else conn.send_json(type: "error", message: "Unknown message type: #{type}") end rescue JSON::ParserError => e conn.send_json(type: "error", message: "Invalid JSON: #{e.}") rescue => e Octo::Logger.error("[on_ws_message] #{e.class}: #{e.}\n#{e.backtrace.first(10).join("\n")}") conn.send_json(type: "error", message: e.) end |
#on_ws_open(conn) ⇒ Object
2719 2720 2721 2722 |
# File 'lib/octo/server/http_server.rb', line 2719 def on_ws_open(conn) @ws_mutex.synchronize { @all_ws_conns << conn } # Client will send a "subscribe" message to bind to a session end |
#parse_json_body(req) ⇒ Object
3319 3320 3321 3322 3323 3324 3325 |
# File 'lib/octo/server/http_server.rb', line 3319 def parse_json_body(req) return {} if req.body.nil? || req.body.empty? JSON.parse(req.body) rescue JSON::ParserError {} end |
#serve_file_download(res, path) ⇒ Object
Stream a file to the client as a download. Content-Type is always application/octet-stream — the browser determines file type and handling from the filename extension in Content-Disposition.
1240 1241 1242 1243 1244 1245 1246 1247 1248 |
# File 'lib/octo/server/http_server.rb', line 1240 def serve_file_download(res, path) filename = File.basename(path) res.status = 200 res["Content-Type"] = "application/octet-stream" res["Content-Disposition"] = "attachment; filename=\"#{filename}\"" res["Content-Length"] = File.size(path).to_s res.body = File.binread(path) end |
#start ⇒ Object
222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 |
# File 'lib/octo/server/http_server.rb', line 222 def start # Enable console logging for the server process so log lines are visible in the terminal. Octo::Logger.console = true Octo::Logger.info("[HttpServer PID=#{Process.pid}] start() mode=#{@inherited_socket ? 'worker' : 'standalone'} inherited_socket=#{@inherited_socket.inspect} master_pid=#{@master_pid.inspect}") # Expose server address and product name to all child processes (skill scripts, shell commands, etc.) # so they can call back into the server without hardcoding the port. ENV["OCTO_SERVER_PORT"] = @port.to_s ENV["OCTO_SERVER_HOST"] = (@host == "0.0.0.0" ? "127.0.0.1" : @host) ENV["OCTO_PRODUCT_NAME"] = "Octo" # Override WEBrick's built-in signal traps via StartCallback, # which fires after WEBrick sets its own INT/TERM handlers. # This ensures Ctrl-C always exits immediately. # # When running as a worker under Master, DoNotListen: true prevents WEBrick # from calling bind() on its own — we inject the inherited socket instead. webrick_opts = { BindAddress: @host, Port: @port, Logger: WEBrick::Log.new(File::NULL), AccessLog: [], StartCallback: proc { } # signal traps set below, after `server` is created } webrick_opts[:DoNotListen] = true if @inherited_socket Octo::Logger.info("[HttpServer PID=#{Process.pid}] WEBrick DoNotListen=#{webrick_opts[:DoNotListen].inspect}") server = WEBrick::HTTPServer.new(**webrick_opts) # Override WEBrick's signal traps now that `server` is available. # On INT/TERM: call server.shutdown (graceful), with a 1s hard-kill fallback. # Also stop BrowserManager so the chrome-devtools-mcp node process is killed # before this worker exits — otherwise it becomes an orphan and holds port 8888. shutdown_once = false shutdown_proc = proc do next if shutdown_once shutdown_once = true # Persist in-flight agent sessions BEFORE starting the forced-exit # timer, so any new messages added to @history since the last save # are on disk before the new worker reads them after a hot restart. interrupt_all_agents # Detach the inherited (shared) listen socket BEFORE WEBrick.shutdown # so that cleanup_listener does not call shutdown(SHUT_RDWR)+close on # it — that would propagate to every process sharing the underlying # kernel socket (Master + new worker), breaking subsequent accept() # on Linux. macOS's BSD stack tolerates this; Linux does not. if @inherited_socket && server.listeners.include?(@inherited_socket) server.listeners.delete(@inherited_socket) Octo::Logger.info("[HttpServer PID=#{Process.pid}] detached inherited socket fd=#{@inherited_socket.fileno} before shutdown") end t1 = Thread.new { @channel_manager.stop rescue nil } t2 = Thread.new { Octo::BrowserManager.instance.stop rescue nil } t1.join(1.5) t2.join(1.5) server.shutdown rescue nil end trap("INT") { shutdown_proc.call } trap("TERM") { shutdown_proc.call } if @inherited_socket server.listeners << @inherited_socket Octo::Logger.info("[HttpServer PID=#{Process.pid}] injected inherited fd=#{@inherited_socket.fileno} listeners=#{server.listeners.map(&:fileno).inspect}") else Octo::Logger.info("[HttpServer PID=#{Process.pid}] standalone, WEBrick listeners=#{server.listeners.map(&:fileno).inspect}") end # Mount API + WebSocket handler (takes priority). # Use a custom Servlet so that DELETE/PUT/PATCH requests are not rejected # by WEBrick's default method whitelist before reaching our dispatcher. dispatcher = self servlet_class = Class.new(WEBrick::HTTPServlet::AbstractServlet) do define_method(:do_GET) { |req, res| dispatcher.send(:dispatch, req, res) } define_method(:do_POST) { |req, res| dispatcher.send(:dispatch, req, res) } define_method(:do_PUT) { |req, res| dispatcher.send(:dispatch, req, res) } define_method(:do_DELETE) { |req, res| dispatcher.send(:dispatch, req, res) } define_method(:do_PATCH) { |req, res| dispatcher.send(:dispatch, req, res) } define_method(:do_OPTIONS) { |req, res| dispatcher.send(:dispatch, req, res) } end server.mount("/api", servlet_class) server.mount("/ws", servlet_class) # Mount static file handler for the entire web directory. # Use mount_proc so we can inject no-cache headers on every response, # preventing stale JS/CSS from being served after a gem update. # file_handler = WEBrick::HTTPServlet::FileHandler.new(server, WEB_ROOT, FancyIndexing: false) server.mount_proc("/") do |req, res| file_handler.service(req, res) res["Cache-Control"] = "no-store" res["Pragma"] = "no-cache" end # Auto-create a default session on startup create_default_session # Start the background scheduler @scheduler.start puts " Scheduler: #{@scheduler.schedules.size} task(s) loaded" # Start IM channel adapters (non-blocking — each platform runs in its own thread) @channel_manager.start # Start browser MCP daemon if browser.yml is configured (non-blocking) @browser_manager.start server.start end |
#start_pending_task(session_id) ⇒ Object
Start the pending task for a session. Called when the client sends “run_task” over WS — by that point the client has already subscribed, so every broadcast will be delivered.
2996 2997 2998 2999 3000 3001 3002 3003 3004 3005 3006 3007 3008 3009 3010 3011 3012 |
# File 'lib/octo/server/http_server.rb', line 2996 def start_pending_task(session_id) return unless @registry.exist?(session_id) session = @registry.get(session_id) prompt = session[:pending_task] working_dir = session[:pending_working_dir] return unless prompt # nothing pending # Clear the pending fields so a re-connect doesn't re-run @registry.update(session_id, pending_task: nil, pending_working_dir: nil) agent = nil @registry.with_session(session_id) { |s| agent = s[:agent] } return unless agent run_agent_task(session_id, agent) { agent.run(prompt) } end |
#subscribe(session_id, conn) ⇒ Object
── WebSocket subscription management ─────────────────────────────────────
3116 3117 3118 3119 3120 3121 3122 3123 3124 |
# File 'lib/octo/server/http_server.rb', line 3116 def subscribe(session_id, conn) @ws_mutex.synchronize do # Remove conn from any previous session subscription first, # so switching sessions never results in duplicate delivery. @ws_clients.each_value { |list| list.delete(conn) } @ws_clients[session_id] ||= [] @ws_clients[session_id] << conn unless @ws_clients[session_id].include?(conn) end end |
#unsubscribe(conn) ⇒ Object
3126 3127 3128 3129 3130 |
# File 'lib/octo/server/http_server.rb', line 3126 def unsubscribe(conn) @ws_mutex.synchronize do @ws_clients.each_value { |list| list.delete(conn) } end end |
#unsubscribe_all(session_id) ⇒ Object
3132 3133 3134 |
# File 'lib/octo/server/http_server.rb', line 3132 def unsubscribe_all(session_id) @ws_mutex.synchronize { @ws_clients.delete(session_id) } end |
#websocket_upgrade?(req) ⇒ Boolean
── WebSocket ─────────────────────────────────────────────────────────────
2646 2647 2648 |
# File 'lib/octo/server/http_server.rb', line 2646 def websocket_upgrade?(req) req["Upgrade"]&.downcase == "websocket" end |