Class: TgVizor::DiskQueue
- Inherits:
- Object
- Object
- TgVizor::DiskQueue
- Defined in:
- lib/tgvizor/disk_queue.rb
Overview
JSONL-based disk queue for persisting events when the ingestion API is unreachable.
Writes are append-only during normal operation. On drain, the whole file is read and then truncated. Corrupt lines (e.g. from a crash mid-write) are silently skipped, because the SDK must never crash the host bot.
Capped at 10 MB to bound disk usage during prolonged outages; further events are silently dropped once the cap is reached.
Constant Summary
- DEFAULT_PATH = File.join(".tgvizor", "events.jsonl")
- MAX_FILE_BYTES = 10 * 1024 * 1024 (10 MB)
Instance Attribute Summary
- #path ⇒ Object (readonly)
  Returns the value of attribute path.
Instance Method Summary
- #any? ⇒ Boolean
  O(1) check, used by the flush loop to skip empty disk queues without paying the file-scan cost of `size`.
- #append(event) ⇒ Object
  Append a single event.
- #drain_all ⇒ Object
  Read every event off disk, truncate the file, and return them.
- #file_size ⇒ Object
- #initialize(path: nil) ⇒ DiskQueue (constructor)
  A new instance of DiskQueue.
- #size ⇒ Object
  Approximate number of events currently on disk.
Constructor Details
#initialize(path: nil) ⇒ DiskQueue
Returns a new instance of DiskQueue.
    # File 'lib/tgvizor/disk_queue.rb', line 19
    def initialize(path: nil)
      @path = path || File.join(Dir.pwd, DEFAULT_PATH)
      @mutex = Mutex.new
    end
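The default-path fallback can be illustrated in isolation. This is a minimal sketch, not SDK code: `resolve_queue_path` is a hypothetical helper that mirrors the `path || File.join(Dir.pwd, DEFAULT_PATH)` expression in the constructor, and the explicit path used below is made up for the example.

```ruby
# Hypothetical helper, for illustration only. The `default` keyword mirrors
# the documented DEFAULT_PATH value; the fallback logic matches the constructor.
def resolve_queue_path(path = nil, default: File.join(".tgvizor", "events.jsonl"))
  path || File.join(Dir.pwd, default)
end

# An explicit path is used unchanged.
explicit = resolve_queue_path("/var/lib/bot/events.jsonl")

# With no argument, the path is built under the current working directory.
fallback = resolve_queue_path
```

Because the fallback is built from `Dir.pwd` when the queue is constructed, the resulting location depends on the working directory of the process at that moment. Passing an explicit `path:` avoids that dependency.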
Instance Attribute Details
#path ⇒ Object (readonly)
Returns the value of attribute path.
    # File 'lib/tgvizor/disk_queue.rb', line 86
    def path
      @path
    end
Instance Method Details
#any? ⇒ Boolean
O(1) check, used by the flush loop to skip empty disk queues without paying the file-scan cost of `size`.

    # File 'lib/tgvizor/disk_queue.rb', line 72
    def any?
      file_size.positive?
    end
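The difference between the byte-size check and the line scan can be sketched with plain files. The two helpers below are hypothetical stand-ins that operate on a path rather than on a `DiskQueue` instance; they follow the same `File.size` and `File.foreach` calls shown in the listings on this page.

```ruby
require "tempfile"

# Hypothetical stand-in for the any? check: a single stat call, no file read.
def nonempty_by_stat?(path)
  File.exist?(path) && File.size(path).positive?
end

# Hypothetical stand-in for the size scan: reads every line of the file.
def count_by_scan(path)
  return 0 unless File.exist?(path)

  File.foreach(path).count { |l| !l.strip.empty? }
end

file = Tempfile.new(["events", ".jsonl"])
empty_before = nonempty_by_stat?(file.path)   # freshly created file has 0 bytes

file.write('{"type":"message"}', "\n")
file.flush
nonempty_after = nonempty_by_stat?(file.path)
scanned = count_by_scan(file.path)
file.close!                                   # close and delete the temp file
```

The two checks can disagree: a file that contains only blank lines has a positive byte size but no countable events, so a byte-size check reports it as non-empty.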
#append(event) ⇒ Object
Append a single event. Silently no-ops if the cap is reached or the write fails — observability code must not crash the host bot.
    # File 'lib/tgvizor/disk_queue.rb', line 26
    def append(event)
      @mutex.synchronize do
        return if file_size >= MAX_FILE_BYTES

        FileUtils.mkdir_p(File.dirname(@path))
        File.open(@path, "a") do |f|
          f.write(JSON.generate(event))
          f.write("\n")
        end
      end
    rescue StandardError
      # disk write failed; drop the event rather than raise
    end
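The cap behaviour is easier to see with a very small limit. The sketch below is an assumption-laden stand-in, not the SDK method: `capped_append` is a hypothetical helper that takes the cap as a parameter, omits the mutex, and returns `true` or `false` purely so the outcome can be inspected.

```ruby
require "json"
require "fileutils"
require "tmpdir"

# Hypothetical stand-in for the append logic. The cap is a parameter so a tiny
# value can be used here; the documented constant is 10 * 1024 * 1024.
def capped_append(path, event, max_bytes:)
  current = File.exist?(path) ? File.size(path) : 0
  return false if current >= max_bytes      # cap reached: drop the event

  FileUtils.mkdir_p(File.dirname(path))     # create the directory on first write
  File.open(path, "a") { |f| f.write(JSON.generate(event), "\n") }
  true
rescue StandardError
  false                                     # write failed: drop, never raise
end

results = nil
line_count = nil
Dir.mktmpdir do |dir|
  path = File.join(dir, ".tgvizor", "events.jsonl")
  event = { type: "message", chat_id: 1 }   # serializes to 30 bytes plus a newline

  # With a 40-byte cap: 0 < 40 writes, 31 < 40 writes, 62 >= 40 drops.
  results = 3.times.map { capped_append(path, event, max_bytes: 40) }
  line_count = File.foreach(path).count
end
```

Since the size is checked before the write rather than after, the file can exceed the cap by up to one serialized event, as the second write in this sketch does.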
#drain_all ⇒ Object
Read every event off disk, truncate the file, and return them. Corrupt lines are skipped.
    # File 'lib/tgvizor/disk_queue.rb', line 42
    def drain_all
      @mutex.synchronize do
        return [] unless File.exist?(@path)

        content = File.read(@path)
        File.write(@path, "") # truncate immediately so concurrent drains don't double-read

        content.each_line.filter_map do |line|
          stripped = line.strip
          next if stripped.empty?

          begin
            JSON.parse(stripped, symbolize_names: true)
          rescue JSON::ParserError
            nil
          end
        end
      end
    rescue StandardError
      []
    end
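The read, truncate, and skip-corrupt-lines sequence can be tried against a temporary file. This is a sketch under stated assumptions: `drain_file` is a hypothetical helper that works on a plain path without the mutex, and the sample events are invented for the example.

```ruby
require "json"
require "tempfile"

# Hypothetical stand-in for the drain logic, operating on a plain path.
def drain_file(path)
  return [] unless File.exist?(path)

  content = File.read(path)
  File.write(path, "")                      # truncate before parsing

  content.each_line.filter_map do |line|
    stripped = line.strip
    next if stripped.empty?

    begin
      JSON.parse(stripped, symbolize_names: true)
    rescue JSON::ParserError
      nil                                   # corrupt line: dropped by filter_map
    end
  end
end

file = Tempfile.new(["events", ".jsonl"])
# The middle line simulates a write that was cut off by a crash.
File.write(file.path, <<~JSONL)
  {"type":"message","chat_id":1}
  {"type":"mess
  {"type":"command","chat_id":2}
JSONL

events = drain_file(file.path)              # two valid events, symbol keys
size_after = File.size(file.path)           # file is empty after the drain
second_drain = drain_file(file.path)        # nothing left to read
file.close!
```

After a drain the events exist only in the returned array, so a caller that fails to deliver them would presumably need to append them again to keep them on disk.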
#file_size ⇒ Object
    # File 'lib/tgvizor/disk_queue.rb', line 64
    def file_size
      File.exist?(@path) ? File.size(@path) : 0
    rescue StandardError
      0
    end
#size ⇒ Object
Approximate number of events currently on disk. Walks the file — only call this when you actually need the count, never as a hot-path guard.
    # File 'lib/tgvizor/disk_queue.rb', line 78
    def size
      return 0 unless File.exist?(@path)

      File.foreach(@path).count { |l| !l.strip.empty? }
    rescue StandardError
      0
    end
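Judging from the listing, one reason the count is approximate is that it counts every non-blank line, including lines that would not parse as JSON. The sketch below compares the two counts on a sample file; both helpers are hypothetical and the sample data is invented.

```ruby
require "json"
require "tempfile"

# Same counting rule as the listing: every non-blank line counts.
def count_nonblank_lines(path)
  return 0 unless File.exist?(path)

  File.foreach(path).count { |l| !l.strip.empty? }
end

# Hypothetical check: true only when the line parses as JSON.
def parseable?(line)
  JSON.parse(line)
  true
rescue JSON::ParserError
  false
end

# Hypothetical stricter count: only lines a drain could actually parse.
def count_parseable_lines(path)
  File.foreach(path).count do |l|
    stripped = l.strip
    next false if stripped.empty?

    parseable?(stripped)
  end
end

file = Tempfile.new(["events", ".jsonl"])
# One blank line and one truncated line sit between two valid events.
File.write(file.path, <<~JSONL)
  {"type":"message","chat_id":1}

  {"type":"mess
  {"type":"command","chat_id":2}
JSONL

approximate = count_nonblank_lines(file.path)   # counts the truncated line too
parseable = count_parseable_lines(file.path)    # counts only the valid events
file.close!
```

In this sample the non-blank count is 3 while only 2 lines parse, so the value returned by a line count like this is an upper bound on what a drain would yield.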