Class: Tep::Server::Scheduled
- Inherits: Object
  - Object
  - Tep::Server::Scheduled
- Defined in: lib/tep/server_scheduled.rb
Constant Summary
- MAX_REQUEST_BYTES = 65535
  Max bytes accepted from a single request’s start-line + headers. Bigger requests get 413; matches the blocking server’s SPHTTP_BUFSIZE cap (64 KiB).
- KEEPALIVE_TIMEOUT = 30
  Idle keep-alive timeout between requests on the same connection. 30s matches nginx; bump from app code as needed.
- HEADER_READ_TIMEOUT = 10
  Slow-headers DoS guard.
Instance Attribute Summary
- #app ⇒ Object
  Returns the value of attribute app.
Class Method Summary
- .accept_loop(sfd) ⇒ Object
  Accept loop.
- .handle_connection(client) ⇒ Object
  Per-connection lifecycle.
- .read_request_blob(fd, timeout_seconds) ⇒ Object
  Non-blocking request reader.
- .run_worker(sfd) ⇒ Object
  Spawn the accept fiber + pump the scheduler.
- .send_simple(client, status, msg) ⇒ Object
- .tls_handshake(client) ⇒ Object
  Non-blocking server-side TLS handshake on an accepted fd.
- .write_response(client, req, res, keep_alive) ⇒ Object
  Body-shape mirror of Tep::Server#write_response.
Instance Method Summary
- #initialize(app) ⇒ Scheduled (constructor)
  A new instance of Scheduled.
- #run(port, workers, quiet) ⇒ Object
Constructor Details
#initialize(app) ⇒ Scheduled
Returns a new instance of Scheduled.
    # File 'lib/tep/server_scheduled.rb', line 39
    def initialize(app)
      @app = app
    end
Instance Attribute Details
#app ⇒ Object
Returns the value of attribute app.
    # File 'lib/tep/server_scheduled.rb', line 37
    def app
      @app
    end
Class Method Details
.accept_loop(sfd) ⇒ Object
Accept loop. Each accepted connection becomes its own fiber that closes over the just-accepted `client` fd.

    # File 'lib/tep/server_scheduled.rb', line 115
    def self.accept_loop(sfd)
      while true
        # SIGTERM/SIGINT: sphttp's term flag is set by the signal
        # handler; check before parking on io_wait so we don't sleep
        # past a shutdown request. The 1s io_wait timeout below
        # bounds the sleep-side latency. The parent (or this same
        # process for workers=1) emits the aggregated run_end after
        # all workers exit (#128).
        if Sock.sphttp_shutdown_requested != 0
          break
        end
        # Bounded wait so the flag check above runs once per second
        # even when traffic is idle (was -1 = wait forever).
        ready = Tep::Scheduler.io_wait(sfd, Tep::Scheduler::READ, 1)
        if ready == 0
          next
        end
        client = Sock.sphttp_accept_nb(sfd)
        if client < 0
          next
        end
        Sock.sphttp_set_nonblock(client)
        conn = Fiber.new { Tep::Server::Scheduled.handle_connection(client) }
        Tep::Scheduler.spawn_fiber(conn)
      end
    end
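The bounded-wait idea in accept_loop can be sketched in plain Ruby. This is an illustration under stated assumptions, not Tep's implementation: `IO.select` stands in for `Tep::Scheduler.io_wait`, a block stands in for the per-connection fiber, and `bounded_accept_loop` and the `stop` callable are hypothetical names.

```ruby
require "socket"

# Hypothetical sketch: poll a listening socket with a bounded wait so that a
# shutdown flag is re-checked at least once per `wait_seconds`.
# `stop` is any callable that returns true once shutdown was requested.
def bounded_accept_loop(server, stop, wait_seconds = 1)
  handled = 0
  until stop.call
    # IO.select returns nil on timeout; loop back and re-check the flag.
    ready = IO.select([server], nil, nil, wait_seconds)
    next if ready.nil?

    begin
      client = server.accept_nonblock
    rescue IO::WaitReadable
      # Readiness was stale (e.g. another worker accepted first); poll again.
      next
    end

    yield client
    handled += 1
  end
  handled
end
```

Because the wait is bounded, the worst-case delay between a shutdown request and the loop noticing it is roughly `wait_seconds`, which is the trade-off the comment in accept_loop describes.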
.handle_connection(client) ⇒ Object
Per-connection lifecycle.
    # File 'lib/tep/server_scheduled.rb', line 177
    def self.handle_connection(client)
      # Inbound TLS: complete a non-blocking server handshake before
      # reading anything. Runs inside this per-connection fiber so a
      # slow handshake parks cooperatively instead of blocking the
      # accept loop. On failure (incl. a plaintext client hitting the
      # TLS port) drop the connection.
      if Tep::APP.tls_cert.length > 0
        if Tep::Server::Scheduled.tls_handshake(client) == 0
          Sock.sphttp_close(client)
          return 0
        end
      end

      keep_going = true
      while keep_going
        blob = Tep::Server::Scheduled.read_request_blob(client, KEEPALIVE_TIMEOUT)
        if blob.length == 0
          break
        end
        req = Parser.parse(blob)
        if req == nil
          Tep::Server::Scheduled.send_simple(client, 400, "bad request")
          break
        end
        req.consume_body_via_scheduler(client)
        res = Response.new
        Tep::APP.dispatch(req, res)
        # Streaming responses use chunked Connection: close (same
        # simplification as the prefork server) -- force the
        # keep-alive loop to end after this response so the stream's
        # terminator isn't followed by a stale read on the same fd.
        keep_alive = req.keep_alive? && !res.halted_close? && !res.streaming
        Tep::Server::Scheduled.write_response(client, req, res, keep_alive)
        keep_going = keep_alive
      end

      Sock.sphttp_close(client)
      0
    end
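The keep-alive decision in handle_connection combines three conditions. A minimal sketch follows, assuming that `req.keep_alive?` applies the standard HTTP persistence rules; this page does not show its implementation. `Req`, `Res`, and both helper methods are hypothetical stand-ins, not Tep classes.

```ruby
# Hypothetical stand-ins for Tep's request/response objects.
Req = Struct.new(:http_version, :connection_header)
Res = Struct.new(:halted_close, :streaming)

# HTTP/1.1 connections persist unless the client sends "Connection: close";
# HTTP/1.0 connections persist only on an explicit "Connection: keep-alive".
def client_wants_keep_alive?(req)
  token = req.connection_header.to_s.downcase
  if req.http_version == "HTTP/1.1"
    token != "close"
  else
    token == "keep-alive"
  end
end

# Same three-way condition as handle_connection: the client must allow it,
# the handler must not have forced a close, and the response must not stream.
def keep_alive?(req, res)
  client_wants_keep_alive?(req) && !res.halted_close && !res.streaming
end
```

Any one of the three conditions failing ends the per-connection loop after the current response is written.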
.read_request_blob(fd, timeout_seconds) ⇒ Object
Non-blocking request reader. Returns the accumulated blob once “\r\n\r\n” is seen, or “” on timeout / EOF / oversize.

    # File 'lib/tep/server_scheduled.rb', line 220
    def self.read_request_blob(fd, timeout_seconds)
      buf = ""
      deadline = Time.now.to_i + timeout_seconds
      while buf.length < MAX_REQUEST_BYTES
        remaining = deadline - Time.now.to_i
        if remaining <= 0
          return ""
        end
        ready = Tep::Scheduler.io_wait(fd, Tep::Scheduler::READ, remaining)
        if ready == 0
          return ""
        end
        chunk = Sock.sphttp_recv_some(fd, 4096)
        if chunk.length == 0
          # Over TLS an empty read can be a partial record (SSL_read
          # WANT_READ/WANT_WRITE), not EOF -- re-park on the indicated
          # direction and retry rather than dropping the request. The
          # loop top re-applies the deadline on the want-read path.
          st = Sock.sphttp_io_status
          if st == 1
            next
          end
          if st == 2
            Tep::Scheduler.io_wait(fd, Tep::Scheduler::WRITE, remaining)
            next
          end
          return ""
        end
        buf = buf + chunk
        if buf.length >= 4 && buf.include?("\r\n\r\n")
          return buf
        end
      end
      ""
    end
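The deadline-bounded read can be sketched with core Ruby IO. This is a hedged illustration, not Tep's code: `IO.select` and `read_nonblock` stand in for `Tep::Scheduler.io_wait` and `Sock.sphttp_recv_some`, the TLS want-read/want-write branch is omitted, and `read_headers` is a hypothetical name. It uses a monotonic clock instead of `Time.now.to_i`, which is a substitution made here, not something the source does.

```ruby
# Hypothetical sketch: read from `io` until the blank line that ends the
# header block, giving up on timeout, EOF, or once `max_bytes` is reached.
# Returns the accumulated bytes, or "" on any failure.
def read_headers(io, timeout_seconds, max_bytes = 65_535)
  buf = +""
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout_seconds
  while buf.bytesize < max_bytes
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    return "" if remaining <= 0
    return "" if IO.select([io], nil, nil, remaining).nil? # timed out

    chunk = io.read_nonblock(4096, exception: false)
    next if chunk == :wait_readable # spurious wakeup: re-check the deadline
    return "" if chunk.nil?         # EOF before the headers completed

    buf << chunk
    return buf if buf.include?("\r\n\r\n")
  end
  "" # oversize: the cap was reached without seeing the terminator
end
```

The deadline is computed once and the remaining time is recomputed on every pass, so a peer that trickles bytes cannot extend the total wait beyond `timeout_seconds`.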
.run_worker(sfd) ⇒ Object
Spawn the accept fiber + pump the scheduler. Called inside each prefork child. Loops directly on `tick` rather than `run_until_empty` because the accept fiber parks on io_wait indefinitely: run_until_empty bails when no fiber is ready to resume THIS pass, but we need to keep polling so parked accept-on-sfd fibers get woken when a connection arrives.

    # File 'lib/tep/server_scheduled.rb', line 104
    def self.run_worker(sfd)
      f = Fiber.new { Tep::Server::Scheduled.accept_loop(sfd) }
      Tep::Scheduler.spawn_fiber(f)
      while Tep::Scheduler.alive_count > 0
        Tep::Scheduler.tick(1000)
      end
      0
    end
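The "keep ticking while any fiber is alive" loop can be illustrated with a toy scheduler. Everything here is an assumption for illustration: `ToyScheduler` is hypothetical and far simpler than `Tep::Scheduler`, whose internals are not shown on this page, and it has no I/O readiness tracking at all.

```ruby
require "fiber" # Fiber#alive? needs this on Ruby < 3.0; harmless on newer versions

# Hypothetical toy scheduler: fibers "park" by calling Fiber.yield and are
# resumed once per tick.
class ToyScheduler
  def initialize
    @fibers = []
  end

  def spawn_fiber(fiber)
    @fibers << fiber
  end

  def alive_count
    @fibers.count(&:alive?)
  end

  # One pass: resume every live fiber once, then drop the finished ones.
  def tick
    @fibers.each { |f| f.resume if f.alive? }
    @fibers.select!(&:alive?)
  end
end

sched = ToyScheduler.new
log = []
sched.spawn_fiber(Fiber.new do
  3.times do |i|
    log << i
    Fiber.yield # park until the next tick
  end
end)

# Same shape as run_worker: keep ticking while any fiber is alive.
sched.tick while sched.alive_count > 0
```

The loop condition depends only on liveness, not on whether a fiber was runnable in the last pass, which is why a parked accept fiber keeps the worker polling.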
.send_simple(client, status, msg) ⇒ Object
    # File 'lib/tep/server_scheduled.rb', line 365
    def self.send_simple(client, status, msg)
      reason = Tep.reason(status)
      head = "HTTP/1.0 " + status.to_s + " " + reason + "\r\n" +
             "Content-Length: " + msg.length.to_s + "\r\n" +
             "Connection: close\r\n\r\n" + msg
      Sock.sphttp_write_str(client, head)
      0
    end
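For reference, the wire format that send_simple produces can be sketched as a pure function. `REASONS` is a hypothetical stand-in for `Tep.reason`, and `simple_response` is an illustrative name. The sketch uses `bytesize` because, in standard Ruby, `String#length` counts characters while Content-Length counts bytes; whether the two differ in Tep's runtime is not stated on this page.

```ruby
# Hypothetical reason-phrase table standing in for Tep.reason.
REASONS = { 400 => "Bad Request", 404 => "Not Found" }.freeze

# Build a minimal close-after-response message: status line, byte length of
# the body, Connection: close, blank line, body.
def simple_response(status, msg)
  reason = REASONS.fetch(status, "Unknown")
  "HTTP/1.0 #{status} #{reason}\r\n" \
    "Content-Length: #{msg.bytesize}\r\n" \
    "Connection: close\r\n\r\n" \
    "#{msg}"
end
```

For ASCII-only messages such as "bad request" the two length measures agree, so the difference only matters for multi-byte bodies.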
.tls_handshake(client) ⇒ Object
Non-blocking server-side TLS handshake on an accepted fd. Returns 1 on success (SSL* registered; reads/writes are now transparent), 0 on failure. Drives SSL_do_handshake, parking on io_wait for the direction OpenSSL asks for, bounded by HEADER_READ_TIMEOUT so a connection that opens but never completes the handshake (port probe, slowloris, plain-HTTP client) can’t pin the fiber.

    # File 'lib/tep/server_scheduled.rb', line 149
    def self.tls_handshake(client)
      if Sock.sphttp_tls_accept_start(client) < 0
        return 0
      end
      deadline = Time.now.to_i + HEADER_READ_TIMEOUT
      hs = Sock.sphttp_tls_handshake_step(client)
      while hs == 1 || hs == 2
        remaining = deadline - Time.now.to_i
        if remaining <= 0
          return 0
        end
        mode = Tep::Scheduler::READ
        if hs == 2
          mode = Tep::Scheduler::WRITE
        end
        ready = Tep::Scheduler.io_wait(client, mode, remaining)
        if ready == 0
          return 0
        end
        hs = Sock.sphttp_tls_handshake_step(client)
      end
      if hs < 0
        return 0
      end
      1
    end
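The direction-driven loop can be isolated from OpenSSL for illustration. In this sketch `step` and `wait` are hypothetical callables standing in for `Sock.sphttp_tls_handshake_step` and `Tep::Scheduler.io_wait`; the return codes (1 = want read, 2 = want write, negative = error) are taken from the code above, and treating any other non-negative value as success mirrors its final check.

```ruby
# Hypothetical sketch of a deadline-bounded, direction-driven handshake loop.
# `step.call` advances the handshake and returns a status code.
# `wait.call(mode, seconds)` parks until the fd is ready; false means timeout.
def drive_handshake(step, wait, timeout_seconds)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout_seconds
  hs = step.call
  while hs == 1 || hs == 2
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    return false if remaining <= 0

    # Wait in whichever direction the TLS layer asked for.
    mode = hs == 2 ? :write : :read
    return false unless wait.call(mode, remaining)

    hs = step.call
  end
  hs >= 0
end
```

Passing the callables in keeps the loop testable without a real socket; a single deadline covers the whole handshake rather than each individual step.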
.write_response(client, req, res, keep_alive) ⇒ Object
Body-shape mirror of Tep::Server#write_response. Lifted into a class method so the connection fiber can call it without a captured `self`.

    # File 'lib/tep/server_scheduled.rb', line 259
    def self.write_response(client, req, res, keep_alive)
      # WebSocket upgrade branch. Set by res.start_websocket in the
      # user's handler after a successful Handshake.check. Writes
      # the 101 Switching Protocols head, then assigns the client
      # fd onto the driver and runs the recv loop. The recv loop
      # returns when the connection closes (peer EOF, idle timeout,
      # or a CLOSE frame round-trip). After return, the caller's
      # handle_connection closes the fd as usual.
      if res.upgrading_ws
        head = Tep::WebSocket::Handshake.build_response(
          res.ws_accept_key, res.ws_driver.subprotocol)
        Sock.sphttp_write_str(client, head)
        res.ws_driver.set_fd(client)
        conn = Tep::WebSocket::Connection.new(res.ws_driver)
        conn.run
        return 0
      end

      # Streaming branch -- cooperative mirror of Tep::Server's
      # streaming path (server.rb). Set by res.start_stream(streamer)
      # in the handler. Writes a chunked-encoding head immediately,
      # hands a Tep::Stream writer to the user's Streamer#pump, then
      # emits the end-of-stream terminator. pump runs cooperatively:
      # it parks on Tep::Scheduler.io_wait between writes (e.g. the
      # proxy streamer waits on the upstream fd), so other fibers keep
      # running while this stream is in flight. Connection: close --
      # chunked keep-alive is legal but we keep it simple, matching
      # the prefork server.
      if res.streaming
        res.headers["Transfer-Encoding"] = "chunked"
        if !res.headers.key?("Content-Type")
          res.headers["Content-Type"] = "text/event-stream"
        end
        reason = Tep.reason(res.status)
        head = req.http_version + " " + res.status.to_s + " " + reason + "\r\n"
        res.headers.each do |k, v|
          head = head + k + ": " + v + "\r\n"
        end
        res..each do |line|
          head = head + "Set-Cookie: " + line + "\r\n"
        end
        head = head + "Connection: close\r\n\r\n"
        Sock.sphttp_write_str(client, head)
        out = Tep::Stream.new(client)
        res.streamer.pump(out)
        Sock.sphttp_write_chunk_end(client)
        return 0
      end

      # File validators for cache revalidation (#152): a size-mtime
      # ETag + Last-Modified, set before headers are serialized below.
      if res.file_path.length > 0
        fsz = Sock.sphttp_filesize(res.file_path)
        fmt = Sock.sphttp_file_mtime(res.file_path)
        if fsz >= 0 && fmt >= 0
          res.etag(fsz.to_s + "-" + fmt.to_s)
          res.last_modified(fmt)
        end
      end

      # Conditional GET (issue #152): 304 + no body when the request's
      # precondition matches the response's validator (ETag /
      # Last-Modified, whether set by the handler or for a file above).
      # For a file we also clear file_path so the sendfile branch below
      # is skipped and the empty 304 goes out the inline-body path.
      if Tep::Cache.not_modified?(req, res)
        res.set_status(304)
        res.set_body("")
        res.file_path = ""
      end

      # Default Content-Type for inline-body responses. Matches
      # Tep::Server#send; without it, the Security::Headers nosniff
      # default leaves the browser refusing to interpret an erb
      # response as HTML.
      if res.file_path.length == 0 && res.body.length > 0 && !res.headers.key?("Content-Type")
        res.headers["Content-Type"] = "text/html; charset=utf-8"
      end

      reason = Tep.reason(res.status)
      head = req.http_version + " " + res.status.to_s + " " + reason + "\r\n"
      res.headers.each do |k, v|
        head = head + k + ": " + v + "\r\n"
      end
      res..each do |line|
        head = head + "Set-Cookie: " + line + "\r\n"
      end
      if keep_alive
        head = head + "Connection: keep-alive\r\n"
      else
        head = head + "Connection: close\r\n"
      end

      if res.file_path.length > 0
        fs = Sock.sphttp_filesize(res.file_path)
        head = head + "Content-Length: " + fs.to_s + "\r\n\r\n"
        Sock.sphttp_write_str(client, head)
        Sock.sphttp_sendfile(client, res.file_path)
      else
        head = head + "Content-Length: " + res.body.length.to_s + "\r\n\r\n"
        Sock.sphttp_write_str(client, head)
        if res.body.length > 0
          Sock.sphttp_write_str(client, res.body)
        end
      end
      0
    end
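The streaming branch relies on HTTP/1.1 chunked transfer coding. The framing itself is standard and can be sketched as below; `chunk_frame`, `CHUNK_END`, and `chunked_body` are hypothetical names, and it is an assumption that `Tep::Stream` writes and `Sock.sphttp_write_chunk_end` emit exactly these bytes.

```ruby
# Frame one chunk: payload size in hex, CRLF, payload, CRLF.
def chunk_frame(data)
  data.bytesize.to_s(16) + "\r\n" + data + "\r\n"
end

# The zero-length chunk that terminates a chunked body (no trailers).
CHUNK_END = "0\r\n\r\n"

# Encode a list of payloads as a complete chunked body. Empty payloads are
# skipped, since a zero-length chunk would end the body early.
def chunked_body(parts)
  parts.reject(&:empty?).map { |part| chunk_frame(part) }.join + CHUNK_END
end
```

For example, the payloads "hello" and "world!" encode to `5\r\nhello\r\n6\r\nworld!\r\n0\r\n\r\n`. Because the terminator marks the end of the body, no Content-Length header is needed for a streamed response.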
Instance Method Details
#run(port, workers, quiet) ⇒ Object
    # File 'lib/tep/server_scheduled.rb', line 43
    def run(port, workers, quiet)
      sfd = Sock.sphttp_listen(port, workers > 1 ? 1 : 0)
      if sfd < 0
        return 1
      end
      Sock.sphttp_set_nonblock(sfd)
      # Install SIGTERM/SIGINT handlers BEFORE fork so children
      # inherit them; accept_loop checks the term flag once per
      # second and runs Tep.on_shutdown (run_end + future hooks).
      Sock.sphttp_install_term_handlers

      # Inbound TLS (tep#148 phase 2, scheduled variant): load the
      # server cert/key once before forking so every worker inherits
      # the SSL_CTX. A bad cert/key is fatal -- never silently serve
      # plaintext on a port the operator believes is TLS. The handshake
      # itself runs non-blocking per-connection (handle_connection).
      if Tep::APP.tls_cert.length > 0 && Tep::APP.tls_key.length > 0
        if Sock.sphttp_tls_server_init(Tep::APP.tls_cert, Tep::APP.tls_key) < 0
          return 1
        end
      end

      if workers > 1
        i = 0
        while i < workers
          pid = Sock.sphttp_fork
          if pid == 0
            Tep::Server::Scheduled.run_worker(sfd)
            Sock.sphttp_exit(0)
          end
          i += 1
        end
        # Reap children until none remain. After all workers exit,
        # emit the single aggregated run_end (see #128 / Tep::Events
        # #run_end_aggregated).
        loop do
          gone = Sock.sphttp_wait_any
          if gone < 0
            break
          end
        end
        if Sock.sphttp_shutdown_requested != 0
          Tep.on_shutdown
        end
      else
        Tep::Server::Scheduled.run_worker(sfd)
        # Single-process: this IS the parent; emit run_end here.
        if Sock.sphttp_shutdown_requested != 0
          Tep.on_shutdown
        end
      end
      0
    end
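The fork-then-reap shape of #run can be sketched with Ruby's own process API. This is a hedged illustration for Unix-like systems only: `Kernel#fork` and `Process.wait` stand in for `Sock.sphttp_fork` and `Sock.sphttp_wait_any`, and `prefork` is a hypothetical name.

```ruby
# Hypothetical sketch: fork `workers` children that each run the given block,
# then reap them until none remain. Returns the number of children reaped.
def prefork(workers)
  workers.times do
    fork do
      yield
      exit!(0) # leave the child without running the parent's at_exit hooks
    end
  end

  reaped = 0
  loop do
    begin
      Process.wait # blocks until any child exits
      reaped += 1
    rescue Errno::ECHILD
      break # no children left
    end
  end
  reaped
end
```

Anything set up before the fork, such as the listening socket, signal handlers, or TLS context in #run, is inherited by every child, which is why #run does that work first.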