Module: Legion::Extensions::Tasker::Helpers::TaskFinder

Included in:
Client, Runners::CheckSubtask, Runners::FetchDelayed
Defined in:
lib/legion/extensions/tasker/helpers/task_finder.rb

Instance Method Summary

Instance Method Details

#cache_get(key) ⇒ Object



8
9
10
11
12
13
14
# File 'lib/legion/extensions/tasker/helpers/task_finder.rb', line 8

# Best-effort read from Legion::Cache under the "tasker:" key prefix.
#
# The lookup is attempted only when Legion::Cache is loaded, responds to
# +connected?+, and +cache_connected?+ (defined outside this method) is truthy.
#
# @param key [#to_s] suffix interpolated after the "tasker:" prefix
# @return [Object, nil] whatever Legion::Cache.get returns; nil when the cache
#   is not usable or when any StandardError is raised along the way
def cache_get(key)
  usable = defined?(Legion::Cache) && Legion::Cache.respond_to?(:connected?) && cache_connected?
  return nil unless usable

  namespaced = "tasker:#{key}"
  Legion::Cache.get(namespaced) # rubocop:disable Legion/HelperMigration/DirectCache
rescue StandardError
  # Deliberately swallowed: every failure is reported to the caller as nil.
  nil
end

#cache_set(key, value, ttl: 60) ⇒ Object



16
17
18
19
20
21
22
# File 'lib/legion/extensions/tasker/helpers/task_finder.rb', line 16

# Best-effort write to Legion::Cache under the "tasker:" key prefix.
#
# The write is attempted only when Legion::Cache is loaded, responds to
# +connected?+, and +cache_connected?+ (defined outside this method) is truthy.
#
# @param key [#to_s] suffix interpolated after the "tasker:" prefix
# @param value [Object] value handed to Legion::Cache.set unchanged
# @param ttl [Object] forwarded as the +ttl:+ keyword of Legion::Cache.set (default 60)
# @return [Object, nil] whatever Legion::Cache.set returns; nil when the cache
#   is not usable or when any StandardError is raised along the way
def cache_set(key, value, ttl: 60)
  usable = defined?(Legion::Cache) && Legion::Cache.respond_to?(:connected?) && cache_connected?
  return unless usable

  namespaced = "tasker:#{key}"
  Legion::Cache.set(namespaced, value, ttl: ttl) # rubocop:disable Legion/HelperMigration/DirectCache
rescue StandardError
  # Deliberately swallowed: a failed cache write is reported as nil.
  nil
end

#find_delayed ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/legion/extensions/tasker/helpers/task_finder.rb', line 60

# Loads every task whose status is 'task.delayed', joined with its function,
# runner and extension rows and left-joined with its relationship row.
#
# Each returned row additionally gets a +:runner_routing_key+ entry built as
# "<exchange>.<queue>.<function_name>" from the selected columns.
#
# Any keyword arguments are accepted and ignored.
#
# @return [Array] the selected rows with +:runner_routing_key+ set on each;
#   an empty array when Legion::Data::Model::Task is not defined
def find_delayed(**)
  return [] unless defined?(Legion::Data::Model::Task)

  # Table qualifiers, built once and reused for joins, filter and select list.
  tasks         = Sequel[:tasks]
  relationships = Sequel[:relationships]
  functions     = Sequel[:functions]
  runners       = Sequel[:runners]
  extensions    = Sequel[:extensions]

  columns = [
    tasks[:id],
    tasks[:relationship_id],
    tasks[:function_id],
    tasks[:created],
    relationships[:delay].as(:relationship_delay),
    relationships[:chain_id],
    functions[:name].as(:function_name),
    runners[:namespace].as(:runner_class),
    runners[:id].as(:runner_id),
    runners[:queue],
    extensions[:exchange],
    tasks[:task_delay]
  ]

  delayed = Legion::Data::Model::Task
            .join(:functions, id: :function_id)
            .join(:runners, id: functions[:runner_id])
            .join(:extensions, id: runners[:extension_id])
            .left_join(:relationships, id: tasks[:relationship_id])
            .where(tasks[:status] => 'task.delayed')
            .select(*columns)

  delayed.all.map do |row|
    row[:runner_routing_key] = "#{row[:exchange]}.#{row[:queue]}.#{row[:function_name]}"
    row
  end
end

#find_subtasks(trigger_id:) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/legion/extensions/tasker/helpers/task_finder.rb', line 44

# Returns the subtask rows for a trigger, consulting the tasker cache first.
#
# A cached value is used only when it is an Array. Otherwise the rows come
# from +subtask_query(trigger_id).all+ (+subtask_query+ is defined outside
# this method), and each row gets a +:runner_routing_key+ entry built as
# "<exchange>.<queue>.<function>". Non-empty results are written back to the
# cache with +ttl: 5+; empty results are not cached.
#
# Extra keyword arguments are accepted and ignored.
#
# @param trigger_id [Object] identifier passed to +subtask_query+ and
#   interpolated into the cache key
# @return [Array] the cached or freshly queried rows; an empty array when
#   Legion::Data::Model::Relationship is not defined
def find_subtasks(trigger_id:, **)
  return [] unless defined?(Legion::Data::Model::Relationship)

  key = "find_subtasks:#{trigger_id}"
  hit = cache_get(key)
  return hit if hit.is_a?(Array)

  subtasks = subtask_query(trigger_id).all.map do |subtask|
    routing_key = "#{subtask[:exchange]}.#{subtask[:queue]}.#{subtask[:function]}"
    subtask[:runner_routing_key] = routing_key
    subtask
  end

  worth_caching = subtasks.is_a?(Array) && subtasks.any?
  cache_set(key, subtasks, ttl: 5) if worth_caching
  subtasks
end

#find_trigger(runner_class:, function:) ⇒ Object



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/legion/extensions/tasker/helpers/task_finder.rb', line 24

# Looks up the function row matching a runner namespace and function name,
# consulting the tasker cache first.
#
# A cached value is used only when it is a Hash. Otherwise functions are
# joined to runners, filtered by function name and runner namespace, and the
# first match is returned with the columns +function_id+ (aliased from the
# function's id), +runner_id+ and +namespace+. A truthy result is written to
# the cache with cache_set's default ttl.
#
# Extra keyword arguments are accepted and ignored.
#
# @param runner_class [Object] value matched against the runners.namespace column
# @param function [Object] value matched against the functions.name column
# @return [Object, nil] the cached Hash or the first matching row; nil when
#   nothing matches or Legion::Data::Model::Function is not defined
def find_trigger(runner_class:, function:, **)
  return nil unless defined?(Legion::Data::Model::Function)

  key = "find_trigger:#{runner_class}:#{function}"
  hit = cache_get(key)
  return hit if hit.is_a?(Hash)

  joined = Legion::Data::Model::Function.join(:runners, id: :runner_id)
  matching = joined.where(Sequel[:functions][:name]    => function,
                          Sequel[:runners][:namespace] => runner_class)
  trigger = matching.select(
    Sequel[:functions][:id].as(:function_id),
    Sequel[:functions][:runner_id],
    Sequel[:runners][:namespace]
  ).first

  cache_set(key, trigger) if trigger
  trigger
end