Class: Wurk::Swarm

Inherits:
Object
Includes:
Component
Defined in:
lib/wurk/swarm.rb,
lib/wurk/swarm/child_boot.rb

Overview

Parent supervisor. Forks N children per the worker topology, monitors their PIDs, relays signals, respawns crashed children, performs a rolling restart on SIGUSR1, and recycles children whose RSS has grown past the memory limit.

Boot ordering (must be exact — see docs/idea/03-process-model.md):

1. Host app boots fully; eager loads done.
2. Railtie `after_initialize` fires.
3. `boot` closes parent-side connections (Redis, ActiveRecord).
4. `boot` forks N children.
5. Each child reconnects DB + opens a fresh Redis pool, then
   installs its own signal handlers and starts the Launcher.
6. Parent calls `supervise` to enter the wait/relay loop.
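
Steps 3 to 5 can be sketched in plain Ruby as follows. This is a minimal illustration under stated assumptions, not Wurk's implementation: `FakeConnection` and `boot_sketch` are hypothetical names, and the connection class only records which process created it, standing in for a Redis or ActiveRecord handle. It relies on `Process.fork`, so it needs a platform that supports forking.

```ruby
# Hypothetical stand-in for a Redis/ActiveRecord connection. It only remembers
# which process opened it, so we can check that nothing crosses the fork.
class FakeConnection
  attr_reader :owner_pid

  def initialize
    @owner_pid = Process.pid
    @open = true
  end

  def close
    @open = false
  end

  def open?
    @open
  end
end

# Assumed shape of the boot sequence: close in the parent, fork, reopen in
# each child. Returns what the parent observed.
def boot_sketch(slots)
  parent_conn = FakeConnection.new
  parent_conn.close                      # step 3: no live sockets at fork time

  reader, writer = IO.pipe
  pids = Array.new(slots) do |index|     # step 4: fork N children
    Process.fork do
      reader.close
      conn = FakeConnection.new          # step 5: fresh connection per child
      writer.puts("#{index}:#{conn.owner_pid == Process.pid}")
      writer.close
      exit!(0)                           # skip at_exit hooks inherited from the parent
    end
  end

  writer.close
  reports = reader.read.lines.map(&:chomp).sort
  reader.close
  pids.each { |pid| Process.wait(pid) }  # reap so no zombies are left behind
  { parent_open: parent_conn.open?, reports: reports, child_count: pids.size }
end
```

Closing before the fork matters because a forked child inherits the parent's open file descriptors; two processes writing to one socket would interleave protocol frames.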

Signals (see docs/idea/04-signals.md):

TERM/INT  → `shutdown`           (graceful drain)
TSTP      → relay TSTP           (pause fetch)
CONT      → relay CONT           (resume fetch)
USR1      → `rolling_restart`    (zero-downtime cycle)
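
The table above can be read as a dispatch map. The sketch below is an assumption about how such a map might be wired up, not the code in `lib/wurk/swarm.rb`: the constructor does create a `Thread::Queue` named `@signal_queue`, which suggests that trap handlers only enqueue the signal name and the supervise loop acts on it later, but `SIGNAL_ACTIONS`, `install_traps`, and `drain_signals_sketch` are hypothetical names.

```ruby
# Signal name => action, mirroring the table above. TSTP and CONT are passed
# through to the children unchanged, so they share the :relay action.
SIGNAL_ACTIONS = {
  'TERM' => :shutdown,
  'INT'  => :shutdown,
  'TSTP' => :relay,
  'CONT' => :relay,
  'USR1' => :rolling_restart
}.freeze

# Keep trap handlers tiny: they only enqueue the signal name. Doing real work
# (logging, forking, locking a Mutex) inside a trap handler is unsafe.
def install_traps(queue)
  SIGNAL_ACTIONS.each_key do |name|
    Signal.trap(name) { queue << name }
  end
end

# Called from the main loop. Returns [action, signal] pairs in arrival order
# without blocking when the queue is empty.
def drain_signals_sketch(queue)
  actions = []
  until queue.empty?
    name = queue.pop(true) # non-blocking pop; safe with a single consumer
    actions << [SIGNAL_ACTIONS.fetch(name), name]
  end
  actions
end
```

Deferring the work to the main loop means a burst of signals is handled in order, one per tick, rather than re-entering the supervisor mid-operation.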

Defined Under Namespace

Classes: ChildBoot

Constant Summary

SUPERVISE_TICK = 0.2
RESPAWN_BACKOFF = 1.0
HEARTBEAT_WAIT = 30
MEMORY_CHECK_INTERVAL = 10
DEFAULT_SHUTDOWN_TIMEOUT = 25

Constants included from Component

Component::DEFAULT_THREAD_PRIORITY, Component::PROCESS_NONCE

Instance Attribute Summary

Attributes included from Component

#config

Instance Method Summary

Methods included from Component

#default_tag, #fire_event, #handle_exception, #hostname, #identity, #leader?, #logger, #mono_ms, #process_nonce, #real_ms, #redis, #safe_thread, #tid, #watchdog

Constructor Details

#initialize(topology:, config: Wurk.configuration, memory_limit: nil, shutdown_timeout: DEFAULT_SHUTDOWN_TIMEOUT) ⇒ Swarm

Returns a new instance of Swarm.



# File 'lib/wurk/swarm.rb', line 39

def initialize(topology:, config: Wurk.configuration, memory_limit: nil,
               shutdown_timeout: DEFAULT_SHUTDOWN_TIMEOUT)
  @topology = topology
  @config = config
  @memory_limit = memory_limit
  @shutdown_timeout = shutdown_timeout
  @children = {}
  @assignments = []
  @stopping = false
  @last_memory_check = 0
  @signal_queue = ::Thread::Queue.new
end

Instance Attribute Details

#children ⇒ Object (readonly)

Returns the value of attribute children.



# File 'lib/wurk/swarm.rb', line 37

def children
  @children
end

#topology ⇒ Object (readonly)

Returns the value of attribute topology.



# File 'lib/wurk/swarm.rb', line 37

def topology
  @topology
end

Instance Method Details

#boot(install_signals: true) ⇒ Object

Closes parent-side connections, forks the children, and returns their PIDs. `install_signals:` is false in tests so the integration suite can drive `shutdown` / `rolling_restart` directly without poisoning the test process's signal handlers.

Raises:

  • (RuntimeError) if the swarm has already been booted
  • (ArgumentError) if the topology has no slots


# File 'lib/wurk/swarm.rb', line 55

def boot(install_signals: true)
  raise 'Wurk::Swarm already booted' unless @assignments.empty?
  raise ArgumentError, 'Topology has no slots' if @topology.empty?

  @assignments = @topology.assignments.freeze
  close_parent_sockets
  fork_children
  install_signal_handlers if install_signals
  @children.keys
end

#rolling_restart ⇒ Object

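Triggered by SIGUSR1. For each existing child: forks a replacement, waits up to HEARTBEAT_WAIT seconds for its first heartbeat (logging a warning and proceeding anyway if none is seen), then sends TERM to the old child and waits for it to drain. Long-running jobs in the old slot get the full shutdown_timeout while the replacement is already serving new work.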



# File 'lib/wurk/swarm.rb', line 86

def rolling_restart
  @children.dup.each do |old_pid, meta|
    replacement = fork_child(meta[:slot], meta[:index])
    @children[replacement] = meta
    unless wait_for_heartbeat(replacement)
      logger.warn do
        "swarm: replacement #{replacement} heartbeat not seen within #{HEARTBEAT_WAIT}s; proceeding anyway"
      end
    end
    safe_kill(old_pid, 'TERM')
    wait_pid(old_pid, @shutdown_timeout)
    @children.delete(old_pid)
  end
end
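
The ordering that makes the restart zero-downtime (start the new child before stopping the old one) can be seen in isolation with the process operations replaced by callables. This is a sketch only: `rolling_restart_sketch` and its `spawn:`, `ready:`, and `stop:` parameters are hypothetical and stand in for `fork_child`, `wait_for_heartbeat`, and the TERM-and-wait pair.

```ruby
# Replaces each entry in `children` one at a time and returns the event log.
# spawn: meta -> new id, ready: id -> true/false, stop: id -> anything.
def rolling_restart_sketch(children, spawn:, ready:, stop:)
  events = []
  # Iterate over a copy so the table can be mutated inside the loop.
  children.dup.each do |old_id, meta|
    new_id = spawn.call(meta)
    children[new_id] = meta
    events << [:spawned, new_id]
    # A replacement that never reports ready does not block the cycle.
    events << [ready.call(new_id) ? :ready : :timed_out, new_id]
    stop.call(old_id)
    children.delete(old_id)
    events << [:stopped, old_id]
  end
  events
end

next_id = 100
table = { 1 => { slot: 'default' }, 2 => { slot: 'mailers' } }
log = rolling_restart_sketch(
  table,
  spawn: ->(_meta) { next_id += 1 },
  ready: ->(_id) { true },
  stop: ->(_id) { nil }
)
log.each { |event, id| puts "#{event} #{id}" }
```

At every point in the log each slot has at least one live child, which is why capacity never drops during the cycle; the cost is that the slot briefly runs two children at once.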

#shutdown(timeout: @shutdown_timeout) ⇒ Object

Graceful drain. Marks the swarm as stopping, relays TERM to every child, waits up to `timeout` seconds for them to exit, then hard-kills any stragglers.



# File 'lib/wurk/swarm.rb', line 75

def shutdown(timeout: @shutdown_timeout)
  @stopping = true
  relay_signal('TERM')
  wait_for_children(timeout)
  hard_kill_stragglers
end
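
The TERM, wait, then hard-kill sequence can be sketched for a single child with only core `Process` calls. The helper names `term_then_kill` and `spawn_demo_child` are hypothetical, and the source does not show which signal `hard_kill_stragglers` sends, so the use of KILL here is an assumption.

```ruby
# Sends TERM, polls for exit until `timeout` seconds have passed, then falls
# back to KILL. Returns :exited or :killed.
def term_then_kill(pid, timeout:, tick: 0.05)
  Process.kill('TERM', pid)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  while Process.clock_gettime(Process::CLOCK_MONOTONIC) < deadline
    # WNOHANG makes wait return nil immediately if the child is still running.
    return :exited if Process.wait(pid, Process::WNOHANG)

    sleep tick
  end
  Process.kill('KILL', pid) # KILL cannot be trapped or ignored
  Process.wait(pid)         # reap so the child does not linger as a zombie
  :killed
end

# Demo helper: forks a child and returns its PID once the child has installed
# its TERM behaviour, so the parent cannot signal it too early.
def spawn_demo_child(ignore_term:)
  reader, writer = IO.pipe
  pid = Process.fork do
    reader.close
    if ignore_term
      Signal.trap('TERM', 'IGNORE')
    else
      Signal.trap('TERM') { exit!(0) }
    end
    writer.puts('ready')
    writer.close
    loop { sleep 1 }
  end
  writer.close
  reader.gets # blocks until the child reports ready
  reader.close
  pid
end
```

A monotonic clock is used for the deadline so that wall-clock adjustments during the drain cannot shorten or extend the timeout.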

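#supervise ⇒ Object

Parent wait/relay loop. Each tick drains queued signals, reaps an exited child, checks memory pressure, then sleeps for SUPERVISE_TICK seconds; the loop ends once `done?` returns true.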



# File 'lib/wurk/swarm.rb', line 66

def supervise
  until done?
    drain_signals
    reap_one_child
    check_memory_pressure
    sleep SUPERVISE_TICK
  end
end
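
The reaping step of such a loop must not block, or signals and memory checks would stall while the parent waits on a child. The source does not show `reap_one_child`, so the following is only one possible shape: `reap_exited` is a hypothetical helper that polls each known PID with `Process::WNOHANG` and removes the ones that have exited from the table.

```ruby
# Polls every PID in `children` without blocking. Returns [pid, meta] pairs
# for children that have exited and removes them from the table.
def reap_exited(children)
  reaped = []
  children.keys.each do |pid|
    begin
      # nil means the child is still running; leave it in the table.
      next unless Process.wait(pid, Process::WNOHANG)
    rescue Errno::ECHILD
      # Already reaped elsewhere; still drop it from the table below.
    end
    reaped << [pid, children.delete(pid)]
  end
  reaped
end
```

Waiting on specific PIDs rather than on any child keeps the supervisor from collecting the exit status of processes it did not fork itself; a caller would typically use the returned metadata to decide which slot to respawn.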