Class: Conductor::Worker::TaskDefinitionRegistrar
- Inherits:
-
Object
- Object
- Conductor::Worker::TaskDefinitionRegistrar
- Defined in:
- lib/conductor/worker/task_definition_registrar.rb
Overview
TaskDefinitionRegistrar - Handles automatic task definition registration Generates JSON schemas from worker function signatures and registers task definitions with the Conductor server.
Instance Method Summary collapse
-
#initialize(configuration, logger: nil) ⇒ TaskDefinitionRegistrar
constructor
A new instance of TaskDefinitionRegistrar.
-
#register(worker) ⇒ Boolean
Register a task definition for a worker.
Constructor Details
#initialize(configuration, logger: nil) ⇒ TaskDefinitionRegistrar
Returns a new instance of TaskDefinitionRegistrar.
15 16 17 18 19 |
# File 'lib/conductor/worker/task_definition_registrar.rb', line 15 def initialize(configuration, logger: nil) @configuration = configuration @metadata_client = Client::MetadataClient.new(configuration) @logger = logger || Logger.new($stdout) end |
Instance Method Details
#register(worker) ⇒ Boolean
Register a task definition for a worker
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/conductor/worker/task_definition_registrar.rb', line 24 def register(worker) return false unless worker.register_task_def task_def = build_task_definition(worker) # Generate schemas if worker has typed parameters input_schema = generate_input_schema(worker) output_schema = generate_output_schema(worker) # Register schemas if available register_schemas(worker.task_definition_name, input_schema, output_schema) if input_schema || output_schema # Register or update task definition if worker.overwrite_task_def register_or_update_task_def(task_def) else register_if_not_exists(task_def) end @logger.info("Registered task definition: #{worker.task_definition_name}") true rescue StandardError => e @logger.warn("Failed to register task definition '#{worker.task_definition_name}': #{e.}") false end |