Class: Raptor::Request

Inherits:
Object
Defined in:
lib/raptor/request.rb

Overview

Parses HTTP/1.x requests and dispatches them to the Rack application. Coordinates with the Ractor pool for parsing and with the reactor for requests that need more data before they can be handled.

Defined Under Namespace

Classes: Error, WriteError

Constant Summary

BODY_BUFFER_THRESHOLD =
256 * 1024
FILE_CHUNK_SIZE =
64 * 1024
READ_BUFFER_SIZE =
64 * 1024
WRITE_TIMEOUT =
5
KEEPALIVE_READ_TIMEOUT =
0.001
MAX_KEEPALIVE_REQUESTS =
100
HTTP_10 =
"HTTP/1.0"
HTTP_11 =
"HTTP/1.1"
STATUS_LINE_CACHE_10 =
Hash.new do |h, status|
  reason = Rack::Utils::HTTP_STATUS_CODES[status]
  h[status] = "HTTP/1.0 #{status}#{reason ? " #{reason}" : ""}\r\n".freeze
end
STATUS_LINE_CACHE_11 =
Hash.new do |h, status|
  reason = Rack::Utils::HTTP_STATUS_CODES[status]
  h[status] = "HTTP/1.1 #{status}#{reason ? " #{reason}" : ""}\r\n".freeze
end
STATUS_WITH_NO_ENTITY_BODY =
Set.new([204, 304, *100..199]).freeze
BAD_REQUEST_RESPONSE =
"HTTP/1.1 400 Bad Request\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"
INTERNAL_SERVER_ERROR_RESPONSE =
"HTTP/1.1 500 Internal Server Error\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"
CONTENT_TOO_LARGE_RESPONSE =
"HTTP/1.1 413 Content Too Large\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"
CONNECTION_CLOSE =
"close"
CONNECTION_KEEPALIVE =
"keep-alive"
TRANSFER_ENCODING_CHUNKED =
"chunked"
HTTP_CONNECTION =
"HTTP_CONNECTION"
HTTP_TRANSFER_ENCODING =
"HTTP_TRANSFER_ENCODING"
RACK_HEADER_PREFIX =
"rack."
RACK_HIJACKED =
"rack.hijacked"
RACK_HIJACK_IO =
"rack.hijack_io"
ILLEGAL_HEADER_KEY_REGEX =
/[\x00-\x20\(\)<>@,;:\\"\/\[\]\?=\{\}\x7F]/
ILLEGAL_HEADER_VALUE_REGEX =
/[\x00-\x08\x0A-\x1F]/
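
The two status-line caches are memoizing hashes: the default block runs only on a cache miss, stores the frozen line, and returns it, so later lookups for the same status reuse the same String object. A minimal standalone sketch of that pattern follows. The names `reasons` and `status_lines` are hypothetical stand-ins so the example runs without Rack; the real constants read reason phrases from `Rack::Utils::HTTP_STATUS_CODES`.

```ruby
# Hypothetical stand-in for Rack::Utils::HTTP_STATUS_CODES.
reasons = { 200 => "OK", 404 => "Not Found" }.freeze

# Same shape as STATUS_LINE_CACHE_11: build the line once per status,
# freeze it, and store it under the status key.
status_lines = Hash.new do |h, status|
  reason = reasons[status]
  h[status] = "HTTP/1.1 #{status}#{reason ? " #{reason}" : ""}\r\n".freeze
end

first   = status_lines[200]  # cache miss: the block builds and stores the line
second  = status_lines[200]  # cache hit: the stored object is returned
unknown = status_lines[799]  # no reason phrase: the line carries the code only
```

Statuses without a known reason phrase still produce a valid status line, just without the trailing text.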

Constructor Details

#initialize(app, server_port, client_options: {}, on_error: nil) ⇒ void

Creates a new Request handler.

Parameters:

  • app (#call)

    the Rack application to dispatch complete requests to

  • server_port (Integer)

    port number used to populate SERVER_PORT in the Rack env

  • client_options (Hash) (defaults to: {})

    client limits configuration

  • on_error (#call, nil) (defaults to: nil)

    callback invoked with (env, exception) when the Rack app raises

Options Hash (client_options:):

  • :max_body_size (Integer, nil)

    maximum request body size in bytes

  • :body_spool_threshold (Integer, nil)

    spool bodies larger than this to a tempfile



# File 'lib/raptor/request.rb', line 136

def initialize(app, server_port, client_options: {}, on_error: nil)
  @app = app
  @server_port = server_port
  @max_body_size = client_options[:max_body_size]
  @body_spool_threshold = client_options[:body_spool_threshold]
  @on_error = on_error
  @running = AtomicBoolean.new(true)
end

Class Method Details

.decode_chunked(buffer, max_size = nil) ⇒ Array(String, Symbol)

Decodes a chunked transfer-encoded body buffer.

Returns the decoded bytes and a state symbol: `:complete` when the terminating zero-length chunk was found, `:too_large` when the decoded size would exceed `max_size`, or `:incomplete` otherwise.

Parameters:

  • buffer (String)

    the raw body buffer to decode

  • max_size (Integer, nil) (defaults to: nil)

    maximum decoded body size, or nil for unlimited

Returns:

  • (Array(String, Symbol))

    decoded body and completion state



# File 'lib/raptor/request.rb', line 73

def self.decode_chunked(buffer, max_size = nil)
  decoded = String.new
  offset = 0

  while offset < buffer.bytesize
    crlf = buffer.index("\r\n", offset)
    return [decoded, :incomplete] unless crlf

    chunk_size = buffer.byteslice(offset, crlf - offset).to_i(16)
    return [decoded, :complete] if chunk_size == 0
    return [decoded, :too_large] if max_size && decoded.bytesize + chunk_size > max_size

    offset = crlf + 2
    decoded << buffer.byteslice(offset, chunk_size)
    offset += chunk_size + 2
  end

  [decoded, :incomplete]
end
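
To make the three return states concrete, here is a small usage sketch. It repeats the listing above as a top-level method so it runs without Raptor loaded; the sample wire data and variable names are illustrative assumptions, not taken from the library's tests.

```ruby
# Standalone copy of the listing above, defined at top level for the sketch.
def decode_chunked(buffer, max_size = nil)
  decoded = String.new
  offset = 0

  while offset < buffer.bytesize
    crlf = buffer.index("\r\n", offset)
    return [decoded, :incomplete] unless crlf

    chunk_size = buffer.byteslice(offset, crlf - offset).to_i(16)
    return [decoded, :complete] if chunk_size == 0
    return [decoded, :too_large] if max_size && decoded.bytesize + chunk_size > max_size

    offset = crlf + 2
    decoded << buffer.byteslice(offset, chunk_size)
    offset += chunk_size + 2
  end

  [decoded, :incomplete]
end

# Two chunks ("hello", " world") followed by the terminating zero-length chunk.
wire = "5\r\nhello\r\n6\r\n world\r\n0\r\n\r\n"

body, state             = decode_chunked(wire)                       # full body available
_partial, partial_state = decode_chunked("5\r\nhello\r\n6\r\n wor")  # second chunk truncated
kept, limited_state     = decode_chunked(wire, 8)                    # second chunk would exceed 8 bytes
```

In the `:incomplete` case the returned string may hold a partially decoded body; the callers shown in this page ignore it and keep the raw buffer until more data arrives.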

.socket_write(socket, string) ⇒ void

This method returns an undefined value.

Writes `string` in full, retrying on partial writes. Bounded by `WRITE_TIMEOUT` so a slow client can't pin the writing thread.

Parameters:

  • socket (TCPSocket)

    the socket to write to

  • string (String)

    the data to write

Raises:

  • (WriteError)

    if the socket is not writable within the timeout or raises IOError



# File 'lib/raptor/request.rb', line 102

def self.socket_write(socket, string)
  bytes = 0
  byte_size = string.bytesize

  while bytes < byte_size
    begin
      bytes += socket.write_nonblock(bytes.zero? ? string : string.byteslice(bytes..-1))
    rescue IO::WaitWritable
      raise WriteError unless socket.wait_writable(WRITE_TIMEOUT)
      retry
    rescue IOError
      raise WriteError
    end
  end
end
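
The retry loop can be exercised without a network by writing a payload larger than a typical socket buffer across a local socket pair. The sketch below repeats the listing above as a top-level method; `SketchWriteError` and the `timeout:` keyword are hypothetical stand-ins for `WriteError` and `WRITE_TIMEOUT`, and `UNIXSocket.pair` assumes a Unix-like platform.

```ruby
require "socket"

# Hypothetical stand-in for Raptor::Request::WriteError.
class SketchWriteError < StandardError; end

# Standalone copy of the listing above; `timeout:` replaces WRITE_TIMEOUT.
def socket_write(socket, string, timeout: 5)
  bytes = 0
  byte_size = string.bytesize

  while bytes < byte_size
    begin
      # Resume from the first unwritten byte after a partial write.
      bytes += socket.write_nonblock(bytes.zero? ? string : string.byteslice(bytes..-1))
    rescue IO::WaitWritable
      # Kernel buffer is full: wait until writable or give up after the timeout.
      raise SketchWriteError unless socket.wait_writable(timeout)
      retry
    rescue IOError
      raise SketchWriteError
    end
  end
end

reader, writer = UNIXSocket.pair
payload = "x" * (512 * 1024)

# Drain the other end in a thread so the writer can make progress.
consumer = Thread.new { reader.read(payload.bytesize) }
socket_write(writer, payload)
received = consumer.value

writer.close
reader.close
```

If nothing drained the reader, the writer would block in `wait_writable` and raise after the timeout, which is the behavior the description above relies on.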

Instance Method Details

#eager_accept(socket, id, reactor, thread_pool, remote_addr, url_scheme) ⇒ void

This method returns an undefined value.

Eagerly reads and parses the first request on a freshly accepted connection on the server thread, dispatching directly to the thread pool when complete. Falls back to the reactor when more data is needed.

Parameters:

  • socket (TCPSocket)

    the freshly accepted client socket

  • id (Integer)

    unique client identifier

  • reactor (Reactor)

    the reactor for fallback registration

  • thread_pool (AtomicThreadPool)

    thread pool for application processing

  • remote_addr (String)

    client IP address

  • url_scheme (String)

    “http” or “https”



# File 'lib/raptor/request.rb', line 168

def eager_accept(socket, id, reactor, thread_pool, remote_addr, url_scheme)
  data = begin
    socket.read_nonblock(READ_BUFFER_SIZE)
  rescue IO::WaitReadable
    reactor.add(
      id: id,
      socket: socket,
      remote_addr: remote_addr,
      url_scheme: url_scheme
    )
    return
  rescue EOFError, IOError
    socket.close rescue nil
    return
  end

  buffer = String.new
  buffer << data

  while socket.respond_to?(:pending) && socket.pending > 0
    buffer << socket.read_nonblock(socket.pending)
  end

  parser = HttpParser.new
  env = {}
  nread = begin
    parser.execute(env, buffer, 0)
  rescue HttpParserError
    reject_malformed(socket)
    return
  end
  parse_data = { parse_count: 1, content_length: parser.content_length }

  body = nil
  if !parser.finished?
    fallback_to_reactor(socket, id, buffer, env, parse_data, reactor, 0, remote_addr, url_scheme, persisted: false)
    return
  elsif parser.has_body?
    if @max_body_size && parser.content_length > @max_body_size
      reject_oversized(socket)
      return
    end

    body = buffer.byteslice(nread..-1) || ""

    if env[HTTP_TRANSFER_ENCODING]&.include?(TRANSFER_ENCODING_CHUNKED)
      body, chunked_state = Request.decode_chunked(body, @max_body_size)
      case chunked_state
      when :complete
        env.delete(HTTP_TRANSFER_ENCODING)
      when :too_large
        reject_oversized(socket)
        return
      else
        fallback_to_reactor(socket, id, buffer, env, parse_data, reactor, 0, remote_addr, url_scheme, persisted: false)
        return
      end
    elsif parser.content_length > body.bytesize
      fallback_to_reactor(socket, id, buffer, env, parse_data, reactor, 0, remote_addr, url_scheme, persisted: false)
      return
    end
  end

  thread_pool << proc do
    process_client(socket, id, env, parse_data, body, reactor, thread_pool, 1, remote_addr, url_scheme)
  end
end
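
The first step of `eager_accept` is a single non-blocking read with three possible outcomes: data is available, the read would block, or the peer has gone away. The sketch below isolates that step using a pipe instead of a client socket. The helper name `try_first_read` and the returned symbols are assumptions made for illustration; the real method hands the socket to the reactor or closes it rather than returning symbols.

```ruby
# Hypothetical helper mirroring the first read in eager_accept.
def try_first_read(io, max_bytes = 64 * 1024)
  io.read_nonblock(max_bytes)
rescue IO::WaitReadable
  :wait    # eager_accept calls reactor.add(...) at this point
rescue EOFError, IOError
  :closed  # eager_accept closes the socket at this point
end

reader, writer = IO.pipe

empty_result = try_first_read(reader)  # nothing written yet

writer.write("GET / HTTP/1.1\r\nHost: example.test\r\n\r\n")
data_result = try_first_read(reader)   # request bytes are available

writer.close
eof_result = try_first_read(reader)    # writer closed and buffer drained

reader.close
```

Only the data case continues on to parsing on the calling thread; the other two return early, which keeps the accept path from blocking on a slow or silent client.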

#handle_parsed_request(parsed_request, reactor, thread_pool) ⇒ void

This method returns an undefined value.

Handles a parsed HTTP request by either continuing parsing or dispatching to the Rack app.

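Requests flagged as too large or malformed are removed from the reactor and rejected. For incomplete requests, updates the reactor state so the connection can wait for more I/O. For complete requests, removes the connection from the reactor and dispatches the request to the thread pool.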

Parameters:

  • parsed_request (Hash)

    the parsed request state from the ractor pool

  • reactor (Reactor)

    the reactor managing the client connection

  • thread_pool (AtomicThreadPool)

    thread pool for application processing



# File 'lib/raptor/request.rb', line 309

def handle_parsed_request(parsed_request, reactor, thread_pool)
  if parsed_request[:too_large]
    socket = reactor.remove(parsed_request[:id])
    reject_oversized(socket) if socket
    return
  end

  if parsed_request[:malformed]
    socket = reactor.remove(parsed_request[:id])
    reject_malformed(socket) if socket
    return
  end

  unless parsed_request[:complete]
    reactor.update_state(parsed_request)
  else
    socket = reactor.remove(parsed_request[:id])
    request_count = (parsed_request[:request_count] || 0) + 1
    remote_addr = parsed_request[:remote_addr] || Server::DEFAULT_REMOTE_ADDR
    url_scheme = parsed_request[:url_scheme] || Server::HTTP_SCHEME

    thread_pool << proc do
      process_client(
        socket,
        parsed_request[:id],
        parsed_request[:env].dup,
        parsed_request[:parse_data],
        parsed_request[:body],
        reactor,
        thread_pool,
        request_count,
        remote_addr,
        url_scheme
      )
    end
  end
end
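
The branch order in the listing above matters because the parser worker sets `complete: true` together with `:too_large` or `:malformed`, so the rejection flags have to be checked first. A minimal sketch of that decision order follows; `next_action` and `next_request_count` are hypothetical helpers written for this example, and the returned symbols only name the branch that would be taken.

```ruby
# Hypothetical helper: names the branch handle_parsed_request would take.
def next_action(parsed_request)
  return :reject_oversized if parsed_request[:too_large]
  return :reject_malformed if parsed_request[:malformed]
  return :update_state unless parsed_request[:complete]

  :dispatch
end

# Hypothetical helper: same arithmetic as the request_count line above.
def next_request_count(parsed_request)
  (parsed_request[:request_count] || 0) + 1
end

oversized  = next_action({ id: 1, complete: true, too_large: true })
malformed  = next_action({ id: 2, complete: true, malformed: true })
waiting    = next_action({ id: 3 })
ready      = next_action({ id: 4, complete: true })

first_count = next_request_count({ id: 4 })
later_count = next_request_count({ id: 4, request_count: 3 })
```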

#http_parser_worker ⇒ Proc

Returns a Proc for HTTP parsing work in Ractor context.

The returned Proc processes raw socket data through the appropriate HTTP parser and returns an updated state hash. The hash is either marked complete (ready for app processing, or flagged `:malformed` or `:too_large` for rejection) or left incomplete (more data is needed).

Returns:

  • (Proc)

    a Ractor-safe proc that accepts a state hash and returns an updated state hash



# File 'lib/raptor/request.rb', line 245

def http_parser_worker
  max_body_size = @max_body_size

  proc do |data|
    next Raptor::Http2.process_frames(data) if data[:protocol] == :http2

    parser = Raptor::HttpParser.new
    env = {}
    nread = begin
      parser.execute(env, data[:buffer], 0)
    rescue Raptor::HttpParserError
      next Ractor.make_shareable(data.merge(complete: true, malformed: true))
    end
    parse_data = if data[:parse_data]
      data[:parse_data].dup
    else
      { parse_count: 0, content_length: parser.content_length }
    end
    parse_data[:parse_count] += 1

    message = if parser.finished?
      if parser.has_body?
        body_buffer = data[:buffer].byteslice(nread..-1) || ""

        if max_body_size && parser.content_length > max_body_size
          data.merge(env: env, body: nil, parse_data: parse_data, complete: true, too_large: true)
        elsif env[HTTP_TRANSFER_ENCODING]&.include?(TRANSFER_ENCODING_CHUNKED)
          decoded_body, chunked_state = Raptor::Request.decode_chunked(body_buffer, max_body_size)

          case chunked_state
          when :complete
            env.delete(HTTP_TRANSFER_ENCODING)
            data.merge(env: env, body: decoded_body, parse_data: parse_data, complete: true)
          when :too_large
            data.merge(env: env, body: nil, parse_data: parse_data, complete: true, too_large: true)
          else
            data.merge(env: env, parse_data: parse_data)
          end
        elsif parser.content_length > body_buffer.bytesize
          data.merge(env: env, parse_data: parse_data)
        else
          data.merge(env: env, body: body_buffer, parse_data: parse_data, complete: true)
        end
      else
        data.merge(env: env, body: nil, parse_data: parse_data, complete: true)
      end
    else
      data.merge(env: env, parse_data: parse_data)
    end
    Ractor.make_shareable(message)
  end
end
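
Every branch of the worker ends in `Ractor.make_shareable`, which deep-freezes the result so it can cross a Ractor boundary without being copied. The sketch below shows the effect on a state hash shaped like the ones above (Ruby 3.0 or later); the sample values are illustrative assumptions.

```ruby
# Incoming state, shaped like the hash the worker receives (values are made up).
data = { id: 7, buffer: +"GET / HTTP/1.1\r\n\r\n", parse_data: nil }
env  = { "REQUEST_METHOD" => "GET" }

# data.merge builds a new Hash; make_shareable then freezes it and
# everything reachable from it, in place.
message = Ractor.make_shareable(
  data.merge(env: env, body: nil, parse_data: { parse_count: 1 }, complete: true)
)

shareable     = Ractor.shareable?(message)
env_frozen    = env.frozen?            # frozen in place: same object as message[:env]
buffer_frozen = data[:buffer].frozen?  # the buffer String is shared with message
data_frozen   = data.frozen?           # the original Hash itself is left unfrozen
```

One consequence worth noting: the receiving side cannot mutate the env it is handed, which is consistent with `handle_parsed_request` calling `.dup` on `parsed_request[:env]` before passing it on.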

#shutdown ⇒ void

This method returns an undefined value.

Signals eager keep-alive loops to stop processing further requests on their connections. In-flight requests complete normally.



# File 'lib/raptor/request.rb', line 151

def shutdown
  @running.make_false
end