Class: Raptor::Request

Inherits:
Object
Defined in:
lib/raptor/request.rb

Overview

Handles HTTP request processing and Rack application integration.

Request manages the HTTP parsing pipeline using Ractors and coordinates with the reactor for connection state management. It bridges low-level HTTP parsing and the high-level Rack application interface, handling both incomplete requests (which need more data) and complete requests (which are ready for application processing).

Defined Under Namespace

Classes: Error, WriteError

Constant Summary

BODY_BUFFER_THRESHOLD =
256 * 1024
FILE_CHUNK_SIZE =
64 * 1024
READ_BUFFER_SIZE =
64 * 1024
WRITE_TIMEOUT =
5
KEEPALIVE_READ_TIMEOUT =
0.001
MAX_KEEPALIVE_REQUESTS =
100
HTTP_SCHEME =
"http"
HTTP_10 =
"HTTP/1.0"
HTTP_11 =
"HTTP/1.1"
STATUS_LINE_CACHE_10 =
Hash.new do |h, status|
  reason = Rack::Utils::HTTP_STATUS_CODES[status]
  h[status] = "HTTP/1.0 #{status}#{reason ? " #{reason}" : ""}\r\n".freeze
end
STATUS_LINE_CACHE_11 =
Hash.new do |h, status|
  reason = Rack::Utils::HTTP_STATUS_CODES[status]
  h[status] = "HTTP/1.1 #{status}#{reason ? " #{reason}" : ""}\r\n".freeze
end
STATUS_WITH_NO_ENTITY_BODY =
Set.new([204, 304, *100..199]).freeze
BAD_REQUEST_RESPONSE =
"HTTP/1.1 400 Bad Request\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"
INTERNAL_SERVER_ERROR_RESPONSE =
"HTTP/1.1 500 Internal Server Error\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"
CONTENT_TOO_LARGE_RESPONSE =
"HTTP/1.1 413 Content Too Large\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"
CONNECTION_CLOSE =
"close"
CONNECTION_KEEPALIVE =
"keep-alive"
TRANSFER_ENCODING_CHUNKED =
"chunked"
HTTP_CONNECTION =
"HTTP_CONNECTION"
HTTP_TRANSFER_ENCODING =
"HTTP_TRANSFER_ENCODING"
RACK_HEADER_PREFIX =
"rack."
RACK_HIJACKED =
"rack.hijacked"
RACK_HIJACK_IO =
"rack.hijack_io"
ILLEGAL_HEADER_KEY_REGEX =
/[\x00-\x20\(\)<>@,;:\\"\/\[\]\?=\{\}\x7F]/
ILLEGAL_HEADER_VALUE_REGEX =
/[\x00-\x08\x0A-\x1F]/
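
The two status-line caches rely on the `Hash.new` default block to build each status line on first lookup and store it for reuse, and the two header regexes describe which bytes are rejected in header names and values. The sketch below shows both behaviours in isolation. `StatusLineDemo` and its `REASONS` table are hypothetical stand-ins (the latter for `Rack::Utils::HTTP_STATUS_CODES`) so the snippet runs without Rack or Raptor; the regexes are copied from the constants above.

```ruby
module StatusLineDemo
  # Hypothetical stand-in for Rack::Utils::HTTP_STATUS_CODES (assumption).
  REASONS = { 200 => "OK", 404 => "Not Found" }.freeze

  # Same shape as STATUS_LINE_CACHE_11: the block runs only on a cache miss
  # and stores the frozen line, so later lookups return the same String.
  CACHE = Hash.new do |h, status|
    reason = REASONS[status]
    h[status] = "HTTP/1.1 #{status}#{reason ? " #{reason}" : ""}\r\n".freeze
  end

  # Regexes copied from the constant summary above.
  ILLEGAL_KEY   = /[\x00-\x20\(\)<>@,;:\\"\/\[\]\?=\{\}\x7F]/
  ILLEGAL_VALUE = /[\x00-\x08\x0A-\x1F]/
end

p StatusLineDemo::CACHE[200]   # known code: reason phrase appended
p StatusLineDemo::CACHE[799]   # unknown code: status number only
p StatusLineDemo::ILLEGAL_KEY.match?("Bad Header")   # space is not allowed in a name
p StatusLineDemo::ILLEGAL_VALUE.match?("a\tb")       # horizontal tab (\x09) is allowed
p StatusLineDemo::ILLEGAL_VALUE.match?("a\r\nb")     # CR and LF are rejected
```

Note that the value regex deliberately skips `\x09`, so a tab inside a header value passes while every other control character up to `\x1F` is flagged.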

Constructor Details

#initialize(app, server_port, client_options: {}, on_error: nil) ⇒ void

Creates a new Request handler.

Parameters:

  • app (#call)

    the Rack application to dispatch complete requests to

  • server_port (Integer)

    port number used to populate SERVER_PORT in the Rack env

  • client_options (Hash) (defaults to: {})

    client limits configuration

  • on_error (#call, nil) (defaults to: nil)

    callback invoked with (env, exception) when the Rack app raises

Options Hash (client_options:):

  • :max_body_size (Integer, nil)

    maximum request body size in bytes

  • :body_spool_threshold (Integer, nil)

    spool bodies larger than this to a tempfile



# File 'lib/raptor/request.rb', line 142

def initialize(app, server_port, client_options: {}, on_error: nil)
  @app = app
  @server_port = server_port
  @max_body_size = client_options[:max_body_size]
  @body_spool_threshold = client_options[:body_spool_threshold]
  @on_error = on_error
  @running = AtomicBoolean.new(true)
end

Class Method Details

.decode_chunked(buffer, max_size = nil) ⇒ Array(String, Symbol)

Decodes a chunked transfer-encoded body buffer.

Returns the decoded bytes and a state symbol: `:complete` when the terminating zero-length chunk was found, `:too_large` when the decoded size would exceed `max_size`, or `:incomplete` otherwise.

Parameters:

  • buffer (String)

    the raw body buffer to decode

  • max_size (Integer, nil) (defaults to: nil)

    maximum decoded body size, or nil for unlimited

Returns:

  • (Array(String, Symbol))

    decoded body and completion state



# File 'lib/raptor/request.rb', line 77

def self.decode_chunked(buffer, max_size = nil)
  decoded = String.new
  offset = 0

  while offset < buffer.bytesize
    crlf = buffer.index("\r\n", offset)
    return [decoded, :incomplete] unless crlf

    chunk_size = buffer.byteslice(offset, crlf - offset).to_i(16)
    return [decoded, :complete] if chunk_size == 0
    return [decoded, :too_large] if max_size && decoded.bytesize + chunk_size > max_size

    offset = crlf + 2
    decoded << buffer.byteslice(offset, chunk_size)
    offset += chunk_size + 2
  end

  [decoded, :incomplete]
end
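
To make the three return states concrete, the following sketch runs the decoder on a small chunked payload. `ChunkedDemo` is a hypothetical wrapper module; its method body is copied from the listing above so the example runs without Raptor loaded.

```ruby
# Hypothetical wrapper module (not part of Raptor); body copied from the listing.
module ChunkedDemo
  def self.decode_chunked(buffer, max_size = nil)
    decoded = String.new
    offset = 0

    while offset < buffer.bytesize
      crlf = buffer.index("\r\n", offset)
      return [decoded, :incomplete] unless crlf

      # Chunk sizes are hexadecimal; a zero-length chunk ends the body.
      chunk_size = buffer.byteslice(offset, crlf - offset).to_i(16)
      return [decoded, :complete] if chunk_size == 0
      return [decoded, :too_large] if max_size && decoded.bytesize + chunk_size > max_size

      offset = crlf + 2
      decoded << buffer.byteslice(offset, chunk_size)
      offset += chunk_size + 2 # skip the chunk data and its trailing CRLF
    end

    [decoded, :incomplete]
  end
end

wire = "4\r\nWiki\r\n5\r\npedia\r\n0\r\n\r\n"

p ChunkedDemo.decode_chunked(wire)        # => ["Wikipedia", :complete]
p ChunkedDemo.decode_chunked(wire, 5)     # => ["Wiki", :too_large]
p ChunkedDemo.decode_chunked("4\r\nWi")   # => ["Wi", :incomplete]
```

In the `:incomplete` case the first element may hold a partial chunk. The callers shown on this page keep the raw buffer and decode it again once more data has arrived, rather than resuming from the partial result.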

.socket_write(socket, string) ⇒ void

This method returns an undefined value.

Writes a string to the socket, retrying on partial writes and flow control blocks.

Uses `write_nonblock` with `WRITE_TIMEOUT` to avoid blocking the thread indefinitely on slow clients.

Parameters:

  • socket (TCPSocket)

    the socket to write to

  • string (String)

    the data to write

Raises:

  • (WriteError)

    if the socket is not writable within the timeout or raises IOError



# File 'lib/raptor/request.rb', line 108

def self.socket_write(socket, string)
  bytes = 0
  byte_size = string.bytesize

  while bytes < byte_size
    begin
      bytes += socket.write_nonblock(bytes.zero? ? string : string.byteslice(bytes..-1))
    rescue IO::WaitWritable
      raise WriteError unless socket.wait_writable(WRITE_TIMEOUT)
      retry
    rescue IOError
      raise WriteError
    end
  end
end
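
The retry loop can be exercised without a network by writing more data than a pipe buffer holds, which forces at least one partial write. This is a sketch under assumptions: `WriteDemo`, its `round_trip` helper, and the plain `StandardError`-based `WriteError` are hypothetical stand-ins, a pipe stands in for the TCP socket, and only the `socket_write` body is copied from the listing above.

```ruby
require "io/wait"

# Hypothetical wrapper module (not part of Raptor).
module WriteDemo
  WRITE_TIMEOUT = 5
  class WriteError < StandardError; end # stand-in for Raptor::Request::WriteError

  # Body copied from the listing above.
  def self.socket_write(socket, string)
    bytes = 0
    byte_size = string.bytesize

    while bytes < byte_size
      begin
        bytes += socket.write_nonblock(bytes.zero? ? string : string.byteslice(bytes..-1))
      rescue IO::WaitWritable
        raise WriteError unless socket.wait_writable(WRITE_TIMEOUT)
        retry
      rescue IOError
        raise WriteError
      end
    end
  end

  # Hypothetical helper: pushes payload through a pipe while a thread drains it.
  def self.round_trip(payload)
    reader, writer = IO.pipe
    received = String.new
    consumer = Thread.new do
      received << reader.readpartial(64 * 1024) while received.bytesize < payload.bytesize
    end
    socket_write(writer, payload)
    consumer.join
    received
  ensure
    reader&.close
    writer&.close
  end
end

echoed = WriteDemo.round_trip("x" * (1024 * 1024))
puts echoed.bytesize
```

Only `IOError` is translated into `WriteError` in the listing, so errors such as `Errno::EPIPE`, which are `SystemCallError` subclasses, would propagate unchanged from this method.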

Instance Method Details

#eager_accept(socket, id, reactor, thread_pool, remote_addr, url_scheme) ⇒ void

This method returns an undefined value.

Eagerly reads and parses the first request of a freshly accepted connection on the server thread and, when the request is complete, dispatches it directly to the thread pool. Falls back to the reactor when more data is needed.

Parameters:

  • socket (TCPSocket)

    the freshly accepted client socket

  • id (Integer)

    unique client identifier

  • reactor (Reactor)

    the reactor for fallback registration

  • thread_pool (AtomicThreadPool)

    thread pool for application processing

  • remote_addr (String)

    client IP address

  • url_scheme (String)

    “http” or “https”



# File 'lib/raptor/request.rb', line 174

def eager_accept(socket, id, reactor, thread_pool, remote_addr, url_scheme)
  data = begin
    socket.read_nonblock(READ_BUFFER_SIZE)
  rescue IO::WaitReadable
    reactor.add(
      id: id,
      socket: socket,
      remote_addr: remote_addr,
      url_scheme: url_scheme
    )
    return
  rescue EOFError, IOError
    socket.close rescue nil
    return
  end

  buffer = String.new
  buffer << data

  while socket.respond_to?(:pending) && socket.pending > 0
    buffer << socket.read_nonblock(socket.pending)
  end

  parser = HttpParser.new
  env = {}
  nread = begin
    parser.execute(env, buffer, 0)
  rescue HttpParserError
    reject_malformed(socket)
    return
  end
  parse_data = { parse_count: 1, content_length: parser.content_length }

  body = nil
  if !parser.finished?
    fallback_to_reactor(socket, id, buffer, env, parse_data, reactor, 0, remote_addr, url_scheme, persisted: false)
    return
  elsif parser.has_body?
    if @max_body_size && parser.content_length > @max_body_size
      reject_oversized(socket)
      return
    end

    body = buffer.byteslice(nread..-1) || ""

    if env[HTTP_TRANSFER_ENCODING]&.include?(TRANSFER_ENCODING_CHUNKED)
      body, chunked_state = Request.decode_chunked(body, @max_body_size)
      case chunked_state
      when :complete
        env.delete(HTTP_TRANSFER_ENCODING)
      when :too_large
        reject_oversized(socket)
        return
      else
        fallback_to_reactor(socket, id, buffer, env, parse_data, reactor, 0, remote_addr, url_scheme, persisted: false)
        return
      end
    elsif parser.content_length > body.bytesize
      fallback_to_reactor(socket, id, buffer, env, parse_data, reactor, 0, remote_addr, url_scheme, persisted: false)
      return
    end
  end

  thread_pool << proc do
    process_client(socket, id, env, parse_data, body, reactor, thread_pool, 1, remote_addr, url_scheme)
  end
end
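
The first step of the method, a single non-blocking read with three possible outcomes, can be sketched in isolation as follows. `EagerReadDemo.first_read` is a hypothetical helper, not part of Raptor, and a `UNIXSocket` pair stands in for an accepted TCP connection.

```ruby
require "socket"

# Hypothetical helper module (not part of Raptor).
module EagerReadDemo
  # Classifies one non-blocking read the way eager_accept branches on it.
  def self.first_read(socket, max_bytes = 64 * 1024)
    [:data, socket.read_nonblock(max_bytes)]
  rescue IO::WaitReadable
    [:wait, nil]   # nothing buffered yet: eager_accept registers with the reactor
  rescue EOFError, IOError
    [:closed, nil] # peer is gone: eager_accept closes the socket
  end
end

client, server = UNIXSocket.pair

p EagerReadDemo.first_read(server).first   # no bytes sent yet
client.write("GET / HTTP/1.1\r\n\r\n")
p EagerReadDemo.first_read(server).first   # request bytes are buffered
client.close
p EagerReadDemo.first_read(server).first   # end of stream
server.close
```

Reading eagerly on the accepting thread lets a request that arrives in one piece skip the reactor round trip; only the `:wait` case and incomplete requests take the slower path.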

#handle_parsed_request(parsed_request, reactor, thread_pool) ⇒ void

This method returns an undefined value.

Handles a parsed HTTP request by either continuing parsing or dispatching to the Rack app.

For incomplete requests, updates reactor state and re-registers for more I/O. For complete requests, removes from reactor, builds Rack env, and dispatches to thread pool.

Parameters:

  • parsed_request (Hash)

    the parsed request state from the ractor pool

  • reactor (Reactor)

    the reactor managing the client connection

  • thread_pool (AtomicThreadPool)

    thread pool for application processing



# File 'lib/raptor/request.rb', line 315

def handle_parsed_request(parsed_request, reactor, thread_pool)
  if parsed_request[:too_large]
    socket = reactor.remove(parsed_request[:id])
    reject_oversized(socket) if socket
    return
  end

  if parsed_request[:malformed]
    socket = reactor.remove(parsed_request[:id])
    reject_malformed(socket) if socket
    return
  end

  unless parsed_request[:complete]
    reactor.update_state(parsed_request)
  else
    socket = reactor.remove(parsed_request[:id])
    request_count = (parsed_request[:request_count] || 0) + 1
    remote_addr = parsed_request[:remote_addr] || "127.0.0.1"
    url_scheme = parsed_request[:url_scheme] || HTTP_SCHEME

    thread_pool << proc do
      process_client(
        socket,
        parsed_request[:id],
        parsed_request[:env].dup,
        parsed_request[:parse_data],
        parsed_request[:body],
        reactor,
        thread_pool,
        request_count,
        remote_addr,
        url_scheme
      )
    end
  end
end
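
The branch order matters because the parser worker marks oversized and malformed requests as `complete: true` as well. A minimal sketch of the decision, using a hypothetical `DispatchDemo` module that only names the branch instead of touching a reactor or thread pool:

```ruby
# Hypothetical helper module (not part of Raptor).
module DispatchDemo
  # Returns the branch handle_parsed_request would take for a state hash.
  def self.route(parsed_request)
    return :reject_oversized if parsed_request[:too_large]
    return :reject_malformed if parsed_request[:malformed]
    return :update_state unless parsed_request[:complete]

    :dispatch
  end

  # Mirrors the request counter: a missing count is treated as zero.
  def self.next_request_count(parsed_request)
    (parsed_request[:request_count] || 0) + 1
  end
end

p DispatchDemo.route(id: 1, complete: true, too_large: true)   # => :reject_oversized
p DispatchDemo.route(id: 2, complete: true, malformed: true)   # => :reject_malformed
p DispatchDemo.route(id: 3)                                    # => :update_state
p DispatchDemo.route(id: 4, complete: true)                    # => :dispatch
p DispatchDemo.next_request_count(id: 4)                       # => 1
```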

#http_parser_worker ⇒ Proc

Returns a Proc for HTTP parsing work in Ractor context.

The returned Proc processes raw socket data through the appropriate HTTP parser and returns either a complete request state (ready for app processing) or incomplete request state (needs more data).

Returns:

  • (Proc)

    a Ractor-safe proc that accepts a state hash and returns an updated state hash



# File 'lib/raptor/request.rb', line 251

def http_parser_worker
  max_body_size = @max_body_size

  proc do |data|
    next Raptor::Http2.process_frames(data) if data[:protocol] == :http2

    parser = Raptor::HttpParser.new
    env = {}
    nread = begin
      parser.execute(env, data[:buffer], 0)
    rescue Raptor::HttpParserError
      next Ractor.make_shareable(data.merge(complete: true, malformed: true))
    end
    parse_data = if data[:parse_data]
      data[:parse_data].dup
    else
      { parse_count: 0, content_length: parser.content_length }
    end
    parse_data[:parse_count] += 1

    message = if parser.finished?
      if parser.has_body?
        body_buffer = data[:buffer].byteslice(nread..-1) || ""

        if max_body_size && parser.content_length > max_body_size
          data.merge(env: env, body: nil, parse_data: parse_data, complete: true, too_large: true)
        elsif env[HTTP_TRANSFER_ENCODING]&.include?(TRANSFER_ENCODING_CHUNKED)
          decoded_body, chunked_state = Raptor::Request.decode_chunked(body_buffer, max_body_size)

          case chunked_state
          when :complete
            env.delete(HTTP_TRANSFER_ENCODING)
            data.merge(env: env, body: decoded_body, parse_data: parse_data, complete: true)
          when :too_large
            data.merge(env: env, body: nil, parse_data: parse_data, complete: true, too_large: true)
          else
            data.merge(env: env, parse_data: parse_data)
          end
        elsif parser.content_length > body_buffer.bytesize
          data.merge(env: env, parse_data: parse_data)
        else
          data.merge(env: env, body: body_buffer, parse_data: parse_data, complete: true)
        end
      else
        data.merge(env: env, body: nil, parse_data: parse_data, complete: true)
      end
    else
      data.merge(env: env, parse_data: parse_data)
    end
    Ractor.make_shareable(message)
  end
end
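
Every branch of the worker ends by passing the merged state hash to `Ractor.make_shareable`, which deep-freezes it so it can cross a Ractor boundary without copying. The sketch below shows that effect on a hand-built state hash; `ShareableDemo.build_message` is a hypothetical helper and the key names simply follow the listing above. It assumes Ruby 3.0 or later.

```ruby
# Hypothetical helper module (not part of Raptor).
module ShareableDemo
  # Merges parse results into the state hash and deep-freezes the result.
  def self.build_message(state, env)
    Ractor.make_shareable(state.merge(env: env, body: nil, complete: true))
  end
end

state = { id: 7, buffer: String.new("GET / HTTP/1.1\r\n\r\n") }
message = ShareableDemo.build_message(state, { "REQUEST_METHOD" => "GET" })

p Ractor.shareable?(message)   # the merged hash and everything it references
p message[:env].frozen?        # nested hash is frozen too
p state.frozen?                # the input hash itself is left alone...
p state[:buffer].frozen?       # ...but the buffer String it shares is now frozen
```

Because `Hash#merge` is shallow, the input hash and the message share the same buffer object, so code that still needs to append to a buffer has to work on a copy.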

#shutdown ⇒ void

This method returns an undefined value.

Signals eager keep-alive loops to stop processing further requests on their connections. In-flight requests complete normally.



# File 'lib/raptor/request.rb', line 157

def shutdown
  @running.make_false
end
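
The keep-alive loops that consult `@running` are not shown on this page, so the following is only a sketch of the general pattern: a loop checks the flag between requests, which lets the request in progress finish before the loop exits. `ShutdownDemo`, its `Flag` class (a stand-in for `AtomicBoolean`, including the assumed `true?` reader), and `serve` are all hypothetical.

```ruby
# Hypothetical module illustrating a cooperative stop flag (not Raptor code).
module ShutdownDemo
  # Minimal mutex-guarded boolean standing in for AtomicBoolean.
  class Flag
    def initialize(value)
      @value = value
      @lock = Mutex.new
    end

    def true?
      @lock.synchronize { @value }
    end

    def make_false
      @lock.synchronize { @value = false }
    end
  end

  # Handles queued requests until the queue is empty or the flag is cleared.
  # The flag is checked only between requests, never in the middle of one.
  def self.serve(requests, running)
    handled = []
    while running.true? && (request = requests.shift)
      handled << yield(request)
    end
    handled
  end
end

running = ShutdownDemo::Flag.new(true)
handled = ShutdownDemo.serve(%w[a b c], running) do |request|
  running.make_false if request == "b" # shutdown arrives while "b" is in flight
  request.upcase
end

p handled   # => ["A", "B"]
```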