Class: LlmCostTracker::Storage::ActiveRecordInboxBatch

Inherits:
Object
  • Object
show all
Defined in:
lib/llm_cost_tracker/storage/active_record_inbox_batch.rb

Constant Summary collapse

BATCH_SIZE =
100
LOCK_TIMEOUT_SECONDS =
30

Instance Method Summary collapse

Constructor Details

#initialize(identity:) ⇒ ActiveRecordInboxBatch

Returns a new instance of ActiveRecordInboxBatch.



13
14
15
# File 'lib/llm_cost_tracker/storage/active_record_inbox_batch.rb', line 13

def initialize(identity:)
  @identity = identity
end

Instance Method Details

#claimable?Boolean

Returns:

  • (Boolean)


32
# File 'lib/llm_cost_tracker/storage/active_record_inbox_batch.rb', line 32

def claimable? = claimable_scope(Time.now.utc - LOCK_TIMEOUT_SECONDS).exists?

#ingestObject



17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/llm_cost_tracker/storage/active_record_inbox_batch.rb', line 17

def ingest
  rows = claim
  return 0 if rows.empty?

  valid_rows, events = decode(rows)
  persist(valid_rows, events) if events.any?
  rows.size
rescue StandardError => e
  rows_to_mark = valid_rows&.any? ? valid_rows : rows
  mark_failed(rows_to_mark, e) if rows_to_mark&.any?
  raise
end

#mark_failed(rows, error) ⇒ Object



34
35
36
37
38
39
40
41
42
# File 'lib/llm_cost_tracker/storage/active_record_inbox_batch.rb', line 34

def mark_failed(rows, error)
  message = "#{error.class}: #{error.message}".byteslice(0, 1_000)
  now = Time.now.utc
  model
    .where(id: rows.map(&:id), locked_by: identity)
    .update_all(last_error: message, locked_at: now, locked_by: nil, updated_at: now)
rescue StandardError
  nil
end

#pending?Boolean

Returns:

  • (Boolean)


30
# File 'lib/llm_cost_tracker/storage/active_record_inbox_batch.rb', line 30

def pending? = model.where("attempts < ?", ActiveRecordInbox::MAX_ATTEMPTS).exists?