Class: TgVizor::DiskQueue

Inherits:
Object
Defined in:
lib/tgvizor/disk_queue.rb

Overview

JSONL-based disk queue for persisting events when the ingestion API is unreachable.

Writes are append-only during normal operation. A drain reads the whole file, truncates it, and returns the parsed events. Corrupt lines (e.g. from a crash mid-write) are silently skipped, because the SDK must never crash the host bot.

Capped at 10 MB to bound disk usage during prolonged outages; further events are silently dropped once the cap is reached. The size check runs before each write, so the file can end up slightly over the cap.
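
The fallback flow described above could be wired up on the caller side roughly as follows. This is a sketch, not part of TgVizor: `deliver_or_spool`, `flush_spool`, and the `sender` callable are hypothetical names, and `queue` is assumed to be any object responding to `append`, `any?`, and `drain_all` as documented on this page.

```ruby
# Hypothetical caller-side helpers (assumptions, not TgVizor API).
# `sender` is a callable that raises when the ingestion API is unreachable.

# Try to send an event; on failure, persist it to the disk queue.
def deliver_or_spool(event, queue:, sender:)
  sender.call(event)
  true
rescue StandardError
  queue.append(event) # API unreachable: keep the event for a later flush
  false
end

# Drain the queue and resend. Returns the number of events delivered.
def flush_spool(queue:, sender:)
  return 0 unless queue.any? # cheap guard, avoids reading the file

  events = queue.drain_all
  sent = 0
  events.each_with_index do |event, i|
    begin
      sender.call(event)
      sent += 1
    rescue StandardError
      # Still unreachable: put this event and the rest back, then stop.
      events[i..].each { |e| queue.append(e) }
      break
    end
  end
  sent
end
```

Because a drain truncates the file before anything is delivered, the sketch re-appends undelivered events when the sender fails again; whether the real flush loop does the same is not stated on this page.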

Constant Summary

DEFAULT_PATH =
File.join(".tgvizor", "events.jsonl")
MAX_FILE_BYTES =
10 * 1024 * 1024 (10 MB)

Constructor Details

#initialize(path: nil) ⇒ DiskQueue

Returns a new instance of DiskQueue.



# File 'lib/tgvizor/disk_queue.rb', line 19

def initialize(path: nil)
  @path  = path || File.join(Dir.pwd, DEFAULT_PATH)
  @mutex = Mutex.new
end

Instance Attribute Details

#path ⇒ Object (readonly)

Returns the value of attribute path.



# File 'lib/tgvizor/disk_queue.rb', line 86

def path
  @path
end

Instance Method Details

#any? ⇒ Boolean

O(1) check, used by the flush loop to skip empty disk queues without paying the file-scan cost of `size`.

Returns:

  • (Boolean)


# File 'lib/tgvizor/disk_queue.rb', line 72

def any?
  file_size.positive?
end

#append(event) ⇒ Object

Appends a single event. Silently does nothing if the cap has been reached or the write fails, since observability code must not crash the host bot.



# File 'lib/tgvizor/disk_queue.rb', line 26

def append(event)
  @mutex.synchronize do
    return if file_size >= MAX_FILE_BYTES

    FileUtils.mkdir_p(File.dirname(@path))
    File.open(@path, "a") do |f|
      f.write(JSON.generate(event))
      f.write("\n")
    end
  end
rescue StandardError
  # disk write failed — drop the event rather than raise
end
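
The check-then-write order in `#append` can be seen in isolation with a tiny cap. This is a minimal sketch: `append_capped` is a hypothetical standalone helper (not part of the class) that takes the cap as a parameter and omits the mutex and the rescue.

```ruby
require "json"
require "tmpdir"

# Hypothetical helper mirroring the order of operations in #append:
# check the current file size first, then write one JSON line.
def append_capped(path, event, cap_bytes)
  size = File.exist?(path) ? File.size(path) : 0
  return false if size >= cap_bytes # cap reached: drop the event

  File.open(path, "a") { |f| f.puts(JSON.generate(event)) }
  true
end

Dir.mktmpdir do |dir|
  path = File.join(dir, "events.jsonl")
  # Each event serializes to 9 bytes including the newline; the cap is 10.
  results = 3.times.map { |i| append_capped(path, { "id" => i }, 10) }
  p results         # [true, true, false]
  p File.size(path) # 18
end
```

The second write is accepted because the file is still under the cap when it is checked, which is why the file can finish slightly above the limit.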

#drain_all ⇒ Object

Reads every event off disk, truncates the file, and returns the events as an array of hashes with symbol keys. Corrupt lines are skipped; any other error yields an empty array.



# File 'lib/tgvizor/disk_queue.rb', line 42

def drain_all
  @mutex.synchronize do
    return [] unless File.exist?(@path)

    content = File.read(@path)
    File.write(@path, "") # truncate immediately so concurrent drains don't double-read

    content.each_line.filter_map do |line|
      stripped = line.strip
      next if stripped.empty?

      begin
        JSON.parse(stripped, symbolize_names: true)
      rescue JSON::ParserError
        nil
      end
    end
  end
rescue StandardError
  []
end
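
To illustrate how corrupt lines fall out of the result, the parsing step can be run on its own. This is a sketch: `parse_jsonl` is a hypothetical standalone helper that uses the same `filter_map` pattern as the listing above, and the sample events are made up.

```ruby
require "json"

# Hypothetical helper: parse JSONL content, skipping blank and corrupt lines.
def parse_jsonl(content)
  content.each_line.filter_map do |line|
    stripped = line.strip
    next if stripped.empty?

    begin
      JSON.parse(stripped, symbolize_names: true)
    rescue JSON::ParserError
      nil # filter_map drops nil, so the corrupt line disappears
    end
  end
end

# Second line simulates a write cut off by a crash; third line is blank.
content = <<~JSONL
  {"type":"message","chat_id":1}
  {"type":"mess

  {"type":"callback","chat_id":2}
JSONL

events = parse_jsonl(content)
puts events.length               # 2
puts events.map { |e| e[:type] } # message, callback
```

One side effect of `filter_map` is that a line holding a bare JSON `null` or `false` is dropped as well, since both parse to falsy values.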

#file_size ⇒ Object

Size of the queue file in bytes, or 0 if the file does not exist or its size cannot be read.



# File 'lib/tgvizor/disk_queue.rb', line 64

def file_size
  File.exist?(@path) ? File.size(@path) : 0
rescue StandardError
  0
end

#size ⇒ Object

Approximate number of events currently on disk: counts non-blank lines, including corrupt ones that a drain would skip. Walks the whole file, so call this only when you actually need the count, never as a hot-path guard.



# File 'lib/tgvizor/disk_queue.rb', line 78

def size
  return 0 unless File.exist?(@path)

  File.foreach(@path).count { |l| !l.strip.empty? }
rescue StandardError
  0
end
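
The difference between the byte-size guard behind `any?` and the line count behind `size` can be sketched with two standalone helpers. `nonempty?` and `event_count` are hypothetical names that mirror the two listings on this page; they are not part of the class.

```ruby
require "tmpdir"

# Hypothetical helper: same idea as #any?, a single size lookup.
def nonempty?(path)
  File.exist?(path) && File.size(path).positive?
end

# Hypothetical helper: same idea as #size, a full scan of the file.
def event_count(path)
  return 0 unless File.exist?(path)

  File.foreach(path).count { |l| !l.strip.empty? }
end

Dir.mktmpdir do |dir|
  path = File.join(dir, "events.jsonl")

  File.write(path, "{\"id\":1}\n\n{\"id\":2}\n")
  puts nonempty?(path)   # true
  puts event_count(path) # 2

  # A file holding only blank lines has bytes but no events.
  File.write(path, "\n\n")
  puts nonempty?(path)   # true
  puts event_count(path) # 0
end
```

The two checks can disagree, as the last case shows, so a caller that sees a non-empty file should still be prepared for a drain that returns no events.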