Class: Legion::LLM::Inference::Executor

Inherits:
Object
Includes:
ContextWindow, Escalation, Routing, ToolInjection, NativeToolLoop, RouteAttempts, Steps::Billing, Steps::Classification, Steps::ConfidenceScoring, Steps::Debate, Steps::GaiaAdvisory, Steps::KnowledgeCapture, Steps::Logging, Steps::Metering, Steps::PostResponse, Steps::PromptCache, Steps::RagContext, Steps::Rbac, Steps::SkillInjector, Steps::StickyPersist, Steps::StickyRunners, Steps::TokenBudget, Steps::ToolCalls, Steps::ToolDiscovery, Steps::ToolHistory, Steps::TriggerMatch, Legion::Logging::Helper
Defined in:
lib/legion/llm/inference/executor.rb,
lib/legion/llm/inference/executor/routing.rb,
lib/legion/llm/inference/executor/escalation.rb,
lib/legion/llm/inference/executor/context_window.rb,
lib/legion/llm/inference/executor/tool_injection.rb

Defined Under Namespace

Modules: ContextWindow, Escalation, Routing, ToolInjection
Classes: ToolResultEvent

Constant Summary

# Steps executed, in order, before the provider is called.
PRE_PROVIDER_STEPS = %i[
  tracing_init
  idempotency
  conversation_uuid
  context_load
  rbac
  classification
  billing
  gaia_advisory
  tier_assignment
  rag_context
  trigger_match
  sticky_runners
  skill_injector
  tool_history_inject
  tool_discovery
  routing
  request_normalization
  token_budget
].freeze

# Steps executed, in order, after the provider has responded.
POST_PROVIDER_STEPS = %i[
  response_normalization
  post_response
  metering
  debate
  confidence_scoring
  tool_calls
  sticky_persist
  context_store
  knowledge_capture
  response_return
].freeze

# Complete step order: pre-provider steps, the provider call itself, then post-provider steps.
STEPS = [*PRE_PROVIDER_STEPS, :provider_call, *POST_PROVIDER_STEPS].freeze

# Steps flagged as async-safe; all three also appear in POST_PROVIDER_STEPS.
ASYNC_SAFE_STEPS = %i[post_response knowledge_capture response_return].freeze

# Opening/closing tag pairs for model "thinking" blocks
# (presumably consumed by the thinking-strip helpers — confirm).
THINKING_TAG_PAIRS = [
  ['<thinking>', '</thinking>'],
  ['<think>', '</think>'],
  ['<thought>', '</thought>']
].freeze

# Provider error-message patterns that indicate a configuration problem
# (presumably matched by #config_error? — confirm).
CONFIG_ERROR_PATTERNS = [
  /AccessDeniedException/,
  /InvalidModel/i,
  /model.*not found/i,
  /not authorized/i,
  /AWS Marketplace/i
].freeze

# Provider error-message patterns that indicate a malformed request payload
# (presumably matched by #request_payload_error? — confirm).
REQUEST_PAYLOAD_ERROR_PATTERNS = [
  /input_schema/i,
  /tools\.\d+/,
  /messages\.\d+/,
  /Field required/i,
  /ValidationException/
].freeze

# Provider error-message patterns that indicate the prompt exceeded the context window
# (presumably matched by #context_overflow_error? — confirm).
CONTEXT_OVERFLOW_ERROR_PATTERNS = [
  /maximum context length/i,
  /context length.*input_tokens/i,
  /prompt contains at least \d+ input tokens/i
].freeze
# Fixed-size pool of 4 worker threads, created once when this constant is evaluated.
# NOTE(review): in concurrent-ruby, fallback_policy only takes effect when a bounded
# max_queue is full. No max_queue is passed here, so (assuming the library default of an
# unbounded queue) :caller_runs may never trigger. Confirm whether a bounded queue was intended.
ASYNC_THREAD_POOL =
Concurrent::FixedThreadPool.new(4, fallback_policy: :caller_runs)

Constants included from Steps::StickyPersist

Steps::StickyPersist::SENSITIVE_PARAM_NAMES

Constants included from Steps::Debate

Steps::Debate::CHALLENGER_PROMPT, Steps::Debate::JUDGE_PROMPT, Steps::Debate::REBUTTAL_PROMPT

Constants included from Steps::KnowledgeCapture

Steps::KnowledgeCapture::EMBED_MAX_CHARS

Constants included from Steps::Classification

Steps::Classification::EMAIL_PATTERN, Steps::Classification::LEVELS, Steps::Classification::PHI_KEYWORDS, Steps::Classification::PII_PATTERNS, Steps::Classification::PII_PATTERNS_CORE, Steps::Classification::PII_PATTERNS_EXTENDED

Constants included from NativeToolLoop

NativeToolLoop::QWEN_PARAM_RE, NativeToolLoop::QWEN_TOOL_USE_RE

Instance Attribute Summary

Instance Method Summary

Methods included from Steps::StickyPersist

#step_sticky_persist

Methods included from Steps::ToolHistory

#step_tool_history_inject

Methods included from Steps::StickyRunners

#step_sticky_runners

Methods included from Steps::Metering

build_event, content_fields, flush_spool, identity_fields, operational_fields, publish_event, publish_or_spool, timing_and_context, token_fields

Methods included from Steps::Debate

#debate_enabled?, #gaia_debate_trigger?, #run_debate, #step_debate

Methods included from Steps::PromptCache

#apply_cache_control, #apply_conversation_breakpoint, #sort_tools_deterministically

Methods included from Steps::TokenBudget

#step_token_budget

Methods included from Steps::ConfidenceScoring

#step_confidence_scoring

Methods included from Steps::KnowledgeCapture

#step_knowledge_capture

Methods included from Steps::ToolCalls

#step_tool_calls

Methods included from Steps::ToolDiscovery

#step_tool_discovery

Methods included from Steps::SkillInjector

#step_skill_injector

Methods included from Steps::TriggerMatch

#step_trigger_match

Methods included from Steps::RagContext

#step_rag_context

Methods included from Steps::PostResponse

#step_post_response

Methods included from Steps::GaiaAdvisory

#build_partner_context, #step_gaia_advisory

Methods included from Steps::Billing

#step_billing

Methods included from Steps::Classification

#step_classification

Methods included from Steps::Rbac

#step_rbac

Methods included from ToolInjection

#add_native_tool_definition, #add_pinned_special_tool_definitions, #add_registry_tool_definitions, #add_requested_deferred_tool_definitions_from_settings, #add_settings_extensions_tool_definitions, #client_tool_passthrough_allowed?, #client_tool_passthrough_enabled?, #client_tool_passthrough_list, #client_tool_passthrough_name_variants, #client_tool_policy_variants, #native_dispatch_chat_options, #native_dispatch_options, #native_dispatch_thinking, #native_dispatch_tools, #native_tool_definition_duplicate?, #native_tool_definition_name_variants, #native_tool_definitions, #native_tool_loop_continuation_prompt, #native_tool_loop_system, #non_executable_client_tool?, #record_system_accounting, #record_tool_accounting, #registry_tool_injection_requested?, #request_tool_names, #request_tool_source, #resolve_registry_tool_source

Methods included from ContextWindow

#compact_to_fit, #empty_assistant_message?, #enforce_context_window, #estimate_message_tokens, #last_user_message_index, #native_dispatch_messages, #resolved_context_window, #strip_leading_thinking_block, #strip_thinking_from_history, #tool_result_message?, #trim_oversized_tool_results

Methods included from Escalation

#account_specific_error?, #attempt_escalation, #authentication_error?, #build_default_escalation_chain, #circuit_open?, #client_stream_error?, #config_error?, #context_overflow_error?, #emit_error_audit, #emit_escalation_attempt_audit, #emit_escalation_attempt_metering, #error_metadata, #escalation_attempt_hash, #escalation_move_type, #escalation_previous_failure_summary, #execute_provider_request, #execute_provider_request_native, #execute_provider_request_responses, #execute_provider_request_stream, #execute_provider_request_stream_native, #extract_retry_after, #log_escalation_attempt, #notify_stream_provider_failed, #notify_stream_provider_switched, #pipeline_escalation_enabled?, #pipeline_escalation_max_attempts, #pipeline_escalation_quality_threshold, #record_escalation_failure, #record_provider_response, #report_escalation_quality_failure, #report_provider_failure, #report_provider_health, #request_payload_error?, #routing_empty_chain_error, #run_escalation_resolution, #run_provider_call_single, #run_provider_call_with_escalation, #skip_all_provider_model_instances!, #skip_open_circuits?, #skip_same_tier!, #step_provider_call, #step_provider_call_stream

Methods included from Routing

#apply_proactive_tier_assignment, #apply_routing_resolution, #chain_required_capabilities, #estimate_request_tokens, #fallback_model_for_resolved_provider, #inferred_provider_tier, #local_provider?, #merge_response_offering_metadata, #merge_routing_intent, #native_tools_requested_for_routing?, #normalize_offering_metadata, #normalize_required_capabilities, #provider_scoped_instance, #record_forced_tier_selection, #request_has_vision_content?, #request_requires_thinking?, #resolve_model_to_local_provider, #resolve_provider_instance, #resolve_routing_state, #routing_field_explicit?, #routing_intent_for_request, #routing_intent_present?, #routing_request_state, #routing_resolution_for, #step_request_normalization, #step_routing, #step_tier_assignment, #use_native_dispatch?

Constructor Details

#initialize(request) ⇒ Executor

Returns a new instance of Executor.



# File 'lib/legion/llm/inference/executor.rb', lines 106–140

# Builds a fresh executor for a single request.
#
# Derives the caller profile, creates an empty Timeline, and resets all per-request
# state (enrichments, audit data, routing resolution, escalation bookkeeping, tool
# bookkeeping and context accounting).
#
# @param request [Object] the inference request; its #caller is passed to Profile.derive
def initialize(request)
  @request = request
  @profile = Profile.derive(request.caller)
  @timeline = Timeline.new
  @tracing = nil

  # Per-request accumulators.
  @enrichments = {}
  @audit = {}
  @warnings = []
  @timestamps = { received: Time.now }
  @raw_response, @exchange_id = nil
  @discovered_tools = []
  @triggered_tools = []

  # Provider / model resolution state; starts empty.
  @resolved_provider, @resolved_instance, @resolved_model, @resolved_tier, @resolved_offering_id = nil
  @resolved_offering_metadata = {}

  # Confidence and escalation bookkeeping.
  @confidence_score, @escalation_chain = nil
  @escalation_history = []
  @route_attempts = []
  @current_escalation_context, @proactive_tier_assignment, @tool_event_handler, @sticky_turn_snapshot = nil

  # Tool bookkeeping; @pending_tool_history is a Concurrent::Array paired with its own Mutex.
  @pending_tool_history = Concurrent::Array.new
  @pending_tool_history_mutex = Mutex.new
  @deferred_tool_audits = []
  @injected_tool_map = {}
  @native_tool_source_map = {}
  @freshly_triggered_keys = []
  @context_accounting = ContextAccounting.empty
end

Instance Attribute Details

#audit ⇒ Object (readonly)

Returns the value of attribute audit.

# File 'lib/legion/llm/inference/executor.rb', line 34

# Reader for @audit, which #initialize sets to an empty Hash.
def audit; @audit; end

#confidence_score ⇒ Object (readonly)

Returns the value of attribute confidence_score.

# File 'lib/legion/llm/inference/executor.rb', line 34

# Reader for @confidence_score, which #initialize sets to nil.
def confidence_score; @confidence_score; end

#discovered_tools ⇒ Object (readonly)

Returns the value of attribute discovered_tools.

# File 'lib/legion/llm/inference/executor.rb', line 34

# Reader for @discovered_tools, which #initialize sets to an empty Array.
def discovered_tools; @discovered_tools; end

#enrichments ⇒ Object (readonly)

Returns the value of attribute enrichments.

# File 'lib/legion/llm/inference/executor.rb', line 34

# Reader for @enrichments, which #initialize sets to an empty Hash.
def enrichments; @enrichments; end

#escalation_chain ⇒ Object (readonly)

Returns the value of attribute escalation_chain.

# File 'lib/legion/llm/inference/executor.rb', line 34

# Reader for @escalation_chain, which #initialize sets to nil.
def escalation_chain; @escalation_chain; end

#profile ⇒ Object (readonly)

Returns the value of attribute profile.

# File 'lib/legion/llm/inference/executor.rb', line 34

# Reader for @profile, the value #initialize obtains from Profile.derive(request.caller).
def profile; @profile; end

#request ⇒ Object (readonly)

Returns the value of attribute request.

# File 'lib/legion/llm/inference/executor.rb', line 34

# Reader for @request, the request object handed to #initialize.
def request; @request; end

#timeline ⇒ Object (readonly)

Returns the value of attribute timeline.

# File 'lib/legion/llm/inference/executor.rb', line 34

# Reader for @timeline, the Timeline instance created in #initialize.
def timeline; @timeline; end

#tool_event_handler ⇒ Object

Returns the value of attribute tool_event_handler.

# File 'lib/legion/llm/inference/executor.rb', line 37

# Reader half of the read/write tool_event_handler attribute; #initialize sets it to nil.
def tool_event_handler; @tool_event_handler; end

#tracing ⇒ Object (readonly)

Returns the value of attribute tracing.

# File 'lib/legion/llm/inference/executor.rb', line 34

# Reader for @tracing, which #initialize sets to nil.
def tracing; @tracing; end

#warnings ⇒ Object (readonly)

Returns the value of attribute warnings.

# File 'lib/legion/llm/inference/executor.rb', line 34

# Reader for @warnings, which #initialize sets to an empty Array.
def warnings; @warnings; end

Instance Method Details

#call ⇒ Object

# File 'lib/legion/llm/inference/executor.rb', lines 142–151

# Executes the pipeline synchronously and returns the built response.
#
# While the steps run, Thread.current[:legion_llm_in_pipeline] is true and the log
# context is set. Both are cleared in the ensure clause, even if a step raises.
#
# @return [Object] whatever #build_response returns
def call
  set_log_context
  Thread.current[:legion_llm_in_pipeline] = true
  message = "[llm][executor] action=call request_id=#{@request.id} profile=#{@profile}"
  log.debug(message)
  execute_steps
  build_response
ensure
  Thread.current[:legion_llm_in_pipeline] = nil
  clear_log_context
end

#call_responses(body:, stream: false, stream_observer: nil, &block) ⇒ Object

# File 'lib/legion/llm/inference/executor.rb', lines 170–204

# Runs the pipeline for a Responses-API request body and returns the built response.
#
# After the pre-provider steps, the provider call is dispatched one of three ways:
#   1. pipeline escalation enabled -> #run_provider_call_with_escalation
#      (the block is passed as stream_block only when streaming)
#   2. resolved provider supports the Responses API -> #execute_provider_request_responses
#   3. otherwise -> the regular provider call step (#step_provider_call_stream when
#      streaming, #step_provider_call when not); `body` is not used on this path
# The post-provider steps then run for every branch.
#
# The stream observer, the in-pipeline flag and the log context are all reset in the
# ensure clause, the same way #call_stream resets them. This means the observer
# reference does not outlive the call.
#
# @param body [Object] Responses-API request body (used by branches 1 and 2)
# @param stream [Boolean] whether the provider call should stream
# @param stream_observer [Object, nil] held in @stream_observer for the duration of the call
# @return [Object] whatever #build_response returns
def call_responses(body:, stream: false, stream_observer: nil, &block)
  @stream_observer = stream_observer
  set_log_context
  Thread.current[:legion_llm_in_pipeline] = true
  log.debug "[llm][executor] action=call_responses request_id=#{@request.id} profile=#{@profile} stream=#{stream}"

  execute_pre_provider_steps
  if pipeline_escalation_enabled?
    run_provider_call_with_escalation(responses_body: body, responses_stream: stream,
                                      stream_block: (stream ? block : nil))
  elsif resolved_provider_supports_responses?
    execute_provider_request_responses(body: body, stream: stream, &block)
  else
    # Post-routing gate (the real one): routing may have resolved to a
    # different provider than the request hint suggested (failover,
    # escalation, health-tracker rerouting). Re-check capability now
    # that @resolved_provider reflects the actual dispatch target —
    # otherwise dispatch_responses_request raises ProviderError
    # "unsupported capability :responses for provider X".
    log.debug '[llm][executor] action=call_responses_fallback reason=resolved_unsupported ' \
              "request_id=#{@request.id} resolved_provider=#{@resolved_provider}"
    stream ? step_provider_call_stream(&block) : step_provider_call
  end
  execute_post_provider_steps
  build_response
ensure
  # Mirror #call_stream: drop the observer so it is not retained after the call.
  @stream_observer = nil
  Thread.current[:legion_llm_in_pipeline] = nil
  clear_log_context
end

#call_stream(stream_observer: nil, &block) ⇒ Object

# File 'lib/legion/llm/inference/executor.rb', lines 153–168

# Streaming variant of #call.
#
# With no block this simply delegates to #call. With a block it runs the pre-provider
# steps, hands the block to #step_provider_call_stream, runs the post-provider steps
# and returns the built response. The stream observer, the in-pipeline flag and the
# log context are reset in the ensure clause on every path, including the delegation.
#
# @param stream_observer [Object, nil] held in @stream_observer for the duration of the call
# @return [Object] whatever #build_response (or #call) returns
def call_stream(stream_observer: nil, &block)
  @stream_observer = stream_observer
  return call if block.nil?

  set_log_context
  Thread.current[:legion_llm_in_pipeline] = true
  message = "[llm][executor] action=call_stream request_id=#{@request.id} profile=#{@profile}"
  log.debug(message)

  execute_pre_provider_steps
  step_provider_call_stream(&block)
  execute_post_provider_steps
  build_response
ensure
  @stream_observer = nil
  Thread.current[:legion_llm_in_pipeline] = nil
  clear_log_context
end

#context_accounting ⇒ Object

# File 'lib/legion/llm/inference/executor.rb', lines 39–41

# Returns the executor's context accounting. If @context_accounting is unset (nil or
# false), it is lazily replaced with ContextAccounting.empty and memoized.
def context_accounting
  return @context_accounting if @context_accounting

  @context_accounting = ContextAccounting.empty
end

#provider_supports_responses? ⇒ Boolean

Returns true when the resolved provider’s adapter natively supports the Responses API. If the resolved provider/instance pair is not registered, the provider and instance from the request’s routing hint are checked instead. This is an internal decision — the API layer should call call_responses directly and let the executor handle fallback.

Returns:

  • (Boolean)

# File 'lib/legion/llm/inference/executor.rb', lines 228–245

# Reports whether the provider this request would dispatch to natively supports the
# Responses API.
#
# The resolved provider/instance is used when that pair is registered in
# Call::Registry. Otherwise the :provider / :instance entries of the request's routing
# hash are used. The result is false in these cases:
#   - no provider is found;
#   - #use_native_dispatch? rejects the provider;
#   - the adapter does not respond to #supports?.
# Any StandardError is reported through #handle_exception (level :warn) and yields false.
#
# @return [Boolean]
def provider_supports_responses?
  provider = @resolved_provider
  instance = @resolved_instance

  resolved_is_registered = provider && instance && Call::Registry.registered?(provider, instance: instance)
  unless resolved_is_registered
    provider = @request.routing&.dig(:provider)
    instance = @request.routing&.dig(:instance)
  end

  return false unless provider
  return false unless use_native_dispatch?(provider)

  adapter = Call::Registry.for(provider, instance: instance)
  return false unless adapter.respond_to?(:supports?)

  adapter.supports?(:responses)
rescue StandardError => e
  handle_exception(e, level: :warn, handled: true, operation: 'llm.executor.provider_supports_responses',
                      provider: provider)
  false
end

#resolved_provider_supports_responses? ⇒ Boolean

Post-routing capability check — same shape as provider_supports_responses? but anchored to the resolved provider only (no fallback to the request hint).

Returns:

  • (Boolean)

# File 'lib/legion/llm/inference/executor.rb', lines 209–223

# Post-routing capability check: does the *resolved* provider/instance natively support
# the Responses API?
#
# Unlike #provider_supports_responses?, this never falls back to the request's routing
# hint. The result is false in these cases:
#   - no provider has been resolved;
#   - #use_native_dispatch? rejects it;
#   - Call::Registry returns no adapter;
#   - the adapter does not respond to #supports?.
# Any StandardError is reported through #handle_exception (level :warn) and yields false.
#
# @return [Boolean]
def resolved_provider_supports_responses?
  provider = @resolved_provider
  instance = @resolved_instance
  native = provider && use_native_dispatch?(provider)
  return false unless native

  adapter = Call::Registry.for(provider, instance: instance)
  adapter&.respond_to?(:supports?) ? adapter.supports?(:responses) : false
rescue StandardError => e
  handle_exception(e, level: :warn, handled: true,
                      operation: 'llm.executor.resolved_provider_supports_responses',
                      provider: provider)
  false
end