Class: LlmCostTracker::Ingestion::Batch
- Inherits: Object
  - Object
    - LlmCostTracker::Ingestion::Batch
- Defined in: lib/llm_cost_tracker/ingestion/batch.rb
Constant Summary
- BATCH_SIZE = 100
- LOCK_TIMEOUT_SECONDS = 30
- TRANSIENT_PERSIST_ERRORS = [ActiveRecord::Deadlocked, ActiveRecord::LockWaitTimeout, ActiveRecord::StatementTimeout, ActiveRecord::ConnectionNotEstablished].freeze
Instance Method Summary
- #claimable? ⇒ Boolean
- #error_message_for(error) ⇒ Object
- #ingest ⇒ Object
- #initialize(identity:) ⇒ Batch (constructor): returns a new instance of Batch.
- #mark_failed_with_message(rows, message, decrement_attempts: false) ⇒ Object
- #pending? ⇒ Boolean
- #warn_on_quarantine(rows) ⇒ Object
Constructor Details
#initialize(identity:) ⇒ Batch
Returns a new instance of Batch.
(source lines 18–20)
# File 'lib/llm_cost_tracker/ingestion/batch.rb', line 18
# Creates a batch bound to a single worker identity.
#
# @param identity [Object] stored in @identity. Presumably exposed through a
#   private +identity+ reader and compared against the +locked_by+ column when
#   rows are released (see #mark_failed_with_message). TODO confirm its type.
def initialize(identity:)
  @identity = identity
end
Instance Method Details
#claimable? ⇒ Boolean
(source lines 42–44)
# File 'lib/llm_cost_tracker/ingestion/batch.rb', line 42
# Reports whether +claimable_scope+ currently matches at least one row.
#
# The cutoff handed to +claimable_scope+ is LOCK_TIMEOUT_SECONDS before the
# current UTC time. Presumably rows locked before that moment count as
# abandoned and may be reclaimed; confirm against +claimable_scope+.
#
# @return [Boolean]
def claimable?
  cutoff = Time.now.utc - LOCK_TIMEOUT_SECONDS
  claimable_scope(cutoff).exists?
end
#error_message_for(error) ⇒ Object
(source lines 68–70)
# File 'lib/llm_cost_tracker/ingestion/batch.rb', line 68
# Formats an exception as "ClassName: message" for storage in +last_error+.
#
# The text is capped at 1_000 bytes. For a UTF-8 message, cutting by bytes can
# split a multibyte character and leave an invalid byte sequence. Databases
# that enforce UTF-8, such as PostgreSQL, reject such a string when the row is
# updated. +scrub("")+ drops that partial character, and any invalid bytes
# already in the message, so the result stays valid and within the cap.
#
# @param error [Exception]
# @return [String] at most 1_000 bytes
def error_message_for(error)
  "#{error.class}: #{error.message}".byteslice(0, 1_000).scrub("")
end
#ingest ⇒ Object
(source lines 22–36)
# File 'lib/llm_cost_tracker/ingestion/batch.rb', line 22
# Claims inbox rows via #claim, decodes them via #decode and, when decoding
# produced events, hands them to #persist.
#
# On any StandardError the affected rows are marked failed and the error is
# re-raised. Which rows are marked depends on how far the method got:
# - #claim raised: nothing is marked.
# - #decode yielded valid rows (so #decode had returned): only those rows are
#   marked. +decrement_attempts+ is truthy only when the error is one of
#   TRANSIENT_PERSIST_ERRORS.
# - #decode raised or yielded no valid rows: every claimed row is marked, with
#   a falsy +decrement_attempts+.
#
# @return [Integer] number of claimed rows (0 when nothing was claimed)
# @raise [StandardError] whatever #claim, #decode or #persist raised
def ingest
  claimed = claim
  return 0 if claimed.empty?

  accepted, events = decode(claimed)
  persist(accepted, events) if events.any?
  claimed.size
rescue StandardError => e
  # +accepted+ / +claimed+ are nil here if the assignment never ran.
  decoded_any = accepted&.any?
  failed = decoded_any ? accepted : claimed
  if failed&.any?
    retryable = decoded_any && TRANSIENT_PERSIST_ERRORS.any? { |error_class| e.is_a?(error_class) }
    mark_failed_with_message(failed, error_message_for(e), decrement_attempts: retryable)
  end
  raise
end
#mark_failed_with_message(rows, message, decrement_attempts: false) ⇒ Object
(source lines 46–66)
# File 'lib/llm_cost_tracker/ingestion/batch.rb', line 46
# Stores +message+ as +last_error+ on +rows+ and clears their +locked_by+.
#
# Only rows whose +locked_by+ equals this batch's +identity+ are updated.
# Both branches set +last_error+ to +message+, +locked_at+ and +updated_at+ to
# the current UTC time, and +locked_by+ to NULL.
#
# @param rows [Array] claimed inbox rows. Only +#id+ is read here;
#   #warn_on_quarantine reads more in the non-decrement branch.
# @param message [String] failure text written to +last_error+
# @param decrement_attempts [Boolean] when truthy, also lowers +attempts+ by
#   one (never below 0) and skips #warn_on_quarantine. Presumably this keeps a
#   transient failure from counting toward quarantine; confirm.
# @return [nil] when the update raises. The error is logged, not re-raised.
#   Otherwise returns the value of the branch taken.
def mark_failed_with_message(rows, message, decrement_attempts: false)
  now = Time.now.utc
  scope = Ingestion::InboxEntry.where(id: rows.map(&:id), locked_by: identity)
  if decrement_attempts
    # NOTE(review): GREATEST exists in PostgreSQL/MySQL but not in SQLite,
    # where the equivalent is the multi-argument scalar MAX(). Confirm which
    # adapters are supported.
    scope.update_all(
      Ingestion::InboxEntry.sanitize_sql_array(
        ["last_error = ?, locked_at = ?, locked_by = NULL, " \
         "attempts = GREATEST(attempts - 1, 0), updated_at = ?", message, now, now]
      )
    )
  else
    scope.update_all(last_error: message, locked_at: now, locked_by: nil, updated_at: now)
    warn_on_quarantine(rows)
  end
rescue StandardError => e
  # Best-effort: #ingest calls this from its own rescue, so a failure here is
  # logged rather than raised. The excerpt is byte-truncated, which can split
  # a UTF-8 character. Scrubbing keeps the log line valid, so a logger or
  # formatter that rejects invalid encodings cannot raise from inside this
  # rescue and mask the caller's original error.
  excerpt = message.to_s.byteslice(0, 200).scrub("")
  LlmCostTracker::Logging.warn(
    "Inbox mark_failed_with_message failed for #{rows.size} rows: #{e.class}: #{e.message} " \
    "(attempted message: #{excerpt})"
  )
  nil
end
#pending? ⇒ Boolean
(source lines 38–40)
# File 'lib/llm_cost_tracker/ingestion/batch.rb', line 38
# Reports whether the +InboxEntry.pending+ scope matches any row.
#
# Unlike #claimable?, no lock-timeout cutoff is applied here. What "pending"
# means is defined by the scope on Ingestion::InboxEntry, which is not shown.
#
# @return [Boolean]
def pending?
  backlog = Ingestion::InboxEntry.pending
  backlog.exists?
end
#warn_on_quarantine(rows) ⇒ Object
(source lines 72–84)
# File 'lib/llm_cost_tracker/ingestion/batch.rb', line 72
# Logs a warning listing rows whose next attempt reaches the quarantine limit.
#
# A row qualifies when +attempts.to_i + 1+ is at least
# Ingestion::InboxEntry::MAX_ATTEMPTS_BEFORE_QUARANTINE. A nil +attempts+
# counts as 0. At most 10 ids are listed, followed by "..." when more rows
# qualify.
#
# @param rows [Enumerable] rows responding to +#attempts+ and +#id+
# @return [nil] when no row qualifies, otherwise the result of Logging.warn
def warn_on_quarantine(rows)
  limit = Ingestion::InboxEntry::MAX_ATTEMPTS_BEFORE_QUARANTINE
  over_limit = rows.select { |row| row.attempts.to_i + 1 >= limit }
  return if over_limit.empty?

  ids = over_limit.take(10).map(&:id).join(", ")
  ids = "#{ids}..." if over_limit.length > 10

  LlmCostTracker::Logging.warn(
    "Ingestion::Batch: #{over_limit.size} inbox row(s) reached " \
    "MAX_ATTEMPTS_BEFORE_QUARANTINE=#{limit} and will be skipped " \
    "on the next claim cycle (ids: #{ids})"
  )
end