Module: JobWorkflow::Instrumentation
- Defined in:
- lib/job_workflow/instrumentation.rb,
lib/job_workflow/instrumentation/log_subscriber.rb,
lib/job_workflow/instrumentation/opentelemetry_subscriber.rb
Overview
Instrumentation provides ActiveSupport::Notifications-based event instrumentation for JobWorkflow workflows and tasks.
Defined Under Namespace
Modules: Events Classes: LogSubscriber, OpenTelemetrySubscriber
Constant Summary collapse
- NAMESPACE =
rubocop:disable Metrics/ModuleLength
"job_workflow"
Class Method Summary collapse
-
.instrument_custom(operation, payload = {}) ⇒ Object
: (String, Hash[Symbol, untyped]) { () -> untyped } -> untyped.
-
.instrument_dependent_wait(job, task) ⇒ Object
: (DSL, Task) { () -> untyped } -> untyped.
-
.instrument_dry_run(job, ctx, dry_run_name, skip_in_dry_run_index, dry_run) ⇒ Object
: (DSL, Context, Symbol?, Integer, bool) { () -> untyped } -> untyped.
-
.instrument_task(job, task, ctx) ⇒ Object
: (DSL, Task, Context) { () -> untyped } -> untyped.
-
.instrument_throttle(semaphore) ⇒ Object
: (Semaphore) { () -> untyped } -> untyped.
-
.instrument_workflow(job) ⇒ Object
: (DSL) { () -> untyped } -> untyped.
-
.notify_dependent_reschedule(job, task, reschedule_delay, poll_count) ⇒ Object
: (DSL, Task, Numeric, Integer) -> void.
-
.notify_queue_pause(queue_name) ⇒ Object
: (String) -> void.
-
.notify_queue_resume(queue_name) ⇒ Object
: (String) -> void.
-
.notify_task_enqueue(job, task, sub_job_count) ⇒ Object
: (DSL, Task, Integer) -> void.
-
.notify_task_retry(task, ctx, job_id, attempt, delay, error) ⇒ Object
: (Task, Context, String, Integer, Float, StandardError) -> void.
-
.notify_task_skip(job, task, reason) ⇒ Object
: (DSL, Task, String) -> void.
-
.notify_throttle_release(semaphore) ⇒ Object
: (Semaphore) -> void.
Class Method Details
.instrument_custom(operation, payload = {}) ⇒ Object
: (String, Hash[Symbol, untyped]) { () -> untyped } -> untyped
118 119 120 121 |
# File 'lib/job_workflow/instrumentation.rb', line 118 def instrument_custom(operation, payload = {}, &) event_name = "#{operation}.#{NAMESPACE}" instrument(event_name, payload, &) end |
.instrument_dependent_wait(job, task) ⇒ Object
: (DSL, Task) { () -> untyped } -> untyped
77 78 79 80 81 82 83 |
# File 'lib/job_workflow/instrumentation.rb', line 77 def instrument_dependent_wait(job, task, &) payload = build_dependent_payload(job, task) instrument(Events::DEPENDENT_WAIT_START, payload) instrument(Events::DEPENDENT_WAIT, payload, &) ensure instrument(Events::DEPENDENT_WAIT_COMPLETE, payload) end |
.instrument_dry_run(job, ctx, dry_run_name, skip_in_dry_run_index, dry_run) ⇒ Object
: (DSL, Context, Symbol?, Integer, bool) { () -> untyped } -> untyped
124 125 126 127 128 129 |
# File 'lib/job_workflow/instrumentation.rb', line 124 def instrument_dry_run(job, ctx, dry_run_name, skip_in_dry_run_index, dry_run, &) start_event = dry_run ? Events::DRY_RUN_SKIP : Events::DRY_RUN_EXECUTE payload = build_skip_in_dry_run_payload(job, ctx, dry_run_name, skip_in_dry_run_index, dry_run) instrument(start_event, payload) instrument(Events::DRY_RUN, payload, &) end |
.instrument_task(job, task, ctx) ⇒ Object
: (DSL, Task, Context) { () -> untyped } -> untyped
53 54 55 56 57 58 59 |
# File 'lib/job_workflow/instrumentation.rb', line 53 def instrument_task(job, task, ctx, &) payload = build_task_payload(job, task, ctx) instrument(Events::TASK_START, payload) instrument(Events::TASK, payload, &) ensure instrument(Events::TASK_COMPLETE, payload) end |
.instrument_throttle(semaphore) ⇒ Object
: (Semaphore) { () -> untyped } -> untyped
94 95 96 97 98 99 100 |
# File 'lib/job_workflow/instrumentation.rb', line 94 def instrument_throttle(semaphore, &) payload = build_throttle_payload(semaphore) instrument(Events::THROTTLE_ACQUIRE_START, payload) instrument(Events::THROTTLE_ACQUIRE, payload, &) ensure instrument(Events::THROTTLE_ACQUIRE_COMPLETE, payload) end |
.instrument_workflow(job) ⇒ Object
: (DSL) { () -> untyped } -> untyped
44 45 46 47 48 49 50 |
# File 'lib/job_workflow/instrumentation.rb', line 44 def instrument_workflow(job, &) payload = build_workflow_payload(job) instrument(Events::WORKFLOW_START, payload) instrument(Events::WORKFLOW, payload, &) ensure instrument(Events::WORKFLOW_COMPLETE, payload) end |
.notify_dependent_reschedule(job, task, reschedule_delay, poll_count) ⇒ Object
: (DSL, Task, Numeric, Integer) -> void
86 87 88 89 90 91 |
# File 'lib/job_workflow/instrumentation.rb', line 86 def notify_dependent_reschedule(job, task, reschedule_delay, poll_count) instrument( Events::DEPENDENT_RESCHEDULE, build_dependent_reschedule_payload(job, task, reschedule_delay, poll_count) ) end |
.notify_queue_pause(queue_name) ⇒ Object
: (String) -> void
108 109 110 |
# File 'lib/job_workflow/instrumentation.rb', line 108 def notify_queue_pause(queue_name) instrument(Events::QUEUE_PAUSE, build_queue_payload(queue_name)) end |
.notify_queue_resume(queue_name) ⇒ Object
: (String) -> void
113 114 115 |
# File 'lib/job_workflow/instrumentation.rb', line 113 def notify_queue_resume(queue_name) instrument(Events::QUEUE_RESUME, build_queue_payload(queue_name)) end |
.notify_task_enqueue(job, task, sub_job_count) ⇒ Object
: (DSL, Task, Integer) -> void
67 68 69 |
# File 'lib/job_workflow/instrumentation.rb', line 67 def notify_task_enqueue(job, task, sub_job_count) instrument(Events::TASK_ENQUEUE, build_task_enqueue_payload(job, task, sub_job_count)) end |
.notify_task_retry(task, ctx, job_id, attempt, delay, error) ⇒ Object
: (Task, Context, String, Integer, Float, StandardError) -> void
72 73 74 |
# File 'lib/job_workflow/instrumentation.rb', line 72 def notify_task_retry(task, ctx, job_id, attempt, delay, error) # rubocop:disable Metrics/ParameterLists instrument(Events::TASK_RETRY, build_task_retry_payload(task, ctx, job_id, attempt, delay, error)) end |
.notify_task_skip(job, task, reason) ⇒ Object
: (DSL, Task, String) -> void
62 63 64 |
# File 'lib/job_workflow/instrumentation.rb', line 62 def notify_task_skip(job, task, reason) instrument(Events::TASK_SKIP, build_task_skip_payload(job, task, reason)) end |
.notify_throttle_release(semaphore) ⇒ Object
: (Semaphore) -> void
103 104 105 |
# File 'lib/job_workflow/instrumentation.rb', line 103 def notify_throttle_release(semaphore) instrument(Events::THROTTLE_RELEASE, build_throttle_payload(semaphore)) end |