Class: Dynflow::World
- Inherits:
-
Object
- Object
- Dynflow::World
- Includes:
- Algebrick::Matching, Algebrick::TypeCheck, Invalidation
- Defined in:
- lib/dynflow/world.rb,
lib/dynflow/world/invalidation.rb
Overview
rubocop:disable Metrics/ClassLength
Direct Known Subclasses
Defined Under Namespace
Modules: Invalidation, TriggerResult, Triggered
Instance Attribute Summary collapse
-
#action_classes ⇒ Object
readonly
Returns the value of attribute action_classes.
-
#auto_rescue ⇒ Object
readonly
Returns the value of attribute auto_rescue.
-
#auto_validity_check ⇒ Object
readonly
Returns the value of attribute auto_validity_check.
-
#client_dispatcher ⇒ Object
readonly
Returns the value of attribute client_dispatcher.
-
#clock ⇒ Object
readonly
Returns the value of attribute clock.
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#connector ⇒ Object
readonly
Returns the value of attribute connector.
-
#coordinator ⇒ Object
readonly
Returns the value of attribute coordinator.
-
#dead_letter_handler ⇒ Object
readonly
Returns the value of attribute dead_letter_handler.
-
#delayed_executor ⇒ Object
readonly
Returns the value of attribute delayed_executor.
-
#execution_plan_cleaner ⇒ Object
readonly
Returns the value of attribute execution_plan_cleaner.
-
#executor ⇒ Object
readonly
Returns the value of attribute executor.
-
#executor_dispatcher ⇒ Object
readonly
Returns the value of attribute executor_dispatcher.
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#logger_adapter ⇒ Object
readonly
Returns the value of attribute logger_adapter.
-
#meta ⇒ Object
readonly
Returns the value of attribute meta.
-
#middleware ⇒ Object
readonly
Returns the value of attribute middleware.
-
#persistence ⇒ Object
readonly
Returns the value of attribute persistence.
-
#subscription_index ⇒ Object
readonly
Returns the value of attribute subscription_index.
-
#terminated ⇒ Object
readonly
Returns the value of attribute terminated.
-
#termination_timeout ⇒ Object
readonly
Returns the value of attribute termination_timeout.
-
#throttle_limiter ⇒ Object
readonly
Returns the value of attribute throttle_limiter.
-
#transaction_adapter ⇒ Object
readonly
Returns the value of attribute transaction_adapter.
-
#validity_check_timeout ⇒ Object
readonly
Returns the value of attribute validity_check_timeout.
Instance Method Summary collapse
- #action_logger ⇒ Object
-
#auto_execute ⇒ Object
24119 - ensure delayed executor is preserved after invalidation executes plans that are planned/paused and haven’t reported any error yet (usually when no executor was available by the time of planning or terminating).
- #before_termination(&block) ⇒ Object
- #delay(action_class, delay_options, *args) ⇒ Object
- #delay_with_options(action_class:, args:, delay_options:, id: nil, caller_action: nil) ⇒ Object
- #event(execution_plan_id, step_id, event, done = Concurrent::Promises.resolvable_future, optional: false) ⇒ Object
-
#execute(execution_plan_id, done = Concurrent::Promises.resolvable_future) ⇒ Concurrent::Promises::ResolvableFuture
raises when ExecutionPlan is not accepted for execution.
- #get_execution_status(world_id, execution_plan_id, timeout, done = Concurrent::Promises.resolvable_future) ⇒ Object
- #halt(execution_plan_id, accepted = Concurrent::Promises.resolvable_future) ⇒ Object
-
#initialize(config) ⇒ World
constructor
A new instance of World.
- #logger ⇒ Object
- #ping(world_id, timeout, done = Concurrent::Promises.resolvable_future) ⇒ Object
- #ping_without_cache(world_id, timeout, done = Concurrent::Promises.resolvable_future) ⇒ Object
- #plan(action_class, *args) ⇒ Object
- #plan_elsewhere(action_class, *args) ⇒ Object
- #plan_event(execution_plan_id, step_id, event, time, accepted = Concurrent::Promises.resolvable_future, optional: false) ⇒ Object
- #plan_request(execution_plan_id, done = Concurrent::Promises.resolvable_future) ⇒ Object
- #plan_with_options(action_class:, args:, id: nil, caller_action: nil) ⇒ Object
-
#post_initialization ⇒ Object
performs steps once the executor is ready and invalidation of previous worls is finished.
- #publish_request(request, done, wait_for_accepted, timeout = nil) ⇒ Object
- #registered_world ⇒ Object
-
#reload! ⇒ Object
reload actions classes, intended only for devel.
- #subscribed_actions(action_class) ⇒ Object
- #terminate(future = Concurrent::Promises.resolvable_future) ⇒ Object
- #terminating? ⇒ Boolean
-
#trigger(action_class = nil, *args, &block) ⇒ TriggerResult
blocks until action_class is planned if no arguments given, the plan is expected to be returned by a block.
- #try_spawn(what, lock_class = nil) ⇒ Object
- #update_register ⇒ Object
Methods included from Invalidation
#invalidate, #invalidate_execution_lock, #invalidate_planning_lock, #locks_validity_check, #perform_validity_checks, #prune_execution_inhibition_locks!, #with_valid_execution_plan_for_lock, #worlds_validity_check
Constructor Details
#initialize(config) ⇒ World
Returns a new instance of World.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/dynflow/world.rb', line 19 def initialize(config) @config = Config::ForWorld.new(config, self) # Set the telemetry instance as soon as possible Dynflow::Telemetry.set_adapter @config.telemetry_adapter Dynflow::Telemetry.register_metrics! @id = SecureRandom.uuid @logger_adapter = @config.logger_adapter @clock = spawn_and_wait(Clock, 'clock', logger) @config.validate @transaction_adapter = @config.transaction_adapter @persistence = Persistence.new(self, @config.persistence_adapter, :backup_deleted_plans => @config.backup_deleted_plans, :backup_dir => @config.backup_dir) @coordinator = Coordinator.new(@config.coordinator_adapter) if @config.executor @executor = Executors::Parallel.new(self, executor_class: @config.executor, heartbeat_interval: @config.executor_heartbeat_interval, queues_options: @config.queues) end @action_classes = @config.action_classes @auto_rescue = @config.auto_rescue @exit_on_terminate = Concurrent::AtomicBoolean.new(@config.exit_on_terminate) @connector = @config.connector @middleware = Middleware::World.new @middleware.use Middleware::Common::Transaction if @transaction_adapter @client_dispatcher = spawn_and_wait(Dispatcher::ClientDispatcher, "client-dispatcher", self, @config.ping_cache_age) @dead_letter_handler = spawn_and_wait(DeadLetterSilencer, 'default_dead_letter_handler', @config.silent_dead_letter_matchers) @auto_validity_check = @config.auto_validity_check @validity_check_timeout = @config.validity_check_timeout @throttle_limiter = @config.throttle_limiter @terminated = Concurrent::Promises.resolvable_event @termination_timeout = @config.termination_timeout calculate_subscription_index if executor @executor_dispatcher = spawn_and_wait(Dispatcher::ExecutorDispatcher, "executor-dispatcher", self, @config.executor_semaphore) executor.initialized.wait end update_register perform_validity_checks if auto_validity_check @termination_barrier = Mutex.new @before_termination_hooks = Queue.new if @config.auto_terminate at_exit do @exit_on_terminate.make_false # make sure we don't terminate twice self.terminate.wait end end post_initialization end |
Instance Attribute Details
#action_classes ⇒ Object (readonly)
Returns the value of attribute action_classes.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def action_classes @action_classes end |
#auto_rescue ⇒ Object (readonly)
Returns the value of attribute auto_rescue.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def auto_rescue @auto_rescue end |
#auto_validity_check ⇒ Object (readonly)
Returns the value of attribute auto_validity_check.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def auto_validity_check @auto_validity_check end |
#client_dispatcher ⇒ Object (readonly)
Returns the value of attribute client_dispatcher.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def client_dispatcher @client_dispatcher end |
#clock ⇒ Object (readonly)
Returns the value of attribute clock.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def clock @clock end |
#config ⇒ Object (readonly)
Returns the value of attribute config.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def config @config end |
#connector ⇒ Object (readonly)
Returns the value of attribute connector.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def connector @connector end |
#coordinator ⇒ Object (readonly)
Returns the value of attribute coordinator.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def coordinator @coordinator end |
#dead_letter_handler ⇒ Object (readonly)
Returns the value of attribute dead_letter_handler.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def dead_letter_handler @dead_letter_handler end |
#delayed_executor ⇒ Object (readonly)
Returns the value of attribute delayed_executor.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def delayed_executor @delayed_executor end |
#execution_plan_cleaner ⇒ Object (readonly)
Returns the value of attribute execution_plan_cleaner.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def execution_plan_cleaner @execution_plan_cleaner end |
#executor ⇒ Object (readonly)
Returns the value of attribute executor.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def executor @executor end |
#executor_dispatcher ⇒ Object (readonly)
Returns the value of attribute executor_dispatcher.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def executor_dispatcher @executor_dispatcher end |
#id ⇒ Object (readonly)
Returns the value of attribute id.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def id @id end |
#logger_adapter ⇒ Object (readonly)
Returns the value of attribute logger_adapter.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def logger_adapter @logger_adapter end |
#meta ⇒ Object (readonly)
Returns the value of attribute meta.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def @meta end |
#middleware ⇒ Object (readonly)
Returns the value of attribute middleware.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def middleware @middleware end |
#persistence ⇒ Object (readonly)
Returns the value of attribute persistence.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def persistence @persistence end |
#subscription_index ⇒ Object (readonly)
Returns the value of attribute subscription_index.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def subscription_index @subscription_index end |
#terminated ⇒ Object (readonly)
Returns the value of attribute terminated.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def terminated @terminated end |
#termination_timeout ⇒ Object (readonly)
Returns the value of attribute termination_timeout.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def termination_timeout @termination_timeout end |
#throttle_limiter ⇒ Object (readonly)
Returns the value of attribute throttle_limiter.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def throttle_limiter @throttle_limiter end |
#transaction_adapter ⇒ Object (readonly)
Returns the value of attribute transaction_adapter.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def transaction_adapter @transaction_adapter end |
#validity_check_timeout ⇒ Object (readonly)
Returns the value of attribute validity_check_timeout.
13 14 15 |
# File 'lib/dynflow/world.rb', line 13 def validity_check_timeout @validity_check_timeout end |
Instance Method Details
#action_logger ⇒ Object
116 117 118 |
# File 'lib/dynflow/world.rb', line 116 def action_logger logger_adapter.action_logger end |
#auto_execute ⇒ Object
24119 - ensure delayed executor is preserved after invalidation executes plans that are planned/paused and haven’t reported any error yet (usually when no executor was available by the time of planning or terminating)
285 286 287 288 289 290 291 292 293 294 295 296 297 298 |
# File 'lib/dynflow/world.rb', line 285 def auto_execute coordinator.acquire(Coordinator::AutoExecuteLock.new(self)) do planned_execution_plans = self.persistence.find_execution_plans filters: { 'state' => %w(planned paused), 'result' => (ExecutionPlan.results - [:error]).map(&:to_s) } planned_execution_plans.map do |ep| if coordinator.find_locks(Dynflow::Coordinator::ExecutionLock.unique_filter(ep.id)).empty? execute(ep.id) end end.compact end rescue Coordinator::LockError => e logger.info "auto-executor lock already aquired: #{e.}" [] end |
#before_termination(&block) ⇒ Object
86 87 88 |
# File 'lib/dynflow/world.rb', line 86 def before_termination(&block) @before_termination_hooks << block end |
#delay(action_class, delay_options, *args) ⇒ Object
194 195 196 |
# File 'lib/dynflow/world.rb', line 194 def delay(action_class, , *args) (action_class: action_class, args: args, delay_options: ) end |
#delay_with_options(action_class:, args:, delay_options:, id: nil, caller_action: nil) ⇒ Object
198 199 200 201 202 203 |
# File 'lib/dynflow/world.rb', line 198 def (action_class:, args:, delay_options:, id: nil, caller_action: nil) raise 'No action_class given' if action_class.nil? execution_plan = ExecutionPlan.new(self, id) execution_plan.delay(caller_action, action_class, , *args) Scheduled[execution_plan.id] end |
#event(execution_plan_id, step_id, event, done = Concurrent::Promises.resolvable_future, optional: false) ⇒ Object
232 233 234 |
# File 'lib/dynflow/world.rb', line 232 def event(execution_plan_id, step_id, event, done = Concurrent::Promises.resolvable_future, optional: false) publish_request(Dispatcher::Event[execution_plan_id, step_id, event, nil, optional], done, false) end |
#execute(execution_plan_id, done = Concurrent::Promises.resolvable_future) ⇒ Concurrent::Promises::ResolvableFuture
raises when ExecutionPlan is not accepted for execution
228 229 230 |
# File 'lib/dynflow/world.rb', line 228 def execute(execution_plan_id, done = Concurrent::Promises.resolvable_future) publish_request(Dispatcher::Execution[execution_plan_id], done, true) end |
#get_execution_status(world_id, execution_plan_id, timeout, done = Concurrent::Promises.resolvable_future) ⇒ Object
252 253 254 |
# File 'lib/dynflow/world.rb', line 252 def get_execution_status(world_id, execution_plan_id, timeout, done = Concurrent::Promises.resolvable_future) publish_request(Dispatcher::Status[world_id, execution_plan_id], done, false, timeout) end |
#halt(execution_plan_id, accepted = Concurrent::Promises.resolvable_future) ⇒ Object
256 257 258 259 |
# File 'lib/dynflow/world.rb', line 256 def halt(execution_plan_id, accepted = Concurrent::Promises.resolvable_future) coordinator.acquire(Coordinator::ExecutionInhibitionLock.new(execution_plan_id)) publish_request(Dispatcher::Halt[execution_plan_id], accepted, false) end |
#logger ⇒ Object
112 113 114 |
# File 'lib/dynflow/world.rb', line 112 def logger logger_adapter.dynflow_logger end |
#ping(world_id, timeout, done = Concurrent::Promises.resolvable_future) ⇒ Object
244 245 246 |
# File 'lib/dynflow/world.rb', line 244 def ping(world_id, timeout, done = Concurrent::Promises.resolvable_future) publish_request(Dispatcher::Ping[world_id, true], done, false, timeout) end |
#ping_without_cache(world_id, timeout, done = Concurrent::Promises.resolvable_future) ⇒ Object
248 249 250 |
# File 'lib/dynflow/world.rb', line 248 def ping_without_cache(world_id, timeout, done = Concurrent::Promises.resolvable_future) publish_request(Dispatcher::Ping[world_id, false], done, false, timeout) end |
#plan(action_class, *args) ⇒ Object
213 214 215 |
# File 'lib/dynflow/world.rb', line 213 def plan(action_class, *args) (action_class: action_class, args: args) end |
#plan_elsewhere(action_class, *args) ⇒ Object
205 206 207 208 209 210 211 |
# File 'lib/dynflow/world.rb', line 205 def plan_elsewhere(action_class, *args) execution_plan = ExecutionPlan.new(self, nil) execution_plan.delay(nil, action_class, {}, *args) plan_request(execution_plan.id) Scheduled[execution_plan.id] end |
#plan_event(execution_plan_id, step_id, event, time, accepted = Concurrent::Promises.resolvable_future, optional: false) ⇒ Object
236 237 238 |
# File 'lib/dynflow/world.rb', line 236 def plan_event(execution_plan_id, step_id, event, time, accepted = Concurrent::Promises.resolvable_future, optional: false) publish_request(Dispatcher::Event[execution_plan_id, step_id, event, time, optional], accepted, false) end |
#plan_request(execution_plan_id, done = Concurrent::Promises.resolvable_future) ⇒ Object
240 241 242 |
# File 'lib/dynflow/world.rb', line 240 def plan_request(execution_plan_id, done = Concurrent::Promises.resolvable_future) publish_request(Dispatcher::Planning[execution_plan_id], done, false) end |
#plan_with_options(action_class:, args:, id: nil, caller_action: nil) ⇒ Object
217 218 219 220 221 222 223 224 |
# File 'lib/dynflow/world.rb', line 217 def (action_class:, args:, id: nil, caller_action: nil) ExecutionPlan.new(self, id).tap do |execution_plan| coordinator.acquire(Coordinator::PlanningLock.new(self, execution_plan.id)) do execution_plan.prepare(action_class, caller_action: caller_action) execution_plan.plan(*args) end end end |
#post_initialization ⇒ Object
performs steps once the executor is ready and invalidation of previous worls is finished. Needs to be indempotent, as it can be called several times (expecially when auto_validity_check if false, as it should be called after ‘perform_validity_checks` method)
78 79 80 81 82 83 84 |
# File 'lib/dynflow/world.rb', line 78 def post_initialization @delayed_executor ||= try_spawn(:delayed_executor, Coordinator::DelayedExecutorLock) @execution_plan_cleaner ||= try_spawn(:execution_plan_cleaner, Coordinator::ExecutionPlanCleanerLock) update_register @delayed_executor.start if auto_validity_check && @delayed_executor && !@delayed_executor.started? self.auto_execute if @config.auto_execute end |
#publish_request(request, done, wait_for_accepted, timeout = nil) ⇒ Object
261 262 263 264 265 266 267 268 269 270 271 |
# File 'lib/dynflow/world.rb', line 261 def publish_request(request, done, wait_for_accepted, timeout = nil) accepted = Concurrent::Promises.resolvable_future accepted.rescue do |reason| done.reject reason if reason end client_dispatcher.ask([:publish_request, done, request, timeout], accepted) accepted.wait if wait_for_accepted done rescue => e accepted.reject e end |
#registered_world ⇒ Object
104 105 106 107 108 109 110 |
# File 'lib/dynflow/world.rb', line 104 def registered_world if executor Coordinator::ExecutorWorld.new(self) else Coordinator::ClientWorld.new(self) end end |
#reload! ⇒ Object
reload actions classes, intended only for devel
125 126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/dynflow/world.rb', line 125 def reload! # TODO what happens with newly loaded classes @action_classes = @action_classes.map do |klass| begin Utils.constantize(klass.to_s) rescue NameError nil # ignore missing classes end end.compact middleware.clear_cache! calculate_subscription_index end |
#subscribed_actions(action_class) ⇒ Object
120 121 122 |
# File 'lib/dynflow/world.rb', line 120 def subscribed_actions(action_class) @subscription_index.has_key?(action_class) ? @subscription_index[action_class] : [] end |
#terminate(future = Concurrent::Promises.resolvable_future) ⇒ Object
273 274 275 276 |
# File 'lib/dynflow/world.rb', line 273 def terminate(future = Concurrent::Promises.resolvable_future) start_termination.tangle(future) future end |
#terminating? ⇒ Boolean
278 279 280 |
# File 'lib/dynflow/world.rb', line 278 def terminating? defined?(@terminating) end |
#trigger(action_class = nil, *args, &block) ⇒ TriggerResult
blocks until action_class is planned if no arguments given, the plan is expected to be returned by a block
177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 |
# File 'lib/dynflow/world.rb', line 177 def trigger(action_class = nil, *args, &block) if action_class.nil? raise 'Neither action_class nor a block given' if block.nil? execution_plan = block.call(self) else execution_plan = plan(action_class, *args) end planned = execution_plan.state == :planned if planned done = execute(execution_plan.id, Concurrent::Promises.resolvable_future) Triggered[execution_plan.id, done] else PlaningFailed[execution_plan.id, execution_plan.errors.first.exception] end end |
#try_spawn(what, lock_class = nil) ⇒ Object
300 301 302 303 304 305 306 307 308 309 |
# File 'lib/dynflow/world.rb', line 300 def try_spawn(what, lock_class = nil) object = nil return nil if !executor || (object = @config.public_send(what)).nil? coordinator.acquire(lock_class.new(self)) if lock_class object.spawn.wait object rescue Coordinator::LockError nil end |
#update_register ⇒ Object
90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/dynflow/world.rb', line 90 def update_register @meta ||= @config. @meta['queues'] = @config.queues if @executor @meta['delayed_executor'] = true if @delayed_executor @meta['execution_plan_cleaner'] = true if @execution_plan_cleaner @meta['last_seen'] = Dynflow::Dispatcher::ClientDispatcher::PingCache.format_time if @already_registered coordinator.update_record(registered_world) else coordinator.register_world(registered_world) @already_registered = true end end |