Module: ClaudeMemory::OTel::OtlpJsonEnvelope

Defined in:
lib/claude_memory/otel/otlp_json_envelope.rb

Overview

Pure functional core for OTLP/HTTP/JSON payloads. Walks the canonical OTLP envelope shapes (resourceMetrics → scopeMetrics → metrics →dataPoints; resourceLogs → scopeLogs → logRecords; resourceSpans →scopeSpans → spans), flattens KeyValue attribute arrays into Ruby hashes, and returns plain row hashes ready to insert.

No Time.now, no ENV reads, no DB. Pass a clock object that responds to ‘now` (or just `Time`) for fallback timestamps when the payload omits one. Required keys raise via Hash#fetch; optional containers default to empty arrays.

All public methods return Arrays of Hashes whose keys match the SQLiteStore.insert_otel_* helpers exactly.

Class Method Summary collapse

Class Method Details

.flatten_attributes(kv_array) ⇒ Object

Flatten OTel KeyValue array (‘[value: {stringValue: …}, …]`) into a plain Hash.



115
116
117
118
119
120
121
122
123
124
# File 'lib/claude_memory/otel/otlp_json_envelope.rb', line 115

def flatten_attributes(kv_array)
  result = {}
  Array(kv_array).each do |kv|
    next unless kv.is_a?(Hash)
    key = kv["key"]
    next if key.nil? || key.empty?
    result[key] = decode_any_value(kv["value"])
  end
  result
end

.parse_logs(payload, clock: Time) ⇒ Array<Hash>

Returns rows for SQLiteStore#insert_otel_event.

Parameters:

  • payload (Hash)

    parsed OTLP LogsServiceRequest JSON

  • clock (#now) (defaults to: Time)

Returns:

  • (Array<Hash>)

    rows for SQLiteStore#insert_otel_event



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/claude_memory/otel/otlp_json_envelope.rb', line 59

def parse_logs(payload, clock: Time)
  rows = []
  Array(payload["resourceLogs"]).each do |resource_log|
    resource = flatten_attributes(dig_attributes(resource_log["resource"]))
    Array(resource_log["scopeLogs"]).each do |scope_log|
      Array(scope_log["logRecords"]).each do |record|
        attributes = flatten_attributes(record["attributes"])
        rows << {
          event_name: event_name_for(record, attributes),
          occurred_at: timestamp_from_record(record, clock),
          session_id: attributes["session.id"],
          prompt_id: attributes["prompt.id"],
          attributes: attributes,
          resource: resource
        }
      end
    end
  end
  rows
end

.parse_metrics(payload, clock: Time) ⇒ Array<Hash>

Returns rows for SQLiteStore#insert_otel_metric.

Parameters:

  • payload (Hash)

    parsed OTLP MetricsServiceRequest JSON

  • clock (#now) (defaults to: Time)

    used for fallback when timeUnixNano is missing

Returns:

  • (Array<Hash>)

    rows for SQLiteStore#insert_otel_metric



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/claude_memory/otel/otlp_json_envelope.rb', line 26

def parse_metrics(payload, clock: Time)
  rows = []
  Array(payload["resourceMetrics"]).each do |resource_metric|
    resource = flatten_attributes(dig_attributes(resource_metric["resource"]))
    Array(resource_metric["scopeMetrics"]).each do |scope_metric|
      Array(scope_metric["metrics"]).each do |metric|
        metric_name = metric.fetch("name")
        unit = metric["unit"]
        data_points = collect_data_points(metric)
        data_points.each do |point|
          value_type, value_int, value_float = decode_metric_value(point)
          next if value_type.nil?

          rows << {
            name: metric_name,
            value_type: value_type,
            value_int: value_int,
            value_float: value_float,
            unit: unit,
            attributes: flatten_attributes(point["attributes"]),
            resource: resource,
            recorded_at: timestamp_from_point(point, clock)
          }
        end
      end
    end
  end
  rows
end

.parse_traces(payload, clock: Time) ⇒ Array<Hash>

Returns rows for SQLiteStore#insert_otel_trace_span.

Parameters:

  • payload (Hash)

    parsed OTLP TracesServiceRequest JSON

  • clock (#now) (defaults to: Time)

Returns:

  • (Array<Hash>)

    rows for SQLiteStore#insert_otel_trace_span



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/claude_memory/otel/otlp_json_envelope.rb', line 83

def parse_traces(payload, clock: Time)
  rows = []
  Array(payload["resourceSpans"]).each do |resource_span|
    resource = flatten_attributes(dig_attributes(resource_span["resource"]))
    Array(resource_span["scopeSpans"]).each do |scope_span|
      Array(scope_span["spans"]).each do |span|
        attributes = flatten_attributes(span["attributes"])
        start_nano = parse_unix_nano(span["startTimeUnixNano"])
        end_nano = parse_unix_nano(span["endTimeUnixNano"])
        rows << {
          trace_id: span.fetch("traceId"),
          span_id: span.fetch("spanId"),
          parent_span_id: span["parentSpanId"],
          name: span.fetch("name"),
          session_id: attributes["session.id"],
          prompt_id: attributes["prompt.id"],
          start_unix_nano: start_nano,
          end_unix_nano: end_nano,
          duration_ms: duration_ms_from(start_nano, end_nano),
          status_code: span.dig("status", "code")&.to_s,
          attributes: attributes,
          resource: resource,
          recorded_at: timestamp_from_unix_nano(start_nano, clock)
        }
      end
    end
  end
  rows
end