Class: Tep::Server::Scheduled

Inherits:
Object
  • Object
show all
Defined in:
lib/tep/server_scheduled.rb

Constant Summary collapse

MAX_REQUEST_BYTES =

Max bytes accepted from a single request’s start-line + headers. Bigger requests get 413; matches the blocking server’s SPHTTP_BUFSIZE cap (64 KiB).

65535
KEEPALIVE_TIMEOUT =

Idle keep-alive timeout between requests on the same connection. 30s matches nginx; bump from app code as needed.

30
HEADER_READ_TIMEOUT =

Slow-headers DoS guard.

10

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(app) ⇒ Scheduled

Returns a new instance of Scheduled.



39
40
41
# File 'lib/tep/server_scheduled.rb', line 39

def initialize(app)
  @app = app
end

Instance Attribute Details

#appObject

Returns the value of attribute app.



37
38
39
# File 'lib/tep/server_scheduled.rb', line 37

def app
  @app
end

Class Method Details

.accept_loop(sfd) ⇒ Object

Accept loop. Each accepted connection becomes its own fiber that closes over the just-accepted ‘client` fd.



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/tep/server_scheduled.rb', line 115

def self.accept_loop(sfd)
  while true
    # SIGTERM/SIGINT: sphttp's term flag is set by the signal
    # handler; check before parking on io_wait so we don't sleep
    # past a shutdown request. The 1s io_wait timeout below
    # bounds the sleep-side latency. The parent (or this same
    # process for workers=1) emits the aggregated run_end after
    # all workers exit (#128).
    if Sock.sphttp_shutdown_requested != 0
      break
    end
    # Bounded wait so the flag check above runs once per second
    # even when traffic is idle (was -1 = wait forever).
    ready = Tep::Scheduler.io_wait(sfd, Tep::Scheduler::READ, 1)
    if ready == 0
      next
    end
    client = Sock.sphttp_accept_nb(sfd)
    if client < 0
      next
    end
    Sock.sphttp_set_nonblock(client)
    conn = Fiber.new { Tep::Server::Scheduled.handle_connection(client) }
    Tep::Scheduler.spawn_fiber(conn)
  end
end

.handle_connection(client) ⇒ Object

Per-connection lifecycle.



177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# File 'lib/tep/server_scheduled.rb', line 177

def self.handle_connection(client)
  # Inbound TLS: complete a non-blocking server handshake before
  # reading anything. Runs inside this per-connection fiber so a
  # slow handshake parks cooperatively instead of blocking the
  # accept loop. On failure (incl. a plaintext client hitting the
  # TLS port) drop the connection.
  if Tep::APP.tls_cert.length > 0
    if Tep::Server::Scheduled.tls_handshake(client) == 0
      Sock.sphttp_close(client)
      return 0
    end
  end
  keep_going = true
  while keep_going
    blob = Tep::Server::Scheduled.read_request_blob(client, KEEPALIVE_TIMEOUT)
    if blob.length == 0
      break
    end
    req = Parser.parse(blob)
    if req == nil
      Tep::Server::Scheduled.send_simple(client, 400, "bad request")
      break
    end

    req.consume_body_via_scheduler(client)

    res = Response.new
    Tep::APP.dispatch(req, res)

    # Streaming responses use chunked Connection: close (same
    # simplification as the prefork server) -- force the
    # keep-alive loop to end after this response so the stream's
    # terminator isn't followed by a stale read on the same fd.
    keep_alive = req.keep_alive? && !res.halted_close? && !res.streaming
    Tep::Server::Scheduled.write_response(client, req, res, keep_alive)
    keep_going = keep_alive
  end
  Sock.sphttp_close(client)
  0
end

.read_request_blob(fd, timeout_seconds) ⇒ Object

Non-blocking request reader. Returns the accumulated blob once “rnrn” is seen, or “” on timeout / EOF / oversize.



220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
# File 'lib/tep/server_scheduled.rb', line 220

def self.read_request_blob(fd, timeout_seconds)
  buf = ""
  deadline = Time.now.to_i + timeout_seconds
  while buf.length < MAX_REQUEST_BYTES
    remaining = deadline - Time.now.to_i
    if remaining <= 0
      return ""
    end
    ready = Tep::Scheduler.io_wait(fd, Tep::Scheduler::READ, remaining)
    if ready == 0
      return ""
    end
    chunk = Sock.sphttp_recv_some(fd, 4096)
    if chunk.length == 0
      # Over TLS an empty read can be a partial record (SSL_read
      # WANT_READ/WANT_WRITE), not EOF -- re-park on the indicated
      # direction and retry rather than dropping the request. The
      # loop top re-applies the deadline on the want-read path.
      st = Sock.sphttp_io_status
      if st == 1
        next
      end
      if st == 2
        Tep::Scheduler.io_wait(fd, Tep::Scheduler::WRITE, remaining)
        next
      end
      return ""
    end
    buf = buf + chunk
    if buf.length >= 4 && buf.include?("\r\n\r\n")
      return buf
    end
  end
  ""
end

.run_worker(sfd) ⇒ Object

Spawn the accept fiber + pump the scheduler. Called inside each prefork child. Loops directly on ‘tick` rather than `run_until_empty` because the accept fiber parks on io_wait indefinitely – run_until_empty bails when no fiber is ready to resume THIS pass; we need to keep polling so parked accept-on-sfd fibers get woken when a connection arrives.



104
105
106
107
108
109
110
111
# File 'lib/tep/server_scheduled.rb', line 104

def self.run_worker(sfd)
  f = Fiber.new { Tep::Server::Scheduled.accept_loop(sfd) }
  Tep::Scheduler.spawn_fiber(f)
  while Tep::Scheduler.alive_count > 0
    Tep::Scheduler.tick(1000)
  end
  0
end

.send_simple(client, status, msg) ⇒ Object



365
366
367
368
369
370
371
372
# File 'lib/tep/server_scheduled.rb', line 365

def self.send_simple(client, status, msg)
  reason = Tep.reason(status)
  head = "HTTP/1.0 " + status.to_s + " " + reason + "\r\n" +
         "Content-Length: " + msg.length.to_s + "\r\n" +
         "Connection: close\r\n\r\n" + msg
  Sock.sphttp_write_str(client, head)
  0
end

.tls_handshake(client) ⇒ Object

Non-blocking server-side TLS handshake on an accepted fd. Returns 1 on success (SSL* registered – reads/writes are now transparent), 0 on failure. Drives SSL_do_handshake, parking on io_wait for the direction OpenSSL asks for, bounded by HEADER_READ_TIMEOUT so a connection that opens but never completes the handshake (port probe, slowloris, plain-HTTP client) can’t pin the fiber.



149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/tep/server_scheduled.rb', line 149

def self.tls_handshake(client)
  if Sock.sphttp_tls_accept_start(client) < 0
    return 0
  end
  deadline = Time.now.to_i + HEADER_READ_TIMEOUT
  hs = Sock.sphttp_tls_handshake_step(client)
  while hs == 1 || hs == 2
    remaining = deadline - Time.now.to_i
    if remaining <= 0
      return 0
    end
    mode = Tep::Scheduler::READ
    if hs == 2
      mode = Tep::Scheduler::WRITE
    end
    ready = Tep::Scheduler.io_wait(client, mode, remaining)
    if ready == 0
      return 0
    end
    hs = Sock.sphttp_tls_handshake_step(client)
  end
  if hs < 0
    return 0
  end
  1
end

.write_response(client, req, res, keep_alive) ⇒ Object

Body-shape mirror of Tep::Server#write_response. Lifted into a cmeth so the connection fiber can call it without a captured ‘self`.



259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
# File 'lib/tep/server_scheduled.rb', line 259

def self.write_response(client, req, res, keep_alive)
  # WebSocket upgrade branch. Set by res.start_websocket in the
  # user's handler after a successful Handshake.check. Writes
  # the 101 Switching Protocols head, then assigns the client
  # fd onto the driver and runs the recv loop. The recv loop
  # returns when the connection closes (peer EOF, idle timeout,
  # or a CLOSE frame round-trip). After return, the caller's
  # handle_connection closes the fd as usual.
  if res.upgrading_ws
    head = Tep::WebSocket::Handshake.build_response(
      res.ws_accept_key, res.ws_driver.subprotocol)
    Sock.sphttp_write_str(client, head)
    res.ws_driver.set_fd(client)
    conn = Tep::WebSocket::Connection.new(res.ws_driver)
    conn.run
    return 0
  end

  # Streaming branch -- cooperative mirror of Tep::Server's
  # streaming path (server.rb). Set by res.start_stream(streamer)
  # in the handler. Writes a chunked-encoding head immediately,
  # hands a Tep::Stream writer to the user's Streamer#pump, then
  # emits the end-of-stream terminator. pump runs cooperatively:
  # it parks on Tep::Scheduler.io_wait between writes (e.g. the
  # proxy streamer waits on the upstream fd), so other fibers keep
  # running while this stream is in flight. Connection: close --
  # chunked keep-alive is legal but we keep it simple, matching
  # the prefork server.
  if res.streaming
    res.headers["Transfer-Encoding"] = "chunked"
    if !res.headers.key?("Content-Type")
      res.headers["Content-Type"] = "text/event-stream"
    end
    reason = Tep.reason(res.status)
    head = req.http_version + " " + res.status.to_s + " " + reason + "\r\n"
    res.headers.each do |k, v|
      head = head + k + ": " + v + "\r\n"
    end
    res.set_cookies.each do |line|
      head = head + "Set-Cookie: " + line + "\r\n"
    end
    head = head + "Connection: close\r\n\r\n"
    Sock.sphttp_write_str(client, head)
    out = Tep::Stream.new(client)
    res.streamer.pump(out)
    Sock.sphttp_write_chunk_end(client)
    return 0
  end

  # File validators for cache revalidation (#152): a size-mtime
  # ETag + Last-Modified, set before headers are serialized below.
  if res.file_path.length > 0
    fsz = Sock.sphttp_filesize(res.file_path)
    fmt = Sock.sphttp_file_mtime(res.file_path)
    if fsz >= 0 && fmt >= 0
      res.etag(fsz.to_s + "-" + fmt.to_s)
      res.last_modified(fmt)
    end
  end

  # Conditional GET (issue #152): 304 + no body when the request's
  # precondition matches the response's validator (ETag /
  # Last-Modified, whether set by the handler or for a file above).
  # For a file we also clear file_path so the sendfile branch below
  # is skipped and the empty 304 goes out the inline-body path.
  if Tep::Cache.not_modified?(req, res)
    res.set_status(304)
    res.set_body("")
    res.file_path = ""
  end

  # Default Content-Type for inline-body responses. Matches
  # Tep::Server#send; without it, the Security::Headers nosniff
  # default leaves the browser refusing to interpret an erb
  # response as HTML.
  if res.file_path.length == 0 && res.body.length > 0 && !res.headers.key?("Content-Type")
    res.headers["Content-Type"] = "text/html; charset=utf-8"
  end
  reason = Tep.reason(res.status)
  head = req.http_version + " " + res.status.to_s + " " + reason + "\r\n"
  res.headers.each do |k, v|
    head = head + k + ": " + v + "\r\n"
  end
  res.set_cookies.each do |line|
    head = head + "Set-Cookie: " + line + "\r\n"
  end
  if keep_alive
    head = head + "Connection: keep-alive\r\n"
  else
    head = head + "Connection: close\r\n"
  end
  if res.file_path.length > 0
    fs = Sock.sphttp_filesize(res.file_path)
    head = head + "Content-Length: " + fs.to_s + "\r\n\r\n"
    Sock.sphttp_write_str(client, head)
    Sock.sphttp_sendfile(client, res.file_path)
  else
    head = head + "Content-Length: " + res.body.length.to_s + "\r\n\r\n"
    Sock.sphttp_write_str(client, head)
    if res.body.length > 0
      Sock.sphttp_write_str(client, res.body)
    end
  end
  0
end

Instance Method Details

#run(port, workers, quiet) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/tep/server_scheduled.rb', line 43

def run(port, workers, quiet)
  sfd = Sock.sphttp_listen(port, workers > 1 ? 1 : 0)
  if sfd < 0
    return 1
  end
  Sock.sphttp_set_nonblock(sfd)

  # Install SIGTERM/SIGINT handlers BEFORE fork so children
  # inherit them; accept_loop checks the term flag once per
  # second and runs Tep.on_shutdown (run_end + future hooks).
  Sock.sphttp_install_term_handlers

  # Inbound TLS (tep#148 phase 2, scheduled variant): load the
  # server cert/key once before forking so every worker inherits
  # the SSL_CTX. A bad cert/key is fatal -- never silently serve
  # plaintext on a port the operator believes is TLS. The handshake
  # itself runs non-blocking per-connection (handle_connection).
  if Tep::APP.tls_cert.length > 0 && Tep::APP.tls_key.length > 0
    if Sock.sphttp_tls_server_init(Tep::APP.tls_cert, Tep::APP.tls_key) < 0
      return 1
    end
  end

  if workers > 1
    i = 0
    while i < workers
      pid = Sock.sphttp_fork
      if pid == 0
        Tep::Server::Scheduled.run_worker(sfd)
        Sock.sphttp_exit(0)
      end
      i += 1
    end
    # Reap children until none remain. After all workers exit,
    # emit the single aggregated run_end (see #128 / Tep::Events
    # #run_end_aggregated).
    loop do
      gone = Sock.sphttp_wait_any
      if gone < 0
        break
      end
    end
    if Sock.sphttp_shutdown_requested != 0
      Tep.on_shutdown
    end
  else
    Tep::Server::Scheduled.run_worker(sfd)
    # Single-process: this IS the parent; emit run_end here.
    if Sock.sphttp_shutdown_requested != 0
      Tep.on_shutdown
    end
  end
  0
end