Class: LlmCostTracker::Ingestion::Worker
- Inherits:
-
Object
- Object
- LlmCostTracker::Ingestion::Worker
- Defined in:
- lib/llm_cost_tracker/ingestion/worker.rb
Constant Summary collapse
- INTERVAL_SECONDS =
0.25- IDLE_INTERVAL_SECONDS =
1.0- MAX_IDLE_INTERVAL_SECONDS =
5.0- LEASE_SECONDS =
10- FLUSH_TIMEOUT_SECONDS =
10
Class Method Summary collapse
- .ensure_started ⇒ Object
- .flush!(timeout: nil, require_lease: false) ⇒ Object
- .flush_timeout_seconds(timeout) ⇒ Object
- .ingest_once(require_lease: true) ⇒ Object
- .reset! ⇒ Object
- .shutdown!(timeout: nil, drain: true) ⇒ Object
Class Method Details
.ensure_started ⇒ Object
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/llm_cost_tracker/ingestion/worker.rb', line 20 def ensure_started thread = mutex.synchronize do reset_after_fork! unless @thread&.alive? @stop_requested = false @generation = @generation.to_i + 1 generation = @generation @thread = Thread.new { run(generation) } @thread.name = "llm_cost_tracker_ingestor" @thread.report_on_exception = false end @thread end wake_thread(thread) end |
.flush!(timeout: nil, require_lease: false) ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/llm_cost_tracker/ingestion/worker.rb', line 36 def flush!(timeout: nil, require_lease: false) Ingestion.ensure_current_schema! deadline = Time.now.utc + flush_timeout_seconds(timeout) loop do return true unless Ingestion::Batch.new(identity: identity).pending? return false if Time.now.utc >= deadline processed = ingest_once(require_lease: require_lease) next unless processed.zero? duration = [INTERVAL_SECONDS, deadline - Time.now.utc].min return false unless duration.positive? sleep(duration) end end |
.flush_timeout_seconds(timeout) ⇒ Object
84 85 86 87 88 89 |
# File 'lib/llm_cost_tracker/ingestion/worker.rb', line 84 def flush_timeout_seconds(timeout) numeric = Float(timeout, exception: false) return FLUSH_TIMEOUT_SECONDS unless numeric&.finite? && numeric.positive? numeric end |
.ingest_once(require_lease: true) ⇒ Object
91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/llm_cost_tracker/ingestion/worker.rb', line 91 def ingest_once(require_lease: true) Ingestion.ensure_current_schema! batch = Ingestion::Batch.new(identity: identity) return 0 unless batch.claimable? return 0 if require_lease && !Ingestion::LeaseClaim.new(identity: identity, seconds: LEASE_SECONDS).acquire batch.ingest rescue StandardError => e handle_error(e) 0 end |
.reset! ⇒ Object
71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/llm_cost_tracker/ingestion/worker.rb', line 71 def reset! thread = mutex.synchronize do @stop_requested = true @generation = @generation.to_i + 1 thread = @thread @thread = nil @pid = nil @identity = nil thread end wake_thread(thread) end |
.shutdown!(timeout: nil, drain: true) ⇒ Object
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/llm_cost_tracker/ingestion/worker.rb', line 54 def shutdown!(timeout: nil, drain: true) timeout ||= FLUSH_TIMEOUT_SECONDS thread = mutex.synchronize do @stop_requested = true @generation = @generation.to_i + 1 @thread end wake_thread(thread) thread&.join([timeout, 1].min) drain ? flush!(timeout: timeout, require_lease: true) : true rescue StandardError => e handle_error(e) false ensure mutex.synchronize { @thread = nil if @thread.equal?(thread) } end |