Class: JobWorkflow::Context

Inherits:
Object
  • Object
show all
Defined in:
lib/job_workflow/context.rb

Overview

rubocop:disable Metrics/ClassLength

Constant Summary collapse

EACH_TASK_CURSOR_MARKER =
"__job_workflow_each_cursor__"

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(workflow:, arguments:, task_context:, output:, job_status:, job: nil) ⇒ Context

: (

  workflow: Workflow,
  arguments: Arguments,
  task_context: TaskContext,
  output: Output,
  job_status: JobStatus,
  ?job: _JobInterface?
) -> void


57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/job_workflow/context.rb', line 57

def initialize(workflow:, arguments:, task_context:, output:, job_status:, job: nil) # rubocop:disable Metrics/ParameterLists, Metrics/AbcSize, Metrics/MethodLength
  if job&.class.respond_to?(:_workflow) && job.class._workflow != workflow
    raise "job does not match the provided workflow"
  end

  self.job = job
  self.workflow = workflow
  self.arguments = arguments
  self.task_context = task_context
  self.output = output
  self.job_status = job_status
  self.enabled_with_each_value = false
  self.throttle_index = 0
  self.skip_in_dry_run_index = 0
  self.current_step = nil
  self.current_cursor = nil
end

Instance Attribute Details

#argumentsObject

: Arguments



8
9
10
# File 'lib/job_workflow/context.rb', line 8

def arguments
  @arguments
end

#job_statusObject

: JobStatus



10
11
12
# File 'lib/job_workflow/context.rb', line 10

def job_status
  @job_status
end

#outputObject

: Output



9
10
11
# File 'lib/job_workflow/context.rb', line 9

def output
  @output
end

#workflowObject

: Workflow



7
8
9
# File 'lib/job_workflow/context.rb', line 7

def workflow
  @workflow
end

Class Method Details

.deserialize(hash) ⇒ Object

: (Hash[String, untyped]) -> Context



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/job_workflow/context.rb', line 27

def deserialize(hash)
  workflow = hash.fetch("workflow")
  new(
    job: hash["job"],
    workflow: hash.fetch("workflow"),
    arguments: Arguments.new(data: workflow.build_arguments_hash),
    task_context: TaskContext.deserialize(
      hash["task_context"].merge(
        "task" => workflow.fetch_task(
          hash.fetch(
            "task_context",
            {} #: Hash[String, untyped]
          )["task_name"]&.to_sym
        )
      )
    ),
    output: Output.deserialize(hash),
    job_status: JobStatus.deserialize(hash)
  )
end

.from_hash(hash) ⇒ Object

: (Hash[Symbol, untyped]) -> Context



14
15
16
17
18
19
20
21
22
23
24
# File 'lib/job_workflow/context.rb', line 14

def from_hash(hash)
  workflow = hash.fetch(:workflow)
  new(
    job: hash[:job],
    workflow:,
    arguments: Arguments.new(data: workflow.build_arguments_hash),
    task_context: TaskContext.new(**(hash[:task_context] || {}).symbolize_keys),
    output: Output.from_hash_array(hash.fetch(:task_outputs, [])),
    job_status: JobStatus.from_hash_array(hash.fetch(:task_job_statuses, []))
  )
end

Instance Method Details

#_add_task_output(task_output) ⇒ Object

: (TaskOutput) -> void



267
268
269
# File 'lib/job_workflow/context.rb', line 267

def _add_task_output(task_output)
  output.add_task_output(task_output)
end

#_jobObject

: () -> _JobInterface?



117
118
119
# File 'lib/job_workflow/context.rb', line 117

def _job
  job
end

#_job=(job) ⇒ Object

: (_JobInterface) -> void



112
113
114
# File 'lib/job_workflow/context.rb', line 112

def _job=(job)
  self.job = job
end

#_load_parent_task_outputObject

: () -> void



272
273
274
275
276
277
278
# File 'lib/job_workflow/context.rb', line 272

def _load_parent_task_output
  return unless sub_job?

  workflow_status = WorkflowStatus.find(parent_job_id)
  parent_context = workflow_status.context
  parent_context.output.flat_task_outputs.each { |task_output| output.add_task_output(task_output) }
end

#_task_contextObject

: () -> TaskContext



250
251
252
# File 'lib/job_workflow/context.rb', line 250

def _task_context
  task_context
end

#_update_arguments(other_arguments) ⇒ Object

: (Hash[Symbol, untyped]) -> Context



81
82
83
84
# File 'lib/job_workflow/context.rb', line 81

def _update_arguments(other_arguments)
  self.arguments = arguments.merge(other_arguments.symbolize_keys)
  self
end

#_with_current_step(step, cursor: nil) ⇒ Object

: (ActiveJob::Continuation::Step, ?cursor: untyped) { () -> void } -> void



255
256
257
258
259
260
261
262
263
264
# File 'lib/job_workflow/context.rb', line 255

def _with_current_step(step, cursor: nil)
  previous_step = current_step
  previous_cursor = current_cursor
  self.current_step = step
  self.current_cursor = cursor
  yield
ensure
  self.current_step = previous_step
  self.current_cursor = previous_cursor
end

#_with_each_value(task, start_index: nil) ⇒ Object

: (Task, ?start_index: Integer?) -> Enumerator



143
144
145
146
147
148
149
150
151
152
# File 'lib/job_workflow/context.rb', line 143

def _with_each_value(task, start_index: nil)
  raise "Nested _with_each_value calls are not allowed" if enabled_with_each_value

  self.enabled_with_each_value = true
  Enumerator.new do |y|
    with_task_context(task, y, start_index:)
  ensure
    self.enabled_with_each_value = false
  end
end

#_with_task_throttleObject

: () { () -> void } -> void



155
156
157
158
159
160
161
162
# File 'lib/job_workflow/context.rb', line 155

def _with_task_throttle(&)
  task = task_context.task || (raise "with_throttle can be called only within iterate_each_value")

  semaphore = task.throttle.semaphore
  return yield if semaphore.nil?

  semaphore.with(&)
end

#checkpoint!Object

: () -> void



103
104
105
106
107
108
109
# File 'lib/job_workflow/context.rb', line 103

def checkpoint!
  step = current_step || (raise "checkpoint! can be called only in task")

  return step.checkpoint! unless each_task?

  step.set!(build_step_cursor(current_cursor))
end

#concurrency_keyObject

: () -> String?



135
136
137
138
139
140
# File 'lib/job_workflow/context.rb', line 135

def concurrency_key
  task = task_context.task
  return if task.nil?

  [task_context.parent_job_id, task.task_name].compact.join("/")
end

#cursorObject

: () -> untyped



87
88
89
90
91
# File 'lib/job_workflow/context.rb', line 87

def cursor
  return if current_step.nil?

  current_cursor
end

#dry_run?Boolean

: () -> bool

Returns:

  • (Boolean)


212
213
214
# File 'lib/job_workflow/context.rb', line 212

def dry_run?
  task_context.dry_run
end

#each_task_outputObject

: () -> TaskOutput?



239
240
241
242
243
244
245
246
247
# File 'lib/job_workflow/context.rb', line 239

def each_task_output
  task = task_context.task
  raise "each_task_output can be called only _with_task block" if task.nil?
  raise "each_task_output can be called only _with_each_value block" unless task_context.enabled?

  task_name = task.task_name
  each_index = task_context.index
  output.fetch(task_name:, each_index:)
end

#each_valueObject

: () -> untyped



232
233
234
235
236
# File 'lib/job_workflow/context.rb', line 232

def each_value
  raise "each_value can be called only within each_values block" unless task_context.enabled?

  task_context.value
end

#instrument(operation = "custom", **payload) ⇒ Object

Instruments a custom operation with ActiveSupport::Notifications. This creates a span in OpenTelemetry (if enabled) and logs the event.

: (?String, **untyped) { () -> untyped } -> untyped

Examples:

Basic usage

```ruby
ctx.instrument("api_call", endpoint: "/users") do
  HTTP.get("https://api.example.com/users")
end
```

With automatic operation name

```ruby
ctx.instrument do
  # operation name defaults to "custom"
  expensive_operation()
end
```


198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/job_workflow/context.rb', line 198

def instrument(operation = "custom", **payload, &)
  task = task_context.task
  full_payload = {
    job_id: job_id,
    job_name: job.class.name,
    task_name: task&.task_name,
    each_index: task_context.index,
    operation:,
    **payload
  }
  Instrumentation.instrument_custom(operation, full_payload, &)
end

#job_idObject

: () -> String



122
123
124
125
126
127
# File 'lib/job_workflow/context.rb', line 122

def job_id
  local_job = job
  raise "job is not set" if local_job.nil?

  local_job.job_id
end

#serializeObject

: () -> Hash[String, untyped]



76
77
78
# File 'lib/job_workflow/context.rb', line 76

def serialize
  sub_job? ? serialize_for_sub_job : serialize_for_job
end

#set_cursor!(value) ⇒ Object

: (untyped) -> void



94
95
96
97
98
99
100
# File 'lib/job_workflow/context.rb', line 94

def set_cursor!(value)
  step = current_step || (raise "set_cursor! can be called only in task")

  ActiveJob::Arguments.serialize([value])
  self.current_cursor = value
  step.set!(build_step_cursor(value))
end

#skip_in_dry_run(dry_run_name = nil, fallback: nil) ⇒ Object

: (?Symbol?, ?fallback: untyped) { () -> untyped } -> untyped



217
218
219
220
221
222
223
224
225
226
227
228
229
# File 'lib/job_workflow/context.rb', line 217

def skip_in_dry_run(dry_run_name = nil, fallback: nil)
  local_job = job
  task = task_context.task

  raise "job is not set" if local_job.nil?
  raise "skip_in_dry_run can be called only within with_task_context" if task.nil?

  current_index = skip_in_dry_run_index
  self.skip_in_dry_run_index += 1
  Instrumentation.instrument_dry_run(local_job, self, dry_run_name, current_index, dry_run?) do
    dry_run? ? fallback : yield
  end
end

#sub_job?Boolean

: () -> bool

Returns:

  • (Boolean)


130
131
132
# File 'lib/job_workflow/context.rb', line 130

def sub_job?
  parent_job_id != job_id
end

#throttle(limit:, key: nil, ttl: 180) ⇒ Object

: (limit: Integer, ?key: String?, ?ttl: Integer) { () -> void } -> void



165
166
167
168
169
170
171
172
173
174
175
176
177
# File 'lib/job_workflow/context.rb', line 165

def throttle(limit:, key: nil, ttl: 180, &)
  task = task_context.task || (raise "throttle can be called only in task")

  semaphore = Semaphore.new(
    concurrency_key: key || "#{task.throttle_prefix_key}:#{throttle_index}",
    concurrency_limit: limit,
    concurrency_duration: ttl.seconds
  )

  self.throttle_index += 1

  semaphore.with(&)
end