Class: LlmCostTracker::Storage::ActiveRecordIngestor

Inherits:
Object
  • Object
show all
Defined in:
lib/llm_cost_tracker/storage/active_record_ingestor.rb

Constant Summary collapse

INTERVAL_SECONDS = 0.25
IDLE_INTERVAL_SECONDS = 1.0
MAX_IDLE_INTERVAL_SECONDS = 5.0
LEASE_SECONDS = 10
FLUSH_TIMEOUT_SECONDS = 10

Class Method Summary collapse

Class Method Details

.ensure_started ⇒ Object



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/llm_cost_tracker/storage/active_record_ingestor.rb', line 21

# Makes sure a live ingestor thread exists and hands it to wake_thread.
#
# Does nothing (returns nil) when ActiveRecordInbox is disabled. Otherwise,
# under the mutex, runs reset_after_fork! and, if the stored thread is missing
# or no longer alive, clears the stop flag and spawns a new thread that
# executes run with a freshly obtained generation.
#
# @return [Object, nil] whatever wake_thread returns, or nil when disabled
def ensure_started
  return unless ActiveRecordInbox.enabled?

  worker = mutex.synchronize do
    reset_after_fork!
    # An existing live thread is reused untouched.
    next @thread if @thread&.alive?

    @stop_requested = false
    generation = next_generation
    @thread = Thread.new { run(generation) }
    @thread.name = "llm_cost_tracker_ingestor" if @thread.respond_to?(:name=)
    @thread.report_on_exception = false if @thread.respond_to?(:report_on_exception=)
    @thread
  end

  wake_thread(worker)
end

.flush!(timeout: FLUSH_TIMEOUT_SECONDS, require_lease: false) ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/llm_cost_tracker/storage/active_record_ingestor.rb', line 38

# Repeatedly calls ingest_once until no events are pending or the deadline passes.
#
# @param timeout [Numeric] seconds added to the current UTC time to form the deadline
# @param require_lease [Boolean] forwarded unchanged to ingest_once
# @return [Boolean] true when the inbox is disabled or nothing is pending any more;
#   false when the deadline is reached, or when a pass processed zero events and
#   sleep_until_next_flush returned a falsy value
def flush!(timeout: FLUSH_TIMEOUT_SECONDS, require_lease: false)
  return true unless ActiveRecordInbox.enabled?

  cutoff = Time.now.utc + timeout
  while pending_events?
    return false unless Time.now.utc < cutoff

    # A pass that processed something loops straight back to re-check for pending work.
    next unless ingest_once(require_lease: require_lease).zero?
    return false unless sleep_until_next_flush(cutoff)
  end

  true
end

.ingest_once(require_lease: true) ⇒ Object



81
82
83
84
85
86
87
88
89
90
# File 'lib/llm_cost_tracker/storage/active_record_ingestor.rb', line 81

# Runs a single inbox_batch.ingest pass when the preconditions hold.
#
# The pass is skipped (returning 0) when the inbox is disabled, when
# claimable_events? is falsy, or when a lease is required and acquire_lease
# is falsy. Any StandardError raised along the way is passed to handle_error
# and converted into 0.
#
# @param require_lease [Boolean] when true, acquire_lease must succeed first
# @return [Object] the result of inbox_batch.ingest, or 0 when skipped or on error
def ingest_once(require_lease: true)
  ready = ActiveRecordInbox.enabled? && claimable_events?
  # The lease is only attempted once the cheaper checks have passed.
  ready &&= acquire_lease if require_lease

  ready ? inbox_batch.ingest : 0
rescue StandardError => error
  handle_error(error)
  0
end

.reset! ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/llm_cost_tracker/storage/active_record_ingestor.rb', line 68

# Flags the ingestor as stopped and forgets the cached thread, pid and identity.
#
# Under the mutex this sets the stop flag, calls next_generation, and clears
# @thread, @pid and @identity. The thread that was stored beforehand (possibly
# nil) is then passed to wake_thread outside the lock; it is not joined here.
#
# @return [Object] whatever wake_thread returns
def reset!
  detached = nil

  mutex.synchronize do
    @stop_requested = true
    next_generation
    detached = @thread
    @thread = @pid = @identity = nil
  end

  wake_thread(detached)
end

.shutdown!(timeout: FLUSH_TIMEOUT_SECONDS, drain: true) ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/llm_cost_tracker/storage/active_record_ingestor.rb', line 51

# Stops the ingestor thread and, optionally, flushes what is still pending.
#
# Resets ActiveRecordInbox, sets the stop flag and advances the generation
# under the mutex, passes the current thread to wake_thread, and joins it for
# at most the smaller of +timeout+ and one second. Any StandardError is passed
# to handle_error and reported as false. Finally @thread is cleared, but only
# if it is still the very thread captured at the start.
#
# @param timeout [Numeric] upper bound for the join (capped at 1) and the
#   timeout handed to flush!
# @param drain [Boolean] when true, finish with flush!(require_lease: true)
# @return [Object] the flush! result when draining, true when not draining,
#   false when an error was rescued
def shutdown!(timeout: FLUSH_TIMEOUT_SECONDS, drain: true)
  ActiveRecordInbox.reset!

  worker = mutex.synchronize do
    @stop_requested = true
    next_generation
    @thread
  end

  wake_thread(worker)
  worker&.join([timeout, 1].min)
  return true unless drain

  flush!(timeout: timeout, require_lease: true)
rescue StandardError => error
  handle_error(error)
  false
ensure
  # Identity check: a thread installed in the meantime must not be discarded.
  mutex.synchronize { @thread = nil if @thread.equal?(worker) }
end