Class: Dynflow::Action

Inherits:
Serializable show all
Extended by:
Format
Includes:
Algebrick::Matching, Algebrick::TypeCheck, Progress, Rescue
Defined in:
lib/dynflow/action.rb,
lib/dynflow/action/v2.rb,
lib/dynflow/action/singleton.rb

Overview

rubocop:disable Metrics/ClassLength

Defined Under Namespace

Modules: Cancellable, Executable, Format, Phase, Polling, Progress, Rescue, Singleton, Timeouts, V2, WithBulkSubPlans, WithPollingSubPlans, WithSubPlans Classes: Missing, Suspended

Constant Summary collapse

# Short alias for ExecutionPlan::OutputReference.
OutputReference =
ExecutionPlan::OutputReference
# A plain, unique Object instance used as a marker value.
# NOTE(review): presumably signals an error result — confirm against its users.
ERROR =
Object.new
# A plain, unique Object instance used as a marker value.
# NOTE(review): presumably signals that the action suspended itself — confirm.
SUSPEND =
Object.new
# Marker created with Algebrick.atom.
Skip =
Algebrick.atom
# Typed record describing an event planned for an action step; instances are
# built by #plan_event. `time` may be a Time or nil, `optional` is a boolean.
DelayedEvent =
Algebrick.type do
  fields! execution_plan_id: String,
          step_id:           Integer,
          event:             Object,
          time:              type { variants Time, NilClass },
          optional:          Algebrick::Types::Boolean
end

Constants included from Rescue

Rescue::Strategy, Rescue::SuggestedStrategy

Constants inherited from Serializable

Serializable::LEGACY_TIME_FORMAT, Serializable::TIME_FORMAT

Instance Attribute Summary collapse

Attributes included from Progress

#calculated_progress

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Format

input_format, output_format

Methods included from Rescue

#combine_suggested_strategies, #rescue_strategy, #rescue_strategy_for_planned_action, #rescue_strategy_for_self

Methods included from Progress

#finalize_progress, #finalize_progress_weight, #run_progress, #run_progress_weight

Methods inherited from Serializable

from_hash

Constructor Details

#initialize(attributes, world) ⇒ Action

Returns a new instance of Action.

Raises:

  • (ArgumentError)


114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/dynflow/action.rb', line 114

# Builds an action from a hash of attributes, type-checking each value.
#
# @param attributes [Hash] must contain :phase, :execution_plan_id, :id,
#   :plan_step_id, :run_step_id and :finalize_step_id; :execution_plan is
#   required in the Present phase and :input in the Run, Finalize and
#   Present phases; :step, :caller_execution_plan_id, :caller_action_id and
#   :output are optional
# @param world [World]
# @raise [ArgumentError] when the phase is Executable and no :step is given
def initialize(attributes, world)
  Type!(attributes, Hash)

  @phase = Type!(attributes.fetch(:phase), Phase)
  @world = Type!(world, World)
  @step  = Type!(attributes.fetch(:step, nil), ExecutionPlan::Steps::Abstract, NilClass)
  if phase?(Executable) && @step.nil?
    raise ArgumentError, 'Step reference missing'
  end

  @execution_plan_id = Type!(attributes.fetch(:execution_plan_id), String)
  @id                = Type!(attributes.fetch(:id), Integer)
  @plan_step_id      = Type!(attributes.fetch(:plan_step_id), Integer)
  @run_step_id       = Type!(attributes.fetch(:run_step_id), Integer, NilClass)
  @finalize_step_id  = Type!(attributes.fetch(:finalize_step_id), Integer, NilClass)

  if phase?(Present)
    @execution_plan = Type!(attributes.fetch(:execution_plan), ExecutionPlan)
  end

  @caller_execution_plan_id = Type!(attributes.fetch(:caller_execution_plan_id, nil), String, NilClass)
  @caller_action_id         = Type!(attributes.fetch(:caller_action_id, nil), Integer, NilClass)

  # :input is mandatory once the action has been planned; before that it
  # falls back to an empty hash.
  raw_input = if phase?(Run, Finalize, Present)
                attributes.fetch(:input)
              else
                attributes.fetch(:input, {})
              end
  @input = OutputReference.deserialize(raw_input)

  if phase?(Run, Finalize, Present)
    @output = OutputReference.deserialize(attributes.fetch(:output, {}))
  end
  @pending_output_chunks = [] if phase?(Run, Finalize)
end

Instance Attribute Details

#caller_action_id ⇒ Object (readonly)

Returns the value of attribute caller_action_id.



107
108
109
# File 'lib/dynflow/action.rb', line 107

# @return [Integer, nil] the :caller_action_id attribute given to the
#   constructor; nil when none was supplied
def caller_action_id
  @caller_action_id
end

#caller_execution_plan_id ⇒ Object (readonly)

Returns the value of attribute caller_execution_plan_id.



107
108
109
# File 'lib/dynflow/action.rb', line 107

# @return [String, nil] the :caller_execution_plan_id attribute given to the
#   constructor; nil when none was supplied
def caller_execution_plan_id
  @caller_execution_plan_id
end

#execution_plan_id ⇒ Object (readonly)

Returns the value of attribute execution_plan_id.



107
108
109
# File 'lib/dynflow/action.rb', line 107

# @return [String] the :execution_plan_id attribute given to the constructor
def execution_plan_id
  @execution_plan_id
end

#finalize_step_id ⇒ Object (readonly)

Returns the value of attribute finalize_step_id.



107
108
109
# File 'lib/dynflow/action.rb', line 107

# @return [Integer, nil] the :finalize_step_id attribute given to the
#   constructor (the key is required, its value may be nil)
def finalize_step_id
  @finalize_step_id
end

#id ⇒ Object (readonly)

Returns the value of attribute id.



107
108
109
# File 'lib/dynflow/action.rb', line 107

# @return [Integer] the :id attribute given to the constructor
def id
  @id
end

#input ⇒ Object

Returns the value of attribute input.



107
108
109
# File 'lib/dynflow/action.rb', line 107

# @return [Object] the action input, as produced by
#   OutputReference.deserialize from the :input attribute in the constructor
#   (presumably a Hash-like structure — confirm against deserialize)
def input
  @input
end

#pending_output_chunks ⇒ Object (readonly)

Returns the value of attribute pending_output_chunks.



107
108
109
# File 'lib/dynflow/action.rb', line 107

# @return [Array<Hash>, nil] chunks appended by #output_chunk. The
#   constructor initialises this to an empty array only in the Run and
#   Finalize phases; #drop_output_chunks! resets it to an empty array.
def pending_output_chunks
  @pending_output_chunks
end

#phase ⇒ Object (readonly)

Returns the value of attribute phase.



107
108
109
# File 'lib/dynflow/action.rb', line 107

# @return [Phase] the :phase attribute given to the constructor
def phase
  @phase
end

#plan_step_id ⇒ Object (readonly)

Returns the value of attribute plan_step_id.



107
108
109
# File 'lib/dynflow/action.rb', line 107

# @return [Integer] the :plan_step_id attribute given to the constructor
def plan_step_id
  @plan_step_id
end

#run_step_id ⇒ Object (readonly)

Returns the value of attribute run_step_id.



107
108
109
# File 'lib/dynflow/action.rb', line 107

# @return [Integer, nil] the :run_step_id attribute given to the constructor
#   (the key is required, its value may be nil)
def run_step_id
  @run_step_id
end

#world ⇒ Object (readonly)

Returns the value of attribute world.



107
108
109
# File 'lib/dynflow/action.rb', line 107

# @return [World] the world passed to the constructor
def world
  @world
end

Class Method Details

.all_children ⇒ Object



31
32
33
34
35
# File 'lib/dynflow/action.rb', line 31

# Collects every class registered below this one.
#
# @return [Array] the direct children first, followed by the result of
#   calling all_children on each direct child, in registration order
def self.all_children
  direct = children.values
  direct + direct.flat_map(&:all_children)
end

.children ⇒ Object



43
44
45
# File 'lib/dynflow/action.rb', line 43

# Registry of direct subclasses, created lazily on first access.
# .inherited stores each new subclass here under its class name.
#
# @return [Hash] class name => class
def self.children
  @children ||= {}
end

.constantize(action_name) ⇒ Object



101
102
103
104
105
# File 'lib/dynflow/action.rb', line 101

# Resolves an action class by name via the inherited implementation.
# When that lookup raises NameError, a stand-in is produced by
# Action::Missing.generate instead of propagating the error.
#
# @param action_name [String] name of the action class to resolve
# @return [Object] the resolved class, or the generated stand-in
def self.constantize(action_name)
  super(action_name)
rescue NameError
  Action::Missing.generate(action_name)
end

.execution_plan_hooks ⇒ Object



51
52
53
# File 'lib/dynflow/action.rb', line 51

# Hook register of this class, created lazily as an empty
# ExecutionPlan::Hooks::Register unless one was already assigned through
# .inherit_execution_plan_hooks.
#
# @return [ExecutionPlan::Hooks::Register]
def self.execution_plan_hooks
  @execution_plan_hooks ||= ExecutionPlan::Hooks::Register.new
end

.inherit_execution_plan_hooks(hooks) ⇒ Object



55
56
57
# File 'lib/dynflow/action.rb', line 55

# Replaces this class's hook register. .inherited calls this on every new
# subclass with a dup of the parent's register.
#
# @param hooks [Object] the register to use from now on
# @return [Object] the given hooks
def self.inherit_execution_plan_hooks(hooks)
  @execution_plan_hooks = hooks
end

.inherited(child) ⇒ Object



37
38
39
40
41
# File 'lib/dynflow/action.rb', line 37

# Subclass hook: records the new class in .children under its name, hands it
# a copy of this class's execution plan hooks, then continues the regular
# inheritance callback chain.
#
# @param child [Class] the class that has just inherited from this one
def self.inherited(child)
  children.store(child.name, child)
  child.inherit_execution_plan_hooks(execution_plan_hooks.dup)
  super(child)
end

.middleware ⇒ Object



47
48
49
# File 'lib/dynflow/action.rb', line 47

# Middleware register of this class, created lazily on first access.
#
# @return [Middleware::Register]
def self.middleware
  @middleware ||= Middleware::Register.new
end

.subscribe ⇒ nil, Class

FIND: define subscriptions in the world independently of the action's classes, limited only by the in/output formats.

Returns:

  • (nil, Class)

    a child of Action



62
63
64
# File 'lib/dynflow/action.rb', line 62

# Default subscription: none.
# NOTE(review): presumably subclasses override this to return the Action
# subclass they subscribe to — confirm against the planner.
#
# @return [nil]
def self.subscribe
  nil
end

Instance Method Details

#action_logger ⇒ Object



225
226
227
# File 'lib/dynflow/action.rb', line 225

# Shortcut for the world's action logger.
#
# @return [Object] whatever world.action_logger returns
def action_logger
  world.action_logger
end

#all_planned_actions(filter_class = Action) ⇒ Array<Action>

returned actions are in Present phase

Parameters:

  • filter_class (Class) (defaults to: Action)

    return only actions which are kind of `filter_class`

Returns:

  • (Array<Action>)

    all actions planned by this action, directly or indirectly



248
249
250
251
252
253
# File 'lib/dynflow/action.rb', line 248

# Lists the actions planned by this action, directly or through its planned
# actions. Only callable in the Present phase.
#
# The filter is applied once, to the combined list; nested actions are
# collected with their own default filter.
#
# @param filter_class [Class] keep only actions that are a kind of this class
# @return [Array<Action>] direct actions first, then the nested ones
def all_planned_actions(filter_class = Action)
  phase!(Present)
  direct = planned_actions
  nested = direct.flat_map { |action| action.all_planned_actions }
  (direct + nested).select { |candidate| candidate.is_a?(filter_class) }
end

#caller_action ⇒ Object



189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/dynflow/action.rb', line 189

# Loads, once, the action recorded as the caller of this action.
# Only callable in the Present phase.
#
# The caller is looked up in this action's own execution plan unless a
# caller execution plan id is recorded, in which case that plan is loaded
# from persistence first. The result is memoized in @caller_action.
#
# @return [Object, nil] the result of
#   world.persistence.load_action_for_presentation, or nil when no caller
#   action id is recorded
# @raise [TypeError] when called outside the Present phase
def caller_action
  phase! Present
  # Without a caller id there is nothing to load. (The guard previously
  # tested for presence, returning nil exactly when a caller existed.)
  return nil if @caller_action_id.nil?
  return @caller_action if @caller_action

  caller_execution_plan = if @caller_execution_plan_id.nil?
                            execution_plan
                          else
                            world.persistence.load_execution_plan(@caller_execution_plan_id)
                          end
  @caller_action = world.persistence.load_action_for_presentation(caller_execution_plan, @caller_action_id)
end

#delayed_events ⇒ Object



359
360
361
# File 'lib/dynflow/action.rb', line 359

# Events queued by #plan_event, created lazily as an empty array.
#
# @return [Array] the queued DelayedEvent records
def delayed_events
  @delayed_events ||= []
end

#drop_output_chunks! ⇒ Object



183
184
185
186
187
# File 'lib/dynflow/action.rb', line 183

# Discards every output chunk of this action: resets the pending and the
# cached chunk lists to fresh empty arrays, then asks persistence to delete
# the chunks stored for this execution plan id and action id.
#
# @return [Object] whatever persistence.delete_output_chunks returns
def drop_output_chunks!
  @pending_output_chunks, @output_chunks = [], []
  world.persistence.delete_output_chunks(@execution_plan_id, @id)
end

#error ⇒ Object



297
298
299
300
# File 'lib/dynflow/action.rb', line 297

# Error recorded on the step this action was created with.
#
# @return [Object] the step's error
# @raise [RuntimeError] when the action has no step reference
def error
  return @step.error unless @step.nil?

  raise "error data not available"
end

#execute(*args) ⇒ Object



302
303
304
305
# File 'lib/dynflow/action.rb', line 302

# Runs the action for its current phase by dispatching to the method the
# phase names via execute_method_name. Only callable in an Executable phase.
#
# @param args [Array] forwarded unchanged to the phase's method
# @return [Object] the result of the dispatched method
def execute(*args)
  phase!(Executable)
  method_name = phase.execute_method_name
  send(method_name, *args)
end

#execute_delay(delay_options, *args) ⇒ Object



327
328
329
330
331
332
333
334
335
# File 'lib/dynflow/action.rb', line 327

# Runs the :delay middleware stack around #delay, with error handling
# enabled. The serializer returned by #delay performs its serialization and
# is then remembered in @serializer (readable through #serializer).
#
# @param delay_options [Object] passed to the middleware stack
# @param args [Array] further arguments passed to the middleware stack
# @return [Object] whatever with_error_handling returns
def execute_delay(delay_options, *args)
  with_error_handling(true) do
    world.middleware.execute(:delay, self, delay_options, *args) do |*new_args|
      serializer = delay(*new_args)
      serializer.perform_serialization!
      @serializer = serializer
    end
  end
end

#execution_plan ⇒ Object



220
221
222
223
# File 'lib/dynflow/action.rb', line 220

# Execution plan this action belongs to. Only callable in the Plan and
# Present phases.
#
# @return [ExecutionPlan, nil] the plan set by the constructor (Present
#   phase) or by #set_plan_context (Plan phase)
def execution_plan
  phase!(Plan, Present)
  @execution_plan
end

#finalize_step ⇒ Object



260
261
262
263
# File 'lib/dynflow/action.rb', line 260

# Finalize step of this action, looked up in the execution plan's steps.
# Only callable in the Present phase.
#
# @return [Object, nil] the step, or nil when the action has no finalize
#   step id
def finalize_step
  phase!(Present)
  step_id = finalize_step_id
  return unless step_id

  execution_plan.steps.fetch(step_id)
end

#from_subscription? ⇒ Boolean

Returns:

  • (Boolean)


215
216
217
218
# File 'lib/dynflow/action.rb', line 215

# Tells whether the flag passed as from_subscription to #set_plan_context
# was true. Only callable in the Plan phase.
#
# @return [Boolean, nil] the stored flag; nil before #set_plan_context ran
def from_subscription?
  phase!(Plan)
  @from_subscription
end

#holds_singleton_lock? ⇒ Boolean

Returns:

  • (Boolean)


342
343
344
# File 'lib/dynflow/action.rb', line 342

# Always false in this base implementation.
# NOTE(review): presumably overridden for singleton actions — confirm.
#
# @return [Boolean]
def holds_singleton_lock?
  false
end

#humanized_state ⇒ Object

Human-readable state of the action; used in the Dynflow console.



293
294
295
# File 'lib/dynflow/action.rb', line 293

# String form of #state.
#
# @return [String]
# @raise [RuntimeError] when #state is unavailable (no step reference)
def humanized_state
  state.to_s
end

#label ⇒ Object



150
151
152
# File 'lib/dynflow/action.rb', line 150

# Label of the action: the name of its class.
#
# @return [String, nil] the class name
def label
  self.class.name
end

#output ⇒ Object



166
167
168
169
170
171
172
173
# File 'lib/dynflow/action.rb', line 166

# Output of the action. In the Plan phase this is the output reference
# stored in @output_reference; in every other phase it is the stored output.
#
# @return [Object] the output reference (Plan phase) or the stored output
# @raise [RuntimeError] in the Plan phase when no output reference is set yet
def output
  return @output unless phase?(Plan)

  @output_reference || raise('plan_self has to be invoked before being able to reference the output')
end

#output=(hash) ⇒ Object



160
161
162
163
164
# File 'lib/dynflow/action.rb', line 160

# Replaces the action output. Only callable in the Run phase.
#
# @param hash [Hash] new output; stored after conversion with
#   Utils.indifferent_hash
def output=(hash)
  Type!(hash, Hash)
  phase!(Run)
  @output = Utils.indifferent_hash(hash)
end

#output_chunk(chunk, kind: nil, timestamp: Time.now) ⇒ Object



175
176
177
# File 'lib/dynflow/action.rb', line 175

# Queues one chunk of output on the pending list.
#
# @param chunk [Object] the chunk payload
# @param kind [Object, nil] optional tag stored alongside the chunk
# @param timestamp [Time] defaults to the time of the call
# @return [Array<Hash>] the pending list including the new entry
def output_chunk(chunk, kind: nil, timestamp: Time.now)
  entry = { chunk: chunk, kind: kind, timestamp: timestamp }
  @pending_output_chunks << entry
end

#phase!(*phases) ⇒ Object



145
146
147
148
# File 'lib/dynflow/action.rb', line 145

# Asserts that the action is in one of the given phases.
#
# @param phases [Array] the accepted phases
# @return [Object] the truthy result of #phase?
# @raise [TypeError] when #phase? does not accept the current phase
def phase!(*phases)
  matched = phase?(*phases)
  raise TypeError, "Wrong phase #{phase}, required #{phases}" unless matched

  matched
end

#phase?(*phases) ⇒ Boolean

Returns:

  • (Boolean)


141
142
143
# File 'lib/dynflow/action.rb', line 141

# Tests the current phase against the given phases by delegating to Match?.
#
# @param phases [Array] the candidate phases
# @return [Boolean] the result of Match?(phase, *phases)
def phase?(*phases)
  Match?(phase, *phases)
end

#plan_event(event, time = nil, execution_plan_id: self.execution_plan_id, step_id: self.run_step_id, optional: false) ⇒ Object

Plan an event to be sent to the action step identified by execution_plan_id and step_id, which default to this action's own. If time is not passed, the event is sent as soon as possible.



354
355
356
357
# File 'lib/dynflow/action.rb', line 354

# Queues an event for a step of an execution plan.
#
# @param event [Object] the event to deliver
# @param time [Time, Numeric, nil] delivery time; a Numeric is added to the
#   world clock's current time, any other value is stored as given
# @param execution_plan_id [String] target plan, defaults to this action's
# @param step_id [Integer] target step, defaults to this action's run step
# @param optional [Boolean] stored on the DelayedEvent as given
# @return [Array] the list of delayed events including the new one
def plan_event(event, time = nil, execution_plan_id: self.execution_plan_id, step_id: self.run_step_id, optional: false)
  deliver_at = time.is_a?(Numeric) ? @world.clock.current_time + time : time
  delayed_events << DelayedEvent[execution_plan_id, step_id, event, deliver_at, optional]
end

#plan_step ⇒ Object



229
230
231
232
# File 'lib/dynflow/action.rb', line 229

# Plan step of this action, looked up in the execution plan's steps.
# Only callable in the Present phase.
#
# @return [Object] the step stored under plan_step_id
def plan_step
  phase!(Present)
  all_steps = execution_plan.steps
  all_steps.fetch(plan_step_id)
end

#planned_actions(filter = Action) ⇒ Array<Action>

returned actions are in Present phase

Parameters:

  • filter (Class) (defaults to: Action)

    return only actions which are kind of `filter`

Returns:

  • (Array<Action>)

    actions planned directly by this action



237
238
239
240
241
242
243
# File 'lib/dynflow/action.rb', line 237

# Lists the actions planned directly by this action. Only callable in the
# Present phase.
#
# @param filter [Class] keep only actions that are a kind of this class
# @return [Array<Action>] one action per step planned under the plan step
def planned_actions(filter = Action)
  phase!(Present)
  child_steps   = plan_step.planned_steps(execution_plan)
  child_actions = child_steps.map { |step| step.action(execution_plan) }
  child_actions.select { |action| action.is_a?(filter) }
end

#queue ⇒ Object

The queue defined here will also be used as the default queue for all the steps planned under this action, unless overridden by a sub-action.



349
350
# File 'lib/dynflow/action.rb', line 349

# Base implementation has an empty body, so it names no queue.
#
# @return [nil]
def queue
end

#required_step_ids(input = self.input) ⇒ Array<Integer>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns the ids of steps referenced from the action's input.

Returns:

  • (Array<Integer>)
    • ids of steps referenced from action



309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
# File 'lib/dynflow/action.rb', line 309

# Walks a nested structure of hashes and arrays depth-first and gathers the
# step id of every output reference found in it.
#
# @api private
# @param input [Object] structure to scan, defaults to the action's input
# @return [Array<Integer>] step ids in the order they were encountered
def required_step_ids(input = self.input)
  step_ids = []
  visit = lambda do |node|
    case node
    when Hash
      node.values.each { |child| visit.call(child) }
    when Array
      node.each { |child| visit.call(child) }
    when ExecutionPlan::OutputReference
      step_ids << node.step_id
    end
    # any other value cannot hide a reference, so it is ignored
  end
  visit.call(input)
  step_ids
end

#run_step ⇒ Object



255
256
257
258
# File 'lib/dynflow/action.rb', line 255

# Run step of this action, looked up in the execution plan's steps.
# Only callable in the Present phase.
#
# @return [Object, nil] the step, or nil when the action has no run step id
def run_step
  phase!(Present)
  step_id = run_step_id
  return unless step_id

  execution_plan.steps.fetch(step_id)
end

#serializer ⇒ Object



337
338
339
340
# File 'lib/dynflow/action.rb', line 337

# Serializer stored by #execute_delay.
#
# @return [Object] the stored serializer
# @raise [RuntimeError] when no serializer has been stored yet
def serializer
  return @serializer unless @serializer.nil?

  raise "The action must be delayed in order to access the serializer"
end

#set_plan_context(execution_plan, triggering_action, from_subscription) ⇒ Object



202
203
204
205
206
207
# File 'lib/dynflow/action.rb', line 202

# Stores the planning context on the action, type-checking each value.
# Only callable in the Plan phase.
#
# @param execution_plan [ExecutionPlan] plan the action is planned in
# @param triggering_action [Action, nil] readable via #triggering_action
# @param from_subscription [Boolean] readable via #from_subscription?
# @return [Boolean] the from_subscription flag
def set_plan_context(execution_plan, triggering_action, from_subscription)
  phase!(Plan)
  @execution_plan    = Type!(execution_plan, ExecutionPlan)
  @triggering_action = Type!(triggering_action, Action, NilClass)
  @from_subscription = Type!(from_subscription, TrueClass, FalseClass)
end

#state ⇒ Object



286
287
288
289
# File 'lib/dynflow/action.rb', line 286

# State recorded on the step this action was created with.
#
# @return [Object] the step's state
# @raise [RuntimeError] when the action has no step reference
def state
  return @step.state unless @step.nil?

  raise "state data not available"
end

#steps ⇒ Object



265
266
267
# File 'lib/dynflow/action.rb', line 265

# The plan, run and finalize steps of this action, in that order. Each
# lookup asserts the Present phase; the run and finalize entries are nil
# when the action has no such step id.
#
# @return [Array] [plan_step, run_step, finalize_step]
def steps
  [plan_step, run_step, finalize_step]
end

#stored_output_chunks ⇒ Object



179
180
181
# File 'lib/dynflow/action.rb', line 179

# Output chunks persisted for this action, loaded from persistence by
# execution plan id and action id on first access and cached in
# @output_chunks afterwards (#drop_output_chunks! resets the cache to []).
#
# @return [Object] whatever persistence.load_output_chunks returned
def stored_output_chunks
  @output_chunks ||= world.persistence.load_output_chunks(@execution_plan_id, @id)
end

#to_hash ⇒ Object



269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
# File 'lib/dynflow/action.rb', line 269

# Serializes the action through recursive_to_hash. The class name, ids and
# input are always included; the output is added only in the Run, Finalize
# and Present phases (nil is passed in its place otherwise).
#
# @return [Object] the result of recursive_to_hash
def to_hash
  core = {
    class:                    self.class.name,
    execution_plan_id:        execution_plan_id,
    id:                       id,
    plan_step_id:             plan_step_id,
    run_step_id:              run_step_id,
    finalize_step_id:         finalize_step_id,
    caller_execution_plan_id: caller_execution_plan_id,
    caller_action_id:         caller_action_id,
    input:                    input
  }
  output_part = phase?(Run, Finalize, Present) ? { output: output } : nil
  recursive_to_hash(core, output_part)
end

#triggering_action ⇒ Object

action that caused this action to be planned. Available only in planning phase



210
211
212
213
# File 'lib/dynflow/action.rb', line 210

# Action passed as triggering_action to #set_plan_context. Only callable in
# the Plan phase.
#
# @return [Action, nil] the stored action; nil when none was given
def triggering_action
  phase!(Plan)
  @triggering_action
end