Class: Conductor::Worker::Worker
- Inherits:
-
Object
- Object
- Conductor::Worker::Worker
- Defined in:
- lib/conductor/worker/worker.rb
Overview
Worker class that wraps an execute function Handles various return types and keyword argument mapping
Constant Summary collapse
- DEFAULTS =
Default configuration values
{ poll_interval: 100, # milliseconds thread_count: 1, domain: nil, worker_id: nil, poll_timeout: 100, # milliseconds register_task_def: false, overwrite_task_def: true, strict_schema: false, paused: false, isolation: :thread, executor: :thread_pool }.freeze
Instance Attribute Summary collapse
-
#domain ⇒ Object
Configuration attributes.
-
#execute_function ⇒ Proc, ...
readonly
Function to execute tasks.
-
#executor ⇒ Object
Configuration attributes.
-
#isolation ⇒ Object
Configuration attributes.
-
#overwrite_task_def ⇒ Object
Configuration attributes.
-
#paused ⇒ Object
Configuration attributes.
-
#poll_interval ⇒ Object
Configuration attributes.
-
#poll_timeout ⇒ Object
Configuration attributes.
-
#register_task_def ⇒ Object
Configuration attributes.
-
#strict_schema ⇒ Object
Configuration attributes.
-
#task_def_template ⇒ Object
Configuration attributes.
-
#task_definition_name ⇒ String
readonly
Task definition name in Conductor.
-
#thread_count ⇒ Object
Configuration attributes.
-
#worker_id ⇒ Object
Configuration attributes.
Class Method Summary collapse
-
.define(task_definition_name, **options) {|task| ... } ⇒ Worker
Define a worker using a block Registers the worker in the global registry.
Instance Method Summary collapse
-
#execute(task) ⇒ TaskResult, TaskInProgress
Execute a task Handles keyword argument mapping and various return types.
-
#initialize(task_definition_name, execute_function = nil, **options) {|task| ... } ⇒ Worker
constructor
Initialize a worker.
-
#polling_interval_seconds ⇒ Float
Get polling interval in seconds.
-
#task_type ⇒ Object
Alias for task_definition_name (compatibility).
Constructor Details
#initialize(task_definition_name, execute_function = nil, **options) {|task| ... } ⇒ Worker
Initialize a worker
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/conductor/worker/worker.rb', line 47 def initialize(task_definition_name, execute_function = nil, **, &block) @task_definition_name = task_definition_name @execute_function = execute_function || block raise ArgumentError, 'execute_function or block required' unless @execute_function # Apply options with defaults DEFAULTS.each do |key, default| value = .key?(key) ? [key] : default send("#{key}=", value) end @task_def_template = [:task_def_template] # Analyze the execute function for parameter mapping @takes_task_object = analyze_execute_function end |
Instance Attribute Details
#domain ⇒ Object
Configuration attributes
22 23 24 |
# File 'lib/conductor/worker/worker.rb', line 22 def domain @domain end |
#execute_function ⇒ Proc, ... (readonly)
Returns Function to execute tasks.
19 20 21 |
# File 'lib/conductor/worker/worker.rb', line 19 def execute_function @execute_function end |
#executor ⇒ Object
Configuration attributes
22 23 24 |
# File 'lib/conductor/worker/worker.rb', line 22 def executor @executor end |
#isolation ⇒ Object
Configuration attributes
22 23 24 |
# File 'lib/conductor/worker/worker.rb', line 22 def isolation @isolation end |
#overwrite_task_def ⇒ Object
Configuration attributes
22 23 24 |
# File 'lib/conductor/worker/worker.rb', line 22 def overwrite_task_def @overwrite_task_def end |
#paused ⇒ Object
Configuration attributes
22 23 24 |
# File 'lib/conductor/worker/worker.rb', line 22 def paused @paused end |
#poll_interval ⇒ Object
Configuration attributes
22 23 24 |
# File 'lib/conductor/worker/worker.rb', line 22 def poll_interval @poll_interval end |
#poll_timeout ⇒ Object
Configuration attributes
22 23 24 |
# File 'lib/conductor/worker/worker.rb', line 22 def poll_timeout @poll_timeout end |
#register_task_def ⇒ Object
Configuration attributes
22 23 24 |
# File 'lib/conductor/worker/worker.rb', line 22 def register_task_def @register_task_def end |
#strict_schema ⇒ Object
Configuration attributes
22 23 24 |
# File 'lib/conductor/worker/worker.rb', line 22 def strict_schema @strict_schema end |
#task_def_template ⇒ Object
Configuration attributes
22 23 24 |
# File 'lib/conductor/worker/worker.rb', line 22 def task_def_template @task_def_template end |
#task_definition_name ⇒ String (readonly)
Returns Task definition name in Conductor.
16 17 18 |
# File 'lib/conductor/worker/worker.rb', line 16 def task_definition_name @task_definition_name end |
#thread_count ⇒ Object
Configuration attributes
22 23 24 |
# File 'lib/conductor/worker/worker.rb', line 22 def thread_count @thread_count end |
#worker_id ⇒ Object
Configuration attributes
22 23 24 |
# File 'lib/conductor/worker/worker.rb', line 22 def worker_id @worker_id end |
Class Method Details
.define(task_definition_name, **options) {|task| ... } ⇒ Worker
Define a worker using a block Registers the worker in the global registry
97 98 99 100 101 |
# File 'lib/conductor/worker/worker.rb', line 97 def self.define(task_definition_name, **, &block) worker = new(task_definition_name, nil, **, &block) WorkerRegistry.register(task_definition_name, block, ) worker end |
Instance Method Details
#execute(task) ⇒ TaskResult, TaskInProgress
Execute a task Handles keyword argument mapping and various return types
80 81 82 83 84 85 86 87 88 89 |
# File 'lib/conductor/worker/worker.rb', line 80 def execute(task) # Convert task if needed task_obj = task.is_a?(Http::Models::Task) ? task : Http::Models::Task.from_hash(task) # Call the execute function with appropriate arguments output = call_execute_function(task_obj) # Handle different return types convert_output_to_result(output, task_obj) end |
#polling_interval_seconds ⇒ Float
Get polling interval in seconds
72 73 74 |
# File 'lib/conductor/worker/worker.rb', line 72 def polling_interval_seconds @poll_interval / 1000.0 end |
#task_type ⇒ Object
Alias for task_definition_name (compatibility)
66 67 68 |
# File 'lib/conductor/worker/worker.rb', line 66 def task_type @task_definition_name end |