Class: Dynflow::Executors::Sidekiq::Core

Inherits:
Abstract::Core show all
Includes:
RedisLocking
Defined in:
lib/dynflow/executors/sidekiq/core.rb

Constant Summary collapse

TELEMETRY_UPDATE_INTERVAL =

Interval, in seconds, between telemetry updates (every 30 seconds).

30

Constants included from RedisLocking

RedisLocking::ACQUIRE_MISSING, RedisLocking::ACQUIRE_OK, RedisLocking::ACQUIRE_TAKEN, RedisLocking::REACQUIRE_SCRIPT, RedisLocking::REDIS_LOCK_KEY, RedisLocking::REDIS_LOCK_POLL_INTERVAL, RedisLocking::REDIS_LOCK_TTL, RedisLocking::RELEASE_SCRIPT

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from RedisLocking

#reacquire_orchestrator_lock, #release_orchestrator_lock, #wait_for_orchestrator_lock

Methods inherited from Abstract::Core

#dead_letter_routing, #finish_termination, #halt, #handle_event, #handle_execution, #handle_persistence_error, #handle_planning, #plan_events

Methods inherited from Actor

#behaviour_definition, #finish_termination, #terminating?

Methods included from MethodicActor

#on_message

Methods included from Actor::LogWithFullBacktrace

#log

Constructor Details

#initialize(world, *_args) ⇒ Core

Returns a new instance of Core.



28
29
30
31
32
33
34
35
# File 'lib/dynflow/executors/sidekiq/core.rb', line 28

# Builds the Sidekiq orchestrator core for the given world.
#
# @param world the world this core belongs to; it is stored and its logger is reused
# @param _args extra arguments, unused here but forwarded to the parent
#   constructor by the bare `super`
def initialize(world, *_args)
  @world = world
  @logger = world.logger
  # Runs before the parent constructor.
  # NOTE(review): presumably blocks until the orchestrator lock is held -- confirm in RedisLocking.
  wait_for_orchestrator_lock
  super
  schedule_update_telemetry
  begin_startup!
end

Instance Attribute Details

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



26
27
28
# File 'lib/dynflow/executors/sidekiq/core.rb', line 26

# Reader for the logger that the constructor took from the world.
#
# @return the value of the @logger instance variable
def logger
  @logger
end

Instance Method Details

#begin_startup! ⇒ Object



81
82
83
84
# File 'lib/dynflow/executors/sidekiq/core.rb', line 81

# Starts the startup phase: enqueues a drain marker job carrying this world's
# id and flags the core as being in recovery.
#
# @return [true] the value assigned to the recovery flag
def begin_startup!
  world_id = @world.id
  WorkerJobs::DrainMarker.perform_async(world_id)
  @recovery = true
end

#execution_status(execution_plan_id = nil) ⇒ Object

TODO: needs thoughts on how to implement it



49
50
51
# File 'lib/dynflow/executors/sidekiq/core.rb', line 49

# Placeholder: execution status reporting is not implemented for this executor yet.
#
# @param execution_plan_id currently ignored
# @return [Hash] always an empty hash
def execution_status(execution_plan_id = nil)
  {}
end

#feed_pool(work_items) ⇒ Object



53
54
55
56
57
# File 'lib/dynflow/executors/sidekiq/core.rb', line 53

# Enqueues one PerformWork job per work item, each on the queue that
# suggest_queue picks for that item.
#
# @param work_items an enumerable of work items to enqueue
# @return the value of work_items.each (the receiver itself for an Array)
def feed_pool(work_items)
  work_items.each do |item|
    target_queue = suggest_queue(item)
    WorkerJobs::PerformWork.set(queue: target_queue).perform_async(item)
  end
end

#heartbeat ⇒ Object



37
38
39
40
# File 'lib/dynflow/executors/sidekiq/core.rb', line 37

# Runs the parent heartbeat and then calls reacquire_orchestrator_lock
# (provided by RedisLocking).
# NOTE(review): presumably this refreshes the Redis lock before it expires -- confirm in RedisLocking.
def heartbeat
  super
  reacquire_orchestrator_lock
end

#start_termination(*args) ⇒ Object



42
43
44
45
46
# File 'lib/dynflow/executors/sidekiq/core.rb', line 42

# Terminates the core: runs the parent's termination logic, releases the
# orchestrator lock and then finishes termination, in that order.
#
# @param args arguments forwarded unchanged to the parent implementation by
#   the bare `super`
def start_termination(*args)
  super
  release_orchestrator_lock
  finish_termination
end

#startup_complete ⇒ Object



86
87
88
89
90
91
92
93
94
# File 'lib/dynflow/executors/sidekiq/core.rb', line 86

# Ends the startup phase: runs the world's validity checks (logging before and
# after), starts the delayed executor if one exists and is not yet started,
# and clears the recovery flag.
#
# @return [false] the value assigned to the recovery flag
def startup_complete
  logger.info('Performing validity checks')
  @world.perform_validity_checks
  logger.info('Finished performing validity checks')

  delayed = @world.delayed_executor
  delayed.start if delayed && !delayed.started?

  @recovery = false
end

#update_telemetry ⇒ Object



59
60
61
62
63
64
65
66
67
68
# File 'lib/dynflow/executors/sidekiq/core.rb', line 59

# Publishes the current Sidekiq queue sizes as the :dynflow_queue_size gauge
# for every queue named in @queues_options, then schedules the next update.
# Queues that Sidekiq's stats do not report are skipped.
#
# @return whatever schedule_update_telemetry returns
def update_telemetry
  sizes_by_queue = ::Sidekiq::Stats.new.queues
  @queues_options.each_key do |queue_name|
    size = sizes_by_queue[queue_name.to_s]
    next unless size

    Dynflow::Telemetry.with_instance do |telemetry|
      telemetry.set_gauge(:dynflow_queue_size, size, telemetry_options(queue_name))
    end
  end
  schedule_update_telemetry
end

#work_finished(work, delayed_events = nil) ⇒ Object



70
71
72
73
74
75
76
77
78
79
# File 'lib/dynflow/executors/sidekiq/core.rb', line 70

# Handles a finished work item, accepting it only when it answers a request
# made by this orchestrator.
#
# @param work the finished work item; must respond to sender_orchestrator_id
# @param delayed_events forwarded to the parent implementation by the bare `super`
# @return the parent's result for our own work, nil for foreign work received
#   during recovery, otherwise the result of handle_unknown_work_item
def work_finished(work, delayed_events = nil)
  # If the work item is sent in reply to a request from the current orchestrator, proceed
  return super if work.sender_orchestrator_id == @world.id

  # If we're in recovery, we can drop the work as the execution plan will be resumed
  # during validity checks performed when leaving recovery
  return if @recovery

  # If we're not in recovery and receive an event from another orchestrator,
  # it means it survived the queue draining.
  handle_unknown_work_item(work)
end