Class: ActiveJob::Temporal::WorkerHealth
- Inherits: Object
- Inheritance chain: Object → ActiveJob::Temporal::WorkerHealth
- Includes:
- Temporalio::Worker::Interceptor::Activity
- Defined in:
- lib/activejob/temporal/worker_health.rb
Defined Under Namespace
Classes: ActivityInbound
Instance Method Summary
- #initialize(task_queue:, namespace:, target:, max_concurrent_activities:, max_concurrent_workflows:) ⇒ WorkerHealth (constructor)
Returns a new instance of WorkerHealth.
- #intercept_activity(next_interceptor) ⇒ Object
- #mark_started! ⇒ Object
- #mark_stopped! ⇒ Object
- #record_task_finished! ⇒ Object
- #record_task_started!(now: Time.now) ⇒ Object
- #snapshot(now: Time.now) ⇒ Object
Constructor Details
#initialize(task_queue:, namespace:, target:, max_concurrent_activities:, max_concurrent_workflows:) ⇒ WorkerHealth
Returns a new instance of WorkerHealth.
13 14 15 16 17 18 19 20 21 22 23 24 |
# File 'lib/activejob/temporal/worker_health.rb', line 13
#
# Stores the worker's static configuration and resets all mutable health
# state to "not started, nothing in flight".
#
# @param task_queue value reported under +task_queue+ in the snapshot
# @param namespace value reported under +namespace+ in the snapshot
# @param target value reported under +target+ in the snapshot
# @param max_concurrent_activities value reported verbatim in the snapshot
# @param max_concurrent_workflows value reported verbatim in the snapshot
def initialize(task_queue:, namespace:, target:, max_concurrent_activities:, max_concurrent_workflows:)
  # Configuration: captured once and only read afterwards.
  @max_concurrent_workflows = max_concurrent_workflows
  @max_concurrent_activities = max_concurrent_activities
  @target = target
  @namespace = namespace
  @task_queue = task_queue

  # Mutable state: every later read or write goes through @mutex.
  @mutex = Mutex.new
  @worker_running = false
  @active_tasks = 0
  @started_at = nil
  @last_poll = nil
end
Instance Method Details
#intercept_activity(next_interceptor) ⇒ Object
77 78 79 |
# File 'lib/activejob/temporal/worker_health.rb', line 77
#
# Builds the inbound activity interceptor for this worker by handing both
# this health tracker and the next interceptor in the chain to
# ActivityInbound.
#
# @param next_interceptor the interceptor that follows this one
# @return [ActivityInbound] a new interceptor constructed with +self+ and
#   +next_interceptor+
def intercept_activity(next_interceptor)
  inbound = ActivityInbound.new(self, next_interceptor)
  inbound
end
#mark_started! ⇒ Object
26 27 28 29 30 31 32 |
# File 'lib/activejob/temporal/worker_health.rb', line 26
#
# Flags the worker as running and emits a +:worker_start+ event.
#
# The start timestamp is recorded only if none is set yet, so calling this
# again keeps the first recorded value.
def mark_started!
  @mutex.synchronize do
    # Keep an already-recorded start time; only fill it in when missing.
    @started_at = Time.now unless @started_at
    @worker_running = true
  end

  # Emitted after the lock is released.
  Observability.emit(:worker_start, observability_attributes)
end
#mark_stopped! ⇒ Object
34 35 36 37 38 39 |
# File 'lib/activejob/temporal/worker_health.rb', line 34
#
# Flags the worker as no longer running and emits a +:worker_stop+ event.
# Only the running flag is changed; the other state fields are left as-is.
def mark_stopped!
  @mutex.synchronize { @worker_running = false }

  # Emitted after the lock is released.
  Observability.emit(:worker_stop, observability_attributes)
end
#record_task_finished! ⇒ Object
50 51 52 53 54 55 56 |
# File 'lib/activejob/temporal/worker_health.rb', line 50
#
# Decrements the in-flight task counter (never below zero) and emits the
# new count as an +:active_tasks+ event.
def record_task_finished!
  remaining = @mutex.synchronize do
    decremented = @active_tasks - 1
    # Floor at zero so an unmatched "finished" call cannot go negative.
    @active_tasks = decremented.negative? ? 0 : decremented
  end

  # Emitted after the lock is released, using the value captured under it.
  Observability.emit(:active_tasks, observability_attributes(count: remaining))
end
#record_task_started!(now: Time.now) ⇒ Object
41 42 43 44 45 46 47 48 |
# File 'lib/activejob/temporal/worker_health.rb', line 41
#
# Increments the in-flight task counter, stores +now+ as the last poll
# time, and emits the new count as an +:active_tasks+ event.
#
# @param now timestamp to store as the last poll time (defaults to
#   +Time.now+)
def record_task_started!(now: Time.now)
  in_flight = @mutex.synchronize do
    @last_poll = now
    # The compound assignment evaluates to the updated counter, which
    # becomes the block's (and therefore synchronize's) return value.
    @active_tasks += 1
  end

  # Emitted after the lock is released, using the value captured under it.
  Observability.emit(:active_tasks, observability_attributes(count: in_flight))
end
#snapshot(now: Time.now) ⇒ Object
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/activejob/temporal/worker_health.rb', line 58
#
# Returns a Hash describing the worker's current health, built while
# holding the mutex so all fields come from the same moment.
#
# @param now reference time passed to +uptime_seconds+ (defaults to
#   +Time.now+)
# @return [Hash] keys: +status+ ("ok" when running, otherwise "stopped"),
#   +worker_running+, +started_at+, +last_poll+, +active_tasks+,
#   +uptime_seconds+, +task_queue+, +namespace+, +target+,
#   +max_concurrent_activities+, +max_concurrent_workflows+, +pid+
def snapshot(now: Time.now)
  @mutex.synchronize do
    running = @worker_running
    status_label = running ? "ok" : "stopped"

    {
      status: status_label,
      worker_running: running,
      started_at: iso8601(@started_at),
      last_poll: iso8601(@last_poll),
      active_tasks: @active_tasks,
      uptime_seconds: uptime_seconds(now),
      task_queue: @task_queue,
      namespace: @namespace,
      target: @target,
      max_concurrent_activities: @max_concurrent_activities,
      max_concurrent_workflows: @max_concurrent_workflows,
      pid: Process.pid
    }
  end
end