Class: Dynflow::World
- Inherits:
-
Object
- Object
- Dynflow::World
- Includes:
- Algebrick::Matching, Algebrick::TypeCheck, Invalidation
- Defined in:
- lib/dynflow/world.rb,
lib/dynflow/world/invalidation.rb
Overview
rubocop:disable Metrics/ClassLength
Direct Known Subclasses
Defined Under Namespace
Modules: Invalidation, TriggerResult, Triggered
Instance Attribute Summary collapse
-
#action_classes ⇒ Object
readonly
Returns the value of attribute action_classes.
-
#auto_rescue ⇒ Object
readonly
Returns the value of attribute auto_rescue.
-
#auto_validity_check ⇒ Object
readonly
Returns the value of attribute auto_validity_check.
-
#client_dispatcher ⇒ Object
readonly
Returns the value of attribute client_dispatcher.
-
#clock ⇒ Object
readonly
Returns the value of attribute clock.
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#connector ⇒ Object
readonly
Returns the value of attribute connector.
-
#coordinator ⇒ Object
readonly
Returns the value of attribute coordinator.
-
#dead_letter_handler ⇒ Object
readonly
Returns the value of attribute dead_letter_handler.
-
#delayed_executor ⇒ Object
readonly
Returns the value of attribute delayed_executor.
-
#execution_plan_cleaner ⇒ Object
readonly
Returns the value of attribute execution_plan_cleaner.
-
#executor ⇒ Object
readonly
Returns the value of attribute executor.
-
#executor_dispatcher ⇒ Object
readonly
Returns the value of attribute executor_dispatcher.
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#logger_adapter ⇒ Object
readonly
Returns the value of attribute logger_adapter.
-
#meta ⇒ Object
readonly
Returns the value of attribute meta.
-
#middleware ⇒ Object
readonly
Returns the value of attribute middleware.
-
#persistence ⇒ Object
readonly
Returns the value of attribute persistence.
-
#subscription_index ⇒ Object
readonly
Returns the value of attribute subscription_index.
-
#terminated ⇒ Object
readonly
Returns the value of attribute terminated.
-
#termination_timeout ⇒ Object
readonly
Returns the value of attribute termination_timeout.
-
#throttle_limiter ⇒ Object
readonly
Returns the value of attribute throttle_limiter.
-
#transaction_adapter ⇒ Object
readonly
Returns the value of attribute transaction_adapter.
-
#validity_check_timeout ⇒ Object
readonly
Returns the value of attribute validity_check_timeout.
Instance Method Summary collapse
- #action_logger ⇒ Object
-
#auto_execute ⇒ Object
24119 - ensure delayed executor is preserved after invalidation executes plans that are planned/paused and haven’t reported any error yet (usually when no executor was available by the time of planning or terminating).
- #before_termination(&block) ⇒ Object
- #chain(plan_uuids, action_class, *args) ⇒ Object
- #delay(action_class, delay_options, *args) ⇒ Object
- #delay_with_options(action_class:, args:, delay_options:, id: nil, caller_action: nil) ⇒ Object
- #event(execution_plan_id, step_id, event, done = Concurrent::Promises.resolvable_future, optional: false) ⇒ Object
-
#execute(execution_plan_id, done = Concurrent::Promises.resolvable_future) ⇒ Concurrent::Promises::ResolvableFuture
raises when ExecutionPlan is not accepted for execution.
- #get_execution_status(world_id, execution_plan_id, timeout, done = Concurrent::Promises.resolvable_future) ⇒ Object
- #halt(execution_plan_id, accepted = Concurrent::Promises.resolvable_future) ⇒ Object
-
#initialize(config) ⇒ World
constructor
A new instance of World.
- #logger ⇒ Object
- #ping(world_id, timeout, done = Concurrent::Promises.resolvable_future) ⇒ Object
- #ping_without_cache(world_id, timeout, done = Concurrent::Promises.resolvable_future) ⇒ Object
- #plan(action_class, *args) ⇒ Object
- #plan_elsewhere(action_class, *args) ⇒ Object
- #plan_event(execution_plan_id, step_id, event, time, accepted = Concurrent::Promises.resolvable_future, optional: false) ⇒ Object
- #plan_request(execution_plan_id, done = Concurrent::Promises.resolvable_future) ⇒ Object
- #plan_with_options(action_class:, args:, id: nil, caller_action: nil) ⇒ Object
-
#post_initialization ⇒ Object
performs steps once the executor is ready and invalidation of previous worls is finished.
- #publish_request(request, done, wait_for_accepted, timeout = nil) ⇒ Object
- #registered_world ⇒ Object
-
#reload! ⇒ Object
reload actions classes, intended only for devel.
- #subscribed_actions(action_class) ⇒ Object
- #terminate(future = Concurrent::Promises.resolvable_future) ⇒ Object
- #terminating? ⇒ Boolean
-
#trigger(action_class = nil, *args, &block) ⇒ TriggerResult
blocks until action_class is planned if no arguments given, the plan is expected to be returned by a block.
- #try_spawn(what, lock_class = nil) ⇒ Object
- #update_register ⇒ Object
Methods included from Invalidation
#invalidate, #invalidate_execution_lock, #invalidate_planning_lock, #locks_validity_check, #perform_validity_checks, #prune_execution_inhibition_locks!, #with_valid_execution_plan_for_lock, #worlds_validity_check
Constructor Details
#initialize(config) ⇒ World
Returns a new instance of World.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/dynflow/world.rb', line 19 def initialize(config) @config = Config::ForWorld.new(config, self) # Set the telemetry instance as soon as possible Dynflow::Telemetry.set_adapter @config.telemetry_adapter Dynflow::Telemetry.register_metrics! @id = SecureRandom.uuid @logger_adapter = @config.logger_adapter @clock = spawn_and_wait(Clock, 'clock', logger) @config.validate @transaction_adapter = @config.transaction_adapter @persistence = Persistence.new(self, @config.persistence_adapter, :backup_deleted_plans => @config.backup_deleted_plans, :backup_dir => @config.backup_dir) @coordinator = Coordinator.new(@config.coordinator_adapter) if @config.executor @executor = Executors::Parallel.new(self, executor_class: @config.executor, heartbeat_interval: @config.executor_heartbeat_interval, queues_options: @config.queues) end @action_classes = @config.action_classes @auto_rescue = @config.auto_rescue @exit_on_terminate = Concurrent::AtomicBoolean.new(@config.exit_on_terminate) @connector = @config.connector @middleware = Middleware::World.new @middleware.use Middleware::Common::Transaction if @transaction_adapter @client_dispatcher = spawn_and_wait(Dispatcher::ClientDispatcher, "client-dispatcher", self, @config.ping_cache_age) @dead_letter_handler = spawn_and_wait(DeadLetterSilencer, 'default_dead_letter_handler', @config.silent_dead_letter_matchers) @auto_validity_check = @config.auto_validity_check @validity_check_timeout = @config.validity_check_timeout @throttle_limiter = @config.throttle_limiter @terminated = Concurrent::Promises.resolvable_event @termination_timeout = @config.termination_timeout calculate_subscription_index if executor @executor_dispatcher = spawn_and_wait(Dispatcher::ExecutorDispatcher, "executor-dispatcher", self, @config.executor_semaphore) executor.initialized.wait end update_register perform_validity_checks if auto_validity_check @termination_barrier = Mutex.new @before_termination_hooks = Queue.new if @config.auto_terminate at_exit do @exit_on_terminate.make_false # make sure we don't terminate twice self.terminate.wait end end post_initialization end |
Instance Attribute Details
#action_classes ⇒ Object (readonly)
Returns the value of attribute action_classes.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def action_classes @action_classes end |
#auto_rescue ⇒ Object (readonly)
Returns the value of attribute auto_rescue.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def auto_rescue @auto_rescue end |
#auto_validity_check ⇒ Object (readonly)
Returns the value of attribute auto_validity_check.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def auto_validity_check @auto_validity_check end |
#client_dispatcher ⇒ Object (readonly)
Returns the value of attribute client_dispatcher.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def client_dispatcher @client_dispatcher end |
#clock ⇒ Object (readonly)
Returns the value of attribute clock.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def clock @clock end |
#config ⇒ Object (readonly)
Returns the value of attribute config.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def config @config end |
#connector ⇒ Object (readonly)
Returns the value of attribute connector.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def connector @connector end |
#coordinator ⇒ Object (readonly)
Returns the value of attribute coordinator.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def coordinator @coordinator end |
#dead_letter_handler ⇒ Object (readonly)
Returns the value of attribute dead_letter_handler.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def dead_letter_handler @dead_letter_handler end |
#delayed_executor ⇒ Object (readonly)
Returns the value of attribute delayed_executor.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def delayed_executor @delayed_executor end |
#execution_plan_cleaner ⇒ Object (readonly)
Returns the value of attribute execution_plan_cleaner.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def execution_plan_cleaner @execution_plan_cleaner end |
#executor ⇒ Object (readonly)
Returns the value of attribute executor.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def executor @executor end |
#executor_dispatcher ⇒ Object (readonly)
Returns the value of attribute executor_dispatcher.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def executor_dispatcher @executor_dispatcher end |
#id ⇒ Object (readonly)
Returns the value of attribute id.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def id @id end |
#logger_adapter ⇒ Object (readonly)
Returns the value of attribute logger_adapter.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def logger_adapter @logger_adapter end |
#meta ⇒ Object (readonly)
Returns the value of attribute meta.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def @meta end |
#middleware ⇒ Object (readonly)
Returns the value of attribute middleware.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def middleware @middleware end |
#persistence ⇒ Object (readonly)
Returns the value of attribute persistence.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def persistence @persistence end |
#subscription_index ⇒ Object (readonly)
Returns the value of attribute subscription_index.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def subscription_index @subscription_index end |
#terminated ⇒ Object (readonly)
Returns the value of attribute terminated.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def terminated @terminated end |
#termination_timeout ⇒ Object (readonly)
Returns the value of attribute termination_timeout.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def termination_timeout @termination_timeout end |
#throttle_limiter ⇒ Object (readonly)
Returns the value of attribute throttle_limiter.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def throttle_limiter @throttle_limiter end |
#transaction_adapter ⇒ Object (readonly)
Returns the value of attribute transaction_adapter.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def transaction_adapter @transaction_adapter end |
#validity_check_timeout ⇒ Object (readonly)
Returns the value of attribute validity_check_timeout.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def validity_check_timeout @validity_check_timeout end |
Instance Method Details
#action_logger ⇒ Object
116 117 118 |
# File 'lib/dynflow/world.rb', line 116 def action_logger logger_adapter.action_logger end |
#auto_execute ⇒ Object
24119 - ensure delayed executor is preserved after invalidation executes plans that are planned/paused and haven’t reported any error yet (usually when no executor was available by the time of planning or terminating)
295 296 297 298 299 300 301 302 303 304 305 306 307 308 |
# File 'lib/dynflow/world.rb', line 295 def auto_execute coordinator.acquire(Coordinator::AutoExecuteLock.new(self)) do planned_execution_plans = self.persistence.find_execution_plans filters: { 'state' => %w(planned paused), 'result' => (ExecutionPlan.results - [:error]).map(&:to_s) } planned_execution_plans.map do |ep| if coordinator.find_locks(Dynflow::Coordinator::ExecutionLock.unique_filter(ep.id)).empty? execute(ep.id) end end.compact end rescue Coordinator::LockError => e logger.info "auto-executor lock already aquired: #{e.}" [] end |
#before_termination(&block) ⇒ Object
86 87 88 |
# File 'lib/dynflow/world.rb', line 86 def before_termination(&block) @before_termination_hooks << block end |
#chain(plan_uuids, action_class, *args) ⇒ Object
205 206 207 208 209 210 211 212 213 |
# File 'lib/dynflow/world.rb', line 205 def chain(plan_uuids, action_class, *args) plan_uuids = [plan_uuids] unless plan_uuids.is_a? Array result = (action_class: action_class, args: args, delay_options: { frozen: true }) plan_uuids.each do |plan_uuid| persistence.chain_execution_plan(plan_uuid, result.execution_plan_id) end persistence.set_delayed_plan_frozen(result.execution_plan_id, false) result end |
#delay(action_class, delay_options, *args) ⇒ Object
194 195 196 |
# File 'lib/dynflow/world.rb', line 194 def delay(action_class, , *args) (action_class: action_class, args: args, delay_options: ) end |
#delay_with_options(action_class:, args:, delay_options:, id: nil, caller_action: nil) ⇒ Object
198 199 200 201 202 203 |
# File 'lib/dynflow/world.rb', line 198 def (action_class:, args:, delay_options:, id: nil, caller_action: nil) raise 'No action_class given' if action_class.nil? execution_plan = ExecutionPlan.new(self, id) execution_plan.delay(caller_action, action_class, , *args) Scheduled[execution_plan.id] end |
#event(execution_plan_id, step_id, event, done = Concurrent::Promises.resolvable_future, optional: false) ⇒ Object
242 243 244 |
# File 'lib/dynflow/world.rb', line 242 def event(execution_plan_id, step_id, event, done = Concurrent::Promises.resolvable_future, optional: false) publish_request(Dispatcher::Event[execution_plan_id, step_id, event, nil, optional], done, false) end |
#execute(execution_plan_id, done = Concurrent::Promises.resolvable_future) ⇒ Concurrent::Promises::ResolvableFuture
raises when ExecutionPlan is not accepted for execution
238 239 240 |
# File 'lib/dynflow/world.rb', line 238 def execute(execution_plan_id, done = Concurrent::Promises.resolvable_future) publish_request(Dispatcher::Execution[execution_plan_id], done, true) end |
#get_execution_status(world_id, execution_plan_id, timeout, done = Concurrent::Promises.resolvable_future) ⇒ Object
262 263 264 |
# File 'lib/dynflow/world.rb', line 262 def get_execution_status(world_id, execution_plan_id, timeout, done = Concurrent::Promises.resolvable_future) publish_request(Dispatcher::Status[world_id, execution_plan_id], done, false, timeout) end |
#halt(execution_plan_id, accepted = Concurrent::Promises.resolvable_future) ⇒ Object
266 267 268 269 |
# File 'lib/dynflow/world.rb', line 266 def halt(execution_plan_id, accepted = Concurrent::Promises.resolvable_future) coordinator.acquire(Coordinator::ExecutionInhibitionLock.new(execution_plan_id)) publish_request(Dispatcher::Halt[execution_plan_id], accepted, false) end |
#logger ⇒ Object
112 113 114 |
# File 'lib/dynflow/world.rb', line 112 def logger logger_adapter.dynflow_logger end |
#ping(world_id, timeout, done = Concurrent::Promises.resolvable_future) ⇒ Object
254 255 256 |
# File 'lib/dynflow/world.rb', line 254 def ping(world_id, timeout, done = Concurrent::Promises.resolvable_future) publish_request(Dispatcher::Ping[world_id, true], done, false, timeout) end |
#ping_without_cache(world_id, timeout, done = Concurrent::Promises.resolvable_future) ⇒ Object
258 259 260 |
# File 'lib/dynflow/world.rb', line 258 def ping_without_cache(world_id, timeout, done = Concurrent::Promises.resolvable_future) publish_request(Dispatcher::Ping[world_id, false], done, false, timeout) end |
#plan(action_class, *args) ⇒ Object
223 224 225 |
# File 'lib/dynflow/world.rb', line 223 def plan(action_class, *args) (action_class: action_class, args: args) end |
#plan_elsewhere(action_class, *args) ⇒ Object
215 216 217 218 219 220 221 |
# File 'lib/dynflow/world.rb', line 215 def plan_elsewhere(action_class, *args) execution_plan = ExecutionPlan.new(self, nil) execution_plan.delay(nil, action_class, {}, *args) plan_request(execution_plan.id) Scheduled[execution_plan.id] end |
#plan_event(execution_plan_id, step_id, event, time, accepted = Concurrent::Promises.resolvable_future, optional: false) ⇒ Object
246 247 248 |
# File 'lib/dynflow/world.rb', line 246 def plan_event(execution_plan_id, step_id, event, time, accepted = Concurrent::Promises.resolvable_future, optional: false) publish_request(Dispatcher::Event[execution_plan_id, step_id, event, time, optional], accepted, false) end |
#plan_request(execution_plan_id, done = Concurrent::Promises.resolvable_future) ⇒ Object
250 251 252 |
# File 'lib/dynflow/world.rb', line 250 def plan_request(execution_plan_id, done = Concurrent::Promises.resolvable_future) publish_request(Dispatcher::Planning[execution_plan_id], done, false) end |
#plan_with_options(action_class:, args:, id: nil, caller_action: nil) ⇒ Object
227 228 229 230 231 232 233 234 |
# File 'lib/dynflow/world.rb', line 227 def (action_class:, args:, id: nil, caller_action: nil) ExecutionPlan.new(self, id).tap do |execution_plan| coordinator.acquire(Coordinator::PlanningLock.new(self, execution_plan.id)) do execution_plan.prepare(action_class, caller_action: caller_action) execution_plan.plan(*args) end end end |
#post_initialization ⇒ Object
performs steps once the executor is ready and invalidation of previous worls is finished. Needs to be indempotent, as it can be called several times (expecially when auto_validity_check if false, as it should be called after ‘perform_validity_checks` method)
78 79 80 81 82 83 84 |
# File 'lib/dynflow/world.rb', line 78 def post_initialization @delayed_executor ||= try_spawn(:delayed_executor, Coordinator::DelayedExecutorLock) @execution_plan_cleaner ||= try_spawn(:execution_plan_cleaner, Coordinator::ExecutionPlanCleanerLock) update_register @delayed_executor.start if auto_validity_check && @delayed_executor && !@delayed_executor.started? self.auto_execute if @config.auto_execute end |
#publish_request(request, done, wait_for_accepted, timeout = nil) ⇒ Object
271 272 273 274 275 276 277 278 279 280 281 |
# File 'lib/dynflow/world.rb', line 271 def publish_request(request, done, wait_for_accepted, timeout = nil) accepted = Concurrent::Promises.resolvable_future accepted.rescue do |reason| done.reject reason if reason end client_dispatcher.ask([:publish_request, done, request, timeout], accepted) accepted.wait if wait_for_accepted done rescue => e accepted.reject e end |
#registered_world ⇒ Object
104 105 106 107 108 109 110 |
# File 'lib/dynflow/world.rb', line 104 def registered_world if executor Coordinator::ExecutorWorld.new(self) else Coordinator::ClientWorld.new(self) end end |
#reload! ⇒ Object
reload actions classes, intended only for devel
125 126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/dynflow/world.rb', line 125 def reload! # TODO what happens with newly loaded classes @action_classes = @action_classes.map do |klass| begin Utils.constantize(klass.to_s) rescue NameError nil # ignore missing classes end end.compact middleware.clear_cache! calculate_subscription_index end |
#subscribed_actions(action_class) ⇒ Object
120 121 122 |
# File 'lib/dynflow/world.rb', line 120 def subscribed_actions(action_class) @subscription_index.has_key?(action_class) ? @subscription_index[action_class] : [] end |
#terminate(future = Concurrent::Promises.resolvable_future) ⇒ Object
283 284 285 286 |
# File 'lib/dynflow/world.rb', line 283 def terminate(future = Concurrent::Promises.resolvable_future) start_termination.tangle(future) future end |
#terminating? ⇒ Boolean
288 289 290 |
# File 'lib/dynflow/world.rb', line 288 def terminating? defined?(@terminating) end |
#trigger(action_class = nil, *args, &block) ⇒ TriggerResult
blocks until action_class is planned if no arguments given, the plan is expected to be returned by a block
177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 |
# File 'lib/dynflow/world.rb', line 177 def trigger(action_class = nil, *args, &block) if action_class.nil? raise 'Neither action_class nor a block given' if block.nil? execution_plan = block.call(self) else execution_plan = plan(action_class, *args) end planned = execution_plan.state == :planned if planned done = execute(execution_plan.id, Concurrent::Promises.resolvable_future) Triggered[execution_plan.id, done] else PlaningFailed[execution_plan.id, execution_plan.errors.first.exception] end end |
#try_spawn(what, lock_class = nil) ⇒ Object
310 311 312 313 314 315 316 317 318 319 |
# File 'lib/dynflow/world.rb', line 310 def try_spawn(what, lock_class = nil) object = nil return nil if !executor || (object = @config.public_send(what)).nil? coordinator.acquire(lock_class.new(self)) if lock_class object.spawn.wait object rescue Coordinator::LockError nil end |
#update_register ⇒ Object
90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/dynflow/world.rb', line 90 def update_register @meta ||= @config. @meta['queues'] = @config.queues if @executor @meta['delayed_executor'] = true if @delayed_executor @meta['execution_plan_cleaner'] = true if @execution_plan_cleaner @meta['last_seen'] = Dynflow::Dispatcher::ClientDispatcher::PingCache.format_time if @already_registered coordinator.update_record(registered_world) else coordinator.register_world(registered_world) @already_registered = true end end |