Module: Legion::Extensions::Tasker::Helpers::TaskFinder
- Included in: Client, Runners::CheckSubtask, Runners::FetchDelayed
- Defined in: lib/legion/extensions/tasker/helpers/task_finder.rb
Instance Method Summary
- #cache_get(key) ⇒ Object
- #cache_set(key, value, ttl: 60) ⇒ Object
- #find_delayed ⇒ Object
- #find_subtasks(trigger_id:) ⇒ Object
- #find_trigger(runner_class:, function:) ⇒ Object
Instance Method Details
#cache_get(key) ⇒ Object
8 9 10 11 12 13 14 |
# File 'lib/legion/extensions/tasker/helpers/task_finder.rb', line 8
#
# Best-effort read of a tasker-namespaced entry from Legion::Cache.
#
# The lookup is attempted only when Legion::Cache is loaded, responds to
# +connected?+, and +cache_connected?+ (a helper defined outside this method)
# is truthy.
#
# @param key [Object] suffix interpolated into the "tasker:<key>" cache key
# @return [Object, nil] the value returned by Legion::Cache.get; nil when the
#   cache is unavailable or when any StandardError is raised along the way
def cache_get(key)
  cache_usable = defined?(Legion::Cache) &&
                 Legion::Cache.respond_to?(:connected?) &&
                 cache_connected?
  return nil unless cache_usable

  Legion::Cache.get("tasker:#{key}") # rubocop:disable Legion/HelperMigration/DirectCache
rescue StandardError
  # Cache reads are optional: any failure is reported as a miss.
  nil
end
#cache_set(key, value, ttl: 60) ⇒ Object
16 17 18 19 20 21 22 |
# File 'lib/legion/extensions/tasker/helpers/task_finder.rb', line 16
#
# Best-effort write of a tasker-namespaced entry to Legion::Cache.
#
# The write is attempted only when Legion::Cache is loaded, responds to
# +connected?+, and +cache_connected?+ (a helper defined outside this method)
# is truthy.
#
# @param key [Object] suffix interpolated into the "tasker:<key>" cache key
# @param value [Object] value handed to Legion::Cache.set
# @param ttl [Integer] forwarded as the +ttl:+ keyword of Legion::Cache.set
#   (defaults to 60; unit is whatever Legion::Cache expects)
# @return [Object, nil] the value returned by Legion::Cache.set; nil when the
#   cache is unavailable or when any StandardError is raised along the way
def cache_set(key, value, ttl: 60)
  cache_ready = defined?(Legion::Cache) &&
                Legion::Cache.respond_to?(:connected?) &&
                cache_connected?

  Legion::Cache.set("tasker:#{key}", value, ttl: ttl) if cache_ready # rubocop:disable Legion/HelperMigration/DirectCache
rescue StandardError
  # Cache writes are optional: failures are swallowed on purpose.
  nil
end
#find_delayed ⇒ Object
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/legion/extensions/tasker/helpers/task_finder.rb', line 60
#
# Loads every task whose status is 'task.delayed', joined with its function,
# runner and extension (inner joins) and its relationship (left join), and
# decorates each row with a :runner_routing_key.
#
# Extra keyword arguments are accepted and ignored.
#
# @return [Array] one entry per delayed task, each with :runner_routing_key
#   set to "<exchange>.<queue>.<function_name>"; an empty array when
#   Legion::Data::Model::Task is not loaded
def find_delayed(**)
  return [] unless defined?(Legion::Data::Model::Task)

  tasks         = Sequel[:tasks]
  functions     = Sequel[:functions]
  runners       = Sequel[:runners]
  extensions    = Sequel[:extensions]
  relationships = Sequel[:relationships]

  selected_columns = [
    tasks[:id],
    tasks[:relationship_id],
    tasks[:function_id],
    tasks[:created],
    relationships[:delay].as(:relationship_delay),
    relationships[:chain_id],
    functions[:name].as(:function_name),
    runners[:namespace].as(:runner_class),
    runners[:id].as(:runner_id),
    runners[:queue],
    extensions[:exchange],
    tasks[:task_delay]
  ]

  delayed_rows = Legion::Data::Model::Task
                 .join(:functions, id: :function_id)
                 .join(:runners, id: functions[:runner_id])
                 .join(:extensions, id: runners[:extension_id])
                 .left_join(:relationships, id: tasks[:relationship_id])
                 .where(tasks[:status] => 'task.delayed')
                 .select(*selected_columns)
                 .all

  # The routing key is derived from the selected exchange, queue and function name.
  delayed_rows.map do |row|
    row.tap { |r| r[:runner_routing_key] = "#{r[:exchange]}.#{r[:queue]}.#{r[:function_name]}" }
  end
end
#find_subtasks(trigger_id:) ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/legion/extensions/tasker/helpers/task_finder.rb', line 44
#
# Returns the subtask rows for a trigger, each decorated with a
# :runner_routing_key, consulting the tasker cache first.
#
# A cached value is used only when it is an Array. Freshly queried results are
# written back with a 5 (ttl) lifetime, and only when non-empty, so an empty
# lookup is re-queried on the next call. Extra keyword arguments are accepted
# and ignored.
#
# @param trigger_id [Object] passed to +subtask_query+ (defined outside this
#   method) and interpolated into the cache key
# @return [Array] rows with :runner_routing_key set to
#   "<exchange>.<queue>.<function>"; an empty array when
#   Legion::Data::Model::Relationship is not loaded
def find_subtasks(trigger_id:, **)
  return [] unless defined?(Legion::Data::Model::Relationship)

  lookup_key = "find_subtasks:#{trigger_id}"
  hit = cache_get(lookup_key)
  return hit if hit.is_a?(Array)

  subtasks = subtask_query(trigger_id).all.map do |row|
    row.tap { |r| r[:runner_routing_key] = "#{r[:exchange]}.#{r[:queue]}.#{r[:function]}" }
  end

  worth_caching = subtasks.is_a?(Array) && subtasks.any?
  cache_set(lookup_key, subtasks, ttl: 5) if worth_caching
  subtasks
end
#find_trigger(runner_class:, function:) ⇒ Object
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/legion/extensions/tasker/helpers/task_finder.rb', line 24
#
# Looks up the function record matching a runner namespace and function name,
# consulting the tasker cache first.
#
# A cached value is used only when it is a Hash. A freshly queried record is
# written back with cache_set's default ttl, and only when one was found.
# Extra keyword arguments are accepted and ignored.
#
# @param runner_class [Object] matched against runners.namespace
# @param function [Object] matched against functions.name
# @return [Object, nil] the first matching row (selecting function_id,
#   runner_id and namespace) or the cached Hash; nil when nothing matches or
#   when Legion::Data::Model::Function is not loaded
def find_trigger(runner_class:, function:, **)
  return nil unless defined?(Legion::Data::Model::Function)

  lookup_key = "find_trigger:#{runner_class}:#{function}"
  hit = cache_get(lookup_key)
  return hit if hit.is_a?(Hash)

  functions_table = Sequel[:functions]
  runners_table   = Sequel[:runners]

  match = Legion::Data::Model::Function
          .join(:runners, id: :runner_id)
          .where(functions_table[:name] => function, runners_table[:namespace] => runner_class)
          .select(functions_table[:id].as(:function_id), functions_table[:runner_id], runners_table[:namespace])
          .first

  cache_set(lookup_key, match) if match
  match
end