Class: Quicsilver::Server

Inherits:
Object
  • Object
show all
Defined in:
lib/quicsilver/server/server.rb,
lib/quicsilver/server/listener_data.rb,
lib/quicsilver/server/request_handler.rb,
lib/quicsilver/server/request_registry.rb

Defined Under Namespace

Classes: ListenerData, PendingStream, RequestHandler, RequestRegistry

Constant Summary collapse

STREAM_EVENT_RECEIVE =
"RECEIVE"
STREAM_EVENT_RECEIVE_FIN =
"RECEIVE_FIN"
STREAM_EVENT_CONNECTION_ESTABLISHED =
"CONNECTION_ESTABLISHED"
STREAM_EVENT_SEND_COMPLETE =
"SEND_COMPLETE"
STREAM_EVENT_CONNECTION_CLOSED =
"CONNECTION_CLOSED"
STREAM_EVENT_STREAM_RESET =
"STREAM_RESET"
STREAM_EVENT_STOP_SENDING =
"STOP_SENDING"
ServerStopError =
Class.new(StandardError)
DrainTimeoutError =
Class.new(StandardError)
DEFAULT_THREAD_POOL_SIZE =
5
DEFAULT_QUEUE_MULTIPLIER =
4
DEFAULT_MAX_CONNECTIONS =
100

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(port = 4433, address: "0.0.0.0", app: nil, server_configuration: nil, threads: DEFAULT_THREAD_POOL_SIZE, max_queue_size: nil, max_connections: DEFAULT_MAX_CONNECTIONS) ⇒ Server

Returns a new instance of Server.



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/quicsilver/server/server.rb', line 53

def initialize(port = 4433, address: "0.0.0.0", app: nil, server_configuration: nil, threads: DEFAULT_THREAD_POOL_SIZE, max_queue_size: nil, max_connections: DEFAULT_MAX_CONNECTIONS)
  @port = port
  @address = address
  @app = app || default_rack_app
  @server_configuration = server_configuration || Transport::Configuration.new
  @running = false
  @shutting_down = false
  @listener_data = nil
  @config_handle = nil
  @connections = {}
  @request_registry = RequestRegistry.new
  @handler_threads = []
  @handler_mutex = Mutex.new
  @thread_pool_size = threads
  @max_queue_size = max_queue_size || threads * DEFAULT_QUEUE_MULTIPLIER
  @work_queue = Queue.new
  @max_connections = max_connections
  @cancelled_streams = Set.new
  @cancelled_mutex = Mutex.new
  @pending_streams = {}  # stream_id => PendingStream (for streaming dispatch)
  @pending_mutex = Mutex.new

  protocol_app = wrap_app(@app, @server_configuration.mode)

  @request_handler = RequestHandler.new(
    app: protocol_app,
    configuration: @server_configuration,
    request_registry: @request_registry,
    cancelled_streams: @cancelled_streams,
    cancelled_mutex: @cancelled_mutex
  )

  self.class.instance = self
end

Class Attribute Details

.instanceObject

Returns the value of attribute instance.



41
42
43
# File 'lib/quicsilver/server/server.rb', line 41

def instance
  @instance
end

Instance Attribute Details

#addressObject (readonly)

Returns the value of attribute address.



5
6
7
# File 'lib/quicsilver/server/server.rb', line 5

def address
  @address
end

#connectionsObject (readonly)

Returns the value of attribute connections.



5
6
7
# File 'lib/quicsilver/server/server.rb', line 5

def connections
  @connections
end

#max_connectionsObject (readonly)

Returns the value of attribute max_connections.



5
6
7
# File 'lib/quicsilver/server/server.rb', line 5

def max_connections
  @max_connections
end

#max_queue_sizeObject (readonly)

Returns the value of attribute max_queue_size.



5
6
7
# File 'lib/quicsilver/server/server.rb', line 5

def max_queue_size
  @max_queue_size
end

#portObject (readonly)

Returns the value of attribute port.



5
6
7
# File 'lib/quicsilver/server/server.rb', line 5

def port
  @port
end

#request_registryObject (readonly)

Returns the value of attribute request_registry.



5
6
7
# File 'lib/quicsilver/server/server.rb', line 5

def request_registry
  @request_registry
end

#runningObject (readonly)

Returns the value of attribute running.



5
6
7
# File 'lib/quicsilver/server/server.rb', line 5

def running
  @running
end

#server_configurationObject (readonly)

Returns the value of attribute server_configuration.



5
6
7
# File 'lib/quicsilver/server/server.rb', line 5

def server_configuration
  @server_configuration
end

#shutting_downObject (readonly)

Returns the value of attribute shutting_down.



5
6
7
# File 'lib/quicsilver/server/server.rb', line 5

def shutting_down
  @shutting_down
end

Class Method Details

.handle_stream(connection_data, stream_id, event, data, early_data) ⇒ Object

Callback from C extension - delegates to server instance



44
45
46
# File 'lib/quicsilver/server/server.rb', line 44

def handle_stream(connection_data, stream_id, event, data, early_data)
  instance&.handle_stream_event(connection_data, stream_id, event, data, early_data)
end

Instance Method Details

#cancelled_stream?(stream_id) ⇒ Boolean

Returns:

  • (Boolean)


160
161
162
# File 'lib/quicsilver/server/server.rb', line 160

def cancelled_stream?(stream_id)
  @cancelled_mutex.synchronize { @cancelled_streams.include?(stream_id) }
end

#drain(timeout: 5) ⇒ Object

Wait for work queue to drain, then shut down the pool



165
166
167
168
169
170
171
172
173
174
175
176
177
# File 'lib/quicsilver/server/server.rb', line 165

def drain(timeout: 5)
  Quicsilver.logger.debug("Draining work queue (#{@work_queue.size} pending)")

  deadline = Time.now + timeout

  # Wait for work queue to empty
  while @work_queue.size > 0 && Time.now < deadline
    sleep 0.05
  end

  # Signal workers to exit
  stop_worker_pool
end

#handle_stream_event(connection_data, stream_id, event, data, early_data) ⇒ Object

:nodoc:



222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
# File 'lib/quicsilver/server/server.rb', line 222

def handle_stream_event(connection_data, stream_id, event, data, early_data) # :nodoc:
  connection_handle = connection_data[0]

  case event
  when STREAM_EVENT_CONNECTION_ESTABLISHED
    if @connections.size >= @max_connections
      Quicsilver.logger.warn("Connection limit reached (#{@max_connections}), rejecting connection")
      Quicsilver.connection_shutdown(connection_handle, Protocol::H3_EXCESSIVE_LOAD, false)
      return
    end

    connection = Transport::Connection.new(connection_handle, connection_data,
      max_header_size: @server_configuration.max_header_size)
    @connections[connection_handle] = connection
    connection.setup_http3_streams

  when STREAM_EVENT_CONNECTION_CLOSED
    @connections.delete(connection_handle)&.streams&.clear
    Quicsilver.close_server_connection(connection_handle)

  when STREAM_EVENT_SEND_COMPLETE
    # Buffer cleanup handled in C extension
  when STREAM_EVENT_RECEIVE
    return unless (connection = @connections[connection_handle])
    handle_receive(connection, connection_handle, stream_id, data, early_data: early_data)
  when STREAM_EVENT_RECEIVE_FIN
    return unless (connection = @connections[connection_handle])
    handle_receive_fin(connection, connection_handle, stream_id, data, early_data: early_data)
  when STREAM_EVENT_STREAM_RESET
    return unless (connection = @connections[connection_handle])
    event = Transport::StreamEvent.new(data, "STREAM_RESET")
    Quicsilver.logger.debug("Stream #{stream_id} reset by peer with error code: 0x#{event.error_code.to_s(16)}")

    # Closing a critical unidirectional stream is a connection error (RFC 9114 §6.2.1)
    if connection.critical_stream?(stream_id)
      Quicsilver.logger.error("Critical stream #{stream_id} reset by peer")
      Quicsilver.connection_shutdown(connection_handle, Protocol::H3_CLOSED_CRITICAL_STREAM, false) rescue nil
    else
      @cancelled_mutex.synchronize { @cancelled_streams.add(stream_id) }
      pending = @pending_mutex.synchronize { @pending_streams.delete(stream_id) }
      pending&.body&.close(RuntimeError.new("Stream #{stream_id} reset by peer"))
      @request_registry.complete(stream_id)
    end
  when STREAM_EVENT_STOP_SENDING
    return unless @connections[connection_handle]
    event = Transport::StreamEvent.new(data, "STOP_SENDING")
    Quicsilver.logger.debug("Stream #{stream_id} stop sending requested with error code: 0x#{event.error_code.to_s(16)}")
    @cancelled_mutex.synchronize { @cancelled_streams.add(stream_id) }
    Quicsilver.stream_reset(event.handle, Protocol::H3_REQUEST_CANCELLED)
    @request_registry.complete(stream_id)
  end
end

#running?Boolean

Returns:

  • (Boolean)


156
157
158
# File 'lib/quicsilver/server/server.rb', line 156

def running?
  @running
end

#shutdown(timeout: 30) ⇒ Object

Graceful shutdown: send GOAWAY, drain requests, then stop



180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
# File 'lib/quicsilver/server/server.rb', line 180

def shutdown(timeout: 30)
  return unless @running
  return if @shutting_down

  @shutting_down = true
  Quicsilver.logger.info("Initiating graceful shutdown (timeout: #{timeout}s)")

  # Phase 1: Send GOAWAY - tell clients to stop sending new requests
  @connections.each_value { |c| c.send_goaway(Protocol::MAX_STREAM_ID) }

  # Phase 2: Drain in-flight requests
  drain(timeout: timeout)

  # Phase 2b: Send final GOAWAY with actual last processed stream ID (RFC 9114 §5.2)
  @connections.each_value do |c|
    c.send_goaway
  rescue => e
    Quicsilver.logger.debug("Second GOAWAY failed: #{e.message}")
  end

  # Grace period: let pending responses reach clients
  sleep [0.5, timeout * 0.1].min

  # Log any requests that didn't complete
  unless @request_registry.empty?
    @request_registry.active_requests.each do |stream_id, req|
      elapsed = Time.now - req[:started_at]
      Quicsilver.logger.warn("Force-closing request: #{req[:method]} #{req[:path]} (stream: #{stream_id}, elapsed: #{elapsed.round(2)}s)")
    end
  end

  # Phase 3: Shutdown connections
  @connections.each_value(&:shutdown)
  sleep [0.1, timeout * 0.05].min

  # Phase 4: Hard stop
  stop
  @shutting_down = false

  Quicsilver.logger.info("Graceful shutdown complete")
end

#startObject



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/quicsilver/server/server.rb', line 88

def start
  raise ServerIsRunningError, "Server is already running" if @running

  Quicsilver.open_connection
  @config_handle = Quicsilver.create_server_configuration(@server_configuration.to_h)
  raise ServerConfigurationError, "Failed to create server configuration" unless @config_handle

  result = Quicsilver.create_listener(@config_handle)
  @listener_data = ListenerData.new(result[0], result[1])
  raise ServerListenerError, "Failed to create listener #{@address}:#{@port}"  unless @listener_data

  unless Quicsilver.start_listener(@listener_data.listener_handle, @address, @port, @server_configuration.alpn)
    Quicsilver.close_configuration(@config_handle)
    @config_handle = nil
    cleanup_failed_server
    raise ServerListenerError, "Failed to start listener on #{@address}:#{@port}"
  end

  @running = true

  setup_signal_handlers
  start_worker_pool
  Quicsilver.event_loop.start
  Quicsilver.event_loop.join  # Block until shutdown
rescue ServerConfigurationError, ServerListenerError => e
  cleanup_failed_server
  @running = false
  raise e
rescue => e
  cleanup_failed_server
  @running = false

  error_msg = case e.message
  when /0x16/
    "Invalid parameter error - check certificate files and network configuration"
  when /0x30/
    "Address already in use - port #{@port} may be occupied"
  else
    e.message
  end

  raise ServerError, "Server start failed: #{error_msg}"
end

#stopObject



132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/quicsilver/server/server.rb', line 132

def stop
  return unless @running

  drain

  if @listener_data && @listener_data.listener_handle
    Quicsilver.stop_listener(@listener_data.listener_handle)
    Quicsilver.close_listener([@listener_data.listener_handle, @listener_data.context_handle])
  end

  if @config_handle
    Quicsilver.close_configuration(@config_handle)
    @config_handle = nil
  end

  Quicsilver.event_loop.stop
  @running = false
  @listener_data = nil
rescue => e
  @listener_data = nil
  @running = false
  raise ServerStopError, "Failed to stop server: #{e.message}"
end