Class: LlmCostTracker::Ingestion::Batch
- Inherits:
-
Object
- Object
- LlmCostTracker::Ingestion::Batch
- Defined in:
- lib/llm_cost_tracker/ingestion/batch.rb
Constant Summary collapse
- BATCH_SIZE =
100
- LOCK_TIMEOUT_SECONDS =
30
Instance Method Summary collapse
- #claimable? ⇒ Boolean
- #error_message_for(error) ⇒ Object
- #ingest ⇒ Object
-
#initialize(identity:) ⇒ Batch
constructor
A new instance of Batch.
- #mark_failed_with_message(rows, message) ⇒ Object
- #pending? ⇒ Boolean
- #warn_on_quarantine(rows) ⇒ Object
Constructor Details
#initialize(identity:) ⇒ Batch
Returns a new instance of Batch.
12 13 14 |
# File 'lib/llm_cost_tracker/ingestion/batch.rb', line 12 def initialize(identity:) @identity = identity end |
Instance Method Details
#claimable? ⇒ Boolean
33 34 35 |
# File 'lib/llm_cost_tracker/ingestion/batch.rb', line 33 def claimable? claimable_scope(Time.now.utc - LOCK_TIMEOUT_SECONDS).exists? end |
#error_message_for(error) ⇒ Object
51 52 53 |
# File 'lib/llm_cost_tracker/ingestion/batch.rb', line 51 def error_message_for(error) "#{error.class}: #{error.message}".byteslice(0, 1_000) end |
#ingest ⇒ Object
16 17 18 19 20 21 22 23 24 25 26 27 |
# File 'lib/llm_cost_tracker/ingestion/batch.rb', line 16 def ingest rows = claim return 0 if rows.empty? valid_rows, events = decode(rows) persist(valid_rows, events) if events.any? rows.size rescue StandardError => e rows_to_mark = valid_rows&.any? ? valid_rows : rows mark_failed_with_message(rows_to_mark, error_message_for(e)) if rows_to_mark&.any? raise end |
#mark_failed_with_message(rows, message) ⇒ Object
37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/llm_cost_tracker/ingestion/batch.rb', line 37 def mark_failed_with_message(rows, message) now = Time.now.utc Ingestion::InboxEntry .where(id: rows.map(&:id), locked_by: identity) .update_all(last_error: message, locked_at: now, locked_by: nil, updated_at: now) warn_on_quarantine(rows) rescue StandardError => e LlmCostTracker::Logging.warn( "Inbox mark_failed_with_message failed for #{rows.size} rows: #{e.class}: #{e.message} " \ "(attempted message: #{message.to_s.byteslice(0, 200)})" ) nil end |
#pending? ⇒ Boolean
29 30 31 |
# File 'lib/llm_cost_tracker/ingestion/batch.rb', line 29 def pending? Ingestion::InboxEntry.pending.exists? end |
#warn_on_quarantine(rows) ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/llm_cost_tracker/ingestion/batch.rb', line 55 def warn_on_quarantine(rows) threshold = Ingestion::InboxEntry::MAX_ATTEMPTS_BEFORE_QUARANTINE quarantined = rows.select { |row| row.attempts.to_i + 1 >= threshold } return if quarantined.empty? sample = quarantined.first(10).map(&:id).join(", ") sample += "..." if quarantined.size > 10 LlmCostTracker::Logging.warn( "Ingestion::Batch: #{quarantined.size} inbox row(s) reached " \ "MAX_ATTEMPTS_BEFORE_QUARANTINE=#{threshold} and will be skipped " \ "on the next claim cycle (ids: #{sample})" ) end |