Class: LlmCostTracker::Storage::ActiveRecordInboxBatch
- Inherits: Object
  - Object
  - LlmCostTracker::Storage::ActiveRecordInboxBatch
- Defined in:
- lib/llm_cost_tracker/storage/active_record_inbox_batch.rb
Constant Summary collapse
- BATCH_SIZE = 100
- LOCK_TIMEOUT_SECONDS = 30
Instance Method Summary collapse
- #claimable? ⇒ Boolean
- #ingest ⇒ Object
- #initialize(identity:) ⇒ ActiveRecordInboxBatch (constructor): A new instance of ActiveRecordInboxBatch.
- #mark_failed(rows, error) ⇒ Object
- #pending? ⇒ Boolean
Constructor Details
#initialize(identity:) ⇒ ActiveRecordInboxBatch
Returns a new instance of ActiveRecordInboxBatch.
13 14 15 |
# File 'lib/llm_cost_tracker/storage/active_record_inbox_batch.rb', line 13 def initialize(identity:) @identity = identity end |
Instance Method Details
#claimable? ⇒ Boolean
32 |
# File 'lib/llm_cost_tracker/storage/active_record_inbox_batch.rb', line 32 def claimable? = claimable_scope(Time.now.utc - LOCK_TIMEOUT_SECONDS).exists? |
#ingest ⇒ Object
17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/llm_cost_tracker/storage/active_record_inbox_batch.rb', line 17 def ingest rows = claim return 0 if rows.empty? valid_rows, events = decode(rows) persist(valid_rows, events) if events.any? rows.size rescue StandardError => e rows_to_mark = valid_rows&.any? ? valid_rows : rows mark_failed(rows_to_mark, e) if rows_to_mark&.any? raise end |
#mark_failed(rows, error) ⇒ Object
34 35 36 37 38 39 40 41 42 |
# File 'lib/llm_cost_tracker/storage/active_record_inbox_batch.rb', line 34 def mark_failed(rows, error) message = "#{error.class}: #{error.message}".byteslice(0, 1_000) now = Time.now.utc model .where(id: rows.map(&:id), locked_by: identity) .update_all(last_error: message, locked_at: now, locked_by: nil, updated_at: now) rescue StandardError nil end |
#pending? ⇒ Boolean
30 |
# File 'lib/llm_cost_tracker/storage/active_record_inbox_batch.rb', line 30 def pending? = model.where("attempts < ?", ActiveRecordInbox::MAX_ATTEMPTS).exists? |