Class: Wurk::Manager

Inherits:
Object
Includes:
Component
Defined in:
lib/wurk/manager.rb

Overview

One per Capsule. Lives inside each forked child and owns the Processor pool. Replaces dead processors on the fly (replace-on-die), forwards the quiet/stop signals received by the Swarm to its processors, and ensures in-flight UnitsOfWork are bulk_requeued before threads are killed.

Lifecycle:

* `start`         — spawn each processor thread.
* `quiet`         — stop fetching; in-flight jobs run to completion.
* `stop(deadline)`— quiet + wait for drain; hard_shutdown on timeout.

Spec: docs/target/sidekiq-free.md §13.
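
The lifecycle above can be sketched from the caller's side. This is a minimal illustration, not the Swarm's actual code: `RecordingManager` and `shutdown_within` are hypothetical stand-ins so the example runs on its own. It assumes `stop` takes an absolute `Process::CLOCK_MONOTONIC` timestamp, which is inferred from how `hard_shutdown` builds the deadline it passes to `wait_for`.

```ruby
# Hypothetical stand-in for Wurk::Manager so the sketch runs on its own.
# It only records the order of lifecycle calls.
class RecordingManager
  attr_reader :calls

  def initialize
    @calls = []
  end

  def start
    @calls << :start
  end

  def quiet
    @calls << :quiet
  end

  def stop(deadline)
    @calls << [:stop, deadline]
  end
end

# Hypothetical helper: turn a timeout in seconds into an absolute monotonic
# deadline (an assumption about the expected argument), then stop the manager.
def shutdown_within(manager, timeout_seconds)
  now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  manager.stop(now + timeout_seconds)
end

manager = RecordingManager.new
manager.start                 # spawn processor threads
manager.quiet                 # on a quiet signal: stop fetching new work
shutdown_within(manager, 25)  # on a stop signal: drain, then force
```

A monotonic clock is used rather than `Time.now` so that wall-clock adjustments during shutdown cannot shorten or lengthen the drain window.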

Constant Summary

PAUSE_TIME = $stdout.tty? ? 0.1 : 0.5

Polling pause in seconds: 0.1 when $stdout is a TTY, so interactive shutdown feels snappy; 0.5 otherwise (for example in production), so the supervisor isn’t spinning while threads drain.

Constants included from Component

Component::DEFAULT_THREAD_PRIORITY, Component::PROCESS_NONCE

Instance Attribute Summary

Attributes included from Component

#config

Instance Method Summary

Methods included from Component

#default_tag, #fire_event, #handle_exception, #hostname, #identity, #leader?, #logger, #mono_ms, #process_nonce, #real_ms, #redis, #safe_thread, #tid, #watchdog

Constructor Details

#initialize(capsule) ⇒ Manager

Returns a new instance of Manager.

Raises:

  • (ArgumentError)


# File 'lib/wurk/manager.rb', line 27

def initialize(capsule)
  @config = @capsule = capsule
  @count = capsule.concurrency
  raise ArgumentError, "Concurrency of #{@count} is not supported" if @count < 1

  @done = false
  @workers = Set.new
  @plock = ::Mutex.new
  @count.times do
    @workers << Processor.new(@capsule, &method(:processor_result))
  end
end

Instance Attribute Details

#capsule ⇒ Object (readonly)

Returns the value of attribute capsule.



# File 'lib/wurk/manager.rb', line 25

def capsule
  @capsule
end

#workers ⇒ Object (readonly)

Returns the value of attribute workers.



# File 'lib/wurk/manager.rb', line 25

def workers
  @workers
end

Instance Method Details

#hard_shutdown ⇒ Object

Reached when the deadline expired with workers still busy. We must push their in-flight UoWs back to the public queues BEFORE raising Wurk::Shutdown into the threads — losing a job is worse than running it twice (Sidekiq’s at-least-once contract).



# File 'lib/wurk/manager.rb', line 94

def hard_shutdown # rubocop:disable Metrics/AbcSize
  cleanup = nil
  @plock.synchronize do
    cleanup = @workers.dup
  end

  if cleanup.any?
    jobs = cleanup.map(&:job).compact

    logger.warn { "Terminating #{cleanup.size} busy threads" }
    logger.debug { "Jobs still in progress #{jobs.inspect}" }

    capsule.fetcher.bulk_requeue(jobs)
  end

  cleanup.each(&:kill)

  # The caller typically `exit`s immediately after we return; give
  # threads a brief window to run their `ensure` blocks.
  deadline = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) + 3
  wait_for(deadline) { @workers.empty? }
end
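
The requeue-before-kill ordering described above can be shown in isolation. This is a sketch under stated assumptions: `FakeProcessor`, `FakeFetcher`, and `requeue_then_kill` are hypothetical names that only record what happens to them, and an idle processor is assumed to report a `nil` job (which is what the `compact` call in the source suggests).

```ruby
# Hypothetical stand-ins for Processor and the capsule's fetcher.
# Both only append to a shared log so the ordering can be inspected.
FakeProcessor = Struct.new(:job, :log) do
  def kill
    log << [:kill, job]
  end
end

class FakeFetcher
  def initialize(log)
    @log = log
  end

  def bulk_requeue(jobs)
    @log << [:requeue, jobs]
  end
end

# Same ordering as hard_shutdown: snapshot under the lock, requeue the
# in-flight jobs, and only then kill the threads.
def requeue_then_kill(workers, lock, fetcher)
  snapshot = lock.synchronize { workers.dup }
  jobs = snapshot.map(&:job).compact # assumed: idle processors have no job
  fetcher.bulk_requeue(jobs) if snapshot.any?
  snapshot.each(&:kill)
end

log = []
workers = [FakeProcessor.new("job-a", log), FakeProcessor.new(nil, log)]
requeue_then_kill(workers, Mutex.new, FakeFetcher.new(log))
log.first # => [:requeue, ["job-a"]]
```

Taking the snapshot under the lock and working on the copy keeps the kill loop from racing with `processor_result`, which mutates the live set as threads exit.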

#processor_result(processor, _reason = nil) ⇒ Object

Processor#run callback: invoked when a Processor thread exits, whether cleanly or via raised exception. Removes the dead processor from the pool and (unless we’re already stopping) spawns a replacement so the capsule’s concurrency stays constant.



# File 'lib/wurk/manager.rb', line 79

def processor_result(processor, _reason = nil)
  @plock.synchronize do
    @workers.delete(processor)
    unless @done
      p = Processor.new(@capsule, &method(:processor_result))
      @workers << p
      p.start
    end
  end
end
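
A small model of the replace-on-die behaviour, for illustration only. `StubProcessor` and `MiniPool` are hypothetical stand-ins: the stub triggers its exit callback by hand instead of running a real thread, so the example is deterministic.

```ruby
require "set"

# Hypothetical stand-in for Processor: it keeps the exit callback and lets
# the example simulate a thread death with #die.
class StubProcessor
  def initialize(&on_exit)
    @on_exit = on_exit
  end

  def start; end

  def die
    @on_exit.call(self)
  end
end

class MiniPool
  attr_reader :workers

  def initialize(count)
    @done = false
    @lock = Mutex.new
    @workers = Set.new
    count.times { @workers << StubProcessor.new(&method(:processor_result)) }
  end

  def quiet
    @done = true
  end

  # Mirrors the documented callback: drop the dead processor and, unless the
  # pool is stopping, start a replacement under the same lock.
  def processor_result(processor, _reason = nil)
    @lock.synchronize do
      @workers.delete(processor)
      unless @done
        replacement = StubProcessor.new(&method(:processor_result))
        @workers << replacement
        replacement.start
      end
    end
  end
end

pool = MiniPool.new(3)
victim = pool.workers.first
victim.die
pool.workers.size # => 3, and victim is no longer a member
```

Once `quiet` has set the done flag, the same callback only removes processors, which is what lets `stop` observe the set draining to empty.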

#quiet ⇒ Object



# File 'lib/wurk/manager.rb', line 44

def quiet
  return if @done

  @done = true
  logger.info { "Terminating quiet threads for #{capsule.name} capsule" }
  @workers.each(&:terminate)
end

#start ⇒ Object



# File 'lib/wurk/manager.rb', line 40

def start
  @workers.each(&:start)
end

#stop(deadline) ⇒ Object

Graceful shutdown: quiet first, then poll for workers to clear. If the deadline elapses with workers still alive we fall through to hard_shutdown, which bulk_requeues their UoWs before killing threads.



# File 'lib/wurk/manager.rb', line 55

def stop(deadline)
  quiet
  # Lifecycle hooks (e.g. :quiet) can be async; give them a tick to settle
  # before we start polling. Matches Sidekiq's PAUSE_TIME behavior.
  sleep PAUSE_TIME
  return if @workers.empty?

  logger.info { 'Pausing to allow jobs to finish...' }
  wait_for(deadline) { @workers.empty? }
  return if @workers.empty?

  hard_shutdown
ensure
  capsule.stop
end
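
Both `stop` and `hard_shutdown` call `wait_for`, whose definition is not shown on this page. The following is one plausible shape for such a helper, written as an assumption rather than the actual implementation: it polls a block until the block returns true or a monotonic deadline is too close to fit another pause. `POLL_INTERVAL` is a hypothetical constant playing the role of `PAUSE_TIME`.

```ruby
POLL_INTERVAL = 0.05 # hypothetical; plays the role of PAUSE_TIME here

# Polls the block until it returns true or the monotonic deadline is too
# close to fit another sleep. The return value carries no meaning, so
# callers re-check the condition afterwards, as `stop` does.
def wait_for(deadline)
  loop do
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    return if remaining <= POLL_INTERVAL
    return if yield

    sleep POLL_INTERVAL
  end
end

now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
polls = 0
wait_for(now + 1) { (polls += 1) >= 3 } # returns after the third poll
```

Because the helper does not report why it returned, the caller's second `@workers.empty?` check is what distinguishes a clean drain from a timeout.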

#stopped? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/wurk/manager.rb', line 71

def stopped?
  @done
end