Class: AllStak::Transport::HttpTransport
- Inherits: Object
  - Object
    - AllStak::Transport::HttpTransport
- Defined in: lib/allstak/transport/http_transport.rb
Overview
HTTP transport with retry/backoff and 401-disable.
Contract:
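connect timeout = 3s · read timeout = 3s (read from config.connect_timeout / config.read_timeout)
backoff = 1s → 2s → 4s → 8s (+ jitter 0-500ms)
max attempts = 5 (config.max_retries, clamped to 1..5)
401 → disable SDK
4xx (400/403/404/422) → no retry
429 / 5xx / network → retry (429 and 503 honor Retry-After, capped at 300s)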
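To make the backoff line of the contract concrete, here is a minimal sketch of the sleep schedule. The helper name `backoff_delay` and its injectable `jitter:` keyword are assumptions made for illustration; they are not part of the class.

```ruby
# Delays copied from the documented contract.
BACKOFF_DELAYS = [1.0, 2.0, 4.0, 8.0].freeze

# Hypothetical helper: sleep time after a failed attempt (1-based).
# `jitter` is injectable so the example is deterministic; by default it is
# a random value in 0...0.5 seconds, matching the "jitter 0-500ms" note.
def backoff_delay(attempt, jitter: rand * 0.5)
  # Attempts past the end of the table reuse the last (largest) delay.
  base = BACKOFF_DELAYS[[attempt - 1, BACKOFF_DELAYS.length - 1].min]
  base + jitter
end

# With 5 attempts there are 4 sleeps, one between each pair of attempts.
puts (1..4).map { |attempt| backoff_delay(attempt, jitter: 0.0) }.inspect
```

Without jitter the four sleeps add up to 15 seconds, so a fully failing call spends at most about 17 seconds sleeping, plus the time taken by the requests themselves.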
Constant Summary
- NON_RETRYABLE_STATUSES =
[400, 401, 403, 404, 422].freeze
- BACKOFF_DELAYS =
[1.0, 2.0, 4.0, 8.0].freeze
- RETRY_AFTER_STATUSES =
Statuses for which we honor a server-provided Retry-After header.
[429, 503].freeze
- MAX_RETRY_AFTER =
Upper bound on any honored Retry-After delay, in seconds.
300.0
- NON_PERSISTABLE_PATHS =
Session lifecycle calls are best-effort LIVE-only — a replayed stale session would skew durations, so they are NEVER spooled to disk.
[ "/ingest/v1/sessions/start", "/ingest/v1/sessions/end" ].freeze
Instance Attribute Summary
- #disabled ⇒ Object (readonly)
  Returns the value of attribute disabled.
- #spool ⇒ Object (readonly)
  The offline spool, or nil when disabled / unavailable.
Instance Method Summary
- #disabled? ⇒ Boolean
- #drain_spool ⇒ Object
  Replay persisted envelopes through the live transport.
- #initialize(config, logger) ⇒ HttpTransport (constructor)
  A new instance of HttpTransport.
- #parse_retry_after(header, now = Time.now) ⇒ Object
  Parse an HTTP `Retry-After` header into a non-negative delay in seconds.
- #persist_failed(path, payload) ⇒ Object
  Persist a payload that could not be delivered.
- #persistable?(path) ⇒ Boolean
  True when this path’s telemetry is eligible for the persistent spool.
- #post(path, payload) ⇒ Object
- #scrub_options ⇒ Object
  Sanitizer options derived from config.
- #serialize_payload(payload) ⇒ Object
Constructor Details
#initialize(config, logger) ⇒ HttpTransport
Returns a new instance of HttpTransport.
    # File 'lib/allstak/transport/http_transport.rb', line 38

    def initialize(config, logger)
      @config = config
      @logger = logger
      @base_url = config.host
      @api_key = config.api_key
      @disabled = false
      @spool = build_spool(config, logger)
    end
Instance Attribute Details
#disabled ⇒ Object (readonly)
Returns the value of attribute disabled.
    # File 'lib/allstak/transport/http_transport.rb', line 36

    def disabled
      @disabled
    end
#spool ⇒ Object (readonly)
The offline spool, or nil when disabled / unavailable. Exposed for diagnostics + tests.
    # File 'lib/allstak/transport/http_transport.rb', line 49

    def spool
      @spool
    end
Instance Method Details
#disabled? ⇒ Boolean
    # File 'lib/allstak/transport/http_transport.rb', line 111

    def disabled?
      @disabled
    end
#drain_spool ⇒ Object
Replay persisted envelopes through the live transport. An entry is removed only when it is ACCEPTED (2xx) or PERMANENTLY undeliverable (a 4xx that is not 429) — anything else (network error, 5xx, 429) leaves it on disk for a future drain. Honors the existing retry/backoff and the 401-disable circuit breaker. Fully fail-open; never raises.
    # File 'lib/allstak/transport/http_transport.rb', line 84

    def drain_spool
      return unless @spool&.available?

      @spool.each do |path, payload, handle|
        break if @disabled

        begin
          status, _ = post(path, payload)
          # post() returns for 2xx and non-retryable 4xx; raises otherwise.
          if status && (status < 400 || (status >= 400 && status != 429))
            @spool.remove(handle)
          end
        rescue AllStakAuthError
          # 401 disabled the SDK mid-drain: stop, keep remaining entries.
          break
        rescue AllStakTransportError => e
          # Retries exhausted (network/5xx/429): keep the entry, stop draining
          # so we don't hammer a down endpoint.
          @logger.debug("[AllStak] drain stopped (still undeliverable): #{e.message}") if @config.debug
          break
        rescue StandardError => e
          @logger.debug("[AllStak] drain entry error: #{e.class}: #{e.message}") if @config.debug
          break
        end
      end
    rescue StandardError => e
      @logger.debug("[AllStak] drain_spool swallowed: #{e.class}: #{e.message}") if @config.debug
    end
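The removal rule described for drain_spool can be sketched without the SDK. Everything in this example is a hypothetical stand-in: `drain` works on a plain array instead of the on-disk spool, `TransportDown` plays the role of AllStakTransportError, and the block passed to `drain` plays the role of post.

```ruby
# Hypothetical stand-in for "retries exhausted" (AllStakTransportError in the SDK).
class TransportDown < StandardError; end

# Returns the entries that would remain spooled after one drain pass.
# The block is the delivery attempt and returns an HTTP status code.
def drain(entries)
  remaining = entries.dup
  entries.each do |entry|
    begin
      status = yield(entry)
      # Accepted (2xx) or permanently rejected (non-429 4xx): drop the entry.
      remaining.delete(entry) if status && status != 429
    rescue TransportDown
      # Still undeliverable: keep this entry and all later ones, stop draining.
      break
    end
  end
  remaining
end

left = drain(%w[a b c d]) do |entry|
  case entry
  when "a" then 202          # accepted
  when "b" then 400          # permanently rejected
  else raise TransportDown   # endpoint went down at "c"
  end
end
p left  # "c" and "d" stay spooled for a later drain
```

Stopping at the first transport failure, rather than trying every remaining entry, is what keeps a drain from hammering an endpoint that is already down.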
#parse_retry_after(header, now = Time.now) ⇒ Object
Parse an HTTP `Retry-After` header into a non-negative delay in seconds.
Supports both forms from RFC 7231 §7.1.3:
- delta-seconds: an integer number of seconds ("120" → 120.0)
- HTTP-date: an absolute date; returns the delta from `now`
Returns 0.0 when the header is absent, blank, malformed, or resolves to a non-positive delay (e.g. a date in the past). The result is clamped to MAX_RETRY_AFTER. Pure and side-effect free.
    # File 'lib/allstak/transport/http_transport.rb', line 216

    def parse_retry_after(header, now = Time.now)
      return 0.0 if header.nil?

      value = header.to_s.strip
      return 0.0 if value.empty?

      seconds =
        if value.match?(/\A\d+\z/)
          value.to_i.to_f
        else
          begin
            # HTTP-date (RFC 1123 / RFC 850 / asctime). httpdate raises on junk.
            target = Time.httpdate(value)
            target - now
          rescue ArgumentError
            return 0.0
          end
        end

      return 0.0 if seconds.nil? || seconds <= 0

      [seconds.to_f, MAX_RETRY_AFTER].min
    end
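The parsing rules can be exercised outside the SDK. The sketch below uses a standalone copy of the logic under the hypothetical name `retry_after_seconds`, with a fixed `now` so the results are deterministic. It assumes only Ruby's standard `time` library, which provides `Time.httpdate`.

```ruby
require "time" # Time.httpdate (parser) and Time#httpdate (formatter)

MAX_RETRY_AFTER = 300.0 # same cap as the documented constant

# Hypothetical standalone copy of the parsing rules, for illustration only.
def retry_after_seconds(header, now = Time.now)
  value = header.to_s.strip
  return 0.0 if value.empty?

  seconds =
    if value.match?(/\A\d+\z/)
      value.to_f                     # delta-seconds form
    else
      begin
        Time.httpdate(value) - now   # HTTP-date form, relative to `now`
      rescue ArgumentError
        return 0.0                   # malformed header
      end
    end

  seconds.positive? ? [seconds.to_f, MAX_RETRY_AFTER].min : 0.0
end

now = Time.utc(2024, 1, 1, 12, 0, 0)
p retry_after_seconds("120", now)                # → 120.0
p retry_after_seconds((now + 30).httpdate, now)  # → 30.0
p retry_after_seconds((now - 30).httpdate, now)  # → 0.0 (date in the past)
p retry_after_seconds("86400", now)              # → 300.0 (clamped)
p retry_after_seconds("soon", now)               # → 0.0 (malformed)
```

Passing `now` explicitly, as the documented signature allows, is what makes the HTTP-date branch testable without stubbing the clock.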
#persist_failed(path, payload) ⇒ Object
Persist a payload that could not be delivered. The payload is scrubbed through the same PII sanitizer used on the wire before it touches disk, so secrets are never persisted. Fail-open: on any failure it returns false instead of raising. Session lifecycle paths are skipped.

    # File 'lib/allstak/transport/http_transport.rb', line 63

    def persist_failed(path, payload)
      return false unless persistable?(path)

      scrubbed = begin
        parsed = payload.is_a?(String) ? JSON.parse(payload) : payload
        AllStak::Sanitizer.scrub(parsed, **scrub_options)
      rescue StandardError => e
        @logger.debug("[AllStak] spool scrub failed; not persisting: #{e.class}: #{e.message}") if @config.debug
        return false
      end
      @spool.persist(path, scrubbed)
    rescue StandardError => e
      @logger.debug("[AllStak] persist_failed swallowed: #{e.class}: #{e.message}") if @config.debug
      false
    end
#persistable?(path) ⇒ Boolean
True when this path’s telemetry is eligible for the persistent spool. Excludes session lifecycle calls (live-only) and returns false when the spool is disabled or unavailable.

    # File 'lib/allstak/transport/http_transport.rb', line 54

    def persistable?(path)
      return false unless @spool&.available?

      !NON_PERSISTABLE_PATHS.include?(path.to_s)
    end
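As a rough illustration of the eligibility check, the sketch below reproduces it as a free function. The name `spool_eligible?`, the `spool_available:` keyword (standing in for `@spool&.available?`), and the example path `/ingest/v1/errors` are all hypothetical.

```ruby
# Paths copied from the documented NON_PERSISTABLE_PATHS constant.
NON_PERSISTABLE_PATHS = [
  "/ingest/v1/sessions/start",
  "/ingest/v1/sessions/end"
].freeze

# Hypothetical standalone predicate mirroring persistable?.
def spool_eligible?(path, spool_available:)
  return false unless spool_available

  # to_s lets callers pass a Symbol or any other path-like object.
  !NON_PERSISTABLE_PATHS.include?(path.to_s)
end

p spool_eligible?("/ingest/v1/errors", spool_available: true)          # → true
p spool_eligible?("/ingest/v1/sessions/start", spool_available: true)  # → false
p spool_eligible?("/ingest/v1/errors", spool_available: false)         # → false
```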
#post(path, payload) ⇒ Object
    # File 'lib/allstak/transport/http_transport.rb', line 115

    def post(path, payload)
      raise AllStakAuthError, "SDK disabled" if @disabled

      wire_payload = serialize_payload(payload)

      uri = URI.parse("#{@base_url}#{path}")
      http = Net::HTTP.new(uri.host, uri.port)
      http.use_ssl = (uri.scheme == "https")
      http.open_timeout = @config.connect_timeout
      http.read_timeout = @config.read_timeout

      last_exc = nil
      last_status = 0
      retry_after_delay = nil

      max_attempts = [[@config.max_retries.to_i, 1].max, 5].min
      (1..max_attempts).each do |attempt|
        retry_after_delay = nil
        begin
          req = Net::HTTP::Post.new(uri.request_uri, {
            "Content-Type" => "application/json",
            "X-AllStak-Key" => @api_key,
            "User-Agent" => "allstak-ruby/#{AllStak::VERSION}"
          })
          req.body = wire_payload

          @logger.debug("[AllStak] POST #{path} attempt=#{attempt}") if @config.debug
          resp = http.request(req)
          last_status = resp.code.to_i
          body = resp.body.to_s

          if last_status == 401
            @disabled = true
            @logger.warn("[AllStak] SDK disabled: invalid API key (401). No further events will be sent.")
            raise AllStakAuthError, "Invalid API key"
          end

          return [last_status, body] if NON_RETRYABLE_STATUSES.include?(last_status)
          return [last_status, body] if last_status < 400

          # 429 (rate limited) / 503 (unavailable) → honor Retry-After when present.
          if RETRY_AFTER_STATUSES.include?(last_status)
            parsed = parse_retry_after(resp["Retry-After"])
            retry_after_delay = parsed if parsed > 0
          end
          # 5xx / 429 → retry
        rescue AllStakAuthError
          raise
        rescue Net::OpenTimeout, Net::ReadTimeout, Errno::ECONNREFUSED, Errno::ECONNRESET,
               SocketError, EOFError, IOError => e
          last_exc = e
          @logger.debug("[AllStak] transport error attempt=#{attempt}: #{e.class}: #{e.message}") if @config.debug
        rescue => e
          last_exc = e
          @logger.debug("[AllStak] unexpected transport error attempt=#{attempt}: #{e.class}: #{e.message}") if @config.debug
        end

        if attempt < max_attempts
          if retry_after_delay
            # Server told us how long to wait; honor it (already clamped).
            sleep(retry_after_delay)
          else
            delay = BACKOFF_DELAYS[[attempt - 1, BACKOFF_DELAYS.length - 1].min]
            delay += rand * 0.5
            sleep(delay)
          end
        end
      end

      raise AllStakTransportError, "All #{max_attempts} attempts failed for POST #{path}. last_status=#{last_status} last_error=#{last_exc&.message}"
    end
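The status handling inside the retry loop can be summarised as a small decision function. This is a sketch only: `classify_status` and the symbols it returns are hypothetical names, and the two constants are copied from the Constant Summary.

```ruby
NON_RETRYABLE_STATUSES = [400, 401, 403, 404, 422].freeze
RETRY_AFTER_STATUSES = [429, 503].freeze

# Hypothetical helper: what a single response status does to the retry loop.
def classify_status(status)
  # 401 trips the circuit breaker before the non-retryable check is reached.
  return :disable_sdk if status == 401
  # Success, redirects, and listed 4xx codes are handed back to the caller.
  return :return_to_caller if NON_RETRYABLE_STATUSES.include?(status) || status < 400
  # Retry; the server's Retry-After delay is used when it parses to > 0,
  # otherwise the normal backoff table applies.
  return :retry_honoring_retry_after if RETRY_AFTER_STATUSES.include?(status)

  :retry_with_backoff
end

[200, 401, 404, 409, 429, 500, 503].each do |status|
  puts "#{status} -> #{classify_status(status)}"
end
```

One consequence visible in the listing: a 4xx status that is not in NON_RETRYABLE_STATUSES, such as 409, falls through to the retry path rather than being returned to the caller.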
#scrub_options ⇒ Object
Sanitizer options derived from config. Guarded with respond_to? so a bare/stub config (some transport unit tests) still scrubs with safe defaults: PII off (Sentry parity), no extra denylist.
    # File 'lib/allstak/transport/http_transport.rb', line 200

    def scrub_options
      {
        send_default_pii: @config.respond_to?(:send_default_pii?) ? @config.send_default_pii? : false,
        extra_denylist: (@config.respond_to?(:extra_denylist) ? @config.extra_denylist : nil)
      }
    end
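The effect of the respond_to? guard is easiest to see with two configs side by side. In this sketch `FullConfig`, `BareConfig`, and `scrub_options_for` are hypothetical stand-ins; the real method reads `@config` rather than taking an argument.

```ruby
# Hypothetical full config: exposes both settings the sanitizer cares about.
FullConfig = Struct.new(:extra_denylist, :pii) do
  def send_default_pii?
    pii
  end
end

# Hypothetical bare stub, as a transport unit test might use.
BareConfig = Struct.new(:debug)

# Same guard logic as scrub_options, with the config passed in explicitly.
def scrub_options_for(config)
  {
    send_default_pii: config.respond_to?(:send_default_pii?) ? config.send_default_pii? : false,
    extra_denylist: config.respond_to?(:extra_denylist) ? config.extra_denylist : nil
  }
end

p scrub_options_for(FullConfig.new(["ssn"], true))  # values taken from the config
p scrub_options_for(BareConfig.new(false))          # safe defaults: PII off, no denylist
```

Because the result is a plain hash with symbol keys, it can be splatted into a keyword-argument call with `**`, which is how the listings for persist_failed and serialize_payload pass it to the sanitizer.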
#serialize_payload(payload) ⇒ Object
    # File 'lib/allstak/transport/http_transport.rb', line 189

    def serialize_payload(payload)
      parsed = payload.is_a?(String) ? JSON.parse(payload) : payload
      JSON.generate(AllStak::Sanitizer.scrub(parsed, **scrub_options))
    rescue StandardError => san_err
      @logger.warn("[AllStak] sanitizer failed; dropping payload: #{san_err.class}: #{san_err.message}")
      raise AllStakTransportError, "sanitizer failed; payload dropped"
    end