Class: Tep::Proxy

Inherits:
Handler
Defined in:
lib/tep/proxy.rb

Overview

Tep::Proxy::RetryPolicy: retry behaviour for the buffered forward path (chunk 6.5). Tep::Proxy#retry_policy(req) returns a fresh instance per request, so the policy can be derived from the request (e.g. idempotent verbs get more attempts, POSTs get a single attempt).

Backoff is expressed in integer milliseconds and slept via Sock.sphttp_sleep_ms (a nanosleep-backed C helper). Sub-second pacing is the right default for HTTP retries: whole-second backoffs waste throughput on transient blips that resolve quickly. There are two setters for the base backoff:

* base_backoff_ms = 100              # int, direct ms.
* base_backoff_secs = 0.1            # Float, converted to ms.

Use whichever reads better at the call site; both store the same integer-millisecond value that backoff_for reads. If both are set, the last write wins (whichever setter was called second). The default base backoff is 0 ms, so retries are not delayed.

Default shape: max_attempts=1 (no retry, back-compat).
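A minimal sketch of the policy object described above, written in plain Ruby so it runs outside tep. SketchRetryPolicy is a hypothetical stand-in: the doubling in backoff_for (suggested by the "exponential" comment in the retry_policy example) and the retriable? rule (status 0 or any 5xx) are assumptions, not tep's actual RetryPolicy.

```ruby
# Hypothetical stand-in for Tep::Proxy::RetryPolicy. The doubling in
# backoff_for and the retriable? rule are assumptions for illustration.
class SketchRetryPolicy
  attr_accessor :max_attempts, :base_backoff_ms

  def initialize
    @max_attempts    = 1   # documented default: no retry
    @base_backoff_ms = 0   # no delay between attempts
  end

  # Float seconds -> integer milliseconds. Shares storage with
  # base_backoff_ms, so whichever setter runs last wins.
  def base_backoff_secs=(secs)
    @base_backoff_ms = (secs * 1000).round
  end

  # Delay in ms before retry number n (0-based): base, 2*base, 4*base, ...
  def backoff_for(n)
    @base_backoff_ms * (2**n)
  end

  # Assumed rule: connect failure (status 0) or any 5xx is retriable.
  def retriable?(status)
    status == 0 || (status >= 500 && status <= 599)
  end
end

policy = SketchRetryPolicy.new
policy.base_backoff_secs = 0.1   # stored as 100 ms
policy.max_attempts = 3
```

Storing a single integer field behind both setters is what makes "last write wins" fall out naturally.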

Defined Under Namespace

Classes: ProxyStreamer, RetryPolicy, StreamChunk, StreamStats, UpstreamHead, UpstreamRequest

Methods inherited from Handler

#is_regex?, #re_capture, #re_match?

Constructor Details

#initialize(upstream) ⇒ Proxy

Returns a new instance of Proxy.

# File 'lib/tep/proxy.rb', line 142

def initialize(upstream)
  @upstream = upstream
  @timeout  = 30
  @max_request_body_bytes  = 1 * 1024 * 1024
  @max_response_body_bytes = 8 * 1024 * 1024
end

Instance Attribute Details

#max_request_body_bytes ⇒ Object

Body size caps (chunk 6.6). max_request_body_bytes bounds the inbound body the proxy will accept: a larger body is answered with 413 Payload Too Large (error type payload_too_large) before any upstream call. max_response_body_bytes bounds the upstream response body the proxy will forward: a larger body is answered with 502 (error type upstream_body_too_large). On the buffered path the response cap is checked after Tep::Http has read the whole body, so it limits what is forwarded rather than what the worker briefly holds. Defaults: 1 MiB request / 8 MiB response, enough for typical JSON-API gateway use and small enough that a malicious or malfunctioning peer cannot easily exhaust the worker's memory. Override in initialize() (or expose a block-DSL setter) for larger or smaller caps per deployment. Set either to 0 to disable that cap (not recommended).

# File 'lib/tep/proxy.rb', line 140

def max_request_body_bytes
  @max_request_body_bytes
end
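The cap rule and error body can be sketched in plain Ruby as below. over_cap? and cap_error are hypothetical helpers, not tep API; the sketch uses the standard json library in place of Tep::Json and bytesize because the caps are byte counts.

```ruby
require "json"

# Hypothetical helper mirroring the cap rule in Tep::Proxy#handle:
# a cap of 0 disables the check; otherwise a body strictly longer
# than the cap is rejected.
def over_cap?(body, cap)
  cap > 0 && body.bytesize > cap
end

# Builds the {"error":{"message":...,"type":...}} reply body used on
# the 413 (request cap) and 502 (response cap) paths.
def cap_error(message, type)
  JSON.generate({ "error" => { "message" => message, "type" => type } })
end

request_cap = 1 * 1024 * 1024   # the documented 1 MiB default
body = "x" * (request_cap + 1)
status = 200
reply  = nil
if over_cap?(body, request_cap)
  status = 413
  reply  = cap_error("request body exceeds proxy cap of #{request_cap} bytes",
                     "payload_too_large")
end
```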

#max_response_body_bytes ⇒ Object

Body size caps (chunk 6.6): see #max_request_body_bytes, whose description covers both caps.

# File 'lib/tep/proxy.rb', line 140

def max_response_body_bytes
  @max_response_body_bytes
end

#timeout ⇒ Object

Returns the value of attribute timeout (30 by default, set in #initialize).

# File 'lib/tep/proxy.rb', line 129

def timeout
  @timeout
end

#upstream ⇒ Object

Returns the value of attribute upstream: the URL passed to the constructor, which the default #pick_upstream returns for every request.

# File 'lib/tep/proxy.rb', line 129

def upstream
  @upstream
end

Class Method Details

.hop_by_hop?(name) ⇒ Boolean

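RFC 7230 §6.1 hop-by-hop headers: meaningful only for a single transport-level connection, never forwarded by a proxy. Names are compared lower-cased (tep's parsers downcase both inbound and upstream header names). The list is fixed: headers nominated in a Connection header's value, which RFC 7230 §6.1 also requires a proxy to remove, are not covered.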

Returns:

  • (Boolean)

# File 'lib/tep/proxy.rb', line 633

def self.hop_by_hop?(name)
  lc = name.downcase
  lc == "connection" ||
    lc == "keep-alive" ||
    lc == "transfer-encoding" ||
    lc == "upgrade" ||
    lc == "proxy-authorization" ||
    lc == "proxy-authenticate" ||
    lc == "te" ||
    lc == "trailer"
end
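RFC 7230 §6.1 also asks a proxy to drop any header named in the Connection header's value. A minimal sketch of that extra step, assuming a plain Hash of headers; forwardable_headers is a hypothetical helper, not part of tep.

```ruby
# Fixed hop-by-hop list, matching Tep::Proxy.hop_by_hop?.
HOP_BY_HOP = %w[connection keep-alive transfer-encoding upgrade
                proxy-authorization proxy-authenticate te trailer].freeze

# Hypothetical helper: returns a copy of `headers` without the fixed
# hop-by-hop names AND without any header nominated in the
# Connection header's comma-separated value.
def forwardable_headers(headers)
  nominated = []
  headers.each do |k, v|
    next unless k.downcase == "connection"
    v.split(",").each { |opt| nominated << opt.strip.downcase }
  end
  headers.reject do |k, _v|
    lc = k.downcase
    HOP_BY_HOP.include?(lc) || nominated.include?(lc)
  end
end
```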

.read_upstream_head(fd) ⇒ Object

Read an upstream response head (status line plus headers up to the blank line) cooperatively. Returns a Tep::Proxy::UpstreamHead carrying the parsed status, the per-name header bag, the chunked / SSE flags, the body bytes already read past the head (leftover, handed to the streamer so no bytes are lost), and an ok flag. ok stays false on a read-wait timeout, on EOF before the head completes, or when more than 65,535 bytes arrive without the terminating blank line.

# File 'lib/tep/proxy.rb', line 601

def self.read_upstream_head(fd)
  out = Tep::Proxy::UpstreamHead.new
  buf = ""
  while true
    ready = Tep::Scheduler.io_wait(fd, Tep::Scheduler::READ, 60)
    if ready == 0
      return out          # timeout -- ok stays false
    end
    chunk = Sock.sphttp_recv_some(fd, 4096)
    if chunk.length == 0
      return out          # EOF before head completed
    end
    buf = buf + chunk
    eoh = Tep.str_find(buf, "\r\n\r\n", 0)
    if eoh >= 0
      header_blob = buf[0, eoh]
      out.leftover = buf[eoh + 4, buf.length - eoh - 4]
      out.fill_from(header_blob)
      out.ok = true
      return out
    end
    if buf.length > 65535
      return out          # head too large -- bail
    end
  end
  out
end
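The head/leftover split can be illustrated as a pure function over a byte buffer. split_head is hypothetical; in particular, how tep derives the chunked and SSE flags is an assumption here (Transfer-Encoding containing "chunked", Content-Type starting with "text/event-stream").

```ruby
# Hypothetical pure-Ruby counterpart of the split performed by
# Tep::Proxy.read_upstream_head. Returns nil until the blank line ending
# the head has arrived (or if the head is empty).
def split_head(buf)
  eoh = buf.index("\r\n\r\n")
  return nil if eoh.nil?
  lines = buf[0, eoh].split("\r\n")
  return nil if lines.empty?
  status  = lines.first.split(" ")[1].to_i   # "HTTP/1.1 200 OK" -> 200
  headers = {}
  lines.drop(1).each do |line|
    name, value = line.split(":", 2)
    headers[name.strip.downcase] = value.to_s.strip   # downcased names
  end
  {
    status:   status,
    headers:  headers,
    chunked:  headers["transfer-encoding"].to_s.downcase.include?("chunked"),
    sse:      headers["content-type"].to_s.downcase.start_with?("text/event-stream"),
    leftover: buf[(eoh + 4)..-1]   # body bytes that arrived with the head
  }
end
```

Keeping the leftover is essential: the first body bytes often arrive in the same recv as the head.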

Instance Method Details

#after_forward(req, ures, res) ⇒ Object

Runs after the upstream responds, before `res` is written to the client. `ures` is the Tep::Http::Response from upstream (status 0 and an empty body on connect failure; an empty Response when before_forward short-circuited). `res` is mutable and already carries the upstream status, headers and body. Use this hook to transform the final response or to emit logs and metrics. It also runs on the short-circuit path, so audit logging sees rejected requests. It does not run when either body cap rejects the exchange (413 / 502), nor for streamed responses (on_stream_end is the streaming counterpart). Default: no-op.

# File 'lib/tep/proxy.rb', line 231

def after_forward(req, ures, res)
  0
end

#before_forward(req, res, ureq) ⇒ Object

Runs after the request body is fully received, before forwarding, on both the buffered and the streaming path. `ureq` is a mutable Tep::Proxy::UpstreamRequest (verb / path / headers / body) pre-filled from the inbound request, with hop-by-hop, Host and Content-Length headers stripped and the path already passed through rewrite_path. Mutate it to change what the upstream sees. Return `true` to short-circuit: the upstream call is skipped and `res` (which you set) is sent to the client. Return `false` to forward. Default: forward.

# File 'lib/tep/proxy.rb', line 219

def before_forward(req, res, ureq)
  false
end
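The before/after hook contract can be modelled with plain Ruby stand-ins. MiniProxy, KeyGate, Req and Reply are all hypothetical; the real hooks also receive `ureq` / `ures`, which this sketch omits to stay self-contained.

```ruby
# Hypothetical stand-ins for the inbound request and the mutable response.
Req   = Struct.new(:headers)
Reply = Struct.new(:status, :body)

# Minimal model of the hook order on the buffered path of
# Tep::Proxy#handle: before_forward may short-circuit the upstream
# call, and after_forward runs either way.
class MiniProxy
  attr_reader :upstream_calls

  def initialize
    @upstream_calls = 0
  end

  def before_forward(req, res)
    false          # default: forward
  end

  def after_forward(req, res)
    0              # default: no-op
  end

  def handle(req)
    res = Reply.new(0, "")
    short = before_forward(req, res)
    unless short
      @upstream_calls += 1           # stand-in for the upstream request
      res.status = 200
      res.body   = "upstream body"
    end
    after_forward(req, res)          # audit hook sees both outcomes
    res
  end
end

# Example subclass: reject requests without credentials before any
# upstream traffic, and record every final status for auditing.
class KeyGate < MiniProxy
  def audit
    @audit ||= []
  end

  def before_forward(req, res)
    return false if req.headers.key?("authorization")   # names are lower-case
    res.status = 401
    res.body   = "{\"error\":\"missing credentials\"}"
    true
  end

  def after_forward(req, res)
    audit << res.status
    0
  end
end
```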

#dispatch_one(out, stats, relay_buf) ⇒ Object

Count one unit and dispatch it to on_stream_chunk via implicit self (polymorphic, so it reaches subclass overrides). `relay_buf` is named distinctly from `chunk` / `frame`: spinel unifies parameter types by name file-wide, and both of those names carry foreign types (poly hook param / WS int-array) that would mis-type this String. See [[spinel-widening-dispatch]].

# File 'lib/tep/proxy.rb', line 588

def dispatch_one(out, stats, relay_buf)
  stats.byte_count  = stats.byte_count + relay_buf.length
  stats.chunk_count = stats.chunk_count + 1
  on_stream_chunk(Tep::Proxy::StreamChunk.new(relay_buf), out, stats)
  0
end

#drain_events(out, stats, body_buf) ⇒ Object

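Split body_buf into complete "\n\n"-terminated SSE event records and dispatch each one (the record includes the trailing blank line, per the documented "data: …\n\n" contract). Returns the unconsumed tail. Only LF LF separators are recognised, so events framed with CRLF line endings are not split.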

# File 'lib/tep/proxy.rb', line 569

def drain_events(out, stats, body_buf)
  while true
    sep = Tep.str_find(body_buf, "\n\n", 0)
    if sep < 0
      return body_buf
    end
    relay_buf = body_buf[0, sep + 2]
    body_buf  = body_buf[sep + 2, body_buf.length - sep - 2]
    dispatch_one(out, stats, relay_buf)
  end
  body_buf
end
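The same record-splitting loop, as a standalone function that returns the records instead of dispatching them. split_sse is a hypothetical name; the behaviour follows the drain_events listing, including the final flush trick used by run_stream (appending "\n\n" to a trailing partial event).

```ruby
# Hypothetical standalone version of the drain_events loop: split off
# every complete "\n\n"-terminated record (separator included) and
# return [records, unconsumed_tail].
def split_sse(body_buf)
  records = []
  while (sep = body_buf.index("\n\n"))
    records << body_buf[0, sep + 2]
    body_buf = body_buf[(sep + 2)..-1]
  end
  [records, body_buf]
end

recs, tail = split_sse("data: a\n\ndata: b\n\ndata: c")
# recs holds the two complete events; tail is the partial "data: c".
flushed, rest = split_sse(tail + "\n\n")   # end-of-stream flush
```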

#handle(req, res) ⇒ Object

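Tep::Handler interface. Enforces the request-body cap, builds the UpstreamRequest and runs before_forward; then either hands off to start_streaming_forward (when stream_request? is true) or performs the buffered forward with retries, the response-body cap check, header copying and after_forward. Returns the body set on `res` ("" for a streamed response).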

# File 'lib/tep/proxy.rb', line 292

def handle(req, res)
  # Request-body cap (chunk 6.6). Reject oversize bodies BEFORE
  # any upstream call. 413 Payload Too Large with an OpenAI-shape
  # error JSON for symmetry with the other handler error paths.
  # max_request_body_bytes == 0 disables the cap.
  if @max_request_body_bytes > 0 && req.raw_body.length > @max_request_body_bytes
    res.set_status(413)
    res.headers["Content-Type"] = "application/json"
    err_body = "{\"error\":{" +
      Tep::Json.encode_pair_str("message",
        "request body exceeds proxy cap of " +
        @max_request_body_bytes.to_s + " bytes") + "," +
      Tep::Json.encode_pair_str("type", "payload_too_large") +
    "}}"
    res.set_body(err_body)
    return err_body
  end

  ureq = Tep::Proxy::UpstreamRequest.new
  ureq.verb = req.verb
  ureq.path = rewrite_path(req.raw_path)
  ureq.body = req.raw_body
  # Copy inbound headers minus: hop-by-hop (RFC 7230), `host`
  # (Tep::Http derives Host from the upstream URL -- forwarding
  # the client's Host would emit a duplicate, which nginx-class
  # upstreams 400), and `content-length` (Tep::Http computes its
  # own from the body, same duplicate risk).
  req.req_headers.each do |k, v|
    lc = k.downcase
    if !Tep::Proxy.hop_by_hop?(k) && lc != "host" && lc != "content-length"
      ureq.headers[k] = v
    end
  end

  short = before_forward(req, res, ureq)
  if short
    # Short-circuited: no upstream call. after_forward still
    # runs (audit), with an empty upstream Response.
    after_forward(req, Tep::Http::Response.new, res)
    return res.body
  end

  # Streaming branch (chunk 6.2). When the handler opts the
  # request into streaming, forward over a held-open connection
  # and pump the upstream body through on_stream_chunk to the
  # client, firing on_stream_end once at the end. Requires the
  # scheduled server (cooperative io_wait), same constraint as
  # WebSocket. after_forward is NOT run for streamed responses
  # (it's the non-streaming analog; on_stream_end is its
  # streaming counterpart).
  if stream_request?(req)
    return start_streaming_forward(req, res, ureq)
  end

  url    = pick_upstream(req) + ureq.path
  policy = retry_policy(req)
  attempt = 0
  ures = Tep::Http::Response.new
  while attempt < policy.max_attempts
    ures = Tep::Http.send_req(ureq.verb, url, ureq.body, ureq.headers)
    # Success or non-retriable failure -- done.
    if !policy.retriable?(ures.status)
      break
    end
    attempt += 1
    # Sleep before the NEXT attempt, only if there is one. Backoff
    # is integer milliseconds via the nanosleep-backed C helper;
    # default 0 (no delay) keeps tests fast.
    if attempt < policy.max_attempts
      backoff = policy.backoff_for(attempt - 1)
      if backoff > 0
        Sock.sphttp_sleep_ms(backoff)
      end
    end
  end
  # Expose retry count to observability filters via req.ivars.
  req.ivars["proxy_retry_count"] = attempt.to_s

  # Response-body cap (chunk 6.6). If the upstream returned more
  # bytes than the proxy will forward, fail with 502 + a
  # proxy_error JSON. The body has already been buffered by
  # Tep::Http (no streaming on the buffered path), so this is a
  # post-hoc reject -- worst case the worker briefly holds the
  # large body then drops it. A future streaming-aware cap can
  # bail mid-recv.
  if @max_response_body_bytes > 0 && ures.body.length > @max_response_body_bytes
    res.set_status(502)
    res.headers["Content-Type"] = "application/json"
    err_body = "{\"error\":{" +
      Tep::Json.encode_pair_str("message",
        "upstream response body exceeds proxy cap of " +
        @max_response_body_bytes.to_s + " bytes") + "," +
      Tep::Json.encode_pair_str("type", "upstream_body_too_large") +
    "}}"
    res.set_body(err_body)
    return err_body
  end

  if ures.status > 0
    res.set_status(ures.status)
  else
    # Connect / send failure, or non-http upstream scheme.
    res.set_status(502)
  end

  # Copy upstream response headers, minus hop-by-hop AND
  # content-length: the tep server writer computes its own
  # Content-Length from res.body, so a copied one would
  # duplicate the header.
  ures.headers.each do |k, v|
    if !Tep::Proxy.hop_by_hop?(k) && k.downcase != "content-length"
      res.headers[k] = v
    end
  end

  # Force the body assignment through a Response method (self is
  # unambiguously Response there) -- a direct `res.body =` from
  # this poly-dispatched handle() mis-codegens under spinel.
  res.set_body(ures.body)

  after_forward(req, ures, res)
  res.body
end
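The buffered retry loop above can be modelled with injected callables so its timing is easy to check. forward_with_retries is hypothetical: `send_once` stands in for Tep::Http.send_req, `sleep_ms` for Sock.sphttp_sleep_ms, and doubling backoff plus "status 0 or 5xx is retriable" are assumptions. It shows that max_attempts = 3 yields at most two sleeps, and that the recorded count equals max_attempts when every attempt fails.

```ruby
# Hypothetical model of the buffered retry loop in #handle.
def forward_with_retries(max_attempts, base_ms, send_once, sleep_ms)
  attempt = 0
  status  = 0
  while attempt < max_attempts
    status = send_once.call
    break unless status == 0 || status >= 500   # success / non-retriable
    attempt += 1
    # Sleep only if another attempt follows (assumed doubling backoff).
    sleep_ms.call(base_ms * (2**(attempt - 1))) if attempt < max_attempts
  end
  [status, attempt]   # attempt plays the role of proxy_retry_count
end

# Usage: an upstream that fails every time.
slept = []
status, count = forward_with_retries(3, 100, -> { 503 }, ->(ms) { slept << ms })
# Three attempts, two sleeps (100 ms, then 200 ms), count == 3.
```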

#on_stream_chunk(chunk, out, stats) ⇒ Object

Per-chunk streaming hook (chunk 6.2). Called once per unit of upstream body: for a text/event-stream upstream, one complete SSE event record ("…\n\n", including the trailing blank line); otherwise the body bytes decoded or received in one pass of the pump (for a chunked upstream, the output of one dechunk step). `out` is the Tep::Stream writer to the client; `stats` is a Tep::Proxy::StreamStats carried across the whole stream (the framework maintains stats.byte_count / stats.chunk_count; accumulate your own counters in stats.meta_bag). Default: pass the chunk through unchanged. Drop it by not calling out.write; transform it by writing modified bytes; fan out by writing more than once.

`chunk` is a Tep::Proxy::StreamChunk, NOT a bare String: read the bytes via `chunk.chunk_text`. The wrapper exists because spinel boxes a primitive String argument to poly when it flows through the poly-receiver dispatch into this overridable hook; a bare String parameter would arrive as poly and block String methods (chunk.include? etc.). An object parameter survives the dispatch as a typed pointer (the same reason Tep::WebSocket passes `evt` with an evt.data accessor). See [[spinel-widening-dispatch]].

# File 'lib/tep/proxy.rb', line 275

def on_stream_chunk(chunk, out, stats)
  out.write(chunk.chunk_text)
  0
end
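A sketch of a filtering override, run against hypothetical stand-ins: StreamChunk mimics the chunk_text accessor, CollectingWriter replaces the Tep::Stream writer, and meta_bag is modelled as a Hash (its real type is not documented here). It drops SSE records made only of comment lines, which some upstreams send as keep-alive pings.

```ruby
# Hypothetical stand-ins for Tep::Proxy::StreamChunk and StreamStats.
StreamChunk = Struct.new(:chunk_text)
Stats       = Struct.new(:meta_bag)

# Collects writes in memory in place of the Tep::Stream client writer.
class CollectingWriter
  attr_reader :written

  def initialize
    @written = []
  end

  def write(bytes)
    @written << bytes
  end
end

# Example hook body: drop SSE records made only of comment lines
# (lines starting with ':'), count the drops in meta_bag, and pass
# every other record through unchanged.
def on_stream_chunk(chunk, out, stats)
  text  = chunk.chunk_text
  lines = text.split("\n").reject(&:empty?)
  if lines.all? { |l| l.start_with?(":") }
    stats.meta_bag["dropped"] = stats.meta_bag.fetch("dropped", 0) + 1
  else
    out.write(text)
  end
  0
end
```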

#on_stream_end(req, out, stats) ⇒ Object

End-of-stream finalizer (chunk 6.2, #81). Fires exactly once after the last chunk has been emitted and the upstream has closed, cleanly or via error (stats.errored distinguishes the two; run_stream sets it when the upstream read wait times out). `out` is still writable, so a finalizer can emit one last frame (e.g. a closing SSE event). `stats` is the same object on_stream_chunk accumulated into. Default: no-op.

# File 'lib/tep/proxy.rb', line 286

def on_stream_end(req, out, stats)
  0
end

#pick_upstream(req) ⇒ Object

Per-request upstream selection (chunk 6.4). Return the URL of the upstream this request should be forwarded to. Default returns @upstream (the constructor’s single-upstream value), preserving back-compat. Override to route by path / header / tenant / capability:

class ApiGateway < Tep::Proxy
  def pick_upstream(req)
    if req.path.start_with?("/api/v1/")
      "http://upstream-v1.local:8080"
    else
      "http://upstream-v2.local:8080"
    end
  end
end

Also available as a block-DSL hook (lowered by bin/tep):

gw = Tep::Proxy.new("http://default.local:8080")
gw.pick_upstream do |req|
  ...
end

The returned URL is prefix-joined with the rewrite_path output, so it should NOT include the request path (just scheme://host:port + optional fixed prefix).

# File 'lib/tep/proxy.rb', line 201

def pick_upstream(req)
  @upstream
end
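Routing by path prefix generalises to a small table. A minimal sketch with hypothetical upstream URLs: pick_upstream_for and upstream_url are illustrative names, and more specific prefixes are listed first because the first match wins. Since handle() appends the rewritten path to the returned URL, the base URLs carry no path and no trailing slash.

```ruby
# Hypothetical routing table: first matching prefix wins.
ROUTES = [
  ["/api/v1/", "http://upstream-v1.local:8080"],
  ["/api/",    "http://upstream-v2.local:8080"]
].freeze
DEFAULT_UPSTREAM = "http://default.local:8080"

def pick_upstream_for(path)
  ROUTES.each do |prefix, url|
    return url if path.start_with?(prefix)
  end
  DEFAULT_UPSTREAM
end

# Mirrors the prefix join pick_upstream(req) + path done by handle().
def upstream_url(path)
  pick_upstream_for(path) + path
end
```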

#retry_policy(req) ⇒ Object

Per-request retry policy (chunk 6.5). Return a Tep::Proxy::RetryPolicy whose max_attempts > 1 to retry the buffered forward on transient upstream failure. Default: 1 attempt (no retry). Override to enable retries; gate on the request shape so non-idempotent POSTs can skip retries while GETs use them:

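class ApiGateway < Tep::Proxy
  def retry_policy(req)
    policy = Tep::Proxy::RetryPolicy.new
    # POSTs keep the default single attempt; other verbs retry.
    if req.verb != "POST"
      policy.max_attempts    = 3
      policy.base_backoff_ms = 100   # exponential: 100ms, then 200ms between attempts
    end
    policy
  end
end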

Also available as a block-DSL hook (lowered by bin/tep). Streaming requests don’t retry (the stream may have already written bytes to the client when failure occurs); only the buffered path consults the policy.

# File 'lib/tep/proxy.rb', line 171

def retry_policy(req)
  Tep::Proxy::RetryPolicy.new
end

#rewrite_path(path) ⇒ Object

Map the inbound request’s path+query to the upstream path+query. Default: forward verbatim. Override to strip a mount prefix, pin a fixed upstream path, etc.

# File 'lib/tep/proxy.rb', line 208

def rewrite_path(path)
  path
end
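Stripping a mount prefix is the most common override. A minimal sketch, assuming a hypothetical mount point of /gateway; it keeps the query string and lets paths outside the mount (including look-alikes such as /gatewayx) through untouched.

```ruby
# Hypothetical mount point: the proxy is assumed to be mounted at /gateway.
MOUNT = "/gateway"

# Strip MOUNT from the inbound path+query before forwarding.
def rewrite_path(path)
  return path unless path == MOUNT || path.start_with?(MOUNT + "/", MOUNT + "?")
  rest = path[MOUNT.length..-1]
  rest.start_with?("/") ? rest : "/" + rest   # keep an absolute path
end
```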

#run_stream(out, fd, leftover, is_chunked, is_sse, req) ⇒ Object

The streaming pump, called from ProxyStreamer#pump as @proxy.run_stream(…). It lives on Tep::Proxy rather than on the streamer so that on_stream_chunk / on_stream_end are invoked as plain (implicit-self) calls. spinel resolves an implicit-self call inside a base-class method polymorphically, including every subclass arm, so a subclass's overrides are reached. A call made through the streamer's @proxy slot would miss the subclass's hooks. This is the same reason rewrite_path / stream_request? (implicit-self calls from handle) dispatch to overrides where a slot call would not.

Recv-loops the held-open upstream fd: dechunks (chunked upstream), splits SSE event records (text/event-stream), and dispatches each unit through dispatch_one. Fires on_stream_end once at EOF / timeout. Cooperative – parks on io_wait between recvs, so requires Tep::Server::Scheduled.

# File 'lib/tep/proxy.rb', line 514

def run_stream(out, fd, leftover, is_chunked, is_sse, req)
  stats    = Tep::Proxy::StreamStats.new
  buf      = leftover    # raw (possibly chunked) bytes
  body_buf = ""          # dechunked bytes awaiting SSE split
  done     = false
  while !done
    if is_chunked
      consumed = Tep::Llm.dechunk_consume(buf)
      buf      = Tep::Llm.dechunk_leftover(buf)
      if consumed.length > 0
        body_buf = body_buf + consumed
      end
    else
      body_buf = body_buf + buf
      buf      = ""
    end

    if is_sse
      body_buf = drain_events(out, stats, body_buf)
    else
      if body_buf.length > 0
        dispatch_one(out, stats, body_buf)
        body_buf = ""
      end
    end

    ready = Tep::Scheduler.io_wait(fd, Tep::Scheduler::READ, 60)
    if ready == 0
      stats.errored = true
      done = true
    else
      more = Sock.sphttp_recv_some(fd, 4096)
      if more.length == 0
        done = true        # clean EOF
      else
        buf = buf + more
      end
    end
  end

  # Flush a trailing partial SSE event (some upstreams omit the
  # final blank line before closing).
  if is_sse && body_buf.length > 0
    drain_events(out, stats, body_buf + "\n\n")
  end

  Sock.sphttp_close(fd)
  on_stream_end(req, out, stats)
  0
end
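The pump relies on Tep::Llm.dechunk_consume / dechunk_leftover, whose source is not shown here. The following is a hedged stand-in showing what such a pair plausibly does for HTTP/1.1 chunked transfer coding; dechunk is hypothetical and may differ from tep's helpers (for example, in how the terminal chunk and trailers are handled).

```ruby
# Hypothetical stand-in for the dechunk_consume / dechunk_leftover pair:
# decode every complete chunk at the front of `buf` and return
# [decoded_bytes, unconsumed_bytes]. A partial chunk stays in the
# leftover until more bytes arrive. Chunk extensions (";name=value")
# are ignored; the terminal 0-size chunk and any trailers are discarded.
def dechunk(buf)
  buf = buf.b                 # work on raw bytes
  out = "".b
  pos = 0
  loop do
    line_end = buf.index("\r\n", pos)
    break if line_end.nil?                      # size line incomplete
    size = buf[pos...line_end].split(";", 2).first.to_s.strip.to_i(16)
    if size.zero?                               # last-chunk
      pos = buf.length
      break
    end
    data_start = line_end + 2
    break if buf.length < data_start + size + 2 # data or CRLF incomplete
    out << buf[data_start, size]
    pos = data_start + size + 2
  end
  [out, buf[pos..-1]]
end
```

Returning the leftover separately matches how run_stream keeps raw bytes in `buf` between recvs.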

#start_streaming_forward(req, res, ureq) ⇒ Object

Streaming forward (chunk 6.2). Connects to the upstream, writes the request, reads just the response head, then hands the still-open fd to a ProxyStreamer via res.start_stream. The server later drives streamer.pump, which recv-loops the upstream body and dispatches it through on_stream_chunk / on_stream_end.

Returns "" (the streamed body goes out via the streamer, not the buffered res.body). On a non-http scheme or a connect, write or head-read failure, it sets a 502 and returns "" without starting a stream.

# File 'lib/tep/proxy.rb', line 425

def start_streaming_forward(req, res, ureq)
  url   = pick_upstream(req) + ureq.path
  parts = Tep::Url.split_url(url)
  if parts["scheme"] != "http"
    res.set_status(502)
    return ""
  end
  host = parts["host"]
  port = parts["port"].to_i
  path = parts["path"]
  if parts["query"].length > 0
    path = path + "?" + parts["query"]
  end

  fd = Sock.sphttp_connect(host, port)
  if fd < 0
    res.set_status(502)
    return ""
  end
  Sock.sphttp_set_nonblock(fd)

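  # Host carries the port when it is not the default (RFC 7230 §5.4).
  host_hdr = host
  if port != 80
    host_hdr = host + ":" + port.to_s
  end
  head = ureq.verb + " " + path + " HTTP/1.1\r\n" +
         "Host: " + host_hdr + "\r\n" +
         "Connection: close\r\n"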
  ureq.headers.each do |k, v|
    head = head + k + ": " + v + "\r\n"
  end
  if ureq.body.length > 0
    head = head + "Content-Length: " + ureq.body.length.to_s + "\r\n"
  end
  head = head + "\r\n"
  if Sock.sphttp_write_str(fd, head) < 0
    Sock.sphttp_close(fd)
    res.set_status(502)
    return ""
  end
  if ureq.body.length > 0
    if Sock.sphttp_write_str(fd, ureq.body) < 0
      Sock.sphttp_close(fd)
      res.set_status(502)
      return ""
    end
  end

  uh = Tep::Proxy.read_upstream_head(fd)
  if !uh.ok
    Sock.sphttp_close(fd)
    res.set_status(502)
    return ""
  end

  res.set_status(uh.status)
  # Copy upstream headers minus hop-by-hop, content-length (the
  # client side is chunked -- no fixed length), and transfer-
  # encoding (the server writer re-applies chunked itself).
  uh.headers.each do |k, v|
    lc = k.downcase
    if !Tep::Proxy.hop_by_hop?(k) && lc != "content-length"
      res.headers[k] = v
    end
  end

  streamer = Tep::Proxy::ProxyStreamer.new
  streamer.proxy      = self
  streamer.fd         = fd
  streamer.leftover   = uh.leftover
  streamer.is_chunked = uh.is_chunked
  streamer.is_sse     = uh.is_sse
  streamer.req        = req
  res.start_stream(streamer)
  ""
end
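The request-head serialisation can be isolated into a pure function for testing. build_request_head is hypothetical; it follows the listing's layout (request line, Host, Connection: close, copied headers, Content-Length only for a non-empty body) and includes a non-default port in Host, as RFC 7230 §5.4 asks.

```ruby
# Hypothetical helper serialising an HTTP/1.1 upstream request head.
def build_request_head(verb, path, host, port, headers, body)
  host_field = port == 80 ? host : "#{host}:#{port}"
  head = +"#{verb} #{path} HTTP/1.1\r\nHost: #{host_field}\r\nConnection: close\r\n"
  headers.each { |k, v| head << "#{k}: #{v}\r\n" }
  head << "Content-Length: #{body.bytesize}\r\n" unless body.empty?
  head << "\r\n"              # blank line ends the head
end
```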

#stream_request?(req) ⇒ Boolean

Streaming opt-in predicate. Return true to forward this request over a held-open connection and pump the upstream response through on_stream_chunk / on_stream_end (chunk 6.2) instead of the buffered before/after path. Default: false (buffered).

tep uses a request-side opt-in rather than sniffing the upstream response Content-Type because (a) it keeps the non-streaming path on the unchanged buffered Tep::Http.send_req (no manual-connect tax on the common case), and (b) it matches how streaming APIs actually signal intent: an OpenAI client sets `"stream": true` in the request body, so the proxy knows before it connects. An LLM gateway typically overrides this as:

def stream_request?(req)
  Tep::Json.get_bool(req.raw_body, "stream")
end

Returns:

  • (Boolean)

# File 'lib/tep/proxy.rb', line 251

def stream_request?(req)
  false
end