Class: RubyReactor::SidekiqAdapter

Inherits:
Object
  • Object
show all
Defined in:
lib/ruby_reactor/sidekiq_adapter.rb

Class Method Summary collapse

Class Method Details

.perform_async(serialized_context, reactor_class_name = nil, intermediate_results: {}) ⇒ Object



5
6
7
8
9
10
# File 'lib/ruby_reactor/sidekiq_adapter.rb', line 5

def self.perform_async(serialized_context, reactor_class_name = nil, intermediate_results: {})
  job_id = SidekiqWorkers::Worker.perform_async(serialized_context, reactor_class_name)
  context = ContextSerializer.deserialize(serialized_context)
  RubyReactor::AsyncResult.new(job_id: job_id, intermediate_results: intermediate_results,
                               execution_id: context.context_id)
end

.perform_in(delay, serialized_context, reactor_class_name = nil, intermediate_results: {}) ⇒ Object



12
13
14
15
16
17
# File 'lib/ruby_reactor/sidekiq_adapter.rb', line 12

def self.perform_in(delay, serialized_context, reactor_class_name = nil, intermediate_results: {})
  job_id = SidekiqWorkers::Worker.perform_in(delay, serialized_context, reactor_class_name)
  context = ContextSerializer.deserialize(serialized_context)
  RubyReactor::AsyncResult.new(job_id: job_id, intermediate_results: intermediate_results,
                               execution_id: context.context_id)
end

.perform_map_collection_async(parent_context_id:, map_id:, parent_reactor_class_name:, step_name:, strict_ordering:, timeout:) ⇒ Object

rubocop:disable Metrics/ParameterLists



65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/ruby_reactor/sidekiq_adapter.rb', line 65

def self.perform_map_collection_async(parent_context_id:, map_id:, parent_reactor_class_name:, step_name:,
                                      strict_ordering:, timeout:)
  job_id = RubyReactor::SidekiqWorkers::MapCollectorWorker.perform_async(
    {
      "parent_context_id" => parent_context_id,
      "map_id" => map_id,
      "parent_reactor_class_name" => parent_reactor_class_name,
      "step_name" => step_name,
      "strict_ordering" => strict_ordering,
      "timeout" => timeout
    }
  )
  RubyReactor::AsyncResult.new(job_id: job_id)
end

.perform_map_element_async(map_id:, element_id:, index:, serialized_inputs:, reactor_class_info:, strict_ordering:, parent_context_id:, parent_reactor_class_name:, step_name:, batch_size: nil, serialized_context: nil, fail_fast: nil) ⇒ Object

rubocop:disable Metrics/ParameterLists



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/ruby_reactor/sidekiq_adapter.rb', line 20

def self.perform_map_element_async(map_id:, element_id:, index:, serialized_inputs:, reactor_class_info:,
                                   strict_ordering:, parent_context_id:, parent_reactor_class_name:, step_name:,
                                   batch_size: nil, serialized_context: nil, fail_fast: nil)
  job_id = RubyReactor::SidekiqWorkers::MapElementWorker.perform_async(
    {
      "map_id" => map_id,
      "element_id" => element_id,
      "index" => index,
      "serialized_inputs" => serialized_inputs,
      "reactor_class_info" => reactor_class_info,
      "strict_ordering" => strict_ordering,
      "parent_context_id" => parent_context_id,
      "parent_reactor_class_name" => parent_reactor_class_name,
      "step_name" => step_name,
      "batch_size" => batch_size,
      "serialized_context" => serialized_context,
      "fail_fast" => fail_fast
    }
  )
  RubyReactor::AsyncResult.new(job_id: job_id)
end

.perform_map_element_in(delay, map_id:, element_id:, index:, serialized_inputs:, reactor_class_info:, strict_ordering:, parent_context_id:, parent_reactor_class_name:, step_name:, batch_size: nil, serialized_context: nil) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/ruby_reactor/sidekiq_adapter.rb', line 42

def self.perform_map_element_in(delay, map_id:, element_id:, index:, serialized_inputs:, reactor_class_info:,
                                strict_ordering:, parent_context_id:, parent_reactor_class_name:, step_name:,
                                batch_size: nil, serialized_context: nil)
  RubyReactor::SidekiqWorkers::MapElementWorker.perform_in(
    delay,
    {
      "map_id" => map_id,
      "element_id" => element_id,
      "index" => index,
      "serialized_inputs" => serialized_inputs,
      "reactor_class_info" => reactor_class_info,
      "strict_ordering" => strict_ordering,
      "parent_context_id" => parent_context_id,
      "parent_reactor_class_name" => parent_reactor_class_name,
      "step_name" => step_name,
      "batch_size" => batch_size,
      "serialized_context" => serialized_context
    }
  )
end

.perform_map_execution_async(map_id:, serialized_inputs:, reactor_class_info:, strict_ordering:, parent_context_id:, parent_reactor_class_name:, step_name:, fail_fast: nil) ⇒ Object

rubocop:enable Metrics/ParameterLists rubocop:disable Metrics/ParameterLists



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/ruby_reactor/sidekiq_adapter.rb', line 82

def self.perform_map_execution_async(map_id:, serialized_inputs:, reactor_class_info:, strict_ordering:,
                                     parent_context_id:, parent_reactor_class_name:, step_name:, fail_fast: nil)
  # rubocop:enable Metrics/ParameterLists
  job_id = RubyReactor::SidekiqWorkers::MapExecutionWorker.perform_async(
    {
      "map_id" => map_id,
      "serialized_inputs" => serialized_inputs,
      "reactor_class_info" => reactor_class_info,
      "strict_ordering" => strict_ordering,
      "parent_context_id" => parent_context_id,
      "parent_reactor_class_name" => parent_reactor_class_name,
      "step_name" => step_name,
      "fail_fast" => fail_fast
    }
  )
  RubyReactor::AsyncResult.new(job_id: job_id)
end