Class: LlmCostTracker::Ingestion::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/llm_cost_tracker/ingestion/worker.rb

Constant Summary collapse

INTERVAL_SECONDS =
0.25
IDLE_INTERVAL_SECONDS =
1.0
MAX_IDLE_INTERVAL_SECONDS =
5.0
LEASE_SECONDS =
10
FLUSH_TIMEOUT_SECONDS =
10
MUTEX =
Mutex.new

Class Method Summary collapse

Class Method Details

.ensure_startedObject



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/llm_cost_tracker/ingestion/worker.rb', line 22

def ensure_started
  return unless Ingestion.async?

  thread = MUTEX.synchronize do
    reset_after_fork!
    break @thread if @stop_requested || @thread&.alive?

    @generation = @generation.to_i + 1
    generation = @generation
    @thread = Thread.new { run(generation) }
    @thread.name = "llm_cost_tracker_ingestor"
    @thread.report_on_exception = false
    @thread
  end
  wake_thread(thread)
end

.flush!(timeout: nil, require_lease: false) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/llm_cost_tracker/ingestion/worker.rb', line 39

def flush!(timeout: nil, require_lease: false)
  return true unless Ingestion.async?

  Ingestion.ensure_current_schema!
  MUTEX.synchronize { reset_after_fork! }

  deadline = Time.now.utc + flush_timeout_seconds(timeout)
  loop do
    return true unless Ingestion::Batch.new(identity: identity).pending?
    return false if Time.now.utc >= deadline

    processed = ingest_once(require_lease: require_lease)
    next unless processed.zero?

    duration = [INTERVAL_SECONDS, deadline - Time.now.utc].min
    return false unless duration.positive?

    sleep(duration)
  end
end

.flush_timeout_seconds(timeout) ⇒ Object



94
95
96
97
98
99
# File 'lib/llm_cost_tracker/ingestion/worker.rb', line 94

def flush_timeout_seconds(timeout)
  numeric = Float(timeout, exception: false)
  return FLUSH_TIMEOUT_SECONDS unless numeric&.finite? && numeric.positive?

  numeric
end

.ingest_once(require_lease: true) ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/llm_cost_tracker/ingestion/worker.rb', line 101

def ingest_once(require_lease: true)
  Ingestion.ensure_current_schema!
  MUTEX.synchronize { reset_after_fork! }
  batch = Ingestion::Batch.new(identity: identity)
  return 0 unless batch.claimable?
  return 0 if require_lease && !Ingestion::LeaseClaim.new(identity: identity, seconds: LEASE_SECONDS).acquire

  batch.ingest
rescue StandardError => e
  handle_error(e)
  0
end

.reset!Object



81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/llm_cost_tracker/ingestion/worker.rb', line 81

def reset!
  thread = MUTEX.synchronize do
    @stop_requested = false
    @generation = @generation.to_i + 1
    thread = @thread
    @thread = nil
    @pid = nil
    @identity = nil
    thread
  end
  wake_thread(thread)
end

.shutdown!(timeout: nil, drain: true) ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/llm_cost_tracker/ingestion/worker.rb', line 60

def shutdown!(timeout: nil, drain: true)
  return true unless Ingestion.async?

  timeout ||= FLUSH_TIMEOUT_SECONDS
  thread = MUTEX.synchronize do
    @stop_requested = true
    @generation = @generation.to_i + 1
    @thread
  end
  wake_thread(thread)
  thread&.join(timeout)
  drain ? flush!(timeout: timeout, require_lease: true) : true
rescue StandardError => e
  handle_error(e)
  false
ensure
  MUTEX.synchronize do
    @thread = nil if @thread.equal?(thread) && !thread&.alive?
  end
end