Class: Hatchet::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/hatchet/worker_obj.rb

Overview

Worker that processes tasks from the Hatchet engine.

The worker connects to the Hatchet server via gRPC, registers its workflows, listens for task assignments, and executes them in a thread pool.

Examples:

Start a worker

worker = hatchet.worker("my-worker", workflows: [my_workflow])
worker.start

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name:, client:, workflows: [], slots: 10, durable_slots: nil, labels: {}) ⇒ Worker

Returns a new instance of Worker.

Parameters:

  • name (String)

    Worker name

  • client (Hatchet::Client)

    The Hatchet client

  • workflows (Array<Workflow, Task>) (defaults to: [])

    Workflows to register

  • slots (Integer) (defaults to: 10)

    Number of concurrent task slots (default: 10)

  • durable_slots (Integer, nil) (defaults to: nil)

    Number of durable-task slots; defaults to “slots“

  • labels (Hash) (defaults to: {})

    Worker labels (default: {})



45
46
47
48
49
50
51
52
53
54
55
# File 'lib/hatchet/worker_obj.rb', line 45

def initialize(name:, client:, workflows: [], slots: 10, durable_slots: nil, labels: {})
  @name = name
  @client = client
  @workflows = workflows
  @slots = slots
  @durable_slots = durable_slots || slots
  @labels = client.config.worker_preset_labels.merge(labels)
  @worker_id = nil
  @shutdown = false
  @engine_version = nil
end

Instance Attribute Details

#clientHatchet::Client (readonly)

Returns The Hatchet client.

Returns:



28
29
30
# File 'lib/hatchet/worker_obj.rb', line 28

def client
  @client
end

#durable_slotsInteger (readonly)

Returns Number of durable-task slots (defaults to “slots“).

Returns:

  • (Integer)

    Number of durable-task slots (defaults to “slots“)



34
35
36
# File 'lib/hatchet/worker_obj.rb', line 34

def durable_slots
  @durable_slots
end

#engine_versionString? (readonly)

Returns Engine semantic version detected on “start“.

Returns:

  • (String, nil)

    Engine semantic version detected on “start“



37
38
39
# File 'lib/hatchet/worker_obj.rb', line 37

def engine_version
  @engine_version
end

#labelsHash (readonly)

Returns Worker labels for scheduling.

Returns:

  • (Hash)

    Worker labels for scheduling



25
26
27
# File 'lib/hatchet/worker_obj.rb', line 25

def labels
  @labels
end

#nameString (readonly)

Returns Worker name.

Returns:

  • (String)

    Worker name



16
17
18
# File 'lib/hatchet/worker_obj.rb', line 16

def name
  @name
end

#slotsInteger (readonly)

Returns Number of concurrent task execution slots.

Returns:

  • (Integer)

    Number of concurrent task execution slots



22
23
24
# File 'lib/hatchet/worker_obj.rb', line 22

def slots
  @slots
end

#worker_idString?

Returns Worker ID assigned by the server.

Returns:

  • (String, nil)

    Worker ID assigned by the server



31
32
33
# File 'lib/hatchet/worker_obj.rb', line 31

def worker_id
  @worker_id
end

#workflowsArray<Workflow, Task> (readonly)

Returns Registered workflows.

Returns:



19
20
21
# File 'lib/hatchet/worker_obj.rb', line 19

def workflows
  @workflows
end

Instance Method Details

#contextWorkerContext

Access worker context for label management

Returns:



95
96
97
# File 'lib/hatchet/worker_obj.rb', line 95

def context
  @context ||= WorkerContext.new(worker: self)
end

#startObject

Start the worker. This blocks until shutdown is requested.

  1. Registers workflows with the Hatchet server

  2. Starts the health check server

  3. Starts the action listener

  4. Processes actions in the thread pool

  5. Handles graceful shutdown on SIGINT/SIGTERM



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/hatchet/worker_obj.rb', line 64

def start
  setup_signal_handlers

  @client.config.logger.info("Starting worker '#{@name}' with #{@slots} slots")
  @client.config.logger.info("Registering #{@workflows.length} workflow(s)")

  check_engine_version

  register_workflows

  # Start the health check server if enabled
  start_health_check if @client.config.healthcheck.enabled

  # Start the action listener loop
  run_action_listener
rescue Interrupt
  @shutdown = true
  @client.config.logger.info("Worker '#{@name}' interrupted, shutting down...")
rescue StandardError => e
  @client.config.logger.error("Worker error: #{e.message}")
  raise
end

#stopObject

Request graceful shutdown



88
89
90
# File 'lib/hatchet/worker_obj.rb', line 88

def stop
  @shutdown = true
end