Class: Hatchet::Worker
- Inherits: Object
  - Object
    - Hatchet::Worker
- Defined in: lib/hatchet/worker_obj.rb
Overview
Worker that processes tasks from the Hatchet engine.
The worker connects to the Hatchet server via gRPC, registers its workflows, listens for task assignments, and executes them in a thread pool.
Instance Attribute Summary
- #client ⇒ Hatchet::Client (readonly): The Hatchet client.
- #durable_slots ⇒ Integer (readonly): Number of durable-task slots (defaults to `slots`).
- #engine_version ⇒ String? (readonly): Engine semantic version detected on `start`.
- #labels ⇒ Hash (readonly): Worker labels for scheduling.
- #name ⇒ String (readonly): Worker name.
- #slots ⇒ Integer (readonly): Number of concurrent task execution slots.
- #worker_id ⇒ String?: Worker ID assigned by the server.
- #workflows ⇒ Array<Workflow, Task> (readonly): Registered workflows.
Instance Method Summary
- #context ⇒ WorkerContext: Access worker context for label management.
- #initialize(name:, client:, workflows: [], slots: 10, durable_slots: nil, labels: {}) ⇒ Worker (constructor): A new instance of Worker.
- #start ⇒ Object: Start the worker.
- #stop ⇒ Object: Request graceful shutdown.
Constructor Details
#initialize(name:, client:, workflows: [], slots: 10, durable_slots: nil, labels: {}) ⇒ Worker
Returns a new instance of Worker.
# File 'lib/hatchet/worker_obj.rb', line 45
def initialize(name:, client:, workflows: [], slots: 10, durable_slots: nil, labels: {})
  @name = name
  @client = client
  @workflows = workflows
  @slots = slots
  @durable_slots = durable_slots || slots
  @labels = client.config.worker_preset_labels.merge(labels)
  @worker_id = nil
  @shutdown = false
  @engine_version = nil
end
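The two defaulting rules in the constructor (the `durable_slots` fallback and the label merge) can be tried in isolation. The sketch below does not load the Hatchet gem: `SketchWorker`, `FakeClient`, and `FakeConfig` are hypothetical stand-ins that copy only the assignments shown above, and the label keys are made up for illustration.

```ruby
# Hypothetical stand-ins; none of these names belong to the Hatchet SDK.
FakeConfig = Struct.new(:worker_preset_labels)
FakeClient = Struct.new(:config)

class SketchWorker
  attr_reader :slots, :durable_slots, :labels

  def initialize(client:, slots: 10, durable_slots: nil, labels: {})
    @slots = slots
    # Falls back to the regular slot count when durable_slots is not given.
    @durable_slots = durable_slots || slots
    # Hash#merge keeps the argument's value on key collisions, so
    # caller-supplied labels override the preset ones.
    @labels = client.config.worker_preset_labels.merge(labels)
  end
end

presets = { region: "us-east", tier: "default" }
client = FakeClient.new(FakeConfig.new(presets))
worker = SketchWorker.new(client: client, slots: 4, labels: { tier: "gpu" })

puts worker.durable_slots   # same as slots, because durable_slots was omitted
puts worker.labels[:tier]   # the caller's value, not the preset
```

Because `merge` returns a new hash, the preset labels held by the client configuration are left unmodified.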
Instance Attribute Details
#client ⇒ Hatchet::Client (readonly)
Returns the Hatchet client.
# File 'lib/hatchet/worker_obj.rb', line 28
def client
  @client
end
#durable_slots ⇒ Integer (readonly)
Returns the number of durable-task slots (defaults to `slots`).
# File 'lib/hatchet/worker_obj.rb', line 34
def durable_slots
  @durable_slots
end
#engine_version ⇒ String? (readonly)
Returns the engine semantic version detected on `start`.
# File 'lib/hatchet/worker_obj.rb', line 37
def engine_version
  @engine_version
end
#labels ⇒ Hash (readonly)
Returns the worker labels for scheduling.
# File 'lib/hatchet/worker_obj.rb', line 25
def labels
  @labels
end
#name ⇒ String (readonly)
Returns the worker name.
# File 'lib/hatchet/worker_obj.rb', line 16
def name
  @name
end
#slots ⇒ Integer (readonly)
Returns the number of concurrent task execution slots.
# File 'lib/hatchet/worker_obj.rb', line 22
def slots
  @slots
end
#worker_id ⇒ String?
Returns the worker ID assigned by the server.
# File 'lib/hatchet/worker_obj.rb', line 31
def worker_id
  @worker_id
end
Instance Method Details
#context ⇒ WorkerContext
Access worker context for label management. The context is created on first call and reused afterwards.
# File 'lib/hatchet/worker_obj.rb', line 95
def context
  @context ||= WorkerContext.new(worker: self)
end
#start ⇒ Object
Start the worker. This blocks until shutdown is requested.
- Registers workflows with the Hatchet server
- Starts the health check server
- Starts the action listener
- Processes actions in the thread pool
- Handles graceful shutdown on SIGINT/SIGTERM
# File 'lib/hatchet/worker_obj.rb', line 64
def start
  setup_signal_handlers
  @client.config.logger.info("Starting worker '#{@name}' with #{@slots} slots")
  @client.config.logger.info("Registering #{@workflows.length} workflow(s)")

  check_engine_version
  register_workflows

  # Start the health check server if enabled
  start_health_check if @client.config.healthcheck.enabled

  # Start the action listener loop
  run_action_listener
rescue Interrupt
  @shutdown = true
  @client.config.logger.info("Worker '#{@name}' interrupted, shutting down...")
rescue StandardError => e
  @client.config.logger.error("Worker error: #{e.message}")
  raise
end
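The two rescue clauses in #start behave differently: `Interrupt` is treated as a shutdown request and swallowed, while any `StandardError` is logged and re-raised to the caller. `Interrupt` descends from `SignalException`, not `StandardError`, which is why it needs its own clause. The following is a minimal sketch of that layout only; `SketchRunner` is a hypothetical stand-in, the block replaces the blocking listener loop, and an array replaces the real logger.

```ruby
# Hypothetical stand-in for the worker; not part of the Hatchet SDK.
class SketchRunner
  attr_reader :shutdown, :log

  def initialize
    @shutdown = false
    @log = []
  end

  # Same rescue layout as #start: the block plays the role of the listener loop.
  def run
    yield
  rescue Interrupt
    # Ctrl-C style interruption: record the shutdown and return normally.
    @shutdown = true
    @log << "info: interrupted, shutting down..."
  rescue StandardError => e
    # Any other failure is logged and then propagated to the caller.
    @log << "error: Worker error: #{e.message}"
    raise
  end
end

interrupted = SketchRunner.new
interrupted.run { raise Interrupt }

failed = SketchRunner.new
reraised = nil
begin
  failed.run { raise ArgumentError, "bad workflow" }
rescue ArgumentError => e
  reraised = e
end

puts interrupted.shutdown   # the Interrupt was absorbed and flagged
puts reraised.class         # the ArgumentError reached the caller
```

A caller that wants the process to exit cleanly on failure therefore still has to handle errors raised out of #start itself.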
#stop ⇒ Object
Request graceful shutdown. This only sets the shutdown flag; it does not wait for the worker to finish.
# File 'lib/hatchet/worker_obj.rb', line 88
def stop
  @shutdown = true
end
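Since #start blocks and #stop only flips a flag, a typical arrangement is to run the worker in one thread and request shutdown from another. The source does not show how `run_action_listener` observes `@shutdown`, so the polling loop below is an assumption, and `SketchLoop` is a hypothetical stand-in for the worker, not the SDK class.

```ruby
# Hypothetical stand-in; the polling loop is an assumed shape for the listener.
class SketchLoop
  attr_reader :ticks

  def initialize
    @shutdown = false
    @ticks = 0
  end

  # Blocks until a shutdown has been requested, like #start.
  def start
    until @shutdown
      @ticks += 1   # stands in for handling one batch of actions
      sleep 0.01
    end
    :stopped
  end

  # Same body as Worker#stop: set the flag and return immediately.
  def stop
    @shutdown = true
  end
end

worker = SketchLoop.new
runner = Thread.new { worker.start }

# Wait until the loop has run at least once, then ask it to finish.
sleep 0.01 until worker.ticks > 0
worker.stop

result = runner.value   # joins the thread and returns start's result
puts result
```

Joining the thread after calling `stop` is what makes the shutdown synchronous from the caller's point of view.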