Class: LlmCostTracker::Ingestion::Batch

Inherits:
Object
  • Object
show all
Defined in:
lib/llm_cost_tracker/ingestion/batch.rb

Constant Summary collapse

# Not referenced by any method shown here; presumably the maximum number of
# inbox rows taken per claim — TODO confirm against the claim implementation.
BATCH_SIZE =
100
# Seconds subtracted from the current UTC time in #claimable? to build the
# cutoff that is handed to claimable_scope.
LOCK_TIMEOUT_SECONDS =
30
# Error classes that #ingest checks (via is_a?) when a failure happens after
# decoding produced valid rows; on a match it passes decrement_attempts: true
# to #mark_failed_with_message.
TRANSIENT_PERSIST_ERRORS =
[
  ActiveRecord::Deadlocked,
  ActiveRecord::LockWaitTimeout,
  ActiveRecord::StatementTimeout,
  ActiveRecord::ConnectionNotEstablished
].freeze

Instance Method Summary collapse

Constructor Details

#initialize(identity:) ⇒ Batch

Returns a new instance of Batch.



18
19
20
# File 'lib/llm_cost_tracker/ingestion/batch.rb', line 18

# Builds a batch bound to one identity.
#
# @param identity [Object] stored in @identity. Presumably this is the value
#   returned by the `identity` reader that #mark_failed_with_message compares
#   against the locked_by column — the reader is not shown here, TODO confirm.
def initialize(identity:)
  @identity = identity
end

Instance Method Details

#claimable? ⇒ Boolean

Returns:

  • (Boolean)


42
43
44
# File 'lib/llm_cost_tracker/ingestion/batch.rb', line 42

# Reports whether claimable_scope matches at least one row for a cutoff of
# "now (UTC) minus LOCK_TIMEOUT_SECONDS".
#
# NOTE(review): claimable_scope is defined outside this view; presumably the
# cutoff marks the age at which an existing lock counts as stale — confirm.
#
# @return [Boolean] the result of `exists?` on that scope
def claimable?
  lock_cutoff = Time.now.utc - LOCK_TIMEOUT_SECONDS
  candidates = claimable_scope(lock_cutoff)
  candidates.exists?
end

#error_message_for(error) ⇒ Object



68
69
70
# File 'lib/llm_cost_tracker/ingestion/batch.rb', line 68

# Formats an exception as "ClassName: message", capped at 1,000 bytes.
#
# The cap is applied per byte, so the cut can land inside a multibyte
# character. Any invalid byte sequence in the capped string (including one
# that was already present in the exception message) is removed, so the
# returned string is always validly encoded.
#
# @param error [Exception] the error to describe
# @return [String] a validly encoded string of at most 1,000 bytes
def error_message_for(error)
  # scrub("") deletes invalid bytes instead of substituting U+FFFD, which is
  # three bytes in UTF-8 and could push the result back over the cap.
  "#{error.class}: #{error.message}".byteslice(0, 1_000).scrub("")
end

#ingest ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/llm_cost_tracker/ingestion/batch.rb', line 22

# Claims rows, decodes them and persists the resulting events.
#
# On any StandardError the affected rows are marked as failed and the error is
# re-raised. If decoding had already produced at least one valid row, only the
# valid rows are marked, and an error matching TRANSIENT_PERSIST_ERRORS asks
# for the attempt counter to be decremented. Otherwise every claimed row is
# marked, with no decrement.
#
# @return [Integer] 0 when nothing was claimed, otherwise the number of
#   claimed rows
# @raise [StandardError] whatever claim, decode or persist raised
def ingest
  claimed = claim
  return 0 if claimed.empty?

  decodable, events = decode(claimed)
  persist(decodable, events) if events.any?
  claimed.size
rescue StandardError => error
  # Locals assigned above are nil here if the failure happened before their
  # assignment, hence the safe navigation.
  has_valid_rows = decodable&.any?
  targets = has_valid_rows ? decodable : claimed
  if targets&.any?
    transient = has_valid_rows && TRANSIENT_PERSIST_ERRORS.any? { |klass| error.is_a?(klass) }
    mark_failed_with_message(targets, error_message_for(error), decrement_attempts: transient)
  end
  raise
end

#mark_failed_with_message(rows, message, decrement_attempts: false) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/llm_cost_tracker/ingestion/batch.rb', line 46

# Records a failure message on the given inbox rows and clears their lock.
#
# Only rows whose locked_by equals this batch's identity are updated. In both
# modes last_error is set to the message, locked_at and updated_at are set to
# the current UTC time, and locked_by is cleared. Any StandardError raised
# along the way is logged as a warning and swallowed.
#
# @param rows [Enumerable] objects responding to `id` (and to `attempts` when
#   the quarantine check runs)
# @param message [String] text stored in last_error
# @param decrement_attempts [Boolean] when truthy, attempts is also lowered by
#   one (never below zero) and the quarantine warning is skipped
# @return [Object, nil] the update_all result when decrementing, otherwise the
#   result of warn_on_quarantine; nil if an error was swallowed
def mark_failed_with_message(rows, message, decrement_attempts: false)
  released_at = Time.now.utc
  held_rows = Ingestion::InboxEntry.where(id: rows.map(&:id), locked_by: identity)

  unless decrement_attempts
    held_rows.update_all(last_error: message, locked_at: released_at, locked_by: nil, updated_at: released_at)
    return warn_on_quarantine(rows)
  end

  # Raw SQL is needed because the new attempts value is computed from the
  # column itself; the bound values are still escaped by sanitize_sql_array.
  assignments = "last_error = ?, locked_at = ?, locked_by = NULL, " \
                "attempts = GREATEST(attempts - 1, 0), updated_at = ?"
  refund_sql = Ingestion::InboxEntry.sanitize_sql_array([assignments, message, released_at, released_at])
  held_rows.update_all(refund_sql)
rescue StandardError => failure
  LlmCostTracker::Logging.warn(
    "Inbox mark_failed_with_message failed for #{rows.size} rows: #{failure.class}: #{failure.message} " \
    "(attempted message: #{message.to_s.byteslice(0, 200)})"
  )
  nil
end

#pending? ⇒ Boolean

Returns:

  • (Boolean)


38
39
40
# File 'lib/llm_cost_tracker/ingestion/batch.rb', line 38

# Reports whether the InboxEntry `pending` scope contains at least one row.
#
# @return [Boolean] the result of `exists?` on that scope
def pending?
  pending_entries = Ingestion::InboxEntry.pending
  pending_entries.exists?
end

#warn_on_quarantine(rows) ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/llm_cost_tracker/ingestion/batch.rb', line 72

# Logs one warning listing the rows whose attempt count, plus one, has reached
# InboxEntry::MAX_ATTEMPTS_BEFORE_QUARANTINE.
#
# At most ten ids are listed; "..." is appended when more rows qualify. A nil
# `attempts` counts as zero.
#
# NOTE(review): the "+ 1" presumably accounts for the attempt that just
# failed not being reflected in the in-memory row — confirm against claim.
#
# @param rows [Enumerable] objects responding to `id` and `attempts`
# @return [Object, nil] nil when no row qualifies, otherwise whatever
#   LlmCostTracker::Logging.warn returns
def warn_on_quarantine(rows)
  limit = Ingestion::InboxEntry::MAX_ATTEMPTS_BEFORE_QUARANTINE
  exhausted = rows.reject { |row| row.attempts.to_i + 1 < limit }
  return if exhausted.empty?

  listed_ids = exhausted.first(10).map(&:id).join(", ")
  listed_ids = "#{listed_ids}..." if exhausted.size > 10
  LlmCostTracker::Logging.warn(
    "Ingestion::Batch: #{exhausted.size} inbox row(s) reached " \
    "MAX_ATTEMPTS_BEFORE_QUARANTINE=#{limit} and will be skipped " \
    "on the next claim cycle (ids: #{listed_ids})"
  )
end