Class: LlmCostTracker::Ingestion::Batch

Inherits:
Object
  • Object
show all
Defined in:
lib/llm_cost_tracker/ingestion/batch.rb

Constant Summary collapse

BATCH_SIZE =
100
LOCK_TIMEOUT_SECONDS =
30

Instance Method Summary collapse

Constructor Details

#initialize(identity:) ⇒ Batch

Returns a new instance of Batch.



12
13
14
# File 'lib/llm_cost_tracker/ingestion/batch.rb', line 12

def initialize(identity:)
  @identity = identity
end

Instance Method Details

#claimable?Boolean

Returns:

  • (Boolean)


33
34
35
# File 'lib/llm_cost_tracker/ingestion/batch.rb', line 33

def claimable?
  claimable_scope(Time.now.utc - LOCK_TIMEOUT_SECONDS).exists?
end

#ingestObject



16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/llm_cost_tracker/ingestion/batch.rb', line 16

def ingest
  rows = claim
  return 0 if rows.empty?

  valid_rows, events = decode(rows)
  persist(valid_rows, events) if events.any?
  rows.size
rescue StandardError => e
  rows_to_mark = valid_rows&.any? ? valid_rows : rows
  mark_failed(rows_to_mark, e) if rows_to_mark&.any?
  raise
end

#mark_failed(rows, error) ⇒ Object



37
38
39
40
41
42
43
44
45
# File 'lib/llm_cost_tracker/ingestion/batch.rb', line 37

def mark_failed(rows, error)
  message = "#{error.class}: #{error.message}".byteslice(0, 1_000)
  now = Time.now.utc
  Ingestion::Event
    .where(id: rows.map(&:id), locked_by: identity)
    .update_all(last_error: message, locked_at: now, locked_by: nil, updated_at: now)
rescue StandardError
  nil
end

#pending?Boolean

Returns:

  • (Boolean)


29
30
31
# File 'lib/llm_cost_tracker/ingestion/batch.rb', line 29

def pending?
  Ingestion::Event.where("attempts < ?", Ingestion::Event::MAX_ATTEMPTS).exists?
end