Module: Legion::Extensions::Tasker::Runners::CheckSubtask

Includes:
Helpers::Lex, Helpers::TaskFinder
Defined in:
lib/legion/extensions/tasker/runners/check_subtask.rb

Instance Method Summary collapse

Methods included from Helpers::TaskFinder

#cache_get, #cache_set, #find_delayed, #find_subtasks, #find_trigger

Instance Method Details

#build_task_hash(relationship, opts) ⇒ Object



31
32
33
34
35
36
37
38
39
# File 'lib/legion/extensions/tasker/runners/check_subtask.rb', line 31

def build_task_hash(relationship, opts)
  task_hash = relationship.dup
  task_hash[:status] = relationship[:delay].to_i.zero? ? 'conditioner.queued' : 'task.delayed'
  task_hash[:payload] = opts
  task_hash[:master_id] = resolve_master_id(opts)
  task_hash[:parent_id] = opts[:task_id] if opts.key?(:task_id)
  task_hash[:routing_key] = subtask_routing_key(relationship)
  task_hash
end

#chain_matches?(relationship, opts) ⇒ Boolean

Returns:

  • (Boolean)


25
26
27
28
29
# File 'lib/legion/extensions/tasker/runners/check_subtask.rb', line 25

def chain_matches?(relationship, opts)
  return true if relationship[:allow_new_chains]

  !relationship[:chain_id].nil? && opts.key?(:chain_id) && relationship[:chain_id] == opts[:chain_id]
end

#check_subtasks(runner_class:, function:, **opts) ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
# File 'lib/legion/extensions/tasker/runners/check_subtask.rb', line 13

def check_subtasks(runner_class:, function:, **opts)
  trigger = find_trigger(runner_class: runner_class, function: function)
  return { success: true, subtasks: 0 } unless trigger.is_a?(Hash)

  find_subtasks(trigger_id: trigger[:function_id]).each do |relationship|
    next unless chain_matches?(relationship, opts)

    task_hash = build_task_hash(relationship, opts)
    dispatch_task(task_hash, trigger, opts)
  end
end

#dispatch_task(task_hash, trigger, opts) ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/legion/extensions/tasker/runners/check_subtask.rb', line 58

def dispatch_task(task_hash, trigger, opts)
  trigger_info = { trigger_runner_id: trigger[:runner_id], trigger_function_id: trigger[:function_id] }
  results_value = opts[:result] || opts[:results]

  if results_value.is_a?(Array)
    results_value.each do |result|
      send_task(results: result, **trigger_info, **task_hash)
    end
  else
    send_task(results: resolve_results(opts), **trigger_info, **task_hash)
  end
end

#insert_task(relationship_id:, function_id:, **opts) ⇒ Object



94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/legion/extensions/tasker/runners/check_subtask.rb', line 94

def insert_task(relationship_id:, function_id:, **opts)
  return nil unless defined?(Legion::Data::Model::Task)

  status = opts.fetch(:status, 'task.queued')
  master_id = opts[:master_id]
  parent_id = opts[:parent_id]

  insert_hash = { relationship_id: relationship_id, function_id: function_id, status: status }
  insert_hash[:master_id] = master_id.is_a?(Integer) ? master_id : (parent_id if parent_id.is_a?(Integer))
  insert_hash[:parent_id] = parent_id if parent_id.is_a?(Integer)
  insert_hash[:payload] = json_dump(opts)
  Legion::Data::Model::Task.insert(insert_hash)
end

#resolve_master_id(opts) ⇒ Object



41
42
43
44
45
46
# File 'lib/legion/extensions/tasker/runners/check_subtask.rb', line 41

def resolve_master_id(opts)
  return opts[:master_id] if opts.key?(:master_id)
  return opts[:parent_id] if opts.key?(:parent_id)

  opts[:task_id] if opts.key?(:task_id)
end

#resolve_results(opts) ⇒ Object



71
72
73
74
75
76
# File 'lib/legion/extensions/tasker/runners/check_subtask.rb', line 71

def resolve_results(opts)
  return opts[:results] if opts[:results].is_a?(Hash)
  return opts[:result] if opts[:result].is_a?(Hash)

  opts
end

#send_task(**opts) ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/legion/extensions/tasker/runners/check_subtask.rb', line 78

def send_task(**opts)
  opts[:results] = opts[:result] if opts.key?(:result) && !opts.key?(:results)
  opts[:success] = if opts.key?(:result) && opts.key?(:success)
                     opts[:result].is_a?(Hash) ? opts[:result][:success] : opts[:success]
                   elsif opts.key?(:success)
                     opts[:success]
                   else
                     1
                   end

  opts[:task_id] = insert_task(**opts)
  return { status: true } unless opts[:delay].zero?

  Legion::Transport::Messages::SubTask.new(**opts).publish
end

#subtask_routing_key(relationship) ⇒ Object



48
49
50
51
52
53
54
55
56
# File 'lib/legion/extensions/tasker/runners/check_subtask.rb', line 48

def subtask_routing_key(relationship)
  if relationship[:conditions].is_a?(String) && relationship[:conditions].length > 4
    'task.subtask.conditioner'
  elsif relationship[:transformation].is_a?(String) && relationship[:transformation].length > 4
    'task.subtask.transform'
  else
    relationship[:runner_routing_key]
  end
end