Class: Conductor::Worker::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/conductor/worker/worker.rb

Overview

Worker class that wraps an execute function. Handles various return types and keyword argument mapping.

Constant Summary collapse

DEFAULTS =

Default configuration values

# Fallback value for every configuration option. #initialize walks these keys
# and passes either the caller-supplied option or this default to the matching
# `key=` writer. The hash itself is frozen.
{
  poll_interval: 100,       # milliseconds
  thread_count: 1,
  domain: nil,
  worker_id: nil,
  poll_timeout: 100,        # milliseconds
  register_task_def: false,
  overwrite_task_def: true,
  strict_schema: false,
  paused: false,
  isolation: :thread,
  executor: :thread_pool
}.freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(task_definition_name, execute_function = nil, **options) {|task| ... } ⇒ Worker

Initialize a worker

Parameters:

  • task_definition_name (String)

    Task definition name in Conductor

  • execute_function (Proc, Method, nil) (defaults to: nil)

    Function to execute tasks

  • options (Hash)

    Worker configuration options

Yields:

  • (task)

    Block to execute tasks (alternative to execute_function)

Raises:

  • (ArgumentError)

    If neither execute_function nor a block is given


47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/conductor/worker/worker.rb', line 47

# Build a worker around a callable that processes tasks.
#
# @param task_definition_name [String] task definition name in Conductor
# @param execute_function [Proc, Method, nil] callable that executes tasks
# @param options [Hash] overrides for the keys listed in DEFAULTS, plus the
#   optional :task_def_template entry
# @yield [task] block used as the callable when execute_function is not given
# @raise [ArgumentError] when neither execute_function nor a block is supplied
def initialize(task_definition_name, execute_function = nil, **options, &block)
  @task_definition_name = task_definition_name
  # The positional callable wins; the block is only the fallback.
  @execute_function = execute_function || block

  raise ArgumentError, 'execute_function or block required' unless @execute_function

  # Route every known setting through its writer, preferring the caller's
  # value (even an explicit nil/false) over the default.
  DEFAULTS.each_pair do |setting, fallback|
    send("#{setting}=", options.fetch(setting, fallback))
  end

  # Not part of DEFAULTS, so it is read straight from the options.
  @task_def_template = options[:task_def_template]

  # Remember how the callable wants its arguments supplied.
  @takes_task_object = analyze_execute_function
end

Instance Attribute Details

#domain ⇒ Object

Configuration attributes



22
23
24
# File 'lib/conductor/worker/worker.rb', line 22

# Reader for the domain configuration attribute (DEFAULTS fallback: nil).
#
# @return [Object] the value currently held in @domain
def domain
  @domain
end

#execute_function ⇒ Proc, ... (readonly)

Returns Function to execute tasks.

Returns:

  • (Proc, Method, nil)

    Function to execute tasks



19
20
21
# File 'lib/conductor/worker/worker.rb', line 19

# Read-only accessor for the callable stored by #initialize.
#
# @return [Proc, Method, nil] function to execute tasks
def execute_function
  @execute_function
end

#executor ⇒ Object

Configuration attributes



22
23
24
# File 'lib/conductor/worker/worker.rb', line 22

# Reader for the executor configuration attribute (DEFAULTS fallback: :thread_pool).
#
# @return [Object] the value currently held in @executor
def executor
  @executor
end

#isolation ⇒ Object

Configuration attributes



22
23
24
# File 'lib/conductor/worker/worker.rb', line 22

# Reader for the isolation configuration attribute (DEFAULTS fallback: :thread).
#
# @return [Object] the value currently held in @isolation
def isolation
  @isolation
end

#overwrite_task_def ⇒ Object

Configuration attributes



22
23
24
# File 'lib/conductor/worker/worker.rb', line 22

# Reader for the overwrite_task_def configuration attribute (DEFAULTS fallback: true).
#
# @return [Object] the value currently held in @overwrite_task_def
def overwrite_task_def
  @overwrite_task_def
end

#paused ⇒ Object

Configuration attributes



22
23
24
# File 'lib/conductor/worker/worker.rb', line 22

# Reader for the paused configuration attribute (DEFAULTS fallback: false).
#
# @return [Object] the value currently held in @paused
def paused
  @paused
end

#poll_interval ⇒ Object

Configuration attributes



22
23
24
# File 'lib/conductor/worker/worker.rb', line 22

# Reader for the poll_interval configuration attribute, expressed in
# milliseconds (DEFAULTS fallback: 100).
#
# @return [Object] the value currently held in @poll_interval
def poll_interval
  @poll_interval
end

#poll_timeout ⇒ Object

Configuration attributes



22
23
24
# File 'lib/conductor/worker/worker.rb', line 22

# Reader for the poll_timeout configuration attribute, expressed in
# milliseconds (DEFAULTS fallback: 100).
#
# @return [Object] the value currently held in @poll_timeout
def poll_timeout
  @poll_timeout
end

#register_task_def ⇒ Object

Configuration attributes



22
23
24
# File 'lib/conductor/worker/worker.rb', line 22

# Reader for the register_task_def configuration attribute (DEFAULTS fallback: false).
#
# @return [Object] the value currently held in @register_task_def
def register_task_def
  @register_task_def
end

#strict_schema ⇒ Object

Configuration attributes



22
23
24
# File 'lib/conductor/worker/worker.rb', line 22

# Reader for the strict_schema configuration attribute (DEFAULTS fallback: false).
#
# @return [Object] the value currently held in @strict_schema
def strict_schema
  @strict_schema
end

#task_def_template ⇒ Object

Configuration attributes



22
23
24
# File 'lib/conductor/worker/worker.rb', line 22

# Reader for the task definition template; #initialize fills it from
# options[:task_def_template], so it is nil when that option is omitted.
#
# @return [Object] the value currently held in @task_def_template
def task_def_template
  @task_def_template
end

#task_definition_name ⇒ String (readonly)

Returns Task definition name in Conductor.

Returns:

  • (String)

    Task definition name in Conductor



16
17
18
# File 'lib/conductor/worker/worker.rb', line 16

# Read-only accessor for the name passed as the first argument to #initialize.
#
# @return [String] task definition name in Conductor
def task_definition_name
  @task_definition_name
end

#thread_count ⇒ Object

Configuration attributes



22
23
24
# File 'lib/conductor/worker/worker.rb', line 22

# Reader for the thread_count configuration attribute (DEFAULTS fallback: 1).
#
# @return [Object] the value currently held in @thread_count
def thread_count
  @thread_count
end

#worker_id ⇒ Object

Configuration attributes



22
23
24
# File 'lib/conductor/worker/worker.rb', line 22

# Reader for the worker_id configuration attribute (DEFAULTS fallback: nil).
#
# @return [Object] the value currently held in @worker_id
def worker_id
  @worker_id
end

Class Method Details

.define(task_definition_name, **options) {|task| ... } ⇒ Worker

Define a worker using a block. Registers the worker in the global registry.

Parameters:

  • task_definition_name (String)

    Task definition name

  • options (Hash)

    Worker configuration options

Yields:

  • (task)

    Block to execute tasks

Returns:

  • (Worker)

    The created worker



97
98
99
100
101
# File 'lib/conductor/worker/worker.rb', line 97

# Create a block-backed worker and record it in WorkerRegistry.
#
# The worker is constructed first, so a failure in the constructor
# propagates before anything is registered.
#
# @param task_definition_name [String] task definition name
# @param options [Hash] worker configuration options
# @yield [task] block to execute tasks
# @return [Worker] the created worker
def self.define(task_definition_name, **options, &block)
  new(task_definition_name, nil, **options, &block).tap do
    WorkerRegistry.register(task_definition_name, block, options)
  end
end

Instance Method Details

#execute(task) ⇒ TaskResult, TaskInProgress

Execute a task. Handles keyword argument mapping and various return types.

Parameters:

  • task (Task)

    The task to execute

Returns:

  • (TaskResult, TaskInProgress)



80
81
82
83
84
85
86
87
88
89
# File 'lib/conductor/worker/worker.rb', line 80

# Run the wrapped execute function against a task and normalise its output.
#
# @param task [Task] the task to execute; anything that is not already an
#   Http::Models::Task is passed to Http::Models::Task.from_hash first
# @return [TaskResult, TaskInProgress] whatever convert_output_to_result
#   produces for the function's output
def execute(task)
  task_obj =
    if task.is_a?(Http::Models::Task)
      task
    else
      Http::Models::Task.from_hash(task)
    end

  # Invoke the user callable, then translate its return value into a result.
  convert_output_to_result(call_execute_function(task_obj), task_obj)
end

#polling_interval_seconds ⇒ Float

Get polling interval in seconds

Returns:

  • (Float)


72
73
74
# File 'lib/conductor/worker/worker.rb', line 72

# Polling interval converted from the stored millisecond value to seconds.
#
# @return [Float] @poll_interval divided by 1000.0
def polling_interval_seconds
  interval_ms = @poll_interval
  interval_ms / 1_000.0
end

#task_type ⇒ Object

Alias for task_definition_name (compatibility)



66
67
68
# File 'lib/conductor/worker/worker.rb', line 66

# Compatibility alias: returns the same @task_definition_name value that
# #task_definition_name exposes.
#
# @return [Object] the value currently held in @task_definition_name
def task_type
  @task_definition_name
end