Module: JobWorkflow::DSL::ClassMethods
- Defined in:
- lib/job_workflow/dsl.rb
Instance Method Summary
-
#after(*task_names, &block) ⇒ Object
: (*Symbol) { (Context) -> void } -> void.
-
#argument(argument_name, type, default: nil) ⇒ Object
: (Symbol argument_name, String type, ?default: untyped) -> void.
-
#around(*task_names, &block) ⇒ Object
: (*Symbol) { (Context, TaskCallable) -> void } -> void.
-
#before(*task_names, &block) ⇒ Object
: (*Symbol) { (Context) -> void } -> void.
-
#dry_run(value = nil, &block) ⇒ Object
: (?bool) ?{ (Context) -> bool } -> void.
-
#from_context(context) ⇒ Object
: (Context) -> DSL.
-
#on_error(*task_names, &block) ⇒ Object
: (*Symbol) { (Context, StandardError, Task) -> void } -> void.
-
#schedule(expression, key: nil, queue: nil, priority: nil, args: {}, description: nil) ⇒ Object
: (String expression, ?key: (String | Symbol)?, ?queue: String?, ?priority: Integer?, ?args: Hash[Symbol, untyped], ?description: String?) -> void.
-
#task(task_name, each: ->(_ctx) { [TaskContext::NULL_VALUE] }, enqueue: nil, retry: 0, output: {}, depends_on: [], condition: ->(_ctx) { true }, throttle: {}, timeout: nil, dependency_wait: {}, dry_run: false, &block) ⇒ Object
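: (Symbol task_name, ?each: ^(Context) -> untyped, ?enqueue: true | false | ^(Context) -> bool | Hash[Symbol, untyped], ?retry: Integer | Hash[Symbol, untyped], ?output: Hash[Symbol, String], ?depends_on: Array[Symbol], ?condition: ^(Context) -> bool, ?throttle: Integer | Hash[Symbol, untyped], ?timeout: Numeric?, ?dependency_wait: Hash[Symbol, untyped], ?dry_run: bool | ^(Context) -> bool) { (untyped) -> void } -> void.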
-
#workflow_concurrency(to:, key:, **opts) ⇒ Object
Configures concurrency limits for this workflow job.
Instance Method Details
#after(*task_names, &block) ⇒ Object
: (*Symbol) { (Context) -> void } -> void
172 173 174 |
# File 'lib/job_workflow/dsl.rb', line 172

# Registers +block+ as an `:after` hook on the workflow returned by
# `_workflow`, forwarding the given task names unchanged.
#
# How an empty +task_names+ list is interpreted is decided by `add_hook`,
# which is not visible here.
#
#: (*Symbol) { (Context) -> void } -> void
def after(*task_names, &block)
  _workflow.add_hook(:after, task_names: task_names, block: block)
end
#argument(argument_name, type, default: nil) ⇒ Object
: (Symbol argument_name, String type, ?default: untyped) -> void
110 111 112 |
# File 'lib/job_workflow/dsl.rb', line 110

# Declares a workflow argument.
#
# Wraps the name, type and default in an `ArgumentDef` and hands it to
# `_workflow.add_argument`.
#
#: (Symbol argument_name, String type, ?default: untyped) -> void
def argument(argument_name, type, default: nil)
  definition = ArgumentDef.new(name: argument_name, type: type, default: default)
  _workflow.add_argument(definition)
end
#around(*task_names, &block) ⇒ Object
: (*Symbol) { (Context, TaskCallable) -> void } -> void
177 178 179 |
# File 'lib/job_workflow/dsl.rb', line 177

# Registers +block+ as an `:around` hook on the workflow returned by
# `_workflow`, forwarding the given task names unchanged.
#
# Per the signature, the block receives a Context and a TaskCallable.
#
#: (*Symbol) { (Context, TaskCallable) -> void } -> void
def around(*task_names, &block)
  _workflow.add_hook(:around, task_names: task_names, block: block)
end
#before(*task_names, &block) ⇒ Object
: (*Symbol) { (Context) -> void } -> void
167 168 169 |
# File 'lib/job_workflow/dsl.rb', line 167

# Registers +block+ as a `:before` hook on the workflow returned by
# `_workflow`, forwarding the given task names unchanged.
#
#: (*Symbol) { (Context) -> void } -> void
def before(*task_names, &block)
  _workflow.add_hook(:before, task_names: task_names, block: block)
end
#dry_run(value = nil, &block) ⇒ Object
: (?bool) ?{ (Context) -> bool } -> void
231 232 233 |
# File 'lib/job_workflow/dsl.rb', line 231

# Stores the workflow-level dry-run setting.
#
# A block, when given, takes precedence over +value+; otherwise +value+
# (possibly nil) is stored as-is in `_workflow.dry_run_config`.
#
#: (?bool) ?{ (Context) -> bool } -> void
def dry_run(value = nil, &block)
  config = block || value
  _workflow.dry_run_config = config
end
#from_context(context) ⇒ Object
: (Context) -> DSL
99 100 101 102 103 104 105 106 107 |
# File 'lib/job_workflow/dsl.rb', line 99

# Builds a new job instance from a copy of +context+.
#
# The context is duplicated with `dup`, a job is instantiated from the copied
# context's arguments, and the two are linked to each other through `_job=`
# and `_context=`. When the copied context's current task is present and its
# enqueue settings name a queue, that queue is applied to the job via `set`.
#
# Returns the newly built job.
#
#: (Context) -> DSL
def from_context(context) # rubocop:disable Metrics/AbcSize
  copied_context = context.dup
  current_task = copied_context._task_context.task

  new(copied_context.arguments.to_h).tap do |job|
    copied_context._job = job
    job._context = copied_context

    # Only override the queue when there is a task and it names one.
    next if current_task.nil?
    next if current_task.enqueue.queue.nil?

    job.set(queue: current_task.enqueue.queue)
  end
end
#on_error(*task_names, &block) ⇒ Object
: (*Symbol) { (Context, StandardError, Task) -> void } -> void
182 183 184 |
# File 'lib/job_workflow/dsl.rb', line 182

# Registers +block+ as an error hook on the workflow returned by `_workflow`,
# forwarding the given task names unchanged.
#
# Note that the hook kind passed to `add_hook` is `:error`, not `:on_error`.
# Per the signature, the block receives a Context, the StandardError and the
# Task.
#
#: (*Symbol) { (Context, StandardError, Task) -> void } -> void
def on_error(*task_names, &block)
  _workflow.add_hook(:error, task_names: task_names, block: block)
end
#schedule(expression, key: nil, queue: nil, priority: nil, args: {}, description: nil) ⇒ Object
: (
String expression,
?key: (String | Symbol)?,
?queue: String?,
?priority: Integer?,
?args: Hash[Symbol, untyped],
?description: String?
) -> void
244 245 246 247 248 249 250 251 252 253 254 255 256 |
# File 'lib/job_workflow/dsl.rb', line 244

# Adds a schedule entry for this job class to the workflow.
#
# Builds a `Schedule` from +expression+ and the optional settings, using this
# class's `name` as `class_name`, and registers it via
# `_workflow.add_schedule`. The format of +expression+ is defined by
# `Schedule`, which is not visible here.
#
#: (
#    String expression,
#    ?key: (String | Symbol)?,
#    ?queue: String?,
#    ?priority: Integer?,
#    ?args: Hash[Symbol, untyped],
#    ?description: String?
#  ) -> void
def schedule(expression, key: nil, queue: nil, priority: nil, args: {}, description: nil)
  entry = Schedule.new(
    expression: expression,
    class_name: name,
    key: key,
    queue: queue,
    priority: priority,
    args: args,
    description: description
  )
  _workflow.add_schedule(entry)
end
#task(task_name, each: ->(_ctx) { [TaskContext::NULL_VALUE] }, enqueue: nil, retry: 0, output: {}, depends_on: [], condition: ->(_ctx) { true }, throttle: {}, timeout: nil, dependency_wait: {}, dry_run: false, &block) ⇒ Object
: (
Symbol task_name,
?each: ^(Context) -> untyped,
?enqueue: true | false | ^(Context) -> bool | Hash[Symbol, untyped],
?retry: Integer | Hash[Symbol, untyped],
?output: Hash[Symbol, String],
?depends_on: Array[Symbol],
?condition: ^(Context) -> bool,
?throttle: Integer | Hash[Symbol, untyped],
?timeout: Numeric?,
?dependency_wait: Hash[Symbol, untyped],
?dry_run: bool | ^(Context) -> bool
) { (untyped) -> void } -> void
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
# File 'lib/job_workflow/dsl.rb', line 129

# Defines a task and registers it with the workflow.
#
# All options are passed straight through to `Task.new`, together with this
# class's `name` as `job_name`; their semantics are defined by `Task`, which
# is not visible here. After registration, if the task's enqueue settings
# report `should_limits_concurrency?`, a workflow-level concurrency limit is
# configured from `enqueue.concurrency`, keyed by the context's
# `concurrency_key`.
#
#: (
#    Symbol task_name,
#    ?each: ^(Context) -> untyped,
#    ?enqueue: true | false | ^(Context) -> bool | Hash[Symbol, untyped],
#    ?retry: Integer | Hash[Symbol, untyped],
#    ?output: Hash[Symbol, String],
#    ?depends_on: Array[Symbol],
#    ?condition: ^(Context) -> bool,
#    ?throttle: Integer | Hash[Symbol, untyped],
#    ?timeout: Numeric?,
#    ?dependency_wait: Hash[Symbol, untyped],
#    ?dry_run: bool | ^(Context) -> bool
#  ) { (untyped) -> void } -> void
def task(
  task_name,
  each: ->(_ctx) { [TaskContext::NULL_VALUE] },
  enqueue: nil,
  retry: 0,
  output: {},
  depends_on: [],
  condition: ->(_ctx) { true },
  throttle: {},
  timeout: nil,
  dependency_wait: {},
  dry_run: false,
  &block
)
  # `retry` is a Ruby keyword, so the keyword argument of that name can only
  # be read through the binding.
  retry_option = binding.local_variable_get(:retry)

  definition = Task.new(
    job_name: name,
    name: task_name,
    block: block,
    enqueue: enqueue,
    each: each,
    task_retry: retry_option,
    output: output,
    depends_on: depends_on,
    condition: condition,
    throttle: throttle,
    timeout: timeout,
    dependency_wait: dependency_wait,
    dry_run: dry_run
  )
  _workflow.add_task(definition)

  return unless definition.enqueue.should_limits_concurrency?

  limit = definition.enqueue.concurrency #: Integer
  workflow_concurrency(to: limit, key: :concurrency_key.to_proc)
end
#workflow_concurrency(to:, key:, **opts) ⇒ Object
Configures concurrency limits for this workflow job.
Unlike `limits_concurrency` (SolidQueue's raw API), this method passes a Context as the first argument to the key Proc, giving access to workflow-aware information such as `arguments`, `sub_job?`, and `concurrency_key`.
When `_context` is not yet initialized (e.g. during enqueue before perform), a temporary Context is built from the job's arguments so the key Proc can always rely on `ctx.arguments`.
: (
to: Integer,
key: ^(Context) -> String?,
?duration: ActiveSupport::Duration?,
?group: String?,
?on_conflict: Symbol?
) -> void
216 217 218 219 220 221 222 223 224 225 226 227 228 |
# File 'lib/job_workflow/dsl.rb', line 216

# Configures concurrency limits for this workflow job.
#
# Delegates to `limits_concurrency`, replacing +key+ with a wrapper proc that
# calls the caller's key proc with a Context. The wrapper uses `_context` when
# it is set; otherwise it builds a Context via `Context.from_hash` and seeds
# it with the symbolized first element of `arguments` (or an empty hash).
#
# NOTE(review): the wrapper reads `_context`, `arguments` and
# `self.class._workflow`, so it presumably runs with the job instance as
# `self` (e.g. via `instance_exec` inside `limits_concurrency`) — confirm.
# It must stay a `proc`, not a lambda, in case it is invoked with extra
# arguments.
#
#: (
#    to: Integer,
#    key: ^(Context) -> String?,
#    ?duration: ActiveSupport::Duration?,
#    ?group: String?,
#    ?on_conflict: Symbol?
#  ) -> void
def workflow_concurrency(to:, key:, **opts)
  caller_key = key

  context_aware_key = proc do
    ctx = _context
    ctx ||= Context
            .from_hash(job: self, workflow: self.class._workflow)
            ._update_arguments((arguments.first || {}).symbolize_keys)
    caller_key.call(ctx)
  end

  limits_concurrency(to: to, key: context_aware_key, **opts)
end