Class: Conductor::Workflow::Dsl::WorkflowBuilder
- Inherits:
-
Object
- Object
- Conductor::Workflow::Dsl::WorkflowBuilder
- Defined in:
- lib/conductor/workflow/dsl/workflow_builder.rb
Overview
WorkflowBuilder is the core DSL engine for building Conductor workflows. It provides Ruby-idiomatic methods for defining tasks and control flow.
Instance Attribute Summary collapse
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#tasks ⇒ Object
readonly
Returns the value of attribute tasks.
-
#version ⇒ Integer?
readonly
Get the workflow version.
Instance Method Summary collapse
-
#call_mcp_tool(task_name, mcp_server:, method:, arguments: nil, headers: nil, **options) ⇒ TaskRef
Add a CALL_MCP_TOOL task (invoke a tool on MCP server).
-
#decide(expression) { ... } ⇒ TaskRef
Create a switch/decision block.
- #description(text = nil) ⇒ Object
-
#dynamic(task_name, dynamic_task_param:, **inputs) ⇒ TaskRef
Add a DYNAMIC task (task name determined at runtime).
-
#dynamic_fork(task_name, tasks_param:, tasks_input_param:) ⇒ TaskRef
Add a DYNAMIC fork task (parallel tasks determined at runtime).
-
#event(task_name, sink:, **inputs) ⇒ TaskRef
Add an EVENT task.
- #failure_workflow(name) ⇒ Object
-
#generate_audio(task_name, provider:, model:, text: nil, voice: nil, speed: nil, response_format: nil, n: 1, **options) ⇒ TaskRef
Add a GENERATE_AUDIO task (text-to-speech).
-
#generate_image(task_name, provider:, model:, prompt:, size: nil, **options) ⇒ TaskRef
Add a GENERATE_IMAGE task.
-
#get_document(task_name, url:, media_type:) ⇒ TaskRef
Add a GET_DOCUMENT task (retrieve and parse a document from URL).
-
#http(task_name, url:, method: :get, body: nil, headers: nil, **options) ⇒ TaskRef
Add an HTTP task.
-
#http_poll(task_name, url:, method: :get, body: nil, headers: nil, termination_condition: nil, polling_interval: 60, polling_strategy: 'FIXED') ⇒ TaskRef
Add an HTTP_POLL task (polls HTTP endpoint until condition is met).
-
#human(task_name, assignee: nil, display_name: nil, **inputs) ⇒ TaskRef
Add a HUMAN task.
-
#initialize(name, version: nil, description: nil, executor: nil) ⇒ WorkflowBuilder
constructor
A new instance of WorkflowBuilder.
-
#inline_workflow(task_name, version: 1) { ... } ⇒ TaskRef
Define an inline sub-workflow that executes as a SUB_WORKFLOW task.
-
#javascript(task_name, script:, **bindings) ⇒ TaskRef
Add an INLINE (JavaScript) task.
-
#jq(task_name, query:, **inputs) ⇒ TaskRef
Add a JSON_JQ_TRANSFORM task.
-
#kafka_publish(task_name, topic:, value:, key: nil, headers: nil) ⇒ TaskRef
Add a KAFKA_PUBLISH task.
-
#list_mcp_tools(task_name, mcp_server:, headers: nil, **options) ⇒ TaskRef
Add a LIST_MCP_TOOLS task (list available tools from MCP server).
-
#llm_chat(task_name, provider:, model:, messages: nil, temperature: nil, top_p: nil, stop_words: nil, max_tokens: nil, **options) ⇒ TaskRef
Add an LLM_CHAT_COMPLETE task.
-
#llm_complete(task_name, provider:, model:, prompt:, temperature: nil, max_tokens: nil, **options) ⇒ TaskRef
Add an LLM_TEXT_COMPLETE task.
-
#llm_embed(task_name, provider:, model:, text:, **options) ⇒ TaskRef
Add an LLM_GENERATE_EMBEDDINGS task.
-
#llm_get_embeddings(task_name, vector_db:, index:, ids:, namespace: nil, **options) ⇒ TaskRef
Add an LLM_GET_EMBEDDINGS task (retrieve stored embeddings).
-
#llm_index(task_name, vector_db:, namespace:, index:, embeddings:, doc_id: nil, **options) ⇒ TaskRef
Add an LLM_INDEX_TEXT task.
-
#llm_search(task_name, vector_db:, namespace:, index:, query_embeddings:, top_k: 10, **options) ⇒ TaskRef
Add an LLM_SEARCH_INDEX task.
-
#llm_search_embeddings(task_name, vector_db:, index:, embeddings:, namespace: nil, max_results: 1, embedding_model: nil, embedding_model_provider: nil, **options) ⇒ TaskRef
Add an LLM_SEARCH_EMBEDDINGS task (search vector database by embeddings).
-
#llm_store_embeddings(task_name, vector_db:, index:, embeddings:, namespace: nil, id: nil, metadata: nil, embedding_model: nil, embedding_model_provider: nil, **options) ⇒ TaskRef
Add an LLM_STORE_EMBEDDINGS task (store vectors in a vector database).
-
#loop_over(items) { ... } ⇒ TaskRef
Create a loop that iterates over items in an array.
-
#loop_times(count) { ... } ⇒ TaskRef
Create a loop that executes N times.
-
#loop_while(condition) { ... } ⇒ TaskRef
Create a loop with a custom condition.
-
#output(**params) ⇒ Object
Define workflow output parameters.
- #owner_email(email) ⇒ Object
-
#parallel { ... } ⇒ TaskRef
Create a parallel execution block (FORK_JOIN).
- #restartable(value) ⇒ Object
-
#set(**variables) ⇒ TaskRef
Add a SET_VARIABLE task.
-
#set_version(v) ⇒ Object
Configure workflow metadata.
-
#simple(task_name, **inputs) ⇒ TaskRef
Add a SIMPLE task (worker task).
-
#start_workflow(task_name, workflow:, version: nil, **inputs) ⇒ TaskRef
Add a START_WORKFLOW task (fire-and-forget, does not wait for completion).
-
#sub_workflow(task_name, workflow:, version: nil, **inputs) ⇒ TaskRef
Add a SUB_WORKFLOW task.
-
#terminate(status, reason, **options) ⇒ TaskRef
Add a TERMINATE task.
- #timeout(seconds) ⇒ Object
-
#to_workflow_def ⇒ Conductor::Http::Models::WorkflowDef
Convert the builder to a WorkflowDef model.
-
#wait(seconds = nil, until_time: nil, **options) ⇒ TaskRef
Add a WAIT task.
-
#wait_for_webhook(task_name, matches: {}) ⇒ TaskRef
Add a WAIT_FOR_WEBHOOK task (waits for external webhook callback).
-
#wf ⇒ InputRef
Returns the workflow input proxy for accessing workflow inputs.
-
#when_false(condition) { ... } ⇒ TaskRef
Create a branch that only executes if a condition is false.
-
#when_true(condition) { ... } ⇒ TaskRef
Create a branch that only executes if a condition is true.
Constructor Details
#initialize(name, version: nil, description: nil, executor: nil) ⇒ WorkflowBuilder
Returns a new instance of WorkflowBuilder.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 19 def initialize(name, version: nil, description: nil, executor: nil) @name = name @version = version @description = description @executor = executor @tasks = [] @output_params = {} @input_params = [] @ref_counter = Hash.new(0) @timeout_seconds = 60 @owner_email = nil @restartable = true @failure_workflow = nil end |
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
17 18 19 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 17 def name @name end |
#tasks ⇒ Object (readonly)
Returns the value of attribute tasks.
17 18 19 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 17 def tasks @tasks end |
#version ⇒ Integer? (readonly)
Get the workflow version
36 37 38 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 36 def version @version end |
Instance Method Details
#call_mcp_tool(task_name, mcp_server:, method:, arguments: nil, headers: nil, **options) ⇒ TaskRef
Add a CALL_MCP_TOOL task (invoke a tool on MCP server)
603 604 605 606 607 608 609 610 611 612 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 603 def call_mcp_tool(task_name, mcp_server:, method:, arguments: nil, headers: nil, **) inputs = { 'mcpServer' => mcp_server, 'method' => method, 'arguments' => resolve_hash(arguments || {}) } inputs['headers'] = resolve_hash(headers) if headers add_task(task_name, TaskType::CALL_MCP_TOOL, inputs, ) end |
#decide(expression) { ... } ⇒ TaskRef
Create a switch/decision block
655 656 657 658 659 660 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 655 def decide(expression, &block) builder = SwitchBuilder.new(resolve_value(expression), self) builder.instance_eval(&block) add_switch_task(builder) end |
#description(text = nil) ⇒ Object
52 53 54 55 56 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 52 def description(text = nil) return @description if text.nil? @description = text end |
#dynamic(task_name, dynamic_task_param:, **inputs) ⇒ TaskRef
Add a DYNAMIC task (task name determined at runtime)
293 294 295 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 293 def dynamic(task_name, dynamic_task_param:, **inputs) add_task(task_name, TaskType::DYNAMIC, inputs, { dynamic_task_name_param: dynamic_task_param }) end |
#dynamic_fork(task_name, tasks_param:, tasks_input_param:) ⇒ TaskRef
Add a DYNAMIC fork task (parallel tasks determined at runtime)
306 307 308 309 310 311 312 313 314 315 316 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 306 def dynamic_fork(task_name, tasks_param:, tasks_input_param:) add_task( task_name, TaskType::FORK_JOIN_DYNAMIC, {}, { dynamic_fork_tasks_param: resolve_value(tasks_param), dynamic_fork_tasks_input_param: resolve_value(tasks_input_param) } ) end |
#event(task_name, sink:, **inputs) ⇒ TaskRef
Add an EVENT task
201 202 203 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 201 def event(task_name, sink:, **inputs) add_task(task_name, TaskType::EVENT, inputs, { sink: sink }) end |
#failure_workflow(name) ⇒ Object
70 71 72 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 70 def failure_workflow(name) @failure_workflow = name end |
#generate_audio(task_name, provider:, model:, text: nil, voice: nil, speed: nil, response_format: nil, n: 1, **options) ⇒ TaskRef
Add a GENERATE_AUDIO task (text-to-speech)
481 482 483 484 485 486 487 488 489 490 491 492 493 494 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 481 def generate_audio(task_name, provider:, model:, text: nil, voice: nil, speed: nil, response_format: nil, n: 1, **) inputs = { 'llmProvider' => provider, 'model' => model, 'n' => n } inputs['text'] = resolve_value(text) if text inputs['voice'] = voice if voice inputs['speed'] = speed if speed inputs['responseFormat'] = response_format if response_format add_task(task_name, TaskType::GENERATE_AUDIO, inputs, ) end |
#generate_image(task_name, provider:, model:, prompt:, size: nil, **options) ⇒ TaskRef
Add a GENERATE_IMAGE task
457 458 459 460 461 462 463 464 465 466 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 457 def generate_image(task_name, provider:, model:, prompt:, size: nil, **) inputs = { 'llmProvider' => provider, 'model' => model, 'prompt' => resolve_value(prompt) } inputs['size'] = size if size add_task(task_name, TaskType::GENERATE_IMAGE, inputs, ) end |
#get_document(task_name, url:, media_type:) ⇒ TaskRef
Add a GET_DOCUMENT task (retrieve and parse a document from URL)
325 326 327 328 329 330 331 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 325 def get_document(task_name, url:, media_type:) inputs = { 'url' => resolve_value(url), 'mediaType' => media_type } add_task(task_name, TaskType::GET_DOCUMENT, inputs, {}) end |
#http(task_name, url:, method: :get, body: nil, headers: nil, **options) ⇒ TaskRef
Add an HTTP task
102 103 104 105 106 107 108 109 110 111 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 102 def http(task_name, url:, method: :get, body: nil, headers: nil, **) http_request = { 'uri' => resolve_value(url), 'method' => method.to_s.upcase } http_request['body'] = resolve_value(body) if body http_request['headers'] = resolve_value(headers) if headers add_task(task_name, TaskType::HTTP, { 'http_request' => http_request }, ) end |
#http_poll(task_name, url:, method: :get, body: nil, headers: nil, termination_condition: nil, polling_interval: 60, polling_strategy: 'FIXED') ⇒ TaskRef
Add an HTTP_POLL task (polls HTTP endpoint until condition is met)
267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 267 def http_poll(task_name, url:, method: :get, body: nil, headers: nil, termination_condition: nil, polling_interval: 60, polling_strategy: 'FIXED') http_request = { 'uri' => resolve_value(url), 'method' => method.to_s.upcase } http_request['body'] = resolve_value(body) if body http_request['headers'] = resolve_value(headers) if headers inputs = { 'http_request' => http_request, 'pollingInterval' => polling_interval, 'pollingStrategy' => polling_strategy } inputs['terminationCondition'] = termination_condition if termination_condition add_task(task_name, TaskType::HTTP_POLL, inputs, {}) end |
#human(task_name, assignee: nil, display_name: nil, **inputs) ⇒ TaskRef
Add a HUMAN task
163 164 165 166 167 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 163 def human(task_name, assignee: nil, display_name: nil, **inputs) inputs = inputs.merge('assignee' => assignee) if assignee inputs = inputs.merge('displayName' => display_name) if display_name add_task(task_name, TaskType::HUMAN, inputs, {}) end |
#inline_workflow(task_name, version: 1) { ... } ⇒ TaskRef
Define an inline sub-workflow that executes as a SUB_WORKFLOW task
722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 722 def inline_workflow(task_name, version: 1, &block) # Create a nested builder for the inline workflow inline_builder = WorkflowBuilder.new( "#{@name}_#{task_name}_inline", version: version ) inline_builder.instance_eval(&block) # Get the workflow def from the inline builder inline_def = inline_builder.to_workflow_def add_task( task_name, TaskType::SUB_WORKFLOW, {}, { inline_workflow_def: inline_def } ) end |
#javascript(task_name, script:, **bindings) ⇒ TaskRef
Add an INLINE (JavaScript) task
182 183 184 185 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 182 def javascript(task_name, script:, **bindings) inputs = resolve_hash(bindings) add_task(task_name, TaskType::INLINE, inputs, { expression: script, evaluator_type: 'javascript' }) end |
#jq(task_name, query:, **inputs) ⇒ TaskRef
Add a JSON_JQ_TRANSFORM task
192 193 194 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 192 def jq(task_name, query:, **inputs) add_task(task_name, TaskType::JSON_JQ_TRANSFORM, inputs, { query_expression: query }) end |
#kafka_publish(task_name, topic:, value:, key: nil, headers: nil) ⇒ TaskRef
Add a KAFKA_PUBLISH task
212 213 214 215 216 217 218 219 220 221 222 223 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 212 def kafka_publish(task_name, topic:, value:, key: nil, headers: nil) inputs = { 'kafka_request' => { 'topic' => topic, 'value' => resolve_value(value) } } inputs['kafka_request']['key'] = key if key inputs['kafka_request']['headers'] = resolve_value(headers) if headers add_task(task_name, TaskType::KAFKA_PUBLISH, inputs, {}) end |
#list_mcp_tools(task_name, mcp_server:, headers: nil, **options) ⇒ TaskRef
Add a LIST_MCP_TOOLS task (list available tools from MCP server)
585 586 587 588 589 590 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 585 def list_mcp_tools(task_name, mcp_server:, headers: nil, **) inputs = { 'mcpServer' => mcp_server } inputs['headers'] = resolve_hash(headers) if headers add_task(task_name, TaskType::LIST_MCP_TOOLS, inputs, ) end |
#llm_chat(task_name, provider:, model:, messages: nil, temperature: nil, top_p: nil, stop_words: nil, max_tokens: nil, **options) ⇒ TaskRef
Add an LLM_CHAT_COMPLETE task
348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 348 def llm_chat(task_name, provider:, model:, messages: nil, temperature: nil, top_p: nil, stop_words: nil, max_tokens: nil, **) = &.map do |msg| if msg.is_a?(Hash) Conductor::Workflow::Llm::ChatMessage.new(**msg) else msg end end inputs = { 'llmProvider' => provider, 'model' => model } inputs['messages'] = .map(&:to_h) if inputs['temperature'] = temperature if temperature inputs['topP'] = top_p if top_p inputs['stopWords'] = stop_words if stop_words inputs['maxTokens'] = max_tokens if max_tokens add_task(task_name, TaskType::LLM_CHAT_COMPLETE, inputs, ) end |
#llm_complete(task_name, provider:, model:, prompt:, temperature: nil, max_tokens: nil, **options) ⇒ TaskRef
Add an LLM_TEXT_COMPLETE task
380 381 382 383 384 385 386 387 388 389 390 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 380 def llm_complete(task_name, provider:, model:, prompt:, temperature: nil, max_tokens: nil, **) inputs = { 'llmProvider' => provider, 'model' => model, 'prompt' => resolve_value(prompt) } inputs['temperature'] = temperature if temperature inputs['maxTokens'] = max_tokens if max_tokens add_task(task_name, TaskType::LLM_TEXT_COMPLETE, inputs, ) end |
#llm_embed(task_name, provider:, model:, text:, **options) ⇒ TaskRef
Add an LLM_GENERATE_EMBEDDINGS task
399 400 401 402 403 404 405 406 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 399 def (task_name, provider:, model:, text:, **) inputs = { 'llmProvider' => provider, 'model' => model, 'text' => resolve_value(text) } add_task(task_name, TaskType::LLM_GENERATE_EMBEDDINGS, inputs, ) end |
#llm_get_embeddings(task_name, vector_db:, index:, ids:, namespace: nil, **options) ⇒ TaskRef
Add an LLM_GET_EMBEDDINGS task (retrieve stored embeddings)
566 567 568 569 570 571 572 573 574 575 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 566 def (task_name, vector_db:, index:, ids:, namespace: nil, **) inputs = { 'vectorDB' => vector_db, 'index' => index, 'ids' => resolve_value(ids) } inputs['namespace'] = namespace if namespace add_task(task_name, TaskType::LLM_GET_EMBEDDINGS, inputs, ) end |
#llm_index(task_name, vector_db:, namespace:, index:, embeddings:, doc_id: nil, **options) ⇒ TaskRef
Add an LLM_INDEX_TEXT task
417 418 419 420 421 422 423 424 425 426 427 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 417 def llm_index(task_name, vector_db:, namespace:, index:, embeddings:, doc_id: nil, **) inputs = { 'vectorDB' => vector_db, 'namespace' => namespace, 'index' => index, 'embeddingModelProvider' => resolve_value() } inputs['docId'] = resolve_value(doc_id) if doc_id add_task(task_name, TaskType::LLM_INDEX_TEXT, inputs, ) end |
#llm_search(task_name, vector_db:, namespace:, index:, query_embeddings:, top_k: 10, **options) ⇒ TaskRef
Add an LLM_SEARCH_INDEX task
438 439 440 441 442 443 444 445 446 447 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 438 def llm_search(task_name, vector_db:, namespace:, index:, query_embeddings:, top_k: 10, **) inputs = { 'vectorDB' => vector_db, 'namespace' => namespace, 'index' => index, 'queryEmbeddings' => resolve_value(), 'k' => top_k } add_task(task_name, TaskType::LLM_SEARCH_INDEX, inputs, ) end |
#llm_search_embeddings(task_name, vector_db:, index:, embeddings:, namespace: nil, max_results: 1, embedding_model: nil, embedding_model_provider: nil, **options) ⇒ TaskRef
Add an LLM_SEARCH_EMBEDDINGS task (search vector database by embeddings)
542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 542 def (task_name, vector_db:, index:, embeddings:, namespace: nil, max_results: 1, embedding_model: nil, embedding_model_provider: nil, **) inputs = { 'vectorDB' => vector_db, 'index' => index, 'embeddings' => resolve_value(), 'maxResults' => max_results } inputs['namespace'] = namespace if namespace inputs['embeddingModel'] = if inputs['embeddingModelProvider'] = if add_task(task_name, TaskType::LLM_SEARCH_EMBEDDINGS, inputs, ) end |
#llm_store_embeddings(task_name, vector_db:, index:, embeddings:, namespace: nil, id: nil, metadata: nil, embedding_model: nil, embedding_model_provider: nil, **options) ⇒ TaskRef
Add an LLM_STORE_EMBEDDINGS task (store vectors in a vector database)
511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 511 def (task_name, vector_db:, index:, embeddings:, namespace: nil, id: nil, metadata: nil, embedding_model: nil, embedding_model_provider: nil, **) inputs = { 'vectorDB' => vector_db, 'index' => index, 'embeddings' => resolve_value() } inputs['namespace'] = namespace if namespace inputs['id'] = resolve_value(id) if id inputs['metadata'] = resolve_hash() if inputs['embeddingModel'] = if inputs['embeddingModelProvider'] = if add_task(task_name, TaskType::LLM_STORE_EMBEDDINGS, inputs, ) end |
#loop_over(items) { ... } ⇒ TaskRef
Create a loop that iterates over items in an array
699 700 701 702 703 704 705 706 707 708 709 710 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 699 def loop_over(items, &block) # Set up the loop with array iteration pattern loop_tasks = [] collector = LoopCollector.new(self, loop_tasks) collector.instance_eval(&block) # Create a set variable task to track iteration items_expr = resolve_value(items) condition = '$.iteration_index < $.items.length()' add_do_while_task_with_items(condition, loop_tasks, items_expr) end |
#loop_times(count) { ... } ⇒ TaskRef
Create a loop that executes N times
670 671 672 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 670 def loop_times(count, &block) loop_while("$.loop_counter < #{count}", &block) end |
#loop_while(condition) { ... } ⇒ TaskRef
Create a loop with a custom condition
682 683 684 685 686 687 688 689 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 682 def loop_while(condition, &block) # Collect tasks in the loop body loop_tasks = [] collector = TaskCollector.new(self, loop_tasks) collector.instance_eval(&block) add_do_while_task(condition, loop_tasks) end |
#output(**params) ⇒ Object
Define workflow output parameters
78 79 80 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 78 def output(**params) @output_params.merge!(resolve_hash(params)) end |
#owner_email(email) ⇒ Object
62 63 64 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 62 def owner_email(email) @owner_email = email end |
#parallel { ... } ⇒ TaskRef
Create a parallel execution block (FORK_JOIN)
626 627 628 629 630 631 632 633 634 635 636 637 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 626 def parallel(&block) builder = ParallelBuilder.new(self) builder.instance_eval(&block) branches = builder.finalize # Create FORK_JOIN task fork_ref = add_fork_join_task(branches) # Create JOIN task join_on_refs = branches.map { |branch| branch.last.ref_name } add_join_task(join_on_refs) end |
#restartable(value) ⇒ Object
66 67 68 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 66 def restartable(value) @restartable = value end |
#set(**variables) ⇒ TaskRef
Add a SET_VARIABLE task
173 174 175 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 173 def set(**variables) add_task('set_variable', TaskType::SET_VARIABLE, resolve_hash(variables), {}) end |
#set_version(v) ⇒ Object
Configure workflow metadata
48 49 50 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 48 def set_version(v) @version = v end |
#simple(task_name, **inputs) ⇒ TaskRef
Add a SIMPLE task (worker task)
90 91 92 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 90 def simple(task_name, **inputs) add_task(task_name, TaskType::SIMPLE, inputs, {}) end |
#start_workflow(task_name, workflow:, version: nil, **inputs) ⇒ TaskRef
Add a START_WORKFLOW task (fire-and-forget, does not wait for completion)
233 234 235 236 237 238 239 240 241 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 233 def start_workflow(task_name, workflow:, version: nil, **inputs) start_workflow_input = { 'name' => workflow, 'input' => resolve_hash(inputs) } start_workflow_input['version'] = version if version add_task(task_name, TaskType::START_WORKFLOW, { 'startWorkflow' => start_workflow_input }, {}) end |
#sub_workflow(task_name, workflow:, version: nil, **inputs) ⇒ TaskRef
Add a SUB_WORKFLOW task
145 146 147 148 149 150 151 152 153 154 155 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 145 def sub_workflow(task_name, workflow:, version: nil, **inputs) add_task( task_name, TaskType::SUB_WORKFLOW, inputs, { sub_workflow_name: workflow, sub_workflow_version: version } ) end |
#terminate(status, reason, **options) ⇒ TaskRef
Add a TERMINATE task
131 132 133 134 135 136 137 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 131 def terminate(status, reason, **) inputs = { 'terminationStatus' => status.to_s.upcase, 'terminationReason' => reason } add_task('terminate', TaskType::TERMINATE, inputs, ) end |
#timeout(seconds) ⇒ Object
58 59 60 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 58 def timeout(seconds) @timeout_seconds = seconds end |
#to_workflow_def ⇒ Conductor::Http::Models::WorkflowDef
Convert the builder to a WorkflowDef model
933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 933 def to_workflow_def workflow_tasks = convert_tasks_to_workflow_tasks Conductor::Http::Models::WorkflowDef.new( name: @name, version: @version, description: @description, tasks: workflow_tasks, input_parameters: @input_params, output_parameters: @output_params, timeout_seconds: @timeout_seconds, owner_email: @owner_email, restartable: @restartable, failure_workflow: @failure_workflow, schema_version: 2 ) end |
#wait(seconds = nil, until_time: nil, **options) ⇒ TaskRef
Add a WAIT task
118 119 120 121 122 123 124 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 118 def wait(seconds = nil, until_time: nil, **) inputs = {} inputs['duration'] = "#{seconds} seconds" if seconds inputs['until'] = until_time if until_time add_task('wait', TaskType::WAIT, inputs, ) end |
#wait_for_webhook(task_name, matches: {}) ⇒ TaskRef
Add a WAIT_FOR_WEBHOOK task (waits for external webhook callback)
249 250 251 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 249 def wait_for_webhook(task_name, matches: {}) add_task(task_name, TaskType::WAIT_FOR_WEBHOOK, { 'matches' => resolve_hash(matches) }, {}) end |
#wf ⇒ InputRef
Returns the workflow input proxy for accessing workflow inputs
43 44 45 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 43 def wf @wf ||= InputRef.new end |
#when_false(condition) { ... } ⇒ TaskRef
Create a branch that only executes if a condition is false
759 760 761 762 763 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 759 def when_false(condition, &block) decide condition do on 'false', &block end end |
#when_true(condition) { ... } ⇒ TaskRef
Create a branch that only executes if a condition is true
749 750 751 752 753 |
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 749 def when_true(condition, &block) decide condition do on 'true', &block end end |