Class: Tep::Proxy
Overview
Retry behaviour for the buffered forward path (chunk 6.5). Returned by Tep::Proxy#retry_policy(req); fresh instance per request so the policy can be derived from the request (e.g. idempotent verbs get more attempts, POSTs none).
Backoff is integer-MILLISECONDS via Sock.sphttp_sleep_ms (a nanosleep-backed C helper). Sub-second pacing is the right default for HTTP retries – whole-second backoffs throw away throughput on transient blips that resolve quickly. Two setters for the base backoff:
* base_backoff_ms = 100 # int, direct ms.
* base_backoff_secs = 0.1 # Float, converted to ms.
Set whichever reads better at the call site; both feed the same ms-int through backoff_for. If both are set, the LAST write wins (whichever setter you called second).
Default shape: max_attempts=1 (no retry, back-compat).
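The two-setter behaviour described above can be sketched with a stand-alone class. This is a hypothetical stand-in, not the library's RetryPolicy: the class name SketchRetryPolicy is invented, and the doubling formula in backoff_for is an assumption (the source only says both setters feed one ms-int through backoff_for).

```ruby
# Hypothetical stand-in for Tep::Proxy::RetryPolicy; names and the
# doubling formula are assumptions made for illustration only.
class SketchRetryPolicy
  attr_accessor :max_attempts
  attr_reader :base_backoff_ms

  def initialize
    @max_attempts = 1     # default shape: no retry
    @base_backoff_ms = 0  # no delay until a setter is called
  end

  # Direct integer milliseconds.
  def base_backoff_ms=(ms)
    @base_backoff_ms = ms.to_i
  end

  # Float seconds, converted to the same integer-ms field,
  # so whichever setter runs last wins.
  def base_backoff_secs=(secs)
    @base_backoff_ms = (secs * 1000).round
  end

  # ASSUMPTION: delay doubles per attempt index (0-based).
  def backoff_for(attempt)
    @base_backoff_ms * (2**attempt)
  end
end
```

Because both setters write one field, calling base_backoff_ms = 250 and then base_backoff_secs = 0.1 leaves the base at 100 ms.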
Defined Under Namespace
Classes: ProxyStreamer, RetryPolicy, StreamChunk, StreamStats, UpstreamHead, UpstreamRequest
Instance Attribute Summary
-
#max_request_body_bytes ⇒ Object
Body size caps (chunk 6.6).
-
#max_response_body_bytes ⇒ Object
Body size caps (chunk 6.6).
-
#timeout ⇒ Object
Returns the value of attribute timeout.
-
#upstream ⇒ Object
Returns the value of attribute upstream.
Class Method Summary
-
.hop_by_hop?(name) ⇒ Boolean
RFC 7230 §6.1 hop-by-hop headers: meaningful only for a single transport-level connection, never forwarded by a proxy.
-
.read_upstream_head(fd) ⇒ Object
Read an upstream response head (status line + headers up to the blank line) cooperatively.
Instance Method Summary
-
#after_forward(req, ures, res) ⇒ Object
Runs after the upstream responds, before `res` is written to the client.
-
#before_forward(req, res, ureq) ⇒ Object
Runs after the request body is fully received, before forwarding.
-
#dispatch_one(out, stats, relay_buf) ⇒ Object
Count one unit + dispatch it to on_stream_chunk via implicit self (polymorphic – reaches subclass overrides).
-
#drain_events(out, stats, body_buf) ⇒ Object
Split body_buf into complete "\n\n"-terminated SSE event records and dispatch each (the record includes the trailing blank line, per the doc's "data: …\n\n" contract).
-
#handle(req, res) ⇒ Object
---- Tep::Handler interface ----
-
#initialize(upstream) ⇒ Proxy
A new instance of Proxy.
-
#on_stream_chunk(chunk, out, stats) ⇒ Object
Per-chunk streaming hook (chunk 6.2).
-
#on_stream_end(req, out, stats) ⇒ Object
End-of-stream finalizer (chunk 6.2, #81).
-
#pick_upstream(req) ⇒ Object
Per-request upstream selection (chunk 6.4).
-
#retry_policy(req) ⇒ Object
Per-request retry policy (chunk 6.5).
-
#rewrite_path(path) ⇒ Object
Map the inbound request’s path+query to the upstream path+query.
-
#run_stream(out, fd, leftover, is_chunked, is_sse, req) ⇒ Object
The streaming pump, called from ProxyStreamer#pump as @proxy.run_stream(…).
-
#start_streaming_forward(req, res, ureq) ⇒ Object
Streaming forward (chunk 6.2).
-
#stream_request?(req) ⇒ Boolean
Streaming opt-in predicate.
Methods inherited from Handler
#is_regex?, #re_capture, #re_match?
Constructor Details
#initialize(upstream) ⇒ Proxy
Returns a new instance of Proxy.
    # File 'lib/tep/proxy.rb', line 142
    def initialize(upstream)
      @upstream = upstream
      @timeout = 30
      @max_request_body_bytes = 1 * 1024 * 1024
      @max_response_body_bytes = 8 * 1024 * 1024
    end
Instance Attribute Details
#max_request_body_bytes ⇒ Object
Body size caps (chunk 6.6). max_request_body_bytes bounds the inbound body the proxy will accept (over -> 413 Payload Too Large before any upstream call). max_response_body_bytes bounds the upstream response body the proxy will forward (over -> 502 with a proxy_error JSON). Defaults: 1 MiB request / 8 MiB response – enough for typical JSON-API gateway use, small enough that a malicious / malfunctioning peer can’t easily OOM the worker. Override in initialize() (or expose a block-DSL setter) for larger / smaller caps per deployment. Set either to 0 to disable that cap (not recommended).
    # File 'lib/tep/proxy.rb', line 140
    def max_request_body_bytes
      @max_request_body_bytes
    end
#max_response_body_bytes ⇒ Object
Body size caps (chunk 6.6). max_request_body_bytes bounds the inbound body the proxy will accept (over -> 413 Payload Too Large before any upstream call). max_response_body_bytes bounds the upstream response body the proxy will forward (over -> 502 with a proxy_error JSON). Defaults: 1 MiB request / 8 MiB response – enough for typical JSON-API gateway use, small enough that a malicious / malfunctioning peer can’t easily OOM the worker. Override in initialize() (or expose a block-DSL setter) for larger / smaller caps per deployment. Set either to 0 to disable that cap (not recommended).
    # File 'lib/tep/proxy.rb', line 140
    def max_response_body_bytes
      @max_response_body_bytes
    end
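As a minimal sketch of the cap rule described above (a cap of 0 disables the check; otherwise a body longer than the cap is rejected), the predicate below mirrors the comparison used on the buffered path. The helper name over_cap? is hypothetical and is not part of Tep.

```ruby
# Hypothetical helper mirroring the proxy's cap comparison.
# cap_bytes == 0 means "cap disabled", so nothing is ever over it.
def over_cap?(body, cap_bytes)
  cap_bytes > 0 && body.length > cap_bytes
end

ONE_MIB = 1 * 1024 * 1024  # default request cap from the constructor
```

A body of exactly ONE_MIB characters passes; one more pushes it over, which on the request side corresponds to the 413 path and on the response side to the 502 path.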
#timeout ⇒ Object
Returns the value of attribute timeout.
    # File 'lib/tep/proxy.rb', line 129
    def timeout
      @timeout
    end
#upstream ⇒ Object
Returns the value of attribute upstream.
    # File 'lib/tep/proxy.rb', line 129
    def upstream
      @upstream
    end
Class Method Details
.hop_by_hop?(name) ⇒ Boolean
RFC 7230 §6.1 hop-by-hop headers: meaningful only for a single transport-level connection, never forwarded by a proxy. Lowercased compare since both inbound and upstream header names are downcased by tep's parsers.
    # File 'lib/tep/proxy.rb', line 633
    def self.hop_by_hop?(name)
      lc = name.downcase
      lc == "connection" ||
        lc == "keep-alive" ||
        lc == "transfer-encoding" ||
        lc == "upgrade" ||
        lc == "proxy-authorization" ||
        lc == "proxy-authenticate" ||
        lc == "te" ||
        lc == "trailer"
    end
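To illustrate how this predicate is typically applied, here is a stand-alone sketch of filtering a header hash before forwarding. The constant HOP_BY_HOP and the helper forwardable_headers are hypothetical names; the extra host / content-length exclusions follow what handle does on the request side.

```ruby
# Same eight names the predicate above checks, as a lookup list.
HOP_BY_HOP = %w[
  connection keep-alive transfer-encoding upgrade
  proxy-authorization proxy-authenticate te trailer
].freeze

# Hypothetical helper: drop hop-by-hop headers plus host and
# content-length, which the forwarding side regenerates itself.
def forwardable_headers(headers)
  headers.reject do |name, _value|
    lc = name.downcase
    HOP_BY_HOP.include?(lc) || lc == "host" || lc == "content-length"
  end
end
```

Given Host, Connection, TE and Accept headers, only Accept would survive the filter.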
.read_upstream_head(fd) ⇒ Object
Read an upstream response head (status line + headers up to the blank line) cooperatively. Returns a Tep::Proxy::UpstreamHead carrying the parsed status, the per-name header bag, the chunked / SSE flags, the body bytes already read past the head (leftover – handed to the streamer so no bytes are lost), and an ok flag (false on timeout / EOF before the head completed).
    # File 'lib/tep/proxy.rb', line 601
    def self.read_upstream_head(fd)
      out = Tep::Proxy::UpstreamHead.new
      buf = ""
      while true
        ready = Tep::Scheduler.io_wait(fd, Tep::Scheduler::READ, 60)
        if ready == 0
          return out # timeout -- ok stays false
        end
        chunk = Sock.sphttp_recv_some(fd, 4096)
        if chunk.length == 0
          return out # EOF before head completed
        end
        buf = buf + chunk
        eoh = Tep.str_find(buf, "\r\n\r\n", 0)
        if eoh >= 0
          header_blob = buf[0, eoh]
          out.leftover = buf[eoh + 4, buf.length - eoh - 4]
          out.fill_from(header_blob)
          out.ok = true
          return out
        end
        if buf.length > 65535
          return out # head too large -- bail
        end
      end
      out
    end
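The head / leftover split at the first blank line can be sketched in isolation. The helper split_head is a hypothetical name, and it uses plain String#index in place of Tep.str_find; it only shows the slicing arithmetic, not the cooperative recv loop.

```ruby
# Hypothetical helper: split a receive buffer at the end-of-headers
# marker. Returns [header_blob, leftover], or nil when the blank
# line has not arrived yet.
def split_head(buf)
  eoh = buf.index("\r\n\r\n")
  return nil if eoh.nil?
  header_blob = buf[0, eoh]
  # Skip the 4-byte marker; whatever follows is body already read.
  leftover = buf[eoh + 4, buf.length - eoh - 4]
  [header_blob, leftover]
end
```

Body bytes that arrived in the same recv as the head end up in the second element, which is why the streamer is handed leftover rather than starting from an empty buffer.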
Instance Method Details
#after_forward(req, ures, res) ⇒ Object
Runs after the upstream responds, before `res` is written to the client. `ures` is the Tep::Http::Response from upstream (status 0 + empty body on connect failure; an empty Response when a before_forward short-circuited). `res` is mutable and already carries the upstream status / headers / body. Use this to transform the final response or emit logs/metrics. Runs on the short-circuit path too, so audit logging sees rejected requests. Default: no-op.
    # File 'lib/tep/proxy.rb', line 231
    def after_forward(req, ures, res)
      0
    end
#before_forward(req, res, ureq) ⇒ Object
Runs after the request body is fully received, before forwarding. `ureq` is a mutable Tep::Proxy::UpstreamRequest (verb / path / headers / body) pre-filled from the inbound request with hop-by-hop headers stripped. Mutate it to tweak what the upstream sees. Return `true` to short-circuit – the upstream call is skipped and `res` (which you set) is sent to the client. Return `false` to forward. Default: forward.
    # File 'lib/tep/proxy.rb', line 219
    def before_forward(req, res, ureq)
      false
    end
#dispatch_one(out, stats, relay_buf) ⇒ Object
Count one unit + dispatch it to on_stream_chunk via implicit self (polymorphic – reaches subclass overrides). `relay_buf` is named distinctly from `chunk` / `frame`: spinel unifies param types by name file-wide, and both of those names carry foreign types (poly hook param / WS int-array) that would mis-type this String. See [[spinel-widening-dispatch]].
    # File 'lib/tep/proxy.rb', line 588
    def dispatch_one(out, stats, relay_buf)
      stats.byte_count = stats.byte_count + relay_buf.length
      stats.chunk_count = stats.chunk_count + 1
      on_stream_chunk(Tep::Proxy::StreamChunk.new(relay_buf), out, stats)
      0
    end
#drain_events(out, stats, body_buf) ⇒ Object
Split body_buf into complete "\n\n"-terminated SSE event records and dispatch each (the record includes the trailing blank line, per the doc's "data: …\n\n" contract). Returns the unconsumed tail.
    # File 'lib/tep/proxy.rb', line 569
    def drain_events(out, stats, body_buf)
      while true
        sep = Tep.str_find(body_buf, "\n\n", 0)
        if sep < 0
          return body_buf
        end
        relay_buf = body_buf[0, sep + 2]
        body_buf = body_buf[sep + 2, body_buf.length - sep - 2]
        dispatch_one(out, stats, relay_buf)
      end
      body_buf
    end
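The record-splitting step can be tried outside the proxy with a small sketch. The helper split_sse_events is a hypothetical name; instead of dispatching each record it collects them, and it returns the unconsumed tail alongside.

```ruby
# Hypothetical helper: peel complete "\n\n"-terminated records off
# the front of the buffer. Returns [records, unconsumed_tail].
def split_sse_events(body_buf)
  events = []
  while (sep = body_buf.index("\n\n"))
    # Each record keeps its trailing blank line (sep + 2 bytes).
    events << body_buf[0, sep + 2]
    body_buf = body_buf[sep + 2, body_buf.length - sep - 2]
  end
  [events, body_buf]
end
```

A buffer holding two complete events and a partial third yields two records and leaves the partial bytes as the tail, to be completed by the next recv.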
#handle(req, res) ⇒ Object
---- Tep::Handler interface ----
    # File 'lib/tep/proxy.rb', line 292
    def handle(req, res)
      # Request-body cap (chunk 6.6). Reject oversize bodies BEFORE
      # any upstream call. 413 Payload Too Large with an OpenAI-shape
      # error JSON for symmetry with the other handler error paths.
      # max_request_body_bytes == 0 disables the cap.
      if @max_request_body_bytes > 0 && req.raw_body.length > @max_request_body_bytes
        res.set_status(413)
        res.headers["Content-Type"] = "application/json"
        err_body = "{\"error\":{" +
          Tep::Json.encode_pair_str("message", "request body exceeds proxy cap of " + @max_request_body_bytes.to_s + " bytes") + "," +
          Tep::Json.encode_pair_str("type", "payload_too_large") + "}}"
        res.set_body(err_body)
        return err_body
      end

      ureq = Tep::Proxy::UpstreamRequest.new
      ureq.verb = req.verb
      ureq.path = rewrite_path(req.raw_path)
      ureq.body = req.raw_body
      # Copy inbound headers minus: hop-by-hop (RFC 7230), `host`
      # (Tep::Http derives Host from the upstream URL -- forwarding
      # the client's Host would emit a duplicate, which nginx-class
      # upstreams 400), and `content-length` (Tep::Http computes its
      # own from the body, same duplicate risk).
      req.req_headers.each do |k, v|
        lc = k.downcase
        if !Tep::Proxy.hop_by_hop?(k) && lc != "host" && lc != "content-length"
          ureq.headers[k] = v
        end
      end

      short = before_forward(req, res, ureq)
      if short
        # Short-circuited: no upstream call. after_forward still
        # runs (audit), with an empty upstream Response.
        after_forward(req, Tep::Http::Response.new, res)
        return res.body
      end

      # Streaming branch (chunk 6.2). When the handler opts the
      # request into streaming, forward over a held-open connection
      # and pump the upstream body through on_stream_chunk to the
      # client, firing on_stream_end once at the end. Requires the
      # scheduled server (cooperative io_wait), same constraint as
      # WebSocket. after_forward is NOT run for streamed responses
      # (it's the non-streaming analog; on_stream_end is its
      # streaming counterpart).
      if stream_request?(req)
        return start_streaming_forward(req, res, ureq)
      end

      url = pick_upstream(req) + ureq.path
      policy = retry_policy(req)
      attempt = 0
      ures = Tep::Http::Response.new
      while attempt < policy.max_attempts
        ures = Tep::Http.send_req(ureq.verb, url, ureq.body, ureq.headers)
        # Success or non-retriable failure -- done.
        if !policy.retriable?(ures.status)
          break
        end
        attempt += 1
        # Sleep before the NEXT attempt, only if there is one. Backoff
        # is integer milliseconds via the nanosleep-backed C helper;
        # default 0 (no delay) keeps tests fast.
        if attempt < policy.max_attempts
          backoff = policy.backoff_for(attempt - 1)
          if backoff > 0
            Sock.sphttp_sleep_ms(backoff)
          end
        end
      end
      # Expose retry count to observability filters via req.ivars.
      req.ivars["proxy_retry_count"] = attempt.to_s

      # Response-body cap (chunk 6.6). If the upstream returned more
      # bytes than the proxy will forward, fail with 502 + a
      # proxy_error JSON. The body has already been buffered by
      # Tep::Http (no streaming on the buffered path), so this is a
      # post-hoc reject -- worst case the worker briefly holds the
      # large body then drops it. A future streaming-aware cap can
      # bail mid-recv.
      if @max_response_body_bytes > 0 && ures.body.length > @max_response_body_bytes
        res.set_status(502)
        res.headers["Content-Type"] = "application/json"
        err_body = "{\"error\":{" +
          Tep::Json.encode_pair_str("message", "upstream response body exceeds proxy cap of " + @max_response_body_bytes.to_s + " bytes") + "," +
          Tep::Json.encode_pair_str("type", "upstream_body_too_large") + "}}"
        res.set_body(err_body)
        return err_body
      end

      if ures.status > 0
        res.set_status(ures.status)
      else
        # Connect / send failure, or non-http upstream scheme.
        res.set_status(502)
      end
      # Copy upstream response headers, minus hop-by-hop AND
      # content-length: the tep server writer computes its own
      # Content-Length from res.body, so a copied one would
      # duplicate the header.
      ures.headers.each do |k, v|
        if !Tep::Proxy.hop_by_hop?(k) && k.downcase != "content-length"
          res.headers[k] = v
        end
      end
      # Force the body assignment through a Response method (self is
      # unambiguously Response there) -- a direct `res.body =` from
      # this poly-dispatched handle() mis-codegens under spinel.
      res.set_body(ures.body)
      after_forward(req, ures, res)
      res.body
    end
#on_stream_chunk(chunk, out, stats) ⇒ Object
Per-chunk streaming hook (chunk 6.2). Called once per upstream body chunk – one dechunked HTTP chunk for a chunked upstream, or one complete SSE event record ("…\n\n", including the trailing blank line) for a text/event-stream upstream. `out` is the Tep::Stream writer to the client; `stats` is a Tep::Proxy::StreamStats carried across the whole stream (the framework maintains stats.byte_count / stats.chunk_count; accumulate your own counters in stats.meta_bag). Default: pass the chunk through unchanged. Drop it by not calling out.write; transform by writing modified bytes; fan out by writing more than once.
`chunk` is a Tep::Proxy::StreamChunk, NOT a bare String: read the bytes via `chunk.chunk_text`. The wrapper exists because spinel boxes a primitive String arg to poly when it flows through the poly-receiver dispatch into this overridable hook – a bare String param would arrive poly and block String methods (chunk.include? etc.). An object param survives the dispatch as a typed pointer (same reason Tep::WebSocket passes `evt` with an evt.data accessor). See [[spinel-widening-dispatch]].
    # File 'lib/tep/proxy.rb', line 275
    def on_stream_chunk(chunk, out, stats)
      out.write(chunk.chunk_text)
      0
    end
#on_stream_end(req, out, stats) ⇒ Object
End-of-stream finalizer (chunk 6.2, #81). Fires exactly once after the last chunk has been emitted and the upstream closed (cleanly or via error – stats.errored distinguishes). `out` is still writable, so a finalizer can emit one last frame (e.g. a closing SSE event). `stats` is the same object on_stream_chunk accumulated into. Default: no-op.
    # File 'lib/tep/proxy.rb', line 286
    def on_stream_end(req, out, stats)
      0
    end
#pick_upstream(req) ⇒ Object
Per-request upstream selection (chunk 6.4). Return the URL of the upstream this request should be forwarded to. Default returns @upstream (the constructor’s single-upstream value), preserving back-compat. Override to route by path / header / tenant / capability:
    class ApiGateway < Tep::Proxy
      def pick_upstream(req)
        if req.path.start_with?("/api/v1/")
          "http://upstream-v1.local:8080"
        else
          "http://upstream-v2.local:8080"
        end
      end
    end
Also available as a block-DSL hook (lowered by bin/tep):
    gw = Tep::Proxy.new("http://default.local:8080")
    gw.pick_upstream do |req|
      ...
    end
The returned URL is prefix-joined with the rewrite_path output, so it should NOT include the request path (just scheme://host:port + optional fixed prefix).
    # File 'lib/tep/proxy.rb', line 201
    def pick_upstream(req)
      @upstream
    end
#retry_policy(req) ⇒ Object
Per-request retry policy (chunk 6.5). Return a Tep::Proxy::RetryPolicy whose max_attempts > 1 to retry the buffered forward on transient upstream failure. Default: 1 attempt (no retry). Override to enable retries; gate on the request shape so non-idempotent POSTs can skip retries while GETs use them:
    class ApiGateway < Tep::Proxy
      def retry_policy(req)
        p = Tep::Proxy::RetryPolicy.new
        p.max_attempts = 3
        p.base_backoff_ms = 100 # exponential: 100ms, 200ms, 400ms
        p
      end
    end
Also available as a block-DSL hook (lowered by bin/tep). Streaming requests don’t retry (the stream may have already written bytes to the client when failure occurs); only the buffered path consults the policy.
    # File 'lib/tep/proxy.rb', line 171
    def retry_policy(req)
      Tep::Proxy::RetryPolicy.new
    end
#rewrite_path(path) ⇒ Object
Map the inbound request’s path+query to the upstream path+query. Default: forward verbatim. Override to strip a mount prefix, pin a fixed upstream path, etc.
    # File 'lib/tep/proxy.rb', line 208
    def rewrite_path(path)
      path
    end
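One of the overrides mentioned above, stripping a mount prefix, might look like the following stand-alone sketch. The helper strip_mount and the "/gw" mount are hypothetical; an override of rewrite_path would call something like it with the proxy's own mount point.

```ruby
# Hypothetical helper: remove a mount prefix from path+query, but
# only on a path-segment boundary so "/gwx/y" is left alone.
def strip_mount(path, mount)
  return "/" if path == mount
  if path.start_with?(mount + "/")
    return path[mount.length, path.length - mount.length]
  end
  if path.start_with?(mount + "?")
    # Mount hit with a query only: keep the query on the root path.
    return "/" + path[mount.length, path.length - mount.length]
  end
  path
end
```

With a "/gw" mount, "/gw/v1/chat?x=1" becomes "/v1/chat?x=1", which is then prefix-joined with the URL returned by pick_upstream.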
#run_stream(out, fd, leftover, is_chunked, is_sse, req) ⇒ Object
The streaming pump, called from ProxyStreamer#pump as @proxy.run_stream(…). It lives here, on Tep::Proxy, rather than on the streamer so that on_stream_chunk / on_stream_end below are invoked as plain (implicit-self) calls. spinel resolves an implicit-self call inside a base-class method polymorphically – it includes every subclass arm – so a subclass's overrides are reached. A call to the hooks through the streamer's slot would not reach them. This is the same reason rewrite_path / stream_request? (implicit-self calls from handle) dispatch to overrides while a slot call would not.
Recv-loops the held-open upstream fd: dechunks (chunked upstream), splits SSE event records (text/event-stream), and dispatches each unit through dispatch_one. Fires on_stream_end once at EOF / timeout. Cooperative – parks on io_wait between recvs, so requires Tep::Server::Scheduled.
    # File 'lib/tep/proxy.rb', line 514
    def run_stream(out, fd, leftover, is_chunked, is_sse, req)
      stats = Tep::Proxy::StreamStats.new
      buf = leftover    # raw (possibly chunked) bytes
      body_buf = ""     # dechunked bytes awaiting SSE split
      done = false
      while !done
        if is_chunked
          consumed = Tep::Llm.dechunk_consume(buf)
          buf = Tep::Llm.dechunk_leftover(buf)
          if consumed.length > 0
            body_buf = body_buf + consumed
          end
        else
          body_buf = body_buf + buf
          buf = ""
        end
        if is_sse
          body_buf = drain_events(out, stats, body_buf)
        else
          if body_buf.length > 0
            dispatch_one(out, stats, body_buf)
            body_buf = ""
          end
        end
        ready = Tep::Scheduler.io_wait(fd, Tep::Scheduler::READ, 60)
        if ready == 0
          stats.errored = true
          done = true
        else
          more = Sock.sphttp_recv_some(fd, 4096)
          if more.length == 0
            done = true # clean EOF
          else
            buf = buf + more
          end
        end
      end
      # Flush a trailing partial SSE event (some upstreams omit the
      # final blank line before closing).
      if is_sse && body_buf.length > 0
        drain_events(out, stats, body_buf + "\n\n")
      end
      Sock.sphttp_close(fd)
      on_stream_end(req, out, stats)
      0
    end
#start_streaming_forward(req, res, ureq) ⇒ Object
Streaming forward (chunk 6.2). Connects to the upstream, writes the request, reads just the response head, then hands the still-open fd to a ProxyStreamer via res.start_stream – the server later drives streamer.pump, which recv-loops the upstream body and dispatches it through on_stream_chunk / on_stream_end.
Returns “” (the streamed body goes out via the streamer, not the buffered res.body). On connect/scheme/head-read failure, sets a 502 and returns “” without starting a stream.
    # File 'lib/tep/proxy.rb', line 425
    def start_streaming_forward(req, res, ureq)
      url = pick_upstream(req) + ureq.path
      parts = Tep::Url.split_url(url)
      if parts["scheme"] != "http"
        res.set_status(502)
        return ""
      end
      host = parts["host"]
      port = parts["port"].to_i
      path = parts["path"]
      if parts["query"].length > 0
        path = path + "?" + parts["query"]
      end

      fd = Sock.sphttp_connect(host, port)
      if fd < 0
        res.set_status(502)
        return ""
      end
      Sock.sphttp_set_nonblock(fd)

      head = ureq.verb + " " + path + " HTTP/1.1\r\n" +
        "Host: " + host + "\r\n" +
        "Connection: close\r\n"
      ureq.headers.each do |k, v|
        head = head + k + ": " + v + "\r\n"
      end
      if ureq.body.length > 0
        head = head + "Content-Length: " + ureq.body.length.to_s + "\r\n"
      end
      head = head + "\r\n"
      if Sock.sphttp_write_str(fd, head) < 0
        Sock.sphttp_close(fd)
        res.set_status(502)
        return ""
      end
      if ureq.body.length > 0
        if Sock.sphttp_write_str(fd, ureq.body) < 0
          Sock.sphttp_close(fd)
          res.set_status(502)
          return ""
        end
      end

      uh = Tep::Proxy.read_upstream_head(fd)
      if !uh.ok
        Sock.sphttp_close(fd)
        res.set_status(502)
        return ""
      end

      res.set_status(uh.status)
      # Copy upstream headers minus hop-by-hop, content-length (the
      # client side is chunked -- no fixed length), and transfer-
      # encoding (the server writer re-applies chunked itself).
      uh.headers.each do |k, v|
        lc = k.downcase
        if !Tep::Proxy.hop_by_hop?(k) && lc != "content-length"
          res.headers[k] = v
        end
      end

      streamer = Tep::Proxy::ProxyStreamer.new
      streamer.proxy = self
      streamer.fd = fd
      streamer.leftover = uh.leftover
      streamer.is_chunked = uh.is_chunked
      streamer.is_sse = uh.is_sse
      streamer.req = req
      res.start_stream(streamer)
      ""
    end
#stream_request?(req) ⇒ Boolean
Streaming opt-in predicate. Return true to forward this request over a held-open connection and pump the upstream response through on_stream_chunk / on_stream_end (chunk 6.2) instead of the buffered before/after path. Default: false (buffered).
tep uses a request-side opt-in rather than sniffing the upstream response Content-Type because (a) it keeps the non-streaming path on the unchanged buffered Tep::Http.send_req (no manual-connect tax on the common case), and (b) it matches how streaming APIs actually signal intent – an OpenAI client sets `"stream": true` in the request body, so the proxy knows before it connects. An LLM gateway typically overrides this as:
    def stream_request?(req)
      Tep::Json.get_bool(req.raw_body, "stream")
    end
    # File 'lib/tep/proxy.rb', line 251
    def stream_request?(req)
      false
    end