Class: ActiveJob::Temporal::WorkerHealth

Inherits:
Object
  • Object
show all
Includes:
Temporalio::Worker::Interceptor::Activity
Defined in:
lib/activejob/temporal/worker_health.rb

Defined Under Namespace

Classes: ActivityInbound

Instance Method Summary collapse

Constructor Details

#initialize(task_queue:, namespace:, target:, max_concurrent_activities:, max_concurrent_workflows:) ⇒ WorkerHealth

Returns a new instance of WorkerHealth.



13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/activejob/temporal/worker_health.rb', line 13

# Creates a health tracker holding a worker's static configuration plus the
# mutable runtime state that the other methods update under a mutex.
#
# A new tracker reports the worker as not running, with zero active tasks
# and no recorded start or poll time.
#
# @param task_queue [Object] value reported as +task_queue+ by #snapshot
# @param namespace [Object] value reported as +namespace+ by #snapshot
# @param target [Object] value reported as +target+ by #snapshot
# @param max_concurrent_activities [Object] value reported as
#   +max_concurrent_activities+ by #snapshot (presumably an Integer limit;
#   confirm against the caller)
# @param max_concurrent_workflows [Object] value reported as
#   +max_concurrent_workflows+ by #snapshot (presumably an Integer limit;
#   confirm against the caller)
def initialize(task_queue:, namespace:, target:, max_concurrent_activities:, max_concurrent_workflows:)
  # Runtime state; every reader/writer in this class goes through @mutex.
  @mutex = Mutex.new
  @worker_running = false
  @active_tasks = 0
  @started_at = nil
  @last_poll = nil

  # Static configuration, stored exactly as given.
  @task_queue = task_queue
  @namespace = namespace
  @target = target
  @max_concurrent_activities = max_concurrent_activities
  @max_concurrent_workflows = max_concurrent_workflows
end

Instance Method Details

#intercept_activity(next_interceptor) ⇒ Object



77
78
79
# File 'lib/activejob/temporal/worker_health.rb', line 77

# Builds the activity-side interceptor for this tracker.
#
# @param next_interceptor [Object] the interceptor the new ActivityInbound
#   is constructed with as its second argument (presumably the next link in
#   the interceptor chain; confirm against the Temporal SDK)
# @return [ActivityInbound] a new inbound interceptor constructed with this
#   tracker and +next_interceptor+
def intercept_activity(next_interceptor)
  inbound = ActivityInbound.new(self, next_interceptor)
  inbound
end

#mark_started! ⇒ Object



26
27
28
29
30
31
32
# File 'lib/activejob/temporal/worker_health.rb', line 26

# Flags the worker as running and emits a +:worker_start+ event.
#
# The start time is recorded only when none is set yet, so calling this
# again keeps the original +@started_at+.
#
# @return [Object] whatever Observability.emit returns
def mark_started!
  @mutex.synchronize do
    # Keep the first recorded start time; only fill it in when missing.
    @started_at = Time.now unless @started_at
    @worker_running = true
  end

  # Emit after the mutex is released.
  attributes = observability_attributes
  Observability.emit(:worker_start, attributes)
end

#mark_stopped! ⇒ Object



34
35
36
37
38
39
# File 'lib/activejob/temporal/worker_health.rb', line 34

# Flags the worker as no longer running and emits a +:worker_stop+ event.
#
# Only the running flag changes; the recorded start time is left untouched.
#
# @return [Object] whatever Observability.emit returns
def mark_stopped!
  @mutex.synchronize { @worker_running = false }

  # Emit after the mutex is released.
  attributes = observability_attributes
  Observability.emit(:worker_stop, attributes)
end

#record_task_finished! ⇒ Object



50
51
52
53
54
55
56
# File 'lib/activejob/temporal/worker_health.rb', line 50

# Decrements the active-task counter and emits the new count as an
# +:active_tasks+ event.
#
# The counter is floored at zero, so an extra "finished" call never drives
# it negative.
#
# @return [Object] whatever Observability.emit returns
def record_task_finished!
  remaining = @mutex.synchronize do
    decremented = @active_tasks - 1
    # Floor at zero rather than going negative.
    @active_tasks = decremented.negative? ? 0 : decremented
  end

  # Emit after the mutex is released, using the value captured under it.
  Observability.emit(:active_tasks, observability_attributes(count: remaining))
end

#record_task_started!(now: Time.now) ⇒ Object



41
42
43
44
45
46
47
48
# File 'lib/activejob/temporal/worker_health.rb', line 41

# Increments the active-task counter, records +now+ as the last poll time,
# and emits the new count as an +:active_tasks+ event.
#
# @param now [Time] timestamp stored as the last poll time; defaults to the
#   current time
# @return [Object] whatever Observability.emit returns
def record_task_started!(now: Time.now)
  in_flight = nil
  @mutex.synchronize do
    in_flight = (@active_tasks += 1)
    @last_poll = now
  end

  # Emit after the mutex is released, using the value captured under it.
  Observability.emit(:active_tasks, observability_attributes(count: in_flight))
end

#snapshot(now: Time.now) ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/activejob/temporal/worker_health.rb', line 58

# Returns a point-in-time view of the worker's health, read under the mutex
# so all fields come from the same moment.
#
# The hash carries, in order: +status+ ("ok" while running, "stopped"
# otherwise), +worker_running+, +started_at+, +last_poll+, +active_tasks+,
# +uptime_seconds+, the static configuration given to the constructor, and
# the current process id. Timestamps are passed through the +iso8601+ helper
# and uptime comes from the +uptime_seconds+ helper, both defined elsewhere
# in this class.
#
# @param now [Time] reference time handed to +uptime_seconds+; defaults to
#   the current time
# @return [Hash{Symbol => Object}] the health snapshot
def snapshot(now: Time.now)
  @mutex.synchronize do
    running = @worker_running

    # Mutable runtime state first...
    runtime = {
      status: running ? "ok" : "stopped",
      worker_running: running,
      started_at: iso8601(@started_at),
      last_poll: iso8601(@last_poll),
      active_tasks: @active_tasks,
      uptime_seconds: uptime_seconds(now)
    }

    # ...then static configuration and process identity, keeping key order.
    runtime.merge(
      task_queue: @task_queue,
      namespace: @namespace,
      target: @target,
      max_concurrent_activities: @max_concurrent_activities,
      max_concurrent_workflows: @max_concurrent_workflows,
      pid: Process.pid
    )
  end
end