Class: AllStak::Transport::HttpTransport

Inherits:
Object
Defined in:
lib/allstak/transport/http_transport.rb

Overview

HTTP transport with retry/backoff, an offline spool for undeliverable payloads, and a circuit breaker that disables the SDK on 401.

Contract:

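connect timeout = 3s   · read timeout = 3s   (config.connect_timeout / config.read_timeout)
backoff         = 1s → 2s → 4s → 8s (+ jitter 0-500ms)
max attempts    = config.max_retries, clamped to 1..5
401             → disable SDK
4xx except 429  → no retry
429 / 503       → retry, sleeping for Retry-After (capped at 300s) instead of backoff when present
5xx / network   → retry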
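The backoff rows of the contract can be checked with a small standalone sketch. It copies the delay table and the 1..5 clamp from #post; the helper names (effective_attempts, backoff_delay, backoff_schedule) are hypothetical, and Retry-After handling is deliberately left out.

```ruby
# Delay table copied from BACKOFF_DELAYS.
BACKOFF_DELAYS = [1.0, 2.0, 4.0, 8.0].freeze

# Clamp a configured retry count to 1..5 attempts, as #post does.
def effective_attempts(max_retries)
  [[max_retries.to_i, 1].max, 5].min
end

# Sleep after failed attempt N (1-based): table entry (clamped to the last
# one) plus 0-500 ms of jitter. `rng` is injectable so tests are repeatable.
def backoff_delay(attempt, rng: Random.new)
  base = BACKOFF_DELAYS[[attempt - 1, BACKOFF_DELAYS.length - 1].min]
  base + rng.rand * 0.5
end

# All sleeps for a request that never succeeds (none after the last attempt).
def backoff_schedule(max_retries, rng: Random.new)
  (1...effective_attempts(max_retries)).map { |attempt| backoff_delay(attempt, rng: rng) }
end
```

With max_retries = 5, a request that never succeeds sleeps four times, roughly 15 to 17 seconds in total plus the request time of each attempt, before AllStakTransportError is raised.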

Constant Summary

NON_RETRYABLE_STATUSES =
[400, 401, 403, 404, 422].freeze
BACKOFF_DELAYS =
[1.0, 2.0, 4.0, 8.0].freeze
RETRY_AFTER_STATUSES =

Statuses for which we honor a server-provided Retry-After header.

[429, 503].freeze
MAX_RETRY_AFTER =

Upper bound on any honored Retry-After delay, in seconds.

300.0
COMPRESSION_THRESHOLD_BYTES =
1024
NON_PERSISTABLE_PATHS =

Session lifecycle calls are best-effort and LIVE-only. Replaying a stale session would skew session durations, so these paths are NEVER spooled to disk.

[
  "/ingest/v1/sessions/start",
  "/ingest/v1/sessions/end"
].freeze
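COMPRESSION_THRESHOLD_BYTES is consumed by a private prepare_body helper that is not shown on this page; #post only reveals that it returns [body, compressed?, bytes_saved] and that compressed bodies are sent with Content-Encoding: gzip. The sketch below assumes bodies of at least the threshold are gzipped and that compression is kept only when it actually shrinks the body. Both rules and the name prepare_body_sketch are assumptions, not the SDK's code.

```ruby
require "zlib"

COMPRESSION_THRESHOLD_BYTES = 1024

# Hypothetical stand-in for prepare_body: returns [body, compressed?, bytes_saved].
def prepare_body_sketch(json)
  return [json, false, 0] if json.bytesize < COMPRESSION_THRESHOLD_BYTES

  gzipped = Zlib.gzip(json)
  saved = json.bytesize - gzipped.bytesize
  # Assumption: fall back to the raw body when gzip does not help.
  return [json, false, 0] if saved <= 0

  [gzipped, true, saved]
end
```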

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(config, logger) ⇒ HttpTransport

Returns a new instance of HttpTransport.



# File 'lib/allstak/transport/http_transport.rb', line 41

def initialize(config, logger)
  @config = config
  @logger = logger
  @base_url = config.host
  @api_key = config.api_key
  @disabled = false
  @spool = build_spool(config, logger)
  @stats_mutex = Mutex.new
  @sent_count = 0
  @failed_count = 0
  @dropped_count = 0
  @persisted_count = 0
  @replayed_count = 0
  @retry_attempt_count = 0
  @rate_limited_count = 0
  @compressed_count = 0
  @uncompressed_count = 0
  @compression_bytes_saved = 0
end

Instance Attribute Details

#disabled ⇒ Object (readonly)

Returns the value of attribute disabled.



# File 'lib/allstak/transport/http_transport.rb', line 39

def disabled
  @disabled
end

#spool ⇒ Object (readonly)

The offline spool, or nil when spooling is disabled or unavailable. Exposed for diagnostics and tests.



# File 'lib/allstak/transport/http_transport.rb', line 63

def spool
  @spool
end

Instance Method Details

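#diagnostics ⇒ Object

Returns a point-in-time Hash of transport counters (sent, failed, dropped, persisted, replayed, retry_attempts, rate_limited, compressed, uncompressed, compression_bytes_saved) plus queue_size (entries in the offline spool) and disabled. All counters are read under one mutex, so the snapshot is internally consistent. Never raises: on an internal error it returns an all-zero snapshot.
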
# File 'lib/allstak/transport/http_transport.rb', line 239

def diagnostics
  snapshot = @stats_mutex.synchronize do
    {
      sent: @sent_count,
      failed: @failed_count,
      dropped: @dropped_count,
      persisted: @persisted_count,
      replayed: @replayed_count,
      retry_attempts: @retry_attempt_count,
      rate_limited: @rate_limited_count,
      compressed: @compressed_count,
      uncompressed: @uncompressed_count,
      compression_bytes_saved: @compression_bytes_saved
    }
  end
  snapshot[:queue_size] = @spool&.size.to_i
  snapshot[:disabled] = disabled?
  snapshot
rescue StandardError
  {
    sent: 0, failed: 0, dropped: 0, persisted: 0, replayed: 0,
    retry_attempts: 0, rate_limited: 0, queue_size: 0,
    compressed: 0, uncompressed: 0, compression_bytes_saved: 0,
    disabled: disabled?
  }
end
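The counter pattern behind #diagnostics (every write and every snapshot read goes through one Mutex) can be reproduced in isolation. This is a minimal sketch: the class TransportStats and its Hash-backed storage are hypothetical, whereas the real transport keeps one instance variable per counter and updates it through a private increment helper that is not shown here.

```ruby
# Hypothetical stand-in for the transport's mutex-guarded counters.
class TransportStats
  COUNTERS = %i[sent failed dropped persisted replayed].freeze

  def initialize
    @mutex = Mutex.new
    @counts = COUNTERS.to_h { |name| [name, 0] }
  end

  # Analogous to increment(:@ivar, by): one locked read-modify-write.
  def increment(name, by = 1)
    @mutex.synchronize { @counts[name] += by }
  end

  # Copy all counters inside the same lock so they agree with each other.
  def snapshot
    @mutex.synchronize { @counts.dup }
  end
end
```

Because snapshot returns a copy, callers can mutate the returned Hash (as #diagnostics does when it adds queue_size and disabled) without touching the live counters.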

#disabled? ⇒ Boolean

True once a 401 response has disabled the SDK.

Returns:

  • (Boolean)


# File 'lib/allstak/transport/http_transport.rb', line 131

def disabled?
  @disabled
end

#drain_spool ⇒ Object

Replay persisted envelopes through the live transport. An entry is removed only when it is ACCEPTED (any status below 400) or PERMANENTLY undeliverable (a 4xx other than 429). Anything else (network error, 5xx, 429) leaves it on disk for a future drain, and the drain stops at the first such entry so a down endpoint is not hammered. Honors the existing retry/backoff and the 401-disable circuit breaker. Fully fail-open; never raises.



# File 'lib/allstak/transport/http_transport.rb', line 102

def drain_spool
  return unless @spool&.available?
  @spool.each do |path, payload, handle|
    break if @disabled
    begin
      status, _ = post(path, payload)
      # post() returns only for status < 400 or a non-retryable 4xx; it raises otherwise.
      if status && (status < 400 || (status < 500 && status != 429))
        # post() has already counted a returned 4xx as dropped; only count replays here.
        increment(:@replayed_count) if status < 400
        @spool.remove(handle)
      end
    rescue AllStakAuthError
      # 401 disabled the SDK mid-drain: stop, keep remaining entries.
      break
    rescue AllStakTransportError => e
      # Retries exhausted (network/5xx/429): keep the entry, stop draining
      # so we don't hammer a down endpoint.
      @logger.debug("[AllStak] drain stopped (still undeliverable): #{e.message}") if @config.debug
      break
    rescue StandardError => e
      @logger.debug("[AllStak] drain entry error: #{e.class}: #{e.message}") if @config.debug
      break
    end
  end
rescue StandardError => e
  @logger.debug("[AllStak] drain_spool swallowed: #{e.class}: #{e.message}") if @config.debug
end
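The removal rule and the stop-on-first-failure behavior of #drain_spool can be sketched against an in-memory spool. Everything here is hypothetical: the spool is a plain Array of [path, payload] pairs, send_fn stands in for #post, and UndeliverableError stands in for AllStakTransportError. Unlike the real method, the sketch does not model the 401 circuit breaker.

```ruby
# Stand-in for AllStakTransportError (retries exhausted).
class UndeliverableError < StandardError; end

# Remove on acceptance (< 400) or permanent rejection (4xx other than 429).
def removable?(status)
  return false if status.nil?
  status < 400 || (status < 500 && status != 429)
end

# Replay entries in order; on the first undeliverable entry, keep it and
# everything after it, then stop. Returns the number of accepted replays.
def drain(spool, send_fn)
  replayed = 0
  kept = []
  spool.each_with_index do |(path, payload), i|
    begin
      status = send_fn.call(path, payload)
    rescue UndeliverableError
      kept.concat(spool[i..])
      break
    end
    if removable?(status)
      replayed += 1 if status < 400
    else
      kept << [path, payload]
    end
  end
  spool.replace(kept)
  replayed
end
```

For example, a 202 followed by a 400 followed by a network failure removes the first two entries (only the first counts as replayed) and leaves the third for the next drain.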

#parse_retry_after(header, now = Time.now) ⇒ Object

Parse an HTTP `Retry-After` header into a non-negative delay in seconds.

Supports both forms from RFC 7231 §7.1.3:

- delta-seconds: an integer number of seconds ("120" → 120.0)
- HTTP-date:     an absolute date; returns the delta from `now`

Returns 0.0 when the header is absent, blank, malformed, or resolves to a non-positive delay (e.g. a date in the past). The result is clamped to MAX_RETRY_AFTER. Pure and side-effect free.



# File 'lib/allstak/transport/http_transport.rb', line 293

def parse_retry_after(header, now = Time.now)
  return 0.0 if header.nil?

  value = header.to_s.strip
  return 0.0 if value.empty?

  seconds =
    if value.match?(/\A\d+\z/)
      value.to_i.to_f
    else
      begin
        # HTTP-date (RFC 1123 / RFC 850 / asctime). httpdate raises on junk.
        target = Time.httpdate(value)
        target - now
      rescue ArgumentError
        return 0.0
      end
    end

  return 0.0 if seconds.nil? || seconds <= 0
  [seconds.to_f, MAX_RETRY_AFTER].min
end
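To make the documented rules concrete, here is a standalone copy of the same parsing logic with a fixed `now`, so each form of the header can be exercised without a clock dependency. Time.httpdate comes from Ruby's standard "time" library; the sample dates are illustrative.

```ruby
require "time"

MAX_RETRY_AFTER = 300.0

# Standalone copy of the rules above: delta-seconds or HTTP-date, clamped.
def parse_retry_after(header, now = Time.now)
  value = header.to_s.strip
  return 0.0 if value.empty?

  seconds =
    if value.match?(/\A\d+\z/)
      value.to_i.to_f
    else
      begin
        Time.httpdate(value) - now # raises ArgumentError on junk
      rescue ArgumentError
        return 0.0
      end
    end

  seconds.positive? ? [seconds, MAX_RETRY_AFTER].min : 0.0
end

now = Time.utc(2015, 10, 21, 7, 26, 30)
parse_retry_after("120", now)                           # => 120.0
parse_retry_after("Wed, 21 Oct 2015 07:28:00 GMT", now) # => 90.0
parse_retry_after("Wed, 21 Oct 2015 07:00:00 GMT", now) # => 0.0 (in the past)
parse_retry_after("3600", now)                          # => 300.0 (clamped)
parse_retry_after("soon", now)                          # => 0.0 (malformed)
```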

#persist_failed(path, payload) ⇒ Object

Persist a payload that could not be delivered. The payload is scrubbed through the SAME PII sanitizer used on the wire BEFORE it touches disk, so secrets are never persisted. Fail-open: returns false and never raises. Session lifecycle paths are skipped.



# File 'lib/allstak/transport/http_transport.rb', line 77

def persist_failed(path, payload)
  return false unless persistable?(path)
  scrubbed =
    begin
      parsed = payload.is_a?(String) ? JSON.parse(payload) : payload
      AllStak::Sanitizer.scrub(parsed, **scrub_options)
    rescue StandardError => e
      @logger.debug("[AllStak] spool scrub failed; not persisting: #{e.class}: #{e.message}") if @config.debug
      increment(:@dropped_count)
      return false
    end
  persisted = @spool.persist(path, scrubbed)
  increment(persisted ? :@persisted_count : :@dropped_count)
  persisted
rescue StandardError => e
  @logger.debug("[AllStak] persist_failed swallowed: #{e.class}: #{e.message}") if @config.debug
  increment(:@dropped_count)
  false
end
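The ordering that matters in #persist_failed is "scrub, then persist, and drop on any scrub failure". The sketch below shows that ordering with a toy denylist sanitizer and an Array as the spool; toy_scrub, DENYLIST, and persist_scrubbed are hypothetical and far simpler than AllStak::Sanitizer.

```ruby
require "json"

# Hypothetical toy sanitizer: redacts values under denylisted keys.
DENYLIST = %w[password authorization api_key].freeze

def toy_scrub(obj)
  case obj
  when Hash
    obj.to_h { |k, v| [k, DENYLIST.include?(k.to_s.downcase) ? "[Filtered]" : toy_scrub(v)] }
  when Array
    obj.map { |v| toy_scrub(v) }
  else
    obj
  end
end

# Scrub first; if parsing or scrubbing fails, return false instead of
# writing the raw (possibly secret-bearing) payload. Never raises.
def persist_scrubbed(spool, path, payload)
  parsed = payload.is_a?(String) ? JSON.parse(payload) : payload
  spool << [path, toy_scrub(parsed)]
  true
rescue StandardError
  false
end
```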

#persistable?(path) ⇒ Boolean

True when this path’s telemetry is eligible for the persistent spool. Session lifecycle calls (live-only) are excluded, and the method always returns false when the spool is disabled or unavailable.

Returns:

  • (Boolean)


# File 'lib/allstak/transport/http_transport.rb', line 68

def persistable?(path)
  return false unless @spool&.available?
  !NON_PERSISTABLE_PATHS.include?(path.to_s)
end
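A standalone version of the check makes one detail visible: it is an exact string comparison against NON_PERSISTABLE_PATHS. The function name persistable_path? and the spool_available: flag (standing in for @spool&.available?) are hypothetical; "/ingest/v1/errors" is a made-up example of an ordinary path.

```ruby
NON_PERSISTABLE_PATHS = [
  "/ingest/v1/sessions/start",
  "/ingest/v1/sessions/end"
].freeze

# Mirrors #persistable?: no usable spool means nothing is persistable;
# otherwise only the live-only session paths are excluded.
def persistable_path?(path, spool_available:)
  return false unless spool_available
  !NON_PERSISTABLE_PATHS.include?(path.to_s)
end
```

Because the comparison is exact, a session path with a trailing slash or a query string would not be recognized as a session call and would be eligible for spooling.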

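#post(path, payload) ⇒ Object

Sanitize, optionally gzip, and POST a payload to the configured host, retrying per the contract above. Returns [status, body] for any status below 400 and for non-retryable 4xx responses. Raises AllStakAuthError on a 401 (which disables the SDK) or when the SDK is already disabled, and AllStakTransportError when the sanitizer fails or all attempts are exhausted.
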
# File 'lib/allstak/transport/http_transport.rb', line 135

def post(path, payload)
  if @disabled
    increment(:@dropped_count)
    raise AllStakAuthError, "SDK disabled"
  end

  wire_payload =
    begin
      serialize_payload(payload)
    rescue AllStakTransportError
      increment(:@failed_count)
      increment(:@dropped_count)
      raise
    end
  wire_body, compressed, bytes_saved = prepare_body(wire_payload)
  if compressed
    increment(:@compressed_count)
    increment(:@compression_bytes_saved, bytes_saved)
  else
    increment(:@uncompressed_count)
  end

  uri = URI.parse("#{@base_url}#{path}")
  http = Net::HTTP.new(uri.host, uri.port)
  http.use_ssl = (uri.scheme == "https")
  http.open_timeout = @config.connect_timeout
  http.read_timeout = @config.read_timeout

  last_exc = nil
  last_status = 0
  retry_after_delay = nil
  max_attempts = [[@config.max_retries.to_i, 1].max, 5].min

  (1..max_attempts).each do |attempt|
    retry_after_delay = nil
    begin
      req = Net::HTTP::Post.new(uri.request_uri, {
        "Content-Type"   => "application/json",
        "X-AllStak-Key"  => @api_key,
        "User-Agent"     => "allstak-ruby/#{AllStak::VERSION}"
      })
      req["Content-Encoding"] = "gzip" if compressed
      req.body = wire_body
      @logger.debug("[AllStak] POST #{path} attempt=#{attempt}") if @config.debug

      resp = http.request(req)
      last_status = resp.code.to_i
      body = resp.body.to_s

      if last_status == 401
        @disabled = true
        increment(:@failed_count)
        increment(:@dropped_count)
        @logger.warn("[AllStak] SDK disabled: invalid API key (401). No further events will be sent.")
        raise AllStakAuthError, "Invalid API key"
      end

      if NON_RETRYABLE_STATUSES.include?(last_status) ||
         (last_status >= 400 && last_status < 500 && last_status != 429)
        increment(:@dropped_count) if last_status >= 400
        return [last_status, body]
      end
      if last_status < 400
        increment(:@sent_count)
        return [last_status, body]
      end

      # 429 (rate limited) / 503 (unavailable) → honor Retry-After when present.
      if RETRY_AFTER_STATUSES.include?(last_status)
        increment(:@rate_limited_count) if last_status == 429
        parsed = parse_retry_after(resp["Retry-After"])
        retry_after_delay = parsed if parsed > 0
      end

      # 5xx / 429 → retry
    rescue AllStakAuthError
      raise
    rescue Net::OpenTimeout, Net::ReadTimeout, Errno::ECONNREFUSED, Errno::ECONNRESET,
           SocketError, EOFError, IOError => e
      last_exc = e
      @logger.debug("[AllStak] transport error attempt=#{attempt}: #{e.class}: #{e.message}") if @config.debug
    rescue => e
      last_exc = e
      @logger.debug("[AllStak] unexpected transport error attempt=#{attempt}: #{e.class}: #{e.message}") if @config.debug
    end

    if attempt < max_attempts
      increment(:@retry_attempt_count)
      if retry_after_delay
        # Server told us how long to wait; honor it (already clamped).
        sleep(retry_after_delay)
      else
        delay = BACKOFF_DELAYS[[attempt - 1, BACKOFF_DELAYS.length - 1].min]
        delay += rand * 0.5
        sleep(delay)
      end
    end
  end

  increment(:@failed_count)
  raise AllStakTransportError,
        "All #{max_attempts} attempts failed for POST #{path}. last_status=#{last_status} last_error=#{last_exc&.message}"
end
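The per-response decisions inside the retry loop of #post can be summarized as two pure functions. This is a sketch: classify_status and next_delay are hypothetical helpers (the real method inlines this logic), and network errors, which #post always retries with plain backoff, are not modeled.

```ruby
RETRY_AFTER_STATUSES = [429, 503].freeze

# How #post reacts to one HTTP status:
#   :disable  401, SDK disabled, AllStakAuthError raised
#   :drop     any other 4xx except 429, returned without retry
#   :sent     below 400, returned as success
#   :retry    429 and 5xx
def classify_status(status)
  return :disable if status == 401
  return :drop if status >= 400 && status < 500 && status != 429
  return :sent if status < 400
  :retry
end

# A positive Retry-After (only for 429/503, already parsed and clamped)
# replaces backoff; otherwise use the 1/2/4/8 s table plus 0-500 ms jitter.
def next_delay(status, attempt, retry_after_seconds, rng: Random.new)
  if RETRY_AFTER_STATUSES.include?(status) && retry_after_seconds.to_f > 0
    retry_after_seconds.to_f
  else
    [1.0, 2.0, 4.0, 8.0][[attempt - 1, 3].min] + rng.rand * 0.5
  end
end
```

Note that a server-supplied delay is used as-is with no jitter added, matching the `sleep(retry_after_delay)` branch above.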

#scrub_options ⇒ Object

Sanitizer options derived from config. Guarded with respond_to? so a bare/stub config (some transport unit tests) still scrubs with safe defaults: PII off (privacy-safe), no extra denylist.



# File 'lib/allstak/transport/http_transport.rb', line 277

def scrub_options
  {
    send_default_pii: @config.respond_to?(:send_default_pii?) ? @config.send_default_pii? : false,
    extra_denylist:   (@config.respond_to?(:extra_denylist) ? @config.extra_denylist : nil)
  }
end
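The respond_to? guard can be seen with two config shapes: a full config and a bare stub that has neither method. Both Struct types and the function name scrub_options_for are hypothetical; the body copies the logic of #scrub_options.

```ruby
# Hypothetical config shapes for illustration only.
FullConfig = Struct.new(:pii, :extra_denylist) do
  def send_default_pii?
    pii
  end
end
BareConfig = Struct.new(:host)

def scrub_options_for(config)
  {
    send_default_pii: config.respond_to?(:send_default_pii?) ? config.send_default_pii? : false,
    extra_denylist:   config.respond_to?(:extra_denylist) ? config.extra_denylist : nil
  }
end
```

A stub config falls back to { send_default_pii: false, extra_denylist: nil }, so a missing method can only make scrubbing stricter, never looser.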

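#serialize_payload(payload) ⇒ Object

Scrub the payload with the PII sanitizer (parsing it first if it is a JSON string) and serialize it to JSON. On any failure the error is logged and AllStakTransportError is raised, so the payload is dropped rather than sent unscrubbed.
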
# File 'lib/allstak/transport/http_transport.rb', line 266

def serialize_payload(payload)
  parsed = payload.is_a?(String) ? JSON.parse(payload) : payload
  JSON.generate(AllStak::Sanitizer.scrub(parsed, **scrub_options))
rescue StandardError => san_err
  @logger.warn("[AllStak] sanitizer failed; dropping payload: #{san_err.class}: #{san_err.message}")
  raise AllStakTransportError, "sanitizer failed; payload dropped"
end
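The fail-closed shape of #serialize_payload (scrub or raise, never send raw) can be sketched without the gem by injecting the scrubber. PayloadDropped and serialize_scrubbed are hypothetical names; PayloadDropped stands in for AllStakTransportError, and the lambda in the usage stands in for AllStak::Sanitizer.scrub.

```ruby
require "json"

# Stand-in for AllStakTransportError.
class PayloadDropped < StandardError; end

# Accept a JSON string or a Ruby object, scrub it, re-serialize it.
# Any failure (bad JSON, scrubber error) becomes PayloadDropped.
def serialize_scrubbed(payload, scrubber)
  parsed = payload.is_a?(String) ? JSON.parse(payload) : payload
  JSON.generate(scrubber.call(parsed))
rescue StandardError => e
  raise PayloadDropped, "sanitizer failed; payload dropped (#{e.class})"
end

redact = ->(h) { h.merge("token" => "[Filtered]") }
serialize_scrubbed('{"token":"abc","n":1}', redact) # => "{\"token\":\"[Filtered]\",\"n\":1}"
```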