Class: Raptor::Cluster

Inherits:
Object
Defined in:
lib/raptor/cluster.rb

Overview

Multi-process web server cluster that combines processes, ractors, and threads.

Cluster manages multiple worker processes, each running a complete server stack including a reactor thread, server thread, ractor pool for HTTP parsing, and thread pool for application processing. It handles process forking, signal management, graceful shutdown, and automatic worker restart when a worker process unexpectedly exits.

The architecture provides horizontal scaling through processes while maintaining efficient I/O and CPU utilization within each process through the combination of NIO reactors, ractor-based parsing, and thread pools.

Flow per worker process:

  1. Server continuously accepts connections but skips acceptance when backlog is high

  2. Reactor manages I/O multiplexing and provides backlog metrics for load control

  3. Ractor pool handles CPU-intensive HTTP parsing in parallel

  4. Thread pool processes Rack applications and handles response writing

  5. Natural load balancing occurs through backpressure-based acceptance control
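
The acceptance rule in steps 1 and 5 can be illustrated with a small simulation. This is only a sketch: `accept_now?`, `max_backlog`, and the tick-based loop are hypothetical names and values chosen for illustration, not Raptor's actual implementation or thresholds.

```ruby
# Hypothetical predicate: accept only while the backlog is below a limit.
# Raptor's real threshold and method names are not shown on this page.
def accept_now?(backlog, max_backlog)
  backlog < max_backlog
end

max_backlog = 4   # assumed limit, for illustration only
backlog = 0       # requests accepted but not yet completed
accepted = 0
skipped = 0

# Each tick represents one pending connection attempt.
10.times do |tick|
  if accept_now?(backlog, max_backlog)
    backlog += 1
    accepted += 1
  else
    # Skipping leaves the connection in the kernel queue, where a
    # less busy worker process can pick it up instead.
    skipped += 1
  end

  # Every third tick, one in-flight request finishes.
  backlog -= 1 if tick % 3 == 2 && backlog > 0
end

puts "accepted=#{accepted} skipped=#{skipped} backlog=#{backlog}"
```

Because a busy worker stops calling accept, idle workers sharing the same listening socket naturally take more of the load.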

Examples:

Basic usage

options = {
  threads: 8, ractors: 2, workers: 4,
  binds: ["tcp://0.0.0.0:3000"],
  rackup: "config.ru",
  client: { first_data_timeout: 30, chunk_data_timeout: 10 }
}
Cluster.run(options)
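
The `rackup` option points at a Rack configuration file. As a rough sketch, the application such a file hands to `run` can be as small as a lambda that follows the Rack calling convention; the `app` name and response text below are illustrative assumptions, and the lambda is called directly here so the snippet runs without Rack installed.

```ruby
# A minimal Rack-style application: takes the env hash and returns
# [status, headers, body]. In a config.ru this would be passed to `run`.
app = lambda do |env|
  path = env.fetch("PATH_INFO", "/")
  [200, { "content-type" => "text/plain" }, ["Hello from #{path}"]]
end

# Call it directly with a hand-built env to see the response triple.
status, headers, body = app.call("PATH_INFO" => "/ping")
puts status
puts headers["content-type"]
puts body.first
```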

Constructor Details

#initialize(options) ⇒ void

Creates a new Cluster with the specified configuration.

Initializes the cluster with thread, ractor, and worker counts, sets up network binding, loads the Rack application, and prepares for multi-process operation.

Parameters:

  • options (Hash)

    cluster configuration options

Options Hash (options):

  • :threads (Integer)

    number of threads per worker process

  • :ractors (Integer)

    number of ractors per worker process

  • :workers (Integer)

    number of worker processes

  • :binds (Array<String>)

    array of bind URIs

  • :rackup (String)

    path to Rack configuration file

  • :client (Hash)

    client timeout configuration

  • :stats_file (String)

    optional path to a file where cluster stats are written; the file is deleted on shutdown



# File 'lib/raptor/cluster.rb', line 85

def initialize(options)
  @thread_count = options[:threads]
  @ractor_count = options[:ractors]
  @worker_count = options[:workers]
  @client_options = options[:client]

  @binder = Binder.new(options[:binds])
  @server_port = @binder.server_port
  @app = Rack::Builder.parse_file(options[:rackup])
  log_initialization

  @shutdown = false
  @workers = {}
  @stats = Stats.new(@worker_count)
  @stats_file = options[:stats_file]
end

Class Method Details

.run(options) ⇒ void

This method returns an undefined value.

Convenience method to create and run a cluster with the given options.

Parameters:

  • options (Hash)

    cluster configuration options



# File 'lib/raptor/cluster.rb', line 53

def self.run(options)
  new(options).run
end

Instance Method Details

#run ⇒ void

This method returns an undefined value.

Starts the multi-process cluster and manages worker processes.

Forks the configured number of worker processes and monitors them, automatically restarting any that exit unexpectedly. Handles graceful shutdown via INT or TERM signals, and stats logging via USR1.

Each worker process includes:

  • 1 server thread (continuously accepts connections with backpressure control)

  • 1 reactor thread (I/O multiplexing, timeout handling, backlog monitoring)

  • N ractor workers (parallel HTTP parsing)

  • 1 ractor collector thread (coordinates parsing results)

  • M worker threads (Rack application processing and response writing)

  • 1 stats thread (writes per-worker metrics to shared memory every second)
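
To get a feel for the resulting footprint, the list above can be turned into simple arithmetic. This sketch counts only the components listed here (it ignores the main thread and any threads Ruby creates internally) and uses the values from the "Basic usage" example; the variable names are illustrative.

```ruby
# Values taken from the "Basic usage" example on this page.
options = { threads: 8, ractors: 2, workers: 4 }

# Fixed threads from the list: server, reactor, ractor collector, stats.
fixed_threads = 4

threads_per_worker = fixed_threads + options[:threads]
ractors_per_worker = options[:ractors]

cluster_threads = threads_per_worker * options[:workers]
cluster_ractors = ractors_per_worker * options[:workers]

puts "per worker: #{threads_per_worker} threads, #{ractors_per_worker} ractors"
puts "cluster:    #{cluster_threads} threads, #{cluster_ractors} ractors"
```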



# File 'lib/raptor/cluster.rb', line 119

def run
  trap("INT") { shutdown }
  trap("TERM") { shutdown }
  trap("USR1") { log_stats }

  @worker_count.times { |index| spawn_worker(index) }

  stats_file_thread = if @stats_file
    Thread.new do
      Thread.current.name = "Raptor Stats File"

      write_stats_file_loop
    end
  end

  until @shutdown
    begin
      pid, status = Process.wait2(-1, Process::WNOHANG)
    rescue Errno::ECHILD
      break
    end

    if pid
      index = @workers.key(pid)
      @workers.delete(index)

      unless @shutdown
        warn "[#{Process.pid}] Restarting worker #{index} (#{pid}), #{exit_description(status)}"
        spawn_worker(index)
      end
    else
      sleep 0.1
    end
  end

  @workers.values.each { |pid| Process.kill("TERM", pid) rescue nil }
  @workers.values.each { |pid| Process.wait(pid) rescue nil }
  stats_file_thread&.join
  File.delete(@stats_file) rescue nil if @stats_file
  @stats.unmap
end
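
The restart warning above calls `exit_description(status)`, whose source is not shown on this page. As an assumption about what such a helper might do, the sketch below turns a `Process::Status` (as returned by `Process.wait2`) into a short message. The name `describe_exit` and the message wording are hypothetical.

```ruby
# Hypothetical helper: Raptor's real exit_description may differ.
# Accepts a Process::Status, such as the second value from Process.wait2.
def describe_exit(status)
  if status.signaled?
    # termsig is the number of the signal that terminated the process.
    "killed by SIG#{Signal.signame(status.termsig)}"
  elsif status.exited?
    "exited with status #{status.exitstatus}"
  else
    "stopped or in an unknown state"
  end
end
```

Distinguishing a signal death from a normal exit in the log line makes it easier to tell an out-of-memory kill apart from an application error.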

#stats ⇒ Array<Hash>

Returns stats for all worker processes.

Returns:

  • (Array<Hash>)

    array of per-worker stat hashes, each containing :pid, :requests, :backlog, :started_at, :last_checkin, and :booted



# File 'lib/raptor/cluster.rb', line 167

def stats
  @stats.all
end
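
As a usage sketch, the array returned by `#stats` can be reduced to cluster-wide totals. The hash keys match those documented above, but the sample values and the `summarize` helper are made up for illustration; in a real process the input would come from a cluster's `stats` call.

```ruby
# Illustrative data shaped like the documented return value
# (only the keys used below are included).
sample_stats = [
  { pid: 1001, requests: 120, backlog: 2, booted: true },
  { pid: 1002, requests: 80,  backlog: 0, booted: true },
  { pid: 1003, requests: 0,   backlog: 0, booted: false }
]

# Hypothetical helper that aggregates per-worker stats.
def summarize(stats)
  {
    total_requests: stats.sum { |worker| worker[:requests] },
    total_backlog:  stats.sum { |worker| worker[:backlog] },
    booted_workers: stats.count { |worker| worker[:booted] }
  }
end

summary = summarize(sample_stats)
puts summary.inspect
```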