Class: Octo::Agent

Inherits:
Object
  • Object
show all
Includes:
LlmCaller, MemoryUpdater, MessageCompressorHelper, NextMessageSuggester, SessionSerializer, SkillAutoCreator, SkillEvolution, SkillManager, SkillReflector, SystemPromptBuilder, TimeMachine, ToolExecutor
Defined in:
lib/octo/agent.rb,
lib/octo/agent/llm_caller.rb,
lib/octo/agent/time_machine.rb,
lib/octo/agent/skill_manager.rb,
lib/octo/agent/tool_executor.rb,
lib/octo/agent/memory_updater.rb,
lib/octo/agent/skill_evolution.rb,
lib/octo/agent/skill_reflector.rb,
lib/octo/agent/session_serializer.rb,
lib/octo/agent/skill_auto_creator.rb,
lib/octo/agent/system_prompt_builder.rb,
lib/octo/agent/next_message_suggester.rb,
lib/octo/agent/message_compressor_helper.rb

Defined Under Namespace

Modules: LlmCaller, MemoryUpdater, MessageCompressorHelper, NextMessageSuggester, SessionSerializer, SkillAutoCreator, SkillEvolution, SkillManager, SkillReflector, SystemPromptBuilder, TimeMachine, ToolExecutor

Constant Summary collapse

REASONING_EFFORTS =
%w[low medium high].freeze

Constants included from SkillAutoCreator

SkillAutoCreator::DEFAULT_AUTO_CREATE_THRESHOLD

Constants included from SkillReflector

SkillReflector::MIN_SKILL_ITERATIONS

Constants included from NextMessageSuggester

NextMessageSuggester::MAX_SUGGESTION_CHARS, NextMessageSuggester::RECENT_HISTORY_LIMIT, NextMessageSuggester::SUGGESTION_MAX_TOKENS

Constants included from MemoryUpdater

MemoryUpdater::MEMORIES_DIR, MemoryUpdater::MEMORY_UPDATE_MIN_ITERATIONS

Constants included from LlmCaller

LlmCaller::MAX_RETRIES_ON_FALLBACK, LlmCaller::RETRIES_BEFORE_FALLBACK

Constants included from SystemPromptBuilder

SystemPromptBuilder::MAX_MEMORY_FILE_CHARS

Constants included from SkillManager

SkillManager::MAX_CONTEXT_SKILLS

Constants included from MessageCompressorHelper

MessageCompressorHelper::COMPRESSION_THRESHOLD, MessageCompressorHelper::IDLE_COMPRESSION_THRESHOLD, MessageCompressorHelper::MAX_RECENT_MESSAGES, MessageCompressorHelper::MESSAGE_COUNT_THRESHOLD, MessageCompressorHelper::TARGET_COMPRESSED_TOKENS

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from SkillAutoCreator

#maybe_create_skill_from_task

Methods included from SkillReflector

#maybe_reflect_on_skill

Methods included from SkillEvolution

#run_skill_evolution_hooks

Methods included from NextMessageSuggester

#next_message_suggestion_enabled?, #run_next_message_suggestion!

Methods included from MemoryUpdater

#run_memory_update_subagent, #should_update_memory?

Methods included from TimeMachine

#active_messages, #get_child_tasks, #get_task_history, #restore_to_task_state, #save_modified_files_snapshot, #start_new_task, #switch_to_task, #undo_last_task

Methods included from LlmCaller

#collect_iteration_tokens

Methods included from SystemPromptBuilder

#build_system_prompt

Methods included from SkillManager

#build_skill_context, #build_template_context, #execute_skill_with_subagent, #filter_skills_by_profile, #inject_skill_as_assistant_message, #inject_skill_command_as_assistant_message, #load_memories_meta, #load_skills, #memories_base_dir, #parse_memory_frontmatter, #parse_skill_command, #shred_directory, warn_skill_limit_once

Methods included from SessionSerializer

#_replay_single_message, #extract_image_files_from_content, #extract_images_from_content, #extract_text_from_content, #get_recent_user_messages, #inject_chunk_index_if_needed, #refresh_system_prompt, #replay_history, #restore_session, #to_session_data

Methods included from ToolExecutor

#build_denied_result, #build_error_result, #build_success_result, #confirm_tool_use?, #format_tool_prompt, #is_safe_operation?, #should_auto_execute?, #show_tool_preview

Methods included from MessageCompressorHelper

#build_chunk_md, #calculate_target_recent_count, #compress_messages_if_needed, #empty_extraction_data, #extract_decision_text, #extract_from_messages, #extract_key_information, #extract_tool_names, #filter_todo_results, #filter_write_results, #find_in_progress, #format_message_content, #generate_hierarchical_summary, #generate_level1_summary, #generate_level2_summary, #generate_level3_summary, #generate_level4_summary, #get_recent_messages_with_tool_pairs, #handle_compression_response, #parse_shell_result, #parse_todo_result, #parse_write_result, #pull_assistant_before, #pull_tool_results_after, #save_compressed_chunk, #tool_result_for?, #tool_result_ids, #tool_result_message?, #trigger_idle_compression, #truncate_content, #truncate_tool_result

Constructor Details

#initialize(client, config, working_dir:, ui:, profile:, session_id:, source:) ⇒ Agent

Returns a new instance of Agent.



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/octo/agent.rb', line 71

# Build a fresh agent for a single session.
#
# @param client [Object] LLM client for the current model; stored as @client
#   and also handed to MessageCompressor
# @param config [AgentConfig, Object] used as-is when already an AgentConfig,
#   otherwise wrapped via AgentConfig.new(config)
# @param working_dir [String, nil] session working directory; nil falls back
#   to Dir.pwd
# @param ui [Object] UI controller used for direct UI interaction
# @param profile [Object] passed to AgentProfile.load
# @param session_id [String] identifier of this session
# @param source [#to_sym] origin of the session (:manual, :cron or :channel)
#
# Besides setting instance state, this calls init_time_machine and
# register_builtin_tools, then Utils::ParserManager.setup! and
# Utils::ScriptsManager.setup! (which, per their comments, install files
# under ~/.octo/).
def initialize(client, config, working_dir:, ui:, profile:, session_id:, source:)
  @client = client  # Client for current model
  # Accept either a ready AgentConfig or raw config data to wrap.
  @config = config.is_a?(AgentConfig) ? config : AgentConfig.new(config)
  @agent_profile = AgentProfile.load(profile)
  @source = source.to_sym  # :manual | :cron | :channel
  @tool_registry = ToolRegistry.new
  @hooks = HookManager.new
  @session_id = session_id
  @name = ""
  @pinned = false
  @history = MessageHistory.new
  @todos = []  # Store todos in memory
  @iterations = 0
  # Prompt-cache accounting counters (all start at zero).
  @cache_stats = {
    cache_creation_input_tokens: 0,
    cache_read_input_tokens: 0,
    total_requests: 0,
    cache_hit_requests: 0,
    raw_api_usage_samples: []  # Store raw API usage for debugging
  }
  @start_time = nil
  @working_dir = working_dir || Dir.pwd
  @created_at = Time.now.iso8601
  @total_tasks = 0
  @previous_total_tokens = 0  # Track tokens from previous iteration for delta calculation
  @latest_latency = nil  # Most recent LLM call's latency metrics (see Client#send_messages_with_tools)
  @reasoning_effort = nil  # Per-session reasoning effort override; nil = provider default
  @ui = ui  # UIController for direct UI interaction
  @debug_logs = []  # Debug logs for troubleshooting
  @pending_injections = []     # Pending inline skill injections to flush after observe()
  @pending_script_tmpdirs = [] # Decrypted-script tmpdirs to shred when agent.run completes
  @pending_error_rollback = false  # Deferred rollback flag set by restore_session on error
  @in_run_loop = false         # True while agent.run() is active (set under @state_mutex)
  # Unified inbox for events that should land in @history at the next
  # iteration boundary inside the run loop. Items are typed:
  #   {kind: :bg_notification, content:, bubble:, enqueued_at:}
  #   {kind: :user_msg,        content:, files:, enqueued_at:}
  # Drained chronologically by drain_inbox_into_history! (run loop top).
  # NOTE(review): enqueue_user_message actually stores :processed rather
  # than :files on :user_msg items — the shape above may be stale.
  @inbox = []
  @inbox_run_pending = false   # Set true after enqueue_user_message decides to spawn a run; cleared at run() entry. Dedupes concurrent spawns.
  @state_mutex = Mutex.new     # Protects @in_run_loop, @inbox, @inbox_run_pending, @current_run_thread, @discard_threshold
  @run_mutex = Mutex.new       # Serializes every Agent#run invocation regardless of caller
  @checkpoint_index = 0        # Tracks how many messages have been persisted to incremental log
  @on_checkpoint = nil         # Callback proc(messages) for incremental persistence
  @current_run_thread = nil    # Thread currently inside run()'s body — set under @state_mutex; used by interrupt_current_run!
  @discard_threshold = nil     # Time. Stale run attempts whose enqueue time predates this are dropped.

  # Compression tracking
  @compression_level = 0  # Tracks how many times we've compressed (for progressive summarization)
  @compressed_summaries = []  # Store summaries from previous compressions for reference

  # Message compressor for LLM-based intelligent compression
  # Uses LLM to preserve key decisions, errors, and context while reducing token count
  # NOTE(review): current_model is defined elsewhere and presumably reads
  # @config, so @config must already be assigned at this point — confirm.
  @message_compressor = MessageCompressor.new(@client, model: current_model)

  # Skill loader for skill management
  @skill_loader = SkillLoader.new(working_dir: @working_dir)

  # Initialize Time Machine
  init_time_machine

  # Register built-in tools
  register_builtin_tools

  # Ensure user-space parsers are in place (~/.octo/parsers/)
  Utils::ParserManager.setup!

  # Ensure bundled shell scripts are in place (~/.octo/scripts/)
  Utils::ScriptsManager.setup!
end

Instance Attribute Details

#agent_profileObject (readonly)

Returns the value of attribute agent_profile.



44
45
46
# File 'lib/octo/agent.rb', line 44

# @return [Object] the profile produced by AgentProfile.load in #initialize
def agent_profile; @agent_profile; end

#cache_statsObject (readonly)

Returns the value of attribute cache_stats.



44
45
46
# File 'lib/octo/agent.rb', line 44

# @return [Hash] prompt-cache counters (creation/read tokens, request counts,
#   raw usage samples) initialised in #initialize
def cache_stats; @cache_stats; end

#created_atObject (readonly)

Returns the value of attribute created_at.



44
45
46
# File 'lib/octo/agent.rb', line 44

# @return [String] ISO-8601 creation timestamp captured in #initialize
def created_at; @created_at; end

#errorObject (readonly)

Returns the value of attribute error.



44
45
46
# File 'lib/octo/agent.rb', line 44

# @return [Object, nil] the last recorded error; nil until assigned elsewhere
def error; @error; end

#historyObject (readonly)

Returns the value of attribute history.



44
45
46
# File 'lib/octo/agent.rb', line 44

# @return [MessageHistory] the conversation history of this agent
def history; @history; end

#iterationsObject (readonly)

Returns the value of attribute iterations.



44
45
46
# File 'lib/octo/agent.rb', line 44

# @return [Integer] iteration counter (starts at 0 in #initialize)
def iterations; @iterations; end

#latest_latencyObject (readonly)

Returns the value of attribute latest_latency.



44
45
46
# File 'lib/octo/agent.rb', line 44

# @return [Object, nil] latency metrics of the most recent LLM call; nil
#   before the first call
def latest_latency; @latest_latency; end

#nameObject (readonly)

Returns the value of attribute name.



44
45
46
# File 'lib/octo/agent.rb', line 44

# @return [String] session display name ("" until #rename is called)
def name; @name; end

#pinnedObject

Returns the value of attribute pinned.



49
50
51
# File 'lib/octo/agent.rb', line 49

# @return [Boolean] whether the session is pinned (false initially)
def pinned; @pinned; end

#reasoning_effortObject

Returns the value of attribute reasoning_effort.



44
45
46
# File 'lib/octo/agent.rb', line 44

# @return [String, nil] per-session reasoning effort override; nil means
#   "use the provider default"
def reasoning_effort; @reasoning_effort; end

#session_idObject (readonly)

Returns the value of attribute session_id.



44
45
46
# File 'lib/octo/agent.rb', line 44

# @return [String] identifier of this session
def session_id; @session_id; end

#skill_loaderObject (readonly)

Returns the value of attribute skill_loader.



44
45
46
# File 'lib/octo/agent.rb', line 44

# @return [SkillLoader] loader bound to the working dir at construction time
def skill_loader; @skill_loader; end

#sourceObject (readonly)

Returns the value of attribute source.



44
45
46
# File 'lib/octo/agent.rb', line 44

# @return [Symbol] where the session came from (:manual, :cron or :channel)
def source; @source; end

#statusObject (readonly)

Returns the value of attribute status.



44
45
46
# File 'lib/octo/agent.rb', line 44

# @return [Object, nil] current status; nil until assigned elsewhere
def status; @status; end

#todosObject (readonly)

Returns the value of attribute todos.



44
45
46
# File 'lib/octo/agent.rb', line 44

# @return [Array] in-memory todo list (empty at construction)
def todos; @todos; end

#total_tasksObject (readonly)

Returns the value of attribute total_tasks.



44
45
46
# File 'lib/octo/agent.rb', line 44

# @return [Integer] number of tasks counted so far (0 at construction)
def total_tasks; @total_tasks; end

#uiObject (readonly)

Returns the value of attribute ui.



44
45
46
# File 'lib/octo/agent.rb', line 44

# @return [Object] the UI controller passed to #initialize
def ui; @ui; end

#updated_atObject (readonly)

Returns the value of attribute updated_at.



44
45
46
# File 'lib/octo/agent.rb', line 44

# @return [Object, nil] last-update timestamp; nil until assigned elsewhere
def updated_at; @updated_at; end

#working_dirObject (readonly)

Returns the value of attribute working_dir.



44
45
46
# File 'lib/octo/agent.rb', line 44

# @return [String] current working directory of the session
def working_dir; @working_dir; end

Class Method Details

.from_session(client, config, session_data, ui: nil, profile:) ⇒ Object

Restore from a saved session



143
144
145
146
147
148
149
150
151
152
# File 'lib/octo/agent.rb', line 143

# Rebuild an Agent from a persisted session hash.
#
# session_data may use Symbol or String keys; a truthy Symbol-keyed value
# wins over the String-keyed one. Missing values fall back to Dir.pwd
# (working_dir), a freshly generated id (session_id) and "manual" (source,
# for legacy sessions saved before the field existed).
#
# @param session_data [Hash] previously persisted session
# @param ui [Object, nil] UI controller for the restored agent
# @param profile [Object] agent profile passed through to #initialize
# @return [Agent] the new agent, after #restore_session(session_data) ran
def self.from_session(client, config, session_data, ui: nil, profile:)
  lookup = ->(key) { session_data[key] || session_data[key.to_s] }

  working_dir = lookup.call(:working_dir) || Dir.pwd
  session_id  = lookup.call(:session_id) || Octo::SessionManager.generate_id
  source      = (lookup.call(:source) || "manual").to_sym

  new(client, config,
      working_dir: working_dir, ui: ui, profile: profile,
      session_id: session_id, source: source)
    .tap { |agent| agent.restore_session(session_data) }
end

Instance Method Details

#add_hook(event, &block) ⇒ Object



154
155
156
# File 'lib/octo/agent.rb', line 154

# Register a hook handler for +event+ on this agent's HookManager.
#
# @param event [Symbol] hook event name (e.g. :before_tool_use)
# @yield the handler invoked by the HookManager for that event
# @return whatever HookManager#add returns
def add_hook(event, &handler)
  @hooks.add(event, &handler)
end

#available_modelsObject

Get list of available model names



221
222
223
# File 'lib/octo/agent.rb', line 221

# List the model names known to this agent's configuration.
#
# @return [Array] whatever the config's #model_names reports
def available_models
  config = @config
  config.model_names
end

#change_working_dir(new_dir) ⇒ Object

Change the working directory for this session. Injects a new session context to notify the AI of the directory change.



214
215
216
217
218
# File 'lib/octo/agent.rb', line 214

# Move this session to a different working directory.
#
# The new value is stored as given (no expansion or existence check) and
# inject_session_context is then called so the model learns about the move.
#
# @param new_dir [String] the new working directory
# @return [true]
def change_working_dir(new_dir)
  @working_dir = new_dir
  inject_session_context
  true
end

#checkpoint!Object

Emit any messages added to @history since the last checkpoint through the registered @on_checkpoint handler.



173
174
175
176
177
178
179
180
181
182
# File 'lib/octo/agent.rb', line 173

# Hand every message appended to @history since the previous checkpoint to
# the @on_checkpoint callback, then advance the cursor.
#
# No-op (returns nil) when no callback is registered or nothing is new.
# The cursor only moves after the callback returns, so a raising callback
# leaves the same messages pending for the next call.
#
# @return [Integer, nil] the new cursor position, or nil when nothing ran
def checkpoint!
  callback = @on_checkpoint
  return unless callback

  snapshot = @history.to_a
  pending = Array(snapshot[@checkpoint_index..])
  return if pending.empty?

  callback.call(pending)
  @checkpoint_index = snapshot.length
end

#current_model_infoObject

Get current model configuration info



226
227
228
229
230
231
232
233
234
235
236
# File 'lib/octo/agent.rb', line 226

# Summarise the currently selected model.
#
# @return [Hash{Symbol=>Object}, nil] :id, :name, :model and :base_url copied
#   from the config's current model entry (String keys), or nil when the
#   config has no current model
def current_model_info
  model = @config.current_model
  return unless model

  %w[id name model base_url].to_h { |field| [field.to_sym, model[field]] }
end

#enqueue_injection(skill, task) ⇒ Object

Enqueue an inline skill injection to be flushed after observe(). Called by InvokeSkill#execute to avoid injecting during tool execution, which would break Bedrock’s toolUse/toolResult pairing requirement.

Parameters:

  • skill (Octo::Skill)

    The skill whose instructions should be injected

  • task (String)

    The task description passed to the skill



1537
1538
1539
# File 'lib/octo/agent.rb', line 1537

# Queue an inline skill injection to be flushed later (after observe()),
# rather than mutating history while a tool is still executing.
#
# @param skill [Object] the skill whose instructions should be injected
# @param task [String] the task description passed to the skill
# @return [Array<Hash>] the pending-injection queue
def enqueue_injection(skill, task)
  injection = { skill: skill, task: task }
  @pending_injections.push(injection)
end

#enqueue_user_message(content, files: []) ⇒ Object

Queue a user message (text and/or files) into the inbox. Returns a tristate describing what the caller should do next:

:running       — a run is currently in flight; the in-loop drain at
                 the next iteration boundary will pick this up.
                 Caller does NOT spawn a new run.
:spawn         — agent is idle AND no other caller has been told to
                 spawn yet. Caller IS responsible for spawning a
                 drain-only run (typically via run_agent_task so the
                 thread is registered for interrupt_session).
:spawn_pending — agent is idle BUT another concurrent caller has
                 already been told to spawn. Caller does NOT spawn —
                 the other run will absorb this message too.

Files are processed eagerly **on the caller’s thread** (typically the HTTP-handler thread) so the processed payload is fully formed by the time it sits in the inbox. The agent thread’s drain then just appends to @history — no @history mutation off-thread.



839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
# File 'lib/octo/agent.rb', line 839

# Put a user message into the inbox and tell the caller who should run it.
#
# Attached files are processed here, on the caller's thread, so the inbox
# only ever holds fully-formed payloads.
#
# @param content [#to_s] message text
# @param files [Array, nil] attachments; processed only when non-empty
# @return [Symbol] :running (an in-flight run will drain it),
#   :spawn (caller must start a drain-only run) or
#   :spawn_pending (another caller already owns the spawn)
def enqueue_user_message(content, files: [])
  processed = process_files_for_user_message(content, files) if files && !files.empty?

  decision, waiting = @state_mutex.synchronize do
    @inbox << {
      kind:        :user_msg,
      content:     content.to_s,
      processed:   processed,
      enqueued_at: Time.now
    }
    queued = @inbox.count { |entry| entry[:kind] == :user_msg }

    verdict =
      if @in_run_loop
        :running
      elsif @inbox_run_pending
        :spawn_pending
      else
        # First idle caller claims the spawn; later callers see :spawn_pending.
        @inbox_run_pending = true
        :spawn
      end

    [verdict, queued]
  end

  # Only a message stuck behind an in-flight run is really "waiting";
  # for :spawn / :spawn_pending the count would flash and vanish.
  broadcast_user_message_queue_status(waiting) if decision == :running

  Octo::Logger.info("agent.user_message_enqueued",
    session_id: @session_id,
    decision: decision,
    has_files: !processed.nil?,
    pending_user_msgs: waiting
  )
  decision
end

#fork_subagent(model: nil, forbidden_tools: [], system_prompt_suffix: nil) ⇒ Agent

Fork a subagent with the specified configuration. The subagent inherits all messages and tools from the parent agent. Tools are not modified (for cache reuse), but forbidden tools are blocked at runtime via hooks.

Parameters:

  • model (String, nil) (defaults to: nil)

    Model name to use (nil = use current model)

  • forbidden_tools (Array<String>) (defaults to: [])

    List of tool names to forbid

  • system_prompt_suffix (String, nil) (defaults to: nil)

    Additional instructions (inserted as user message for cache reuse)

Returns:

  • (Agent)

    New subagent instance



1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
# File 'lib/octo/agent.rb', line 1627

# Fork a subagent that inherits this agent's messages, tools and profile.
#
# The tool registry is deliberately left untouched (so prompt caching can be
# reused); forbidden tools are instead rejected at runtime by a
# :before_tool_use hook.
#
# @param model [String, nil] model to switch to. "lite" selects the lite
#   companion of the current model when one is resolvable (falling back to
#   the current model); nil keeps the current model.
# @param forbidden_tools [Array<String>, String, nil] tool names the subagent
#   may not call; coerced with Array(), so nil or a single name are accepted
# @param system_prompt_suffix [String, nil] extra instructions, injected as a
#   user message plus an assistant ack (not into the system prompt) so the
#   cached prefix stays reusable
# @return [Agent] the new subagent
# @raise [AgentError] when a non-"lite" model name is not found in the config
def fork_subagent(model: nil, forbidden_tools: [], system_prompt_suffix: nil)
  # Snapshot the list: the hook below closes over it, so a caller mutating
  # its own array after forking must not silently change the subagent's rules.
  forbidden_tools = Array(forbidden_tools).dup.freeze

  # Clone config to avoid affecting parent
  subagent_config = @config.deep_copy

  # Switch to specified model if provided
  if model
    if model == "lite"
      # Special keyword: use lite model if available, otherwise fall back to default.
      #
      # Lite is a *virtual* role — it need not exist as a concrete entry in
      # @models. It is derived from the model the user is currently on, so
      # switching primary models automatically re-pairs with the right lite
      # companion (Claude → Haiku, DeepSeek V4-pro → V4-flash, ...).
      lite_cfg = subagent_config.lite_model_config_for_current
      if lite_cfg
        if lite_cfg["virtual"]
          # Provider-preset derived: apply the lite fields as a *session
          # overlay* on the subagent's config — this intentionally avoids
          # mutating the shared @models array / hashes which would pollute
          # the parent agent's own current model.
          subagent_config.apply_virtual_model_overlay!(
            "api_key"          => lite_cfg["api_key"],
            "base_url"         => lite_cfg["base_url"],
            "model"            => lite_cfg["model"],
            "anthropic_format" => lite_cfg["anthropic_format"]
          )
        elsif lite_cfg["id"]
          # Explicit user-configured lite (from OCTO_LITE_* env): a
          # real @models entry with a stable id. Switch to it normally.
          subagent_config.switch_model_by_id(lite_cfg["id"])
        end
      end
      # If no lite is resolvable, just use current (primary) model.
    else
      # Regular model name lookup — find the first model with a matching
      # name and switch by its stable id.
      target = subagent_config.models.find { |m| m["model"] == model }
      if target && target["id"]
        subagent_config.switch_model_by_id(target["id"])
      else
        raise AgentError, "Model '#{model}' not found in config. Available models: #{subagent_config.model_names.join(', ')}"
      end
    end
  end

  # Create new client for subagent
  subagent_client = Octo::Client.new(
    subagent_config.api_key,
    base_url: subagent_config.base_url,
    model: subagent_config.model_name,
    anthropic_format: subagent_config.anthropic_format?
  )

  # Create subagent (reuses all tools from parent, inherits agent profile from parent)
  # Subagent gets its own unique session_id.
  subagent = self.class.new(
    subagent_client,
    subagent_config,
    working_dir: @working_dir,
    ui: @ui,
    profile: @agent_profile.name,
    session_id: Octo::SessionManager.generate_id,
    source: @source
  )
  subagent.instance_variable_set(:@is_subagent, true)

  # Inherit previous_total_tokens so the first iteration delta is calculated correctly
  subagent.instance_variable_set(:@previous_total_tokens, @previous_total_tokens)

  # Deep clone history to avoid cross-contamination.
  # Dangling tool_calls (no tool_result yet) are cleaned up automatically by
  # MessageHistory#append when the subagent appends its first user message.
  cloned_messages = deep_clone(@history.to_a)
  subagent.instance_variable_set(:@history, MessageHistory.new(cloned_messages))

  # Append system prompt suffix as user message (for cache reuse)
  if system_prompt_suffix
    subagent_history = subagent.history

    # Build forbidden tools notice if any tools are forbidden
    forbidden_notice = if forbidden_tools.any?
      tool_list = forbidden_tools.map { |t| "`#{t}`" }.join(", ")
      "\n\n[System Notice] The following tools are disabled in this subagent and will be rejected if called: #{tool_list}"
    else
      ""
    end

    subagent_history.append({
      role: "user",
      content: "CRITICAL: TASK CONTEXT SWITCH - FORKED SUBAGENT MODE\n\nYou are now running as a forked subagent — a temporary, isolated agent spawned by the parent agent to handle a specific task. You run independently and cannot communicate back to the parent mid-task. When you finish (i.e., you stop calling tools and return a final response), your output will be automatically summarized and returned to the parent agent as a result so it can continue.\n\n#{system_prompt_suffix}#{forbidden_notice}",
      system_injected: true,
      subagent_instructions: true
    })

    # Insert an assistant acknowledgement so the conversation structure is complete:
    #   [user] role/constraints  →  [assistant] ack  →  [user] actual task (from run())
    subagent_history.append({
      role: "assistant",
      content: "Understood. I am now operating as a subagent with the constraints above. Please provide the task.",
      system_injected: true
    })
  end

  # Register hook to forbid certain tools at runtime (doesn't affect tool registry for cache)
  if forbidden_tools.any?
    subagent.add_hook(:before_tool_use) do |call|
      if forbidden_tools.include?(call[:name])
        {
          action: :deny,
          reason: "Tool '#{call[:name]}' is forbidden in this subagent context"
        }
      else
        { action: :allow }
      end
    end
  end

  # Mark subagent metadata for summary generation (@is_subagent was already
  # set right after construction). Messages at or beyond this index in the
  # subagent's history are treated as "new" by generate_subagent_summary.
  subagent.instance_variable_set(:@parent_message_count, @history.size)

  subagent
end

#generate_subagent_summary(subagent) ⇒ String

Generate a summary of a subagent's execution. Extracts the new messages added by the subagent and builds a concise summary. This summary will replace the subagent instructions message in the parent agent.

Parameters:

  • subagent (Agent)

    The subagent that completed execution

Returns:

  • (String)

    Summary text to insert into parent agent



1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
# File 'lib/octo/agent.rb', line 1758

# Summarise what a finished subagent did, for hand-back to the parent.
#
# Only messages after the subagent's @parent_message_count (the inherited
# prefix) are considered. Messages flagged :system_injected are ignored when
# choosing the final response: fork_subagent seeds an assistant
# acknowledgement ("Understood. ...") with that flag, and it must never be
# reported as the subagent's result.
#
# @param subagent [Agent] the subagent that completed execution
# @return [String] multi-line summary starting with "[SUBAGENT SUMMARY]",
#   listing iterations, distinct tool names used, and the last non-empty
#   assistant reply (or "(No response)")
def generate_subagent_summary(subagent)
  parent_count = subagent.instance_variable_get(:@parent_message_count) || 0
  new_messages = subagent.history.to_a[parent_count..] || []

  # Distinct names of tools the subagent called, in first-use order
  tool_calls = new_messages
    .select { |m| m[:role] == "assistant" && m[:tool_calls] }
    .flat_map { |m| m[:tool_calls].map { |tc| tc[:name] } }
    .uniq

  # Last genuine assistant reply (skip the injected acknowledgement)
  final_message = new_messages.reverse_each.find do |m|
    m[:role] == "assistant" &&
      !m[:system_injected] &&
      m[:content] &&
      !m[:content].empty?
  end
  last_response = final_message&.dig(:content)

  # Build summary (this will replace the subagent instructions message)
  parts = []
  parts << "[SUBAGENT SUMMARY]"
  parts << "Completed in #{subagent.iterations} iterations"
  parts << "Tools used: #{tool_calls.join(', ')}" if tool_calls.any?
  parts << ""
  parts << "Results:"
  parts << (last_response || "(No response)")

  parts.join("\n")
end

#in_run_loop?Boolean

True iff a thread is currently inside Agent#run (between acquiring @run_mutex and leaving the run loop). Callers use it to decide whether a new user message can be enqueued (a run in flight will drain it) or needs a fresh run() to be spawned (agent is idle).

Returns:

  • (Boolean)


818
819
820
# File 'lib/octo/agent.rb', line 818

# Thread-safe read of whether a run is currently in flight.
#
# @return [Boolean] the value of @in_run_loop, read under @state_mutex
def in_run_loop?
  @state_mutex.synchronize do
    @in_run_loop
  end
end

#inbox_user_message_countObject

Public: count of pending :user_msg items in the inbox. Used by WebSocket reconnect to replay queue status to newly subscribed tabs.



761
762
763
# File 'lib/octo/agent.rb', line 761

# Number of :user_msg entries currently waiting in the inbox.
#
# @return [Integer] count taken under @state_mutex
def inbox_user_message_count
  @state_mutex.synchronize do
    @inbox.count { |entry| entry[:kind] == :user_msg }
  end
end

#inbox_user_messages_snapshotObject

Public: snapshot of pending :user_msg items in the inbox, in a format ready for replay via UI#show_user_message on WebSocket reconnect. Each entry: { content:, files:, created_at: } — files is an array of display-file hashes (name, data_url, mime_type).



769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
# File 'lib/octo/agent.rb', line 769

# Copy of the pending :user_msg inbox entries, shaped for UI replay.
#
# Entries with a :processed payload expose its :user_content and
# :display_files (defaulting to []); plain entries expose their raw :content
# with no files. created_at is the enqueue time (or now, if missing) as a
# Float epoch.
#
# @return [Array<Hash>] one { content:, files:, created_at: } per message
def inbox_user_messages_snapshot
  @state_mutex.synchronize do
    @inbox.each_with_object([]) do |entry, out|
      next unless entry[:kind] == :user_msg

      stamp = (entry[:enqueued_at] || Time.now).to_f
      payload = entry[:processed]
      out << {
        content:    payload ? payload[:user_content] : entry[:content],
        files:      payload ? (payload[:display_files] || []) : [],
        created_at: stamp
      }
    end
  end
end

#interrupt_current_run!Object

Set @discard_threshold to now and (best-effort) raise AgentInterrupted into the thread currently inside Agent#run. Called by http_server’s interrupt_session in addition to the existing session.raise — the existing path only catches user-msg runs (whose thread is tracked in session).

Idempotent: harmless to call multiple times. Best-effort: Thread#raise against a thread blocked deep in a syscall may not fire immediately; the watchdog in http_server escalates if needed.



799
800
801
802
803
804
805
806
807
808
809
810
811
812
# File 'lib/octo/agent.rb', line 799

# Stamp @discard_threshold with the current time and, if a thread is inside
# run(), raise Octo::AgentInterrupted in it (best effort).
#
# @return [Boolean] true when a run thread was targeted, false otherwise
def interrupt_current_run!
  victim = @state_mutex.synchronize do
    @discard_threshold = Time.now
    @current_run_thread
  end
  return false unless victim

  begin
    victim.raise(Octo::AgentInterrupted, "Interrupted by user")
  rescue StandardError
    # The thread may have finished in the meantime — nothing left to stop.
  end
  true
end

#on_checkpoint(&block) ⇒ Object

Register a callback that receives newly-added messages after each iteration checkpoint. Used by the server layer for incremental persistence (.jsonl crash-recovery logs).



161
162
163
# File 'lib/octo/agent.rb', line 161

# Install the callback that #checkpoint! feeds with newly added messages,
# replacing any previous one.
#
# @yieldparam messages [Array] messages appended since the last checkpoint
# @return [Proc, nil] the stored callback
def on_checkpoint(&callback)
  @on_checkpoint = callback
end

#permission_modeObject



53
54
55
# File 'lib/octo/agent.rb', line 53

# The config's permission mode as a String, or "" when either the config
# or its permission mode is nil.
#
# @return [String]
def permission_mode
  mode = @config&.permission_mode
  mode.nil? ? "" : mode.to_s
end

#redact_tool_args(args) ⇒ String, ...

Hook for redacting volatile tmpdir paths from tool call arguments before they are shown in the UI. The current implementation is a pass-through: it returns args unchanged.

Parameters:

  • args (String, Hash, nil)

    Raw tool arguments

Returns:

  • (String, Hash, nil)

    Redacted arguments (same type as input)



1544
1545
1546
# File 'lib/octo/agent.rb', line 1544

# Hook for redacting volatile paths from tool arguments before display.
#
# NOTE: currently a pass-through — the very same object is returned
# unchanged, whatever its type.
#
# @param args [String, Hash, nil] raw tool arguments
# @return [String, Hash, nil] +args+ itself
def redact_tool_args(args)
  args.itself
end

#rename(new_name) ⇒ Object

Rename this session. Called by auto-naming (first message) or user explicit rename.



244
245
246
# File 'lib/octo/agent.rb', line 244

# Set the session's display name (auto-naming or explicit user rename).
#
# @param new_name [#to_s] desired name; converted with to_s and stripped
# @return [String] the stored name
def rename(new_name)
  cleaned = new_name.to_s.strip
  @name = cleaned
end

#reset_checkpoint!Object

Reset the checkpoint cursor so the next checkpoint! captures everything added from this point forward.



167
168
169
# File 'lib/octo/agent.rb', line 167

# Move the checkpoint cursor to the end of the current history, so the
# next #checkpoint! only reports messages added after this call.
#
# @return [Integer] the new cursor position
def reset_checkpoint!
  @checkpoint_index = @history.to_a.length
end

#resume_with_notification(notification_content, bubble: nil) ⇒ Object

Called by BackgroundTaskRegistry when a fire-and-forget background task completes. Two delivery paths depending on agent state:

- Agent is mid-run  → enqueue; the per-iteration drain picks it up
- Agent is idle     → start a fresh run on the caller's thread


895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
# File 'lib/octo/agent.rb', line 895

# Deliver a background-task completion notice to this agent.
#
# While a run is in flight the notice is queued in the inbox (to be drained
# at the next iteration boundary) and nil is returned. Otherwise the bubble
# is emitted and a fresh run is started on the caller's thread, carrying the
# enqueue timestamp so a later user interrupt can discard it.
#
# @param notification_content [Object] raw notification payload
# @param bubble [Object, nil] optional UI bubble to show
# @return [nil, Object] nil when queued, otherwise the result of #run
def resume_with_notification(notification_content, bubble: nil)
  enqueued_at = Time.now

  queued = @state_mutex.synchronize do
    next false unless @in_run_loop

    @inbox << {
      kind:        :bg_notification,
      content:     notification_content,
      bubble:      bubble,
      enqueued_at: enqueued_at
    }
    Octo::Logger.info("agent.notification_queued",
      session_id: @session_id,
      queue_size: @inbox.size
    )
    true
  end
  return if queued

  emit_bubble_for_notification(bubble)
  run("", system_notification: format_notification_for_history(notification_content), _bg_enqueued_at: enqueued_at)
end

#run(user_input = nil, files: [], system_notification: nil, _bg_enqueued_at: nil) ⇒ Object

Entry point for an agent turn. Two modes:

run("user typed this")  — user message mode
run                     — drain-only mode: nothing to append directly;
                           the inbox drain at iteration top is expected
                           to find something. If the inbox is also empty,
                           the loop exits immediately (no wasted LLM call).


255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
# File 'lib/octo/agent.rb', line 255

# Runs one agent turn while holding @run_mutex.
#
# @param user_input [String, nil] text typed by the user; nil selects
#   drain-only mode (only queued inbox items are processed)
# @param files [Array] attachments forwarded to process_files_for_user_message
# @param system_notification [String, nil] background-task notification that
#   is injected as a system_injected user message instead of user_input
# @param _bg_enqueued_at [Object, nil] enqueue timestamp of a background
#   notification; compared with @discard_threshold to drop stale runs
#   (assumed to be a Time/Float-like comparable value — TODO confirm)
# @return [Object, nil] the Hash-like value from build_result on a completed
#   turn; nil when the run is discarded as stale or drain-only with an empty inbox
# @raise [Octo::AgentInterrupted] re-raised so the CLI can handle it
# @raise [StandardError] any other main-loop error is logged and re-raised
def run(user_input = nil, files: [], system_notification: nil, _bg_enqueued_at: nil)
  # Remedy 1: background-notification runs carry their enqueue timestamp.
  # If a user interrupt has bumped @discard_threshold past that timestamp,
  # the notification's "ticket" is stale and the run is dropped. The check
  # runs once before grabbing the heavy @run_mutex (a real user message may
  # be waiting behind us for it) and again after acquiring it, because an
  # interrupt may have landed while we were blocked on the mutex.
  stale_notification = lambda do
    next false unless system_notification && _bg_enqueued_at

    @state_mutex.synchronize do
      @discard_threshold && _bg_enqueued_at < @discard_threshold
    end
  end

  return if stale_notification.call

  # Serialize every Agent#run invocation so concurrent callers cannot
  # mutate @history, @iterations, etc. simultaneously.
  @run_mutex.synchronize do
    return if stale_notification.call

    @state_mutex.synchronize do
      @in_run_loop = true
      @current_run_thread = Thread.current
      # We're entering run() — any concurrent caller that observed
      # @inbox_run_pending == true and decided NOT to spawn can rely
      # on this run absorbing their inbox items. Clear the flag.
      @inbox_run_pending = false
    end

    # Drain-only mode: no direct input, and nothing queued either. Don't
    # bother the LLM with an empty turn. This exit happens before the
    # begin/ensure below, so the run-loop flags are reset by hand here.
    if user_input.nil? && system_notification.nil?
      empty_inbox = @state_mutex.synchronize { @inbox.empty? }
      if empty_inbox
        @state_mutex.synchronize do
          @in_run_loop = false
          @current_run_thread = nil
        end
        Octo::Logger.info("agent.drain_only_run_empty_inbox", session_id: @session_id)
        return
      end
    end

    # The outer begin/ensure covers the per-task setup as well as the main
    # loop. Previously an exception raised during setup (system prompt
    # assembly, file processing, hooks) bypassed the ensure. That left
    # @in_run_loop/@current_run_thread set, the spinner running, and any
    # inbox items that relied on this run unflushed.
    begin
      # Show the "thinking" indicator as early as possible so the user gets
      # immediate feedback after sending a message. Without this the UI stays
      # silent during synchronous setup work (system prompt assembly, file
      # parsing, history compression checks) before the first LLM call. The
      # subsequent `think` call will re-emit show_progress, which is an
      # idempotent update on the same progress UI element.
      @ui&.show_progress

      # Start new task for Time Machine
      task_id = start_new_task

      @start_time = Time.now
      @task_truncation_count = 0  # Reset truncation counter for each task
      @task_timeout_hint_injected = false  # Reset read-timeout hint injection (see LlmCaller)
      @task_upstream_truncation_hint_injected = false  # Reset upstream-truncation hint injection (see LlmCaller)
      # Note: Do NOT reset @previous_total_tokens here - it should maintain the value from the last iteration
      # across tasks to correctly calculate delta tokens in each iteration
      @task_start_iterations = @iterations  # Track starting iterations for this task
      # Track cache stats for current task
      @task_cache_stats = {
        cache_creation_input_tokens: 0,
        cache_read_input_tokens: 0,
        total_requests: 0,
        cache_hit_requests: 0
      }

      # Deferred error rollback: if the previous session ended with an error,
      # trim history back to just before that failed user message now — at the
      # point the user actually sends a new message, not at restore time.
      # (Trimming at restore time caused replay_history to return empty results.)
      if @pending_error_rollback
        @pending_error_rollback = false
        last_user_index = @history.last_real_user_index
        if last_user_index
          @history.truncate_from(last_user_index)
          @hooks.trigger(:session_rollback, {
            reason: "Previous session ended with error — rolling back before new message",
            rolled_back_message_index: last_user_index
          })
        end
      end

      # Add system prompt as the first message if this is the first run
      if @history.empty?
        system_prompt = build_system_prompt
        @history.append({ role: "system", content: system_prompt })
      end

      # Inject session context (date + model) if not yet present or date has changed
      inject_session_context_if_needed

      # Inject chunk index card if archived chunks exist and index is stale
      inject_chunk_index_if_needed

      if system_notification
        # System notification mode — triggered by background task completion.
        # Skip all user input processing; inject the notification directly.
        @history.append({
          role: "user",
          content: system_notification,
          system_injected: true,
          task_id: task_id,
          created_at: Time.now.to_f
        })
      elsif user_input.nil? || user_input.empty?
        # Drain-only mode: nothing to append now. The iteration drain at the
        # top of the loop will pick up whatever's in @inbox (which is why we
        # were started — see http_server's follow-up run spawn).
      else
        # Normal user message mode — files may or may not be attached.
        processed = process_files_for_user_message(user_input, files)
        append_processed_user_message_to_history!(processed, task_id)

        # If the user typed a slash command targeting a skill with disable-model-invocation: true,
        # inject the skill content as a synthetic assistant message so the LLM can act on it.
        # Skills already in the system prompt (model_invocation_allowed?) are skipped.
        inject_skill_command_as_assistant_message(user_input, task_id)
      end

      @hooks.trigger(:on_start, system_notification ? "[background task notification]" : user_input)

      begin
        # Track if request_user_feedback was called
        awaiting_user_feedback = false
        # Track if task was interrupted by user (denied tool execution)
        task_interrupted = false

        loop do
          @iterations += 1
          @hooks.trigger(:on_iteration, @iterations)

          # Drain any inbox items (queued user messages) since the last
          # iteration. Without this, items sit in the queue until the WHOLE
          # run completes — latency drops from "minutes" to "one LLM turn".
          drain_inbox_into_history!(task_id)

          # Persist drained inbox messages immediately so they survive a
          # crash during the upcoming think() call.
          checkpoint!

          # Think: LLM reasoning with tool support
          response = think

          # Skip if compression happened (response is nil). This check must
          # come BEFORE the verbose debug log below, which indexes into
          # response and would raise NoMethodError on nil.
          if response.nil?
            checkpoint!
            next
          end

          @last_token_usage = response[:token_usage] if response[:token_usage]

          # Debug: check for potential infinite loops
          if @config.verbose
            @ui&.log("Iteration #{@iterations}: finish_reason=#{response[:finish_reason]}, tool_calls=#{response[:tool_calls]&.size || 'nil'}", level: :debug)
          end

          # Checkpoint immediately after think() so the assistant message
          # (and any tool_calls it contains) is persisted before we enter
          # act(). If a crash happens between think() and the next
          # iteration-boundary checkpoint, recovery can at least restore
          # the model's intent.
          checkpoint!

          # [DIAG] Only log when finish_reason=="stop" AND tool_calls non-empty —
          # the suspicious combo that indicates an upstream-truncated tool_use
          # response. Normal responses produce no log line here to avoid noise.
          begin
            tool_calls = response[:tool_calls] || []
            if response[:finish_reason] == "stop" && !tool_calls.empty?
              tc_summary = tool_calls.map do |c|
                args_str = c[:arguments].is_a?(String) ? c[:arguments] : c[:arguments].to_s
                {
                  name: c[:name].to_s,
                  args_len: args_str.length,
                  args_head: args_str[0, 120]
                }
              end
              Octo::Logger.warn("agent.think_response",
                session_id: @session_id,
                iteration: @iterations,
                finish_reason: response[:finish_reason].to_s,
                tool_calls_count: tool_calls.size,
                tool_calls: tc_summary,
                content_len: response[:content].to_s.length,
                completion_tokens: response.dig(:token_usage, :completion_tokens),
                ttft_ms: response.dig(:latency, :ttft_ms),
                suspicious_truncation: true
              )
            end
          rescue StandardError => e
            Octo::Logger.warn("agent.think_response.log_failed", error: e.message)
          end

          # Check if done (no more tool calls needed).
          #
          # Defensive rule: we ONLY exit on empty/missing tool_calls.
          # We used to also short-circuit on finish_reason=="stop", but
          # upstream routers (OpenRouter → Anthropic/Bedrock) can return the
          # contradictory combo `finish_reason=="stop" + non-empty tool_calls
          # with truncated args`, which caused the agent to silently treat a
          # truncated response as "task complete". Truncation is now caught
          # earlier by LlmCaller#detect_upstream_truncation! (which raises
          # UpstreamTruncatedError → RetryableError); this branch stays as
          # a belt-and-braces guard: if that detector ever misses a new
          # truncation pattern, we still won't silently exit while the model
          # is mid-tool_call.
          if response[:tool_calls].nil? || response[:tool_calls].empty?
            # [DIAG] Pin down exactly which sub-condition triggered the task exit.
            Octo::Logger.info("agent.loop_break_normal",
              session_id: @session_id,
              iteration: @iterations,
              branch: (response[:tool_calls].nil? ? "tool_calls_nil" : "tool_calls_empty"),
              finish_reason: response[:finish_reason].to_s,
              tool_calls_count: (response[:tool_calls] || []).size
            )
            if response[:content] && !response[:content].empty?
              emit_assistant_message(response[:content], reasoning_content: response[:reasoning_content])
            end

            # Debug: log why we're stopping (tool_calls is already known to
            # be nil/empty inside this branch).
            if @config.verbose
              reason = response[:finish_reason] == "stop" ? "API returned finish_reason=stop" : "No tool calls in response"
              @ui&.log("Stopping: #{reason}", level: :debug)
              if response[:content].is_a?(String)
                preview = response[:content].length > 200 ? response[:content][0...200] + "..." : response[:content]
                @ui&.log("Response content: #{preview}", level: :debug)
              end
            end

            # A queued user message may have landed between this think() and
            # now. Don't break — loop back so the next iteration's drain
            # injects it and the LLM addresses it within the same warm-cache
            # run. Saves the full-context replay of a fresh run().
            if inbox_pending?
              Octo::Logger.info("agent.loop_continue_for_pending_inbox",
                session_id: @session_id,
                iteration: @iterations
              )
              checkpoint!
              next
            end

            checkpoint!
            break
          end

          # Show assistant message if there's content before tool calls
          if response[:content] && !response[:content].empty?
            emit_assistant_message(response[:content], reasoning_content: response[:reasoning_content])
          end

          # Act: Execute tool calls
          action_result = act(response[:tool_calls])

          # Check if request_user_feedback was called
          if action_result[:awaiting_feedback]
            awaiting_user_feedback = true
            observe(response, action_result[:tool_results])
            flush_pending_injections
            checkpoint!
            break
          end

          # Observe: Add tool results to conversation context
          observe(response, action_result[:tool_results])

          # Flush any inline skill injections enqueued by invoke_skill during act().
          # Must happen AFTER observe() so toolResult is appended before skill instructions,
          # producing a legal message sequence for all API providers (especially Bedrock).
          flush_pending_injections

          # Check if user denied any tool
          if action_result[:denied]
            task_interrupted = true
            # If user provided feedback, treat it as a user question/instruction
            if action_result[:feedback] && !action_result[:feedback].empty?
              # Add user feedback as a new user message with system_injected marker
              @history.append({
                role: "user",
                content: "The user has a question/feedback for you: #{action_result[:feedback]}\n\nPlease respond to the user's question/feedback before continuing with any actions.",
                system_injected: true
              })
              checkpoint!
              # Continue loop to let agent respond to feedback
              next
            else
              # User just said "no" without feedback - stop and wait
              @ui&.show_assistant_message("Tool execution was denied. Please give more instructions...", files: [])
              checkpoint!
              break
            end
          end

          # Normal iteration end — persist incremental checkpoint before
          # the next loop iteration so crash recovery captures this turn.
          checkpoint!
        end

        result = build_result

        # Save snapshots of modified files for Time Machine
        if @modified_files_in_task && !@modified_files_in_task.empty?
          save_modified_files_snapshot(@modified_files_in_task)
          @modified_files_in_task = []  # Reset for next task
        end

        # Run skill evolution hooks after main loop completes
        # Skip if task was interrupted by user (denied tool) or awaiting user feedback
        # Only for main agent (not subagents) to avoid recursive evolution
        unless @is_subagent || task_interrupted || awaiting_user_feedback
          run_skill_evolution_hooks
        end

        # Run long-term memory update as a forked subagent BEFORE we print
        # show_complete. Running it as a subagent (rather than inline in
        # the main loop) gives us correct visual ordering structurally:
        # the subagent blocks until done, its progress spinner finishes,
        # and only then [OK] Task Complete is printed. No cleanup dance,
        # no cross-method progress handle holding.
        # Skip on interrupt / feedback / subagent (self-guarded inside too).
        unless @is_subagent || task_interrupted || awaiting_user_feedback
          run_memory_update_subagent
        end

        if @is_subagent
          # Parent agent (skill_manager) prints the completion summary; skip here.
        else
          @ui&.show_complete(
            iterations: result[:iterations],
            duration: result[:duration_seconds],
            cache_stats: result[:cache_stats],
            awaiting_user_feedback: awaiting_user_feedback
          )
          # Show token usage once at task completion (not on every iteration)
          @ui&.show_token_usage(@last_token_usage) if @last_token_usage
        end

        # Fire async ghost-text prediction for the user's next message. Must
        # run AFTER show_complete (so the UI is in its idle "awaiting input"
        # state) and is fire-and-forget — the suggestion arrives later via
        # the UI's own +show_next_message_suggestion+ event.
        # Same guards as run_memory_update_subagent: skip if interrupted,
        # awaiting feedback, or running as a subagent.
        unless @is_subagent || task_interrupted || awaiting_user_feedback
          run_next_message_suggestion!
        end

        @hooks.trigger(:on_complete, result)
        result
      rescue Octo::AgentInterrupted
        # Let CLI handle the interrupt message
        raise
      rescue StandardError => e
        # Log complete error information to debug_logs for troubleshooting
        @debug_logs << {
          timestamp: Time.now.iso8601,
          event: "agent_run_error",
          error_class: e.class.name,
          error_message: e.message,
          backtrace: e.backtrace&.first(30) # Keep first 30 lines of backtrace
        }
        Octo::Logger.error("agent_run_error", error: e)

        # 400 errors mean our request was malformed — roll back history so the bad
        # message is not replayed on the next user turn.
        # Other errors (auth, network, etc.) leave history intact for retry.
        @pending_error_rollback = true if e.is_a?(Octo::BadRequestError)

        # Build error result for session data, but let CLI handle error display
        result = build_result(:error, error: e.message)  # rubocop:disable Lint/UselessAssignment
        raise
      end
    ensure
      @state_mutex.synchronize do
        @in_run_loop = false
        @current_run_thread = nil
      end

      # Safety net: ensure any lingering progress spinner is stopped.
      # Normal paths close their own spinners; this guards against exceptions
      # raised between a progress slot's active/done pair (including during
      # the per-task setup above).
      @ui&.show_progress(phase: "done")

      # Reap stale completed background tasks so the registry doesn't grow
      # unboundedly across a long session.
      begin
        BackgroundTaskRegistry.prune_completed(max_age: 3_600, agent_session_id: @session_id)
      rescue => e
        Octo::Logger.error("background_task_prune_error", error: e)
      end

      # If an inbox item arrived during the ensure block — i.e. AFTER the
      # last in-loop drain but BEFORE we released @run_mutex — handle it
      # on a fresh thread so we don't block the caller.
      flush_inbox_after_run
    end
  end  # @run_mutex.synchronize
end

#running? ⇒ Boolean

Checks whether the agent is currently running, i.e. whether a task start time has been recorded.

Returns:

  • (Boolean)


1561
1562
1563
# File 'lib/octo/agent.rb', line 1561

def running?
  # run assigns @start_time when a task begins; any non-nil value is
  # treated as "running".
  return false if @start_time.nil?

  true
end

#switch_model_by_id(id) ⇒ Boolean

Switch this session to a different model, identified by its stable runtime id. Ids survive list reorders, additions, and field edits, which is why we no longer expose an index-based API.

Parameters:

  • id (String)

    Model id (see AgentConfig#parse_models)

Returns:

  • (Boolean)

    true if switched successfully, false otherwise



189
190
191
192
193
194
# File 'lib/octo/agent.rb', line 189

def switch_model_by_id(id)
  # Delegate the switch to the config; only when it accepts the id do we
  # rebuild the client so it targets the newly selected model.
  if @config.switch_model_by_id(id)
    rebuild_client_for_current_model!
    true
  else
    false
  end
end

#track_modified_files(tool_name, args) ⇒ Object

Track modified files for Time Machine snapshots

Parameters:

  • tool_name (String)

    Name of the tool that was executed

  • args (Hash)

    Arguments passed to the tool



2169
2170
2171
2172
2173
2174
2175
2176
2177
2178
# File 'lib/octo/agent.rb', line 2169

# Records the absolute path of a file touched by a "write" or "edit" tool
# call so run can snapshot it for Time Machine once the task finishes.
#
# @param tool_name [String] name of the executed tool; other tools are ignored
# @param args [Hash] tool arguments; the target file is read from :path
#   (a string "path" key is accepted as a fallback)
# @return [void]
def track_modified_files(tool_name, args)
  @modified_files_in_task ||= []
  return unless %w[write edit].include?(tool_name)
  return if args.nil?

  # A missing or blank path previously reached File.expand_path(nil, ...)
  # and raised TypeError; there is nothing to snapshot in that case.
  file_path = args[:path] || args["path"]
  return if file_path.nil? || file_path.to_s.empty?

  full_path = File.expand_path(file_path.to_s, @working_dir)
  # Keep each file once; the array is handed to the snapshot step as-is.
  @modified_files_in_task << full_path unless @modified_files_in_task.include?(full_path)
end