Module: Legion::Extensions::Tasker::Runners::FetchDelayed
- Includes:
- Helpers::Lex, Helpers::Task, Helpers::TaskFinder
- Defined in:
- lib/legion/extensions/tasker/runners/fetch_delayed.rb
Instance Method Summary
collapse
#cache_get, #cache_set, #find_delayed, #find_subtasks, #find_trigger
Instance Method Details
#build_delayed_hash(task) ⇒ Object
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
# File 'lib/legion/extensions/tasker/runners/fetch_delayed.rb', line 26
def build_delayed_hash(task)
subtask_hash = {
relationship_id: task[:relationship_id],
chain_id: task[:chain_id],
function_id: task[:function_id],
function: task[:function_name],
runner_id: task[:runner_id],
runner_class: task[:runner_class],
task_id: task[:id],
exchange: task[:exchange],
queue: task[:queue]
}
subtask_hash[:conditions] = task[:conditions] if task[:conditions].is_a?(String)
subtask_hash[:transformation] = task[:transformation] if task[:transformation].is_a?(String)
subtask_hash[:routing_key] = delayed_routing_key(task)
subtask_hash
end
|
#delayed_by?(delay, created) ⇒ Boolean
22
23
24
|
# File 'lib/legion/extensions/tasker/runners/fetch_delayed.rb', line 22
def delayed_by?(delay, created)
delay.is_a?(Integer) && delay.positive? && Time.now < created + delay
end
|
#delayed_routing_key(task) ⇒ Object
45
46
47
48
49
50
51
52
53
|
# File 'lib/legion/extensions/tasker/runners/fetch_delayed.rb', line 45
def delayed_routing_key(task)
if task[:conditions].is_a?(String) && task[:conditions].length > 4
'task.subtask.conditioner'
elsif task[:transformation].is_a?(String) && task[:transformation].length > 4
'task.subtask.transform'
else
task[:runner_routing_key]
end
end
|
#fetch(**_opts) ⇒ Object
11
12
13
14
15
16
17
18
19
20
|
# File 'lib/legion/extensions/tasker/runners/fetch_delayed.rb', line 11
def fetch(**_opts)
find_delayed.each do |task|
next if delayed_by?(task[:relationship_delay], task[:created])
next if delayed_by?(task[:task_delay], task[:created])
subtask_hash = build_delayed_hash(task)
send_task(**subtask_hash)
update_delayed_status(task[:id], subtask_hash[:routing_key])
end
end
|
#send_task(**opts) ⇒ Object
64
65
66
67
68
69
70
71
72
73
74
75
|
# File 'lib/legion/extensions/tasker/runners/fetch_delayed.rb', line 64
def send_task(**opts)
opts[:results] = opts[:result] if opts.key?(:result) && !opts.key?(:results)
opts[:success] = if opts.key?(:result) && opts.key?(:success)
opts[:result].is_a?(Hash) ? opts[:result][:success] : opts[:success]
elsif opts.key?(:success)
opts[:success]
else
1
end
log.debug 'pushing delayed task to worker'
Legion::Transport::Messages::Task.new(**opts).publish
end
|
#update_delayed_status(task_id, routing_key) ⇒ Object
55
56
57
58
59
60
61
62
|
# File 'lib/legion/extensions/tasker/runners/fetch_delayed.rb', line 55
def update_delayed_status(task_id, routing_key)
status = case routing_key
when 'task.subtask.conditioner' then 'conditioner.queued'
when 'task.subtask.transform' then 'transformer.queued'
else 'task.queued'
end
task_update(task_id, status)
end
|