Class: Hatchet::WorkerRuntime::Runner

Inherits:
Object
Defined in:
lib/hatchet/worker/runner.rb

Overview

Executes task actions in a thread pool, managing concurrency slots and context variable propagation.

The runner receives actions from the action listener, looks up the corresponding task block, sets up context variables, executes the task, and sends the result back to the dispatcher. For durable tasks, it wires the DurableContext up to the shared DurableEventListener and the per-run DurableEviction::DurableEvictionManager when the engine supports eviction.

Examples:

runner = Runner.new(
  workflows: [my_workflow],
  slots: 10,
  dispatcher_client: dispatcher_grpc,
  event_client: event_grpc,
  logger: logger,
  client: hatchet_client,
  engine_version: "v0.80.0",
  durable_slots: 10,
)
runner.execute(action)
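
The flow described in the overview (look up the task block, set context, run it, report a result) can be sketched roughly as follows. This is a minimal illustration, not the runner's actual implementation: Action, TASK_MAP, and run_action are hypothetical names invented here, and the result hash stands in for whatever is sent back to the dispatcher.

```ruby
# Hypothetical stand-ins; none of these names come from the Hatchet SDK.
Action = Struct.new(:task_name, :payload, :run_id)

# Map task names to the blocks that implement them.
TASK_MAP = {
  "greet" => ->(input) { "hello, #{input[:name]}" },
}.freeze

# Look up the block, expose per-run context, run it, and return a result record.
def run_action(action)
  task = TASK_MAP[action.task_name]
  unless task
    return { run_id: action.run_id, status: :failed, error: "unknown task #{action.task_name}" }
  end

  # Thread#[] storage is fiber-local, so the value is visible only to this execution.
  Thread.current[:run_id] = action.run_id
  { run_id: action.run_id, status: :completed, output: task.call(action.payload) }
rescue StandardError => e
  # A failing task becomes a failed result instead of killing the worker thread.
  { run_id: action.run_id, status: :failed, error: "#{e.class}: #{e.message}" }
ensure
  # Always clear the context so the next task on this thread starts clean.
  Thread.current[:run_id] = nil
end

result = run_action(Action.new("greet", { name: "ada" }, "run-1"))
```

Clearing the context in ensure matters when threads are reused, as they are in a pool: without it, a later task could observe the previous run's values.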

Constant Summary

STARTED_EVENT_RETRY_COUNT = 5
STARTED_EVENT_STOP = Object.new

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(workflows:, slots:, dispatcher_client:, event_client:, logger:, client:, engine_version: nil, durable_slots: nil, worker_id: nil) ⇒ Runner

Returns a new instance of Runner.

Parameters:

  • workflows (Array<Workflow, Task>)

    Registered workflows

  • slots (Integer)

    Maximum concurrent task slots

  • dispatcher_client (Hatchet::Clients::Grpc::Dispatcher)

    gRPC dispatcher client

  • event_client (Hatchet::Clients::Grpc::EventClient)

    gRPC event client

  • logger (Logger)

    Logger instance

  • client (Hatchet::Client)

    The Hatchet client

  • engine_version (String, nil) (defaults to: nil)

    Engine semantic version (from GetVersion)

  • durable_slots (Integer, nil) (defaults to: nil)

    Separate slot count for durable tasks; defaults to the value of slots.

  • worker_id (String, nil) (defaults to: nil)

    Worker ID from registration; stamped onto durable calls that need it.



# File 'lib/hatchet/worker/runner.rb', line 45

def initialize(
  workflows:,
  slots:,
  dispatcher_client:,
  event_client:,
  logger:,
  client:,
  engine_version: nil,
  durable_slots: nil,
  worker_id: nil
)
  @workflows = workflows
  @slots = slots
  @durable_slots = durable_slots || slots
  @dispatcher_client = dispatcher_client
  @event_client = event_client
  @logger = logger
  @client = client
  @engine_version = engine_version
  @worker_id = worker_id

  @pool = Concurrent::FixedThreadPool.new(slots)
  @semaphore = Concurrent::Semaphore.new(slots)

  @task_map = build_task_map

  @contexts_mu = Monitor.new
  @contexts = {}
  @task_threads = {}
  @step_action_event_queue = Queue.new
  @step_action_event_thread = Thread.new { process_step_action_events }

  @has_durable_tasks = @task_map.values.any?(&:durable)
  @supports_durable_eviction = supports_durable_eviction?

  @durable_event_listener = build_durable_event_listener
  @eviction_manager = nil
  @eviction_manager_mu = Mutex.new
end
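
The constructor starts a background thread that drains a Queue, and the class defines STARTED_EVENT_STOP as a bare Object.new. One common way those two pieces fit together is a sentinel-terminated consumer loop. The sketch below shows that pattern under the assumption that the sentinel is pushed onto the queue to stop the thread; the body of process_step_action_events is not shown in this page, so this is an illustration of the technique, not of the method itself. All variable names are hypothetical.

```ruby
# Unique sentinel: no real event can ever be identical to this object.
stop = Object.new

queue = Queue.new
processed = []

consumer = Thread.new do
  loop do
    event = queue.pop              # blocks until something is pushed
    break if event.equal?(stop)    # identity check, not equality

    processed << event
  end
end

queue << :started << :completed    # Queue#<< returns the queue, so pushes chain
queue << stop                      # ask the consumer to exit after draining
consumer.join
```

Because the sentinel travels through the same queue as the events, everything enqueued before it is processed before the thread exits.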

Instance Attribute Details

#durable_event_listener ⇒ WorkerRuntime::DurableEventListener? (readonly)



# File 'lib/hatchet/worker/runner.rb', line 89

def durable_event_listener
  @durable_event_listener
end

#eviction_manager ⇒ WorkerRuntime::DurableEviction::DurableEvictionManager? (readonly)



# File 'lib/hatchet/worker/runner.rb', line 86

def eviction_manager
  @eviction_manager
end

Instance Method Details

#execute(action) ⇒ Object

Execute an action (task assignment) in the thread pool.

Parameters:

  • action

    The action (task assignment) to execute



# File 'lib/hatchet/worker/runner.rb', line 94

def execute(action)
  ensure_eviction_manager_started(action)

  @semaphore.acquire

  @pool.post do
    execute_task(action)
  ensure
    @semaphore.release
  end
end
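
Acquiring the semaphore before posting to the pool means the caller of execute blocks once every slot is in use, which gives backpressure instead of an unbounded backlog. The sketch below illustrates the same slot-gating idea using only the standard library: a SizedQueue stands in for Concurrent::Semaphore, and one thread per task stands in for the fixed thread pool. Both substitutions are assumptions made to keep the example self-contained, and all names are hypothetical.

```ruby
slots = 2
permits = SizedQueue.new(slots)   # stand-in for a counting semaphore
lock = Mutex.new
running = 0
peak = 0

threads = 6.times.map do
  permits.push(true)              # blocks the submitter while all slots are busy
  Thread.new do
    lock.synchronize do
      running += 1
      peak = [peak, running].max  # track the highest observed concurrency
    end
    sleep 0.01                    # simulated task work
  ensure
    lock.synchronize { running -= 1 }
    permits.pop                   # free the slot even if the task raised
  end
end

threads.each(&:join)
```

Releasing the permit in ensure mirrors the listing above: a task that raises must still return its slot, or the runner would slowly lose capacity.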

#shutdown(timeout: 30) ⇒ Object

Gracefully shut down the runner.

Parameters:

  • timeout (Integer) (defaults to: 30)

    Seconds to wait for in-progress tasks



# File 'lib/hatchet/worker/runner.rb', line 109

def shutdown(timeout: 30)
  if @eviction_manager
    begin
      @eviction_manager.evict_all_waiting
    rescue StandardError => e
      @logger.warn("Runner: failed to evict waiting durable runs during shutdown: #{e.class}: #{e.message}")
    end
  end

  @pool.shutdown
  @pool.wait_for_termination(timeout)
  stop_step_action_event_thread

  @durable_event_listener&.stop
end