Module: Legion::Extensions::Helpers::Task
- Includes:
- Base, Logger
- Included in:
- Lex
- Defined in:
- lib/legion/extensions/helpers/task.rb
Constant Summary
Constants included
from Base
Base::NAMESPACE_BOUNDARIES
Instance Method Summary
collapse
Methods included from Logger
#handle_runner_exception
Methods included from Base
#actor_class, #actor_const, #actor_name, #amqp_prefix, #calling_class, #calling_class_array, #from_json, #full_path, #lex_class, #lex_const, #lex_name, #lex_slug, #log_tag, #normalize, #runner_class, #runner_const, #runner_name, #segments, #settings_path, #table_prefix, #to_dotted_hash
Instance Method Details
#generate_task_id(function_id:, status: 'task.queued', **opts) ⇒ Object
52
53
54
55
56
57
58
59
60
61
|
# File 'lib/legion/extensions/helpers/task.rb', line 52
def generate_task_id(function_id:, status: 'task.queued', **opts)
insert = { status: status, function_id: function_id }
insert[:payload] = Legion::JSON.dump(opts[:payload]) if opts.key? :payload
insert[:function_args] = Legion::JSON.dump(opts[:args]) if opts.key? :args
%i[master_id parent_id relationship_id task_id].each do |column|
insert[column] = opts[column] if opts.key? column
end
{ success: true, task_id: Legion::Data::Model::Task.insert(insert), **insert }
end
|
#generate_task_log(task_id:, function:, runner_class: to_s, **payload) ⇒ Object
16
17
18
19
20
21
22
23
24
25
26
27
|
# File 'lib/legion/extensions/helpers/task.rb', line 16
def generate_task_log(task_id:, function:, runner_class: to_s, **payload)
begin
if Legion::Settings[:data][:connected]
runner_id = Legion::Data::Model::Runner[namespace: runner_class].values[:id]
function_id = Legion::Data::Model::Function.where(runner_id: runner_id, name: function).first.values[:id]
return true if Legion::Data::Model::TaskLog.insert(task_id: task_id, function_id: function_id, entry: Legion::JSON.dump(payload))
end
rescue StandardError => e
handle_exception(e, level: :warn)
end
Legion::Transport::Messages::TaskLog.new(task_id: task_id, runner_class: runner_class, function: function, entry: payload).publish
end
|
#task_update(task_id, status, use_database: true, **opts) ⇒ Object
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
# File 'lib/legion/extensions/helpers/task.rb', line 29
def task_update(task_id, status, use_database: true, **opts)
return if task_id.nil? || status.nil?
begin
if Legion::Settings[:data][:connected] && use_database
task = Legion::Data::Model::Task[task_id]
task.update(status: status)
return true
end
rescue StandardError => e
log.debug("task_update failed, reverting to rmq message, e: #{e.message}")
end
update_hash = { task_id: task_id, status: status }
%i[results payload function_args payload results].each do |column|
update_hash[column] = opts[column] if opts.key? column
end
Legion::Transport::Messages::TaskUpdate.new(**update_hash).publish
rescue StandardError => e
handle_exception(e, level: :fatal)
raise e
end
|