Class: JobWorkflow::Context

Inherits:
Object
  • Object
show all
Defined in:
lib/job_workflow/context.rb

Overview

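Execution context for a JobWorkflow job: it holds the workflow, arguments, task context, output, and job status passed to the constructor, plus an optional job. (The class comment in the source is only a linter directive: rubocop:disable Metrics/ClassLength.)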

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(workflow:, arguments:, task_context:, output:, job_status:, job: nil) ⇒ Context

: (

  workflow: Workflow,
  arguments: Arguments,
  task_context: TaskContext,
  output: Output,
  job_status: JobStatus,
  ?job: DSL?
) -> void


55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/job_workflow/context.rb', line 55

# Stores the collaborators of a context and resets its per-context bookkeeping
# (each-value flag, throttle counter, dry-run counter).
#
# Raises RuntimeError when a job is given whose class-level +_workflow+ differs
# from +workflow+; a nil job skips that check entirely.
def initialize(workflow:, arguments:, task_context:, output:, job_status:, job: nil) # rubocop:disable Metrics/ParameterLists
  unless job.nil?
    workflow_mismatch = job.class._workflow != workflow
    raise "job does not match the provided workflow" if workflow_mismatch
  end

  # Collaborators handed in by the caller.
  self.job = job
  self.workflow = workflow
  self.arguments = arguments
  self.task_context = task_context
  self.output = output
  self.job_status = job_status

  # Bookkeeping always starts from a clean slate.
  self.enabled_with_each_value = false
  self.throttle_index = 0
  self.skip_in_dry_run_index = 0
end

Instance Attribute Details

#arguments ⇒ Object

: Arguments



6
7
8
# File 'lib/job_workflow/context.rb', line 6

# Reader for the arguments stored on this context.
def arguments = @arguments

#job_status ⇒ Object

: JobStatus



8
9
10
# File 'lib/job_workflow/context.rb', line 8

# Reader for the job status stored on this context.
def job_status = @job_status

#output ⇒ Object

: Output



7
8
9
# File 'lib/job_workflow/context.rb', line 7

# Reader for the output stored on this context.
def output = @output

#workflow ⇒ Object

: Workflow



5
6
7
# File 'lib/job_workflow/context.rb', line 5

# Reader for the workflow stored on this context.
def workflow = @workflow

Class Method Details

.deserialize(hash) ⇒ Object

: (Hash[String, untyped]) -> Context



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/job_workflow/context.rb', line 25

# Rebuilds a context from a string-keyed hash.
#
# "workflow" is required (KeyError when absent). "job" and "task_context" are
# optional: a missing or nil "task_context" is treated as an empty hash instead
# of failing on +nil.merge+, mirroring how +from_hash+ tolerates a missing
# task context. The task is looked up on the workflow by the symbolized
# "task_name" (nil when no name is stored) and merged in under "task" before
# the hash is handed to TaskContext.deserialize.
# NOTE(review): whether +fetch_task+ / +TaskContext.deserialize+ accept a nil
# task name or an otherwise empty hash is not visible here - confirm.
def deserialize(hash)
  workflow = hash.fetch("workflow")
  task_context_hash = hash["task_context"] || {} #: Hash[String, untyped]
  task = workflow.fetch_task(task_context_hash["task_name"]&.to_sym)

  new(
    job: hash["job"],
    workflow:,
    arguments: Arguments.new(data: workflow.build_arguments_hash),
    task_context: TaskContext.deserialize(task_context_hash.merge("task" => task)),
    output: Output.deserialize(hash),
    job_status: JobStatus.deserialize(hash)
  )
end

.from_hash(hash) ⇒ Object

: (Hash[Symbol, untyped]) -> Context



12
13
14
15
16
17
18
19
20
21
22
# File 'lib/job_workflow/context.rb', line 12

# Builds a context from a symbol-keyed hash.
#
# :workflow is required (KeyError when absent). :job may be absent,
# :task_context falls back to an empty hash, and :task_outputs /
# :task_job_statuses fall back to empty arrays.
def from_hash(hash)
  workflow = hash.fetch(:workflow)
  job = hash[:job]
  arguments = Arguments.new(data: workflow.build_arguments_hash)

  task_context_attributes = (hash[:task_context] || {}).symbolize_keys
  task_context = TaskContext.new(**task_context_attributes)

  output = Output.from_hash_array(hash.fetch(:task_outputs, []))
  job_status = JobStatus.from_hash_array(hash.fetch(:task_job_statuses, []))

  new(job:, workflow:, arguments:, task_context:, output:, job_status:)
end

Instance Method Details

#_add_task_output(task_output) ⇒ Object

: (TaskOutput) -> void



224
225
226
# File 'lib/job_workflow/context.rb', line 224

# Hands +task_output+ to the context's output via +add_task_output+ and
# returns whatever that call returns.
def _add_task_output(task_output)
  collector = output
  collector.add_task_output(task_output)
end

#_job ⇒ Object

: () -> DSL?



86
87
88
# File 'lib/job_workflow/context.rb', line 86

# Returns the job attached to this context (nil when none has been set).
def _job = job

#_job=(job) ⇒ Object

: (DSL) -> void



81
82
83
# File 'lib/job_workflow/context.rb', line 81

# Attaches +job+ to this context by delegating to the +job=+ writer.
# Unlike the constructor, no workflow-match check is performed here.
def _job=(job)
  self.job = job
end

#_load_parent_task_output ⇒ Object

: () -> void



229
230
231
232
233
234
235
# File 'lib/job_workflow/context.rb', line 229

# For a sub job, copies every flattened task output of the parent job's
# context into this context's output. Does nothing (returns nil) otherwise.
# The parent context is obtained through WorkflowStatus.find(parent_job_id).
def _load_parent_task_output
  return unless sub_job?

  inherited_outputs = WorkflowStatus.find(parent_job_id).context.output.flat_task_outputs
  inherited_outputs.each do |parent_task_output|
    output.add_task_output(parent_task_output)
  end
end

#_task_context ⇒ Object

: () -> TaskContext



219
220
221
# File 'lib/job_workflow/context.rb', line 219

# Returns the task context held by this context.
def _task_context = task_context

#_update_arguments(other_arguments) ⇒ Object

: (Hash[Symbol, untyped]) -> Context



75
76
77
78
# File 'lib/job_workflow/context.rb', line 75

# Replaces the stored arguments with the result of merging in
# +other_arguments+ (keys symbolized first) and returns the context itself so
# calls can be chained.
def _update_arguments(other_arguments)
  tap { self.arguments = arguments.merge(other_arguments.symbolize_keys) }
end

#_with_each_value(task, start_index: nil) ⇒ Object

: (Task, ?start_index: Integer?) -> Enumerator



112
113
114
115
116
117
118
119
120
121
# File 'lib/job_workflow/context.rb', line 112

# Returns an Enumerator that, when iterated, runs +with_task_context+ for
# +task+ (optionally from +start_index+) with the enumerator's yielder.
#
# The "each value in progress" flag is raised when iteration starts and
# lowered when it ends, so the set and the reset always happen as a pair. An
# enumerator that is created but never iterated therefore cannot leave the
# flag stuck, and re-iterating an enumerator is guarded like the first pass.
#
# Raises RuntimeError when called, or when iteration starts, while another
# each-value iteration is still in progress.
def _with_each_value(task, start_index: nil)
  raise "Nested _with_each_value calls are not allowed" if enabled_with_each_value

  Enumerator.new do |y|
    # Checked outside the begin/ensure on purpose: a rejected nested iteration
    # must not lower the flag that belongs to the outer iteration.
    raise "Nested _with_each_value calls are not allowed" if enabled_with_each_value

    self.enabled_with_each_value = true
    begin
      with_task_context(task, y, start_index:)
    ensure
      self.enabled_with_each_value = false
    end
  end
end

#_with_task_throttle ⇒ Object

: () { () -> void } -> void



124
125
126
127
128
129
130
131
# File 'lib/job_workflow/context.rb', line 124

# Runs the block under the current task's throttle semaphore. When the task's
# throttle has no semaphore the block is simply yielded to.
#
# Raises RuntimeError when the task context carries no task.
def _with_task_throttle(&)
  current_task = task_context.task
  raise "with_throttle can be called only within iterate_each_value" unless current_task

  semaphore = current_task.throttle.semaphore
  if semaphore.nil?
    yield
  else
    semaphore.with(&)
  end
end

#concurrency_key ⇒ Object

: () -> String?



104
105
106
107
108
109
# File 'lib/job_workflow/context.rb', line 104

# Builds "<parent_job_id>/<task_name>" for the current task, dropping any nil
# segment. Returns nil when the task context carries no task.
def concurrency_key
  current_task = task_context.task
  return nil if current_task.nil?

  segments = [task_context.parent_job_id, current_task.task_name]
  segments.compact.join("/")
end

#dry_run? ⇒ Boolean

: () -> bool

Returns:

  • (Boolean)


181
182
183
# File 'lib/job_workflow/context.rb', line 181

# Reports the dry-run setting stored on the task context.
def dry_run? = task_context.dry_run

#each_task_output ⇒ Object

: () -> TaskOutput?



208
209
210
211
212
213
214
215
216
# File 'lib/job_workflow/context.rb', line 208

# Fetches the output recorded for the current task at the current each-index.
#
# Raises RuntimeError when the task context has no task, or when the task
# context is not enabled.
def each_task_output
  current_task = task_context.task
  raise "each_task_output can be called only _with_task block" if current_task.nil?
  raise "each_task_output can be called only _with_each_value block" unless task_context.enabled?

  lookup = { task_name: current_task.task_name, each_index: task_context.index }
  output.fetch(**lookup)
end

#each_value ⇒ Object

: () -> untyped



201
202
203
204
205
# File 'lib/job_workflow/context.rb', line 201

# Returns the value held by the task context.
#
# Raises RuntimeError when the task context is not enabled.
def each_value
  return task_context.value if task_context.enabled?

  raise "each_value can be called only within each_values block"
end

#instrument(operation = "custom", **payload) ⇒ Object

Instruments a custom operation with ActiveSupport::Notifications. This creates a span in OpenTelemetry (if enabled) and logs the event.

: (?String, **untyped) { () -> untyped } -> untyped

Examples:

Basic usage

```ruby
ctx.instrument("api_call", endpoint: "/users") do
  HTTP.get("https://api.example.com/users")
end
```

With automatic operation name

```ruby
ctx.instrument do
  # operation name defaults to "custom"
  expensive_operation()
end
```


167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/job_workflow/context.rb', line 167

# Runs the block through Instrumentation.instrument_custom under +operation+.
#
# The payload always carries job_id, job_name, task_name (nil without a task),
# each_index and operation. Caller-supplied +payload+ entries are merged last,
# so they override those defaults on a key clash.
# Raises whatever +job_id+ raises when no job is set.
def instrument(operation = "custom", **payload, &)
  current_task = task_context.task
  base_payload = {
    job_id:,
    job_name: job.class.name,
    task_name: current_task&.task_name,
    each_index: task_context.index,
    operation:
  }
  Instrumentation.instrument_custom(operation, base_payload.merge(payload), &)
end

#job_id ⇒ Object

: () -> String



91
92
93
94
95
96
# File 'lib/job_workflow/context.rb', line 91

# Returns the id of the attached job.
#
# Raises RuntimeError ("job is not set") when no job is attached.
def job_id
  current_job = job
  return current_job.job_id unless current_job.nil?

  raise "job is not set"
end

#serialize ⇒ Object

: () -> Hash[String, untyped]



70
71
72
# File 'lib/job_workflow/context.rb', line 70

# Serializes the context, using the sub-job serializer when this context
# belongs to a sub job and the regular job serializer otherwise.
def serialize
  return serialize_for_sub_job if sub_job?

  serialize_for_job
end

#skip_in_dry_run(dry_run_name = nil, fallback: nil) ⇒ Object

: (?Symbol?, ?fallback: untyped) { () -> untyped } -> untyped



186
187
188
189
190
191
192
193
194
195
196
197
198
# File 'lib/job_workflow/context.rb', line 186

# Runs the block unless the context is in dry-run mode, in which case
# +fallback+ is returned instead. The call is wrapped in
# Instrumentation.instrument_dry_run with the job, this context, the optional
# +dry_run_name+, the index of this call, and the dry-run flag.
#
# The per-context call counter is advanced on every call that passes the
# guards. Raises RuntimeError when no job is set or the task context has no
# task.
def skip_in_dry_run(dry_run_name = nil, fallback: nil)
  current_job = job
  current_task = task_context.task

  raise "job is not set" if current_job.nil?
  raise "skip_in_dry_run can be called only within with_task_context" if current_task.nil?

  # Capture the index for this call before advancing the counter.
  call_index = skip_in_dry_run_index
  self.skip_in_dry_run_index += 1

  Instrumentation.instrument_dry_run(current_job, self, dry_run_name, call_index, dry_run?) do
    if dry_run?
      fallback
    else
      yield
    end
  end
end

#sub_job? ⇒ Boolean

: () -> bool

Returns:

  • (Boolean)


99
100
101
# File 'lib/job_workflow/context.rb', line 99

# True when the parent job id differs from this context's own job id.
# Propagates the error raised by +job_id+ when no job is set.
def sub_job? = parent_job_id != job_id

#throttle(limit:, key: nil, ttl: 180) ⇒ Object

: (limit: Integer, ?key: String?, ?ttl: Integer) { () -> void } -> void



134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/job_workflow/context.rb', line 134

# Runs the block inside a freshly built Semaphore limited to +limit+
# concurrent holders with a duration of +ttl+ seconds.
#
# The semaphore key is +key+ when given; otherwise it is derived from the
# task's throttle prefix key and the per-context throttle counter. That
# counter is advanced on every call, whether or not +key+ was supplied.
# Raises RuntimeError when the task context has no task.
def throttle(limit:, key: nil, ttl: 180, &)
  current_task = task_context.task
  raise "throttle can be called only in task" unless current_task

  semaphore_key = key || "#{current_task.throttle_prefix_key}:#{throttle_index}"
  semaphore = Semaphore.new(
    concurrency_key: semaphore_key,
    concurrency_limit: limit,
    concurrency_duration: ttl.seconds
  )
  self.throttle_index += 1

  semaphore.with(&)
end