20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
# File 'lib/legion/llm/inference/steps/sticky_persist.rb', line 20
# Persist "sticky" runner state and recent tool-call history for the current
# conversation after a pipeline turn.
#
# Steps:
# * Reads the conversation's sticky state (a shallow copy is modified).
# * Counts completed pending calls to deferred tools (result present, no
#   error) and adds them to the running +:deferred_tool_calls+ counter.
# * Marks the runners of those calls as +:executed+, sticky until the
#   deferred-call counter reaches +deferred_count + execution_sticky_tool_calls+.
# * Marks freshly triggered runners that were not just executed as
#   +:triggered+, sticky until turn +@sticky_turn_snapshot + trigger_sticky_turns + 1+.
#   This is skipped for a runner that still holds a live +:executed+ entry.
#   In both tiers an existing, later expiry is never shortened.
# * Appends every pending call that produced a result (errored ones included)
#   to +:tool_call_history+, keeping only the last +max_history_entries+.
# * Writes the state back via Inference::Conversation.write_sticky_state.
#
# Any StandardError is appended to @warnings and passed to handle_exception
# rather than re-raised here.
#
# @return [void]
def step_sticky_persist
  return unless sticky_persist_ready?

  conv_id = @request.conversation_id
  state = Inference::Conversation.read_sticky_state(conv_id).dup
  runners = (state[:sticky_runners] || {}).dup
  deferred_count = state[:deferred_tool_calls] || 0

  # Registered extension tools keyed by name. This is empty when the settings
  # API does not expose a tools list.
  tool_snapshot = if Legion::Settings::Extensions.respond_to?(:tools)
                    Array(Legion::Settings::Extensions.tools)
                      .select { |t| t.is_a?(Hash) && t[:name] }
                      .to_h { |t| [t[:name], t] }
                  else
                    {}
                  end
  # Tools injected for this request take precedence over the settings snapshot.
  lookup_tool = ->(name) { @injected_tool_map[name] || tool_snapshot[name] }

  # Two separate statements: snapshot the pending history, then pick the
  # calls that finished successfully.
  pending_snapshot = @pending_tool_history.dup
  completed = pending_snapshot.select { |e| e[:result] && !e[:error] }
  log_step_debug(
    :sticky_persist,
    :state_loaded,
    runner_count: runners.size,
    pending_count: pending_snapshot.size,
    completed_count: completed.size,
    deferred_count: deferred_count
  )

  executed_runner_keys = []
  deferred_call_count = 0
  completed.each do |entry|
    tc = lookup_tool.call(entry[:tool_name])
    next unless tool_entry_deferred?(tc)

    executed_runner_keys << (entry[:runner_key] || tool_entry_runner_key(tc))
    deferred_call_count += 1
  end
  executed_runner_keys.uniq!
  deferred_count += deferred_call_count
  state[:deferred_tool_calls] = deferred_count

  # Executed runners stay sticky for a number of further deferred calls.
  executed_runner_keys.each do |key|
    existing = runners[key]
    new_expiry = deferred_count + execution_sticky_tool_calls
    runners[key] = {
      tier: :executed,
      expires_after_deferred_call: [existing&.dig(:expires_after_deferred_call) || 0, new_expiry].max
    }
  end

  # Triggered runners stay sticky for a number of further turns. A still-live
  # :executed entry is left in place instead of being downgraded.
  (@freshly_triggered_keys - executed_runner_keys).each do |key|
    existing = runners[key]
    next if existing&.dig(:tier) == :executed &&
            deferred_count < (existing[:expires_after_deferred_call] || 0)

    existing_expiry = runners.dig(key, :expires_at_turn) || 0
    new_expiry = @sticky_turn_snapshot + trigger_sticky_turns + 1
    runners[key] = { tier: :triggered, expires_at_turn: [existing_expiry, new_expiry].max }
  end
  state[:sticky_runners] = runners

  if pending_snapshot.any?
    history = (state[:tool_call_history] || []).dup
    pending_snapshot.each do |entry|
      next unless entry[:result]

      tc = lookup_tool.call(entry[:tool_name])
      runner_key = entry[:runner_key] || (tc ? tool_entry_runner_key(tc) : 'unknown')
      history << {
        tool: entry[:tool_name],
        runner: runner_key,
        turn: @sticky_turn_snapshot,
        args: sanitize_args(truncate_args(normalize_history_args(entry[:args]))),
        result: entry[:result].to_s[0, max_result_length],
        error: entry[:error] || false
      }
    end
    state[:tool_call_history] = history.last(max_history_entries)
  end

  Inference::Conversation.write_sticky_state(conv_id, state)
  log_step_debug(
    :sticky_persist,
    :state_written,
    runner_count: runners.size,
    executed_runner_count: executed_runner_keys.size,
    deferred_call_count: deferred_call_count,
    history_count: state[:tool_call_history]&.size || 0
  )
rescue StandardError => e
  @warnings << "sticky_persist error: #{e.message}"
  handle_exception(e, level: :warn, operation: 'llm.pipeline.step_sticky_persist')
end
|