Module: JobWorkflow::DSL::ClassMethods

Defined in:
lib/job_workflow/dsl.rb

Instance Method Summary collapse

Instance Method Details

#after(*task_names, &block) ⇒ Object

: (*Symbol) { (Context) -> void } -> void



172
173
174
# File 'lib/job_workflow/dsl.rb', line 172

def after(*task_names, &block)
  _workflow.add_hook(:after, task_names:, block:)
end

#argument(argument_name, type, default: nil) ⇒ Object

: (Symbol argument_name, String type, ?default: untyped) -> void



110
111
112
# File 'lib/job_workflow/dsl.rb', line 110

def argument(argument_name, type, default: nil)
  _workflow.add_argument(ArgumentDef.new(name: argument_name, type:, default:))
end

#around(*task_names, &block) ⇒ Object

: (*Symbol) { (Context, TaskCallable) -> void } -> void



177
178
179
# File 'lib/job_workflow/dsl.rb', line 177

def around(*task_names, &block)
  _workflow.add_hook(:around, task_names:, block:)
end

#before(*task_names, &block) ⇒ Object

: (*Symbol) { (Context) -> void } -> void



167
168
169
# File 'lib/job_workflow/dsl.rb', line 167

def before(*task_names, &block)
  _workflow.add_hook(:before, task_names:, block:)
end

#dry_run(value = nil, &block) ⇒ Object

: (?bool) ?{ (Context) -> bool } -> void



231
232
233
# File 'lib/job_workflow/dsl.rb', line 231

def dry_run(value = nil, &block)
  _workflow.dry_run_config = block || value
end

#from_context(context) ⇒ Object

: (Context) -> DSL



99
100
101
102
103
104
105
106
107
# File 'lib/job_workflow/dsl.rb', line 99

def from_context(context) # rubocop:disable Metrics/AbcSize
  new_context = context.dup
  task = new_context._task_context.task
  job = new(new_context.arguments.to_h)
  new_context._job = job
  job._context = new_context
  job.set(queue: task.enqueue.queue) if !task.nil? && !task.enqueue.queue.nil?
  job
end

#on_error(*task_names, &block) ⇒ Object

: (*Symbol) { (Context, StandardError, Task) -> void } -> void



182
183
184
# File 'lib/job_workflow/dsl.rb', line 182

def on_error(*task_names, &block)
  _workflow.add_hook(:error, task_names:, block:)
end

#schedule(expression, key: nil, queue: nil, priority: nil, args: {}, description: nil) ⇒ Object

rubocop:disable Metrics/ParameterLists : (

  String expression,
  ?key: (String | Symbol)?,
  ?queue: String?,
  ?priority: Integer?,
  ?args: Hash[Symbol, untyped],
  ?description: String?
) -> void


244
245
246
247
248
249
250
251
252
253
254
255
256
# File 'lib/job_workflow/dsl.rb', line 244

def schedule(expression, key: nil, queue: nil, priority: nil, args: {}, description: nil)
  _workflow.add_schedule(
    Schedule.new(
      expression:,
      class_name: name,
      key:,
      queue:,
      priority:,
      args:,
      description:
    )
  )
end

#task(task_name, each: ->(_ctx) { [TaskContext::NULL_VALUE] }, enqueue: nil, retry: 0, output: {}, depends_on: [], condition: ->(_ctx) { true }, throttle: {}, timeout: nil, dependency_wait: {}, dry_run: false, &block) ⇒ Object

rubocop:disable Metrics/ParameterLists

: (

  Symbol task_name,
  ?each: ^(Context) -> untyped,
  ?enqueue: true | false | ^(Context) -> bool | Hash[Symbol, untyped],
  ?retry: Integer | Hash[Symbol, untyped],
  ?output: Hash[Symbol, String],
  ?depends_on: Array[Symbol],
  ?condition: ^(Context) -> bool,
  ?throttle: Integer | Hash[Symbol, untyped],
  ?timeout: Numeric?,
  ?dependency_wait: Hash[Symbol, untyped],
  ?dry_run: bool | ^(Context) -> bool
) { (untyped) -> void } -> void


129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/job_workflow/dsl.rb', line 129

def task(
  task_name,
  each: ->(_ctx) { [TaskContext::NULL_VALUE] },
  enqueue: nil,
  retry: 0,
  output: {},
  depends_on: [],
  condition: ->(_ctx) { true },
  throttle: {},
  timeout: nil,
  dependency_wait: {},
  dry_run: false,
  &block
)
  new_task = Task.new(
    job_name: name,
    name: task_name,
    block: block,
    enqueue:,
    each:,
    task_retry: binding.local_variable_get(:retry),
    output:,
    depends_on:,
    condition:,
    throttle:,
    timeout:,
    dependency_wait:,
    dry_run:
  )
  _workflow.add_task(new_task)
  if new_task.enqueue.should_limits_concurrency? # rubocop:disable Style/GuardClause
    concurrency = new_task.enqueue.concurrency #: Integer
    workflow_concurrency(to: concurrency, key: :concurrency_key.to_proc)
  end
end

#workflow_concurrency(to:, key:, **opts) ⇒ Object

Configures concurrency limits for this workflow job.

Unlike ‘limits_concurrency` (SolidQueue’s raw API), this method passes a Context as the first argument to the key Proc, giving access to workflow-aware information such as ‘arguments`, `sub_job?`, and `concurrency_key`.

When ‘_context` is not yet initialized (e.g. during enqueue before perform), a temporary Context is built from the job’s arguments so the key Proc can always rely on ‘ctx.arguments`.

: (

  to: Integer,
  key: ^(Context) -> String?,
  ?duration: ActiveSupport::Duration?,
  ?group: String?,
  ?on_conflict: Symbol?
) -> void

Examples:

Limit duplicate workflow runs by argument

workflow_concurrency to: 1,
  key: ->(ctx) { "my_job:#{ctx.arguments.tenant_id}" },
  on_conflict: :discard

Separate parent and sub-job concurrency keys

workflow_concurrency to: 1,
  key: ->(ctx) {
    ctx.sub_job? ? ctx.concurrency_key : "my_job:#{ctx.arguments.name}"
  },
  on_conflict: :discard


216
217
218
219
220
221
222
223
224
225
226
227
228
# File 'lib/job_workflow/dsl.rb', line 216

def workflow_concurrency(to:, key:, **opts)
  concurrency_key_proc = key
  limits_concurrency(
    to:,
    key: proc {
      ctx = _context || Context.from_hash(
        job: self, workflow: self.class._workflow
      )._update_arguments((arguments.first || {}).symbolize_keys)
      concurrency_key_proc.call(ctx)
    },
    **opts
  )
end