Class: Dynflow::Executors::Parallel::Worker

Inherits:
Actor
  • Object
show all
Defined in:
lib/dynflow/executors/parallel/worker.rb

Instance Method Summary collapse

Methods inherited from Actor

#behaviour_definition, #finish_termination, #start_termination, #terminating?

Methods included from Actor::LogWithFullBacktrace

#log

Constructor Details

#initialize(pool, transaction_adapter, telemetry_options = {}) ⇒ Worker

Returns a new instance of Worker.



7
8
9
10
11
# File 'lib/dynflow/executors/parallel/worker.rb', line 7

def initialize(pool, transaction_adapter, telemetry_options = {})
  @pool                = Type! pool, Concurrent::Actor::Reference
  @transaction_adapter = Type! transaction_adapter, TransactionAdapters::Abstract
  @telemetry_options   = telemetry_options
end

Instance Method Details

#on_message(work_item) ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/dynflow/executors/parallel/worker.rb', line 13

def on_message(work_item)
  already_responded = false
  Executors.run_user_code do
    work_item.execute
  end
rescue Errors::PersistenceError => e
  @pool.tell([:handle_persistence_error, reference, e, work_item])
  already_responded = true
ensure
  Dynflow::Telemetry.with_instance { |t| t.increment_counter(:dynflow_worker_events, 1, @telemetry_options) }
  if !already_responded && Concurrent.global_io_executor.running?
    @pool.tell([:worker_done, reference, work_item])
  end
end