Class: Dynflow::Executors::Parallel::Worker
- Inherits:
-
Actor
- Object
- Concurrent::Actor::Context
- Actor
- Dynflow::Executors::Parallel::Worker
- Defined in:
- lib/dynflow/executors/parallel/worker.rb
Instance Method Summary collapse
-
#initialize(pool, transaction_adapter, telemetry_options = {}) ⇒ Worker
constructor
A new instance of Worker.
- #on_message(work_item) ⇒ Object
Methods inherited from Actor
#behaviour_definition, #finish_termination, #start_termination, #terminating?
Methods included from Actor::LogWithFullBacktrace
Constructor Details
#initialize(pool, transaction_adapter, telemetry_options = {}) ⇒ Worker
Returns a new instance of Worker.
7 8 9 10 11 |
# File 'lib/dynflow/executors/parallel/worker.rb', line 7 def initialize(pool, transaction_adapter, = {}) @pool = Type! pool, Concurrent::Actor::Reference @transaction_adapter = Type! transaction_adapter, TransactionAdapters::Abstract @telemetry_options = end |
Instance Method Details
#on_message(work_item) ⇒ Object
13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/dynflow/executors/parallel/worker.rb', line 13 def (work_item) already_responded = false Executors.run_user_code do work_item.execute end rescue Errors::PersistenceError => e @pool.tell([:handle_persistence_error, reference, e, work_item]) already_responded = true ensure Dynflow::Telemetry.with_instance { |t| t.increment_counter(:dynflow_worker_events, 1, @telemetry_options) } if !already_responded && Concurrent.global_io_executor.running? @pool.tell([:worker_done, reference, work_item]) end end |