Module: JobWorkflow::Instrumentation

Defined in:
lib/job_workflow/instrumentation.rb,
lib/job_workflow/instrumentation/log_subscriber.rb,
lib/job_workflow/instrumentation/opentelemetry_subscriber.rb

Overview

Instrumentation provides ActiveSupport::Notifications-based event instrumentation for JobWorkflow workflows and tasks.

Examples:

Subscribing to events

```ruby
# Print a line whenever a "task.start.job_workflow" notification is delivered.
# All five block parameters are kept; only the payload is used, so the others
# are underscore-prefixed.
ActiveSupport::Notifications.subscribe("task.start.job_workflow") do |_name, _started, _finished, _id, payload|
  puts "Task #{payload[:task_name]} started"
end
```

Defined Under Namespace

Modules: Events

Classes: LogSubscriber, OpenTelemetrySubscriber

Constant Summary collapse

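NAMESPACE =

Namespace suffix for event names; `instrument_custom` appends it to the operation name as "<operation>.job_workflow". (The text extracted for this constant was the linter directive `rubocop:disable Metrics/ModuleLength`, not a description.)

"job_workflow"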

Class Method Summary collapse

Class Method Details

.instrument_custom(operation, payload = {}) ⇒ Object

: (String, Hash[Symbol, untyped]) { () -> untyped } -> untyped



118
119
120
121
# File 'lib/job_workflow/instrumentation.rb', line 118

# Instruments a caller-named operation.
#
# The event name is "<operation>.<NAMESPACE>"; the payload and the optional
# block are handed to `instrument` unchanged.
#
# @param operation [String] operation name used as the event-name prefix
# @param payload [Hash] event payload (defaults to an empty hash)
# @return whatever `instrument` returns
def instrument_custom(operation, payload = {}, &)
  instrument("#{operation}.#{NAMESPACE}", payload, &)
end

.instrument_dependent_wait(job, task) ⇒ Object

: (DSL, Task) { () -> untyped } -> untyped



77
78
79
80
81
82
83
# File 'lib/job_workflow/instrumentation.rb', line 77

# Instruments waiting on a dependent task.
#
# Emits Events::DEPENDENT_WAIT_START, forwards the block to the
# Events::DEPENDENT_WAIT instrumentation, and always emits
# Events::DEPENDENT_WAIT_COMPLETE afterwards, including when the block raises.
#
# The payload is built before the begin/ensure section on purpose: a failure
# inside build_dependent_payload propagates untouched and never triggers a
# COMPLETE event carrying a nil payload.
#
# @param job [DSL] job passed to build_dependent_payload
# @param task [Task] task passed to build_dependent_payload
# @return whatever `instrument` returns for Events::DEPENDENT_WAIT
def instrument_dependent_wait(job, task, &)
  payload = build_dependent_payload(job, task)
  begin
    instrument(Events::DEPENDENT_WAIT_START, payload)
    instrument(Events::DEPENDENT_WAIT, payload, &)
  ensure
    instrument(Events::DEPENDENT_WAIT_COMPLETE, payload)
  end
end

.instrument_dry_run(job, ctx, dry_run_name, skip_in_dry_run_index, dry_run) ⇒ Object

: (DSL, Context, Symbol?, Integer, bool) { () -> untyped } -> untyped



124
125
126
127
128
129
# File 'lib/job_workflow/instrumentation.rb', line 124

# Instruments a dry-run-aware section.
#
# First emits a marker event (Events::DRY_RUN_SKIP when `dry_run` is truthy,
# Events::DRY_RUN_EXECUTE otherwise), then forwards the block to the
# Events::DRY_RUN instrumentation. Both events share the payload built by
# build_skip_in_dry_run_payload.
#
# @return whatever `instrument` returns for Events::DRY_RUN
def instrument_dry_run(job, ctx, dry_run_name, skip_in_dry_run_index, dry_run, &)
  marker_event =
    if dry_run
      Events::DRY_RUN_SKIP
    else
      Events::DRY_RUN_EXECUTE
    end
  details = build_skip_in_dry_run_payload(job, ctx, dry_run_name, skip_in_dry_run_index, dry_run)

  instrument(marker_event, details)
  instrument(Events::DRY_RUN, details, &)
end

.instrument_task(job, task, ctx) ⇒ Object

: (DSL, Task, Context) { () -> untyped } -> untyped



53
54
55
56
57
58
59
# File 'lib/job_workflow/instrumentation.rb', line 53

# Instruments the execution of a task.
#
# Emits Events::TASK_START, forwards the block to the Events::TASK
# instrumentation, and always emits Events::TASK_COMPLETE afterwards,
# including when the block raises.
#
# The payload is built before the begin/ensure section on purpose: a failure
# inside build_task_payload propagates untouched and never triggers a
# COMPLETE event carrying a nil payload.
#
# @param job [DSL] job passed to build_task_payload
# @param task [Task] task passed to build_task_payload
# @param ctx [Context] context passed to build_task_payload
# @return whatever `instrument` returns for Events::TASK
def instrument_task(job, task, ctx, &)
  payload = build_task_payload(job, task, ctx)
  begin
    instrument(Events::TASK_START, payload)
    instrument(Events::TASK, payload, &)
  ensure
    instrument(Events::TASK_COMPLETE, payload)
  end
end

.instrument_throttle(semaphore) ⇒ Object

: (Semaphore) { () -> untyped } -> untyped



94
95
96
97
98
99
100
# File 'lib/job_workflow/instrumentation.rb', line 94

# Instruments acquisition of a throttle semaphore.
#
# Emits Events::THROTTLE_ACQUIRE_START, forwards the block to the
# Events::THROTTLE_ACQUIRE instrumentation, and always emits
# Events::THROTTLE_ACQUIRE_COMPLETE afterwards, including when the block
# raises.
#
# The payload is built before the begin/ensure section on purpose: a failure
# inside build_throttle_payload propagates untouched and never triggers a
# COMPLETE event carrying a nil payload.
#
# @param semaphore [Semaphore] semaphore passed to build_throttle_payload
# @return whatever `instrument` returns for Events::THROTTLE_ACQUIRE
def instrument_throttle(semaphore, &)
  payload = build_throttle_payload(semaphore)
  begin
    instrument(Events::THROTTLE_ACQUIRE_START, payload)
    instrument(Events::THROTTLE_ACQUIRE, payload, &)
  ensure
    instrument(Events::THROTTLE_ACQUIRE_COMPLETE, payload)
  end
end

.instrument_workflow(job) ⇒ Object

: (DSL) { () -> untyped } -> untyped



44
45
46
47
48
49
50
# File 'lib/job_workflow/instrumentation.rb', line 44

# Instruments the execution of a whole workflow.
#
# Emits Events::WORKFLOW_START, forwards the block to the Events::WORKFLOW
# instrumentation, and always emits Events::WORKFLOW_COMPLETE afterwards,
# including when the block raises.
#
# The payload is built before the begin/ensure section on purpose: a failure
# inside build_workflow_payload propagates untouched and never triggers a
# COMPLETE event carrying a nil payload.
#
# @param job [DSL] job passed to build_workflow_payload
# @return whatever `instrument` returns for Events::WORKFLOW
def instrument_workflow(job, &)
  payload = build_workflow_payload(job)
  begin
    instrument(Events::WORKFLOW_START, payload)
    instrument(Events::WORKFLOW, payload, &)
  ensure
    instrument(Events::WORKFLOW_COMPLETE, payload)
  end
end

.notify_dependent_reschedule(job, task, reschedule_delay, poll_count) ⇒ Object

: (DSL, Task, Numeric, Integer) -> void



86
87
88
89
90
91
# File 'lib/job_workflow/instrumentation.rb', line 86

# Publishes Events::DEPENDENT_RESCHEDULE.
#
# The payload comes from build_dependent_reschedule_payload, called with all
# four arguments in the order received.
#
# @return whatever `instrument` returns (the signature declares void)
def notify_dependent_reschedule(job, task, reschedule_delay, poll_count)
  payload = build_dependent_reschedule_payload(job, task, reschedule_delay, poll_count)
  instrument(Events::DEPENDENT_RESCHEDULE, payload)
end

.notify_queue_pause(queue_name) ⇒ Object

: (String) -> void



108
109
110
# File 'lib/job_workflow/instrumentation.rb', line 108

# Publishes Events::QUEUE_PAUSE for the named queue.
#
# @param queue_name [String] passed to build_queue_payload to form the payload
# @return whatever `instrument` returns (the signature declares void)
def notify_queue_pause(queue_name)
  payload = build_queue_payload(queue_name)
  instrument(Events::QUEUE_PAUSE, payload)
end

.notify_queue_resume(queue_name) ⇒ Object

: (String) -> void



113
114
115
# File 'lib/job_workflow/instrumentation.rb', line 113

# Publishes Events::QUEUE_RESUME for the named queue.
#
# @param queue_name [String] passed to build_queue_payload to form the payload
# @return whatever `instrument` returns (the signature declares void)
def notify_queue_resume(queue_name)
  payload = build_queue_payload(queue_name)
  instrument(Events::QUEUE_RESUME, payload)
end

.notify_task_enqueue(job, task, sub_job_count) ⇒ Object

: (DSL, Task, Integer) -> void



67
68
69
# File 'lib/job_workflow/instrumentation.rb', line 67

# Publishes Events::TASK_ENQUEUE.
#
# The payload comes from build_task_enqueue_payload(job, task, sub_job_count).
#
# @return whatever `instrument` returns (the signature declares void)
def notify_task_enqueue(job, task, sub_job_count)
  payload = build_task_enqueue_payload(job, task, sub_job_count)
  instrument(Events::TASK_ENQUEUE, payload)
end

.notify_task_retry(task, ctx, job_id, attempt, delay, error) ⇒ Object

: (Task, Context, String, Integer, Float, StandardError) -> void



72
73
74
# File 'lib/job_workflow/instrumentation.rb', line 72

# Publishes Events::TASK_RETRY.
#
# The payload comes from build_task_retry_payload, called with all six
# arguments in the order received.
#
# @return whatever `instrument` returns (the signature declares void)
def notify_task_retry(task, ctx, job_id, attempt, delay, error) # rubocop:disable Metrics/ParameterLists
  payload = build_task_retry_payload(task, ctx, job_id, attempt, delay, error)
  instrument(Events::TASK_RETRY, payload)
end

.notify_task_skip(job, task, reason) ⇒ Object

: (DSL, Task, String) -> void



62
63
64
# File 'lib/job_workflow/instrumentation.rb', line 62

# Publishes Events::TASK_SKIP.
#
# The payload comes from build_task_skip_payload(job, task, reason).
#
# @return whatever `instrument` returns (the signature declares void)
def notify_task_skip(job, task, reason)
  payload = build_task_skip_payload(job, task, reason)
  instrument(Events::TASK_SKIP, payload)
end

.notify_throttle_release(semaphore) ⇒ Object

: (Semaphore) -> void



103
104
105
# File 'lib/job_workflow/instrumentation.rb', line 103

# Publishes Events::THROTTLE_RELEASE for the given semaphore.
#
# @param semaphore [Semaphore] passed to build_throttle_payload to form the payload
# @return whatever `instrument` returns (the signature declares void)
def notify_throttle_release(semaphore)
  payload = build_throttle_payload(semaphore)
  instrument(Events::THROTTLE_RELEASE, payload)
end