Class: JobWorkflow::Context
- Inherits:
-
Object
- Object
- JobWorkflow::Context
- Defined in:
- lib/job_workflow/context.rb
Overview
rubocop:disable Metrics/ClassLength
Instance Attribute Summary collapse
-
#arguments ⇒ Object
readonly
: Arguments.
-
#job_status ⇒ Object
readonly
: JobStatus.
-
#output ⇒ Object
readonly
: Output.
-
#workflow ⇒ Object
readonly
: Workflow.
Class Method Summary collapse
-
.deserialize(hash) ⇒ Object
: (Hash[String, untyped]) -> Context.
-
.from_hash(hash) ⇒ Object
: (Hash[Symbol, untyped]) -> Context.
Instance Method Summary collapse
-
#_add_task_output(task_output) ⇒ Object
: (TaskOutput) -> void.
-
#_job ⇒ Object
: () -> DSL?.
-
#_job=(job) ⇒ Object
: (DSL) -> void.
-
#_load_parent_task_output ⇒ Object
: () -> void.
-
#_task_context ⇒ Object
: () -> TaskContext.
-
#_update_arguments(other_arguments) ⇒ Object
: (Hash[Symbol, untyped]) -> Context.
-
#_with_each_value(task, start_index: nil) ⇒ Object
: (Task, ?start_index: Integer?) -> Enumerator.
-
#_with_task_throttle ⇒ Object
: () { () -> void } -> void.
-
#concurrency_key ⇒ Object
: () -> String?.
-
#dry_run? ⇒ Boolean
: () -> bool.
-
#each_task_output ⇒ Object
: () -> TaskOutput?.
-
#each_value ⇒ Object
: () -> untyped.
-
#initialize(workflow:, arguments:, task_context:, output:, job_status:, job: nil) ⇒ Context
constructor
: ( workflow: Workflow, arguments: Arguments, task_context: TaskContext, output: Output, job_status: JobStatus, ?job: DSL? ) -> void.
-
#instrument(operation = "custom", **payload) ⇒ Object
Instruments a custom operation with ActiveSupport::Notifications.
-
#job_id ⇒ Object
: () -> String.
-
#serialize ⇒ Object
: () -> Hash[String, untyped].
-
#skip_in_dry_run(dry_run_name = nil, fallback: nil) ⇒ Object
: (?Symbol?, ?fallback: untyped) { () -> untyped } -> untyped.
-
#sub_job? ⇒ Boolean
: () -> bool.
-
#throttle(limit:, key: nil, ttl: 180) ⇒ Object
: (limit: Integer, ?key: String?, ?ttl: Integer) { () -> void } -> void.
Constructor Details
#initialize(workflow:, arguments:, task_context:, output:, job_status:, job: nil) ⇒ Context
: (
workflow: Workflow,
arguments: Arguments,
task_context: TaskContext,
output: Output,
job_status: JobStatus,
?job: DSL?
) -> void
55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/job_workflow/context.rb', line 55 def initialize(workflow:, arguments:, task_context:, output:, job_status:, job: nil) # rubocop:disable Metrics/ParameterLists raise "job does not match the provided workflow" if job&.then { |j| j.class._workflow != workflow } self.job = job self.workflow = workflow self.arguments = arguments self.task_context = task_context self.output = output self.job_status = job_status self.enabled_with_each_value = false self.throttle_index = 0 self.skip_in_dry_run_index = 0 end |
Instance Attribute Details
#arguments ⇒ Object
: Arguments
6 7 8 |
# File 'lib/job_workflow/context.rb', line 6 def arguments @arguments end |
#job_status ⇒ Object
: JobStatus
8 9 10 |
# File 'lib/job_workflow/context.rb', line 8 def job_status @job_status end |
#output ⇒ Object
: Output
7 8 9 |
# File 'lib/job_workflow/context.rb', line 7 def output @output end |
#workflow ⇒ Object
: Workflow
5 6 7 |
# File 'lib/job_workflow/context.rb', line 5 def workflow @workflow end |
Class Method Details
.deserialize(hash) ⇒ Object
: (Hash[String, untyped]) -> Context
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/job_workflow/context.rb', line 25 def deserialize(hash) workflow = hash.fetch("workflow") new( job: hash["job"], workflow: hash.fetch("workflow"), arguments: Arguments.new(data: workflow.build_arguments_hash), task_context: TaskContext.deserialize( hash["task_context"].merge( "task" => workflow.fetch_task( hash.fetch( "task_context", {} #: Hash[String, untyped] )["task_name"]&.to_sym ) ) ), output: Output.deserialize(hash), job_status: JobStatus.deserialize(hash) ) end |
.from_hash(hash) ⇒ Object
: (Hash[Symbol, untyped]) -> Context
12 13 14 15 16 17 18 19 20 21 22 |
# File 'lib/job_workflow/context.rb', line 12 def from_hash(hash) workflow = hash.fetch(:workflow) new( job: hash[:job], workflow:, arguments: Arguments.new(data: workflow.build_arguments_hash), task_context: TaskContext.new(**(hash[:task_context] || {}).symbolize_keys), output: Output.from_hash_array(hash.fetch(:task_outputs, [])), job_status: JobStatus.from_hash_array(hash.fetch(:task_job_statuses, [])) ) end |
Instance Method Details
#_add_task_output(task_output) ⇒ Object
: (TaskOutput) -> void
224 225 226 |
# File 'lib/job_workflow/context.rb', line 224 def _add_task_output(task_output) output.add_task_output(task_output) end |
#_job ⇒ Object
: () -> DSL?
86 87 88 |
# File 'lib/job_workflow/context.rb', line 86 def _job job end |
#_job=(job) ⇒ Object
: (DSL) -> void
81 82 83 |
# File 'lib/job_workflow/context.rb', line 81 def _job=(job) self.job = job end |
#_load_parent_task_output ⇒ Object
: () -> void
229 230 231 232 233 234 235 |
# File 'lib/job_workflow/context.rb', line 229 def _load_parent_task_output return unless sub_job? workflow_status = WorkflowStatus.find(parent_job_id) parent_context = workflow_status.context parent_context.output.flat_task_outputs.each { |task_output| output.add_task_output(task_output) } end |
#_task_context ⇒ Object
: () -> TaskContext
219 220 221 |
# File 'lib/job_workflow/context.rb', line 219 def _task_context task_context end |
#_update_arguments(other_arguments) ⇒ Object
: (Hash[Symbol, untyped]) -> Context
75 76 77 78 |
# File 'lib/job_workflow/context.rb', line 75 def _update_arguments(other_arguments) self.arguments = arguments.merge(other_arguments.symbolize_keys) self end |
#_with_each_value(task, start_index: nil) ⇒ Object
: (Task, ?start_index: Integer?) -> Enumerator
112 113 114 115 116 117 118 119 120 121 |
# File 'lib/job_workflow/context.rb', line 112 def _with_each_value(task, start_index: nil) raise "Nested _with_each_value calls are not allowed" if enabled_with_each_value self.enabled_with_each_value = true Enumerator.new do |y| with_task_context(task, y, start_index:) ensure self.enabled_with_each_value = false end end |
#_with_task_throttle ⇒ Object
: () { () -> void } -> void
124 125 126 127 128 129 130 131 |
# File 'lib/job_workflow/context.rb', line 124 def _with_task_throttle(&) task = task_context.task || (raise "with_throttle can be called only within iterate_each_value") semaphore = task.throttle.semaphore return yield if semaphore.nil? semaphore.with(&) end |
#concurrency_key ⇒ Object
: () -> String?
104 105 106 107 108 109 |
# File 'lib/job_workflow/context.rb', line 104 def concurrency_key task = task_context.task return if task.nil? [task_context.parent_job_id, task.task_name].compact.join("/") end |
#dry_run? ⇒ Boolean
: () -> bool
181 182 183 |
# File 'lib/job_workflow/context.rb', line 181 def dry_run? task_context.dry_run end |
#each_task_output ⇒ Object
: () -> TaskOutput?
208 209 210 211 212 213 214 215 216 |
# File 'lib/job_workflow/context.rb', line 208 def each_task_output task = task_context.task raise "each_task_output can be called only _with_task block" if task.nil? raise "each_task_output can be called only _with_each_value block" unless task_context.enabled? task_name = task.task_name each_index = task_context.index output.fetch(task_name:, each_index:) end |
#each_value ⇒ Object
: () -> untyped
201 202 203 204 205 |
# File 'lib/job_workflow/context.rb', line 201 def each_value raise "each_value can be called only within each_values block" unless task_context.enabled? task_context.value end |
#instrument(operation = "custom", **payload) ⇒ Object
Instruments a custom operation with ActiveSupport::Notifications. This creates a span in OpenTelemetry (if enabled) and logs the event.
: (?String, **untyped) { () -> untyped } -> untyped
167 168 169 170 171 172 173 174 175 176 177 178 |
# File 'lib/job_workflow/context.rb', line 167 def instrument(operation = "custom", **payload, &) task = task_context.task full_payload = { job_id: job_id, job_name: job.class.name, task_name: task&.task_name, each_index: task_context.index, operation:, **payload } Instrumentation.instrument_custom(operation, full_payload, &) end |
#job_id ⇒ Object
: () -> String
91 92 93 94 95 96 |
# File 'lib/job_workflow/context.rb', line 91 def job_id local_job = job raise "job is not set" if local_job.nil? local_job.job_id end |
#serialize ⇒ Object
: () -> Hash[String, untyped]
70 71 72 |
# File 'lib/job_workflow/context.rb', line 70 def serialize sub_job? ? serialize_for_sub_job : serialize_for_job end |
#skip_in_dry_run(dry_run_name = nil, fallback: nil) ⇒ Object
: (?Symbol?, ?fallback: untyped) { () -> untyped } -> untyped
186 187 188 189 190 191 192 193 194 195 196 197 198 |
# File 'lib/job_workflow/context.rb', line 186 def skip_in_dry_run(dry_run_name = nil, fallback: nil) local_job = job task = task_context.task raise "job is not set" if local_job.nil? raise "skip_in_dry_run can be called only within with_task_context" if task.nil? current_index = skip_in_dry_run_index self.skip_in_dry_run_index += 1 Instrumentation.instrument_dry_run(local_job, self, dry_run_name, current_index, dry_run?) do dry_run? ? fallback : yield end end |
#sub_job? ⇒ Boolean
: () -> bool
99 100 101 |
# File 'lib/job_workflow/context.rb', line 99 def sub_job? parent_job_id != job_id end |
#throttle(limit:, key: nil, ttl: 180) ⇒ Object
: (limit: Integer, ?key: String?, ?ttl: Integer) { () -> void } -> void
134 135 136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/job_workflow/context.rb', line 134 def throttle(limit:, key: nil, ttl: 180, &) task = task_context.task || (raise "throttle can be called only in task") semaphore = Semaphore.new( concurrency_key: key || "#{task.throttle_prefix_key}:#{throttle_index}", concurrency_limit: limit, concurrency_duration: ttl.seconds ) self.throttle_index += 1 semaphore.with(&) end |