Class: Dynflow::Executors::Abstract::Core
  
  
  
  
  
    - Inherits:
 
    - 
      Actor
      
        
          - Object
 
          
            - Concurrent::Actor::Context
 
          
            - Actor
 
          
            - Dynflow::Executors::Abstract::Core
 
          
        
        show all
      
     
  
  
  
  
  
  
  
  
  
  
    - Defined in:
 
    - lib/dynflow/executors/abstract/core.rb
 
  
  
 
  Instance Attribute Summary collapse
  
  
    
      Instance Method Summary
      collapse
    
    
  
  
  
  
  
  
  
  
  
  Methods inherited from Actor
  #behaviour_definition, #terminating?
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  #log
  
  Constructor Details
  
    
  
  
    #initialize(world, heartbeat_interval, queues_options)  ⇒ Core 
  
  
  
  
    
Returns a new instance of Core.
   
 
  
  
    
      
9
10
11
12
13
14
15
16
17
18
19 
     | 
    
      # File 'lib/dynflow/executors/abstract/core.rb', line 9
# Builds the executor core for +world+ and schedules the first heartbeat.
#
# @param world [World] checked with +Type!+ before anything is read from it
# @param heartbeat_interval stored unchanged in +@heartbeat_interval+
# @param queues_options stored unchanged in +@queues_options+
# @raise whatever +Type!+ raises when +world+ is not a +World+
def initialize(world, heartbeat_interval, queues_options)
  # Validate first: an invalid world should fail on the type assertion,
  # not on the +logger+ call that follows.
  @world              = Type! world, World
  @logger             = @world.logger
  @pools              = {}
  @terminated         = nil
  @director           = Director.new(@world)
  @heartbeat_interval = heartbeat_interval
  @queues_options     = queues_options
  schedule_heartbeat
end
     | 
  
 
  
 
  
    Instance Attribute Details
    
      
      
      
  
  
    #logger  ⇒ Object  
  
  
  
  
    
Returns the value of attribute logger.
   
 
  
  
    
      
7
8
9 
     | 
    
      # File 'lib/dynflow/executors/abstract/core.rb', line 7
# Reader for the +@logger+ instance variable.
#
# @return [Object] whatever is currently stored in +@logger+
def logger
  @logger
end 
     | 
  
 
    
   
  
    Instance Method Details
    
      
  
  
    #dead_letter_routing  ⇒ Object 
  
  
  
  
    
      
84
85
86 
     | 
    
      # File 'lib/dynflow/executors/abstract/core.rb', line 84
# Delegates to the world's dead letter handler.
#
# @return [Object] the value of +@world.dead_letter_handler+
def dead_letter_routing
  @world.dead_letter_handler
end 
     | 
  
 
    
      
  
  
    #execution_status(execution_plan_id = nil)  ⇒ Object 
  
  
  
  
    
      
88
89
90 
     | 
    
      # File 'lib/dynflow/executors/abstract/core.rb', line 88
# Reports execution status; this implementation always answers with an
# empty Hash and does not look at its argument.
# NOTE(review): presumably concrete executor cores override this - confirm.
#
# @param execution_plan_id accepted but unused here (defaults to +nil+)
# @return [Hash] a new empty Hash on every call
def execution_status(execution_plan_id = nil)
  {}
end
     | 
  
 
    
      
  
  
    #finish_termination  ⇒ Object 
  
  
  
  
    
      
78
79
80
81
82 
     | 
    
      # File 'lib/dynflow/executors/abstract/core.rb', line 78
# Last step of shutdown: terminates the director, logs that the core is
# down, then hands over to the parent implementation (called with no
# arguments).
#
# @return [Object] whatever the parent +finish_termination+ returns
def finish_termination
  @director.terminate
  logger.info('... Dynflow core terminated.')
  super()
end
     | 
  
 
    
      
  
  
    #halt(execution_plan_id)  ⇒ Object 
  
  
  
  
    
      
69
70
71 
     | 
    
      # File 'lib/dynflow/executors/abstract/core.rb', line 69
# Forwards a halt request for the given execution plan to the director.
#
# @param execution_plan_id identifier passed through to +@director.halt+
# @return [Object] whatever +@director.halt+ returns
def halt(execution_plan_id)
  @director.halt(execution_plan_id)
end
     | 
  
 
    
      
  
  
    #handle_event(event)  ⇒ Object 
  
  
  
  
    
      
30
31
32
33
34
35
36
37 
     | 
    
      # File 'lib/dynflow/executors/abstract/core.rb', line 30
# Passes +event+ to the director and dispatches the resulting work via
# +handle_work+.
#
# @param event [Director::Event] type-checked with +Type!+ before use
# @raise [Dynflow::Error] when the core is already terminating
# @return [Object] whatever +handle_work+ returns
def handle_event(event)
  Type! event, Director::Event
  raise Dynflow::Error, "cannot accept event: #{event} core is terminating" if terminating?

  handle_work(@director.handle_event(event))
end
     | 
  
 
    
      
  
  
    #handle_execution(execution_plan_id, finished)  ⇒ Object 
  
  
  
  
    
      
21
22
23
24
25
26
27
28 
     | 
    
      # File 'lib/dynflow/executors/abstract/core.rb', line 21
# Asks the director to start executing a plan and dispatches the work it
# returns via +handle_work+.
#
# @param execution_plan_id identifier passed to +@director.start_execution+
# @param finished passed through unchanged to +@director.start_execution+
# @raise [Dynflow::Error] when the core is already terminating
# @return [Object] whatever +handle_work+ returns
def handle_execution(execution_plan_id, finished)
  raise Dynflow::Error, "cannot accept execution_plan_id:#{execution_plan_id} core is terminating" if terminating?

  handle_work(@director.start_execution(execution_plan_id, finished))
end
     | 
  
 
    
      
  
  
    #handle_persistence_error(error, work = nil)  ⇒ Object 
  
  
  
  
    
      
59
60
61
62
63
64
65
66
67 
     | 
    
      # File 'lib/dynflow/executors/abstract/core.rb', line 59
# Logs a persistence error, marks the related work (if any) as failed on
# the director, and terminates the world when the error is fatal.
#
# @param error the error to log; an +Errors::FatalPersistenceError+
#   additionally triggers +@world.terminate+
# @param work optional work item reported through +@director.work_failed+
# @return [nil, Object] +nil+ for non-fatal errors, otherwise the result of
#   +@world.terminate+
def handle_persistence_error(error, work = nil)
  logger.error('PersistenceError in executor')
  logger.error(error)
  @director.work_failed(work) if work
  return unless error.is_a?(Errors::FatalPersistenceError)

  logger.fatal('Terminating')
  @world.terminate
end
     | 
  
 
    
      
  
  
    #handle_planning(execution_plan_id)  ⇒ Object 
  
  
  
  
    
      
39
40
41
42
43
44
45
46 
     | 
    
      # File 'lib/dynflow/executors/abstract/core.rb', line 39
# Asks the director to plan the given execution plan and dispatches the
# resulting work via +handle_work+.
#
# @param execution_plan_id identifier passed to +@director.handle_planning+
# @raise [Dynflow::Error] when the core is already terminating
# @return [Object] whatever +handle_work+ returns
def handle_planning(execution_plan_id)
  if terminating?
    # The message must interpolate the plan id: no +event+ local exists in
    # this method, so referencing one would raise NameError instead of the
    # intended Dynflow::Error.
    raise Dynflow::Error,
      "cannot accept execution_plan_id:#{execution_plan_id} core is terminating"
  end
  handle_work(@director.handle_planning(execution_plan_id))
end
     | 
  
 
    
      
  
  
    #heartbeat  ⇒ Object 
  
  
  
  
    
      
92
93
94
95
96
97
98
99
100
101
102
103
104
105 
     | 
    
      # File 'lib/dynflow/executors/abstract/core.rb', line 92
# Refreshes this world's +:last_seen+ timestamp in the coordinator and
# schedules the next heartbeat. If the coordinator no longer holds a
# record for this world, logs an error and terminates the world instead
# (no further heartbeat is scheduled).
#
# @return [nil, Object] +nil+ when the record is missing, otherwise the
#   result of +schedule_heartbeat+
def heartbeat
  @logger.debug('Executor heartbeat')
  world_classes = ['Dynflow::Coordinator::ExecutorWorld', 'Dynflow::Coordinator::ClientWorld']
  matches = @world.coordinator.find_records(:id => @world.id, :class => world_classes)
  record = matches.first

  if record
    meta = record.data[:meta]
    meta.update(:last_seen => Dynflow::Dispatcher::ClientDispatcher::PingCache.format_time)
    @world.coordinator.update_record(record)
    schedule_heartbeat
  else
    logger.error("Executor's world record for #{@world.id} missing: terminating")
    @world.terminate
    nil
  end
end
     | 
  
 
    
      
  
  
    #plan_events(delayed_events)  ⇒ Object 
  
  
  
  
    
      
48
49
50
51
52 
     | 
    
      # File 'lib/dynflow/executors/abstract/core.rb', line 48
# Re-plans every delayed event on the world, in the order given.
#
# @param delayed_events a collection responding to +each+ whose items
#   expose +execution_plan_id+, +step_id+, +event+, +time+ and +optional+
# @return [Object] the result of +delayed_events.each+
def plan_events(delayed_events)
  delayed_events.each do |delayed|
    @world.plan_event(delayed.execution_plan_id,
                      delayed.step_id,
                      delayed.event,
                      delayed.time,
                      optional: delayed.optional)
  end
end
     | 
  
 
    
      
  
  
    #start_termination(*args)  ⇒ Object 
  
  
  
  
    
      
73
74
75
76 
     | 
    
      # File 'lib/dynflow/executors/abstract/core.rb', line 73
# Logs the start of shutdown, then defers to the parent implementation
# with the same arguments.
#
# @param args forwarded untouched to the parent +start_termination+
# @return [Object] whatever the parent +start_termination+ returns
def start_termination(*args)
  logger.info('shutting down Core ...')
  super
end
     | 
  
 
    
      
  
  
    #work_finished(work, delayed_events = nil)  ⇒ Object 
  
  
  
  
    
      
54
55
56
57 
     | 
    
      # File 'lib/dynflow/executors/abstract/core.rb', line 54
# Reports finished work to the director, dispatches whatever work it
# returns, and then re-plans any delayed events that came with it.
#
# @param work the finished work item, passed to +@director.work_finished+
# @param delayed_events optional collection handed to +plan_events+
# @return [nil, Object] +nil+ without delayed events, otherwise the result
#   of +plan_events+
def work_finished(work, delayed_events = nil)
  follow_up = @director.work_finished(work)
  handle_work(follow_up)
  return unless delayed_events

  plan_events(delayed_events)
end
     |