Module: Legion::Extensions::Llm::Ledger::Backfill::LegacyLlmRecords

Extended by:
Logging::Helper
Defined in:
lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb

Constant Summary collapse

# Legacy tables visited by +run+, in the order they are listed here.
LEGACY_TABLES = %i[
  llm_prompt_records
  llm_metering_records
  llm_tool_records
  llm_registry_availability_records
].freeze

Class Method Summary collapse

Class Method Details

.backfill_metering(row) ⇒ Object



68
69
70
71
72
73
74
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 68

# Writes one legacy metering row through the official metering writer unless
# an official metric already exists for it.
#
# @param row [Hash] legacy metering row keyed by column symbol
# @return [Integer] 1 when a record was written, 0 when it was skipped
def backfill_metering(row)
  metering = metering_payload(row)

  if official_metric_exists?(metering)
    0
  else
    Writers::OfficialMeteringWriter.write(metering)
    1
  end
end

.backfill_prompt(row) ⇒ Object



60
61
62
63
64
65
66
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 60

# Writes one legacy prompt row through the official prompt writer unless an
# official metric already exists for it.
#
# @param row [Hash] legacy prompt row keyed by column symbol
# @return [Integer] 1 when a record was written, 0 when it was skipped
def backfill_prompt(row)
  prompt = prompt_payload(row)
  already_recorded = official_metric_exists?(prompt)
  Writers::OfficialPromptWriter.write(prompt) unless already_recorded
  already_recorded ? 0 : 1
end

.backfill_registry(row) ⇒ Object



154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 154

# Copies one legacy registry availability row into +llm_registry_events+.
#
# The target uuid is derived from the row's +event_id+, falling back to its
# +message_id+; a row whose uuid is already present is skipped.
#
# @param row [Hash] legacy registry row keyed by column symbol
# @return [Integer] 1 when an event was inserted, 0 when it already existed
def backfill_registry(row)
  source_ref = row[:event_id] || row[:message_id]
  event_uuid = Writers::OfficialRecordWriter.stable_uuid(source_ref)
  existing = db[:llm_registry_events].where(uuid: event_uuid).first
  return 0 if existing

  attributes = {
    uuid:        event_uuid,
    provider:    row[:provider_family],
    model_key:   row[:model_id],
    event_type:  row[:event_type],
    status:      registry_status(row),
    reason:      row[:metadata_json],
    recorded_at: row[:occurred_at],
    inserted_at: Time.now.utc
  }
  insert_row(:llm_registry_events, attributes, operation: 'legacy_llm_backfill.registry_event')
  1
end

.backfill_row(table, row) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 44

# Dispatches one legacy row to the backfill routine for its source table.
#
# A unique-constraint violation raised by the routine is reported through
# +handle_exception+ at warn level and counted as "nothing written".
#
# @param table [Symbol] legacy table the row was read from
# @param row [Hash] the legacy row
# @return [Integer] the routine's count; 0 for a duplicate or an
#   unrecognised table
def backfill_row(table, row)
  case table
  when :llm_prompt_records
    backfill_prompt(row)
  when :llm_metering_records
    backfill_metering(row)
  when :llm_tool_records
    backfill_tool(row)
  when :llm_registry_availability_records
    backfill_registry(row)
  else
    # Keep the Integer contract: a nil here would make the caller's sum raise.
    0
  end
rescue Sequel::UniqueConstraintViolation => e
  handle_exception(e, level: :warn, handled: true, operation: 'legacy_llm_backfill.duplicate')
  0
end

.backfill_table(table, limit:) ⇒ Object



38
39
40
41
42
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 38

# Backfills every row of a legacy table, reading rows ordered by +id+.
#
# @param table [Symbol] legacy table to read
# @param limit [Integer, nil] maximum number of rows to read; nil reads all
# @return [Integer] sum of the per-row counts returned by +backfill_row+
def backfill_table(table, limit:)
  ordered = db[table].order(:id)
  scope = limit ? ordered.limit(limit) : ordered
  scope.all.inject(0) { |migrated, row| migrated + backfill_row(table, row) }
end

.backfill_tool(row) ⇒ Object



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 131

# Copies one legacy tool row into +llm_tool_calls+, attached to the response
# found for the row's +request_id+.
#
# Rows with no matching response are skipped, as are rows whose uuid
# (derived from +tool_call_id+, falling back to +message_id+) already exists.
#
# @param row [Hash] legacy tool row keyed by column symbol
# @return [Integer] 1 when a tool call was inserted, 0 when it was skipped
def backfill_tool(row)
  parent = response_for_request(row[:request_id])
  return 0 unless parent

  call_ref = row[:tool_call_id]
  uuid = Writers::OfficialRecordWriter.stable_uuid(call_ref || row[:message_id])
  duplicate = db[:llm_tool_calls].where(uuid: uuid).first
  return 0 if duplicate

  parent_id = parent[:id]
  attributes = {
    uuid:                          uuid,
    message_inference_response_id: parent_id,
    tool_call_index:               next_tool_index(parent_id),
    provider_tool_call_ref:        call_ref,
    tool_name:                     row[:tool_name],
    tool_source_type:              row[:tool_source_type],
    tool_source_server:            row[:tool_source_server],
    status:                        row[:tool_status],
    requested_at:                  row[:tool_start_at],
    completed_at:                  row[:tool_end_at],
    inserted_at:                   Time.now.utc
  }
  insert_row(:llm_tool_calls, attributes, operation: 'legacy_llm_backfill.tool_call')
  1
end

.db ⇒ Object



204
205
206
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 204

# @return [Object] the connection returned by +Legion::Data.connection+
def db = ::Legion::Data.connection

.ensure_no_legacy_writer_mode!(mode) ⇒ Object

Raises:

  • (ArgumentError)


32
33
34
35
36
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 32

# Rejects the writer modes that target the legacy tables.
#
# @param mode [Symbol, String, nil] requested writer mode
# @return [nil] when the mode is not a legacy mode
# @raise [ArgumentError] when the mode is legacy, legacy_only or
#   legacy_table_only (given as a Symbol or a String)
def ensure_no_legacy_writer_mode!(mode)
  # Compare by string form so nil, or any value without #to_sym, counts as
  # "not a legacy mode" instead of raising NoMethodError.
  return unless %w[legacy legacy_only legacy_table_only].include?(mode.to_s)

  raise ArgumentError, 'Legacy LLM writer mode is disabled after official backfill; configure official LLM writers.'
end

.insert_row(table, attributes, operation:) ⇒ Object



171
172
173
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 171

# Inserts a row through +Helpers::PersistenceLogging.insert_row+ using the
# connection returned by +db+.
#
# @param table [Symbol] target table
# @param attributes [Hash] column values to insert
# @param operation [String] operation label forwarded to the helper
# @return [Object] whatever the helper returns
def insert_row(table, attributes, operation:)
  connection = db
  Helpers::PersistenceLogging.insert_row(connection, table, attributes, operation:)
end

.json_load(value) ⇒ Object



208
209
210
211
212
213
214
215
216
217
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 208

# Decodes a stored JSON column with +Helpers::Json.load+.
#
# nil and values whose string form is empty decode to an empty hash. When
# decoding fails with an error that +Helpers::Json.parse_error?+ recognises,
# the failure is reported at warn level and the raw value is returned
# wrapped as +{ content: value }+. Any other StandardError is re-raised.
#
# @param value [Object, nil] raw column value
# @return [Object] the decoded value, +{}+, or +{ content: value }+
def json_load(value)
  blank = value.nil? || value.to_s.empty?
  blank ? {} : Helpers::Json.load(value)
rescue StandardError => e
  if Helpers::Json.parse_error?(e)
    handle_exception(e, level: :warn, handled: true, operation: 'legacy_llm_backfill.json_load')
    { content: value }
  else
    raise
  end
end

.metering_payload(row) ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 104

# Maps a legacy metering row onto the payload hash passed to the official
# metering writer.
#
# Most columns keep their name; +request_type+ is exposed as +:operation+ and
# +worker_id+ as +:provider_instance+. The billing columns are nested under
# +:billing+.
#
# @param row [Hash] legacy row keyed by column symbol
# @return [Hash] payload; columns missing from +row+ appear with nil values
def metering_payload(row)
  # payload key => legacy column it is read from
  source_columns = {
    message_id:        :message_id,
    correlation_id:    :correlation_id,
    conversation_id:   :conversation_id,
    request_id:        :request_id,
    exchange_id:       :exchange_id,
    operation:         :request_type,
    provider:          :provider,
    provider_instance: :worker_id,
    model_id:          :model_id,
    tier:              :tier,
    input_tokens:      :input_tokens,
    output_tokens:     :output_tokens,
    thinking_tokens:   :thinking_tokens,
    total_tokens:      :total_tokens,
    latency_ms:        :latency_ms,
    wall_clock_ms:     :wall_clock_ms,
    cost_usd:          :cost_usd,
    recorded_at:       :recorded_at
  }

  payload = source_columns.transform_values { |column| row[column] }
  payload[:billing] = { cost_center: row[:cost_center], budget_id: row[:budget_id] }
  payload
end

.next_tool_index(response_id) ⇒ Object



191
192
193
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 191

# Computes the +tool_call_index+ to use for the next tool call of a response.
#
# @param response_id [Object] id matched against +message_inference_response_id+
# @return [Integer] one more than the highest stored index; 1 when the
#   response has no tool calls yet
def next_tool_index(response_id)
  calls = db[:llm_tool_calls].where(message_inference_response_id: response_id)
  highest = calls.max(:tool_call_index)
  highest.to_i + 1
end

.official_metric_exists?(payload) ⇒ Boolean

Returns:

  • (Boolean)


182
183
184
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 182

# Reports whether +llm_message_inference_metrics+ already holds a row with
# the uuid that +official_metric_uuid+ derives from the payload.
#
# @param payload [Hash] payload built by one of the +*_payload+ helpers
# @return [Boolean] true when a matching row exists, false otherwise
def official_metric_exists?(payload)
  # Return a real boolean, as the predicate name promises, instead of
  # handing the fetched row hash (or nil) back to the caller.
  !db[:llm_message_inference_metrics].where(uuid: official_metric_uuid(payload)).empty?
end

.official_metric_uuid(payload) ⇒ Object



186
187
188
189
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 186

# Derives the uuid under which a payload's official metric is stored.
#
# The payload's +message_id+ is used when present; otherwise the reference
# is "metric:" followed by +OfficialRecordWriter.request_ref(payload)+.
#
# @param payload [Hash] payload built by one of the +*_payload+ helpers
# @return [Object] the result of +OfficialRecordWriter.stable_uuid+
def official_metric_uuid(payload)
  writer = Writers::OfficialRecordWriter
  ref = payload[:message_id]
  ref ||= "metric:#{writer.request_ref(payload)}"
  writer.stable_uuid(ref)
end

.prompt_payload(row) ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 76

# Maps a legacy prompt row onto the payload hash passed to the official
# prompt writer.
#
# Most columns keep their name; +request_type+ is exposed as +:operation+,
# and the three +*_json+ columns are decoded with +json_load+ into
# +:request+, +:response+ and +:response_thinking+.
#
# @param row [Hash] legacy row keyed by column symbol
# @return [Hash] payload; columns missing from +row+ appear with nil values
def prompt_payload(row)
  copy = ->(columns) { columns.to_h { |column| [column, row[column]] } }
  json_columns = { request: :request_json, response: :response_json, response_thinking: :response_thinking_json }

  payload = copy.call(%i[message_id correlation_id conversation_id response_message_id request_id exchange_id])
  payload[:operation] = row[:request_type]
  payload.merge!(copy.call(%i[provider model_id tier]))
  json_columns.each { |key, column| payload[key] = json_load(row[column]) }
  payload.merge!(
    copy.call(
      %i[
        input_tokens output_tokens total_tokens cost_usd classification_level
        contains_phi contains_pii retention_policy expires_at recorded_at
      ]
    )
  )
end

.registry_status(row) ⇒ Object



195
196
197
198
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 195

# Picks the status to store for a legacy registry row.
#
# Preference order: the +status+ entry of the decoded +health_json+ (symbol
# key first, then string key), the row's +event_type+, then 'unknown'.
#
# @param row [Hash] legacy registry row keyed by column symbol
# @return [Object] the chosen status value
def registry_status(row)
  health = json_load(row[:health_json])
  # json_load may hand back something other than a Hash when the stored
  # document is not a JSON object (for example null, an array or a bare
  # string); indexing that with :status would raise, so fall through to the
  # event_type / 'unknown' fallbacks instead.
  health = {} unless health.is_a?(Hash)
  health[:status] || health['status'] || row[:event_type] || 'unknown'
end

.response_for_request(request_id) ⇒ Object



175
176
177
178
179
180
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 175

# Finds a stored inference response for a legacy request id.
#
# The request is looked up in +llm_message_inference_requests+ by
# +request_ref+; the response is then looked up in
# +llm_message_inference_responses+ by that request's +id+.
#
# @param request_id [Object] value matched against +request_ref+
# @return [Hash, nil] the first matching response row; nil when either the
#   request or the response is missing
def response_for_request(request_id)
  request = db[:llm_message_inference_requests].where(request_ref: request_id).first

  if request
    responses = db[:llm_message_inference_responses]
    responses.where(message_inference_request_id: request[:id]).first
  end
end

.run(limit: nil, writer_mode: :official) ⇒ Object



24
25
26
27
28
29
30
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 24

# Runs the backfill over every table in +LEGACY_TABLES+.
#
# Tables that do not exist are reported with a count of 0.
#
# @param limit [Integer, nil] per-table row limit passed to +backfill_table+
# @param writer_mode [Symbol, String] checked by
#   +ensure_no_legacy_writer_mode!+ before any table is touched
# @return [Hash{Symbol => Integer}] count per legacy table
# @raise [ArgumentError] when +writer_mode+ is a legacy mode
def run(limit: nil, writer_mode: :official)
  ensure_no_legacy_writer_mode!(writer_mode)

  LEGACY_TABLES.each_with_object({}) do |table, counts|
    counts[table] = table_present?(table) ? backfill_table(table, limit: limit) : 0
  end
end

.table_present?(table) ⇒ Boolean

Returns:

  • (Boolean)


200
201
202
# File 'lib/legion/extensions/llm/ledger/backfill/legacy_llm_records.rb', line 200

# @param table [Symbol] table name to look for
# @return [Object] the result of +table_exists?+ on the connection from +db+
def table_present?(table) = db.table_exists?(table)