Class: Kino::Server

Inherits:
Object
Defined in:
lib/kino/server.rb

Overview

Public server API. All network I/O lives in Rust (tokio + hyper); this class only manages lifecycle and the Ruby worker pool.

Topology is Puma-style two-level. In :ractor mode, the workers setting is the number of ractors and the threads setting is the number of threads per ractor. In :threaded mode, the same total capacity (workers × threads) is flattened onto plain Threads; this mode runs any Rack app, Rails included.
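
As a rough illustration of the capacity arithmetic, the sketch below computes how many handlers can run at once for a given topology. The helper name total_capacity is hypothetical and not part of Kino; the per-mode thread defaults (1 in :ractor, 3 in :threaded) are taken from the constructor source further down.

```ruby
# Hypothetical helper (not part of Kino): concurrent handler slots for a
# given topology. Thread defaults mirror the constructor source: 1 per
# ractor in :ractor mode, 3 in :threaded mode.
def total_capacity(mode:, workers:, threads: nil)
  threads ||= (mode == :ractor) ? 1 : 3
  Integer(workers) * Integer(threads)
end

ractor_slots   = total_capacity(mode: :ractor, workers: 4, threads: 2)
threaded_slots = total_capacity(mode: :threaded, workers: 4, threads: 2)
default_slots  = total_capacity(mode: :threaded, workers: 2)
```

With workers: 4 and threads: 2, both modes give the same number of slots; only the layout differs (4 ractors of 2 threads each versus 8 plain threads).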

Constructor Details

#initialize(app, config_file: nil, **options) ⇒ Server

Settings precedence: explicit kwargs > config_file DSL > defaults.

Examples:

Kino::Server.new(app, config_file: "kino.rb", port: 3000)
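
The precedence rule can be pictured as a chain of hash merges in which later sources win. This is only a sketch under stated assumptions: the helper resolve_settings and all values below are made up for illustration and are not Kino's real defaults or internals.

```ruby
# Minimal sketch of "explicit kwargs > config_file DSL > defaults".
# All three hashes are invented stand-ins, not Kino's real settings.
def resolve_settings(defaults, from_file, kwargs)
  # Later merges win: kwargs override the file, which overrides defaults.
  defaults.merge(from_file).merge(kwargs)
end

defaults  = { port: 9292, workers: 2, log_requests: false }
from_file = { port: 8080, log_requests: true }
kwargs    = { port: 3000 }

settings = resolve_settings(defaults, from_file, kwargs)
```

Here port comes from the explicit keyword argument, log_requests from the file, and workers from the defaults.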

Parameters:

  • app (#call)

    a Rack 3 application

  • config_file (String, nil) (defaults to: nil)

    path to a kino.rb config file

  • options (Hash)

    any Configuration setting, e.g. port:, workers:, threads:, mode:, request_timeout:, or tls: (a Hash with cert and key entries)



# File 'lib/kino/server.rb', line 34

def initialize(app, config_file: nil, **options)
  config = Configuration.new
  config.load_file(config_file) if config_file
  config.merge!(options)
  settings = config.to_h

  @app = app
  @bind = settings[:bind]
  @requested_port = settings[:port]
  @workers = Integer(settings[:workers])
  @mode = resolve_mode(settings[:mode])
  # Default threads per mode: 1 in :ractor (threads inside a ractor
  # share its lock; a measured +17% on fast handlers; raise `workers`
  # for I/O concurrency instead), 3 in :threaded (threads ARE the
  # concurrency there).
  @threads = Integer(settings[:threads] || ((@mode == :ractor) ? 1 : 3))
  @queue_depth = Integer(settings[:queue_depth])
  @queue_timeout_ms = (Float(settings[:queue_timeout]) * 1000).round
  @request_timeout_ms = settings[:request_timeout] ? (Float(settings[:request_timeout]) * 1000).round : 0
  @max_connections = settings[:max_connections] ? Integer(settings[:max_connections]) : default_max_connections
  @max_body_size = Integer(settings[:max_body_size] || 0)
  @batch = [Integer(settings[:batch]), 1].max
  @lanes = !!settings[:lanes]
  @log_requests = !!settings[:log_requests]
  @shutdown_timeout = settings[:shutdown_timeout]
  @tokio_threads = settings[:tokio_threads]
  @tls = validate_tls(settings[:tls])
  @pidfile = settings[:pidfile]
  @worker_threads = []
  @supervisor = nil
  @started = false
end
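
The coercions in the constructor can be tried in isolation. The sketch below applies the same arithmetic to stand-in values; to_ms is a hypothetical helper name (the constructor inlines the expression), and the input values are invented.

```ruby
# Sketch of the constructor's coercions, applied to stand-in values.
# `to_ms` is a hypothetical helper; Kino inlines this arithmetic.
def to_ms(seconds)
  (Float(seconds) * 1000).round
end

queue_timeout_ms   = to_ms("0.25")          # Float() also accepts numeric strings
request_timeout    = nil
request_timeout_ms = request_timeout ? to_ms(request_timeout) : 0 # 0 when unset
batch              = [Integer("0"), 1].max  # batch is clamped to at least 1
lanes              = !!nil                  # any falsy value becomes false
```

Because Float() and Integer() are strict, a malformed value such as "fast" raises ArgumentError at construction time instead of surfacing later.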

Instance Attribute Details

#bind ⇒ String (readonly)

Returns the bind address.

Returns:

  • (String)

    the bind address



# File 'lib/kino/server.rb', line 19

def bind
  @bind
end

#mode ⇒ Symbol (readonly)

Returns the resolved dispatch mode, :ractor or :threaded.

Returns:

  • (Symbol)

    the resolved dispatch mode, :ractor or :threaded



# File 'lib/kino/server.rb', line 16

def mode
  @mode
end

#port ⇒ Integer? (readonly)

Returns the bound port (nil until #start; the actual port when configured with port 0).

Returns:

  • (Integer, nil)

    the bound port (nil until #start; the actual port when configured with port 0)



# File 'lib/kino/server.rb', line 13

def port
  @port
end
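
To illustrate why the port is only known after binding when port 0 is requested, here is a small sketch using the standard library's TCPServer as a stand-in for Kino's native listener.

```ruby
require "socket"

# Binding to port 0 asks the OS for any free port; the real number is
# only known after the bind. TCPServer is a stand-in for Kino's native
# listener, which is not shown here.
listener   = TCPServer.new("127.0.0.1", 0)
bound_port = listener.local_address.ip_port
listener.close
```

This is the same reason #port is nil until #start has returned.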

Class Method Details

.run(app, **opts) ⇒ Kino::Server

Production entry point: start, print the banner, trap INT/TERM for graceful shutdown (second signal force-exits), block until done. The kino CLI funnels into this too (CLI#serve).

Parameters:

  • app (#call)

    a Rack 3 application

  • opts (Hash)

    see #initialize

Returns:

  • (Kino::Server)

    the server instance, returned once #wait unblocks



# File 'lib/kino/server.rb', line 153

def self.run(app, **opts)
  server = new(app, **opts)
  CLI.opening_credits
  server.start
  CLI.action!(server)
  CLI.fin_at_exit
  trap_signals(server)
  server.wait
  server
end
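
For reference, a minimal Rack 3 application that could be passed as app might look like the sketch below. The handler itself is invented for illustration; the final comment shows how it would be handed to .run, which needs the Kino library to be loaded.

```ruby
# A minimal Rack 3 application: any object that responds to #call(env)
# and returns [status, headers, body]. Rack 3 expects lower-case header
# names.
app = lambda do |env|
  body = "hello from #{env.fetch("PATH_INFO", "/")}\n"
  headers = { "content-type" => "text/plain", "content-length" => body.bytesize.to_s }
  [200, headers, [body]]
end

# The app can be exercised without a server by calling it directly.
status, headers, body = app.call({ "REQUEST_METHOD" => "GET", "PATH_INFO" => "/ping" })

# With Kino loaded, the same object would be handed to the server:
#   Kino::Server.run(app, port: 3000)
```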

.trap_signals(server) ⇒ void

This method returns an undefined value.

Signal handling shared by Server.run and the kino CLI: INT/TERM drain gracefully (a second signal force-exits), USR1 prints a stats line.

Parameters:



# File 'lib/kino/server.rb', line 169

def self.trap_signals(server)
  # kill -USR1 <pid> prints a one-line stats snapshot (find the pid in
  # the pidfile when configured).
  trap("USR1") do
    Thread.new { $stdout.puts Kino::CLI.stats_line(server.stats) }
  end
  signaled = false
  %w[INT TERM].each do |signal|
    trap(signal) do
      Process.exit!(1) if signaled
      signaled = true
      $stderr.write("Kino: draining (signal again to force exit)\n")
      # Trap context forbids mutexes; do the real work on a thread.
      Thread.new { server.shutdown }
    end
  end
end
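
The "first signal drains, second signal force-exits" logic can be isolated from real signal delivery. The sketch below injects both side effects so it runs without sending signals; build_signal_handler is a hypothetical name, not part of Kino.

```ruby
# Sketch of the two-stage signal logic with the side effects injected,
# so it can be run without sending real signals.
# `build_signal_handler` is a hypothetical name, not part of Kino.
def build_signal_handler(drain:, force_exit:)
  signaled = false
  lambda do
    next force_exit.call if signaled

    signaled = true
    drain.call
  end
end

events  = []
handler = build_signal_handler(
  drain: -> { events << :drain },
  force_exit: -> { events << :force_exit }
)
handler.call # stands in for the first INT/TERM
handler.call # stands in for the second INT/TERM
```

In a real trap block the handler would be wired with something like %w[INT TERM].each { |sig| trap(sig) { handler.call } }, and, as the source does, the drain action would start a Thread, since trap context forbids mutexes.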

Instance Method Details

#shutdown(timeout: nil) ⇒ nil

Graceful shutdown: stop accepting, drain queued and in-flight work up to the deadline, then close the queue and join the workers. If any workers are still running past the deadline, escalate: abort remaining clients (500), interrupt blocked workers, and kill stragglers. Finally, tear down the runtime. Always returns by roughly the deadline plus a small epsilon. Idempotent.

Parameters:

  • timeout (Numeric, nil) (defaults to: nil)

    drain deadline in seconds (default: the configured shutdown_timeout)

Returns:

  • (nil)


# File 'lib/kino/server.rb', line 106

def shutdown(timeout: nil)
  return unless @started

  deadline = monotonic_now + (timeout || @shutdown_timeout)
  Native.stop_accepting(@id)

  # Drain: wait for queued + in-flight to reach zero, bounded by deadline.
  until monotonic_now >= deadline
    queued, in_flight = Native.queue_stats(@id)
    break if queued.zero? && in_flight.zero?

    sleep 0.01
  end

  # Idle workers see the closed queue and exit their loops.
  Native.close_queue(@id)
  join_workers(deadline)

  unless workers_done?
    # Past the deadline with stuck handlers: free the clients first,
    # then try to unblock and reap the workers.
    Native.abort_all_inflight(@id)
    Native.interrupt_all_workers(@id)
    join_workers(monotonic_now + 0.2)
    kill_stragglers
  end

  Native.shutdown_runtime(@id, 1_000)
  @worker_threads.clear
  @started = false
  File.delete(@pidfile) if @pidfile && File.exist?(@pidfile)
  nil
end
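
The drain phase is a deadline-bounded poll. A minimal sketch of that pattern follows; drain_until is a hypothetical helper, and defining monotonic_now via Process.clock_gettime is an assumption about how Kino's private helper of the same name works.

```ruby
# Assumed implementation of a monotonic clock read; it is immune to
# wall-clock adjustments, which matters for deadlines.
def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

# Hypothetical helper: polls the block until it returns true or the
# deadline passes. Returns true if the condition was met, false on timeout.
def drain_until(deadline, interval: 0.01)
  until monotonic_now >= deadline
    return true if yield

    sleep interval
  end
  false
end

pending   = 3
drained   = drain_until(monotonic_now + 1.0) { (pending -= 1) <= 0 }
timed_out = !drain_until(monotonic_now + 0.05) { false }
```

The first call finishes as soon as the simulated work reaches zero; the second never sees its condition met and gives up at the deadline, which is the case where the real method moves on to escalation.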

#start ⇒ self

Bind, boot the native front-end, and spawn the worker pool.

Returns:

  • (self)

Raises:

  • (Error)

    if the server has already been started



# File 'lib/kino/server.rb', line 72

def start
  raise Error, "server already started" if @started

  @id, @port = Native.server_start(
    bind: @bind, port: @requested_port,
    queue_depth: @queue_depth, queue_timeout_ms: @queue_timeout_ms,
    request_timeout_ms: @request_timeout_ms,
    max_connections: @max_connections,
    max_body_size: @max_body_size,
    tokio_threads: @tokio_threads,
    tls_cert: @tls&.fetch(:cert), tls_key: @tls&.fetch(:key),
    lanes: @lanes, log_requests: @log_requests
  )
  File.write(@pidfile, "#{Process.pid}\n") if @pidfile
  if @mode == :ractor
    @supervisor = RactorSupervisor.new(@id, @app, workers: @workers, threads: @threads, batch: @batch).start
  else
    @worker_threads = (@workers * @threads).times.map do
      worker_id = Native.register_worker(@id)
      Thread.new { Worker.run(@id, worker_id, @app, @batch) }
    end
  end
  @started = true
  self
end
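
The :threaded branch spawns a flat pool of workers × threads Threads. The sketch below reproduces that shape with the standard library only: Queue stands in for Kino's native request queue, and the doubling job is invented for illustration.

```ruby
# Sketch of the :threaded topology using only the standard library: a
# flat pool of workers * threads Threads pulling jobs from one queue.
# Queue is a stand-in for Kino's native request queue.
workers = 2
threads = 3
queue   = Queue.new
results = Queue.new

pool = (workers * threads).times.map do |worker_id|
  Thread.new do
    # Queue#pop returns nil once the queue is closed and empty,
    # which ends the loop.
    while (job = queue.pop)
      results << [worker_id, job * 2]
    end
  end
end

10.times { |n| queue << n }
queue.close        # idle workers see the closed queue and exit
pool.each(&:join)  # comparable to blocking in #wait

doubled = []
doubled << results.pop.last until results.empty?
```

Closing the queue rather than killing threads lets every worker finish its current job first, which is the behavior the graceful shutdown path relies on before it escalates.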

#stats ⇒ Hash{Symbol => Object}

Live snapshot. Counters come from the native layer (one relaxed atomic per request); config echo makes the line self-describing.

Returns:

  • (Hash{Symbol => Object})

    mode, lanes, workers, threads, batch, respawns; plus queued, in_flight, served, rejected, timeouts (and lane_depths in lanes mode) once started



# File 'lib/kino/server.rb', line 193

def stats
  base = {
    mode: @mode, lanes: @lanes, workers: @workers, threads: @threads,
    batch: @batch, respawns: @supervisor ? @supervisor.respawns : 0
  }
  return base unless @started

  queued, in_flight, served, rejected, timeouts, lane_depths = Native.server_stats(@id)
  base.merge!(queued:, in_flight:, served:, rejected:, timeouts:)
  base[:lane_depths] = lane_depths if lane_depths
  base
end
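
The shape of the snapshot before and after start can be sketched without the native layer. In the example below build_stats is a hypothetical stand-in for the method above, and the counter values are invented.

```ruby
# Sketch of how the snapshot is assembled: a config echo that is always
# present, plus live counters only once the server has started.
# `build_stats` is a hypothetical stand-in; counter values are invented.
def build_stats(config, counters)
  base = config.dup
  return base unless counters

  queued, in_flight, served, rejected, timeouts, lane_depths = counters
  base.merge!(queued: queued, in_flight: in_flight, served: served,
              rejected: rejected, timeouts: timeouts)
  base[:lane_depths] = lane_depths if lane_depths
  base
end

config  = { mode: :threaded, lanes: false, workers: 2, threads: 3, batch: 1, respawns: 0 }
stopped = build_stats(config, nil)
running = build_stats(config, [4, 6, 1200, 3, 1, nil])
```

Callers that read counters such as served should therefore check that the key is present, since a server that has not been started returns only the config echo.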

#tls? ⇒ Boolean

Returns whether TLS termination is configured.

Returns:

  • (Boolean)

    whether TLS termination is configured



# File 'lib/kino/server.rb', line 22

def tls?
  !@tls.nil?
end

#wait ⇒ void

This method returns an undefined value.

Block until every worker has exited (i.e. until shutdown).



# File 'lib/kino/server.rb', line 142

def wait
  @supervisor ? @supervisor.join : @worker_threads.each(&:join)
end