Class: RubyReactor::SidekiqAdapter

Inherits:
Object
  • Object
show all
Defined in:
lib/ruby_reactor/sidekiq_adapter.rb

Class Method Summary collapse

Class Method Details

.perform_async(serialized_context, reactor_class_name = nil, intermediate_results: {}) ⇒ Object



5
6
7
8
9
10
# File 'lib/ruby_reactor/sidekiq_adapter.rb', line 5

def self.perform_async(serialized_context, reactor_class_name = nil, intermediate_results: {})
  job_id = SidekiqWorkers::Worker.perform_async(serialized_context, reactor_class_name)
  context = ContextSerializer.deserialize(serialized_context)
  RubyReactor::AsyncResult.new(job_id: job_id, intermediate_results: intermediate_results,
                               execution_id: context.context_id)
end

.perform_in(delay, serialized_context, reactor_class_name = nil, intermediate_results: {}) ⇒ Object



12
13
14
15
16
17
# File 'lib/ruby_reactor/sidekiq_adapter.rb', line 12

def self.perform_in(delay, serialized_context, reactor_class_name = nil, intermediate_results: {})
  job_id = SidekiqWorkers::Worker.perform_in(delay, serialized_context, reactor_class_name)
  context = ContextSerializer.deserialize(serialized_context)
  RubyReactor::AsyncResult.new(job_id: job_id, intermediate_results: intermediate_results,
                               execution_id: context.context_id)
end

.perform_map_collection_async(parent_context_id:, map_id:, parent_reactor_class_name:, step_name:, strict_ordering:, timeout:) ⇒ Object

rubocop:disable Metrics/ParameterLists



69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/ruby_reactor/sidekiq_adapter.rb', line 69

def self.perform_map_collection_async(parent_context_id:, map_id:, parent_reactor_class_name:, step_name:,
                                      strict_ordering:, timeout:)
  job_id = RubyReactor::SidekiqWorkers::MapCollectorWorker.perform_async(
    {
      "parent_context_id" => parent_context_id,
      "map_id" => map_id,
      "parent_reactor_class_name" => parent_reactor_class_name,
      "step_name" => step_name,
      "strict_ordering" => strict_ordering,
      "timeout" => timeout
    }
  )
  RubyReactor::AsyncResult.new(job_id: job_id)
end

.perform_map_element_async(map_id:, element_id:, index:, serialized_inputs:, reactor_class_info:, strict_ordering:, parent_context_id:, parent_reactor_class_name:, step_name:, batch_size: nil, serialized_context: nil, fail_fast: nil) ⇒ Object

rubocop:disable Metrics/ParameterLists



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/ruby_reactor/sidekiq_adapter.rb', line 20

def self.perform_map_element_async(map_id:, element_id:, index:, serialized_inputs:, reactor_class_info:,
                                   strict_ordering:, parent_context_id:, parent_reactor_class_name:, step_name:,
                                   batch_size: nil, serialized_context: nil, fail_fast: nil)
  job_id = RubyReactor::SidekiqWorkers::MapElementWorker.perform_async(
    {
      "map_id" => map_id,
      "element_id" => element_id,
      "index" => index,
      "serialized_inputs" => serialized_inputs,
      "reactor_class_info" => reactor_class_info,
      "strict_ordering" => strict_ordering,
      "parent_context_id" => parent_context_id,
      "parent_reactor_class_name" => parent_reactor_class_name,
      "step_name" => step_name,
      "batch_size" => batch_size,
      "serialized_context" => serialized_context,
      "fail_fast" => fail_fast
    }
  )
  RubyReactor::AsyncResult.new(job_id: job_id)
end

.perform_map_element_in(delay, map_id:, element_id:, index:, serialized_inputs:, reactor_class_info:, strict_ordering:, parent_context_id:, parent_reactor_class_name:, step_name:, batch_size: nil, serialized_context: nil, fail_fast: nil) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/ruby_reactor/sidekiq_adapter.rb', line 42

def self.perform_map_element_in(delay, map_id:, element_id:, index:, serialized_inputs:, reactor_class_info:,
                                strict_ordering:, parent_context_id:, parent_reactor_class_name:, step_name:,
                                batch_size: nil, serialized_context: nil, fail_fast: nil)
  job_id = RubyReactor::SidekiqWorkers::MapElementWorker.perform_in(
    delay,
    {
      "map_id" => map_id,
      "element_id" => element_id,
      "index" => index,
      "serialized_inputs" => serialized_inputs,
      "reactor_class_info" => reactor_class_info,
      "strict_ordering" => strict_ordering,
      "parent_context_id" => parent_context_id,
      "parent_reactor_class_name" => parent_reactor_class_name,
      "step_name" => step_name,
      "batch_size" => batch_size,
      "serialized_context" => serialized_context,
      "fail_fast" => fail_fast
    }
  )
  # Return an AsyncResult so RetryManager#handle_async_retry recognises the
  # element was successfully requeued and yields a RetryQueuedResult.
  RubyReactor::AsyncResult.new(job_id: job_id)
end