Class: AllStak::Transport::EventSpool
- Inherits:
-
Object
- Object
- AllStak::Transport::EventSpool
- Defined in:
- lib/allstak/transport/event_spool.rb
Overview
Filesystem spool for un-sent telemetry envelopes (offline / outage / shutdown durability). One JSON file per envelope so a partially written file can be discarded without losing the rest of the queue.
Wire-on-disk shape (already-scrubbed payload):
{ "v" => 1, "path" => "/ingest/v1/errors", "payload" => {...},
"ts" => 1700000000.123 }
Sentry parity: this is the Ruby analogue of Sentry’s offline/transport cache dir — events that cannot be delivered are persisted (PII-scrubbed) and replayed on the next init.
HARD invariants:
* Fail-open everywhere. A read-only FS, a sandboxed/serverless runtime,
or any IO error degrades silently to in-memory behavior — never
raises, never blocks capture or init.
* Bounded by COUNT, BYTES, and AGE. When over a limit the OLDEST entry
is dropped first (files are time-ordered by an embedded counter).
* Caller scrubs BEFORE persist — the spool stores bytes verbatim and
does no scrubbing itself.
Constant Summary collapse
- FILE_PREFIX =
"allstak-evt-"- FILE_SUFFIX =
".json"- FORMAT_VERSION =
1- DEFAULT_MAX_ENTRIES =
Sane server defaults: a few MB / ~100 envelopes / 48h.
100- DEFAULT_MAX_BYTES =
4 MiB
4 * 1024 * 1024
- DEFAULT_MAX_AGE_S =
48 hours
48 * 60 * 60
Instance Attribute Summary collapse
-
#available ⇒ Object
readonly
Returns the value of attribute available.
-
#dir ⇒ Object
readonly
Returns the value of attribute dir.
Instance Method Summary collapse
- #available? ⇒ Boolean
-
#each ⇒ Object
Yield each persisted entry (oldest first) as [path, payload, handle].
-
#initialize(dir: nil, max_entries: DEFAULT_MAX_ENTRIES, max_bytes: DEFAULT_MAX_BYTES, max_age_s: DEFAULT_MAX_AGE_S, logger: nil) ⇒ EventSpool
constructor
A new instance of EventSpool.
-
#persist(path, payload) ⇒ Object
Persist one already-scrubbed envelope.
-
#remove(handle) ⇒ Object
Delete one entry by its handle (the file path yielded by #each).
-
#size ⇒ Object
Current persisted entry count (best-effort; for tests/diagnostics).
Constructor Details
#initialize(dir: nil, max_entries: DEFAULT_MAX_ENTRIES, max_bytes: DEFAULT_MAX_BYTES, max_age_s: DEFAULT_MAX_AGE_S, logger: nil) ⇒ EventSpool
Returns a new instance of EventSpool.
42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/allstak/transport/event_spool.rb', line 42 def initialize(dir: nil, max_entries: DEFAULT_MAX_ENTRIES, max_bytes: DEFAULT_MAX_BYTES, max_age_s: DEFAULT_MAX_AGE_S, logger: nil) @logger = logger @max_entries = [max_entries.to_i, 1].max @max_bytes = [max_bytes.to_i, 1].max @max_age_s = max_age_s.to_i @mutex = Mutex.new # Monotonic-ish ordering within a process so oldest-first eviction holds # even when two writes land in the same wall-clock millisecond. @seq = 0 @dir = resolve_dir(dir) @available = ensure_dir(@dir) end |
Instance Attribute Details
#available ⇒ Object (readonly)
Returns the value of attribute available.
40 41 42 |
# File 'lib/allstak/transport/event_spool.rb', line 40 def available @available end |
#dir ⇒ Object (readonly)
Returns the value of attribute dir.
40 41 42 |
# File 'lib/allstak/transport/event_spool.rb', line 40 def dir @dir end |
Instance Method Details
#available? ⇒ Boolean
57 58 59 |
# File 'lib/allstak/transport/event_spool.rb', line 57 def available? @available end |
#each ⇒ Object
Yield each persisted entry (oldest first) as [path, payload, handle]. The caller decides per entry whether to #remove(handle) it (delivered or permanently undeliverable) or leave it for a future drain. Stale entries past max-age are dropped (and never yielded). Fail-open: yields nothing on any error.
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/allstak/transport/event_spool.rb', line 107 def each return unless @available return unless block_given? entries = @mutex.synchronize do drop_stale_locked sorted_files_locked end entries.each do |file| record = begin JSON.parse(File.read(file)) rescue StandardError => e @logger&.debug("[AllStak] spool entry unreadable, dropping: #{e.class}: #{e.}") remove(file) next end path = record.is_a?(Hash) ? record["path"] : nil payload = record.is_a?(Hash) ? record["payload"] : nil if path.to_s.empty? || payload.nil? remove(file) next end yield(path, payload, file) end end |
#persist(path, payload) ⇒ Object
Persist one already-scrubbed envelope. ‘payload` MUST be a Ruby object (Hash/Array) that has ALREADY been run through the PII sanitizer by the caller. Fail-open: returns true on write, false on any problem.
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/allstak/transport/event_spool.rb', line 64 def persist(path, payload) return false unless @available @mutex.synchronize do begin seq = (@seq += 1) record = { "v" => FORMAT_VERSION, "path" => path.to_s, "payload" => payload, "ts" => Time.now.to_f, "seq" => seq } json = JSON.generate(record) return false if json.bytesize > @max_bytes # single oversized entry — skip name = format("%s%013d-%06d-%s%s", FILE_PREFIX, (Time.now.to_f * 1000).to_i, seq, SecureRandom.hex(4), FILE_SUFFIX) tmp = File.join(@dir, "." + name + ".tmp") dest = File.join(@dir, name) File.open(tmp, "wb") { |f| f.write(json) } File.rename(tmp, dest) # atomic publish enforce_bounds_locked true rescue StandardError => e @logger&.debug("[AllStak] spool persist failed: #{e.class}: #{e.}") # Best-effort cleanup of a stray temp file. begin File.unlink(tmp) if tmp && File.exist?(tmp) rescue StandardError nil end false end end end |
#remove(handle) ⇒ Object
Delete one entry by its handle (the file path yielded by #each). Fail-open.
138 139 140 141 142 143 |
# File 'lib/allstak/transport/event_spool.rb', line 138 def remove(handle) return unless @available File.unlink(handle) if handle && File.exist?(handle) rescue StandardError => e @logger&.debug("[AllStak] spool remove failed: #{e.class}: #{e.}") end |
#size ⇒ Object
Current persisted entry count (best-effort; for tests/diagnostics).
146 147 148 149 150 151 |
# File 'lib/allstak/transport/event_spool.rb', line 146 def size return 0 unless @available @mutex.synchronize { sorted_files_locked.length } rescue StandardError 0 end |