Class: LlmCostTracker::Ingestion::Batch
- Inherits:
-
Object
- Object
- LlmCostTracker::Ingestion::Batch
- Defined in:
- lib/llm_cost_tracker/ingestion/batch.rb
Constant Summary collapse
- BATCH_SIZE =
100- LOCK_TIMEOUT_SECONDS =
30
Instance Method Summary collapse
- #claimable? ⇒ Boolean
- #ingest ⇒ Object
-
#initialize(identity:) ⇒ Batch
constructor
A new instance of Batch.
- #mark_failed(rows, error) ⇒ Object
- #pending? ⇒ Boolean
Constructor Details
#initialize(identity:) ⇒ Batch
Returns a new instance of Batch.
12 13 14 |
# File 'lib/llm_cost_tracker/ingestion/batch.rb', line 12 def initialize(identity:) @identity = identity end |
Instance Method Details
#claimable? ⇒ Boolean
33 34 35 |
# File 'lib/llm_cost_tracker/ingestion/batch.rb', line 33 def claimable? claimable_scope(Time.now.utc - LOCK_TIMEOUT_SECONDS).exists? end |
#ingest ⇒ Object
16 17 18 19 20 21 22 23 24 25 26 27 |
# File 'lib/llm_cost_tracker/ingestion/batch.rb', line 16 def ingest rows = claim return 0 if rows.empty? valid_rows, events = decode(rows) persist(valid_rows, events) if events.any? rows.size rescue StandardError => e rows_to_mark = valid_rows&.any? ? valid_rows : rows mark_failed(rows_to_mark, e) if rows_to_mark&.any? raise end |
#mark_failed(rows, error) ⇒ Object
37 38 39 40 41 42 43 44 45 |
# File 'lib/llm_cost_tracker/ingestion/batch.rb', line 37 def mark_failed(rows, error) = "#{error.class}: #{error.}".byteslice(0, 1_000) now = Time.now.utc Ingestion::Event .where(id: rows.map(&:id), locked_by: identity) .update_all(last_error: , locked_at: now, locked_by: nil, updated_at: now) rescue StandardError nil end |
#pending? ⇒ Boolean
29 30 31 |
# File 'lib/llm_cost_tracker/ingestion/batch.rb', line 29 def pending? Ingestion::Event.where("attempts < ?", Ingestion::Event::MAX_ATTEMPTS).exists? end |