Class: AllStak::Transport::EventSpool

Inherits:
Object
  • Object
show all
Defined in:
lib/allstak/transport/event_spool.rb

Overview

Filesystem spool for un-sent telemetry envelopes (offline / outage / shutdown durability). One JSON file per envelope so a partially written file can be discarded without losing the rest of the queue.

Wire-on-disk shape (already-scrubbed payload):

{ "v" => 1, "path" => "/ingest/v1/errors", "payload" => {...},
  "ts" => 1700000000.123 }

Sentry parity: this is the Ruby analogue of Sentry’s offline/transport cache dir — events that cannot be delivered are persisted (PII-scrubbed) and replayed on the next init.

HARD invariants:

* Fail-open everywhere. A read-only FS, a sandboxed/serverless runtime,
  or any IO error degrades silently to in-memory behavior — never
  raises, never blocks capture or init.
* Bounded by COUNT, BYTES, and AGE. When over a limit the OLDEST entry
  is dropped first (files are time-ordered by an embedded counter).
* Caller scrubs BEFORE persist — the spool stores bytes verbatim and
  does no scrubbing itself.

Constant Summary collapse

FILE_PREFIX =
"allstak-evt-"
FILE_SUFFIX =
".json"
FORMAT_VERSION =
1
DEFAULT_MAX_ENTRIES =

Sane server defaults: a few MB / ~100 envelopes / 48h.

100
DEFAULT_MAX_BYTES =

4 MiB

4 * 1024 * 1024
DEFAULT_MAX_AGE_S =

48 hours

48 * 60 * 60

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(dir: nil, max_entries: DEFAULT_MAX_ENTRIES, max_bytes: DEFAULT_MAX_BYTES, max_age_s: DEFAULT_MAX_AGE_S, logger: nil) ⇒ EventSpool

Returns a new instance of EventSpool.



42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/allstak/transport/event_spool.rb', line 42

def initialize(dir: nil, max_entries: DEFAULT_MAX_ENTRIES,
               max_bytes: DEFAULT_MAX_BYTES, max_age_s: DEFAULT_MAX_AGE_S,
               logger: nil)
  @logger      = logger
  @max_entries = [max_entries.to_i, 1].max
  @max_bytes   = [max_bytes.to_i, 1].max
  @max_age_s   = max_age_s.to_i
  @mutex       = Mutex.new
  # Monotonic-ish ordering within a process so oldest-first eviction holds
  # even when two writes land in the same wall-clock millisecond.
  @seq         = 0
  @dir         = resolve_dir(dir)
  @available   = ensure_dir(@dir)
end

Instance Attribute Details

#availableObject (readonly)

Returns the value of attribute available.



40
41
42
# File 'lib/allstak/transport/event_spool.rb', line 40

def available
  @available
end

#dirObject (readonly)

Returns the value of attribute dir.



40
41
42
# File 'lib/allstak/transport/event_spool.rb', line 40

def dir
  @dir
end

Instance Method Details

#available?Boolean

Returns:

  • (Boolean)


57
58
59
# File 'lib/allstak/transport/event_spool.rb', line 57

def available?
  @available
end

#eachObject

Yield each persisted entry (oldest first) as [path, payload, handle]. The caller decides per entry whether to #remove(handle) it (delivered or permanently undeliverable) or leave it for a future drain. Stale entries past max-age are dropped (and never yielded). Fail-open: yields nothing on any error.



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/allstak/transport/event_spool.rb', line 107

def each
  return unless @available
  return unless block_given?

  entries =
    @mutex.synchronize do
      drop_stale_locked
      sorted_files_locked
    end

  entries.each do |file|
    record =
      begin
        JSON.parse(File.read(file))
      rescue StandardError => e
        @logger&.debug("[AllStak] spool entry unreadable, dropping: #{e.class}: #{e.message}")
        remove(file)
        next
      end
    path    = record.is_a?(Hash) ? record["path"] : nil
    payload = record.is_a?(Hash) ? record["payload"] : nil
    if path.to_s.empty? || payload.nil?
      remove(file)
      next
    end
    yield(path, payload, file)
  end
end

#persist(path, payload) ⇒ Object

Persist one already-scrubbed envelope. ‘payload` MUST be a Ruby object (Hash/Array) that has ALREADY been run through the PII sanitizer by the caller. Fail-open: returns true on write, false on any problem.



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/allstak/transport/event_spool.rb', line 64

def persist(path, payload)
  return false unless @available

  @mutex.synchronize do
    begin
      seq = (@seq += 1)
      record = {
        "v"       => FORMAT_VERSION,
        "path"    => path.to_s,
        "payload" => payload,
        "ts"      => Time.now.to_f,
        "seq"     => seq
      }
      json = JSON.generate(record)
      return false if json.bytesize > @max_bytes # single oversized entry — skip

      name = format("%s%013d-%06d-%s%s",
                    FILE_PREFIX, (Time.now.to_f * 1000).to_i, seq,
                    SecureRandom.hex(4), FILE_SUFFIX)
      tmp  = File.join(@dir, "." + name + ".tmp")
      dest = File.join(@dir, name)
      File.open(tmp, "wb") { |f| f.write(json) }
      File.rename(tmp, dest) # atomic publish
      enforce_bounds_locked
      true
    rescue StandardError => e
      @logger&.debug("[AllStak] spool persist failed: #{e.class}: #{e.message}")
      # Best-effort cleanup of a stray temp file.
      begin
        File.unlink(tmp) if tmp && File.exist?(tmp)
      rescue StandardError
        nil
      end
      false
    end
  end
end

#remove(handle) ⇒ Object

Delete one entry by its handle (the file path yielded by #each). Fail-open.



138
139
140
141
142
143
# File 'lib/allstak/transport/event_spool.rb', line 138

def remove(handle)
  return unless @available
  File.unlink(handle) if handle && File.exist?(handle)
rescue StandardError => e
  @logger&.debug("[AllStak] spool remove failed: #{e.class}: #{e.message}")
end

#sizeObject

Current persisted entry count (best-effort; for tests/diagnostics).



146
147
148
149
150
151
# File 'lib/allstak/transport/event_spool.rb', line 146

def size
  return 0 unless @available
  @mutex.synchronize { sorted_files_locked.length }
rescue StandardError
  0
end