Class: LlmCostTracker::Ingestion::Batch

Inherits:
Object
  • Object
show all
Defined in:
lib/llm_cost_tracker/ingestion/batch.rb

Constant Summary collapse

BATCH_SIZE =
100
LOCK_TIMEOUT_SECONDS =
30

Instance Method Summary collapse

Constructor Details

#initialize(identity:) ⇒ Batch

Returns a new instance of Batch.



12
13
14
# File 'lib/llm_cost_tracker/ingestion/batch.rb', line 12

# Builds a batch bound to the given identity.
#
# @param identity [Object] stored as-is in @identity. Presumably the
#   lock-owner token later read through an `identity` reader (the reader is
#   not visible in this excerpt — confirm).
def initialize(identity:)
  @identity = identity
end

Instance Method Details

#claimable? ⇒ Boolean

Returns:

  • (Boolean)


33
34
35
# File 'lib/llm_cost_tracker/ingestion/batch.rb', line 33

# Reports whether `claimable_scope` currently matches at least one record.
#
# The scope is queried with a cutoff of LOCK_TIMEOUT_SECONDS before the
# current UTC time. NOTE(review): presumably locks older than the cutoff are
# treated as stale — confirm against `claimable_scope`.
#
# @return [Boolean] the result of `exists?` on the scope
def claimable?
  lock_cutoff = Time.now.utc - LOCK_TIMEOUT_SECONDS
  claimable_scope(lock_cutoff).exists?
end

#error_message_for(error) ⇒ Object



51
52
53
# File 'lib/llm_cost_tracker/ingestion/batch.rb', line 51

# Formats an exception as "<ErrorClass>: <message>", capped at 1,000 bytes.
#
# `byteslice` cuts on a raw byte boundary, so a multibyte character that
# straddles the limit would be left half-written and the result would no
# longer be validly encoded. `scrub("")` drops such a dangling fragment (and
# any other invalid byte sequence already present in the message), so the
# returned string is always valid in its encoding and never exceeds the cap.
#
# @param error [Exception] the error to describe
# @return [String] validly encoded text of at most 1,000 bytes
def error_message_for(error)
  "#{error.class}: #{error.message}".byteslice(0, 1_000).scrub("")
end

#ingest ⇒ Object



16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/llm_cost_tracker/ingestion/batch.rb', line 16

# Claims a set of rows, decodes them and persists the resulting events.
#
# Persistence is skipped when decoding yields no events. If any step raises a
# StandardError, the affected rows are handed to `mark_failed_with_message`
# together with the formatted error, and the exception is re-raised.
#
# @return [Integer] 0 when nothing was claimed, otherwise the number of
#   claimed rows
# @raise [StandardError] whatever `claim`, `decode` or `persist` raised
def ingest
  claimed = claim
  return 0 if claimed.empty?

  accepted, events = decode(claimed)
  persist(accepted, events) if events.any?
  claimed.size
rescue StandardError => e
  # Prefer the decoded subset when it is non-empty; otherwise fall back to
  # everything that was claimed (nil when `claim` itself raised).
  failed = accepted&.any? ? accepted : claimed
  if failed&.any?
    mark_failed_with_message(failed, error_message_for(e))
  end
  raise
end

#mark_failed_with_message(rows, message) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/llm_cost_tracker/ingestion/batch.rb', line 37

# Records a failure message on the given inbox rows and clears their lock
# owner.
#
# Only entries whose id is in `rows` and whose `locked_by` equals `identity`
# are updated: `last_error` gets the message, `locked_by` is set to nil, and
# `locked_at` / `updated_at` are set to the current UTC time. Afterwards
# `warn_on_quarantine` is invoked for the same rows.
#
# This is best-effort: a StandardError raised along the way is logged as a
# warning (with the first 200 bytes of the message) and swallowed.
#
# @param rows [Enumerable] objects responding to `id`
# @param message [String] text written to `last_error`
# @return [Object, nil] the result of `warn_on_quarantine`, or nil when the
#   update failed
def mark_failed_with_message(rows, message)
  timestamp = Time.now.utc
  row_ids = rows.map(&:id)
  changes = { last_error: message, locked_at: timestamp, locked_by: nil, updated_at: timestamp }
  Ingestion::InboxEntry.where(id: row_ids, locked_by: identity).update_all(changes)
  warn_on_quarantine(rows)
rescue StandardError => e
  excerpt = message.to_s.byteslice(0, 200)
  LlmCostTracker::Logging.warn(
    "Inbox mark_failed_with_message failed for #{rows.size} rows: #{e.class}: #{e.message} " \
    "(attempted message: #{excerpt})"
  )
  nil
end

#pending? ⇒ Boolean

Returns:

  • (Boolean)


29
30
31
# File 'lib/llm_cost_tracker/ingestion/batch.rb', line 29

# Reports whether the `pending` scope of Ingestion::InboxEntry contains any
# entries.
#
# @return [Boolean] the result of `exists?` on that scope
def pending?
  pending_entries = Ingestion::InboxEntry.pending
  pending_entries.exists?
end

#warn_on_quarantine(rows) ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/llm_cost_tracker/ingestion/batch.rb', line 55

# Logs a warning for rows whose attempt count has reached the quarantine
# threshold.
#
# A row qualifies when `attempts.to_i + 1` is at least
# Ingestion::InboxEntry::MAX_ATTEMPTS_BEFORE_QUARANTINE.
# NOTE(review): the "+ 1" presumably counts the attempt that just failed —
# confirm where `attempts` is incremented.
#
# The warning lists at most ten ids, followed by "..." when more rows
# qualified.
#
# @param rows [Enumerable] objects responding to `attempts` and `id`
# @return [Object, nil] nil when no row qualifies, otherwise whatever
#   `LlmCostTracker::Logging.warn` returns
def warn_on_quarantine(rows)
  limit = Ingestion::InboxEntry::MAX_ATTEMPTS_BEFORE_QUARANTINE
  at_limit = rows.reject { |entry| entry.attempts.to_i + 1 < limit }
  return if at_limit.empty?

  id_list = at_limit.take(10).map(&:id).join(", ")
  id_list << "..." if at_limit.size > 10
  LlmCostTracker::Logging.warn(
    "Ingestion::Batch: #{at_limit.size} inbox row(s) reached " \
    "MAX_ATTEMPTS_BEFORE_QUARANTINE=#{limit} and will be skipped " \
    "on the next claim cycle (ids: #{id_list})"
  )
end