Class: Tep::Http
- Inherits: Object
  - Object
  - Tep::Http
- Defined in: lib/tep/http.rb
Defined Under Namespace
Classes: FramedResp, Pool, Response
Constant Summary
- COOP_RECV_TIMEOUT = 30
  Per-recv timeout in the cooperative path. Bounds how long a parked fiber will wait for the next chunk from the peer before giving up and returning status=0. 30s matches the scheduled server’s KEEPALIVE_TIMEOUT; loud failure beats a wedged fiber.
- COOP_RESPONSE_MAX = 65535
  Hard cap on total response bytes accumulated by the cooperative path. Mirrors sphttp_recv_all’s static-buffer cap (~64 KiB) so the two paths impose the same upper bound. Bigger responses need streaming, which v1 doesn’t ship.
- POOL_RECV_TIMEOUT_MS = 30000
  Recv timeout (ms) on pooled keep-alive sockets. Bounds a response read so a no-Content-Length / chunked keep-alive upstream can’t hang the worker waiting for an EOF that never comes (the recv returns and we bail with what we have, un-pooled). 30s matches COOP_RECV_TIMEOUT.
Instance Attribute Summary
- #base_url ⇒ Object
  Returns the value of attribute base_url.
- #default_headers ⇒ Object
  Returns the value of attribute default_headers.
Class Method Summary
- .delete(url) ⇒ Object
- .empty_headers ⇒ Object
- .get(url) ⇒ Object
  Class-level one-shots.
- .head(url) ⇒ Object
- .parse_response(raw) ⇒ Object
  Parse a raw HTTP/1.0 or 1.1 response.
- .patch(url, body) ⇒ Object
- .post(url, body) ⇒ Object
- .put(url, body) ⇒ Object
- .recv_framed(fd) ⇒ Object
  Read a full HTTP response, framing the body by Content-Length when present so a kept-alive socket stops at the message boundary and stays reusable.
- .send_req(verb, url, body, headers) ⇒ Object
  The workhorse.
- .send_req_blocking(verb, url, body, headers) ⇒ Object
- .send_req_coop(verb, url, body, headers) ⇒ Object
  Cooperative variant.
Instance Method Summary
- #do_delete(path) ⇒ Object
- #do_get(path) ⇒ Object
  Instance verbs.
- #do_head(path) ⇒ Object
- #do_patch(path, body) ⇒ Object
- #do_post(path, body) ⇒ Object
- #do_put(path, body) ⇒ Object
- #do_req(path, verb, body) ⇒ Object
- #initialize(base_url) ⇒ Http (constructor)
  A new instance of Http.
- #set_header(k, v) ⇒ Object
Constructor Details
Instance Attribute Details
#base_url ⇒ Object
Returns the value of attribute base_url.
    # File 'lib/tep/http.rb', line 59

    def base_url
      @base_url
    end
#default_headers ⇒ Object
Returns the value of attribute default_headers.
    # File 'lib/tep/http.rb', line 59

    def default_headers
      @default_headers
    end
Class Method Details
.delete(url) ⇒ Object
    # File 'lib/tep/http.rb', line 94

    def self.delete(url); Http.send_req("DELETE", url, "", Http.empty_headers); end

.empty_headers ⇒ Object
    # File 'lib/tep/http.rb', line 99

    def self.empty_headers
      Tep.str_hash
    end
.get(url) ⇒ Object
Class-level one-shots. Build a default empty headers hash and dispatch through send_req.
    # File 'lib/tep/http.rb', line 92

    def self.get(url); Http.send_req("GET", url, "", Http.empty_headers); end
.head(url) ⇒ Object
    # File 'lib/tep/http.rb', line 93

    def self.head(url); Http.send_req("HEAD", url, "", Http.empty_headers); end
.parse_response(raw) ⇒ Object
Parse a raw HTTP/1.0 or 1.1 response: status line, then headers (terminated by \r\n\r\n), then body. Header names are downcased so callers don’t have to worry about case. Allocates and returns a fresh Response rather than mutating one passed in by reference, because the mutation pattern widens the param to poly in spinel’s analyzer.
    # File 'lib/tep/http.rb', line 397

    def self.parse_response(raw)
      out = Tep::Http::Response.new
      if raw.length < 12
        return out
      end
      # Status line: "HTTP/1.x SSS reason\r\n"
      eol = Tep.str_find(raw, "\r\n", 0)
      if eol < 0
        return out
      end
      line = raw[0, eol]
      sp1 = Tep.str_find(line, " ", 0)
      if sp1 < 0
        return out
      end
      rest = line[sp1 + 1, line.length - sp1 - 1]
      sp2 = Tep.str_find(rest, " ", 0)
      code_str = ""
      if sp2 >= 0
        code_str = rest[0, sp2]
      else
        code_str = rest
      end
      out.status = code_str.to_i

      # Walk header lines until empty line.
      pos = eol + 2
      while pos < raw.length
        next_eol = Tep.str_find(raw, "\r\n", pos)
        if next_eol < 0
          return out
        end
        if next_eol == pos
          # blank line -- body starts at pos+2
          body_start = pos + 2
          if body_start < raw.length
            out.body = raw[body_start, raw.length - body_start]
          end
          return out
        end
        line2 = raw[pos, next_eol - pos]
        ci = Tep.str_find(line2, ":", 0)
        if ci > 0
          k = line2[0, ci].downcase
          # Skip leading space after the colon.
          v_start = ci + 1
          if v_start < line2.length && line2[v_start] == " "
            v_start += 1
          end
          v = line2[v_start, line2.length - v_start]
          out.headers[k] = v
        end
        pos = next_eol + 2
      end
      out
    end
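The parsing steps above can be tried outside Tep with a minimal plain-Ruby sketch. It is an illustration only: `SketchResponse` and `sketch_parse_response` are hypothetical names, `String#partition` and `String#split` stand in for `Tep.str_find`, and the sketch parses headers slightly more leniently than the listing (it does not stop at a missing line terminator).

```ruby
# Hypothetical stand-in for Tep::Http::Response; the real class may differ.
SketchResponse = Struct.new(:status, :headers, :body)

# Parse "status line + headers + blank line + body" into a SketchResponse.
def sketch_parse_response(raw)
  out = SketchResponse.new(0, {}, "")
  return out if raw.length < 12 # same minimum-length guard as the listing

  head, sep, body = raw.partition("\r\n\r\n")
  lines = head.split("\r\n")
  status_line = lines.shift
  return out if status_line.nil?

  # "HTTP/1.x SSS reason": the second space-separated field is the code.
  out.status = status_line.split(" ", 3)[1].to_i

  lines.each do |line|
    name, colon, value = line.partition(":")
    next if colon.empty? || name.empty?
    # Downcase the name and drop one leading space after the colon.
    out.headers[name.downcase] = value.sub(/\A /, "")
  end

  out.body = body unless sep.empty?
  out
end
```

Because names are downcased at parse time, a caller can look up `headers["content-type"]` regardless of how the upstream capitalised it.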
.patch(url, body) ⇒ Object
    # File 'lib/tep/http.rb', line 97

    def self.patch(url, body); Http.send_req("PATCH", url, body, Http.empty_headers); end

.post(url, body) ⇒ Object
    # File 'lib/tep/http.rb', line 95

    def self.post(url, body); Http.send_req("POST", url, body, Http.empty_headers); end

.put(url, body) ⇒ Object
    # File 'lib/tep/http.rb', line 96

    def self.put(url, body); Http.send_req("PUT", url, body, Http.empty_headers); end
.recv_framed(fd) ⇒ Object
Read a full HTTP response, framing the body by Content-Length when present so a kept-alive socket stops at the message boundary and stays reusable. Without Content-Length we read until EOF or the recv timeout (socket not reusable). Returns a FramedResp. Bounded at 4 MiB (matches sphttp’s SPHTTP_RESP_MAX) so a runaway upstream can’t grow buf unboundedly.
    # File 'lib/tep/http.rb', line 460

    def self.recv_framed(fd)
      out = Tep::Http::FramedResp.new
      buf = ""
      hdr_end = -1
      clen = -1
      conn_close = false
      while buf.length < 4194304
        if hdr_end < 0
          idx = Tep.str_find(buf, "\r\n\r\n", 0)
          if idx >= 0
            hdr_end = idx + 4
            # Header-block scanning is inlined (not extracted to helper
            # methods): spinel types a param by name file-wide, so a
            # String param that isn't forced String at the boundary
            # defaults to mrb_int and the call mismatches. Operating on
            # `buf` slices here keeps everything unambiguously String.
            lowh = buf[0, hdr_end].downcase
            # Content-Length (to_i tolerates the leading space, stops at CR).
            ci = Tep.str_find(lowh, "content-length:", 0)
            if ci >= 0
              crest = lowh[ci + 15, lowh.length]
              ceol = Tep.str_find(crest, "\r\n", 0)
              cline = crest
              if ceol >= 0
                cline = crest[0, ceol]
              end
              clen = cline.to_i
            end
            # Connection: close
            ki = Tep.str_find(lowh, "connection:", 0)
            if ki >= 0
              krest = lowh[ki + 11, lowh.length]
              keol = Tep.str_find(krest, "\r\n", 0)
              kline = krest
              if keol >= 0
                kline = krest[0, keol]
              end
              if Tep.str_find(kline, "close", 0) >= 0
                conn_close = true
              end
            end
          end
        end
        if hdr_end >= 0 && clen >= 0 && (buf.length - hdr_end) >= clen
          break
        end
        chunk = Sock.sphttp_recv_some(fd, 65536)
        if chunk.length == 0
          break
        end
        buf = buf + chunk
      end
      framed = false
      if hdr_end >= 0 && clen >= 0 && (buf.length - hdr_end) == clen
        framed = true
      end
      out.raw = buf
      out.framed_clean = framed
      out.conn_close = conn_close
      out
    end
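To see why Content-Length framing keeps a socket reusable, here is a small sketch that reads from a fake chunk source instead of a file descriptor. The name `sketch_recv_framed` and the callable `recv_some` (standing in for `Sock.sphttp_recv_some`) are assumptions made for illustration; the Connection: close scan is omitted and the sketch assumes ASCII data, so character counts equal byte counts.

```ruby
# recv_some: any callable returning the next chunk as a String,
# or "" for EOF/timeout. Hypothetical stand-in for the socket read.
def sketch_recv_framed(recv_some, max_bytes = 4 * 1024 * 1024)
  buf = +""
  hdr_end = nil # index just past "\r\n\r\n", once seen
  clen = nil    # declared Content-Length, if any

  while buf.length < max_bytes
    if hdr_end.nil? && (idx = buf.index("\r\n\r\n"))
      hdr_end = idx + 4
      m = buf[0, hdr_end].match(/^content-length:\s*(\d+)/i)
      clen = m[1].to_i if m
    end
    # Stop at the message boundary: do not read into the next response.
    break if hdr_end && clen && (buf.length - hdr_end) >= clen

    chunk = recv_some.call
    break if chunk.empty?
    buf << chunk
  end

  # "Cleanly framed" means exactly Content-Length body bytes were read.
  framed = !hdr_end.nil? && !clen.nil? && (buf.length - hdr_end) == clen
  { raw: buf, framed_clean: framed }
end
```

With a declared length the loop stops as soon as the body is complete, leaving any later bytes unread on the connection. Without one it can only stop at EOF or timeout, which is why that case is never reported as cleanly framed.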
.send_req(verb, url, body, headers) ⇒ Object
The workhorse. Returns a Tep::Http::Response in all cases; on connect or send failure, `.status` is 0 and `.body` is "".
When called from inside a Tep::Scheduler fiber (i.e. running under Tep::Server::Scheduled), routes through `send_req_coop`, which parks on `Tep::Scheduler.io_wait` between recv calls so the worker fiber doesn’t hog the scheduler while waiting for peer bytes. Outside scheduler context the call falls through to `send_req_blocking`, which is the original sphttp_recv_all path.
Why split rather than always-async: outside a scheduled context (the default Tep::Server prefork model, scripts, REPL), io_wait falls back to a single-shot poll per call, which would add an extra poll(2) round per chunk for no benefit. Keeping the blocking path keeps the cheap case cheap.
    # File 'lib/tep/http.rb', line 137

    def self.send_req(verb, url, body, headers)
      # Under Tep::Server::Scheduled, route BOTH http and https through
      # the cooperative path. Plaintext parks on io_wait between recvs;
      # https (tep#150) additionally drives a non-blocking TLS handshake
      # (sphttp_tls_connect_start + handshake_step) and a want-aware
      # SSL_read loop, so an outbound HTTPS call no longer blocks the
      # whole worker. Outside a scheduled context the blocking path stays
      # cheap (no extra poll(2) per chunk).
      if Tep::Scheduler.scheduled_context?
        Http.send_req_coop(verb, url, body, headers)
      else
        Http.send_req_blocking(verb, url, body, headers)
      end
    end
.send_req_blocking(verb, url, body, headers) ⇒ Object
    # File 'lib/tep/http.rb', line 152

    def self.send_req_blocking(verb, url, body, headers)
      out = Tep::Http::Response.new
      parts = Tep::Url.split_url(url)
      scheme = parts["scheme"]
      if scheme != "http" && scheme != "https"
        # Unknown scheme.
        return out
      end
      host = parts["host"]
      port = parts["port"].to_i
      path = parts["path"]
      if parts["query"].length > 0
        path = path + "?" + parts["query"]
      end

      # HTTPS: no pooling. The fd carries an SSL* in sphttp's registry;
      # pooling TLS sockets is out of scope for 6.7b (#126). HTTP/1.0 +
      # Connection: close + recv-until-EOF over a fresh verified socket.
      if scheme == "https"
        fd = Sock.sphttp_connect_tls(host, port) # port 443 via Tep::Url
        if fd < 0
          return out
        end
        # Head inlined (not a helper): spinel picks one type per param
        # name file-wide, and path/host collide with int uses elsewhere.
        head = verb + " " + path + " HTTP/1.0\r\n" +
               "Host: " + host + "\r\n" +
               "Connection: close\r\n"
        headers.each do |k, v|
          head = head + k + ": " + v + "\r\n"
        end
        if body.length > 0
          head = head + "Content-Length: " + body.length.to_s + "\r\n"
        end
        head = head + "\r\n"
        if Sock.sphttp_write_str(fd, head) < 0
          Sock.sphttp_close(fd)
          return out
        end
        if body.length > 0
          if Sock.sphttp_write_str(fd, body) < 0
            Sock.sphttp_close(fd)
            return out
          end
        end
        raw = Sock.sphttp_recv_all(fd, 0)
        Sock.sphttp_close(fd)
        return Http.parse_response(raw)
      end

      # HTTP: HTTP/1.1 keep-alive over a pooled (reused) socket (6.7b).
      # Claim an idle fd for (host, port) or connect fresh; frame the
      # response by Content-Length; reuse the socket (return it to the
      # pool) only when it's cleanly framed, the peer didn't ask to
      # close, and the status isn't a retry-worthy 5xx. A pool HIT that
      # fails (stale socket the upstream already closed) is retried once
      # on a fresh connection.
      attempt = 0
      while attempt < 2
        from_pool = 0
        fd = Tep::Http::Pool.claim(host, port)
        if fd >= 0
          from_pool = 1
        else
          fd = Sock.sphttp_connect(host, port)
        end
        if fd < 0
          return out
        end
        Sock.sphttp_set_recv_timeout(fd, Http::POOL_RECV_TIMEOUT_MS)
        head = verb + " " + path + " HTTP/1.1\r\n" +
               "Host: " + host + "\r\n" +
               "Connection: keep-alive\r\n"
        headers.each do |k, v|
          head = head + k + ": " + v + "\r\n"
        end
        if body.length > 0
          head = head + "Content-Length: " + body.length.to_s + "\r\n"
        end
        head = head + "\r\n"
        wrote = Sock.sphttp_write_str(fd, head)
        if wrote >= 0 && body.length > 0
          wrote = Sock.sphttp_write_str(fd, body)
        end
        if wrote < 0
          Sock.sphttp_close(fd)
          if from_pool == 0
            return out
          end
          attempt = attempt + 1 # stale pooled socket -- retry fresh
        else
          fr = Http.recv_framed(fd)
          if fr.raw.length == 0 && from_pool == 1
            Sock.sphttp_close(fd)
            attempt = attempt + 1 # stale pooled socket gave nothing -- retry fresh
          else
            resp = Http.parse_response(fr.raw)
            reuse = fr.framed_clean && !fr.conn_close && resp.status > 0 && resp.status < 500
            if reuse
              Tep::Http::Pool.release(fd, host, port)
            else
              Sock.sphttp_close(fd)
            end
            return resp
          end
        end
      end
      return out
    end
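The pooled HTTP branch combines two small policies: when a socket may return to the pool, and when a failed exchange is retried. The sketch below isolates both with plain callables in place of the pool and socket layer. All names here (`sketch_reusable?`, `sketch_request_with_retry`, and the `claim`, `connect`, `exchange` lambdas) are hypothetical, and an empty string stands in for a failed write or an empty read.

```ruby
# A socket goes back to the pool only if the response was cleanly framed,
# the peer did not ask to close, and the status is a real non-5xx code.
def sketch_reusable?(framed_clean, conn_close, status)
  framed_clean && !conn_close && status > 0 && status < 500
end

# claim:    -> { fd }   pooled fd, or -1 when the pool has none
# connect:  -> { fd }   fresh connection, or -1 on failure
# exchange: ->(fd) { raw response String; "" when the socket gave nothing }
def sketch_request_with_retry(claim, connect, exchange)
  2.times do
    fd = claim.call
    from_pool = fd >= 0
    fd = connect.call unless from_pool
    return "" if fd < 0

    raw = exchange.call(fd)
    # Only an empty result on a pooled socket is treated as "stale";
    # a fresh connection's result is returned as-is, even if empty.
    return raw unless raw.empty? && from_pool
  end
  ""
end
```

Limiting the retry to pool hits matters: a fresh connection that fails is a real upstream failure, while a pooled socket that fails has most likely just been closed by the upstream while idle.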
.send_req_coop(verb, url, body, headers) ⇒ Object
Cooperative variant. Same wire shape, same parse, but:
* flips the fd to non-blocking after connect, and
* replaces the synchronous sphttp_recv_all with a parked io_wait(READ) + sphttp_recv_some loop that yields the worker fiber back to the scheduler between recvs.
This is what closes the macOS self-call deadlock: while the outer handler fiber is parked here, the worker’s accept fiber can run, accept the inner request, dispatch its handler, and write the response – which unblocks our io_wait. See docs/MACOS-CONCURRENCY.md for the why.
    # File 'lib/tep/http.rb', line 276

    def self.send_req_coop(verb, url, body, headers)
      out = Tep::Http::Response.new
      parts = Tep::Url.split_url(url)
      scheme = parts["scheme"]
      if scheme != "http" && scheme != "https"
        return out
      end
      host = parts["host"]
      port = parts["port"].to_i
      path = parts["path"]
      if parts["query"].length > 0
        path = path + "?" + parts["query"]
      end

      # Connect. https (tep#150): non-blocking TLS -- set up the SSL then
      # drive SSL_do_handshake, parking on io_wait for the direction
      # OpenSSL asks for until the handshake completes. http: a plain
      # non-blocking connect. Either way `fd` ends up non-blocking with
      # write/recv routed appropriately (SSL_* for https via the registry).
      if scheme == "https"
        fd = Sock.sphttp_tls_connect_start(host, port)
        if fd < 0
          return out
        end
        hs = Sock.sphttp_tls_handshake_step(fd)
        while hs == 1 || hs == 2
          mode = Tep::Scheduler::READ
          if hs == 2
            mode = Tep::Scheduler::WRITE
          end
          ready = Tep::Scheduler.io_wait(fd, mode, COOP_RECV_TIMEOUT)
          if ready == 0
            Sock.sphttp_close(fd)
            return out
          end
          hs = Sock.sphttp_tls_handshake_step(fd)
        end
        if hs < 0
          # Handshake/verify failure -- handshake_step already freed the
          # SSL*; close the fd. (cert/hostname mismatch lands here.)
          Sock.sphttp_close(fd)
          return out
        end
      else
        fd = Sock.sphttp_connect(host, port)
        if fd < 0
          return out
        end
        Sock.sphttp_set_nonblock(fd)
      end

      # Same head shape as send_req_blocking; inlined for the same
      # spinel-type-inference reason (see that path's comment).
      head = verb + " " + path + " HTTP/1.0\r\n" +
             "Host: " + host + "\r\n" +
             "Connection: close\r\n"
      headers.each do |k, v|
        head = head + k + ": " + v + "\r\n"
      end
      if body.length > 0
        head = head + "Content-Length: " + body.length.to_s + "\r\n"
      end
      head = head + "\r\n"

      # send(2) on a non-blocking localhost socket with a small
      # request (start line + few headers, well under the kernel's
      # ~16 KiB socket buffer) returns immediately. If it ever
      # surfaces EAGAIN we'll need a write-side park; for v1 the
      # bounded request size makes that path dead code.
      if Sock.sphttp_write_str(fd, head) < 0
        Sock.sphttp_close(fd)
        return out
      end
      if body.length > 0
        if Sock.sphttp_write_str(fd, body) < 0
          Sock.sphttp_close(fd)
          return out
        end
      end

      raw = ""
      while raw.length < COOP_RESPONSE_MAX
        ready = Tep::Scheduler.io_wait(fd, Tep::Scheduler::READ,
                                       COOP_RECV_TIMEOUT)
        if ready == 0
          # Timeout -- bail with whatever we have so far. An
          # incomplete response will surface as status=0 from
          # parse_response if the status line never arrived.
          break
        end
        chunk = Sock.sphttp_recv_some(fd, 4096)
        if chunk.length == 0
          # An empty read is ambiguous: for TLS it may mean "no full
          # record decoded yet" (SSL_read WANT_READ/WANT_WRITE), not EOF.
          # Consult the want-status: 1 = want-read (re-park on READ at the
          # loop top), 2 = want-write (TLS renegotiation -- park on WRITE
          # then retry), anything else (3 EOF / -1 error, or a plaintext
          # peer close) is the end of the HTTP/1.0 + Connection: close
          # response.
          st = Sock.sphttp_io_status
          if st == 1
            next
          end
          if st == 2
            Tep::Scheduler.io_wait(fd, Tep::Scheduler::WRITE, COOP_RECV_TIMEOUT)
            next
          end
          break
        end
        raw = raw + chunk
      end
      Sock.sphttp_close(fd)
      Http.parse_response(raw)
    end
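The wait-then-read shape of the receive loop can be reproduced with Ruby's standard library alone. This is only an approximation under stated assumptions: `IO.select` stands in for `Tep::Scheduler.io_wait` and blocks the calling thread rather than parking a fiber, `read_nonblock` stands in for `Sock.sphttp_recv_some`, TLS want-states are not modelled, and `sketch_coop_recv` plus the two `SKETCH_` constants are hypothetical names.

```ruby
SKETCH_COOP_TIMEOUT_S = 30    # mirrors COOP_RECV_TIMEOUT
SKETCH_COOP_MAX = 65_535      # mirrors COOP_RESPONSE_MAX

# Accumulate bytes from a non-blocking IO until EOF, timeout, or the cap.
def sketch_coop_recv(io, timeout_s = SKETCH_COOP_TIMEOUT_S, max = SKETCH_COOP_MAX)
  raw = +""
  while raw.bytesize < max
    # Wait for readability; nil means the timeout expired.
    ready = IO.select([io], nil, nil, timeout_s)
    break if ready.nil?                   # timeout: keep what we have

    chunk = io.read_nonblock(4096, exception: false)
    next if chunk == :wait_readable       # nothing yet: wait again
    break if chunk.nil?                   # EOF: peer closed
    raw << chunk
  end
  raw
end
```

As in the listing, the cap is checked before each read, so the result can exceed the cap by up to one chunk. A timeout returns the partial data instead of raising, which the parser then reports as status 0 if no status line arrived.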
Instance Method Details
#do_delete(path) ⇒ Object
    # File 'lib/tep/http.rb', line 77

    def do_delete(path); do_req(path, "DELETE", ""); end
#do_get(path) ⇒ Object
Instance verbs. `path` is appended to `base_url` if it starts with "/", or used as-is if it’s a full URL. Prefixed with `do_` to avoid the cmeth / imeth ambiguity at the call site: `http.get(path)` reads like a Sinatra route in apps, whereas `http.do_get(path)` does not.
    # File 'lib/tep/http.rb', line 75

    def do_get(path); do_req(path, "GET", ""); end
#do_head(path) ⇒ Object
    # File 'lib/tep/http.rb', line 76

    def do_head(path); do_req(path, "HEAD", ""); end

#do_patch(path, body) ⇒ Object
    # File 'lib/tep/http.rb', line 80

    def do_patch(path, body); do_req(path, "PATCH", body); end

#do_post(path, body) ⇒ Object
    # File 'lib/tep/http.rb', line 78

    def do_post(path, body); do_req(path, "POST", body); end

#do_put(path, body) ⇒ Object
    # File 'lib/tep/http.rb', line 79

    def do_put(path, body); do_req(path, "PUT", body); end
#do_req(path, verb, body) ⇒ Object
    # File 'lib/tep/http.rb', line 82

    def do_req(path, verb, body)
      url = path
      if path.length > 0 && path[0] == "/"
        url = @base_url + path
      end
      Http.send_req(verb, url, body, @default_headers)
    end
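The URL rule used by the instance verbs is small enough to check in isolation. The helper name `sketch_resolve` and the example hosts are hypothetical; the logic follows the `do_req` listing.

```ruby
# A path starting with "/" is appended to the base URL;
# anything else is treated as a full URL and used unchanged.
def sketch_resolve(base_url, path)
  return base_url + path if path.length > 0 && path[0] == "/"
  path
end

sketch_resolve("http://api.example", "/v1/items")        # base + path
sketch_resolve("http://api.example", "http://other/x")   # used as-is
```

Note that the join is plain concatenation, so a `base_url` ending in "/" combined with a path starting with "/" yields a double slash.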
#set_header(k, v) ⇒ Object
    # File 'lib/tep/http.rb', line 66

    def set_header(k, v)
      @default_headers[k] = v
    end