Class: LlmCostTracker::Ingestion::Worker
- Inherits:
-
Object
- Object
- LlmCostTracker::Ingestion::Worker
- Defined in:
- lib/llm_cost_tracker/ingestion/worker.rb
Constant Summary collapse
- INTERVAL_SECONDS =
0.25- IDLE_INTERVAL_SECONDS =
1.0- MAX_IDLE_INTERVAL_SECONDS =
5.0- LEASE_SECONDS =
10- FLUSH_TIMEOUT_SECONDS =
10
Class Method Summary collapse
- .ensure_started ⇒ Object
- .flush!(timeout: FLUSH_TIMEOUT_SECONDS, require_lease: false) ⇒ Object
- .ingest_once(require_lease: true) ⇒ Object
- .reset! ⇒ Object
- .shutdown!(timeout: FLUSH_TIMEOUT_SECONDS, drain: true) ⇒ Object
Class Method Details
.ensure_started ⇒ Object
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/llm_cost_tracker/ingestion/worker.rb', line 20 def ensure_started thread = mutex.synchronize do reset_after_fork! unless @thread&.alive? @stop_requested = false @generation = @generation.to_i + 1 generation = @generation @thread = Thread.new { run(generation) } @thread.name = "llm_cost_tracker_ingestor" if @thread.respond_to?(:name=) @thread.report_on_exception = false if @thread.respond_to?(:report_on_exception=) end @thread end wake_thread(thread) end |
.flush!(timeout: FLUSH_TIMEOUT_SECONDS, require_lease: false) ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/llm_cost_tracker/ingestion/worker.rb', line 36 def flush!(timeout: FLUSH_TIMEOUT_SECONDS, require_lease: false) Ingestion.ensure_current_schema! deadline = Time.now.utc + timeout loop do return true unless Ingestion::Batch.new(identity: identity).pending? return false if Time.now.utc >= deadline processed = ingest_once(require_lease: require_lease) next unless processed.zero? duration = [INTERVAL_SECONDS, deadline - Time.now.utc].min return false unless duration.positive? sleep(duration) end end |
.ingest_once(require_lease: true) ⇒ Object
83 84 85 86 87 88 89 90 91 92 |
# File 'lib/llm_cost_tracker/ingestion/worker.rb', line 83 def ingest_once(require_lease: true) batch = Ingestion::Batch.new(identity: identity) return 0 unless batch.claimable? return 0 if require_lease && !Ingestion::LeaseClaim.new(identity: identity, seconds: LEASE_SECONDS).acquire batch.ingest rescue StandardError => e handle_error(e) 0 end |
.reset! ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/llm_cost_tracker/ingestion/worker.rb', line 70 def reset! thread = mutex.synchronize do @stop_requested = true @generation = @generation.to_i + 1 thread = @thread @thread = nil @pid = nil @identity = nil thread end wake_thread(thread) end |
.shutdown!(timeout: FLUSH_TIMEOUT_SECONDS, drain: true) ⇒ Object
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/llm_cost_tracker/ingestion/worker.rb', line 54 def shutdown!(timeout: FLUSH_TIMEOUT_SECONDS, drain: true) thread = mutex.synchronize do @stop_requested = true @generation = @generation.to_i + 1 @thread end wake_thread(thread) thread&.join([timeout, 1].min) drain ? flush!(timeout: timeout, require_lease: true) : true rescue StandardError => e handle_error(e) false ensure mutex.synchronize { @thread = nil if @thread.equal?(thread) } end |