Class: LlmCostTracker::Storage::ActiveRecordIngestor
- Inherits: Object
  - Object
    - LlmCostTracker::Storage::ActiveRecordIngestor
- Defined in: lib/llm_cost_tracker/storage/active_record_ingestor.rb
Constant Summary collapse
- INTERVAL_SECONDS = 0.25
- IDLE_INTERVAL_SECONDS = 1.0
- MAX_IDLE_INTERVAL_SECONDS = 5.0
- LEASE_SECONDS = 10
- FLUSH_TIMEOUT_SECONDS = 10
Class Method Summary collapse
- .ensure_started ⇒ Object
- .flush!(timeout: FLUSH_TIMEOUT_SECONDS, require_lease: false) ⇒ Object
- .ingest_once(require_lease: true) ⇒ Object
- .reset! ⇒ Object
- .shutdown!(timeout: FLUSH_TIMEOUT_SECONDS, drain: true) ⇒ Object
Class Method Details
.ensure_started ⇒ Object
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/llm_cost_tracker/storage/active_record_ingestor.rb', line 21
#
# Makes sure a background ingestor thread exists and wakes it.
#
# Does nothing (returns nil) when ActiveRecordInbox is disabled. Otherwise,
# while holding the mutex, a new thread is spawned only if no live one is
# recorded in @thread; the thread (new or existing) is then passed to
# wake_thread outside the lock.
#
# @return [Object] whatever wake_thread returns, or nil when the inbox is disabled
def ensure_started
  return unless ActiveRecordInbox.enabled?

  worker = mutex.synchronize do
    # NOTE(review): presumably discards state inherited from a parent
    # process; the helper is defined elsewhere — confirm.
    reset_after_fork!

    if @thread.nil? || !@thread.alive?
      @stop_requested = false
      # The generation is computed on the calling thread, before the worker
      # starts, and handed to the worker as its block argument.
      @thread = Thread.new(next_generation) { |generation| run(generation) }
      @thread.name = "llm_cost_tracker_ingestor" if @thread.respond_to?(:name=)
      @thread.report_on_exception = false if @thread.respond_to?(:report_on_exception=)
    end

    @thread
  end

  wake_thread(worker)
end
.flush!(timeout: FLUSH_TIMEOUT_SECONDS, require_lease: false) ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/llm_cost_tracker/storage/active_record_ingestor.rb', line 38
#
# Repeatedly ingests until no events are pending or the timeout elapses.
#
# @param timeout [Numeric] seconds allowed before giving up
# @param require_lease [Boolean] forwarded to ingest_once
# @return [Boolean] true when the inbox is disabled or nothing is pending any
#   more; false when the deadline passes, or when a pass processed nothing
#   and sleep_until_next_flush returned a falsy value
def flush!(timeout: FLUSH_TIMEOUT_SECONDS, require_lease: false)
  return true unless ActiveRecordInbox.enabled?

  # NOTE(review): wall-clock deadline; it is also passed to
  # sleep_until_next_flush, so both must agree on the clock being used.
  cutoff = Time.now.utc + timeout

  loop do
    break true unless pending_events?
    break false if Time.now.utc >= cutoff

    # A pass that processed something goes straight to the next iteration;
    # only an empty pass waits before retrying.
    next unless ingest_once(require_lease: require_lease).zero?
    break false unless sleep_until_next_flush(cutoff)
  end
end
.ingest_once(require_lease: true) ⇒ Object
81 82 83 84 85 86 87 88 89 90 |
# File 'lib/llm_cost_tracker/storage/active_record_ingestor.rb', line 81
#
# Runs a single ingestion pass.
#
# The pass is attempted only when the inbox is enabled, there are claimable
# events, and (if require_lease is set) acquire_lease succeeds. The checks
# run in that order and stop at the first failure.
#
# @param require_lease [Boolean] whether acquire_lease must succeed first
# @return [Object] the result of inbox_batch.ingest, or 0 when the pass was
#   skipped or a StandardError was raised (the error goes to handle_error)
def ingest_once(require_lease: true)
  if ActiveRecordInbox.enabled? && claimable_events? && (!require_lease || acquire_lease)
    inbox_batch.ingest
  else
    0
  end
rescue StandardError => error
  handle_error(error)
  0
end
.reset! ⇒ Object
68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/llm_cost_tracker/storage/active_record_ingestor.rb', line 68
#
# Requests a stop, advances the generation, and clears the recorded thread,
# pid and identity under the mutex; the previously recorded thread (possibly
# nil) is then passed to wake_thread outside the lock.
#
# @return [Object] whatever wake_thread returns
def reset!
  previous = mutex.synchronize do
    @stop_requested = true
    next_generation

    # Capture the old thread and clear the slot in one step.
    stale, @thread = @thread, nil
    @pid = nil
    @identity = nil
    stale
  end

  wake_thread(previous)
end
.shutdown!(timeout: FLUSH_TIMEOUT_SECONDS, drain: true) ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/llm_cost_tracker/storage/active_record_ingestor.rb', line 51
#
# Stops the background thread and optionally drains remaining events.
#
# Resets ActiveRecordInbox, requests a stop and advances the generation
# under the mutex, wakes the thread, and waits for it to finish for at most
# min(timeout, 1) seconds. When drain is true, a lease-requiring flush! then
# runs with the full timeout.
#
# @param timeout [Numeric] seconds allowed for the final flush; also caps the join wait
# @param drain [Boolean] whether to flush pending events after stopping
# @return [Boolean] the flush! result when draining, true when not draining,
#   false when a StandardError was raised (the error goes to handle_error)
def shutdown!(timeout: FLUSH_TIMEOUT_SECONDS, drain: true)
  ActiveRecordInbox.reset!

  worker = mutex.synchronize do
    @stop_requested = true
    next_generation
    @thread
  end

  wake_thread(worker)
  worker.join([timeout, 1].min) if worker

  return true unless drain

  flush!(timeout: timeout, require_lease: true)
rescue StandardError => error
  handle_error(error)
  false
ensure
  # Clear the slot only if it still holds the thread we stopped, so a thread
  # started in the meantime is left in place.
  mutex.synchronize do
    @thread = nil if @thread.equal?(worker)
  end
end