Module: JobWorkflow::DSL::ClassMethods
- Defined in:
- lib/job_workflow/dsl.rb
Instance Method Summary collapse
-
#after(*task_names, &block) ⇒ Object
: (*Symbol) { (Context) -> void } -> void.
-
#argument(argument_name, type, default: nil) ⇒ Object
: (Symbol argument_name, String type, ?default: untyped) -> void.
-
#around(*task_names, &block) ⇒ Object
: (*Symbol) { (Context, TaskCallable) -> void } -> void.
-
#before(*task_names, &block) ⇒ Object
: (*Symbol) { (Context) -> void } -> void.
-
#dry_run(value = nil, &block) ⇒ Object
: (?bool) ?{ (Context) -> bool } -> void.
-
#from_context(context) ⇒ Object
: (Context) -> DSL.
-
#on_error(*task_names, &block) ⇒ Object
: (*Symbol) { (Context, StandardError, Task) -> void } -> void.
-
#schedule(expression, key: nil, queue: nil, priority: nil, args: {}, description: nil) ⇒ Object
rubocop:disable Metrics/ParameterLists : ( String expression, ?key: (String | Symbol)?, ?queue: String?, ?priority: Integer?, ?args: Hash[Symbol, untyped], ?description: String? ) -> void.
-
#task(task_name, each: Task::DEFAULT_EACH, enqueue: nil, retry: 0, output: {}, depends_on: [], condition: ->(_ctx) { true }, throttle: {}, timeout: nil, dependency_wait: {}, dry_run: false, &block) ⇒ Object
rubocop:disable Metrics/ParameterLists.
-
#workflow_concurrency(to:, key:, **opts) ⇒ Object
Configures concurrency limits for this workflow job.
Instance Method Details
#after(*task_names, &block) ⇒ Object
: (*Symbol) { (Context) -> void } -> void
168 169 170 |
# File 'lib/job_workflow/dsl.rb', line 168 def after(*task_names, &block) _workflow.add_hook(:after, task_names:, block:) end |
#argument(argument_name, type, default: nil) ⇒ Object
: (Symbol argument_name, String type, ?default: untyped) -> void
110 111 112 |
# File 'lib/job_workflow/dsl.rb', line 110 def argument(argument_name, type, default: nil) _workflow.add_argument(ArgumentDef.new(name: argument_name, type:, default:)) end |
#around(*task_names, &block) ⇒ Object
: (*Symbol) { (Context, TaskCallable) -> void } -> void
173 174 175 |
# File 'lib/job_workflow/dsl.rb', line 173 def around(*task_names, &block) _workflow.add_hook(:around, task_names:, block:) end |
#before(*task_names, &block) ⇒ Object
: (*Symbol) { (Context) -> void } -> void
163 164 165 |
# File 'lib/job_workflow/dsl.rb', line 163 def before(*task_names, &block) _workflow.add_hook(:before, task_names:, block:) end |
#dry_run(value = nil, &block) ⇒ Object
: (?bool) ?{ (Context) -> bool } -> void
227 228 229 |
# File 'lib/job_workflow/dsl.rb', line 227 def dry_run(value = nil, &block) _workflow.dry_run_config = block || value end |
#from_context(context) ⇒ Object
: (Context) -> DSL
99 100 101 102 103 104 105 106 107 |
# File 'lib/job_workflow/dsl.rb', line 99 def from_context(context) # rubocop:disable Metrics/AbcSize new_context = context.dup task = new_context._task_context.task job = new(new_context.arguments.to_h) new_context._job = job job._context = new_context job.set(queue: task.enqueue.queue) if !task.nil? && !task.enqueue.queue.nil? job end |
#on_error(*task_names, &block) ⇒ Object
: (*Symbol) { (Context, StandardError, Task) -> void } -> void
178 179 180 |
# File 'lib/job_workflow/dsl.rb', line 178 def on_error(*task_names, &block) _workflow.add_hook(:error, task_names:, block:) end |
#schedule(expression, key: nil, queue: nil, priority: nil, args: {}, description: nil) ⇒ Object
rubocop:disable Metrics/ParameterLists : (
String expression,
?key: (String | Symbol)?,
?queue: String?,
?priority: Integer?,
?args: Hash[Symbol, untyped],
?description: String?
) -> void
240 241 242 243 244 245 246 247 248 249 250 251 252 |
# File 'lib/job_workflow/dsl.rb', line 240 def schedule(expression, key: nil, queue: nil, priority: nil, args: {}, description: nil) _workflow.add_schedule( Schedule.new( expression:, class_name: name, key:, queue:, priority:, args:, description: ) ) end |
#task(task_name, each: Task::DEFAULT_EACH, enqueue: nil, retry: 0, output: {}, depends_on: [], condition: ->(_ctx) { true }, throttle: {}, timeout: nil, dependency_wait: {}, dry_run: false, &block) ⇒ Object
rubocop:disable Metrics/ParameterLists
: (
Symbol task_name,
?each: ^(Context) -> untyped,
?enqueue: true | false | ^(Context) -> bool | Hash[Symbol, untyped],
?retry: Integer | Hash[Symbol, untyped],
?output: Hash[Symbol, String],
?depends_on: Array[Symbol],
?condition: ^(Context) -> bool,
?throttle: Integer | Hash[Symbol, untyped],
?timeout: Numeric?,
?dependency_wait: Hash[Symbol, untyped],
?dry_run: bool | ^(Context) -> bool
) { (untyped) -> void } -> void
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 |
# File 'lib/job_workflow/dsl.rb', line 129 def task( task_name, each: Task::DEFAULT_EACH, enqueue: nil, retry: 0, output: {}, depends_on: [], condition: ->(_ctx) { true }, throttle: {}, timeout: nil, dependency_wait: {}, dry_run: false, &block ) new_task = Task.new( job_name: name, name: task_name, block: block, enqueue:, each:, task_retry: binding.local_variable_get(:retry), output:, depends_on:, condition:, throttle:, timeout:, dependency_wait:, dry_run: ) _workflow.add_task(new_task) end |
#workflow_concurrency(to:, key:, **opts) ⇒ Object
Configures concurrency limits for this workflow job.
Unlike ‘limits_concurrency` (SolidQueue’s raw API), this method passes a Context as the first argument to the key Proc, giving access to workflow-aware information such as ‘arguments`, `sub_job?`, and `concurrency_key`.
When ‘_context` is not yet initialized (e.g. during enqueue before perform), a temporary Context is built from the job’s arguments so the key Proc can always rely on ‘ctx.arguments`.
: (
to: Integer,
key: ^(Context) -> String?,
?duration: ActiveSupport::Duration?,
?group: String?,
?on_conflict: Symbol?
) -> void
212 213 214 215 216 217 218 219 220 221 222 223 224 |
# File 'lib/job_workflow/dsl.rb', line 212 def workflow_concurrency(to:, key:, **opts) concurrency_key_proc = key limits_concurrency( to:, key: proc { ctx = _context || Context.from_hash( job: self, workflow: self.class._workflow )._update_arguments((arguments.first || {}).symbolize_keys) concurrency_key_proc.call(ctx) }, **opts ) end |