Class: Dynflow::Action
- Inherits:
-
Serializable
show all
- Extended by:
- Format
- Includes:
- Algebrick::Matching, Algebrick::TypeCheck, Progress, Rescue
- Defined in:
- lib/dynflow/action.rb,
lib/dynflow/action/v2.rb,
lib/dynflow/action/singleton.rb
Overview
rubocop:disable Metrics/ClassLength
Defined Under Namespace
Modules: Cancellable, Executable, Format, Phase, Polling, Progress, Rescue, Singleton, Timeouts, V2, WithBulkSubPlans, WithPollingSubPlans, WithSubPlans
Classes: Missing, Suspended
Constant Summary
collapse
- OutputReference =
ExecutionPlan::OutputReference
- ERROR =
Object.new
- SUSPEND =
Object.new
- Skip =
Algebrick.atom
- DelayedEvent =
Algebrick.type do
fields! execution_plan_id: String,
step_id: Integer,
event: Object,
time: type { variants Time, NilClass },
optional: Algebrick::Types::Boolean
end
Constants included
from Rescue
Rescue::Strategy, Rescue::SuggestedStrategy
Constants inherited
from Serializable
Serializable::LEGACY_TIME_FORMAT, Serializable::TIME_FORMAT
Instance Attribute Summary collapse
Attributes included from Progress
#calculated_progress
Class Method Summary
collapse
Instance Method Summary
collapse
-
#action_logger ⇒ Object
-
#all_planned_actions(filter_class = Action) ⇒ Array<Action>
returned actions are in Present phase.
-
#caller_action ⇒ Object
-
#delayed_events ⇒ Object
-
#drop_output_chunks! ⇒ Object
-
#error ⇒ Object
-
#execute(*args) ⇒ Object
-
#execute_delay(delay_options, *args) ⇒ Object
-
#execution_plan ⇒ Object
-
#finalize_step ⇒ Object
-
#from_subscription? ⇒ Boolean
-
#holds_singleton_lock? ⇒ Boolean
-
#humanized_state ⇒ Object
action: used in Dynflow console.
-
#initialize(attributes, world) ⇒ Action
constructor
A new instance of Action.
-
#label ⇒ Object
-
#output ⇒ Object
-
#output=(hash) ⇒ Object
-
#output_chunk(chunk, kind: nil, timestamp: Time.now) ⇒ Object
-
#phase!(*phases) ⇒ Object
-
#phase?(*phases) ⇒ Boolean
-
#plan_event(event, time = nil, execution_plan_id: self.execution_plan_id, step_id: self.run_step_id, optional: false) ⇒ Object
Plan an event
to be send to the action defined by action
, what defaults to be self.
-
#plan_step ⇒ Object
-
#planned_actions(filter = Action) ⇒ Array<Action>
returned actions are in Present phase.
-
#queue ⇒ Object
queue defined here will also be used as the default queue for all the steps planned under this action, unless overrided by sub-action.
-
#required_step_ids(input = self.input) ⇒ Array<Integer>
private
-
#run_step ⇒ Object
-
#serializer ⇒ Object
-
#set_plan_context(execution_plan, triggering_action, from_subscription) ⇒ Object
-
#state ⇒ Object
-
#steps ⇒ Object
-
#stored_output_chunks ⇒ Object
-
#to_hash ⇒ Object
-
#triggering_action ⇒ Object
action that caused this action to be planned.
Methods included from Format
input_format, output_format
Methods included from Rescue
#combine_suggested_strategies, #rescue_strategy, #rescue_strategy_for_planned_action, #rescue_strategy_for_self
Methods included from Progress
#finalize_progress, #finalize_progress_weight, #run_progress, #run_progress_weight
from_hash
Constructor Details
#initialize(attributes, world) ⇒ Action
Returns a new instance of Action.
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
|
# File 'lib/dynflow/action.rb', line 114
def initialize(attributes, world)
Type! attributes, Hash
@phase = Type! attributes.fetch(:phase), Phase
@world = Type! world, World
@step = Type! attributes.fetch(:step, nil), ExecutionPlan::Steps::Abstract, NilClass
raise ArgumentError, 'Step reference missing' if phase?(Executable) && @step.nil?
@execution_plan_id = Type! attributes.fetch(:execution_plan_id), String
@id = Type! attributes.fetch(:id), Integer
@plan_step_id = Type! attributes.fetch(:plan_step_id), Integer
@run_step_id = Type! attributes.fetch(:run_step_id), Integer, NilClass
@finalize_step_id = Type! attributes.fetch(:finalize_step_id), Integer, NilClass
@execution_plan = Type!(attributes.fetch(:execution_plan), ExecutionPlan) if phase? Present
@caller_execution_plan_id = Type!(attributes.fetch(:caller_execution_plan_id, nil), String, NilClass)
@caller_action_id = Type!(attributes.fetch(:caller_action_id, nil), Integer, NilClass)
getter = ->key, required do
required ? attributes.fetch(key) : attributes.fetch(key, {})
end
@input = OutputReference.deserialize getter.(:input, phase?(Run, Finalize, Present))
@output = OutputReference.deserialize getter.(:output, false) if phase? Run, Finalize, Present
@pending_output_chunks = [] if phase? Run, Finalize
end
|
Instance Attribute Details
#caller_action_id ⇒ Object
Returns the value of attribute caller_action_id.
107
108
109
|
# File 'lib/dynflow/action.rb', line 107
def caller_action_id
@caller_action_id
end
|
#caller_execution_plan_id ⇒ Object
Returns the value of attribute caller_execution_plan_id.
107
108
109
|
# File 'lib/dynflow/action.rb', line 107
def caller_execution_plan_id
@caller_execution_plan_id
end
|
#execution_plan_id ⇒ Object
Returns the value of attribute execution_plan_id.
107
108
109
|
# File 'lib/dynflow/action.rb', line 107
def execution_plan_id
@execution_plan_id
end
|
#finalize_step_id ⇒ Object
Returns the value of attribute finalize_step_id.
107
108
109
|
# File 'lib/dynflow/action.rb', line 107
def finalize_step_id
@finalize_step_id
end
|
#id ⇒ Object
Returns the value of attribute id.
107
108
109
|
# File 'lib/dynflow/action.rb', line 107
def id
@id
end
|
Returns the value of attribute input.
107
108
109
|
# File 'lib/dynflow/action.rb', line 107
def input
@input
end
|
#pending_output_chunks ⇒ Object
Returns the value of attribute pending_output_chunks.
107
108
109
|
# File 'lib/dynflow/action.rb', line 107
def pending_output_chunks
@pending_output_chunks
end
|
#phase ⇒ Object
Returns the value of attribute phase.
107
108
109
|
# File 'lib/dynflow/action.rb', line 107
def phase
@phase
end
|
#plan_step_id ⇒ Object
Returns the value of attribute plan_step_id.
107
108
109
|
# File 'lib/dynflow/action.rb', line 107
def plan_step_id
@plan_step_id
end
|
#run_step_id ⇒ Object
Returns the value of attribute run_step_id.
107
108
109
|
# File 'lib/dynflow/action.rb', line 107
def run_step_id
@run_step_id
end
|
#world ⇒ Object
Returns the value of attribute world.
107
108
109
|
# File 'lib/dynflow/action.rb', line 107
def world
@world
end
|
Class Method Details
.all_children ⇒ Object
31
32
33
34
35
|
# File 'lib/dynflow/action.rb', line 31
def self.all_children
children.values.inject(children.values) do |children, child|
children + child.all_children
end
end
|
.children ⇒ Object
43
44
45
|
# File 'lib/dynflow/action.rb', line 43
def self.children
@children ||= {}
end
|
.constantize(action_name) ⇒ Object
101
102
103
104
105
|
# File 'lib/dynflow/action.rb', line 101
def self.constantize(action_name)
super action_name
rescue NameError
Action::Missing.generate(action_name)
end
|
.execution_plan_hooks ⇒ Object
.inherit_execution_plan_hooks(hooks) ⇒ Object
55
56
57
|
# File 'lib/dynflow/action.rb', line 55
def self.inherit_execution_plan_hooks(hooks)
@execution_plan_hooks = hooks
end
|
.inherited(child) ⇒ Object
37
38
39
40
41
|
# File 'lib/dynflow/action.rb', line 37
def self.inherited(child)
children[child.name] = child
child.inherit_execution_plan_hooks(execution_plan_hooks.dup)
super child
end
|
.middleware ⇒ Object
47
48
49
|
# File 'lib/dynflow/action.rb', line 47
def self.middleware
@middleware ||= Middleware::Register.new
end
|
.subscribe ⇒ nil, Class
FIND define subscriptions in world independent on action’s classes,
limited only by in/output formats
62
63
64
|
# File 'lib/dynflow/action.rb', line 62
def self.subscribe
nil
end
|
Instance Method Details
#action_logger ⇒ Object
225
226
227
|
# File 'lib/dynflow/action.rb', line 225
def action_logger
world.action_logger
end
|
#all_planned_actions(filter_class = Action) ⇒ Array<Action>
returned actions are in Present phase
248
249
250
251
252
253
|
# File 'lib/dynflow/action.rb', line 248
def all_planned_actions(filter_class = Action)
phase! Present
mine = planned_actions
(mine + mine.reduce([]) { |arr, action| arr + action.all_planned_actions })
.select { |a| a.is_a?(filter_class) }
end
|
#caller_action ⇒ Object
189
190
191
192
193
194
195
196
197
198
199
200
|
# File 'lib/dynflow/action.rb', line 189
def caller_action
phase! Present
return nil if @caller_action_id
return @caller_action if @caller_action
caller_execution_plan = if @caller_execution_plan_id.nil?
execution_plan
else
world.persistence.load_execution_plan(@caller_execution_plan_id)
end
@caller_action = world.persistence.load_action_for_presentation(caller_execution_plan, @caller_action_id)
end
|
#delayed_events ⇒ Object
359
360
361
|
# File 'lib/dynflow/action.rb', line 359
def delayed_events
@delayed_events ||= []
end
|
#drop_output_chunks! ⇒ Object
183
184
185
186
187
|
# File 'lib/dynflow/action.rb', line 183
def drop_output_chunks!
@pending_output_chunks = []
@output_chunks = []
world.persistence.delete_output_chunks(@execution_plan_id, @id)
end
|
#error ⇒ Object
297
298
299
300
|
# File 'lib/dynflow/action.rb', line 297
def error
raise "error data not available" if @step.nil?
@step.error
end
|
#execute(*args) ⇒ Object
302
303
304
305
|
# File 'lib/dynflow/action.rb', line 302
def execute(*args)
phase! Executable
self.send phase.execute_method_name, *args
end
|
#execute_delay(delay_options, *args) ⇒ Object
327
328
329
330
331
332
333
334
335
|
# File 'lib/dynflow/action.rb', line 327
def execute_delay(delay_options, *args)
with_error_handling(true) do
world.middleware.execute(:delay, self, delay_options, *args) do |*new_args|
@serializer = delay(*new_args).tap do |serializer|
serializer.perform_serialization!
end
end
end
end
|
#execution_plan ⇒ Object
220
221
222
223
|
# File 'lib/dynflow/action.rb', line 220
def execution_plan
phase! Plan, Present
@execution_plan
end
|
#finalize_step ⇒ Object
260
261
262
263
|
# File 'lib/dynflow/action.rb', line 260
def finalize_step
phase! Present
execution_plan.steps.fetch(finalize_step_id) if finalize_step_id
end
|
#from_subscription? ⇒ Boolean
215
216
217
218
|
# File 'lib/dynflow/action.rb', line 215
def from_subscription?
phase! Plan
@from_subscription
end
|
#holds_singleton_lock? ⇒ Boolean
342
343
344
|
# File 'lib/dynflow/action.rb', line 342
def holds_singleton_lock?
false
end
|
#humanized_state ⇒ Object
action: used in Dynflow console
293
294
295
|
# File 'lib/dynflow/action.rb', line 293
def humanized_state
state.to_s
end
|
#label ⇒ Object
150
151
152
|
# File 'lib/dynflow/action.rb', line 150
def label
self.class.name
end
|
#output ⇒ Object
166
167
168
169
170
171
172
173
|
# File 'lib/dynflow/action.rb', line 166
def output
if phase? Plan
@output_reference or
raise 'plan_self has to be invoked before being able to reference the output'
else
@output
end
end
|
#output=(hash) ⇒ Object
160
161
162
163
164
|
# File 'lib/dynflow/action.rb', line 160
def output=(hash)
Type! hash, Hash
phase! Run
@output = Utils.indifferent_hash(hash)
end
|
#output_chunk(chunk, kind: nil, timestamp: Time.now) ⇒ Object
175
176
177
|
# File 'lib/dynflow/action.rb', line 175
def output_chunk(chunk, kind: nil, timestamp: Time.now)
@pending_output_chunks << { chunk: chunk, kind: kind, timestamp: timestamp }
end
|
#phase!(*phases) ⇒ Object
145
146
147
148
|
# File 'lib/dynflow/action.rb', line 145
def phase!(*phases)
phase?(*phases) or
raise TypeError, "Wrong phase #{phase}, required #{phases}"
end
|
#phase?(*phases) ⇒ Boolean
141
142
143
|
# File 'lib/dynflow/action.rb', line 141
def phase?(*phases)
Match? phase, *phases
end
|
#plan_event(event, time = nil, execution_plan_id: self.execution_plan_id, step_id: self.run_step_id, optional: false) ⇒ Object
Plan an event
to be send to the action defined by action
, what defaults to be self. if time
is not passed, event is sent as soon as possible.
354
355
356
357
|
# File 'lib/dynflow/action.rb', line 354
def plan_event(event, time = nil, execution_plan_id: self.execution_plan_id, step_id: self.run_step_id, optional: false)
time = @world.clock.current_time + time if time.is_a?(Numeric)
delayed_events << DelayedEvent[execution_plan_id, step_id, event, time, optional]
end
|
#plan_step ⇒ Object
229
230
231
232
|
# File 'lib/dynflow/action.rb', line 229
def plan_step
phase! Present
execution_plan.steps.fetch(plan_step_id)
end
|
#planned_actions(filter = Action) ⇒ Array<Action>
returned actions are in Present phase
237
238
239
240
241
242
243
|
# File 'lib/dynflow/action.rb', line 237
def planned_actions(filter = Action)
phase! Present
plan_step
.planned_steps(execution_plan)
.map { |s| s.action(execution_plan) }
.select { |a| a.is_a?(filter) }
end
|
#queue ⇒ Object
queue defined here will also be used as the default queue for all the steps planned under this action, unless overrided by sub-action
349
350
|
# File 'lib/dynflow/action.rb', line 349
def queue
end
|
#required_step_ids(input = self.input) ⇒ Array<Integer>
This method is part of a private API.
You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns - ids of steps referenced from action.
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
|
# File 'lib/dynflow/action.rb', line 309
def required_step_ids(input = self.input)
results = []
recursion = ->value do
case value
when Hash
value.values.each { |v| recursion.(v) }
when Array
value.each { |v| recursion.(v) }
when ExecutionPlan::OutputReference
results << value.step_id
else
end
results
end
recursion.(input)
end
|
#run_step ⇒ Object
255
256
257
258
|
# File 'lib/dynflow/action.rb', line 255
def run_step
phase! Present
execution_plan.steps.fetch(run_step_id) if run_step_id
end
|
#serializer ⇒ Object
337
338
339
340
|
# File 'lib/dynflow/action.rb', line 337
def serializer
raise "The action must be delayed in order to access the serializer" if @serializer.nil?
@serializer
end
|
#set_plan_context(execution_plan, triggering_action, from_subscription) ⇒ Object
202
203
204
205
206
207
|
# File 'lib/dynflow/action.rb', line 202
def set_plan_context(execution_plan, triggering_action, from_subscription)
phase! Plan
@execution_plan = Type! execution_plan, ExecutionPlan
@triggering_action = Type! triggering_action, Action, NilClass
@from_subscription = Type! from_subscription, TrueClass, FalseClass
end
|
#state ⇒ Object
286
287
288
289
|
# File 'lib/dynflow/action.rb', line 286
def state
raise "state data not available" if @step.nil?
@step.state
end
|
#steps ⇒ Object
265
266
267
|
# File 'lib/dynflow/action.rb', line 265
def steps
[plan_step, run_step, finalize_step]
end
|
#stored_output_chunks ⇒ Object
179
180
181
|
# File 'lib/dynflow/action.rb', line 179
def stored_output_chunks
@output_chunks ||= world.persistence.load_output_chunks(@execution_plan_id, @id)
end
|
#to_hash ⇒ Object
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
|
# File 'lib/dynflow/action.rb', line 269
def to_hash
recursive_to_hash(
{ class: self.class.name,
execution_plan_id: execution_plan_id,
id: id,
plan_step_id: plan_step_id,
run_step_id: run_step_id,
finalize_step_id: finalize_step_id,
caller_execution_plan_id: caller_execution_plan_id,
caller_action_id: caller_action_id,
input: input },
if phase? Run, Finalize, Present
{ output: output }
end
)
end
|
#triggering_action ⇒ Object
action that caused this action to be planned. Available only in planning phase
210
211
212
213
|
# File 'lib/dynflow/action.rb', line 210
def triggering_action
phase! Plan
@triggering_action
end
|