11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
|
# File 'lib/legion/runner.rb', line 11
def self.run(runner_class:, function:, task_id: nil, args: nil, check_subtask: true, generate_task: true, parent_id: nil, master_id: nil, catch_exceptions: false, **opts) started_at = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
lex_tag = derive_lex_tag(runner_class)
rlog = runner_logger(lex_tag)
rlog.info "[Runner] start: #{runner_class}##{function} task_id=#{task_id}"
runner_class = Kernel.const_get(runner_class) if runner_class.is_a? String
if task_id.nil? && generate_task
task_gen = Legion::Runner::Status.generate_task_id(
function: function,
runner_class: runner_class,
parent_id: parent_id, master_id: master_id, task_id: task_id, **opts
)
task_id = task_gen[:task_id] unless task_gen.nil?
end
args = opts if args.nil?
args[:task_id] = task_id unless task_id.nil?
args[:master_id] = master_id unless master_id.nil?
args[:parent_id] = parent_id unless parent_id.nil?
raise 'No Function defined' if function.nil?
result = nil
status = nil
Legion::Context.with_task_context(opts.merge(task_id: task_id, function: function, runner_class: runner_class.to_s)) do
result = if runner_class.respond_to?(:with_log_context)
runner_class.with_log_context(function) { runner_class.send(function, **args) }
else
runner_class.send(function, **args)
end
rescue Legion::Exception::HandledTask => e
rlog.debug "[Runner] HandledTask raised in #{runner_class}##{function}: #{e.message}"
status = 'task.exception'
result = { error: {} }
rescue StandardError => e
rlog.error "[Runner] exception in #{runner_class}##{function}: #{e.message}"
status = 'task.exception'
result = { success: false, status: status, error: { message: e.message, backtrace: e.backtrace } }
if runner_class.respond_to?(:handle_runner_exception)
begin
runner_class.handle_runner_exception(e,
**opts,
runner_class: runner_class,
args: args,
function: function,
task_id: task_id,
generate_task: generate_task,
check_subtask: check_subtask)
rescue Legion::Exception::HandledTask => handled
rlog.debug "[Runner] HandledTask raised while handling exception in #{runner_class}##{function}: #{handled.message}"
end
end
raise e unless catch_exceptions
end
ensure
status = 'task.completed' if status.nil?
duration_ms = ((::Process.clock_gettime(::Process::CLOCK_MONOTONIC) - started_at) * 1000).round
rlog.info "[Runner] complete: #{runner_class}##{function} status=#{status} duration_ms=#{duration_ms}"
Legion::Events.emit("task.#{status == 'task.completed' ? 'completed' : 'failed'}",
task_id: task_id, runner_class: runner_class.to_s, function: function, status: status)
Legion::Runner::Status.update(task_id: task_id, status: status) unless task_id.nil?
if check_subtask && status == 'task.completed'
Legion::Transport::Messages::CheckSubtask.new(runner_class: runner_class,
function: function,
result: result,
original_args: args,
task_id: task_id,
master_id: master_id,
**opts).publish
end
if defined?(Legion::Audit)
begin
error_message = status == 'task.exception' ? result&.dig(:error, :message) : nil
Legion::Audit.record(
event_type: 'runner_execution',
principal_id: opts[:principal_id] || opts[:worker_id] || 'system',
principal_type: opts[:principal_type] || 'system',
action: 'execute',
resource: "#{runner_class}/#{function}",
source: opts[:source] || 'unknown',
status: status == 'task.completed' ? 'success' : 'failure',
duration_ms: duration_ms,
detail: { task_id: task_id, error: error_message }
)
rescue StandardError => e
rlog.debug("Audit in runner.run failed: #{e.message}")
end
end
return { success: true, status: status, result: result, task_id: task_id } end
|