Class: Dynflow::Executors::Abstract::Core
- Inherits:
-
Actor
- Object
- Concurrent::Actor::Context
- Actor
- Dynflow::Executors::Abstract::Core
show all
- Defined in:
- lib/dynflow/executors/abstract/core.rb
Instance Attribute Summary collapse
Instance Method Summary
collapse
Methods inherited from Actor
#behaviour_definition, #terminating?
#log
Constructor Details
#initialize(world, heartbeat_interval, queues_options) ⇒ Core
Returns a new instance of Core.
9
10
11
12
13
14
15
16
17
18
19
|
# File 'lib/dynflow/executors/abstract/core.rb', line 9
# Sets up the core's state for the given world and schedules the first heartbeat.
#
# @param world [World] validated with Type!; its #logger is stored as well
# @param heartbeat_interval [Object] stored unchanged in @heartbeat_interval
# @param queues_options [Object] stored unchanged in @queues_options
def initialize(world, heartbeat_interval, queues_options)
  # The logger is read from the world before the type check runs.
  @logger = world.logger
  @world = Type!(world, World)
  @pools, @terminated = {}, nil
  @director = Director.new(@world)
  @heartbeat_interval, @queues_options = heartbeat_interval, queues_options
  schedule_heartbeat
end
|
Instance Attribute Details
#logger ⇒ Object
Returns the value of attribute logger.
7
8
9
|
# File 'lib/dynflow/executors/abstract/core.rb', line 7
# Reader for the logger that #initialize takes from the world.
#
# @return [Object] the current value of @logger
def logger
@logger
end
|
Instance Method Details
#dead_letter_routing ⇒ Object
84
85
86
|
# File 'lib/dynflow/executors/abstract/core.rb', line 84
# Delegates to the world's dead letter handler.
# NOTE(review): presumably an override of the actor framework's hook for
# undeliverable messages — confirm against Concurrent::Actor::Context.
#
# @return [Object] whatever @world.dead_letter_handler returns
def dead_letter_routing
@world.dead_letter_handler
end
|
#execution_status(execution_plan_id = nil) ⇒ Object
88
89
90
|
# File 'lib/dynflow/executors/abstract/core.rb', line 88
# Reports execution status; this implementation always answers with an empty
# Hash and ignores its argument.
# NOTE(review): looks like a default that concrete executor cores are meant
# to override — confirm.
#
# @param execution_plan_id [Object, nil] unused here
# @return [Hash] a new empty hash
def execution_status(execution_plan_id = nil)
{}
end
|
#finish_termination ⇒ Object
78
79
80
81
82
|
# File 'lib/dynflow/executors/abstract/core.rb', line 78
# Final step of shutdown: terminates the director, logs that the core is
# down, then hands over to the parent implementation (called with no
# arguments).
#
# @return [Object] the parent implementation's result
def finish_termination
  @director.terminate
  logger.info('... Dynflow core terminated.')
  super()
end
|
#halt(execution_plan_id) ⇒ Object
69
70
71
|
# File 'lib/dynflow/executors/abstract/core.rb', line 69
# Forwards a halt request for the given execution plan to the director.
#
# @param execution_plan_id [Object] identifier passed through unchanged
# @return [Object] whatever the director's #halt returns
def halt(execution_plan_id)
  @director.halt(execution_plan_id)
end
|
#handle_event(event) ⇒ Object
30
31
32
33
34
35
36
37
|
# File 'lib/dynflow/executors/abstract/core.rb', line 30
# Passes an event to the director and processes the resulting work.
#
# @param event [Director::Event] type-checked before anything else happens
# @return [Object] the result of #handle_work
# @raise [Dynflow::Error] when the core is already terminating
def handle_event(event)
  Type!(event, Director::Event)
  raise Dynflow::Error, "cannot accept event: #{event} core is terminating" if terminating?

  work = @director.handle_event(event)
  handle_work(work)
end
|
#handle_execution(execution_plan_id, finished) ⇒ Object
21
22
23
24
25
26
27
28
|
# File 'lib/dynflow/executors/abstract/core.rb', line 21
# Asks the director to start executing a plan and processes the resulting work.
#
# @param execution_plan_id [Object] identifier of the plan to execute
# @param finished [Object] passed through to the director's #start_execution
# @return [Object] the result of #handle_work
# @raise [Dynflow::Error] when the core is already terminating
def handle_execution(execution_plan_id, finished)
  raise Dynflow::Error, "cannot accept execution_plan_id:#{execution_plan_id} core is terminating" if terminating?

  work = @director.start_execution(execution_plan_id, finished)
  handle_work(work)
end
|
#handle_persistence_error(error, work = nil) ⇒ Object
59
60
61
62
63
64
65
66
67
|
# File 'lib/dynflow/executors/abstract/core.rb', line 59
# Logs a persistence error, marks the given work as failed on the director
# when work is supplied, and terminates the world if the error is fatal.
#
# @param error [Object] the error to log; Errors::FatalPersistenceError triggers termination
# @param work [Object, nil] work item reported to the director as failed
# @return [Object, nil] the result of @world.terminate for fatal errors, nil otherwise
def handle_persistence_error(error, work = nil)
  logger.error('PersistenceError in executor')
  logger.error(error)
  @director.work_failed(work) if work
  return unless error.is_a?(Errors::FatalPersistenceError)

  logger.fatal('Terminating')
  @world.terminate
end
|
#handle_planning(execution_plan_id) ⇒ Object
39
40
41
42
43
44
45
46
|
# File 'lib/dynflow/executors/abstract/core.rb', line 39
# Asks the director to plan the given execution plan and processes the
# resulting work.
#
# @param execution_plan_id [Object] identifier of the plan to be planned
# @return [Object] the result of #handle_work
# @raise [Dynflow::Error] when the core is already terminating
def handle_planning(execution_plan_id)
  if terminating?
    # The message must only interpolate names that exist in this method;
    # interpolating an undefined local would raise NameError and mask the
    # intended Dynflow::Error.
    raise Dynflow::Error,
          "cannot accept execution_plan_id:#{execution_plan_id} core is terminating"
  end
  handle_work(@director.handle_planning(execution_plan_id))
end
|
#heartbeat ⇒ Object
92
93
94
95
96
97
98
99
100
101
102
103
104
105
|
# File 'lib/dynflow/executors/abstract/core.rb', line 92
# Refreshes this world's coordinator record and schedules the next heartbeat.
#
# Looks up the record whose id matches @world.id among the executor/client
# world record classes. If none is found, an error is logged and the world
# is terminated without scheduling another heartbeat. Otherwise the record's
# :last_seen meta entry is updated and the record is written back.
#
# @return [Object, nil] the result of #schedule_heartbeat, or nil when the
#   record is missing
def heartbeat
  @logger.debug('Executor heartbeat')
  world_classes = ['Dynflow::Coordinator::ExecutorWorld', 'Dynflow::Coordinator::ClientWorld']
  record = @world.coordinator.find_records(id: @world.id, class: world_classes).first

  if record
    meta = record.data[:meta]
    meta.update(last_seen: Dynflow::Dispatcher::ClientDispatcher::PingCache.format_time)
    @world.coordinator.update_record(record)
    schedule_heartbeat
  else
    logger.error("Executor's world record for #{@world.id} missing: terminating")
    @world.terminate
    nil
  end
end
|
#plan_events(delayed_events) ⇒ Object
48
49
50
51
52
|
# File 'lib/dynflow/executors/abstract/core.rb', line 48
# Plans every delayed event on the world, one #plan_event call per entry.
#
# @param delayed_events [#each] entries responding to #execution_plan_id,
#   #step_id, #event, #time and #optional
# @return [Object] the value of delayed_events.each (the collection itself
#   for an Array)
def plan_events(delayed_events)
  delayed_events.each do |delayed|
    @world.plan_event(
      delayed.execution_plan_id,
      delayed.step_id,
      delayed.event,
      delayed.time,
      optional: delayed.optional
    )
  end
end
|
#start_termination(*args) ⇒ Object
73
74
75
76
|
# File 'lib/dynflow/executors/abstract/core.rb', line 73
# Logs the start of shutdown and then defers to the parent implementation,
# forwarding the received arguments unchanged.
#
# @param args [Array] arguments handed through to the parent implementation
# @return [Object] the parent implementation's result
def start_termination(*args)
  logger.info('shutting down Core ...')
  super
end
|
#work_finished(work, delayed_events = nil) ⇒ Object
54
55
56
57
|
# File 'lib/dynflow/executors/abstract/core.rb', line 54
# Reports finished work to the director, processes whatever follow-up work
# it returns, and then plans any delayed events supplied with it.
#
# @param work [Object] the finished work item
# @param delayed_events [Object, nil] events handed to #plan_events when truthy
# @return [Object, nil] the result of #plan_events, or nil when there are no
#   delayed events
def work_finished(work, delayed_events = nil)
  next_work = @director.work_finished(work)
  handle_work(next_work)
  return unless delayed_events

  plan_events(delayed_events)
end
|