Class: Conductor::Workflow::Dsl::WorkflowDefinition

Inherits:
Object
Defined in:
lib/conductor/workflow/dsl/workflow_definition.rb

Overview

WorkflowDefinition wraps a WorkflowBuilder and provides methods for registering, executing, and starting the workflow, and for checking the status of its executions.

Examples:

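# `executor` is assumed to be an already configured workflow executor;
# register and execute raise a RuntimeError without one.
workflow = Conductor.workflow :my_workflow, executor: executor do
  simple :task1
end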

workflow.register(overwrite: true)
result = workflow.execute(input: { foo: 'bar' })

Constructor Details

#initialize(builder, executor: nil) ⇒ WorkflowDefinition

Returns a new instance of WorkflowDefinition.

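Parameters:

  • builder (WorkflowBuilder)

    The builder that holds the workflow's name, version, and tasks

  • executor (defaults to: nil)

    Executor used by register, execute, start, and status; these methods raise if it is nil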

# File 'lib/conductor/workflow/dsl/workflow_definition.rb', line 22

def initialize(builder, executor: nil)
  @builder = builder
  @executor = executor
end

Instance Attribute Details

#builder ⇒ Object (readonly)

Returns the value of attribute builder.



# File 'lib/conductor/workflow/dsl/workflow_definition.rb', line 18

def builder
  @builder
end

Instance Method Details

#call(**input) ⇒ WorkflowRun

Execute this workflow, passing the keyword arguments as its input (shorthand for execute with input: set to those arguments)

Examples:

result = workflow.call(user_id: 123, email: 'user@example.com')

Parameters:

  • input (Hash)

    Workflow input parameters

Returns:

  • (WorkflowRun)

    Workflow execution result



# File 'lib/conductor/workflow/dsl/workflow_definition.rb', line 93

def call(**input)
  execute(input: input)
end
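
The forwarding in #call can be illustrated with a small stand-in. This is a minimal sketch, not the library's code: CallSketch is a hypothetical class whose #execute only records what it receives, so the effect of the `**input` splat is visible without a Conductor server.

```ruby
# Hypothetical stand-in for WorkflowDefinition; #execute records its input
# instead of running a workflow.
class CallSketch
  attr_reader :last_input

  def execute(input: {})
    @last_input = input
    :executed
  end

  # Same shape as the documented #call: every keyword argument is collected
  # into a single Hash and handed to #execute as the workflow input.
  def call(**input)
    execute(input: input)
  end
end

sketch = CallSketch.new
sketch.call(user_id: 123, email: 'user@example.com')
p sketch.last_input
```

Because #call collects every keyword into the input hash, an option such as wait_for_seconds passed to #call becomes a workflow input key rather than an execution option; use #execute directly when those options are needed.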

#execute(input: {}, wait_for_seconds: 30, correlation_id: nil, domain: nil, wait_until_task_ref: nil, request_id: nil) ⇒ WorkflowRun

Execute this workflow and wait up to wait_for_seconds for it to complete

Examples:

result = workflow.execute(input: { user_id: 123 })
puts result.status

Parameters:

  • input (Hash) (defaults to: {})

    Workflow input parameters

  • wait_for_seconds (Integer) (defaults to: 30)

    Maximum time to wait for completion, in seconds

  • correlation_id (String, nil) (defaults to: nil)

    Correlation ID for tracking

  • domain (String, nil) (defaults to: nil)

    Task domain for all tasks

  • wait_until_task_ref (String, nil) (defaults to: nil)

    Wait until the task with this reference name completes

  • request_id (String, nil) (defaults to: nil)

    Unique request ID for idempotency

Returns:

  • (WorkflowRun)

    Workflow execution result

Raises:

  • (RuntimeError)

    If no executor is configured



# File 'lib/conductor/workflow/dsl/workflow_definition.rb', line 71

def execute(input: {}, wait_for_seconds: 30, correlation_id: nil, domain: nil,
            wait_until_task_ref: nil, request_id: nil)
  raise 'Executor required for execution. Pass executor: option to Conductor.workflow' unless @executor

  @executor.execute(
    @builder.name,
    input: input,
    version: @builder.version,
    wait_for_seconds: wait_for_seconds,
    correlation_id: correlation_id,
    domain: domain,
    wait_until_task_ref: wait_until_task_ref,
    request_id: request_id
  )
end
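
The guard and delegation in #execute can be exercised without a server. The following is a minimal sketch under stated assumptions: BuilderStub, RecordingExecutor, and ExecuteSketch are hypothetical stand-ins, and only two of the documented options are forwarded to keep it short.

```ruby
# Hypothetical stand-in for the builder: just a name and a version.
BuilderStub = Struct.new(:name, :version)

# Hypothetical executor that records calls instead of contacting Conductor.
class RecordingExecutor
  attr_reader :calls

  def initialize
    @calls = []
  end

  def execute(name, **options)
    @calls << [name, options]
    :workflow_run
  end
end

# Mirrors the documented shape of #execute with a reduced option list.
class ExecuteSketch
  def initialize(builder, executor: nil)
    @builder = builder
    @executor = executor
  end

  def execute(input: {}, wait_for_seconds: 30)
    # A bare `raise 'message'` raises RuntimeError.
    raise 'Executor required for execution.' unless @executor

    @executor.execute(@builder.name, input: input, version: @builder.version,
                                     wait_for_seconds: wait_for_seconds)
  end
end

executor = RecordingExecutor.new
definition = ExecuteSketch.new(BuilderStub.new('my_workflow', 2), executor: executor)
definition.execute(input: { user_id: 123 }, wait_for_seconds: 5)
p executor.calls.first
```

Note that the name and version always come from the builder, so a caller cannot execute a different version through the same definition object.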

#inspect ⇒ String

Inspect the workflow definition

Returns:

  • (String)

    Human-readable representation



# File 'lib/conductor/workflow/dsl/workflow_definition.rb', line 137

def inspect
  "#<Conductor::Workflow::Dsl::WorkflowDefinition name=#{@builder.name.inspect} " \
    "version=#{@builder.version.inspect} tasks=#{@builder.tasks.size}>"
end

#name ⇒ String

Get the workflow name

Returns:

  • (String)

    Workflow name



# File 'lib/conductor/workflow/dsl/workflow_definition.rb', line 29

def name
  @builder.name
end

#register(overwrite: false) ⇒ Object

Register this workflow with Conductor

Examples:

workflow.register(overwrite: true)

Parameters:

  • overwrite (Boolean) (defaults to: false)

    Overwrite existing workflow definition

Returns:

  • (Object)

    API response

Raises:

  • (RuntimeError)

    If no executor is configured



# File 'lib/conductor/workflow/dsl/workflow_definition.rb', line 52

def register(overwrite: false)
  raise 'Executor required for registration. Pass executor: option to Conductor.workflow' unless @executor

  @executor.register_workflow(self, overwrite: overwrite)
end

#start(input: {}, correlation_id: nil, domain: nil) ⇒ String

Start this workflow asynchronously (returns immediately with workflow ID)

Examples:

workflow_id = workflow.start(input: { user_id: 123 })

Parameters:

  • input (Hash) (defaults to: {})

    Workflow input parameters

  • correlation_id (String, nil) (defaults to: nil)

    Correlation ID for tracking

  • domain (String, nil) (defaults to: nil)

    Task domain for all tasks

Returns:

  • (String)

    Workflow ID

Raises:

  • (RuntimeError)

    If no executor is configured



# File 'lib/conductor/workflow/dsl/workflow_definition.rb', line 106

def start(input: {}, correlation_id: nil, domain: nil)
  raise 'Executor required for starting workflow. Pass executor: option to Conductor.workflow' unless @executor

  request = Conductor::Http::Models::StartWorkflowRequest.new(
    name: @builder.name,
    version: @builder.version,
    input: input,
    correlation_id: correlation_id
  )
  request.task_to_domain = { '*' => domain } if domain

  @executor.start_workflow(request)
end
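
The way #start turns a single domain into a task-to-domain mapping can be sketched in isolation. This is an illustration only: StartRequestStub is a hypothetical stand-in for Conductor::Http::Models::StartWorkflowRequest, and build_start_request is a helper invented here to isolate the request-building step.

```ruby
# Hypothetical stand-in for the StartWorkflowRequest model.
StartRequestStub = Struct.new(:name, :version, :input, :correlation_id,
                              :task_to_domain, keyword_init: true)

# Builds the request the same way the documented #start does.
def build_start_request(name:, version:, input: {}, correlation_id: nil, domain: nil)
  request = StartRequestStub.new(name: name, version: version, input: input,
                                 correlation_id: correlation_id)
  # The '*' key applies the domain to every task; with no domain given,
  # task_to_domain is left unset (nil).
  request.task_to_domain = { '*' => domain } if domain
  request
end

with_domain = build_start_request(name: 'my_workflow', version: 1, domain: 'staging')
without_domain = build_start_request(name: 'my_workflow', version: 1)
p with_domain.task_to_domain
p without_domain.task_to_domain
```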

#status(workflow_id, include_tasks: true) ⇒ Workflow

Get the status of a workflow execution by its ID

Examples:

workflow_id = workflow.start(input: { user_id: 123 })
status = workflow.status(workflow_id)

Parameters:

  • workflow_id (String)

    The workflow execution ID

  • include_tasks (Boolean) (defaults to: true)

    Include task details

Returns:

  • (Workflow)

    Workflow execution details

Raises:

  • (RuntimeError)

    If no executor is configured



# File 'lib/conductor/workflow/dsl/workflow_definition.rb', line 129

def status(workflow_id, include_tasks: true)
  raise 'Executor required for checking status. Pass executor: option to Conductor.workflow' unless @executor

  @executor.get_workflow(workflow_id, include_tasks: include_tasks)
end
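
Since #start returns immediately, a caller typically polls #status until the execution finishes. The following is a minimal sketch of such a loop, not part of the documented API: wait_for_terminal and StatusStub are hypothetical, and it assumes the object returned by #status exposes a status string using Conductor's status names.

```ruby
# Statuses after which a workflow execution no longer progresses.
# Assumption: the returned object exposes these as strings via #status.
TERMINAL_STATUSES = %w[COMPLETED FAILED TIMED_OUT TERMINATED].freeze

# Polls definition.status until a terminal status is seen or max_attempts
# polls have been made; returns the final execution object, or nil on give-up.
def wait_for_terminal(definition, workflow_id, max_attempts: 10, interval: 0)
  max_attempts.times do
    execution = definition.status(workflow_id, include_tasks: false)
    return execution if TERMINAL_STATUSES.include?(execution.status)

    sleep(interval)
  end
  nil
end

# Hypothetical stand-in that reports RUNNING twice, then COMPLETED.
class StatusStub
  Execution = Struct.new(:status)

  def initialize
    @responses = %w[RUNNING RUNNING COMPLETED]
  end

  def status(_workflow_id, include_tasks: true)
    Execution.new(@responses.shift || 'COMPLETED')
  end
end

final = wait_for_terminal(StatusStub.new, 'example-workflow-id')
puts final.status
```

Passing include_tasks: false keeps each poll light when only the overall status matters; a real loop would use a non-zero interval.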

#to_s ⇒ String

Convert to string

Returns:

  • (String)

    Workflow name and version in the form name:version, with latest used when no version is set



# File 'lib/conductor/workflow/dsl/workflow_definition.rb', line 144

def to_s
  "#{@builder.name}:#{@builder.version || 'latest'}"
end
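
The fallback in #to_s is easy to check on its own. In this small sketch, workflow_label is a hypothetical helper that applies the same interpolation to a plain name and version.

```ruby
# Same formatting as the documented #to_s: a nil version falls back to 'latest'.
def workflow_label(name, version)
  "#{name}:#{version || 'latest'}"
end

puts workflow_label('my_workflow', 3)    # prints "my_workflow:3"
puts workflow_label('my_workflow', nil)  # prints "my_workflow:latest"
```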

#to_workflow_def ⇒ Conductor::Http::Models::WorkflowDef

Convert to WorkflowDef model

Returns:

  • (Conductor::Http::Models::WorkflowDef)

    The WorkflowDef model produced by the builder



# File 'lib/conductor/workflow/dsl/workflow_definition.rb', line 41

def to_workflow_def
  @builder.to_workflow_def
end

#version ⇒ Integer?

Get the workflow version

Returns:

  • (Integer, nil)

    Workflow version



# File 'lib/conductor/workflow/dsl/workflow_definition.rb', line 35

def version
  @builder.version
end