Class: Textus::Maintenance::Worker

Inherits:
Object
Defined in:
lib/textus/maintenance/worker.rb

Overview

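Drains the job queue. For each job the worker leases it, looks up its handler in the registry, and runs it (as the job's stamped authority, which is wired in a later phase). It then acks the job on success or, if the handler raises, fails it, which either requeues or dead-letters the job. `drain` runs until the queue is empty and returns a summary. Delivery is at-least-once, so a job may run more than once and handlers should tolerate repeats.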
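The lease, run, ack-or-fail cycle described above can be sketched roughly as follows. This is a minimal illustration, not the library's implementation: `SketchQueue`, its `ack` and `fail_job` methods, the `max_attempts` limit, and this `run_one` helper are all hypothetical stand-ins, since this page only shows the queue's `lease` call.

```ruby
# Hypothetical in-memory queue; only `lease` mirrors a call shown on this page.
class SketchQueue
  Job = Struct.new(:id, :kind, :attempts)

  attr_reader :dead

  def initialize(max_attempts: 2)
    @ready = []
    @dead = []
    @max_attempts = max_attempts
    @next_id = 0
  end

  def push(kind)
    @next_id += 1
    @ready << Job.new(@next_id, kind, 0)
  end

  # Hand out the next ready job, or nil when nothing is waiting.
  def lease(worker_id:, lease_ttl:)
    job = @ready.shift
    job.attempts += 1 if job
    job
  end

  # The job was already removed from @ready on lease, so ack has nothing to do.
  def ack(job); end

  # Requeue while attempts remain (assumed policy), otherwise dead-letter.
  def fail_job(job)
    if job.attempts < @max_attempts
      @ready << job
      :requeued
    else
      @dead << job
      :dead_lettered
    end
  end
end

# Run one leased job: ack on success, fail on any raise.
def run_one(queue, registry, job)
  registry.fetch(job.kind).call(job)
  queue.ack(job)
  :completed
rescue StandardError
  queue.fail_job(job)
end

runs = Hash.new(0)
registry = {
  "ok"    => ->(job) { runs[job.id] += 1 },
  # Fails on its first attempt only, to show a retry.
  "flaky" => ->(job) { runs[job.id] += 1; raise "boom" if job.attempts < 2 },
}

queue = SketchQueue.new(max_attempts: 2)
queue.push("ok")
queue.push("flaky")

outcomes = []
while (job = queue.lease(worker_id: "sketch", lease_ttl: 60))
  outcomes << run_one(queue, registry, job)
end
```

In this sketch the flaky job's handler runs twice before it completes, which is the practical meaning of at-least-once delivery: a handler with side effects has to be safe to repeat.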

Defined Under Namespace

Classes: Summary

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(queue:, registry:, container:, lease_ttl: 60) ⇒ Worker

Returns a new instance of Worker.



# File 'lib/textus/maintenance/worker.rb', line 19

def initialize(queue:, registry:, container:, lease_ttl: 60)
  @queue = queue
  @registry = registry
  @container = container
  @lease_ttl = lease_ttl
end

Class Method Details

.for(container:, queue:) ⇒ Object

Builds the standard convergence worker, wired with the closed handler allow-list (`Textus::Jobs::Handlers.registry`) and the lease TTL from `worker_config`. Both `drain` and `serve` build their worker this way.



# File 'lib/textus/maintenance/worker.rb', line 12

def self.for(container:, queue:)
  new(
    queue: queue, registry: Textus::Jobs::Handlers.registry,
    container: container, lease_ttl: container.manifest.data.worker_config[:lease_ttl]
  )
end
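One Ruby detail is worth keeping in mind when reading `.for`: a keyword default such as `lease_ttl: 60` applies only when the keyword is omitted, not when it is passed as `nil`. Whether `worker_config` always carries a `:lease_ttl` entry is not shown on this page, so the following is only a sketch of the language behavior, using a hypothetical `SketchWorker` and an empty hypothetical config hash.

```ruby
# Stand-in with the same keyword default as the constructor above.
class SketchWorker
  attr_reader :lease_ttl

  def initialize(lease_ttl: 60)
    @lease_ttl = lease_ttl
  end
end

# Hypothetical config with no :lease_ttl entry.
worker_config = {}

# Hash#[] returns nil for a missing key, and an explicit nil replaces the default.
explicit_nil = SketchWorker.new(lease_ttl: worker_config[:lease_ttl])

# Hash#fetch with a fallback keeps a usable TTL when the key is missing.
with_fallback = SketchWorker.new(lease_ttl: worker_config.fetch(:lease_ttl, 60))
```

If the manifest layer already fills in a default for `:lease_ttl`, the lookup in `.for` is fine as written; the `fetch` form only matters when the key can be absent.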

Instance Method Details

#drain(worker_id: "drain-#{Process.pid}") ⇒ Object

Leases and runs jobs one at a time until the queue has nothing left to lease, then returns a Summary counting completed and dead-lettered jobs. A requeued job is not counted at the point it is requeued; it is counted when a later lease completes or dead-letters it.



# File 'lib/textus/maintenance/worker.rb', line 26

def drain(worker_id: "drain-#{Process.pid}")
  completed = 0
  failed = 0
  loop do
    # lease returns a falsy value once there is nothing left to hand out
    leased = @queue.lease(worker_id: worker_id, lease_ttl: @lease_ttl)
    break unless leased

    # :requeued is deliberately not counted: it is a transient failure, and
    # the job is leased again on a later iteration
    case run_one(leased)
    when :completed     then completed += 1
    when :dead_lettered then failed += 1
    end
  end
  Summary.new(completed: completed, failed: failed)
end
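To make the counting rule concrete, here is a small sketch of the same tally applied to a scripted list of outcomes. `SketchSummary` and `tally` are hypothetical names; the sketch assumes the real `Summary` behaves like a keyword-initialized struct with `completed` and `failed` fields, which this page suggests but does not show.

```ruby
# Assumed shape of the summary object: two counters, keyword-initialized.
SketchSummary = Struct.new(:completed, :failed, keyword_init: true)

# Count outcomes the way the loop above does. A `case` with no matching
# `when` and no `else` simply evaluates to nil, so :requeued is skipped.
def tally(outcomes)
  completed = 0
  failed = 0
  outcomes.each do |outcome|
    case outcome
    when :completed     then completed += 1
    when :dead_lettered then failed += 1
    end
  end
  SketchSummary.new(completed: completed, failed: failed)
end

# Five leases covering three jobs: one job is requeued twice before it is
# dead-lettered, and two jobs complete on their first attempt.
leases = [:completed, :requeued, :requeued, :dead_lettered, :completed]
summary = tally(leases)
```

Here `summary.completed + summary.failed` is 3 while the loop ran 5 times, so the summary counts jobs that reached a final state rather than handler attempts.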

#drain_pool(pool: 4) ⇒ Object

Starts `pool` threads, each calling `drain` under its own worker id, waits for all of them to finish, and returns a single Summary with the per-thread counts added together.



# File 'lib/textus/maintenance/worker.rb', line 42

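def drain_pool(pool: 4)
  summaries = []
  mutex = Mutex.new # guards summaries, which every thread appends to
  threads = Array.new(pool) do |i|
    Thread.new do
      # each thread drains under its own worker id until the queue is empty
      s = drain(worker_id: "pool-#{Process.pid}-#{i}")
      mutex.synchronize { summaries << s }
    end
  end
  threads.each(&:join)
  # add the per-thread counts into one Summary
  Summary.new(
    completed: summaries.sum(&:completed),
    failed: summaries.sum(&:failed),
  )
end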
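The fan-out and merge pattern used by `drain_pool` can be tried in isolation. The sketch below swaps the job queue for Ruby's built-in `Thread::Queue` and uses a hypothetical `PoolSummary` struct and `drain_sketch` method; the rule that multiples of 5 count as failures is invented purely to produce a non-zero `failed` count.

```ruby
PoolSummary = Struct.new(:completed, :failed, keyword_init: true)

# Stand-in for a per-thread drain: pop until the queue is closed and empty.
def drain_sketch(work)
  completed = 0
  failed = 0
  # pop returns nil on a closed, empty queue; 0 is truthy in Ruby, so the
  # item 0 does not end the loop.
  while (n = work.pop)
    if (n % 5).zero?
      failed += 1 # invented failure rule, for illustration only
    else
      completed += 1
    end
  end
  PoolSummary.new(completed: completed, failed: failed)
end

work = Thread::Queue.new
10.times { |n| work << n }
work.close # lets pop return nil instead of blocking once the queue is empty

summaries = []
mutex = Mutex.new
threads = Array.new(4) do
  Thread.new do
    s = drain_sketch(work)
    mutex.synchronize { summaries << s }
  end
end
threads.each(&:join)

total = PoolSummary.new(
  completed: summaries.sum(&:completed),
  failed: summaries.sum(&:failed),
)
```

How the ten items are split across the four threads varies from run to run, but the merged totals do not. The pattern relies on the shared queue handing each item to exactly one thread, so the real `@queue.lease` presumably needs the same guarantee when `drain_pool` is used.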