Class: Collavre::Orchestration::LoopBreaker
- Inherits: Object
- Defined in: app/services/collavre/orchestration/loop_breaker.rb
Overview
LoopBreaker detects and prevents infinite loops in agent orchestration.
Detects:
- Ping-pong: the same two agents messaging back and forth too many times
- Creative retry: too many tasks created on the same creative within a time window
- Task timeout: a single task running too long
- Token spike: abnormal token usage within a short time window
Usage:
    breaker = LoopBreaker.new(context, policy_resolver: resolver)
    result = breaker.check
    if result.should_break?
      # Handle loop detection - escalate to human
    end
Defined Under Namespace
Classes: Result
Constant Summary
- CACHE_PREFIX = "loop_breaker"
- CACHE_EXPIRY = 1.hour
Instance Method Summary
- #check ⇒ Object
  Main entry point; checks all loop conditions.
- #initialize(context, policy_resolver: nil) ⇒ LoopBreaker (constructor)
  Returns a new instance of LoopBreaker.
- #record_interaction(from_agent_id, to_agent_id, creative_id) ⇒ Object
  Records an interaction for ping-pong detection.
- #record_task(creative_id, agent_id, topic_id: nil, triggered_by_user: false) ⇒ Object
  Records a task creation for creative retry detection (per-topic). Skips recording for user-initiated messages, since only agent-to-agent activity counts as a potential loop.
- #record_tokens(agent_id, tokens_used) ⇒ Object
  Records token usage for spike detection.
- #reset_for_creative(creative_id) ⇒ Object
  Resets tracking for a creative (after human intervention).
Constructor Details
#initialize(context, policy_resolver: nil) ⇒ LoopBreaker
Returns a new instance of LoopBreaker.
    # File 'app/services/collavre/orchestration/loop_breaker.rb', line 34
    def initialize(context, policy_resolver: nil)
      @context = context
      @policy_resolver = policy_resolver || PolicyResolver.new(context)
      @config = @policy_resolver.loop_breaker_config
    end
Instance Method Details
#check ⇒ Object
Main entry point - checks all loop conditions
    # File 'app/services/collavre/orchestration/loop_breaker.rb', line 41
    def check
      return Result.new(should_break: false, reason: nil, details: {}) unless enabled?

      # Check each condition
      checks = [
        check_ping_pong,
        check_creative_retry,
        check_task_timeout,
        check_token_spike
      ]

      # Return first violation found
      violation = checks.find(&:should_break?)
      violation || Result.new(should_break: false, reason: nil, details: {})
    end
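The first-violation selection in #check can be tried in isolation. The sketch below is a minimal illustration, not the library's code: the `Result` struct is a hypothetical stand-in for the nested Result class (its definition is not shown on this page), and `first_violation` and the sample reasons are invented for the example.

```ruby
# Hypothetical stand-in for LoopBreaker::Result; the real class is not shown here.
Result = Struct.new(:should_break, :reason, :details, keyword_init: true) do
  def should_break?
    should_break
  end
end

PASSING = Result.new(should_break: false, reason: nil, details: {})

# Returns the first result that signals a break, or a passing result.
def first_violation(results)
  results.find(&:should_break?) || PASSING
end

# Sample results in the order the checks would be listed (values are made up).
results = [
  PASSING,
  Result.new(should_break: true, reason: :creative_retry, details: { count: 12 }),
  Result.new(should_break: true, reason: :token_spike, details: { tokens: 90_000 })
]

outcome = first_violation(results)
puts outcome.reason # the earliest violation in list order
```

Because the checks are collected in an array literal, Ruby evaluates all four before `find` runs; list order decides only which violation is reported, not which checks execute.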
#record_interaction(from_agent_id, to_agent_id, creative_id) ⇒ Object
Record an interaction for ping-pong detection
    # File 'app/services/collavre/orchestration/loop_breaker.rb', line 58
    def record_interaction(from_agent_id, to_agent_id, creative_id)
      key = "#{CACHE_PREFIX}:ping_pong_history:#{creative_id}"
      interactions = Rails.cache.read(key) || []
      interactions << { at: Time.current.to_i, from: from_agent_id, to: to_agent_id }

      # Keep only recent interactions (within window)
      window_start = (Time.current - ping_pong_window).to_i
      interactions = interactions.select { |i| i[:at] >= window_start }

      Rails.cache.write(key, interactions, expires_in: CACHE_EXPIRY)
    end
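The append-then-prune pattern used by the recorders can be sketched without Rails. This is an illustration under stated assumptions: a plain Hash stands in for Rails.cache, the clock is passed in explicitly as `now`, and `WINDOW_SECONDS` is a hypothetical value (the real window comes from the policy configuration).

```ruby
# In-memory stand-in for Rails.cache, so the sketch runs without Rails.
STORE = {}
WINDOW_SECONDS = 300 # hypothetical window; the real value comes from policy config

# Appends an interaction, then drops entries older than the window.
def record_interaction(creative_id, from, to, now:)
  key = "loop_breaker:ping_pong_history:#{creative_id}"
  history = STORE.fetch(key, []) + [{ at: now, from: from, to: to }]
  STORE[key] = history.select { |i| i[:at] >= now - WINDOW_SECONDS }
end

record_interaction(7, "a", "b", now: 1_000)
record_interaction(7, "b", "a", now: 1_200)
record_interaction(7, "a", "b", now: 1_400) # the entry at 1_000 falls outside the window

history = STORE["loop_breaker:ping_pong_history:7"]
puts history.length
```

Pruning on every write keeps the stored array bounded by the window. Note that a read followed by a write is not atomic, so two concurrent recorders could overwrite each other's entries; whether that matters depends on how the caller is deployed.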
#record_task(creative_id, agent_id, topic_id: nil, triggered_by_user: false) ⇒ Object
Record a task creation for creative retry detection (per-topic). Skips recording for user-initiated messages, since only agent-to-agent activity counts as a potential loop.
    # File 'app/services/collavre/orchestration/loop_breaker.rb', line 72
    def record_task(creative_id, agent_id, topic_id: nil, triggered_by_user: false)
      return if triggered_by_user

      key = topic_tasks_key(creative_id, topic_id)
      tasks = Rails.cache.read(key) || []
      tasks << { at: Time.current.to_i, agent_id: agent_id }

      # Keep only tasks within window
      window_start = (Time.current - creative_retry_window).to_i
      tasks = tasks.select { |t| t[:at] >= window_start }

      Rails.cache.write(key, tasks, expires_in: CACHE_EXPIRY)
    end
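The effect of the per-topic key and the `triggered_by_user` guard can be seen in a small standalone sketch. It assumes a Hash in place of Rails.cache and an explicit `now` argument; the key layout in `topic_tasks_key` is hypothetical, since the real helper is not shown on this page, and window pruning is left out to keep the example short.

```ruby
STORE = Hash.new { |hash, key| hash[key] = [] } # stand-in for Rails.cache

# Hypothetical key layout; the real topic_tasks_key helper is not shown.
def topic_tasks_key(creative_id, topic_id)
  "loop_breaker:creative:#{creative_id}:topic:#{topic_id || 'none'}:tasks"
end

def record_task(creative_id, agent_id, now:, topic_id: nil, triggered_by_user: false)
  return if triggered_by_user # user-initiated work never counts toward a loop

  STORE[topic_tasks_key(creative_id, topic_id)] << { at: now, agent_id: agent_id }
end

record_task(1, "agent-a", topic_id: 10, now: 100)
record_task(1, "agent-b", topic_id: 10, now: 110)
record_task(1, "agent-a", topic_id: 11, now: 120)
record_task(1, "agent-a", topic_id: 10, now: 130, triggered_by_user: true) # skipped

STORE.each { |key, tasks| puts "#{key}: #{tasks.length}" }
```

Each topic accumulates its own count, so heavy activity on one topic does not push a quieter topic on the same creative toward the retry limit.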
#record_tokens(agent_id, tokens_used) ⇒ Object
Record token usage for spike detection
    # File 'app/services/collavre/orchestration/loop_breaker.rb', line 87
    def record_tokens(agent_id, tokens_used)
      key = token_usage_key(agent_id)
      usage = Rails.cache.read(key) || []
      usage << { at: Time.current.to_i, tokens: tokens_used }

      # Keep only usage within window
      window_start = (Time.current - token_spike_window).to_i
      usage = usage.select { |u| u[:at] >= window_start }

      Rails.cache.write(key, usage, expires_in: CACHE_EXPIRY)
    end
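The recorded entries have the shape `{ at:, tokens: }`, which suggests one way a spike test could consume them. The sketch below is an assumption, not the class's actual check: `check_token_spike` is not shown on this page, and the `token_spike?` helper, its `window` and `threshold` parameters, and the sample numbers are all hypothetical.

```ruby
# Hypothetical spike test: sums tokens inside the window and compares to a limit.
def token_spike?(usage, now:, window:, threshold:)
  recent = usage.select { |u| u[:at] >= now - window }
  recent.sum { |u| u[:tokens] } > threshold
end

# Sample usage entries in the shape record_tokens stores (values are made up).
usage = [
  { at: 100, tokens: 4_000 },
  { at: 580, tokens: 30_000 },
  { at: 590, tokens: 25_000 }
]

spike_at_50k = token_spike?(usage, now: 600, window: 60, threshold: 50_000)
spike_at_60k = token_spike?(usage, now: 600, window: 60, threshold: 60_000)

puts spike_at_50k
puts spike_at_60k
```

Only the two entries inside the 60-second window are summed (55,000 tokens), so the result depends on where the threshold sits relative to that total.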
#reset_for_creative(creative_id) ⇒ Object
Reset tracking for a creative (after human intervention)
    # File 'app/services/collavre/orchestration/loop_breaker.rb', line 100
    def reset_for_creative(creative_id)
      # Clear all related cache keys
      Rails.cache.delete_matched("#{CACHE_PREFIX}:creative:#{creative_id}:*")
      Rails.cache.delete_matched("#{CACHE_PREFIX}:ping_pong:*:#{creative_id}")
    end
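Which pattern syntax `delete_matched` accepts depends on the configured cache store, and some stores do not implement it at all. Assuming glob-style matching, the sketch below checks the two reset patterns against sample keys, using Ruby's `File.fnmatch` purely as a stand-in for the store's matcher. The first key follows the format written by #record_interaction; the second uses a hypothetical topic key layout, since `topic_tasks_key` is not shown on this page.

```ruby
# Sample keys for creative 42. The first matches the format used by
# record_interaction; the second is a hypothetical topic key layout.
keys = [
  "loop_breaker:ping_pong_history:42",
  "loop_breaker:creative:42:topic:9:tasks"
]

# The two patterns reset_for_creative passes to delete_matched.
patterns = [
  "loop_breaker:creative:42:*",
  "loop_breaker:ping_pong:*:42"
]

# File.fnmatch is used here only as a stand-in for glob matching.
matched = keys.select { |key| patterns.any? { |pattern| File.fnmatch(pattern, key) } }
unmatched = keys - matched

puts "matched:   #{matched.inspect}"
puts "unmatched: #{unmatched.inspect}"
```

Under these assumptions the `ping_pong_history` key is not covered by either pattern, so it may be worth confirming against the deployed cache store that a reset clears the ping-pong history as intended.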