Class: AllStak::Transport::HttpTransport

Inherits:
Object
  • Object
Defined in:
lib/allstak/transport/http_transport.rb

Overview

HTTP transport with retry/backoff and 401-disable.

Contract:

connect timeout = 3s   · read timeout = 3s
backoff         = 1s → 2s → 4s → 8s (+ jitter 0-500ms)
max attempts    = config.max_retries, clamped to 1..5
401             → disable SDK
4xx (400/403/404/422) → no retry
429 / 5xx / network   → retry (429 and 503 honor Retry-After, capped at 300s)
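
The backoff line of the contract can be sketched as a small standalone helper. This is a minimal illustration, not the SDK's code: `SKETCH_BACKOFF`, `SKETCH_MAX_JITTER`, and `backoff_delay` are hypothetical names, and the injectable `rng:` parameter is an assumption added so the jitter can be made reproducible.

```ruby
# Hypothetical standalone helper mirroring the documented schedule.
SKETCH_BACKOFF = [1.0, 2.0, 4.0, 8.0].freeze
SKETCH_MAX_JITTER = 0.5 # seconds

# Delay to sleep after failed attempt number `attempt` (1-based).
# Attempts past the end of the table reuse the last base delay.
def backoff_delay(attempt, rng: Random.new)
  base = SKETCH_BACKOFF[[attempt - 1, SKETCH_BACKOFF.length - 1].min]
  base + rng.rand * SKETCH_MAX_JITTER
end

# With 5 attempts there are at most 4 sleeps, one per table entry.
delays = (1..4).map { |attempt| backoff_delay(attempt, rng: Random.new(42)) }
```

Each delay falls in the half-open range from its base value to base + 0.5s, so the worst-case total wait across four sleeps stays under 17 seconds when no Retry-After header is involved.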

Constant Summary

NON_RETRYABLE_STATUSES =
[400, 401, 403, 404, 422].freeze
BACKOFF_DELAYS =
[1.0, 2.0, 4.0, 8.0].freeze
RETRY_AFTER_STATUSES =

Statuses for which we honor a server-provided Retry-After header.

[429, 503].freeze
MAX_RETRY_AFTER =

Upper bound on any honored Retry-After delay, in seconds.

300.0
NON_PERSISTABLE_PATHS =

Session lifecycle calls are best-effort and live-only: a replayed stale session would skew durations, so they are never spooled to disk.

[
  "/ingest/v1/sessions/start",
  "/ingest/v1/sessions/end"
].freeze

Constructor Details

#initialize(config, logger) ⇒ HttpTransport

Returns a new instance of HttpTransport.



# File 'lib/allstak/transport/http_transport.rb', line 38

def initialize(config, logger)
  @config = config
  @logger = logger
  @base_url = config.host
  @api_key = config.api_key
  @disabled = false
  @spool = build_spool(config, logger)
end

Instance Attribute Details

#disabled ⇒ Object (readonly)

Returns the value of attribute disabled.



# File 'lib/allstak/transport/http_transport.rb', line 36

def disabled
  @disabled
end

#spool ⇒ Object (readonly)

Returns the offline spool, or nil when spooling is disabled or unavailable. Exposed for diagnostics and tests.



# File 'lib/allstak/transport/http_transport.rb', line 49

def spool
  @spool
end

Instance Method Details

#disabled? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/allstak/transport/http_transport.rb', line 111

def disabled?
  @disabled
end

#drain_spool ⇒ Object

Replays persisted envelopes through the live transport. An entry is removed only when it is accepted (2xx) or permanently undeliverable (a non-retryable 4xx: 400, 403, 404, or 422). Anything else (network error, 5xx, 429) leaves it on disk for a future drain, and draining stops at the first such failure. Honors the existing retry/backoff and the 401-disable circuit breaker. Fully fail-open; never raises.



# File 'lib/allstak/transport/http_transport.rb', line 84

def drain_spool
  return unless @spool&.available?
  @spool.each do |path, payload, handle|
    break if @disabled
    begin
      status, _ = post(path, payload)
      # post() returns for 2xx and non-retryable 4xx; raises otherwise.
      # A 429 is kept on disk as a defensive guard.
      @spool.remove(handle) if status && status != 429
    rescue AllStakAuthError
      # 401 disabled the SDK mid-drain: stop, keep remaining entries.
      break
    rescue AllStakTransportError => e
      # Retries exhausted (network/5xx/429): keep the entry, stop draining
      # so we don't hammer a down endpoint.
      @logger.debug("[AllStak] drain stopped (still undeliverable): #{e.message}") if @config.debug
      break
    rescue StandardError => e
      @logger.debug("[AllStak] drain entry error: #{e.class}: #{e.message}") if @config.debug
      break
    end
  end
rescue StandardError => e
  @logger.debug("[AllStak] drain_spool swallowed: #{e.class}: #{e.message}") if @config.debug
end
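
The removal rule can be illustrated with in-memory stand-ins. This is a hedged sketch only: `SketchTransportError`, `drain`, the `poster` lambda, and the paths `/a` through `/d` are all hypothetical, and the real spool and transport APIs are not modeled beyond what the method above shows.

```ruby
# Hypothetical stand-in for the transport error raised once retries are exhausted.
class SketchTransportError < StandardError; end

# entries: array of [path, payload]; poster: callable returning a status or raising.
# Returns the entries that are still spooled after the drain.
def drain(entries, poster)
  remaining = entries.dup
  entries.each do |entry|
    path, payload = entry
    begin
      status = poster.call(path, payload)
      remaining.delete(entry) if status && status != 429
    rescue SketchTransportError
      break # endpoint still down: keep this entry and everything after it
    end
  end
  remaining
end

# Scripted outcomes per path; :down simulates exhausted retries.
responses = { "/a" => 202, "/b" => 422, "/c" => :down, "/d" => 200 }
poster = lambda do |path, _payload|
  outcome = responses.fetch(path)
  raise SketchTransportError, "retries exhausted" if outcome == :down
  outcome
end

entries = responses.keys.map { |path| [path, "{}"] }
left = drain(entries, poster)
left.map(&:first) # → ["/c", "/d"]
```

The accepted entry and the permanently rejected entry are both removed, while the first undeliverable entry stops the drain and leaves later entries untouched for the next attempt.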

#parse_retry_after(header, now = Time.now) ⇒ Object

Parse an HTTP `Retry-After` header into a non-negative delay in seconds.

Supports both forms from RFC 7231 §7.1.3:

- delta-seconds: an integer number of seconds ("120" → 120.0)
- HTTP-date:     an absolute date; returns the delta from `now`

Returns 0.0 when the header is absent, blank, malformed, or resolves to a non-positive delay (e.g. a date in the past). The result is clamped to MAX_RETRY_AFTER. Pure and side-effect free.



# File 'lib/allstak/transport/http_transport.rb', line 216

def parse_retry_after(header, now = Time.now)
  return 0.0 if header.nil?

  value = header.to_s.strip
  return 0.0 if value.empty?

  seconds =
    if value.match?(/\A\d+\z/)
      value.to_i.to_f
    else
      begin
        # HTTP-date (RFC 1123 / RFC 850 / asctime). httpdate raises on junk.
        target = Time.httpdate(value)
        target - now
      rescue ArgumentError
        return 0.0
      end
    end

  return 0.0 if seconds.nil? || seconds <= 0
  [seconds.to_f, MAX_RETRY_AFTER].min
end
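
To show the two header forms side by side, here is a standalone copy of the same logic that runs outside the SDK. `retry_after_seconds` and `SKETCH_MAX_RETRY_AFTER` are hypothetical names chosen for this example, and the fixed `now` value is an assumption used to make the results deterministic.

```ruby
require "time" # provides Time.httpdate

SKETCH_MAX_RETRY_AFTER = 300.0

# Standalone mirror of the parsing rules described above.
def retry_after_seconds(header, now = Time.now)
  value = header.to_s.strip
  return 0.0 if value.empty?

  seconds =
    if value.match?(/\A\d+\z/)
      value.to_f
    else
      begin
        Time.httpdate(value) - now
      rescue ArgumentError
        return 0.0
      end
    end

  seconds.positive? ? [seconds.to_f, SKETCH_MAX_RETRY_AFTER].min : 0.0
end

now = Time.utc(2024, 1, 1, 12, 0, 0)
retry_after_seconds("120", now)                           # → 120.0 (delta-seconds)
retry_after_seconds("Mon, 01 Jan 2024 12:00:30 GMT", now) # → 30.0  (HTTP-date, 30s ahead)
retry_after_seconds("Mon, 01 Jan 2024 11:59:00 GMT", now) # → 0.0   (date in the past)
retry_after_seconds("9999", now)                          # → 300.0 (clamped)
retry_after_seconds("soon", now)                          # → 0.0   (malformed)
```

Passing `now` explicitly is what keeps the function pure and easy to test, which matches the optional second parameter on the documented method.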

#persist_failed(path, payload) ⇒ Object

Persist a payload that could not be delivered. The payload is scrubbed through the same PII sanitizer used on the wire before it touches disk, so secrets are never persisted. If scrubbing fails, the payload is not persisted. Fail-open: returns false on any failure and never raises. Session lifecycle paths are skipped.



# File 'lib/allstak/transport/http_transport.rb', line 63

def persist_failed(path, payload)
  return false unless persistable?(path)
  scrubbed =
    begin
      parsed = payload.is_a?(String) ? JSON.parse(payload) : payload
      AllStak::Sanitizer.scrub(parsed, **scrub_options)
    rescue StandardError => e
      @logger.debug("[AllStak] spool scrub failed; not persisting: #{e.class}: #{e.message}") if @config.debug
      return false
    end
  @spool.persist(path, scrubbed)
rescue StandardError => e
  @logger.debug("[AllStak] persist_failed swallowed: #{e.class}: #{e.message}") if @config.debug
  false
end
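
The scrub-before-persist ordering and the fail-open return value can be sketched as follows. Everything here is hypothetical: `sketch_scrub` is a toy key-based redactor standing in for `AllStak::Sanitizer`, whose actual rules are not shown on this page; the array `store` stands in for the spool; the path `/ingest/v1/errors` is invented for the example; and returning `true` on success is an assumption.

```ruby
require "json"

SKETCH_DENYLIST = %w[password token authorization].freeze

# Toy scrubber: redacts values of denylisted keys, recursing into nested data.
def sketch_scrub(value)
  case value
  when Hash
    value.each_with_object({}) do |(key, inner), out|
      denied = SKETCH_DENYLIST.include?(key.to_s.downcase)
      out[key] = denied ? "[REDACTED]" : sketch_scrub(inner)
    end
  when Array then value.map { |item| sketch_scrub(item) }
  else value
  end
end

# Fail-open persist: scrub first, then store; any error yields false, never an exception.
def sketch_persist(store, path, payload)
  parsed = payload.is_a?(String) ? JSON.parse(payload) : payload
  store << [path, sketch_scrub(parsed)]
  true
rescue StandardError
  false
end

store = []
ok  = sketch_persist(store, "/ingest/v1/errors", '{"user":{"password":"hunter2"},"msg":"boom"}')
bad = sketch_persist(store, "/ingest/v1/errors", "{not json")
```

Only the first call adds an entry, and the stored copy holds the redacted value rather than the original secret. The malformed payload is dropped without raising.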

#persistable?(path) ⇒ Boolean

Returns true when this path's telemetry is eligible for the persistent spool. Returns false for session lifecycle calls (live-only) and whenever the spool is disabled or unavailable.

Returns:

  • (Boolean)


# File 'lib/allstak/transport/http_transport.rb', line 54

def persistable?(path)
  return false unless @spool&.available?
  !NON_PERSISTABLE_PATHS.include?(path.to_s)
end

#post(path, payload) ⇒ Object

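Raises:

  • (AllStakAuthError) when the SDK is already disabled or the server responds with 401
  • (AllStakTransportError) when the payload cannot be sanitized or every attempt fails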



# File 'lib/allstak/transport/http_transport.rb', line 115

def post(path, payload)
  raise AllStakAuthError, "SDK disabled" if @disabled

  wire_payload = serialize_payload(payload)

  uri = URI.parse("#{@base_url}#{path}")
  http = Net::HTTP.new(uri.host, uri.port)
  http.use_ssl = (uri.scheme == "https")
  http.open_timeout = @config.connect_timeout
  http.read_timeout = @config.read_timeout

  last_exc = nil
  last_status = 0
  retry_after_delay = nil
  max_attempts = [[@config.max_retries.to_i, 1].max, 5].min

  (1..max_attempts).each do |attempt|
    retry_after_delay = nil
    begin
      req = Net::HTTP::Post.new(uri.request_uri, {
        "Content-Type"   => "application/json",
        "X-AllStak-Key"  => @api_key,
        "User-Agent"     => "allstak-ruby/#{AllStak::VERSION}"
      })
      req.body = wire_payload
      @logger.debug("[AllStak] POST #{path} attempt=#{attempt}") if @config.debug

      resp = http.request(req)
      last_status = resp.code.to_i
      body = resp.body.to_s

      if last_status == 401
        @disabled = true
        @logger.warn("[AllStak] SDK disabled: invalid API key (401). No further events will be sent.")
        raise AllStakAuthError, "Invalid API key"
      end

      return [last_status, body] if NON_RETRYABLE_STATUSES.include?(last_status)
      return [last_status, body] if last_status < 400

      # 429 (rate limited) / 503 (unavailable) → honor Retry-After when present.
      if RETRY_AFTER_STATUSES.include?(last_status)
        parsed = parse_retry_after(resp["Retry-After"])
        retry_after_delay = parsed if parsed > 0
      end

      # 5xx / 429 → retry
    rescue AllStakAuthError
      raise
    rescue Net::OpenTimeout, Net::ReadTimeout, Errno::ECONNREFUSED, Errno::ECONNRESET,
           SocketError, EOFError, IOError => e
      last_exc = e
      @logger.debug("[AllStak] transport error attempt=#{attempt}: #{e.class}: #{e.message}") if @config.debug
    rescue => e
      last_exc = e
      @logger.debug("[AllStak] unexpected transport error attempt=#{attempt}: #{e.class}: #{e.message}") if @config.debug
    end

    if attempt < max_attempts
      if retry_after_delay
        # Server told us how long to wait; honor it (already clamped).
        sleep(retry_after_delay)
      else
        delay = BACKOFF_DELAYS[[attempt - 1, BACKOFF_DELAYS.length - 1].min]
        delay += rand * 0.5
        sleep(delay)
      end
    end
  end

  raise AllStakTransportError,
        "All #{max_attempts} attempts failed for POST #{path}. last_status=#{last_status} last_error=#{last_exc&.message}"
end
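
The per-status decisions inside the retry loop can be summarized as a small classifier. This is an illustrative sketch: `classify_status` and the returned symbols are hypothetical names, and the constants are copies of the ones documented above, renamed so the snippet stands alone.

```ruby
SKETCH_NON_RETRYABLE = [400, 401, 403, 404, 422].freeze
SKETCH_RETRY_AFTER   = [429, 503].freeze

# Maps a response status to the action the loop above takes for it.
def classify_status(status)
  return :disable_sdk if status == 401
  return :return_to_caller if SKETCH_NON_RETRYABLE.include?(status) || status < 400
  return :retry_honoring_retry_after if SKETCH_RETRY_AFTER.include?(status)

  :retry_with_backoff
end

classify_status(200) # → :return_to_caller
classify_status(422) # → :return_to_caller
classify_status(401) # → :disable_sdk
classify_status(429) # → :retry_honoring_retry_after
classify_status(500) # → :retry_with_backoff
classify_status(409) # → :retry_with_backoff
```

Note the last case: as the method is written, a 4xx status outside the non-retryable list (409 here) falls through to the retry path. A 429 or 503 without a usable Retry-After header also falls back to the normal backoff delay.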

#scrub_options ⇒ Object

Sanitizer options derived from config. Guarded with respond_to? so a bare/stub config (some transport unit tests) still scrubs with safe defaults: PII off (privacy-safe), no extra denylist.



# File 'lib/allstak/transport/http_transport.rb', line 200

def scrub_options
  {
    send_default_pii: @config.respond_to?(:send_default_pii?) ? @config.send_default_pii? : false,
    extra_denylist:   (@config.respond_to?(:extra_denylist) ? @config.extra_denylist : nil)
  }
end
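
The `respond_to?` guard can be demonstrated with two stub configs. This is a sketch under assumptions: `FullConfig`, `BareConfig`, and `sketch_scrub_options` are hypothetical names, and the real config class is not shown on this page.

```ruby
# Hypothetical config shapes for illustration only.
FullConfig = Struct.new(:pii, :extra_denylist) do
  def send_default_pii?
    pii
  end
end
BareConfig = Struct.new(:host) # knows nothing about PII settings

# Same guard pattern as above, taking the config as an argument.
def sketch_scrub_options(config)
  {
    send_default_pii: config.respond_to?(:send_default_pii?) ? config.send_default_pii? : false,
    extra_denylist:   config.respond_to?(:extra_denylist) ? config.extra_denylist : nil
  }
end

full = sketch_scrub_options(FullConfig.new(true, ["ssn"]))
bare = sketch_scrub_options(BareConfig.new("https://example.invalid"))
```

The full config passes its settings through unchanged, while the bare config yields the privacy-safe defaults (`send_default_pii: false`, `extra_denylist: nil`) instead of raising `NoMethodError`.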

#serialize_payload(payload) ⇒ Object



# File 'lib/allstak/transport/http_transport.rb', line 189

def serialize_payload(payload)
  parsed = payload.is_a?(String) ? JSON.parse(payload) : payload
  JSON.generate(AllStak::Sanitizer.scrub(parsed, **scrub_options))
rescue StandardError => san_err
  @logger.warn("[AllStak] sanitizer failed; dropping payload: #{san_err.class}: #{san_err.message}")
  raise AllStakTransportError, "sanitizer failed; payload dropped"
end