Class: Dynflow::Executors::Sidekiq::Core
- Inherits:
-
Abstract::Core
- Object
- Concurrent::Actor::Context
- Actor
- Abstract::Core
- Dynflow::Executors::Sidekiq::Core
- Includes:
- RedisLocking
- Defined in:
- lib/dynflow/executors/sidekiq/core.rb
Constant Summary collapse
- TELEMETRY_UPDATE_INTERVAL =
update telemetry every 30s
30
Constants included from RedisLocking
RedisLocking::ACQUIRE_MISSING, RedisLocking::ACQUIRE_OK, RedisLocking::ACQUIRE_TAKEN, RedisLocking::REACQUIRE_SCRIPT, RedisLocking::REDIS_LOCK_KEY, RedisLocking::REDIS_LOCK_POLL_INTERVAL, RedisLocking::REDIS_LOCK_TTL, RedisLocking::RELEASE_SCRIPT
Instance Attribute Summary collapse
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
Instance Method Summary collapse
- #begin_startup! ⇒ Object
-
#execution_status(execution_plan_id = nil) ⇒ Object
TODO: needs thoughs on how to implement it.
- #feed_pool(work_items) ⇒ Object
- #heartbeat ⇒ Object
-
#initialize(world, *_args) ⇒ Core
constructor
A new instance of Core.
- #start_termination(*args) ⇒ Object
- #startup_complete ⇒ Object
- #update_telemetry ⇒ Object
- #work_finished(work, delayed_events = nil) ⇒ Object
Methods included from RedisLocking
#reacquire_orchestrator_lock, #release_orchestrator_lock, #wait_for_orchestrator_lock
Methods inherited from Abstract::Core
#dead_letter_routing, #finish_termination, #halt, #handle_event, #handle_execution, #handle_persistence_error, #handle_planning, #plan_events
Methods inherited from Actor
#behaviour_definition, #finish_termination, #terminating?
Methods included from MethodicActor
Methods included from Actor::LogWithFullBacktrace
Constructor Details
#initialize(world, *_args) ⇒ Core
Returns a new instance of Core.
28 29 30 31 32 33 34 35 |
# File 'lib/dynflow/executors/sidekiq/core.rb', line 28 def initialize(world, *_args) @world = world @logger = world.logger wait_for_orchestrator_lock super schedule_update_telemetry begin_startup! end |
Instance Attribute Details
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
26 27 28 |
# File 'lib/dynflow/executors/sidekiq/core.rb', line 26 def logger @logger end |
Instance Method Details
#begin_startup! ⇒ Object
81 82 83 84 |
# File 'lib/dynflow/executors/sidekiq/core.rb', line 81 def begin_startup! WorkerJobs::DrainMarker.perform_async(@world.id) @recovery = true end |
#execution_status(execution_plan_id = nil) ⇒ Object
TODO: needs thoughs on how to implement it
49 50 51 |
# File 'lib/dynflow/executors/sidekiq/core.rb', line 49 def execution_status(execution_plan_id = nil) {} end |
#feed_pool(work_items) ⇒ Object
53 54 55 56 57 |
# File 'lib/dynflow/executors/sidekiq/core.rb', line 53 def feed_pool(work_items) work_items.each do |new_work| WorkerJobs::PerformWork.set(queue: suggest_queue(new_work)).perform_async(new_work) end end |
#heartbeat ⇒ Object
37 38 39 40 |
# File 'lib/dynflow/executors/sidekiq/core.rb', line 37 def heartbeat super reacquire_orchestrator_lock end |
#start_termination(*args) ⇒ Object
42 43 44 45 46 |
# File 'lib/dynflow/executors/sidekiq/core.rb', line 42 def start_termination(*args) super release_orchestrator_lock finish_termination end |
#startup_complete ⇒ Object
86 87 88 89 90 91 92 93 94 |
# File 'lib/dynflow/executors/sidekiq/core.rb', line 86 def startup_complete logger.info('Performing validity checks') @world.perform_validity_checks logger.info('Finished performing validity checks') if @world.delayed_executor && !@world.delayed_executor.started? @world.delayed_executor.start end @recovery = false end |
#update_telemetry ⇒ Object
59 60 61 62 63 64 65 66 67 68 |
# File 'lib/dynflow/executors/sidekiq/core.rb', line 59 def update_telemetry sidekiq_queues = ::Sidekiq::Stats.new.queues @queues_options.keys.each do |queue| queue_size = sidekiq_queues[queue.to_s] if queue_size Dynflow::Telemetry.with_instance { |t| t.set_gauge(:dynflow_queue_size, queue_size, (queue)) } end end schedule_update_telemetry end |
#work_finished(work, delayed_events = nil) ⇒ Object
70 71 72 73 74 75 76 77 78 79 |
# File 'lib/dynflow/executors/sidekiq/core.rb', line 70 def work_finished(work, delayed_events = nil) # If the work item is sent in reply to a request from the current orchestrator, proceed if work.sender_orchestrator_id == @world.id super else # If we're in recovery, we can drop the work as the execution plan will be resumed during validity checks performed when leaving recovery # If we're not in recovery and receive an event from another orchestrator, it means it survived the queue draining. handle_unknown_work_item(work) unless @recovery end end |