Class: Hatchet::WorkerRuntime::Runner
- Inherits:
-
Object
- Object
- Hatchet::WorkerRuntime::Runner
- Defined in:
- lib/hatchet/worker/runner.rb
Overview
Executes task actions in a thread pool, managing concurrency slots and context variable propagation.
The runner receives actions from the action listener, looks up the corresponding task block, sets up context variables, executes the task, and sends the result back to the dispatcher. For durable tasks, it wires the :class:‘DurableContext` up to the shared :class:`DurableEventListener` and per-run :class:`DurableEviction::DurableEvictionManager` when the engine supports eviction.
Constant Summary collapse
- STARTED_EVENT_RETRY_COUNT =
5- STARTED_EVENT_STOP =
Object.new
Instance Attribute Summary collapse
- #durable_event_listener ⇒ WorkerRuntime::DurableEventListener? readonly
- #eviction_manager ⇒ WorkerRuntime::DurableEviction::DurableEvictionManager? readonly
Instance Method Summary collapse
-
#execute(action) ⇒ Object
Execute an action (task assignment) in the thread pool.
-
#initialize(workflows:, slots:, dispatcher_client:, event_client:, logger:, client:, engine_version: nil, durable_slots: nil, worker_id: nil) ⇒ Runner
constructor
A new instance of Runner.
-
#shutdown(timeout: 30) ⇒ Object
Gracefully shutdown the runner.
Constructor Details
#initialize(workflows:, slots:, dispatcher_client:, event_client:, logger:, client:, engine_version: nil, durable_slots: nil, worker_id: nil) ⇒ Runner
Returns a new instance of Runner.
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/hatchet/worker/runner.rb', line 45 def initialize( workflows:, slots:, dispatcher_client:, event_client:, logger:, client:, engine_version: nil, durable_slots: nil, worker_id: nil ) @workflows = workflows @slots = slots @durable_slots = durable_slots || slots @dispatcher_client = dispatcher_client @event_client = event_client @logger = logger @client = client @engine_version = engine_version @worker_id = worker_id @pool = Concurrent::FixedThreadPool.new(slots) @semaphore = Concurrent::Semaphore.new(slots) @task_map = build_task_map @contexts_mu = Monitor.new @contexts = {} @task_threads = {} @step_action_event_queue = Queue.new @step_action_event_thread = Thread.new { process_step_action_events } @has_durable_tasks = @task_map.values.any?(&:durable) @supports_durable_eviction = supports_durable_eviction? @durable_event_listener = build_durable_event_listener @eviction_manager = nil @eviction_manager_mu = Mutex.new end |
Instance Attribute Details
#durable_event_listener ⇒ WorkerRuntime::DurableEventListener? (readonly)
89 90 91 |
# File 'lib/hatchet/worker/runner.rb', line 89 def durable_event_listener @durable_event_listener end |
#eviction_manager ⇒ WorkerRuntime::DurableEviction::DurableEvictionManager? (readonly)
86 87 88 |
# File 'lib/hatchet/worker/runner.rb', line 86 def eviction_manager @eviction_manager end |
Instance Method Details
#execute(action) ⇒ Object
Execute an action (task assignment) in the thread pool.
94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/hatchet/worker/runner.rb', line 94 def execute(action) ensure_eviction_manager_started(action) @semaphore.acquire @pool.post do execute_task(action) ensure @semaphore.release end end |
#shutdown(timeout: 30) ⇒ Object
Gracefully shutdown the runner.
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
# File 'lib/hatchet/worker/runner.rb', line 109 def shutdown(timeout: 30) if @eviction_manager begin @eviction_manager.evict_all_waiting rescue StandardError => e @logger.warn("Runner: failed to evict waiting durable runs during shutdown: #{e.class}: #{e.}") end end @pool.shutdown @pool.wait_for_termination(timeout) stop_step_action_event_thread @durable_event_listener&.stop end |