Class: Dynflow::World

Inherits:
Object
  • Object
show all
Includes:
Algebrick::Matching, Algebrick::TypeCheck, Invalidation
Defined in:
lib/dynflow/world.rb,
lib/dynflow/world/invalidation.rb

Overview

rubocop:disable Metrics/ClassLength

Direct Known Subclasses

Testing::InThreadWorld

Defined Under Namespace

Modules: Invalidation, TriggerResult, Triggered

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Invalidation

#invalidate, #invalidate_execution_lock, #invalidate_planning_lock, #locks_validity_check, #perform_validity_checks, #prune_execution_inhibition_locks!, #with_valid_execution_plan_for_lock, #worlds_validity_check

Constructor Details

#initialize(config) ⇒ World

Returns a new instance of World.



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/dynflow/world.rb', line 19

# Builds a world from the given configuration: sets up telemetry, logging,
# persistence, coordination, middleware and the dispatchers, registers the
# world with the coordinator and finally calls #post_initialization.
#
# @param config [Object] configuration; wrapped in Config::ForWorld together with this world
def initialize(config)
  @config = Config::ForWorld.new(config, self)

  # Set the telemetry instance as soon as possible
  Dynflow::Telemetry.set_adapter @config.telemetry_adapter
  Dynflow::Telemetry.register_metrics!

  @id                     = SecureRandom.uuid
  @logger_adapter         = @config.logger_adapter
  # `logger` reads @logger_adapter, so the adapter has to be assigned before the clock is spawned
  @clock                  = spawn_and_wait(Clock, 'clock', logger)
  @config.validate
  @transaction_adapter    = @config.transaction_adapter
  @persistence            = Persistence.new(self, @config.persistence_adapter,
    :backup_deleted_plans => @config.backup_deleted_plans,
    :backup_dir => @config.backup_dir)
  @coordinator            = Coordinator.new(@config.coordinator_adapter)
  # An executor is only built when one is configured; otherwise @executor is never assigned
  if @config.executor
    @executor = Executors::Parallel.new(self,
      executor_class: @config.executor,
      heartbeat_interval: @config.executor_heartbeat_interval,
      queues_options: @config.queues)
  end
  @action_classes         = @config.action_classes
  @auto_rescue            = @config.auto_rescue
  @exit_on_terminate      = Concurrent::AtomicBoolean.new(@config.exit_on_terminate)
  @connector              = @config.connector
  @middleware             = Middleware::World.new
  # The transaction middleware is only installed when a transaction adapter is configured
  @middleware.use Middleware::Common::Transaction if @transaction_adapter
  @client_dispatcher      = spawn_and_wait(Dispatcher::ClientDispatcher, "client-dispatcher", self, @config.ping_cache_age)
  @dead_letter_handler    = spawn_and_wait(DeadLetterSilencer, 'default_dead_letter_handler', @config.silent_dead_letter_matchers)
  @auto_validity_check    = @config.auto_validity_check
  @validity_check_timeout = @config.validity_check_timeout
  @throttle_limiter       = @config.throttle_limiter
  @terminated             = Concurrent::Promises.resolvable_event
  @termination_timeout    = @config.termination_timeout
  calculate_subscription_index

  # With an executor present, spawn its dispatcher and wait on the executor's `initialized`
  if executor
    @executor_dispatcher = spawn_and_wait(Dispatcher::ExecutorDispatcher, "executor-dispatcher", self, @config.executor_semaphore)
    executor.initialized.wait
  end
  update_register
  perform_validity_checks if auto_validity_check

  @termination_barrier = Mutex.new
  @before_termination_hooks = Queue.new

  # When auto_terminate is configured, terminate this world from an at_exit hook and wait for it to finish
  if @config.auto_terminate
    at_exit do
      @exit_on_terminate.make_false # make sure we don't terminate twice
      self.terminate.wait
    end
  end
  post_initialization
end

Instance Attribute Details

#action_classes ⇒ Object (readonly)

Returns the value of attribute action_classes.



13
14
15
# File 'lib/dynflow/world.rb', line 13

# Reader for @action_classes: taken from @config.action_classes in #initialize
# and replaced by #reload!.
def action_classes
  @action_classes
end

#auto_rescue ⇒ Object (readonly)

Returns the value of attribute auto_rescue.



13
14
15
# File 'lib/dynflow/world.rb', line 13

# Reader for @auto_rescue, copied from @config.auto_rescue in #initialize.
def auto_rescue
  @auto_rescue
end

#auto_validity_check ⇒ Object (readonly)

Returns the value of attribute auto_validity_check.



13
14
15
# File 'lib/dynflow/world.rb', line 13

# Reader for @auto_validity_check, copied from @config.auto_validity_check in
# #initialize; gates perform_validity_checks there and the delayed executor
# start in #post_initialization.
def auto_validity_check
  @auto_validity_check
end

#client_dispatcher ⇒ Object (readonly)

Returns the value of attribute client_dispatcher.



13
14
15
# File 'lib/dynflow/world.rb', line 13

# Reader for @client_dispatcher, the Dispatcher::ClientDispatcher spawned in
# #initialize; #publish_request sends its messages to it.
def client_dispatcher
  @client_dispatcher
end

#clock ⇒ Object (readonly)

Returns the value of attribute clock.



13
14
15
# File 'lib/dynflow/world.rb', line 13

# Reader for @clock, the Clock spawned in #initialize.
def clock
  @clock
end

#config ⇒ Object (readonly)

Returns the value of attribute config.



13
14
15
# File 'lib/dynflow/world.rb', line 13

# Reader for @config, the Config::ForWorld wrapper created in #initialize.
def config
  @config
end

#connector ⇒ Object (readonly)

Returns the value of attribute connector.



13
14
15
# File 'lib/dynflow/world.rb', line 13

# Reader for @connector, copied from @config.connector in #initialize.
def connector
  @connector
end

#coordinator ⇒ Object (readonly)

Returns the value of attribute coordinator.



13
14
15
# File 'lib/dynflow/world.rb', line 13

# Reader for @coordinator, the Coordinator built in #initialize from
# @config.coordinator_adapter.
def coordinator
  @coordinator
end

#dead_letter_handler ⇒ Object (readonly)

Returns the value of attribute dead_letter_handler.



13
14
15
# File 'lib/dynflow/world.rb', line 13

# Reader for @dead_letter_handler, the DeadLetterSilencer spawned in #initialize.
def dead_letter_handler
  @dead_letter_handler
end

#delayed_executor ⇒ Object (readonly)

Returns the value of attribute delayed_executor.



13
14
15
# File 'lib/dynflow/world.rb', line 13

# Reader for @delayed_executor, assigned in #post_initialization from
# #try_spawn; nil when nothing was spawned.
def delayed_executor
  @delayed_executor
end

#execution_plan_cleaner ⇒ Object (readonly)

Returns the value of attribute execution_plan_cleaner.



13
14
15
# File 'lib/dynflow/world.rb', line 13

# Reader for @execution_plan_cleaner, assigned in #post_initialization from
# #try_spawn; nil when nothing was spawned.
def execution_plan_cleaner
  @execution_plan_cleaner
end

#executor ⇒ Object (readonly)

Returns the value of attribute executor.



13
14
15
# File 'lib/dynflow/world.rb', line 13

# Reader for @executor. #initialize only assigns it (an Executors::Parallel)
# when @config.executor is set, so this is nil for a world without an executor.
def executor
  @executor
end

#executor_dispatcher ⇒ Object (readonly)

Returns the value of attribute executor_dispatcher.



13
14
15
# File 'lib/dynflow/world.rb', line 13

# Reader for @executor_dispatcher, the Dispatcher::ExecutorDispatcher that
# #initialize spawns only when an executor is present; nil otherwise.
def executor_dispatcher
  @executor_dispatcher
end

#id ⇒ Object (readonly)

Returns the value of attribute id.



13
14
15
# File 'lib/dynflow/world.rb', line 13

# Reader for @id, a SecureRandom.uuid generated in #initialize.
def id
  @id
end

#logger_adapter ⇒ Object (readonly)

Returns the value of attribute logger_adapter.



13
14
15
# File 'lib/dynflow/world.rb', line 13

# Reader for @logger_adapter, copied from @config.logger_adapter in
# #initialize; #logger and #action_logger delegate to it.
def logger_adapter
  @logger_adapter
end

#meta ⇒ Object (readonly)

Returns the value of attribute meta.



13
14
15
# File 'lib/dynflow/world.rb', line 13

# Reader for @meta, which #update_register initialises from @config.meta and
# then fills with string-keyed entries.
def meta
  @meta
end

#middleware ⇒ Object (readonly)

Returns the value of attribute middleware.



13
14
15
# File 'lib/dynflow/world.rb', line 13

# Reader for @middleware, the Middleware::World created in #initialize.
def middleware
  @middleware
end

#persistence ⇒ Object (readonly)

Returns the value of attribute persistence.



13
14
15
# File 'lib/dynflow/world.rb', line 13

# Reader for @persistence, the Persistence instance built in #initialize.
def persistence
  @persistence
end

#subscription_index ⇒ Object (readonly)

Returns the value of attribute subscription_index.



13
14
15
# File 'lib/dynflow/world.rb', line 13

# Reader for @subscription_index, the lookup consulted by #subscribed_actions.
# NOTE(review): presumably assigned by calculate_subscription_index, which is
# not shown here — confirm.
def subscription_index
  @subscription_index
end

#terminated ⇒ Object (readonly)

Returns the value of attribute terminated.



13
14
15
# File 'lib/dynflow/world.rb', line 13

# Reader for @terminated, the Concurrent::Promises.resolvable_event created in
# #initialize.
def terminated
  @terminated
end

#termination_timeout ⇒ Object (readonly)

Returns the value of attribute termination_timeout.



13
14
15
# File 'lib/dynflow/world.rb', line 13

# Reader for @termination_timeout, copied from @config.termination_timeout in
# #initialize.
def termination_timeout
  @termination_timeout
end

#throttle_limiter ⇒ Object (readonly)

Returns the value of attribute throttle_limiter.



13
14
15
# File 'lib/dynflow/world.rb', line 13

# Reader for @throttle_limiter, copied from @config.throttle_limiter in
# #initialize.
def throttle_limiter
  @throttle_limiter
end

#transaction_adapter ⇒ Object (readonly)

Returns the value of attribute transaction_adapter.



13
14
15
# File 'lib/dynflow/world.rb', line 13

# Reader for @transaction_adapter, copied from @config.transaction_adapter in
# #initialize; when set, the Transaction middleware is installed there.
def transaction_adapter
  @transaction_adapter
end

#validity_check_timeout ⇒ Object (readonly)

Returns the value of attribute validity_check_timeout.



13
14
15
# File 'lib/dynflow/world.rb', line 13

# Reader for @validity_check_timeout, copied from
# @config.validity_check_timeout in #initialize.
def validity_check_timeout
  @validity_check_timeout
end

Instance Method Details

#action_logger ⇒ Object



116
117
118
# File 'lib/dynflow/world.rb', line 116

# Returns the `action_logger` exposed by this world's logger adapter.
def action_logger
  logger_adapter.action_logger
end

#auto_execute ⇒ Object

24119 - ensure delayed executor is preserved after invalidation.

Executes plans that are planned/paused and haven’t reported any error yet (usually when no executor was available at the time of planning or terminating).



285
286
287
288
289
290
291
292
293
294
295
296
297
298
# File 'lib/dynflow/world.rb', line 285

# Executes every persisted execution plan that is in the `planned` or `paused`
# state, whose result is anything but `error`, and for which the coordinator
# currently holds no execution lock. The scan runs inside a
# Coordinator::AutoExecuteLock.
#
# @return [Array] the non-nil values #execute returned for the started plans
#   (presumably passed through by `coordinator.acquire` — confirm); an empty
#   array when acquiring the lock raised Coordinator::LockError
def auto_execute
  coordinator.acquire(Coordinator::AutoExecuteLock.new(self)) do
    # every known result except :error, as strings for the persistence filter
    runnable_results = (ExecutionPlan.results - [:error]).map(&:to_s)
    planned_execution_plans =
      persistence.find_execution_plans filters: { 'state' => %w(planned paused), 'result' => runnable_results }
    planned_execution_plans.map do |ep|
      # plans that already have an execution lock are skipped (mapped to nil, then compacted away)
      if coordinator.find_locks(Dynflow::Coordinator::ExecutionLock.unique_filter(ep.id)).empty?
        execute(ep.id)
      end
    end.compact
  end
rescue Coordinator::LockError => e
  logger.info "auto-executor lock already acquired: #{e.message}"
  []
end

#before_termination(&block) ⇒ Object



86
87
88
# File 'lib/dynflow/world.rb', line 86

# Appends the given block to @before_termination_hooks (the Queue created in
# #initialize).
# NOTE(review): presumably the queued blocks are run while the world
# terminates; the consumer is not shown here — confirm.
def before_termination(&block)
  @before_termination_hooks << block
end

#delay(action_class, delay_options, *args) ⇒ Object



194
195
196
# File 'lib/dynflow/world.rb', line 194

# Positional-argument convenience wrapper around #delay_with_options.
#
# @param action_class [Object] forwarded as `action_class:`
# @param delay_options [Object] forwarded as `delay_options:`
# @param args [Array] remaining arguments, forwarded as `args:`
# @return [Object] whatever #delay_with_options returns
def delay(action_class, delay_options, *args)
  delay_with_options(action_class: action_class, args: args, delay_options: delay_options)
end

#delay_with_options(action_class:, args:, delay_options:, id: nil, caller_action: nil) ⇒ Object



198
199
200
201
202
203
# File 'lib/dynflow/world.rb', line 198

# Creates a new ExecutionPlan for this world and calls `delay` on it.
#
# @param action_class [Object] action to delay; must not be nil
# @param args [Array] splatted into ExecutionPlan#delay after the options
# @param delay_options [Object] passed to ExecutionPlan#delay
# @param id [Object, nil] passed to ExecutionPlan.new
# @param caller_action [Object, nil] passed to ExecutionPlan#delay
# @return [Object] `Scheduled[...]` built from the new plan's id
# @raise [RuntimeError] when action_class is nil
def delay_with_options(action_class:, args:, delay_options:, id: nil, caller_action: nil)
  raise 'No action_class given' if action_class.nil?

  delayed_plan = ExecutionPlan.new(self, id).tap do |new_plan|
    new_plan.delay(caller_action, action_class, delay_options, *args)
  end
  Scheduled[delayed_plan.id]
end

#event(execution_plan_id, step_id, event, done = Concurrent::Promises.resolvable_future, optional: false) ⇒ Object



232
233
234
# File 'lib/dynflow/world.rb', line 232

# Publishes a Dispatcher::Event for the given plan step with its time slot left
# nil (compare #plan_event, which fills that slot).
#
# @param done [Object] future handed to #publish_request and returned by it
# @param optional [Boolean] forwarded as the last field of the event request
# @return [Object] whatever #publish_request returns
def event(execution_plan_id, step_id, event, done = Concurrent::Promises.resolvable_future, optional: false)
  request = Dispatcher::Event[execution_plan_id, step_id, event, nil, optional]
  # third argument is publish_request's wait_for_accepted flag
  publish_request(request, done, false)
end

#execute(execution_plan_id, done = Concurrent::Promises.resolvable_future) ⇒ Concurrent::Promises::ResolvableFuture

raises when ExecutionPlan is not accepted for execution

Returns:

  • (Concurrent::Promises::ResolvableFuture)

    containing execution_plan when finished



228
229
230
# File 'lib/dynflow/world.rb', line 228

# Publishes a Dispatcher::Execution request for the given plan and waits for
# the request to be accepted before returning.
#
# @param execution_plan_id [Object] id of the plan to execute
# @param done [Concurrent::Promises::ResolvableFuture] future handed to #publish_request
# @return [Concurrent::Promises::ResolvableFuture] containing execution_plan when finished
def execute(execution_plan_id, done = Concurrent::Promises.resolvable_future)
  request = Dispatcher::Execution[execution_plan_id]
  # `true` is publish_request's wait_for_accepted flag
  publish_request(request, done, true)
end

#get_execution_status(world_id, execution_plan_id, timeout, done = Concurrent::Promises.resolvable_future) ⇒ Object



252
253
254
# File 'lib/dynflow/world.rb', line 252

# Publishes a Dispatcher::Status request for a plan on the given world.
#
# @param timeout [Object] forwarded to #publish_request as its timeout
# @param done [Object] future handed to #publish_request
# @return [Object] whatever #publish_request returns
def get_execution_status(world_id, execution_plan_id, timeout, done = Concurrent::Promises.resolvable_future)
  request = Dispatcher::Status[world_id, execution_plan_id]
  # `false` is publish_request's wait_for_accepted flag
  publish_request(request, done, false, timeout)
end

#halt(execution_plan_id, accepted = Concurrent::Promises.resolvable_future) ⇒ Object



256
257
258
259
# File 'lib/dynflow/world.rb', line 256

# Acquires a Coordinator::ExecutionInhibitionLock for the plan and then
# publishes a Dispatcher::Halt request for it.
#
# @param execution_plan_id [Object] id of the plan to halt
# @param accepted [Object] future handed to #publish_request
# @return [Object] whatever #publish_request returns
def halt(execution_plan_id, accepted = Concurrent::Promises.resolvable_future)
  inhibition_lock = Coordinator::ExecutionInhibitionLock.new(execution_plan_id)
  coordinator.acquire(inhibition_lock)
  # `false` is publish_request's wait_for_accepted flag
  publish_request(Dispatcher::Halt[execution_plan_id], accepted, false)
end

#logger ⇒ Object



112
113
114
# File 'lib/dynflow/world.rb', line 112

# Returns the `dynflow_logger` exposed by this world's logger adapter.
def logger
  logger_adapter.dynflow_logger
end

#ping(world_id, timeout, done = Concurrent::Promises.resolvable_future) ⇒ Object



244
245
246
# File 'lib/dynflow/world.rb', line 244

# Publishes a Dispatcher::Ping request for the given world with the second
# field set to true (#ping_without_cache sends false there).
#
# @param timeout [Object] forwarded to #publish_request as its timeout
# @param done [Object] future handed to #publish_request
# @return [Object] whatever #publish_request returns
def ping(world_id, timeout, done = Concurrent::Promises.resolvable_future)
  request = Dispatcher::Ping[world_id, true]
  # `false` is publish_request's wait_for_accepted flag
  publish_request(request, done, false, timeout)
end

#ping_without_cache(world_id, timeout, done = Concurrent::Promises.resolvable_future) ⇒ Object



248
249
250
# File 'lib/dynflow/world.rb', line 248

# Publishes a Dispatcher::Ping request for the given world with the second
# field set to false (#ping sends true there).
#
# @param timeout [Object] forwarded to #publish_request as its timeout
# @param done [Object] future handed to #publish_request
# @return [Object] whatever #publish_request returns
def ping_without_cache(world_id, timeout, done = Concurrent::Promises.resolvable_future)
  request = Dispatcher::Ping[world_id, false]
  # `false` is publish_request's wait_for_accepted flag
  publish_request(request, done, false, timeout)
end

#plan(action_class, *args) ⇒ Object



213
214
215
# File 'lib/dynflow/world.rb', line 213

# Positional-argument convenience wrapper around #plan_with_options.
#
# @param action_class [Object] forwarded as `action_class:`
# @param args [Array] remaining arguments, forwarded as `args:`
# @return [Object] whatever #plan_with_options returns
def plan(action_class, *args)
  plan_with_options(action_class: action_class, args: args)
end

#plan_elsewhere(action_class, *args) ⇒ Object



205
206
207
208
209
210
211
# File 'lib/dynflow/world.rb', line 205

# Creates a new ExecutionPlan, delays `action_class` on it with no caller
# action and empty delay options, then publishes a planning request for it.
#
# @param action_class [Object] passed to ExecutionPlan#delay
# @param args [Array] splatted into ExecutionPlan#delay
# @return [Object] `Scheduled[...]` built from the new plan's id
def plan_elsewhere(action_class, *args)
  new_plan = ExecutionPlan.new(self, nil).tap do |fresh_plan|
    fresh_plan.delay(nil, action_class, {}, *args)
    plan_request(fresh_plan.id)
  end

  Scheduled[new_plan.id]
end

#plan_event(execution_plan_id, step_id, event, time, accepted = Concurrent::Promises.resolvable_future, optional: false) ⇒ Object



236
237
238
# File 'lib/dynflow/world.rb', line 236

# Publishes a Dispatcher::Event for the given plan step with `time` placed in
# the slot that #event leaves nil.
#
# @param accepted [Object] future handed to #publish_request
# @param optional [Boolean] forwarded as the last field of the event request
# @return [Object] whatever #publish_request returns
def plan_event(execution_plan_id, step_id, event, time, accepted = Concurrent::Promises.resolvable_future, optional: false)
  request = Dispatcher::Event[execution_plan_id, step_id, event, time, optional]
  # `false` is publish_request's wait_for_accepted flag
  publish_request(request, accepted, false)
end

#plan_request(execution_plan_id, done = Concurrent::Promises.resolvable_future) ⇒ Object



240
241
242
# File 'lib/dynflow/world.rb', line 240

# Publishes a Dispatcher::Planning request for the given plan.
#
# @param done [Object] future handed to #publish_request
# @return [Object] whatever #publish_request returns
def plan_request(execution_plan_id, done = Concurrent::Promises.resolvable_future)
  request = Dispatcher::Planning[execution_plan_id]
  # `false` is publish_request's wait_for_accepted flag
  publish_request(request, done, false)
end

#plan_with_options(action_class:, args:, id: nil, caller_action: nil) ⇒ Object



217
218
219
220
221
222
223
224
# File 'lib/dynflow/world.rb', line 217

# Creates a new ExecutionPlan and, inside a Coordinator::PlanningLock for it,
# prepares it for `action_class` and plans it with `args`.
#
# @param action_class [Object] passed to ExecutionPlan#prepare
# @param args [Array] splatted into ExecutionPlan#plan
# @param id [Object, nil] passed to ExecutionPlan.new
# @param caller_action [Object, nil] passed to ExecutionPlan#prepare
# @return [ExecutionPlan] the newly created plan
def plan_with_options(action_class:, args:, id: nil, caller_action: nil)
  new_plan = ExecutionPlan.new(self, id)
  planning_lock = Coordinator::PlanningLock.new(self, new_plan.id)

  coordinator.acquire(planning_lock) do
    new_plan.prepare(action_class, caller_action: caller_action)
    new_plan.plan(*args)
  end

  new_plan
end

#post_initialization ⇒ Object

Performs steps once the executor is ready and invalidation of previous worlds is finished. Needs to be idempotent, as it can be called several times (especially when auto_validity_check is false, as it should then be called after the `perform_validity_checks` method).



78
79
80
81
82
83
84
# File 'lib/dynflow/world.rb', line 78

# Spawns the delayed executor and the execution plan cleaner unless they are
# already set, refreshes the coordinator registration, starts the delayed
# executor when validity checks are automatic and it has not started yet, and
# finally runs #auto_execute when the configuration asks for it.
#
# @return [Object, nil] the result of #auto_execute, or nil when it is not configured
def post_initialization
  unless @delayed_executor
    @delayed_executor = try_spawn(:delayed_executor, Coordinator::DelayedExecutorLock)
  end
  unless @execution_plan_cleaner
    @execution_plan_cleaner = try_spawn(:execution_plan_cleaner, Coordinator::ExecutionPlanCleanerLock)
  end

  # registration is refreshed only after both spawn attempts, so it sees what they set
  update_register

  start_needed = auto_validity_check && @delayed_executor && !@delayed_executor.started?
  @delayed_executor.start if start_needed

  self.auto_execute if @config.auto_execute
end

#publish_request(request, done, wait_for_accepted, timeout = nil) ⇒ Object



261
262
263
264
265
266
267
268
269
270
271
# File 'lib/dynflow/world.rb', line 261

# Sends `request` to the client dispatcher and returns the `done` future.
#
# @param request [Object] dispatcher request to publish
# @param done [Object] future returned to the caller; rejected when the
#   internal `accepted` future is rejected with a reason
# @param wait_for_accepted [Boolean] when truthy, wait on `accepted` before returning
# @param timeout [Object, nil] forwarded to the client dispatcher inside the message
# @return [Object] `done` on the normal path
def publish_request(request, done, wait_for_accepted, timeout = nil)
  accepted = Concurrent::Promises.resolvable_future
  # propagate a rejection of `accepted` to `done`
  accepted.rescue do |reason|
    done.reject reason if reason
  end
  client_dispatcher.ask([:publish_request, done, request, timeout], accepted)
  accepted.wait if wait_for_accepted
  done
rescue => e
  # NOTE(review): on this path the method returns whatever `accepted.reject`
  # returns instead of `done` — confirm callers are fine with that
  accepted.reject e
end

#registered_world ⇒ Object



104
105
106
107
108
109
110
# File 'lib/dynflow/world.rb', line 104

# Builds the coordinator record describing this world: an ExecutorWorld when
# the world has an executor, a ClientWorld otherwise.
#
# @return [Coordinator::ExecutorWorld, Coordinator::ClientWorld]
def registered_world
  record_class = executor ? Coordinator::ExecutorWorld : Coordinator::ClientWorld
  record_class.new(self)
end

#reload! ⇒ Object

Reloads action classes; intended only for development.



125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/dynflow/world.rb', line 125

# Re-resolves every known action class by name, dropping those whose constant
# can no longer be found, then clears the middleware cache and recalculates
# the subscription index.
#
# @return [Object] whatever calculate_subscription_index returns
def reload!
  # TODO what happens with newly loaded classes
  resolve = lambda do |action_class|
    begin
      Utils.constantize(action_class.to_s)
    rescue NameError
      nil # ignore missing classes
    end
  end

  @action_classes = @action_classes.map(&resolve).compact
  middleware.clear_cache!
  calculate_subscription_index
end

#subscribed_actions(action_class) ⇒ Object



120
121
122
# File 'lib/dynflow/world.rb', line 120

# Looks up the actions subscribed to `action_class` in @subscription_index.
#
# @param action_class [Object] key to look up
# @return [Object] the stored value when the key exists, otherwise a new empty
#   array (the index itself is never written to)
def subscribed_actions(action_class)
  @subscription_index.fetch(action_class) { [] }
end

#terminate(future = Concurrent::Promises.resolvable_future) ⇒ Object



273
274
275
276
# File 'lib/dynflow/world.rb', line 273

# Starts termination and tangles its result into `future`.
#
# @param future [Object] future passed to `tangle` on start_termination's result
# @return [Object] the same `future` that was passed in (or the default one)
def terminate(future = Concurrent::Promises.resolvable_future)
  future.tap { |result| start_termination.tangle(result) }
end

#terminating? ⇒ Boolean

Returns:

  • (Boolean)


278
279
280
# File 'lib/dynflow/world.rb', line 278

# Tells whether @terminating has been assigned on this world. Only the
# assignment matters: the check uses `defined?`, so any assigned value, even
# nil or false, counts.
#
# @return [Boolean] true once @terminating is assigned, false before that
def terminating?
  # `defined?` yields a description string or nil; coerce it to a real boolean
  !!defined?(@terminating)
end

#trigger(action_class = nil, *args, &block) ⇒ TriggerResult

Blocks until action_class is planned. If no arguments are given, the plan is expected to be returned by a block.

Returns:

  • (TriggerResult)



177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# File 'lib/dynflow/world.rb', line 177

# Plans `action_class` (or takes the plan returned by the block when no
# action class is given) and, if the plan reached the :planned state, sends it
# for execution.
#
# @param action_class [Object, nil] action to plan; when nil a block is required
# @param args [Array] forwarded to #plan
# @yieldparam world [Object] this world; the block must return an execution plan
# @return [Object] `Triggered[...]` with the plan id and the execution future,
#   or `PlaningFailed[...]` with the plan id and the first error's exception
# @raise [RuntimeError] when neither action_class nor a block is given
def trigger(action_class = nil, *args, &block)
  execution_plan =
    if !action_class.nil?
      plan(action_class, *args)
    elsif block.nil?
      raise 'Neither action_class nor a block given'
    else
      block.call(self)
    end

  unless execution_plan.state == :planned
    return PlaningFailed[execution_plan.id, execution_plan.errors.first.exception]
  end

  done = execute(execution_plan.id, Concurrent::Promises.resolvable_future)
  Triggered[execution_plan.id, done]
end

#try_spawn(what, lock_class = nil) ⇒ Object



300
301
302
303
304
305
306
307
308
309
# File 'lib/dynflow/world.rb', line 300

# Spawns the component configured under `what`, optionally guarded by a
# coordinator lock.
#
# @param what [Symbol] name of the config method that returns the component
# @param lock_class [Class, nil] when given, a lock of this class is acquired before spawning
# @return [Object, nil] the spawned component; nil when the world has no
#   executor, the component is not configured, or the lock could not be acquired
def try_spawn(what, lock_class = nil)
  return nil unless executor

  component = @config.public_send(what)
  return nil if component.nil?

  coordinator.acquire(lock_class.new(self)) if lock_class
  component.spawn.wait
  component
rescue Coordinator::LockError
  nil
end

#update_register ⇒ Object



90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/dynflow/world.rb', line 90

# Refreshes this world's metadata and pushes it to the coordinator: the first
# call registers the world, later calls update the existing record.
#
# @return [Object] true after the first registration, otherwise whatever
#   coordinator.update_record returns
def update_register
  @meta = @config.meta unless @meta
  @meta['queues'] = @config.queues if @executor

  # flag each optional component that is present on this world
  optional_components = {
    'delayed_executor' => @delayed_executor,
    'execution_plan_cleaner' => @execution_plan_cleaner
  }
  optional_components.each do |meta_key, component|
    @meta[meta_key] = true if component
  end

  @meta['last_seen'] = Dynflow::Dispatcher::ClientDispatcher::PingCache.format_time

  return coordinator.update_record(registered_world) if @already_registered

  coordinator.register_world(registered_world)
  @already_registered = true
end