Module: JobWorkflow::DSL::ClassMethods

Defined in:
lib/job_workflow/dsl.rb

Instance Method Summary collapse

Instance Method Details

#after(*task_names, &block) ⇒ Object

: (*Symbol) { (Context) -> void } -> void



168
169
170
# File 'lib/job_workflow/dsl.rb', line 168

def after(*task_names, &block)
  _workflow.add_hook(:after, task_names:, block:)
end

#argument(argument_name, type, default: nil) ⇒ Object

: (Symbol argument_name, String type, ?default: untyped) -> void



110
111
112
# File 'lib/job_workflow/dsl.rb', line 110

def argument(argument_name, type, default: nil)
  _workflow.add_argument(ArgumentDef.new(name: argument_name, type:, default:))
end

#around(*task_names, &block) ⇒ Object

: (*Symbol) { (Context, TaskCallable) -> void } -> void



173
174
175
# File 'lib/job_workflow/dsl.rb', line 173

def around(*task_names, &block)
  _workflow.add_hook(:around, task_names:, block:)
end

#before(*task_names, &block) ⇒ Object

: (*Symbol) { (Context) -> void } -> void



163
164
165
# File 'lib/job_workflow/dsl.rb', line 163

def before(*task_names, &block)
  _workflow.add_hook(:before, task_names:, block:)
end

#dry_run(value = nil, &block) ⇒ Object

: (?bool) ?{ (Context) -> bool } -> void



227
228
229
# File 'lib/job_workflow/dsl.rb', line 227

def dry_run(value = nil, &block)
  _workflow.dry_run_config = block || value
end

#from_context(context) ⇒ Object

: (Context) -> DSL



99
100
101
102
103
104
105
106
107
# File 'lib/job_workflow/dsl.rb', line 99

def from_context(context) # rubocop:disable Metrics/AbcSize
  new_context = context.dup
  task = new_context._task_context.task
  job = new(new_context.arguments.to_h)
  new_context._job = job
  job._context = new_context
  job.set(queue: task.enqueue.queue) if !task.nil? && !task.enqueue.queue.nil?
  job
end

#on_error(*task_names, &block) ⇒ Object

: (*Symbol) { (Context, StandardError, Task) -> void } -> void



178
179
180
# File 'lib/job_workflow/dsl.rb', line 178

def on_error(*task_names, &block)
  _workflow.add_hook(:error, task_names:, block:)
end

#schedule(expression, key: nil, queue: nil, priority: nil, args: {}, description: nil) ⇒ Object

rubocop:disable Metrics/ParameterLists : (

  String expression,
  ?key: (String | Symbol)?,
  ?queue: String?,
  ?priority: Integer?,
  ?args: Hash[Symbol, untyped],
  ?description: String?
) -> void


240
241
242
243
244
245
246
247
248
249
250
251
252
# File 'lib/job_workflow/dsl.rb', line 240

def schedule(expression, key: nil, queue: nil, priority: nil, args: {}, description: nil)
  _workflow.add_schedule(
    Schedule.new(
      expression:,
      class_name: name,
      key:,
      queue:,
      priority:,
      args:,
      description:
    )
  )
end

#task(task_name, each: Task::DEFAULT_EACH, enqueue: nil, retry: 0, output: {}, depends_on: [], condition: ->(_ctx) { true }, throttle: {}, timeout: nil, dependency_wait: {}, dry_run: false, &block) ⇒ Object

rubocop:disable Metrics/ParameterLists

: (

  Symbol task_name,
  ?each: ^(Context) -> untyped,
  ?enqueue: true | false | ^(Context) -> bool | Hash[Symbol, untyped],
  ?retry: Integer | Hash[Symbol, untyped],
  ?output: Hash[Symbol, String],
  ?depends_on: Array[Symbol],
  ?condition: ^(Context) -> bool,
  ?throttle: Integer | Hash[Symbol, untyped],
  ?timeout: Numeric?,
  ?dependency_wait: Hash[Symbol, untyped],
  ?dry_run: bool | ^(Context) -> bool
) { (untyped) -> void } -> void


129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/job_workflow/dsl.rb', line 129

def task(
  task_name,
  each: Task::DEFAULT_EACH,
  enqueue: nil,
  retry: 0,
  output: {},
  depends_on: [],
  condition: ->(_ctx) { true },
  throttle: {},
  timeout: nil,
  dependency_wait: {},
  dry_run: false,
  &block
)
  new_task = Task.new(
    job_name: name,
    name: task_name,
    block: block,
    enqueue:,
    each:,
    task_retry: binding.local_variable_get(:retry),
    output:,
    depends_on:,
    condition:,
    throttle:,
    timeout:,
    dependency_wait:,
    dry_run:
  )
  _workflow.add_task(new_task)
end

#workflow_concurrency(to:, key:, **opts) ⇒ Object

Configures concurrency limits for this workflow job.

Unlike ‘limits_concurrency` (SolidQueue’s raw API), this method passes a Context as the first argument to the key Proc, giving access to workflow-aware information such as ‘arguments`, `sub_job?`, and `concurrency_key`.

When ‘_context` is not yet initialized (e.g. during enqueue before perform), a temporary Context is built from the job’s arguments so the key Proc can always rely on ‘ctx.arguments`.

: (

  to: Integer,
  key: ^(Context) -> String?,
  ?duration: ActiveSupport::Duration?,
  ?group: String?,
  ?on_conflict: Symbol?
) -> void

Examples:

Limit duplicate workflow runs by argument

workflow_concurrency to: 1,
  key: ->(ctx) { "my_job:#{ctx.arguments.tenant_id}" },
  on_conflict: :discard

Separate parent and sub-job concurrency keys

workflow_concurrency to: 1,
  key: ->(ctx) {
    ctx.sub_job? ? ctx.concurrency_key : "my_job:#{ctx.arguments.name}"
  },
  on_conflict: :discard


212
213
214
215
216
217
218
219
220
221
222
223
224
# File 'lib/job_workflow/dsl.rb', line 212

def workflow_concurrency(to:, key:, **opts)
  concurrency_key_proc = key
  limits_concurrency(
    to:,
    key: proc {
      ctx = _context || Context.from_hash(
        job: self, workflow: self.class._workflow
      )._update_arguments((arguments.first || {}).symbolize_keys)
      concurrency_key_proc.call(ctx)
    },
    **opts
  )
end