Class: Raptor::Reactor

Inherits:
Object
Defined in:
lib/raptor/reactor.rb

Overview

High-performance I/O reactor for managing client connections and timeouts.

Reactor uses an NIO selector for I/O multiplexing and tracks client timeouts in a red-black tree, so adding or removing a timeout costs O(log n). It hands CPU-intensive HTTP parsing to a ractor pool and blocking application work to a thread pool. It also exposes a backlog metric that the server uses for backpressure, so that new connections are not accepted faster than requests can be processed.

Examples:

reactor = Reactor.new(ractor_pool, thread_pool, client_options: {
  first_data_timeout: 30,
  chunk_data_timeout: 10
})
reactor.run
reactor.add(id: client.object_id, socket: client)
# ... later
reactor.shutdown

Defined Under Namespace

Classes: TimeoutClient

Constant Summary

CHUNK_SIZE = 64 * 1024
TIMEOUT_RESPONSE = "HTTP/1.1 408 Request Timeout\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"

Constructor Details

#initialize(ractor_pool, thread_pool, client_options:) ⇒ void

Creates a new Reactor instance.

Parameters:

  • ractor_pool (RactorPool)

    ractor pool for HTTP parsing

  • thread_pool (AtomicThreadPool)

    thread pool for application processing

  • client_options (Hash)

    timeout configuration options

Options Hash (client_options:):

  • :first_data_timeout (Integer)

    timeout for initial data

  • :chunk_data_timeout (Integer)

    timeout for subsequent chunks

  • :persistent_data_timeout (Integer)

    timeout for keep-alive connections



# File 'lib/raptor/reactor.rb', line 92

def initialize(ractor_pool, thread_pool, client_options:)
  @ractor_pool = ractor_pool
  @thread_pool = thread_pool
  @client_options = client_options

  @selector = NIO::Selector.new
  @queue = Queue.new
  @timeouts = RedBlackTree.new

  @id_to_socket = {}
  @socket_to_state = {}
  @id_to_timeout = {}
  @id_to_writer = {}
  @id_to_flow_control = {}
end

Instance Method Details

#add(state) ⇒ void

This method returns an undefined value.

Adds a new client connection to the reactor.

Parameters:

  • state (Hash)

    client connection state including socket and ID

Options Hash (state):

  • :socket (TCPSocket)

    the client socket

  • :id (Integer)

    unique identifier for the client



# File 'lib/raptor/reactor.rb', line 168

def add(state)
  socket = state[:socket]
  state.delete(:socket)
  writer = state.delete(:writer)
  flow_control = state.delete(:flow_control)
  @id_to_socket[state[:id]] = socket
  @socket_to_state[socket] = state
  @id_to_writer[state[:id]] = writer if writer
  @id_to_flow_control[state[:id]] = flow_control if flow_control

  read_and_queue_for_parse(socket, state)
end

#backlog ⇒ Integer

Returns the number of complete requests either being processed or awaiting processing.

Returns:

  • (Integer)

    number of complete requests



# File 'lib/raptor/reactor.rb', line 339

def backlog
  @thread_pool.queue_size + @thread_pool.active_count
end
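
As a rough illustration of how a server might act on this metric, the sketch below gates connection acceptance on the backlog. `FakePool`, `accept_more?`, and the `limit:` threshold are hypothetical names introduced for this example; only the sum of `queue_size` and `active_count` comes from the method above.

```ruby
# Hypothetical stand-in exposing the two counters that Reactor#backlog sums.
FakePool = Struct.new(:queue_size, :active_count)

# Same arithmetic as Reactor#backlog: queued requests plus running requests.
def backlog(pool)
  pool.queue_size + pool.active_count
end

# Assumed policy (not taken from the source): keep accepting connections
# only while the backlog is below `limit`.
def accept_more?(pool, limit:)
  backlog(pool) < limit
end

idle = FakePool.new(0, 2)   # nothing queued, two requests running
busy = FakePool.new(12, 4)  # twelve queued, four running

puts accept_more?(idle, limit: 8)
puts accept_more?(busy, limit: 8)
```

A real acceptor would likely re-check the metric on every loop iteration, since both counters change as workers pick up and finish requests.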

#close_connection(id) ⇒ void

This method returns an undefined value.

Closes the socket for the given connection and drops all reactor state associated with it. Used to terminate HTTP/2 connections after sending a GOAWAY frame.

Parameters:

  • id (Integer)

    unique client identifier



# File 'lib/raptor/reactor.rb', line 312

def close_connection(id)
  socket = @id_to_socket.delete(id)
  return unless socket

  @socket_to_state.delete(socket)
  @id_to_writer.delete(id)
  @id_to_flow_control.delete(id)
  socket.close rescue nil
end

#flow_control_for(id) ⇒ Object?

Returns the flow controller associated with a given connection, if one was supplied when the connection was added. Used by HTTP/2 stream dispatchers to honour the peer’s flow-control windows.

Parameters:

  • id (Integer)

    unique client identifier

Returns:

  • (Object, nil)

    the flow controller, if found



# File 'lib/raptor/reactor.rb', line 280

def flow_control_for(id)
  @id_to_flow_control[id]
end

#persist(socket, id, request_count, remote_addr:, url_scheme:) ⇒ void

This method returns an undefined value.

Re-registers a kept-alive connection for the next request cycle.

Called after a response has been written successfully on a keep-alive connection. Builds a fresh connection state (marked as persisted) and queues the socket for re-registration with the selector, where the persistent data timeout applies.

Parameters:

  • socket (TCPSocket)

    the kept-alive client socket

  • id (Integer)

    the unique client identifier

  • request_count (Integer)

    number of requests handled on this connection

  • remote_addr (String)

    the client’s remote IP address

  • url_scheme (String)

    “http” or “https”



# File 'lib/raptor/reactor.rb', line 230

def persist(socket, id, request_count, remote_addr:, url_scheme:)
  state = {
    id: id,
    request_count: request_count,
    remote_addr: remote_addr,
    url_scheme: url_scheme,
    persisted: true
  }

  @id_to_socket[id] = socket
  @socket_to_state[socket] = state
  @queue << socket
  @selector.wakeup
rescue ClosedQueueError
  socket.close
end

#remove(id) ⇒ TCPSocket?

Drops the reactor’s references to a client whose parsed request has been handed off to the thread pool. The socket itself is kept open so the worker can write the response.

Parameters:

  • id (Integer)

    unique client identifier

Returns:

  • (TCPSocket, nil)

    the socket associated with `id`, if any



# File 'lib/raptor/reactor.rb', line 210

def remove(id)
  @id_to_socket.delete(id).tap do |socket|
    @socket_to_state.delete(socket)
  end
end
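
The `delete(...).tap` idiom above returns the socket while also dropping its state, and it is harmless for an unknown id. The sketch below reproduces that bookkeeping with plain hashes; the symbol `:socket_a` stands in for a real socket and is purely illustrative.

```ruby
# Two lookup tables mirroring @id_to_socket and @socket_to_state.
id_to_socket = { 1 => :socket_a }
socket_to_state = { socket_a: { id: 1 } }

# Same shape as Reactor#remove: delete the id, then use the returned
# socket (possibly nil) to delete the matching state entry.
remove = lambda do |id|
  id_to_socket.delete(id).tap { |socket| socket_to_state.delete(socket) }
end

found = remove.call(1)     # known id: both tables are cleaned up
missing = remove.call(99)  # unknown id: Hash#delete(nil) is a no-op

p found
p missing
```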

#run ⇒ Thread

Starts the reactor’s main event loop in a new thread.

The event loop handles I/O events, processes timeouts, manages the registration queue, and controls server connection acceptance. It continues until the queue is closed and emptied.

Returns:

  • (Thread)

    the thread running the reactor event loop



# File 'lib/raptor/reactor.rb', line 117

def run
  Thread.new do
    Thread.current.name = self.class.name

    until @queue.closed? && @queue.empty?
      begin
        timeout = @timeouts.min&.timeout(Process.clock_gettime(Process::CLOCK_MONOTONIC))
        @selector.select(timeout) do |monitor|
          wakeup!(monitor.value)
        end

        now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
        expired = []
        @timeouts.traverse do |to_client|
          break unless to_client.timeout(now) == 0

          expired << to_client
        end

        expired.each do |to_client|
          @timeouts.delete!(to_client)
          id = to_client.client_data[:id]
          @id_to_timeout.delete(id)
          socket = @id_to_socket[id]
          next unless socket

          @selector.deregister(socket)
          socket.write(TIMEOUT_RESPONSE) rescue nil
          cleanup(socket)
        end

        until @queue.empty?
          register(@queue.pop)
        end
      rescue => error
        Log.rescued_error(error)
      end
    end

    @selector.close
  end
end
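
The timeout handling in the loop above has two parts: the earliest deadline bounds how long the selector sleeps, and an in-order walk collects every entry whose remaining time is zero. The sketch below shows that logic with a sorted array instead of the reactor's red-black tree. `DeadlineList` and its methods are hypothetical, and clamping the remaining time at zero is an assumption inferred from the `== 0` check in the loop.

```ruby
# Hypothetical stand-in for the timeout tree: a sorted array of
# [deadline, id] pairs. A binary search finds the insertion slot, but the
# array insert itself is O(n), unlike a red-black tree.
class DeadlineList
  def initialize
    @entries = []
  end

  # Records that client `id` must send data before `deadline` (in seconds).
  def add(id, deadline)
    index = @entries.bsearch_index { |other, _| other >= deadline } || @entries.size
    @entries.insert(index, [deadline, id])
  end

  # Seconds until the earliest deadline, clamped at zero; nil when empty.
  # This is the value a selector would sleep for.
  def next_timeout(now)
    earliest = @entries.first
    return nil unless earliest

    [earliest[0] - now, 0].max
  end

  # Removes and returns the ids whose deadline has passed, in deadline order.
  def expire(now)
    expired = @entries.take_while { |deadline, _| deadline <= now }
    @entries.shift(expired.size)
    expired.map { |_, id| id }
  end
end

list = DeadlineList.new
list.add(:slow, 30.0)
list.add(:fast, 10.0)
list.add(:mid, 20.0)
```

In the reactor itself the clock value comes from `Process.clock_gettime(Process::CLOCK_MONOTONIC)`, which is unaffected by wall-clock adjustments.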

#shutdown ⇒ void

This method returns an undefined value.

Closes the registration queue and wakes the selector so the event loop drains pending work and exits.



# File 'lib/raptor/reactor.rb', line 328

def shutdown
  @queue.close
  @selector.wakeup
end
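
The close-then-wake sequence can be tried out with the standard library alone. In this sketch a pipe plays the role of the selector's wakeup mechanism and `IO.select` replaces `NIO::Selector`; both substitutions are assumptions made to keep the example self-contained, and the symbols stand in for sockets.

```ruby
queue = Queue.new
wake_reader, wake_writer = IO.pipe
registered = []

# Consumer loop with the same exit condition as Reactor#run:
# keep going until the queue is both closed and drained.
loop_thread = Thread.new do
  until queue.closed? && queue.empty?
    # Sleep until woken; the 1 second timeout is only a safety net.
    ready, = IO.select([wake_reader], nil, nil, 1)
    wake_reader.read_nonblock(64, exception: false) if ready

    registered << queue.pop until queue.empty?
  end
end

queue << :socket_a
wake_writer.write(".")
queue << :socket_b

# Equivalent of Reactor#shutdown: close the queue, then wake the loop so it
# notices the closed queue, drains what is left, and exits.
queue.close
wake_writer.write(".")
loop_thread.join

p registered
```

Items pushed before `close` are still processed, because a closed `Queue` can be popped until it is empty.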

#socket_for(id) ⇒ TCPSocket?

Returns the socket for a given client identifier without removing it.

Used by HTTP/2 connections where the socket remains registered across multiple stream requests.

Parameters:

  • id (Integer)

    unique client identifier

Returns:

  • (TCPSocket, nil)

    the socket, if found



# File 'lib/raptor/reactor.rb', line 256

def socket_for(id)
  @id_to_socket[id]
end

#update_http2_state(state) ⇒ void

This method returns an undefined value.

Updates connection state for an HTTP/2 connection after frame processing.

Re-registers the socket with the selector for further reads and stores the updated HPACK table and stream states.

Parameters:

  • state (Hash)

    updated connection state from the ractor pool



# File 'lib/raptor/reactor.rb', line 293

def update_http2_state(state)
  socket = @id_to_socket[state[:id]]
  return unless socket

  @socket_to_state[socket] = state
  @queue << socket
  @selector.wakeup
rescue ClosedQueueError
  socket.close
end

#update_state(state) ⇒ void

This method returns an undefined value.

Updates the state of an existing client connection.

Called when an incomplete HTTP request needs to be re-registered with the reactor for further processing.

Parameters:

  • state (Hash)

    updated client connection state

Options Hash (state):

  • :id (Integer)

    client identifier



# File 'lib/raptor/reactor.rb', line 191

def update_state(state)
  socket = @id_to_socket[state[:id]]
  return unless socket

  @socket_to_state[socket] = state
  @queue << socket
  @selector.wakeup
rescue ClosedQueueError
  socket.close
end
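
The `rescue ClosedQueueError` clause covers the case where a worker tries to re-register a socket after shutdown has closed the queue. A minimal sketch of that behaviour follows; `requeue` is a hypothetical helper, and the two ends of a pipe stand in for client sockets.

```ruby
# Hypothetical helper with the same shape as the method above: push the
# socket for re-registration, or close it if the queue is already closed.
def requeue(queue, socket)
  queue << socket
  :queued
rescue ClosedQueueError
  socket.close
  :closed
end

queue = Queue.new
first_io, second_io = IO.pipe # stand-ins for two client sockets

before_shutdown = requeue(queue, first_io)
queue.close                   # what Reactor#shutdown does
after_shutdown = requeue(queue, second_io)

p before_shutdown
p after_shutdown
```

Closing the socket in the rescue branch means a connection that arrives too late is dropped rather than left open with nothing reading from it.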

#writer_for(id) ⇒ Object?

Returns the writer object associated with a given connection, if one was supplied when the connection was added. Used by protocol handlers that need to coordinate concurrent socket writes.

Parameters:

  • id (Integer)

    unique client identifier

Returns:

  • (Object, nil)

    the writer, if found



# File 'lib/raptor/reactor.rb', line 268

def writer_for(id)
  @id_to_writer[id]
end
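
The reactor treats the writer as an opaque object, so its interface is not defined here. As one possible shape, the sketch below serializes writes behind a mutex so that frames from different threads never interleave. `SerializedWriter` is a hypothetical class, and a `StringIO` stands in for the socket.

```ruby
require "stringio"

# Hypothetical writer: every write to the underlying IO happens while
# holding a Mutex, so concurrent callers cannot interleave their frames.
class SerializedWriter
  def initialize(io)
    @io = io
    @mutex = Mutex.new
  end

  # Writes `frame` as one unit with respect to other callers.
  def write(frame)
    @mutex.synchronize { @io.write(frame) }
  end
end

io = StringIO.new
writer = SerializedWriter.new(io)

# Four threads each write fifty delimited frames through the same writer.
threads = 4.times.map do |n|
  Thread.new { 50.times { writer.write("frame-#{n};") } }
end
threads.each(&:join)

frames = io.string.split(";")
puts frames.size
```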