Class: Conductor::Workflow::Dsl::WorkflowBuilder

Inherits:
Object
Defined in:
lib/conductor/workflow/dsl/workflow_builder.rb

Overview

WorkflowBuilder is the core DSL engine for building Conductor workflows. It provides Ruby-idiomatic methods for defining tasks and control flow.

Examples:

Simple workflow

builder = WorkflowBuilder.new('my_workflow', version: 1)
user = builder.simple :get_user, user_id: builder.wf[:user_id]
builder.simple :send_email, email: user[:email]

Constructor Details

#initialize(name, version: nil, description: nil, executor: nil) ⇒ WorkflowBuilder

Returns a new instance of WorkflowBuilder.



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 19

def initialize(name, version: nil, description: nil, executor: nil)
  @name = name
  @version = version
  @description = description
  @executor = executor
  @tasks = []
  @output_params = {}
  @input_params = []
  @ref_counter = Hash.new(0)
  @timeout_seconds = 60
  @owner_email = nil
  @restartable = true
  @failure_workflow = nil
end

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 17

def name
  @name
end

#tasks ⇒ Object (readonly)

Returns the value of attribute tasks.



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 17

def tasks
  @tasks
end

#version ⇒ Integer? (readonly)

Get the workflow version

Returns:

  • (Integer, nil)

    Workflow version



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 36

def version
  @version
end

Instance Method Details

#call_mcp_tool(task_name, mcp_server:, method:, arguments: nil, headers: nil, **options) ⇒ TaskRef

Add a CALL_MCP_TOOL task (invoke a tool on an MCP server)

Examples:

call_mcp_tool :execute_tool, mcp_server: 'my-mcp-server',
              method: 'search', arguments: { query: 'test' }

Parameters:

  • task_name (Symbol, String)

    The task name

  • mcp_server (String)

    MCP server integration name

  • method (String)

    Tool method name

  • arguments (Hash, nil) (defaults to: nil)

    Arguments to pass to the tool

  • headers (Hash, nil) (defaults to: nil)

    Optional HTTP headers

  • options (Hash)

    Additional options

Returns:

  • (TaskRef)

    Reference to the created task



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 603

def call_mcp_tool(task_name, mcp_server:, method:, arguments: nil, headers: nil, **options)
  inputs = {
    'mcpServer' => mcp_server,
    'method' => method,
    'arguments' => resolve_hash(arguments || {})
  }
  inputs['headers'] = resolve_hash(headers) if headers

  add_task(task_name, TaskType::CALL_MCP_TOOL, inputs, options)
end

#decide(expression) { ... } ⇒ TaskRef

Create a switch/decision block

Examples:

decide user[:country] do
  on 'US' do
    simple :us_flow
  end
  on 'UK' do
    simple :uk_flow
  end
  otherwise do
    simple :default_flow
  end
end

Parameters:

  • expression (String, OutputRef)

    The expression to evaluate

Yields:

  • Block containing on/otherwise clauses

Returns:

  • (TaskRef)

    Reference to the SWITCH task



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 655

def decide(expression, &block)
  builder = SwitchBuilder.new(resolve_value(expression), self)
  builder.instance_eval(&block)

  add_switch_task(builder)
end
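
The block passed to decide is evaluated with instance_eval against a SwitchBuilder, which is why on and otherwise can be called without a receiver. The sketch below illustrates that pattern only. MiniSwitchBuilder and its internals are hypothetical, since the real SwitchBuilder is not shown on this page, and the expression string is an assumed example.

```ruby
# Hypothetical stand-in for SwitchBuilder; the real class is not shown on this page.
class MiniSwitchBuilder
  attr_reader :expression, :cases, :default_case

  def initialize(expression)
    @expression = expression
    @cases = {}
    @default_case = nil
  end

  # Record the tasks declared for one case value.
  def on(value, &block)
    @cases[value.to_s] = collect(&block)
  end

  # Record the tasks declared for the fallback branch.
  def otherwise(&block)
    @default_case = collect(&block)
  end

  # Inside a case block, `simple` only records the task name in this sketch.
  def simple(task_name, **_inputs)
    @current << task_name
  end

  private

  # Evaluate a case block against this object and return the tasks it declared.
  def collect(&block)
    @current = []
    instance_eval(&block)
    @current
  end
end

switch = MiniSwitchBuilder.new('${get_user_ref.output.country}')
switch.instance_eval do
  on('US') { simple :us_flow }
  on('UK') { simple :uk_flow }
  otherwise { simple :default_flow }
end
```

Because the block runs with the builder as self, instance variables and private helpers of the calling object are not visible inside it; local variables captured by the block still are.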

#description(text = nil) ⇒ Object

Get or set the workflow description. Called without an argument, it returns the current description; called with text, it stores that text.



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 52

def description(text = nil)
  return @description if text.nil?

  @description = text
end
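
The source above makes description both a reader and a writer, depending on whether an argument is given. A minimal standalone sketch of that idiom follows; MiniBuilder is a hypothetical class that copies only the description logic, not the real WorkflowBuilder.

```ruby
# Hypothetical stand-in class; it copies only the description logic shown above.
class MiniBuilder
  def initialize(description: nil)
    @description = description
  end

  # With no argument, act as a reader; with an argument, act as a writer.
  def description(text = nil)
    return @description if text.nil?

    @description = text
  end
end

builder = MiniBuilder.new
initial = builder.description             # nothing has been set yet
builder.description 'Onboards a new user' # writer form
current = builder.description             # reader form
```

One consequence of this shape is that passing nil reads the value instead of clearing it, so a description cannot be reset to nil through this method.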

#dynamic(task_name, dynamic_task_param:, **inputs) ⇒ TaskRef

Add a DYNAMIC task (task name determined at runtime)

Examples:

dynamic :process, dynamic_task_param: '${decide_task_ref.output.taskName}'

Parameters:

  • task_name (Symbol, String)

    The base task name

  • dynamic_task_param (String)

    Expression for dynamic task name

  • inputs (Hash)

    Input parameters

Returns:

  • (TaskRef)

    Reference to the created task



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 293

def dynamic(task_name, dynamic_task_param:, **inputs)
  add_task(task_name, TaskType::DYNAMIC, inputs, { dynamic_task_name_param: dynamic_task_param })
end

#dynamic_fork(task_name, tasks_param:, tasks_input_param:) ⇒ TaskRef

Add a FORK_JOIN_DYNAMIC task (parallel tasks determined at runtime)

Examples:

dynamic_fork :parallel_process,
             tasks_param: generator[:tasks],
             tasks_input_param: generator[:inputs]

Parameters:

  • task_name (Symbol, String)

    The task name

  • tasks_param (String, OutputRef)

    Expression for dynamic tasks array

  • tasks_input_param (String, OutputRef)

    Expression for task inputs

Returns:

  • (TaskRef)

    Reference to the created task



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 306

def dynamic_fork(task_name, tasks_param:, tasks_input_param:)
  add_task(
    task_name,
    TaskType::FORK_JOIN_DYNAMIC,
    {},
    {
      dynamic_fork_tasks_param: resolve_value(tasks_param),
      dynamic_fork_tasks_input_param: resolve_value(tasks_input_param)
    }
  )
end

#event(task_name, sink:, **inputs) ⇒ TaskRef

Add an EVENT task

Parameters:

  • task_name (Symbol, String)

    The task name

  • sink (String)

    Event sink name

  • inputs (Hash)

    Event payload

Returns:

  • (TaskRef)

    Reference to the created task



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 201

def event(task_name, sink:, **inputs)
  add_task(task_name, TaskType::EVENT, inputs, { sink: sink })
end

#failure_workflow(name) ⇒ Object

Set the name of the failure workflow.



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 70

def failure_workflow(name)
  @failure_workflow = name
end

#generate_audio(task_name, provider:, model:, text: nil, voice: nil, speed: nil, response_format: nil, n: 1, **options) ⇒ TaskRef

Add a GENERATE_AUDIO task (text-to-speech)

Examples:

generate_audio :tts, provider: 'openai', model: 'tts-1', text: 'Hello world', voice: 'alloy'

Parameters:

  • task_name (Symbol, String)

    The task name

  • provider (String)

    LLM provider integration name

  • model (String)

    Audio generation model name

  • text (String, nil) (defaults to: nil)

    Text to convert to audio

  • voice (String, nil) (defaults to: nil)

    Voice selection

  • speed (Float, nil) (defaults to: nil)

    Playback speed

  • response_format (String, nil) (defaults to: nil)

    Output audio format

  • n (Integer) (defaults to: 1)

    Number of outputs (default: 1)

  • options (Hash)

    Additional options

Returns:

  • (TaskRef)

    Reference to the created task



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 481

def generate_audio(task_name, provider:, model:, text: nil, voice: nil, speed: nil,
                   response_format: nil, n: 1, **options)
  inputs = {
    'llmProvider' => provider,
    'model' => model,
    'n' => n
  }
  inputs['text'] = resolve_value(text) if text
  inputs['voice'] = voice if voice
  inputs['speed'] = speed if speed
  inputs['responseFormat'] = response_format if response_format

  add_task(task_name, TaskType::GENERATE_AUDIO, inputs, options)
end

#generate_image(task_name, provider:, model:, prompt:, size: nil, **options) ⇒ TaskRef

Add a GENERATE_IMAGE task

Parameters:

  • task_name (Symbol, String)

    The task name

  • provider (String)

    Image generation provider

  • model (String)

    Model name

  • prompt (String)

    Image generation prompt

  • size (String, nil) (defaults to: nil)

    Image size (e.g., '1024x1024')

  • options (Hash)

    Additional options

Returns:

  • (TaskRef)

    Reference to the created task



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 457

def generate_image(task_name, provider:, model:, prompt:, size: nil, **options)
  inputs = {
    'llmProvider' => provider,
    'model' => model,
    'prompt' => resolve_value(prompt)
  }
  inputs['size'] = size if size

  add_task(task_name, TaskType::GENERATE_IMAGE, inputs, options)
end

#get_document(task_name, url:, media_type:) ⇒ TaskRef

Add a GET_DOCUMENT task (retrieve and parse a document from a URL)

Examples:

get_document :fetch_pdf, url: 'https://example.com/doc.pdf', media_type: 'application/pdf'

Parameters:

  • task_name (Symbol, String)

    The task name

  • url (String)

    URL of the document

  • media_type (String)

    MIME type of the document

Returns:

  • (TaskRef)

    Reference to the created task



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 325

def get_document(task_name, url:, media_type:)
  inputs = {
    'url' => resolve_value(url),
    'mediaType' => media_type
  }
  add_task(task_name, TaskType::GET_DOCUMENT, inputs, {})
end

#http(task_name, url:, method: :get, body: nil, headers: nil, **options) ⇒ TaskRef

Add an HTTP task

Parameters:

  • task_name (Symbol, String)

    The task name

  • url (String, OutputRef)

    The URL to call

  • method (Symbol, String) (defaults to: :get)

    HTTP method (:get, :post, :put, :delete, etc.)

  • body (Hash, String, nil) (defaults to: nil)

    Request body

  • headers (Hash, nil) (defaults to: nil)

    Request headers

  • options (Hash)

    Additional options (optional, start_delay, etc.)

Returns:

  • (TaskRef)

    Reference to the created task



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 102

def http(task_name, url:, method: :get, body: nil, headers: nil, **options)
  http_request = {
    'uri' => resolve_value(url),
    'method' => method.to_s.upcase
  }
  http_request['body'] = resolve_value(body) if body
  http_request['headers'] = resolve_value(headers) if headers

  add_task(task_name, TaskType::HTTP, { 'http_request' => http_request }, options)
end
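
To make the shape of the task input concrete, here is a standalone sketch of the hash that the method body above assembles. build_http_request is a hypothetical helper written for this example, and resolve_value is left out, so values pass through unchanged (an assumption; the real method resolves references first).

```ruby
# Standalone sketch of the request hash assembled by #http.
# resolve_value is omitted here, so values are passed through unchanged.
def build_http_request(url:, method: :get, body: nil, headers: nil)
  request = {
    'uri' => url,
    'method' => method.to_s.upcase # :post and 'post' both become 'POST'
  }
  # Optional keys are only present when a value was supplied.
  request['body'] = body if body
  request['headers'] = headers if headers
  { 'http_request' => request }
end

payload = build_http_request(
  url: 'https://api.example.com/users',
  method: :post,
  body: { name: 'Ada' }
)
```

Note that the URL is stored under the key 'uri' and that the 'headers' key is absent when no headers are given, rather than present with a nil value.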

#http_poll(task_name, url:, method: :get, body: nil, headers: nil, termination_condition: nil, polling_interval: 60, polling_strategy: 'FIXED') ⇒ TaskRef

Add an HTTP_POLL task (polls an HTTP endpoint until a condition is met)

Examples:

http_poll :check_status, url: 'https://api.example.com/status',
          termination_condition: '$.response.body.status == "complete"',
          polling_interval: 30

Parameters:

  • task_name (Symbol, String)

    The task name

  • url (String)

    The URL to poll

  • method (Symbol, String) (defaults to: :get)

    HTTP method (default: :get)

  • body (Hash, String, nil) (defaults to: nil)

    Request body

  • headers (Hash, nil) (defaults to: nil)

    Request headers

  • termination_condition (String) (defaults to: nil)

    JavaScript condition for when to stop polling

  • polling_interval (Integer) (defaults to: 60)

    Polling interval in seconds (default: 60)

  • polling_strategy (String) (defaults to: 'FIXED')

    'FIXED' or 'LINEAR_BACKOFF' (default: 'FIXED')

Returns:

  • (TaskRef)

    Reference to the created task



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 267

def http_poll(task_name, url:, method: :get, body: nil, headers: nil,
              termination_condition: nil, polling_interval: 60, polling_strategy: 'FIXED')
  http_request = {
    'uri' => resolve_value(url),
    'method' => method.to_s.upcase
  }
  http_request['body'] = resolve_value(body) if body
  http_request['headers'] = resolve_value(headers) if headers

  inputs = {
    'http_request' => http_request,
    'pollingInterval' => polling_interval,
    'pollingStrategy' => polling_strategy
  }
  inputs['terminationCondition'] = termination_condition if termination_condition

  add_task(task_name, TaskType::HTTP_POLL, inputs, {})
end

#human(task_name, assignee: nil, display_name: nil, **inputs) ⇒ TaskRef

Add a HUMAN task

Parameters:

  • task_name (Symbol, String)

    The task name

  • assignee (String, nil) (defaults to: nil)

    Email or ID of the assignee

  • display_name (String, nil) (defaults to: nil)

    Display name for the task

  • inputs (Hash)

    Input parameters

Returns:

  • (TaskRef)

    Reference to the created task



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 163

def human(task_name, assignee: nil, display_name: nil, **inputs)
  inputs = inputs.merge('assignee' => assignee) if assignee
  inputs = inputs.merge('displayName' => display_name) if display_name
  add_task(task_name, TaskType::HUMAN, inputs, {})
end
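
The two merge calls above fold assignee and display_name into the remaining keyword inputs. The sketch below reproduces just that merging step in a hypothetical helper, build_human_inputs, to show the resulting hash. It does not call add_task.

```ruby
# Standalone sketch of the input merging done by #human.
def build_human_inputs(assignee: nil, display_name: nil, **inputs)
  # merge returns a new hash, so the caller's keyword hash is never mutated.
  inputs = inputs.merge('assignee' => assignee) if assignee
  inputs = inputs.merge('displayName' => display_name) if display_name
  inputs
end

with_assignee = build_human_inputs(assignee: 'approver@example.com', order_id: 42)
without_extras = build_human_inputs(order_id: 42)
```

The result mixes symbol keys (from the keyword inputs) with string keys ('assignee', 'displayName'). Whether add_task normalizes these keys is not visible from this page.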

#inline_workflow(task_name, version: 1) { ... } ⇒ TaskRef

Define an inline sub-workflow that executes as a SUB_WORKFLOW task

Examples:

inline_workflow :process_order do
  validate = simple :validate
  simple :process, data: validate[:result]
end

Parameters:

  • task_name (Symbol, String)

    The task name for the sub-workflow

  • version (Integer) (defaults to: 1)

    Version of the inline workflow (default: 1)

Yields:

  • Block containing the sub-workflow definition

Returns:

  • (TaskRef)

    Reference to the SUB_WORKFLOW task



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 722

def inline_workflow(task_name, version: 1, &block)
  # Create a nested builder for the inline workflow
  inline_builder = WorkflowBuilder.new(
    "#{@name}_#{task_name}_inline",
    version: version
  )
  inline_builder.instance_eval(&block)

  # Get the workflow def from the inline builder
  inline_def = inline_builder.to_workflow_def

  add_task(
    task_name,
    TaskType::SUB_WORKFLOW,
    {},
    { inline_workflow_def: inline_def }
  )
end

#javascript(task_name, script:, **bindings) ⇒ TaskRef

Add an INLINE (JavaScript) task

Parameters:

  • task_name (Symbol, String)

    The task name

  • script (String)

    JavaScript code to execute

  • bindings (Hash)

    Variable bindings for the script

Returns:

  • (TaskRef)

    Reference to the created task



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 182

def javascript(task_name, script:, **bindings)
  inputs = resolve_hash(bindings)
  add_task(task_name, TaskType::INLINE, inputs, { expression: script, evaluator_type: 'javascript' })
end

#jq(task_name, query:, **inputs) ⇒ TaskRef

Add a JSON_JQ_TRANSFORM task

Parameters:

  • task_name (Symbol, String)

    The task name

  • query (String)

    JQ query expression

  • inputs (Hash)

    Input data to transform

Returns:

  • (TaskRef)

    Reference to the created task



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 192

def jq(task_name, query:, **inputs)
  add_task(task_name, TaskType::JSON_JQ_TRANSFORM, inputs, { query_expression: query })
end

#kafka_publish(task_name, topic:, value:, key: nil, headers: nil) ⇒ TaskRef

Add a KAFKA_PUBLISH task

Parameters:

  • task_name (Symbol, String)

    The task name

  • topic (String)

    Kafka topic

  • value (Object)

    Message value

  • key (String, nil) (defaults to: nil)

    Message key

  • headers (Hash, nil) (defaults to: nil)

    Message headers

Returns:

  • (TaskRef)

    Reference to the created task



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 212

def kafka_publish(task_name, topic:, value:, key: nil, headers: nil)
  inputs = {
    'kafka_request' => {
      'topic' => topic,
      'value' => resolve_value(value)
    }
  }
  inputs['kafka_request']['key'] = key if key
  inputs['kafka_request']['headers'] = resolve_value(headers) if headers

  add_task(task_name, TaskType::KAFKA_PUBLISH, inputs, {})
end

#list_mcp_tools(task_name, mcp_server:, headers: nil, **options) ⇒ TaskRef

Add a LIST_MCP_TOOLS task (list available tools from an MCP server)

Examples:

list_mcp_tools :get_tools, mcp_server: 'my-mcp-server'

Parameters:

  • task_name (Symbol, String)

    The task name

  • mcp_server (String)

    MCP server integration name

  • headers (Hash, nil) (defaults to: nil)

    Optional HTTP headers

  • options (Hash)

    Additional options

Returns:

  • (TaskRef)

    Reference to the created task



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 585

def list_mcp_tools(task_name, mcp_server:, headers: nil, **options)
  inputs = { 'mcpServer' => mcp_server }
  inputs['headers'] = resolve_hash(headers) if headers

  add_task(task_name, TaskType::LIST_MCP_TOOLS, inputs, options)
end

#llm_chat(task_name, provider:, model:, messages: nil, temperature: nil, top_p: nil, stop_words: nil, max_tokens: nil, **options) ⇒ TaskRef

Add an LLM_CHAT_COMPLETE task

Parameters:

  • task_name (Symbol, String)

    The task name

  • provider (String)

    LLM provider (e.g., 'openai', 'azure_openai')

  • model (String)

    Model name

  • messages (Array<ChatMessage, Hash>) (defaults to: nil)

    Chat messages

  • temperature (Float, nil) (defaults to: nil)

    Temperature (0.0-1.0)

  • top_p (Float, nil) (defaults to: nil)

    Top-p sampling parameter

  • stop_words (Array<String>, nil) (defaults to: nil)

    Stop sequences

  • max_tokens (Integer, nil) (defaults to: nil)

    Maximum tokens to generate

  • options (Hash)

    Additional options

Returns:

  • (TaskRef)

    Reference to the created task



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 348

def llm_chat(task_name, provider:, model:, messages: nil, temperature: nil, top_p: nil,
             stop_words: nil, max_tokens: nil, **options)
  converted_messages = messages&.map do |msg|
    if msg.is_a?(Hash)
      Conductor::Workflow::Llm::ChatMessage.new(**msg)
    else
      msg
    end
  end

  inputs = {
    'llmProvider' => provider,
    'model' => model
  }
  inputs['messages'] = converted_messages.map(&:to_h) if converted_messages
  inputs['temperature'] = temperature if temperature
  inputs['topP'] = top_p if top_p
  inputs['stopWords'] = stop_words if stop_words
  inputs['maxTokens'] = max_tokens if max_tokens

  add_task(task_name, TaskType::LLM_CHAT_COMPLETE, inputs, options)
end
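
The first step of the method above normalizes messages: hashes are turned into message objects and everything else is passed through. The sketch below shows that step in isolation. MiniChatMessage is a hypothetical stand-in for Conductor::Workflow::Llm::ChatMessage, and its role and message fields are assumptions, since the real class is not shown on this page.

```ruby
# Hypothetical stand-in for Conductor::Workflow::Llm::ChatMessage;
# the real class and its field names are not shown on this page.
MiniChatMessage = Struct.new(:role, :message, keyword_init: true)

# Mirror the conversion step: hashes become message objects, objects pass through.
# The safe-navigation operator keeps a nil argument as nil.
def normalize_messages(messages)
  messages&.map do |msg|
    msg.is_a?(Hash) ? MiniChatMessage.new(**msg) : msg
  end
end

mixed = [
  { role: 'system', message: 'You are terse.' },
  MiniChatMessage.new(role: 'user', message: 'Summarize this ticket.')
]
normalized = normalize_messages(mixed)
serialized = normalized.map(&:to_h)
```

Because each hash is splatted into keyword arguments, symbol keys are the safe choice when passing messages as hashes.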

#llm_complete(task_name, provider:, model:, prompt:, temperature: nil, max_tokens: nil, **options) ⇒ TaskRef

Add an LLM_TEXT_COMPLETE task

Parameters:

  • task_name (Symbol, String)

    The task name

  • provider (String)

    LLM provider

  • model (String)

    Model name

  • prompt (String)

    Text prompt

  • temperature (Float, nil) (defaults to: nil)

    Temperature

  • max_tokens (Integer, nil) (defaults to: nil)

    Maximum tokens

  • options (Hash)

    Additional options

Returns:

  • (TaskRef)

    Reference to the created task



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 380

def llm_complete(task_name, provider:, model:, prompt:, temperature: nil, max_tokens: nil, **options)
  inputs = {
    'llmProvider' => provider,
    'model' => model,
    'prompt' => resolve_value(prompt)
  }
  inputs['temperature'] = temperature if temperature
  inputs['maxTokens'] = max_tokens if max_tokens

  add_task(task_name, TaskType::LLM_TEXT_COMPLETE, inputs, options)
end

#llm_embed(task_name, provider:, model:, text:, **options) ⇒ TaskRef

Add an LLM_GENERATE_EMBEDDINGS task

Parameters:

  • task_name (Symbol, String)

    The task name

  • provider (String)

    LLM provider

  • model (String)

    Model name

  • text (String, Array<String>)

    Text(s) to embed

  • options (Hash)

    Additional options

Returns:

  • (TaskRef)

    Reference to the created task



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 399

def llm_embed(task_name, provider:, model:, text:, **options)
  inputs = {
    'llmProvider' => provider,
    'model' => model,
    'text' => resolve_value(text)
  }
  add_task(task_name, TaskType::LLM_GENERATE_EMBEDDINGS, inputs, options)
end

#llm_get_embeddings(task_name, vector_db:, index:, ids:, namespace: nil, **options) ⇒ TaskRef

Add an LLM_GET_EMBEDDINGS task (retrieve stored embeddings)

Parameters:

  • task_name (Symbol, String)

    The task name

  • vector_db (String)

    Vector DB integration name

  • index (String)

    Index/collection name

  • ids (Array<String>, OutputRef)

    Document IDs to retrieve

  • namespace (String, nil) (defaults to: nil)

    Namespace/partition

  • options (Hash)

    Additional options

Returns:

  • (TaskRef)

    Reference to the created task



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 566

def llm_get_embeddings(task_name, vector_db:, index:, ids:, namespace: nil, **options)
  inputs = {
    'vectorDB' => vector_db,
    'index' => index,
    'ids' => resolve_value(ids)
  }
  inputs['namespace'] = namespace if namespace

  add_task(task_name, TaskType::LLM_GET_EMBEDDINGS, inputs, options)
end

#llm_index(task_name, vector_db:, namespace:, index:, embeddings:, doc_id: nil, **options) ⇒ TaskRef

Add an LLM_INDEX_TEXT task

Parameters:

  • task_name (Symbol, String)

    The task name

  • vector_db (String)

    Vector database provider

  • namespace (String)

    Index namespace

  • index (String)

    Index name

  • embeddings (Array, OutputRef)

    Embeddings to index

  • doc_id (String, OutputRef, nil) (defaults to: nil)

    Document ID

  • options (Hash)

    Additional options

Returns:

  • (TaskRef)

    Reference to the created task



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 417

def llm_index(task_name, vector_db:, namespace:, index:, embeddings:, doc_id: nil, **options)
  inputs = {
    'vectorDB' => vector_db,
    'namespace' => namespace,
    'index' => index,
    'embeddingModelProvider' => resolve_value(embeddings)
  }
  inputs['docId'] = resolve_value(doc_id) if doc_id

  add_task(task_name, TaskType::LLM_INDEX_TEXT, inputs, options)
end

#llm_search(task_name, vector_db:, namespace:, index:, query_embeddings:, top_k: 10, **options) ⇒ TaskRef

Add an LLM_SEARCH_INDEX task

Parameters:

  • task_name (Symbol, String)

    The task name

  • vector_db (String)

    Vector database provider

  • namespace (String)

    Index namespace

  • index (String)

    Index name

  • query_embeddings (Array, OutputRef)

    Query embeddings

  • top_k (Integer) (defaults to: 10)

    Number of results to return

  • options (Hash)

    Additional options

Returns:

  • (TaskRef)

    Reference to the created task



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 438

def llm_search(task_name, vector_db:, namespace:, index:, query_embeddings:, top_k: 10, **options)
  inputs = {
    'vectorDB' => vector_db,
    'namespace' => namespace,
    'index' => index,
    'queryEmbeddings' => resolve_value(query_embeddings),
    'k' => top_k
  }
  add_task(task_name, TaskType::LLM_SEARCH_INDEX, inputs, options)
end

#llm_search_embeddings(task_name, vector_db:, index:, embeddings:, namespace: nil, max_results: 1, embedding_model: nil, embedding_model_provider: nil, **options) ⇒ TaskRef

Add an LLM_SEARCH_EMBEDDINGS task (search vector database by embeddings)

Examples:

llm_search_embeddings :find_similar, vector_db: 'pinecone', index: 'docs',
                      embeddings: query_embed[:embeddings], max_results: 5

Parameters:

  • task_name (Symbol, String)

    The task name

  • vector_db (String)

    Vector DB integration name

  • index (String)

    Index/collection name

  • embeddings (Array, OutputRef)

    Query embedding vector

  • namespace (String, nil) (defaults to: nil)

    Namespace/partition

  • max_results (Integer) (defaults to: 1)

    Maximum results to return (default: 1)

  • embedding_model (String, nil) (defaults to: nil)

    Embedding model name

  • embedding_model_provider (String, nil) (defaults to: nil)

    Embedding provider name

  • options (Hash)

    Additional options

Returns:

  • (TaskRef)

    Reference to the created task



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 542

def llm_search_embeddings(task_name, vector_db:, index:, embeddings:, namespace: nil,
                          max_results: 1, embedding_model: nil,
                          embedding_model_provider: nil, **options)
  inputs = {
    'vectorDB' => vector_db,
    'index' => index,
    'embeddings' => resolve_value(embeddings),
    'maxResults' => max_results
  }
  inputs['namespace'] = namespace if namespace
  inputs['embeddingModel'] = embedding_model if embedding_model
  inputs['embeddingModelProvider'] = embedding_model_provider if embedding_model_provider

  add_task(task_name, TaskType::LLM_SEARCH_EMBEDDINGS, inputs, options)
end

#llm_store_embeddings(task_name, vector_db:, index:, embeddings:, namespace: nil, id: nil, metadata: nil, embedding_model: nil, embedding_model_provider: nil, **options) ⇒ TaskRef

Add an LLM_STORE_EMBEDDINGS task (store vectors in a vector database)

Examples:

llm_store_embeddings :store_vectors, vector_db: 'pinecone', index: 'docs',
                     embeddings: embed_task[:embeddings], id: 'doc-123'

Parameters:

  • task_name (Symbol, String)

    The task name

  • vector_db (String)

    Vector DB integration name

  • index (String)

    Index/collection name

  • embeddings (Array, OutputRef)

    Embedding vector(s) to store

  • namespace (String, nil) (defaults to: nil)

    Namespace/partition

  • id (String, nil) (defaults to: nil)

    Document ID

  • metadata (Hash, nil) (defaults to: nil)

    Document metadata

  • embedding_model (String, nil) (defaults to: nil)

    Model used to generate embeddings

  • embedding_model_provider (String, nil) (defaults to: nil)

    Provider used to generate embeddings

  • options (Hash)

    Additional options

Returns:

  • (TaskRef)

    Reference to the created task



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 511

def llm_store_embeddings(task_name, vector_db:, index:, embeddings:, namespace: nil,
                         id: nil, metadata: nil, embedding_model: nil,
                         embedding_model_provider: nil, **options)
  inputs = {
    'vectorDB' => vector_db,
    'index' => index,
    'embeddings' => resolve_value(embeddings)
  }
  inputs['namespace'] = namespace if namespace
  inputs['id'] = resolve_value(id) if id
  inputs['metadata'] = resolve_hash(metadata) if metadata
  inputs['embeddingModel'] = embedding_model if embedding_model
  inputs['embeddingModelProvider'] = embedding_model_provider if embedding_model_provider

  add_task(task_name, TaskType::LLM_STORE_EMBEDDINGS, inputs, options)
end

#loop_over(items) { ... } ⇒ TaskRef

Create a loop that iterates over items in an array

Examples:

loop_over user_list[:users] do
  simple :process_user, user: iteration[:item]
end

Parameters:

  • items (OutputRef, String)

    Expression or reference to array to iterate over

Yields:

  • Block containing tasks to execute for each item

Returns:

  • (TaskRef)

    Reference to the DO_WHILE task



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 699

def loop_over(items, &block)
  # Set up the loop with array iteration pattern
  loop_tasks = []
  collector = LoopCollector.new(self, loop_tasks)
  collector.instance_eval(&block)

  # Resolve the items expression and build the loop condition
  items_expr = resolve_value(items)
  condition = '$.iteration_index < $.items.length()'

  add_do_while_task_with_items(condition, loop_tasks, items_expr)
end

#loop_times(count) { ... } ⇒ TaskRef

Create a loop that executes N times

Examples:

loop_times 3 do
  simple :process_batch
end

Parameters:

  • count (Integer)

    Number of iterations

Yields:

  • Block containing tasks to loop

Returns:

  • (TaskRef)

    Reference to the DO_WHILE task



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 670

def loop_times(count, &block)
  loop_while("$.loop_counter < #{count}", &block)
end
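
As the source shows, loop_times is a thin wrapper that interpolates the count into a loop_while condition. The sketch below records the generated condition instead of building a DO_WHILE task. MiniLoopBuilder is a hypothetical recorder, not part of the library.

```ruby
# Hypothetical recorder that mimics only the delegation shown above.
class MiniLoopBuilder
  attr_reader :conditions

  def initialize
    @conditions = []
  end

  # Stand-in for loop_while: record the condition instead of adding a task.
  def loop_while(condition)
    @conditions << condition
  end

  # Same body as the documented loop_times: interpolate the count into the condition.
  def loop_times(count, &block)
    loop_while("$.loop_counter < #{count}", &block)
  end
end

recorder = MiniLoopBuilder.new
recorder.loop_times(3) { }
```

Since the count is interpolated with to_s, the method itself does not check that it is an Integer; validating the argument before the call is left to the caller in this sketch.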

#loop_while(condition) { ... } ⇒ TaskRef

Create a loop with a custom condition

Examples:

loop_while "$.has_more == true" do
  simple :fetch_page
end

Parameters:

  • condition (String)

    JavaScript condition to evaluate

Yields:

  • Block containing tasks to loop

Returns:

  • (TaskRef)

    Reference to the DO_WHILE task



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 682

def loop_while(condition, &block)
  # Collect tasks in the loop body
  loop_tasks = []
  collector = TaskCollector.new(self, loop_tasks)
  collector.instance_eval(&block)

  add_do_while_task(condition, loop_tasks)
end

#output(**params) ⇒ Object

Define workflow output parameters

Examples:

output user_email: user[:email], order_id: wf[:order_id]

Parameters:

  • params (Hash)

    Output parameter mappings



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 78

def output(**params)
  @output_params.merge!(resolve_hash(params))
end

#owner_email(email) ⇒ Object

Set the workflow owner's email address.



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 62

def owner_email(email)
  @owner_email = email
end

#parallel { ... } ⇒ TaskRef

Create a parallel execution block (FORK_JOIN)

Examples:

parallel do
  simple :task1
  simple :task2
end

Yields:

  • Block containing tasks to execute in parallel

Returns:

  • (TaskRef)

    Reference to the JOIN task



626
627
628
629
630
631
632
633
634
635
636
637
# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 626

def parallel(&block)
  builder = ParallelBuilder.new(self)
  builder.instance_eval(&block)
  branches = builder.finalize

  # Create FORK_JOIN task
  fork_ref = add_fork_join_task(branches)

  # Create JOIN task
  join_on_refs = branches.map { |branch| branch.last.ref_name }
  add_join_task(join_on_refs)
end
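
The JOIN step above waits on the last task of each branch, which is what the branches.map expression computes. The sketch below isolates that expression. MiniTaskRef is a hypothetical stand-in for whatever objects ParallelBuilder#finalize returns, assumed here only to respond to ref_name, and the reference names are made up for illustration.

```ruby
# Hypothetical stand-in for the task references held in each branch.
MiniTaskRef = Struct.new(:ref_name)

# Two branches: the first has two sequential tasks, the second has one.
branches = [
  [MiniTaskRef.new('fetch_profile_ref'), MiniTaskRef.new('score_profile_ref')],
  [MiniTaskRef.new('fetch_orders_ref')]
]

# Same expression as in #parallel: take the final task of every branch.
join_on_refs = branches.map { |branch| branch.last.ref_name }
```

Only the final reference of each branch is joined on, so earlier tasks in a branch are covered implicitly by running in sequence before it.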

#restartable(value) ⇒ Object

Set whether the workflow is restartable. The constructor initializes this to true.



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 66

def restartable(value)
  @restartable = value
end

#set(**variables) ⇒ TaskRef

Add a SET_VARIABLE task

Parameters:

  • variables (Hash)

    Variables to set. The task is always added under the name 'set_variable', and the method accepts no additional options.

Returns:

  • (TaskRef)

    Reference to the created task



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 173

def set(**variables)
  add_task('set_variable', TaskType::SET_VARIABLE, resolve_hash(variables), {})
end

#set_version(v) ⇒ Object

Set the workflow version



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 48

def set_version(v)
  @version = v
end

#simple(task_name, **inputs) ⇒ TaskRef

Add a SIMPLE task (worker task)

Parameters:

  • task_name (Symbol, String)

    The task name

  • inputs (Hash)

    Input parameters

Returns:

  • (TaskRef)

    Reference to the created task



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 90

def simple(task_name, **inputs)
  add_task(task_name, TaskType::SIMPLE, inputs, {})
end

#start_workflow(task_name, workflow:, version: nil, **inputs) ⇒ TaskRef

Add a START_WORKFLOW task (fire-and-forget, does not wait for completion)

Examples:

start_workflow :trigger_async, workflow: 'async_processing', user_id: wf[:user_id]

Parameters:

  • task_name (Symbol, String)

    The task name

  • workflow (String)

    Name of the workflow to start

  • version (Integer, nil) (defaults to: nil)

    Workflow version (optional)

  • inputs (Hash)

    Input parameters for the started workflow

Returns:

  • (TaskRef)

    Reference to the created task



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 233

def start_workflow(task_name, workflow:, version: nil, **inputs)
  start_workflow_input = {
    'name' => workflow,
    'input' => resolve_hash(inputs)
  }
  start_workflow_input['version'] = version if version

  add_task(task_name, TaskType::START_WORKFLOW, { 'startWorkflow' => start_workflow_input }, {})
end
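
The input assembled by the listing above can be sketched as a standalone function. `start_workflow_payload` is a hypothetical helper written for illustration, and `resolved_inputs` stands in for whatever `resolve_hash(inputs)` returns, which is not shown in the source.

```ruby
# Standalone sketch of the input hash built in #start_workflow.
# `resolved_inputs` stands in for the result of resolve_hash(inputs) (assumption).
def start_workflow_payload(workflow, resolved_inputs, version: nil)
  payload = { 'name' => workflow, 'input' => resolved_inputs }
  payload['version'] = version if version # key is omitted entirely when version is nil
  { 'startWorkflow' => payload }
end

without_version = start_workflow_payload(
  'async_processing',
  { user_id: '${workflow.input.user_id}' }
)
with_version = start_workflow_payload('async_processing', {}, version: 2)
```

Leaving `version` as nil drops the key from the payload rather than sending an explicit nil.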

#sub_workflow(task_name, workflow:, version: nil, **inputs) ⇒ TaskRef

Add a SUB_WORKFLOW task

Parameters:

  • task_name (Symbol, String)

    The task name

  • workflow (String)

    Name of the workflow to call

  • version (Integer, nil) (defaults to: nil)

    Version of the workflow

  • inputs (Hash)

    Input parameters for the sub-workflow

Returns:

  • (TaskRef)

    Reference to the created task



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 145

def sub_workflow(task_name, workflow:, version: nil, **inputs)
  add_task(
    task_name,
    TaskType::SUB_WORKFLOW,
    inputs,
    {
      sub_workflow_name: workflow,
      sub_workflow_version: version
    }
  )
end

#terminate(status, reason, **options) ⇒ TaskRef

Add a TERMINATE task

Parameters:

  • status (Symbol, String)

    Termination status (:completed or :failed); converted to an uppercase string

  • reason (String)

    Reason for termination

  • options (Hash)

    Additional options

Returns:

  • (TaskRef)

    Reference to the created task



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 131

def terminate(status, reason, **options)
  inputs = {
    'terminationStatus' => status.to_s.upcase,
    'terminationReason' => reason
  }
  add_task('terminate', TaskType::TERMINATE, inputs, options)
end
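
The status normalization in the listing above can be checked on its own. This is a minimal sketch: `terminate_inputs` is a hypothetical helper that mirrors only the hash construction and leaves out the `add_task` call.

```ruby
# Mirrors the input hash built in #terminate; add_task is left out of the sketch.
def terminate_inputs(status, reason)
  {
    'terminationStatus' => status.to_s.upcase, # symbols and strings both work
    'terminationReason' => reason
  }
end

from_symbol = terminate_inputs(:failed, 'Payment declined')
from_string = terminate_inputs('completed', 'Nothing left to do')
```

Because of `to_s.upcase`, `:failed`, `'failed'`, and `'FAILED'` all produce the same `terminationStatus` value.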

#timeout(seconds) ⇒ Object

Set the workflow timeout in seconds (defaults to 60)



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 58

def timeout(seconds)
  @timeout_seconds = seconds
end

#to_workflow_def ⇒ Conductor::Http::Models::WorkflowDef

Convert the builder to a WorkflowDef model

Returns:

  • (Conductor::Http::Models::WorkflowDef)



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 933

def to_workflow_def
  workflow_tasks = convert_tasks_to_workflow_tasks

  Conductor::Http::Models::WorkflowDef.new(
    name: @name,
    version: @version,
    description: @description,
    tasks: workflow_tasks,
    input_parameters: @input_params,
    output_parameters: @output_params,
    timeout_seconds: @timeout_seconds,
    owner_email: @owner_email,
    restartable: @restartable,
    failure_workflow: @failure_workflow,
    schema_version: 2
  )
end

#wait(seconds = nil, until_time: nil, **options) ⇒ TaskRef

Add a WAIT task

Parameters:

  • seconds (Integer, nil) (defaults to: nil)

    Duration to wait in seconds

  • until_time (String, nil) (defaults to: nil)

    Wait until specific time (ISO8601 format)

  • options (Hash)

    Additional options

Returns:

  • (TaskRef)

    Reference to the created task



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 118

def wait(seconds = nil, until_time: nil, **options)
  inputs = {}
  inputs['duration'] = "#{seconds} seconds" if seconds
  inputs['until'] = until_time if until_time

  add_task('wait', TaskType::WAIT, inputs, options)
end
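
How the two arguments map to task inputs can be sketched without the builder. `wait_inputs` is a hypothetical helper that copies the hash construction from the listing above, and the timestamp is a sample value only.

```ruby
# Mirrors the input hash built in #wait; add_task is left out of the sketch.
def wait_inputs(seconds = nil, until_time: nil)
  inputs = {}
  inputs['duration'] = "#{seconds} seconds" if seconds
  inputs['until'] = until_time if until_time
  inputs
end

by_duration = wait_inputs(30)
by_deadline = wait_inputs(until_time: '2030-01-01T00:00:00Z') # sample timestamp
neither     = wait_inputs
both        = wait_inputs(30, until_time: '2030-01-01T00:00:00Z')
```

As written, the method does not validate its arguments: passing both sets both keys, and passing neither yields an empty input hash.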

#wait_for_webhook(task_name, matches: {}) ⇒ TaskRef

Add a WAIT_FOR_WEBHOOK task (waits for external webhook callback)

Examples:

wait_for_webhook :payment_callback, matches: { 'order_id' => wf[:order_id] }

Parameters:

  • task_name (Symbol, String)

    The task name

  • matches (Hash) (defaults to: {})

    Match conditions for the webhook payload

Returns:

  • (TaskRef)

    Reference to the created task



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 249

def wait_for_webhook(task_name, matches: {})
  add_task(task_name, TaskType::WAIT_FOR_WEBHOOK, { 'matches' => resolve_hash(matches) }, {})
end

#wf ⇒ InputRef

Returns the workflow input proxy for accessing workflow inputs

Examples:

wf[:user_id] # => "${workflow.input.user_id}"
wf.var(:counter) # => "${workflow.variables.counter}"

Returns:

  • (InputRef)

    Proxy for workflow.input, workflow.variables, etc.



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 43

def wf
  @wf ||= InputRef.new
end
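
The proxy behavior described above can be approximated with a small stand-in. This is a sketch under assumptions: `SketchInputRef` and `SketchWfHost` are hypothetical classes, and the real `InputRef` is not shown here and may return reference objects rather than plain strings.

```ruby
# Hypothetical stand-in for InputRef; the real class is not shown in this section.
class SketchInputRef
  # Builds an expression string for a workflow input.
  def [](key)
    "${workflow.input.#{key}}"
  end

  # Builds an expression string for a workflow variable.
  def var(name)
    "${workflow.variables.#{name}}"
  end
end

# Hypothetical host class showing the same memoization as #wf.
class SketchWfHost
  def wf
    @wf ||= SketchInputRef.new
  end
end

host = SketchWfHost.new
input_expr = host.wf[:user_id]
var_expr = host.wf.var(:counter)
```

The `||=` memoization means every call to `wf` on the same builder returns the same proxy instance.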

#when_false(condition) { ... } ⇒ TaskRef

Create a branch that only executes if a condition is false

Parameters:

  • condition (String, OutputRef)

    The condition to evaluate

Yields:

  • Block containing tasks to execute if condition is false

Returns:

  • (TaskRef)

    Reference to the SWITCH task



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 759

def when_false(condition, &block)
  decide condition do
    on 'false', &block
  end
end

#when_true(condition) { ... } ⇒ TaskRef

Create a branch that only executes if a condition is true

Examples:

when_true order[:is_premium] do
  simple :apply_discount
end

Parameters:

  • condition (String, OutputRef)

    The condition to evaluate

Yields:

  • Block containing tasks to execute if condition is true

Returns:

  • (TaskRef)

    Reference to the SWITCH task



# File 'lib/conductor/workflow/dsl/workflow_builder.rb', line 749

def when_true(condition, &block)
  decide condition do
    on 'true', &block
  end
end
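
Both helpers register a single case on a SWITCH, which a toy builder can make visible. This is a sketch under assumptions: `SketchCaseCollector` and `SketchDecisionBuilder` are hypothetical, and the real `decide` is assumed (not confirmed by this section) to evaluate its block against an object that responds to `on`.

```ruby
# Hypothetical collector for the cases registered inside a decide block.
class SketchCaseCollector
  attr_reader :cases

  def initialize
    @cases = {}
  end

  def on(value, &block)
    @cases[value] = block
  end
end

# Hypothetical mini-builder that records each SWITCH instead of creating tasks.
class SketchDecisionBuilder
  attr_reader :switches

  def initialize
    @switches = []
  end

  # Assumed shape of decide: run the block against a case collector.
  def decide(condition, &block)
    collector = SketchCaseCollector.new
    collector.instance_eval(&block)
    @switches << { condition: condition, cases: collector.cases.keys }
  end

  def when_true(condition, &block)
    decide(condition) { on 'true', &block }
  end

  def when_false(condition, &block)
    decide(condition) { on 'false', &block }
  end
end

builder = SketchDecisionBuilder.new
builder.when_true('${order_ref.output.is_premium}') { :apply_discount }
builder.when_false('${order_ref.output.is_premium}') { :charge_full_price }
```

Each call produces its own SWITCH with one case, so pairing `when_true` and `when_false` on the same condition yields two separate SWITCH tasks in this sketch.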