Module: Legion::LLM::Inference::EmbedPipeline

Extended by:
Legion::Logging::Helper
Defined in:
lib/legion/llm/inference/embed_pipeline.rb

Overview

Slim, governed pipeline for embedding requests (G16/G19): chat-only steps (RAG, tool discovery, GAIA, debate, etc.) do not apply, so embeddings have their own dedicated path:

normalize -> ContentHash -> Cache(model, dims, sha256) lookup
  -> on hit: Metering.emit(cost_usd: 0, cache_hit: true) + return
  -> on miss: Call::Embeddings.generate -> Metering.emit + Cache.set + return

`Legion::LLM.embed` is wired to this pipeline; its public signature is unchanged, so existing callers keep working. The cache is content-addressed and keyed by (model, dims) as well as the text digest, because vectors are only deterministic within one embedding space — a text-only key would collide across embedding spaces. The same SHA-256 utility powers `request_content_hash` on the audit ledger, so records can be joined across systems.

Class Method Summary collapse

Class Method Details

.cache_key(model:, dimensions:, digest:) ⇒ Object



120
121
122
123
# File 'lib/legion/llm/inference/embed_pipeline.rb', line 120

# Builds the content-addressed cache key "<prefix>:<model>:<dims>:<digest>".
# A nil model renders as an empty segment; dimensions are coerced with
# #to_i, so a nil value becomes 0.
def cache_key(model:, dimensions:, digest:)
  format(
    '%<prefix>s:%<model>s:%<dims>d:%<digest>s',
    prefix: cache_key_prefix,
    model:  model,
    dims:   dimensions.to_i,
    digest: digest
  )
end

.cache_key_prefix ⇒ Object



111
112
113
# File 'lib/legion/llm/inference/embed_pipeline.rb', line 111

# Key namespace for embedding cache entries: the configured :key_prefix
# (stringified) when set to a truthy value, otherwise 'llm:embed'.
def cache_key_prefix
  configured = cache_settings[:key_prefix]
  return configured.to_s if configured

  'llm:embed'
end

.cache_lookup_eligible?(model:) ⇒ Boolean

Returns:

  • (Boolean)


100
101
102
103
104
105
# File 'lib/legion/llm/inference/embed_pipeline.rb', line 100

# True when a cache lookup should be attempted. Caching counts as on unless
# settings say `enabled: false`, and a model must be resolvable — either the
# one passed in or the embeddings default (only consulted when model is falsy).
def cache_lookup_eligible?(model:)
  return false if cache_settings[:enabled] == false

  model || default_model ? true : false
end

.cache_settings ⇒ Object



107
108
109
# File 'lib/legion/llm/inference/embed_pipeline.rb', line 107

# Embedding cache settings from Legion::Settings[:llm][:embedding][:cache].
# Always returns a Hash, because callers index the result with [].
# - A missing :embedding or :cache section yields {} instead of raising
#   NoMethodError on nil.
# - `cache: false` is read as "caching disabled" and mapped to
#   { enabled: false }, so callers do not crash on false[:enabled].
# - Any other non-Hash value is ignored ({}).
def cache_settings
  section = Legion::Settings[:llm]&.dig(:embedding, :cache)
  case section
  when Hash then section
  when false then { enabled: false }
  else {}
  end
end

.cache_ttl ⇒ Object



115
116
117
118
# File 'lib/legion/llm/inference/embed_pipeline.rb', line 115

# TTL in whole seconds for embedding cache entries.
# A positive Numeric :ttl is truncated to an Integer. Positive sub-second
# values are clamped up to 1, because plain truncation would turn e.g. 0.5
# into 0. Anything else (missing, non-numeric, zero, negative, NaN) falls
# back to Cache::DEFAULT_TTL.
def cache_ttl
  raw = cache_settings[:ttl]
  return Cache::DEFAULT_TTL unless raw.is_a?(Numeric) && raw.positive?

  [raw.to_i, 1].max
end

.cacheable_result?(result) ⇒ Boolean

Returns:

  • (Boolean)


125
126
127
128
129
130
131
# File 'lib/legion/llm/inference/embed_pipeline.rb', line 125

# A result is worth caching only when it is a Hash without an :error and
# its :vector is a non-empty Array whose first element is Numeric.
def cacheable_result?(result)
  return false unless result.is_a?(Hash) && !result[:error]

  embedding = result[:vector]
  return false unless embedding.is_a?(Array)

  !embedding.empty? && embedding.first.is_a?(Numeric)
end

.call(text:, model: nil, provider: nil, instance: nil, dimensions: nil, task: :document) ⇒ Hash

Returns same shape as Call::Embeddings.generate, plus :cache_hit when applicable.

Parameters:

  • text (String, Array, Hash)

    caller-supplied text

Returns:

  • (Hash)

    same shape as Call::Embeddings.generate, plus :cache_hit when applicable



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/legion/llm/inference/embed_pipeline.rb', line 33

def call(text:, model: nil, provider: nil, instance: nil, dimensions: nil, task: :document)
  normalized = normalize_text(text)
  content_digest = ContentHash.call(normalized)

  # The lookup key reflects the request as the caller phrased it (cache_key
  # turns nil dimensions into 0). It stays nil when caching is disabled or
  # no model resolves.
  lookup_key = nil
  if content_digest && cache_lookup_eligible?(model: model)
    lookup_key = cache_key(model: model, dimensions: dimensions, digest: content_digest)
    cached = fetch_cached_embedding(lookup_key)
    if cached
      emit_metering(result: cached, cache_hit: true)
      return cached.merge(cache_hit: true)
    end
  end

  result = Call::Embeddings.generate(
    text:       normalized,
    model:      model,
    provider:   provider,
    instance:   instance,
    dimensions: dimensions,
    task:       task
  )

  emit_metering(result: result, cache_hit: false)
  if content_digest
    persist_embedding(result, requested_model: model, requested_dimensions: dimensions,
                              digest: content_digest, lookup_key: lookup_key)
  end

  result
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.embed_pipeline.call')
  { vector: nil, model: model, provider: provider, error: e.message }
end

# Reads the entry stored under +key+ and returns it symbolized when it holds
# a usable vector; returns nil otherwise so the caller regenerates.
# Corrupt or foreign entries, and cache-backend errors, therefore degrade to
# a miss instead of failing the whole embed request.
def fetch_cached_embedding(key)
  raw = Cache.get(key)
  return nil unless raw.is_a?(Hash)

  cached = symbolize_cached(raw)
  return nil unless cacheable_result?(cached)

  log.debug("[llm][embed_pipeline] action=cache_hit key=#{key} dims=#{cached[:dimensions]}")
  cached
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.embed_pipeline.cache_get')
  nil
end

# Best-effort write of a freshly generated vector. A cache-write failure is
# logged and swallowed, so a successful embedding is never discarded
# because the cache backend failed.
def persist_embedding(result, requested_model:, requested_dimensions:, digest:, lookup_key:)
  return if cache_settings[:enabled] == false
  return unless cacheable_result?(result)

  # The effective key uses the model/dims the provider reported, which are
  # not always what the caller passed in.
  persist_key = cache_key(
    model:      result[:model] || requested_model,
    dimensions: result[:dimensions] || requested_dimensions,
    digest:     digest
  )
  store_cache_value(persist_key, result)

  # A caller that omits `dimensions:` looks up under dims 0, which never
  # equals the reported dims. Mirror the entry under the lookup key so the
  # next identical request hits. Only do this for an explicit model: a
  # nil-model key does not pin a single embedding space.
  return unless requested_model && lookup_key && lookup_key != persist_key

  store_cache_value(lookup_key, result)
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.embed_pipeline.cache_set')
end

.default_model ⇒ Object



153
154
155
156
157
# File 'lib/legion/llm/inference/embed_pipeline.rb', line 153

# Default embedding model as reported by Call::Embeddings.default_model.
# Resolution is best-effort: any StandardError yields nil ("no default").
def default_model
  resolved = begin
    Call::Embeddings.default_model
  rescue StandardError
    nil
  end
  resolved
end

.emit_metering(result:, cache_hit:) ⇒ Object



159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/legion/llm/inference/embed_pipeline.rb', line 159

# Emits one 'llm_embedding' metering event for a successful result.
# Results that are not a Hash, or that carry an :error, are skipped.
# A cache hit is billed as zero tokens with cost pinned to 0. On a miss,
# the token count comes from result[:tokens], and cost_usd is left out so
# downstream pricing computes it. nil fields (e.g. a missing provider) are
# dropped. Failures are reported through handle_exception and never raised.
def emit_metering(result:, cache_hit:)
  return unless result.is_a?(Hash) && !result[:error]

  tokens = cache_hit ? 0 : result[:tokens].to_i
  event = {
    provider:      result[:provider],
    model_id:      result[:model],
    request_type:  'embedding',
    tier:          'direct',
    input_tokens:  tokens,
    output_tokens: 0,
    total_tokens:  tokens,
    event_type:    'llm_embedding',
    status:        'success',
    cache_hit:     cache_hit,
    # nil on a miss, so .compact removes the key; 0 survives .compact on a hit.
    cost_usd:      cache_hit ? 0 : nil,
    caller:        { requested_by: { type: :system, identity: 'legion:internal:embed' } }
  }.compact
  Metering.emit(event)
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.embed_pipeline.metering')
end

.normalize_text(value) ⇒ Object


Helpers




84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/legion/llm/inference/embed_pipeline.rb', line 84

# Coerces caller-supplied input into the single String that is hashed for
# the cache key and passed to Call::Embeddings.generate.
# - String: returned unchanged.
# - Array: each element contributes its :text/:content (for Hash elements)
#   or #to_s. Parts are stripped, blanks dropped, and the rest joined with
#   newlines.
# - Hash: :text, else :content, else its first value, stringified.
# - Anything else: #to_s.
# This mirrors Call::Embeddings#coerce_text so the cache key reflects what the
# provider sees; Call::Embeddings re-normalizes internally (idempotent here).
def normalize_text(value)
  case value
  when String then value
  when Array
    value.filter_map { |e| e.is_a?(Hash) ? (e[:text] || e[:content]) : e.to_s }
         # to_s guards against non-String :text/:content values (e.g. Integers),
         # which previously raised NoMethodError on #strip.
         .map { |part| part.to_s.strip }
         .reject(&:empty?)
         .join("\n")
  when Hash
    (value[:text] || value[:content] || value.values.first).to_s
  else
    value.to_s
  end
end

.store_cache_value(key, result) ⇒ Object



133
134
135
136
137
138
139
140
141
142
143
# File 'lib/legion/llm/inference/embed_pipeline.rb', line 133

# Writes a trimmed copy of +result+ to the cache under +key+ with cache_ttl.
# Only :vector, :model, :provider, :dimensions, :tokens and :chunks are kept
# (in that order), and nil fields are omitted. The provider is stringified.
def store_cache_value(key, result)
  fields = %i[vector model provider dimensions tokens chunks]
  payload = fields.each_with_object({}) do |field, acc|
    value = result[field]
    value = value.to_s if field == :provider && !value.nil?
    acc[field] = value unless value.nil?
  end
  Cache.set(key, payload, ttl: cache_ttl)
end

.symbolize_cached(value) ⇒ Object



145
146
147
148
149
150
151
# File 'lib/legion/llm/inference/embed_pipeline.rb', line 145

# Normalizes a cached payload into the shape returned by a fresh generation:
# all keys are converted to Symbols where possible, and a String :provider
# becomes a Symbol.
# The provider is normalized on every path. store_cache_value persists it as
# a String, so backends that return the stored symbol-keyed Hash as-is would
# otherwise yield provider: "openai" instead of :openai.
# Always returns a new Hash, so a shared in-memory cache entry is never
# mutated. Non-Hash input is returned unchanged rather than raising.
def symbolize_cached(value)
  return value unless value.is_a?(Hash)

  symbolized = value.transform_keys { |k| k.respond_to?(:to_sym) ? k.to_sym : k }
  symbolized[:provider] = symbolized[:provider].to_sym if symbolized[:provider].is_a?(String)
  symbolized
end