Module: Legion::Extensions::Llm::Ledger::Backfill::LegacyLlmRecords
- Extended by:
- Logging::Helper
- Defined in:
- lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb
Constant Summary
- LEGACY_TABLES = %i[llm_prompt_records llm_metering_records llm_tool_records llm_registry_availability_records].freeze
Class Method Summary
- .backfill_metering(row) ⇒ Object
- .backfill_prompt(row) ⇒ Object
- .backfill_registry(row) ⇒ Object
- .backfill_row(table, row) ⇒ Object
- .backfill_table(table, limit:) ⇒ Object
- .backfill_tool(row) ⇒ Object
- .db ⇒ Object
- .ensure_no_legacy_writer_mode!(mode) ⇒ Object
- .insert_row(table, attributes, operation:) ⇒ Object
- .json_load(value) ⇒ Object
- .metering_payload(row) ⇒ Object
- .next_tool_index(response_id) ⇒ Object
- .official_metric_exists?(payload) ⇒ Boolean
- .official_metric_uuid(payload) ⇒ Object
- .prompt_payload(row) ⇒ Object
- .registry_status(row) ⇒ Object
- .response_for_request(request_id) ⇒ Object
- .run(limit: nil, writer_mode: :official) ⇒ Object
- .table_present?(table) ⇒ Boolean
Class Method Details
.backfill_metering(row) ⇒ Object
68 69 70 71 72 73 74 |
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 68

# Backfills a single legacy metering row through the official metering writer.
#
# The row is first converted with +metering_payload+; nothing is written when
# +official_metric_exists?+ reports a metric for that payload.
#
# @param row [Hash] legacy metering row, read by symbol key
# @return [Integer] 0 when skipped, 1 when the payload was handed to the writer
def backfill_metering(row)
  metering = metering_payload(row)
  if official_metric_exists?(metering)
    0
  else
    Writers::OfficialMeteringWriter.write(metering)
    1
  end
end
.backfill_prompt(row) ⇒ Object
60 61 62 63 64 65 66 |
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 60

# Backfills a single legacy prompt row through the official prompt writer.
#
# The row is first converted with +prompt_payload+; nothing is written when
# +official_metric_exists?+ reports a metric for that payload.
#
# @param row [Hash] legacy prompt row, read by symbol key
# @return [Integer] 0 when skipped, 1 when the payload was handed to the writer
def backfill_prompt(row)
  prompt = prompt_payload(row)
  if official_metric_exists?(prompt)
    0
  else
    Writers::OfficialPromptWriter.write(prompt)
    1
  end
end
.backfill_registry(row) ⇒ Object
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 |
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 154

# Copies one legacy registry availability row into +llm_registry_events+.
#
# The target UUID is derived from the row's +event_id+ (falling back to
# +message_id+); a row whose UUID is already present in the target table is
# skipped.
#
# @param row [Hash] legacy registry row, read by symbol key
# @return [Integer] 0 when an event with the same UUID exists, 1 after inserting
def backfill_registry(row)
  source_ref = row[:event_id] || row[:message_id]
  event_uuid = Writers::OfficialRecordWriter.stable_uuid(source_ref)
  already_present = db[:llm_registry_events].where(uuid: event_uuid).first
  return 0 if already_present

  attributes = {
    uuid: event_uuid,
    provider: row[:provider_family],
    model_key: row[:model_id],
    event_type: row[:event_type],
    status: registry_status(row),
    reason: row[:metadata_json],
    recorded_at: row[:occurred_at],
    inserted_at: Time.now.utc
  }
  insert_row(:llm_registry_events, attributes, operation: 'legacy_llm_backfill.registry_event')
  1
end
.backfill_row(table, row) ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 44

# Dispatches one legacy row to the backfill routine matching its source table.
#
# A unique-constraint violation raised while backfilling is reported through
# +handle_exception+ (warn level, marked handled) and counted as 0.
#
# @param table [Symbol] name of the legacy table the row came from
# @param row [Hash] legacy row, read by symbol key
# @return [Integer] number of records written for this row (0 or 1)
def backfill_row(table, row)
  case table
  when :llm_prompt_records
    backfill_prompt(row)
  when :llm_metering_records
    backfill_metering(row)
  when :llm_tool_records
    backfill_tool(row)
  when :llm_registry_availability_records
    backfill_registry(row)
  else
    # An unrecognised table has nothing to backfill. Returning 0 (rather than
    # the implicit nil) keeps the Integer contract that callers sum over.
    0
  end
rescue Sequel::UniqueConstraintViolation => e
  handle_exception(e, level: :warn, handled: true, operation: 'legacy_llm_backfill.duplicate')
  0
end
.backfill_table(table, limit:) ⇒ Object
38 39 40 41 42 |
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 38

# Backfills the rows of one legacy table in ascending +id+ order.
#
# All selected rows are loaded before processing. When +limit+ is given, only
# the first +limit+ rows by +id+ are considered.
#
# @param table [Symbol] legacy table to read from
# @param limit [Integer, nil] maximum number of rows to read; nil reads all
# @return [Integer] total of the per-row counts returned by +backfill_row+
def backfill_table(table, limit:)
  rows = db[table].order(:id)
  rows = rows.limit(limit) if limit
  rows.all.inject(0) { |written, row| written + backfill_row(table, row) }
end
.backfill_tool(row) ⇒ Object
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 131

# Copies one legacy tool row into +llm_tool_calls+.
#
# The row is skipped when no inference response can be found for its
# +request_id+, or when a tool call with the derived UUID already exists.
# The UUID is derived from +tool_call_id+, falling back to +message_id+.
#
# @param row [Hash] legacy tool row, read by symbol key
# @return [Integer] 0 when skipped, 1 after inserting
def backfill_tool(row)
  parent = response_for_request(row[:request_id])
  return 0 unless parent

  source_ref = row[:tool_call_id] || row[:message_id]
  tool_uuid = Writers::OfficialRecordWriter.stable_uuid(source_ref)
  already_present = db[:llm_tool_calls].where(uuid: tool_uuid).first
  return 0 if already_present

  parent_id = parent[:id]
  attributes = {
    uuid: tool_uuid,
    message_inference_response_id: parent_id,
    tool_call_index: next_tool_index(parent_id),
    provider_tool_call_ref: row[:tool_call_id],
    tool_name: row[:tool_name],
    tool_source_type: row[:tool_source_type],
    tool_source_server: row[:tool_source_server],
    status: row[:tool_status],
    requested_at: row[:tool_start_at],
    completed_at: row[:tool_end_at],
    inserted_at: Time.now.utc
  }
  insert_row(:llm_tool_calls, attributes, operation: 'legacy_llm_backfill.tool_call')
  1
end
.db ⇒ Object
204 205 206 |
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 204

# Database handle used by every query in this module.
#
# @return [Object] whatever +::Legion::Data.connection+ returns
def db = ::Legion::Data.connection
.ensure_no_legacy_writer_mode!(mode) ⇒ Object
32 33 34 35 36 |
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 32

# Rejects any of the legacy writer modes.
#
# The comparison is done on the string form of +mode+, so symbols and strings
# are treated alike and a nil (or otherwise non-symbolisable) mode is simply
# not a legacy mode instead of raising NoMethodError.
#
# @param mode [Symbol, String, nil] writer mode to validate
# @return [nil] when the mode is not a legacy mode
# @raise [ArgumentError] when mode is legacy, legacy_only or legacy_table_only
def ensure_no_legacy_writer_mode!(mode)
  return unless %w[legacy legacy_only legacy_table_only].include?(mode.to_s)

  raise ArgumentError, 'Legacy LLM writer mode is disabled after official backfill; configure official LLM writers.'
end
.insert_row(table, attributes, operation:) ⇒ Object
171 172 173 |
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 171

# Inserts a row by delegating to +Helpers::PersistenceLogging.insert_row+
# with this module's database handle.
#
# @param table [Symbol] destination table
# @param attributes [Hash] column values for the new row
# @param operation [String] operation label forwarded to the helper
# @return [Object] whatever the helper returns
def insert_row(table, attributes, operation:)
  Helpers::PersistenceLogging.insert_row(db, table, attributes, operation:)
end
.json_load(value) ⇒ Object
208 209 210 211 212 213 214 215 216 217 |
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 208

# Decodes a stored JSON value.
#
# Errors that +Helpers::Json.parse_error?+ recognises are reported through
# +handle_exception+ (warn level, marked handled) and the raw value is
# returned wrapped under +:content+. Any other error is re-raised.
#
# @param value [Object, nil] stored value to decode
# @return [Object] {} for nil or an empty string form; otherwise the result of
#   +Helpers::Json.load+, or +{ content: value }+ after a parse error
def json_load(value)
  blank = value.nil? || value.to_s.empty?
  blank ? {} : Helpers::Json.load(value)
rescue StandardError => e
  if Helpers::Json.parse_error?(e)
    handle_exception(e, level: :warn, handled: true, operation: 'legacy_llm_backfill.json_load')
    { content: value }
  else
    raise
  end
end
.metering_payload(row) ⇒ Object
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 104

# Builds the metering writer payload from a legacy metering row.
#
# Most keys are copied under the same name; +request_type+ becomes
# +:operation+, +worker_id+ becomes +:provider_instance+, and +cost_center+ /
# +budget_id+ are nested under +:billing+. Columns missing from the row yield
# nil values.
#
# @param row [Hash] legacy metering row, read by symbol key
# @return [Hash] payload keyed by symbol
def metering_payload(row)
  # payload key => legacy column, listed in the order the payload is emitted
  columns = {
    message_id: :message_id,
    correlation_id: :correlation_id,
    conversation_id: :conversation_id,
    request_id: :request_id,
    exchange_id: :exchange_id,
    operation: :request_type,
    provider: :provider,
    provider_instance: :worker_id,
    model_id: :model_id,
    tier: :tier,
    input_tokens: :input_tokens,
    output_tokens: :output_tokens,
    thinking_tokens: :thinking_tokens,
    total_tokens: :total_tokens,
    latency_ms: :latency_ms,
    wall_clock_ms: :wall_clock_ms,
    cost_usd: :cost_usd,
    recorded_at: :recorded_at
  }
  payload = columns.transform_values { |column| row[column] }
  payload[:billing] = { cost_center: row[:cost_center], budget_id: row[:budget_id] }
  payload
end
.next_tool_index(response_id) ⇒ Object
191 192 193 |
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 191

# Computes the next +tool_call_index+ for a response: one more than the
# highest index stored in +llm_tool_calls+ for that response.
#
# @param response_id [Object] id of the owning inference response row
# @return [Integer] highest existing index plus one; a nil maximum is treated
#   as 0 via +to_i+, giving 1
def next_tool_index(response_id)
  calls = db[:llm_tool_calls].where(message_inference_response_id: response_id)
  highest = calls.max(:tool_call_index)
  highest.to_i + 1
end
.official_metric_exists?(payload) ⇒ Boolean
182 183 184 |
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 182

# Tells whether +llm_message_inference_metrics+ already holds a row with the
# UUID derived from +payload+ by +official_metric_uuid+.
#
# @param payload [Hash] writer payload built from a legacy row
# @return [Boolean] true when a matching metric row exists, false otherwise
def official_metric_exists?(payload)
  metric_uuid = official_metric_uuid(payload)
  # Coerce to a strict boolean so the predicate does not leak the row itself.
  !db[:llm_message_inference_metrics].where(uuid: metric_uuid).first.nil?
end
.official_metric_uuid(payload) ⇒ Object
186 187 188 189 |
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 186

# Derives the metric UUID for a payload.
#
# The payload's +:message_id+ is used as the reference when present;
# otherwise the reference is "metric:" followed by the writer's +request_ref+
# for the payload. The reference is passed to +stable_uuid+.
#
# @param payload [Hash] writer payload built from a legacy row
# @return [Object] the value returned by +Writers::OfficialRecordWriter.stable_uuid+
def official_metric_uuid(payload)
  reference = payload[:message_id]
  reference ||= "metric:#{Writers::OfficialRecordWriter.request_ref(payload)}"
  Writers::OfficialRecordWriter.stable_uuid(reference)
end
.prompt_payload(row) ⇒ Object
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 76

# Builds the prompt writer payload from a legacy prompt row.
#
# +request_type+ is exposed as +:operation+; the three +*_json+ columns are
# decoded with +json_load+; every other key is copied under the same name.
#
# @param row [Hash] legacy prompt row, read by symbol key
# @return [Hash] payload keyed by symbol
def prompt_payload(row)
  routing = {
    message_id: row[:message_id],
    correlation_id: row[:correlation_id],
    conversation_id: row[:conversation_id],
    response_message_id: row[:response_message_id],
    request_id: row[:request_id],
    exchange_id: row[:exchange_id],
    operation: row[:request_type],
    provider: row[:provider],
    model_id: row[:model_id],
    tier: row[:tier]
  }
  content = {
    request: json_load(row[:request_json]),
    response: json_load(row[:response_json]),
    response_thinking: json_load(row[:response_thinking_json])
  }
  accounting = {
    input_tokens: row[:input_tokens],
    output_tokens: row[:output_tokens],
    total_tokens: row[:total_tokens],
    cost_usd: row[:cost_usd],
    classification_level: row[:classification_level],
    contains_phi: row[:contains_phi],
    contains_pii: row[:contains_pii],
    retention_policy: row[:retention_policy],
    expires_at: row[:expires_at],
    recorded_at: row[:recorded_at]
  }
  # The groups share no keys, so merging keeps the original key order.
  routing.merge(content, accounting)
end
.registry_status(row) ⇒ Object
195 196 197 198 |
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 195

# Picks the status for a registry event.
#
# Preference order: +:status+ in the decoded +health_json+, then +'status'+
# in the same structure, then the row's +event_type+, then 'unknown'.
#
# @param row [Hash] legacy registry row, read by symbol key
# @return [Object] the first truthy candidate, or 'unknown'
def registry_status(row)
  health = json_load(row[:health_json])
  status = health[:status]
  status ||= health['status']
  status ||= row[:event_type]
  status || 'unknown'
end
.response_for_request(request_id) ⇒ Object
175 176 177 178 179 180 |
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 175

# Finds the first inference response belonging to a request reference.
#
# The request is looked up in +llm_message_inference_requests+ by
# +request_ref+; its +id+ is then used to find a row in
# +llm_message_inference_responses+.
#
# @param request_id [Object] value matched against the +request_ref+ column
# @return [Hash, nil] the first response row, or nil when the request or the
#   response is missing
def response_for_request(request_id)
  request_row = db[:llm_message_inference_requests].where(request_ref: request_id).first
  if request_row
    db[:llm_message_inference_responses].where(message_inference_request_id: request_row[:id]).first
  end
end
.run(limit: nil, writer_mode: :official) ⇒ Object
24 25 26 27 28 29 30 |
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 24

# Runs the backfill over every table in +LEGACY_TABLES+.
#
# Tables that are not present in the database are reported with a count of 0.
#
# @param limit [Integer, nil] per-table row limit forwarded to +backfill_table+
# @param writer_mode [Symbol] validated by +ensure_no_legacy_writer_mode!+
# @return [Hash{Symbol => Integer}] backfill count per legacy table
# @raise [ArgumentError] when +writer_mode+ is a legacy mode
def run(limit: nil, writer_mode: :official)
  ensure_no_legacy_writer_mode!(writer_mode)
  LEGACY_TABLES.each_with_object({}) do |table, counts|
    counts[table] = table_present?(table) ? backfill_table(table, limit: limit) : 0
  end
end
.table_present?(table) ⇒ Boolean
200 201 202 |
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 200

# Asks the database handle whether +table+ exists.
#
# @param table [Symbol] table name to check
# @return [Boolean] result of +db.table_exists?+
def table_present?(table) = db.table_exists?(table)