Module: Legion::Extensions::Llm::Ledger::Backfill::LegacyLlmRecords
- Extended by:
- Logging::Helper
- Defined in:
- lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb
Constant Summary collapse
- LEGACY_TABLES =
%i[ z_archive_llm_prompt_records z_archive_llm_metering_records z_archive_llm_tool_records llm_registry_availability_records ].freeze
Class Method Summary collapse
- .backfill_metering(row) ⇒ Object
- .backfill_prompt(row) ⇒ Object
- .backfill_registry(row) ⇒ Object
- .backfill_row(table, row) ⇒ Object
- .backfill_table(table, limit:) ⇒ Object
- .backfill_tool(row) ⇒ Object
- .db ⇒ Object
- .ensure_no_legacy_writer_mode!(mode) ⇒ Object
- .insert_row(table, attributes, operation:) ⇒ Object
- .json_load(value) ⇒ Object
- .metering_payload(row) ⇒ Object
- .next_tool_index(response_id) ⇒ Object
- .official_metric_exists?(payload) ⇒ Boolean
- .official_metric_uuid(payload) ⇒ Object
- .prompt_payload(row) ⇒ Object
- .registry_reason(row) ⇒ Object
- .registry_status(row) ⇒ Object
- .response_for_request(request_id) ⇒ Object
- .run(limit: nil, writer_mode: :official) ⇒ Object
- .table_present?(table) ⇒ Boolean
Class Method Details
.backfill_metering(row) ⇒ Object
68 69 70 71 72 73 74 |
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 68

# Backfills a single legacy metering row through the official metering writer.
#
# The row is first converted with +metering_payload+; nothing is written when
# +official_metric_exists?+ already finds a metric for that payload.
#
# @param row [Hash] legacy metering row (read with symbol keys)
# @return [Integer] 1 when the payload was passed to the writer, 0 when skipped
def backfill_metering(row)
  metering = metering_payload(row)

  if official_metric_exists?(metering)
    0
  else
    Writers::OfficialMeteringWriter.write(metering)
    1
  end
end
.backfill_prompt(row) ⇒ Object
60 61 62 63 64 65 66 |
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 60

# Backfills a single legacy prompt row through the official prompt writer.
#
# The row is first converted with +prompt_payload+; nothing is written when
# +official_metric_exists?+ already finds a metric for that payload.
#
# @param row [Hash] legacy prompt row (read with symbol keys)
# @return [Integer] 1 when the payload was passed to the writer, 0 when skipped
def backfill_prompt(row)
  prompt = prompt_payload(row)

  if official_metric_exists?(prompt)
    0
  else
    Writers::OfficialPromptWriter.write(prompt)
    1
  end
end
.backfill_registry(row) ⇒ Object
155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 |
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 155

# Copies one legacy registry availability row into +llm_registry_events+.
#
# The target UUID is derived from the row's +:event_id+, falling back to
# +:message_id+. A row whose UUID is already present in the target table is
# skipped.
#
# @param row [Hash] legacy registry availability row (read with symbol keys)
# @return [Integer] 1 when a registry event was inserted, 0 when skipped
def backfill_registry(row)
  event_uuid = Writers::OfficialRecordWriter.stable_uuid(row[:event_id] || row[:message_id])
  already_recorded = db[:llm_registry_events].where(uuid: event_uuid).first
  return 0 if already_recorded

  event = {
    uuid: event_uuid,
    provider: row[:provider_family],
    model_key: row[:model_id],
    event_type: row[:event_type],
    status: registry_status(row),
    reason: registry_reason(row),
    recorded_at: row[:occurred_at],
    inserted_at: Time.now.utc
  }
  insert_row(:llm_registry_events, event, operation: 'legacy_llm_backfill.registry_event')
  1
end
.backfill_row(table, row) ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 44

# Dispatches one legacy row to the backfill routine that matches its source table.
#
# A +Sequel::UniqueConstraintViolation+ raised while backfilling is reported
# through +handle_exception+ at warn level and counted as "nothing written".
#
# @param table [Symbol] name of the legacy table the row was read from
# @param row [Hash] the legacy row
# @return [Integer] number of rows written (0 or 1); 0 for a table with no
#   backfill routine
def backfill_row(table, row)
  case table
  when :z_archive_llm_prompt_records
    backfill_prompt(row)
  when :z_archive_llm_metering_records
    backfill_metering(row)
  when :z_archive_llm_tool_records
    backfill_tool(row)
  when :llm_registry_availability_records
    backfill_registry(row)
  else
    # Without this branch the case expression yields nil, which makes the
    # summation in backfill_table fail with a TypeError.
    0
  end
rescue Sequel::UniqueConstraintViolation => e
  handle_exception(e, level: :warn, handled: true, operation: 'legacy_llm_backfill.duplicate')
  0
end
.backfill_table(table, limit:) ⇒ Object
38 39 40 41 42 |
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 38

# Backfills the rows of one legacy table, visiting them in ascending +id+ order.
#
# Note that +limit+ caps how many legacy rows are read, not how many are
# written: rows that turn out to be already backfilled still count against it.
#
# @param table [Symbol] legacy table to read from
# @param limit [Integer, nil] maximum number of rows to read; nil reads all rows
# @return [Integer] total of the per-row counts returned by +backfill_row+
def backfill_table(table, limit:)
  rows = db[table].order(:id)
  rows = rows.limit(limit) if limit

  rows.all.inject(0) { |written, row| written + backfill_row(table, row) }
end
.backfill_tool(row) ⇒ Object
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 |
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 132

# Copies one legacy tool record into +llm_tool_calls+.
#
# The row is skipped when no inference response can be found for its
# +:request_id+, or when a tool call with the derived UUID already exists.
# The UUID is derived from +:tool_call_id+, falling back to +:message_id+.
#
# @param row [Hash] legacy tool row (read with symbol keys)
# @return [Integer] 1 when a tool call was inserted, 0 when skipped
def backfill_tool(row)
  parent = response_for_request(row[:request_id])
  return 0 unless parent

  call_uuid = Writers::OfficialRecordWriter.stable_uuid(row[:tool_call_id] || row[:message_id])
  duplicate = db[:llm_tool_calls].where(uuid: call_uuid).first
  return 0 if duplicate

  parent_id = parent[:id]
  attributes = {
    uuid: call_uuid,
    message_inference_response_id: parent_id,
    tool_call_index: next_tool_index(parent_id),
    provider_tool_call_ref: row[:tool_call_id],
    tool_name: row[:tool_name],
    tool_source_type: row[:tool_source_type],
    tool_source_server: row[:tool_source_server],
    status: row[:tool_status],
    requested_at: row[:tool_start_at],
    completed_at: row[:tool_end_at],
    inserted_at: Time.now.utc
  }
  insert_row(:llm_tool_calls, attributes, operation: 'legacy_llm_backfill.tool_call')
  1
end
.db ⇒ Object
210 211 212 |
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 210

# Database handle used by every query in this module.
#
# @return [Object] whatever +::Legion::Data.connection+ returns
def db = ::Legion::Data.connection
.ensure_no_legacy_writer_mode!(mode) ⇒ Object
32 33 34 35 36 |
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 32

# Guards against running with one of the retired legacy writer modes.
#
# @param mode [Symbol, String, nil] the configured writer mode
# @return [nil] when the mode is not a legacy mode
# @raise [ArgumentError] when +mode+ is +legacy+, +legacy_only+ or
#   +legacy_table_only+ (given as a Symbol or a String)
def ensure_no_legacy_writer_mode!(mode)
  # Compare as strings: nil and other values without #to_sym are simply
  # "not legacy" instead of blowing up with NoMethodError, and arbitrary
  # input is never interned as a symbol.
  return unless %w[legacy legacy_only legacy_table_only].include?(mode.to_s)

  raise ArgumentError, 'Legacy LLM writer mode is disabled after official backfill; configure official LLM writers.'
end
.insert_row(table, attributes, operation:) ⇒ Object
172 173 174 |
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 172

# Inserts a row by delegating to +Helpers::PersistenceLogging.insert_row+
# with this module's database handle.
#
# @param table [Symbol] target table
# @param attributes [Hash] column values for the new row
# @param operation [String] operation label forwarded to the helper
# @return [Object] whatever the helper returns
def insert_row(table, attributes, operation:)
  connection = db
  Helpers::PersistenceLogging.insert_row(connection, table, attributes, operation: operation)
end
.json_load(value) ⇒ Object
214 215 216 217 218 219 220 221 222 223 |
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 214

# Decodes a stored JSON column value via +Helpers::Json.load+.
#
# Errors that +Helpers::Json.parse_error?+ recognises are reported through
# +handle_exception+ at warn level and the raw value is returned wrapped under
# +:content+; any other error is re-raised.
#
# @param value [Object, nil] raw column value
# @return [Object] the decoded value; +{}+ when +value+ is nil or its string
#   form is empty; +{ content: value }+ when decoding fails with a parse error
def json_load(value)
  if value.nil? || value.to_s.empty?
    {}
  else
    Helpers::Json.load(value)
  end
rescue StandardError => e
  raise unless Helpers::Json.parse_error?(e)

  handle_exception(e, level: :warn, handled: true, operation: 'legacy_llm_backfill.json_load')
  { content: value }
end
.metering_payload(row) ⇒ Object
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 104

# Builds the payload for the official metering writer from a legacy metering row.
#
# Most columns are copied under the same key. The exceptions are
# +:request_type+, which is exposed as +:operation+, and +:cost_center+ /
# +:budget_id+, which are nested under +:billing+.
#
# @param row [Hash] legacy metering row (read with symbol keys)
# @return [Hash] payload with symbol keys
def metering_payload(row)
  copy = ->(columns) { columns.to_h { |column| [column, row[column]] } }

  copy.call(%i[message_id correlation_id conversation_id request_id exchange_id])
      .merge(operation: row[:request_type])
      .merge(copy.call(%i[provider provider_instance worker_id model_id tier
                          input_tokens output_tokens thinking_tokens total_tokens
                          latency_ms wall_clock_ms cost_usd recorded_at]))
      .merge(billing: { cost_center: row[:cost_center], budget_id: row[:budget_id] })
end
.next_tool_index(response_id) ⇒ Object
192 193 194 |
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 192

# Computes the +tool_call_index+ to use for the next tool call of a response.
#
# @param response_id [Object] id of the parent inference response row
# @return [Integer] one more than the highest index stored for that response;
#   1 when the lookup yields nil (nil.to_i is 0)
def next_tool_index(response_id)
  highest = db[:llm_tool_calls].where(message_inference_response_id: response_id).max(:tool_call_index)
  highest.to_i + 1
end
.official_metric_exists?(payload) ⇒ Boolean
183 184 185 |
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 183

# Tells whether +llm_message_inference_metrics+ already holds the metric that
# the given payload maps to (matched on the UUID from +official_metric_uuid+).
#
# @param payload [Hash] payload built by +prompt_payload+ or +metering_payload+
# @return [Boolean] true when a matching metric row exists, false otherwise
def official_metric_exists?(payload)
  metric_row = db[:llm_message_inference_metrics].where(uuid: official_metric_uuid(payload)).first
  # Return a real boolean, as the predicate name promises, rather than
  # leaking the row Hash (or nil) to callers.
  !metric_row.nil?
end
.official_metric_uuid(payload) ⇒ Object
187 188 189 190 |
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 187

# Derives the UUID under which a payload's metric is stored.
#
# The reference is +:metric_id+, else +:metric_ref+, else the string
# "metric:" followed by +Writers::OfficialRecordWriter.request_ref(payload)+.
# Payloads built by +prompt_payload+ and +metering_payload+ set neither
# +:metric_id+ nor +:metric_ref+, so backfilled rows take the last form.
#
# @param payload [Hash] payload with symbol keys
# @return [Object] result of +Writers::OfficialRecordWriter.stable_uuid+
def official_metric_uuid(payload)
  explicit_ref = payload[:metric_id] || payload[:metric_ref]
  metric_ref = explicit_ref || "metric:#{Writers::OfficialRecordWriter.request_ref(payload)}"

  Writers::OfficialRecordWriter.stable_uuid(metric_ref)
end
.prompt_payload(row) ⇒ Object
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 76

# Builds the payload for the official prompt writer from a legacy prompt row.
#
# Most columns are copied under the same key. +:request_type+ is exposed as
# +:operation+, and the +:request_json+, +:response_json+ and
# +:response_thinking_json+ columns are decoded with +json_load+ into
# +:request+, +:response+ and +:response_thinking+.
#
# @param row [Hash] legacy prompt row (read with symbol keys)
# @return [Hash] payload with symbol keys
def prompt_payload(row)
  payload = {}

  %i[message_id correlation_id conversation_id response_message_id request_id exchange_id].each do |column|
    payload[column] = row[column]
  end
  payload[:operation] = row[:request_type]
  %i[provider model_id tier].each { |column| payload[column] = row[column] }

  # Decoded in the same order as the keys appear in the payload.
  payload[:request] = json_load(row[:request_json])
  payload[:response] = json_load(row[:response_json])
  payload[:response_thinking] = json_load(row[:response_thinking_json])

  %i[input_tokens output_tokens total_tokens cost_usd classification_level
     contains_phi contains_pii retention_policy expires_at recorded_at].each do |column|
    payload[column] = row[column]
  end

  payload
end
.registry_reason(row) ⇒ Object
201 202 203 204 |
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 201

# Picks the reason text to store for a legacy registry row.
#
# The row's +:metadata_json+ column is decoded with +json_load+ and searched,
# in order, for +:reason+, +'reason'+, +:message+ and +'message'+. When none is
# set, the row's +:event_type+ is used.
#
# @param row [Hash] legacy registry availability row (read with symbol keys)
# @return [Object, nil] the first value found, or nil when nothing is set
def registry_reason(row)
  # The decoded hash must be held in a local: each lookup below reads from it.
  metadata = json_load(row[:metadata_json])
  metadata[:reason] || metadata['reason'] || metadata[:message] || metadata['message'] || row[:event_type]
end
.registry_status(row) ⇒ Object
196 197 198 199 |
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 196

# Picks the status to store for a legacy registry row.
#
# The row's +:health_json+ column is decoded with +json_load+ and searched for
# +:status+, then +'status'+. When neither is set, the row's +:event_type+ is
# used, and finally the literal 'unknown'.
#
# @param row [Hash] legacy registry availability row (read with symbol keys)
# @return [Object] the first value found; 'unknown' when nothing is set
def registry_status(row)
  reported = json_load(row[:health_json]).then { |health| health[:status] || health['status'] }
  reported || row[:event_type] || 'unknown'
end
.response_for_request(request_id) ⇒ Object
176 177 178 179 180 181 |
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 176

# Finds an inference response row for a legacy request id.
#
# The request is looked up in +llm_message_inference_requests+ by
# +request_ref+; the first row of +llm_message_inference_responses+ that points
# at it is returned.
#
# @param request_id [Object] legacy request id, matched against +request_ref+
# @return [Hash, nil] the response row, or nil when the request or its
#   response is missing
def response_for_request(request_id)
  request_row = db[:llm_message_inference_requests].where(request_ref: request_id).first

  if request_row
    db[:llm_message_inference_responses].where(message_inference_request_id: request_row[:id]).first
  end
end
.run(limit: nil, writer_mode: :official) ⇒ Object
24 25 26 27 28 29 30 |
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 24

# Runs the backfill over every table listed in +LEGACY_TABLES+.
#
# Tables that do not exist in the database are reported with a count of 0
# instead of being queried.
#
# @param limit [Integer, nil] per-table row limit passed to +backfill_table+
# @param writer_mode [Symbol] configured writer mode; legacy modes are rejected
# @return [Hash{Symbol => Integer}] count returned by +backfill_table+, keyed by table
# @raise [ArgumentError] when +writer_mode+ is a legacy mode
def run(limit: nil, writer_mode: :official)
  ensure_no_legacy_writer_mode!(writer_mode)

  LEGACY_TABLES.each_with_object({}) do |table, counts|
    counts[table] = table_present?(table) ? backfill_table(table, limit: limit) : 0
  end
end
.table_present?(table) ⇒ Boolean
206 207 208 |
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 206

# Asks the database handle whether the given table exists.
#
# @param table [Symbol] table name
# @return [Boolean] result of +db.table_exists?+
def table_present?(table) = db.table_exists?(table)