Class: Raptor::Request

Inherits:
Object
  • Object
show all
Defined in:
lib/raptor/request.rb

Overview

Handles HTTP request processing and Rack application integration.

Request manages the HTTP parsing pipeline using Ractors and coordinates with the reactor for connection state management. It bridges between the low-level HTTP parsing and high-level Rack application interface, handling both incomplete requests (that need more data) and complete requests (ready for application processing).

Defined Under Namespace

Classes: Error, WriteError

Constant Summary collapse

BODY_BUFFER_THRESHOLD =
256 * 1024
FILE_CHUNK_SIZE =
64 * 1024
READ_BUFFER_SIZE =
64 * 1024
WRITE_TIMEOUT =
5
KEEPALIVE_READ_TIMEOUT =
0.001
MAX_KEEPALIVE_REQUESTS =
100
HTTP_SCHEME =
"http"
HTTP_10 =
"HTTP/1.0"
HTTP_11 =
"HTTP/1.1"
# Lazily populated cache of HTTP/1.0 status lines, keyed by status code.
# The default block runs on the first lookup of a code and stores the result.
STATUS_LINE_CACHE_10 =
Hash.new do |h, status|
  # Reason phrase from Rack's status table; may be nil when the code is not listed there.
  reason = Rack::Utils::HTTP_STATUS_CODES[status]
  # Store a frozen line so later lookups of the same code skip this block.
  # The reason phrase (and its leading space) is omitted when none was found.
  h[status] = "HTTP/1.0 #{status}#{reason ? " #{reason}" : ""}\r\n".freeze
end
# Lazily populated cache of HTTP/1.1 status lines, keyed by status code.
# Built the same way as the HTTP/1.0 cache, differing only in the version prefix.
STATUS_LINE_CACHE_11 =
Hash.new do |h, status|
  # Reason phrase from Rack's status table; may be nil when the code is not listed there.
  reason = Rack::Utils::HTTP_STATUS_CODES[status]
  # Store a frozen line so later lookups of the same code skip this block.
  h[status] = "HTTP/1.1 #{status}#{reason ? " #{reason}" : ""}\r\n".freeze
end
STATUS_WITH_NO_ENTITY_BODY =
Set.new([204, 304, *100..199]).freeze
INTERNAL_SERVER_ERROR_RESPONSE =
"HTTP/1.1 500 Internal Server Error\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"
CONTENT_TOO_LARGE_RESPONSE =
"HTTP/1.1 413 Content Too Large\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"
CONNECTION_CLOSE =
"close"
CONNECTION_KEEPALIVE =
"keep-alive"
TRANSFER_ENCODING_CHUNKED =
"chunked"
HTTP_CONNECTION =
"HTTP_CONNECTION"
HTTP_TRANSFER_ENCODING =
"HTTP_TRANSFER_ENCODING"
RACK_HEADER_PREFIX =
"rack."
RACK_HIJACKED =
"rack.hijacked"
RACK_HIJACK_IO =
"rack.hijack_io"
ILLEGAL_HEADER_KEY_REGEX =
/[\x00-\x20\(\)<>@,;:\\"\/\[\]\?=\{\}\x7F]/
ILLEGAL_HEADER_VALUE_REGEX =
/[\x00-\x08\x0A-\x1F]/

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(app, server_port, client_options: {}, on_error: nil) ⇒ void

Creates a new Request handler.

Parameters:

  • app (#call)

    the Rack application to dispatch complete requests to

  • server_port (Integer)

    port number used to populate SERVER_PORT in the Rack env

  • client_options (Hash) (defaults to: {})

    client limits configuration

  • on_error (#call, nil) (defaults to: nil)

    callback invoked with (env, exception) when the Rack app raises

Options Hash (client_options:):

  • :max_body_size (Integer, nil)

    maximum request body size in bytes

  • :body_spool_threshold (Integer, nil)

    spool bodies larger than this to a tempfile



112
113
114
115
116
117
118
# File 'lib/raptor/request.rb', line 112

# Builds a request handler around a Rack application.
#
# @param app [#call] Rack application that complete requests are dispatched to
# @param server_port [Integer] port number stored for later use
# @param client_options [Hash] client limits; only :max_body_size and
#   :body_spool_threshold are read here, missing keys yield nil
# @param on_error [#call, nil] optional error callback, stored as given
# @return [void]
def initialize(app, server_port, client_options: {}, on_error: nil)
  @app, @server_port = app, server_port
  @max_body_size, @body_spool_threshold =
    client_options.values_at(:max_body_size, :body_spool_threshold)
  @on_error = on_error
end

Class Method Details

.decode_chunked(buffer, max_size = nil) ⇒ Array(String, Symbol)

Decodes a chunked transfer-encoded body buffer.

Returns the decoded bytes and a state symbol: `:complete` when the terminating zero-length chunk was found, `:too_large` when the decoded size would exceed `max_size`, or `:incomplete` otherwise.

Parameters:

  • buffer (String)

    the raw body buffer to decode

  • max_size (Integer, nil) (defaults to: nil)

    maximum decoded body size, or nil for unlimited

Returns:

  • (Array(String, Symbol))

    decoded body and completion state



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/raptor/request.rb', line 75

# Decodes a chunked transfer-encoded body buffer.
#
# The buffer is scanned as raw bytes, so offsets stay correct even when the
# caller hands in a string tagged with a multibyte encoding.
#
# @param buffer [String] the raw (still chunk-framed) body bytes
# @param max_size [Integer, nil] maximum decoded body size, or nil for unlimited
# @return [Array(String, Symbol)] the bytes decoded so far (binary encoded) and one of:
#   - :complete   the terminating zero-length chunk was found
#   - :too_large  the next chunk would push the decoded size past max_size
#   - :incomplete anything else: more data is needed, or the chunk-size line is
#     malformed (never treated as the terminator, so a garbage size line cannot
#     end the body early)
def self.decode_chunked(buffer, max_size = nil)
  # String#index counts characters; a binary view makes that identical to the
  # byte offsets used by byteslice/bytesize below.
  bytes = buffer.encoding == Encoding::BINARY ? buffer : buffer.b
  total = bytes.bytesize
  decoded = String.new
  offset = 0

  while offset < total
    crlf = bytes.index("\r\n", offset)
    return [decoded, :incomplete] unless crlf

    # Accept only hex digits, optionally followed by a chunk extension (";...").
    # String#to_i(16) alone would read "zz" or "" as 0 and "-5" as a negative size.
    size_hex = bytes.byteslice(offset, crlf - offset)[/\A(\h+)[ \t]*(?:;|\z)/, 1]
    return [decoded, :incomplete] unless size_hex

    chunk_size = size_hex.to_i(16)
    return [decoded, :complete] if chunk_size.zero?
    return [decoded, :too_large] if max_size && decoded.bytesize + chunk_size > max_size

    offset = crlf + 2
    available = total - offset
    if chunk_size > available
      # Chunk data is truncated: keep what has arrived and wait for the rest.
      # Clamping also keeps an absurdly large declared size out of byteslice.
      decoded << bytes.byteslice(offset, available)
      return [decoded, :incomplete]
    end

    decoded << bytes.byteslice(offset, chunk_size)
    offset += chunk_size + 2 # skip the chunk data and its trailing CRLF
  end

  [decoded, :incomplete]
end

Instance Method Details

#eager_accept(socket, id, reactor, thread_pool, remote_addr, url_scheme) ⇒ void

This method returns an undefined value.

Eagerly reads and parses the first request on a freshly accepted connection on the server thread, dispatching directly to the thread pool when complete. Falls back to the reactor when more data is needed.

Parameters:

  • socket (TCPSocket)

    the freshly accepted client socket

  • id (Integer)

    unique client identifier

  • reactor (Reactor)

    the reactor for fallback registration

  • thread_pool (AtomicThreadPool)

    thread pool for application processing

  • remote_addr (String)

    client IP address

  • url_scheme (String)

    “http” or “https”



133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# File 'lib/raptor/request.rb', line 133

# Reads and parses the first request of a freshly accepted connection and, when
# it is already complete, queues it on the thread pool.
#
# Outcomes, in the order they are checked:
# - nothing readable yet: the socket is registered with the reactor
# - EOF / IO error on the first read: the socket is closed (best effort)
# - headers or body not fully received: handed to fallback_to_reactor
# - declared or decoded body exceeds @max_body_size: reject_oversized
# - otherwise: a job calling process_client is pushed onto thread_pool
#
# @param socket [#read_nonblock, #close] the accepted client socket
# @param id [Integer] unique client identifier
# @param reactor [#add] reactor used when the request cannot be completed here
# @param thread_pool [#<<] receives the job that runs process_client
# @param remote_addr [String] client IP address
# @param url_scheme [String] "http" or "https"
# @return [void]
def eager_accept(socket, id, reactor, thread_pool, remote_addr, url_scheme)
  begin
    first_read = socket.read_nonblock(READ_BUFFER_SIZE)
  rescue IO::WaitReadable
    reactor.add(id: id, socket: socket, remote_addr: remote_addr, url_scheme: url_scheme)
    return
  rescue EOFError, IOError
    socket.close rescue nil
    return
  end

  buffer = String.new
  buffer << first_read

  # Sockets exposing #pending may hold further bytes that are already buffered; drain them now.
  buffer << socket.read_nonblock(socket.pending) while socket.respond_to?(:pending) && socket.pending > 0

  parser = HttpParser.new
  env = {}
  nread = parser.execute(env, buffer, 0)
  parse_data = { parse_count: 1, content_length: parser.content_length }

  # Every "need more data" exit hands the same state to the reactor.
  defer_to_reactor = lambda do
    fallback_to_reactor(socket, id, buffer, env, parse_data, reactor, 0, remote_addr, url_scheme, persisted: false)
    nil
  end

  return defer_to_reactor.call unless parser.finished?

  body = nil
  if parser.has_body?
    if @max_body_size && parser.content_length > @max_body_size
      reject_oversized(socket)
      return
    end

    # Everything after the parsed header bytes is body data.
    body = buffer.byteslice(nread..-1) || ""

    if env[HTTP_TRANSFER_ENCODING]&.include?(TRANSFER_ENCODING_CHUNKED)
      body, chunked_state = Request.decode_chunked(body, @max_body_size)
      if chunked_state == :too_large
        reject_oversized(socket)
        return
      end
      return defer_to_reactor.call unless chunked_state == :complete

      # The body is now decoded, so the header is dropped from the env.
      env.delete(HTTP_TRANSFER_ENCODING)
    elsif parser.content_length > body.bytesize
      return defer_to_reactor.call
    end
  end

  thread_pool << proc do
    process_client(socket, id, env, parse_data, body, reactor, thread_pool, 1, remote_addr, url_scheme)
  end
end

#handle_parsed_request(parsed_request, reactor, thread_pool) ⇒ void

This method returns an undefined value.

Handles a parsed HTTP request by either continuing parsing or dispatching to the Rack app.

For incomplete requests, updates reactor state and re-registers for more I/O. For complete requests, removes from reactor, builds Rack env, and dispatches to thread pool.

Parameters:

  • parsed_request (Hash)

    the parsed request state from the ractor pool

  • reactor (Reactor)

    the reactor managing the client connection

  • thread_pool (AtomicThreadPool)

    thread pool for application processing



265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
# File 'lib/raptor/request.rb', line 265

# Routes a parsed request state: rejects it, keeps waiting, or dispatches it.
#
# - :too_large set: the client is removed from the reactor and, if its socket
#   was still registered, passed to reject_oversized.
# - :complete not set: the new state is stored via reactor.update_state.
# - otherwise: the client is removed from the reactor and a job calling
#   process_client is pushed onto the thread pool.
#
# @param parsed_request [Hash] parsed request state; reads :id, :too_large,
#   :complete, :env, :parse_data, :body, :request_count, :remote_addr, :url_scheme
# @param reactor [#remove, #update_state] reactor managing the client connection
# @param thread_pool [#<<] receives the job that runs process_client
# @return [void]
def handle_parsed_request(parsed_request, reactor, thread_pool)
  client_id = parsed_request[:id]

  if parsed_request[:too_large]
    socket = reactor.remove(client_id)
    reject_oversized(socket) if socket
    return
  end

  unless parsed_request[:complete]
    reactor.update_state(parsed_request)
    return
  end

  socket = reactor.remove(client_id)
  # Same guard as the oversized path: if the reactor no longer holds a socket
  # for this client there is nothing to respond on, so do not dispatch.
  return unless socket

  # Missing bookkeeping falls back to: first request, loopback address, plain HTTP.
  request_count = (parsed_request[:request_count] || 0) + 1
  remote_addr = parsed_request[:remote_addr] || "127.0.0.1"
  url_scheme = parsed_request[:url_scheme] || HTTP_SCHEME

  thread_pool << proc do
    process_client(
      socket,
      client_id,
      parsed_request[:env].dup, # the job gets its own copy of the env
      parsed_request[:parse_data],
      parsed_request[:body],
      reactor,
      thread_pool,
      request_count,
      remote_addr,
      url_scheme
    )
  end
end

#http_parser_worker ⇒ Proc

Returns a Proc for HTTP parsing work in Ractor context.

The returned Proc processes raw socket data through the appropriate HTTP parser and returns either a complete request state (ready for app processing) or incomplete request state (needs more data).

Returns:

  • (Proc)

    a Ractor-safe proc that accepts a state hash and returns an updated state hash



205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
# File 'lib/raptor/request.rb', line 205

# Builds the proc that parses buffered request data for one client state hash.
#
# The proc captures only the configured body limit. Given a state hash it
# - delegates to Raptor::Http2.process_frames when :protocol is :http2,
# - otherwise parses data[:buffer] from the start and returns the state merged
#   with :env and :parse_data, plus :body and complete: true once the request
#   is fully available, and too_large: true when the body exceeds the limit.
# HTTP/1 results are passed through Ractor.make_shareable before being returned.
#
# @return [Proc] proc taking a state Hash and returning the updated state
def http_parser_worker
  max_body_size = @max_body_size

  proc do |data|
    next Raptor::Http2.process_frames(data) if data[:protocol] == :http2

    parser = Raptor::HttpParser.new
    env = {}
    nread = parser.execute(env, data[:buffer], 0)

    # Carry over the counters from an earlier pass, or start fresh on the first one.
    parse_data = data[:parse_data]&.dup || { parse_count: 0, content_length: parser.content_length }
    parse_data[:parse_count] += 1

    # The result shapes, kept in one place so the key order stays consistent.
    awaiting_data = -> { data.merge(env: env, parse_data: parse_data) }
    finished = ->(body, **flags) { data.merge(env: env, body: body, parse_data: parse_data, complete: true, **flags) }
    oversized = -> { finished.call(nil, too_large: true) }

    message =
      if !parser.finished?
        awaiting_data.call
      elsif !parser.has_body?
        finished.call(nil)
      else
        # Everything after the parsed header bytes is body data.
        body_buffer = data[:buffer].byteslice(nread..-1) || ""

        if max_body_size && parser.content_length > max_body_size
          oversized.call
        elsif env[HTTP_TRANSFER_ENCODING]&.include?(TRANSFER_ENCODING_CHUNKED)
          decoded_body, chunked_state = Raptor::Request.decode_chunked(body_buffer, max_body_size)

          if chunked_state == :complete
            # The body is now decoded, so the header is dropped from the env.
            env.delete(HTTP_TRANSFER_ENCODING)
            finished.call(decoded_body)
          elsif chunked_state == :too_large
            oversized.call
          else
            awaiting_data.call
          end
        elsif parser.content_length > body_buffer.bytesize
          awaiting_data.call
        else
          finished.call(body_buffer)
        end
      end

    Ractor.make_shareable(message)
  end
end