Module: ClaudeMemory::OTel::OtlpJsonEnvelope
- Defined in:
- lib/claude_memory/otel/otlp_json_envelope.rb
Overview
Pure functional core for OTLP/HTTP/JSON payloads. Walks the canonical OTLP envelope shapes (resourceMetrics → scopeMetrics → metrics → dataPoints; resourceLogs → scopeLogs → logRecords; resourceSpans → scopeSpans → spans), flattens KeyValue attribute arrays into Ruby hashes, and returns plain row hashes ready to insert.
No Time.now, no ENV reads, no DB access. For fallback timestamps when the payload omits one, pass a clock object that responds to `now` (or just `Time`). Required keys raise KeyError via Hash#fetch; optional containers default to empty arrays.
All public methods return Arrays of Hashes whose keys match the SQLiteStore#insert_otel_* helpers exactly.
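The clock injection described above can be sketched as follows. This is a minimal illustration, not the module's own code: `timestamp_or_now` is a hypothetical helper name, and returning a UTC `Time` is an assumption about the row format.

```ruby
# Hypothetical helper (not part of the documented API): resolve a timestamp
# from an OTLP record, falling back to the injected clock when the payload
# omits timeUnixNano. OTLP/JSON encodes 64-bit integers as strings.
def timestamp_or_now(record, clock)
  raw = record["timeUnixNano"]
  return clock.now if raw.nil? || raw.to_s.empty?

  nanos = Integer(raw.to_s, 10)
  Time.at(nanos / 1_000_000_000, nanos % 1_000_000_000, :nsec).utc
end

# Any object that responds to `now` works as a clock; a Struct with a
# `now` member is enough for deterministic tests.
fixed_clock = Struct.new(:now).new(Time.utc(2024, 1, 1))

with_ts = timestamp_or_now({"timeUnixNano" => "1700000000000000000"}, fixed_clock)
without_ts = timestamp_or_now({}, fixed_clock)
```

Because the clock is a parameter, a test can pin the fallback time without stubbing `Time.now`; passing `Time` itself keeps production behaviour.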
Class Method Summary
- .flatten_attributes(kv_array) ⇒ Object
  Flatten OTel KeyValue array (`[{key: …, value: {stringValue: …}}, …]`) into a plain Hash.
- .parse_logs(payload, clock: Time) ⇒ Array<Hash>
  Rows for SQLiteStore#insert_otel_event.
- .parse_metrics(payload, clock: Time) ⇒ Array<Hash>
  Rows for SQLiteStore#insert_otel_metric.
- .parse_traces(payload, clock: Time) ⇒ Array<Hash>
  Rows for SQLiteStore#insert_otel_trace_span.
Class Method Details
.flatten_attributes(kv_array) ⇒ Object
Flatten OTel KeyValue array (`[{key: …, value: {stringValue: …}}, …]`) into a plain Hash.
    # File 'lib/claude_memory/otel/otlp_json_envelope.rb', line 115

    def flatten_attributes(kv_array)
      result = {}
      Array(kv_array).each do |kv|
        next unless kv.is_a?(Hash)
        key = kv["key"]
        next if key.nil? || key.empty?
        result[key] = decode_any_value(kv["value"])
      end
      result
    end
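To make the flattening concrete, here is a self-contained sketch. The `decode_any_value` shown here is a hypothetical stand-in: the real private helper is not included in this listing, so the set of AnyValue variants it handles is an assumption. `flatten_attributes` is copied from the listing above.

```ruby
# Hypothetical stand-in for the private decode_any_value helper.
# Covers the common OTLP AnyValue variants; anything else becomes nil.
def decode_any_value(value)
  return nil unless value.is_a?(Hash)

  if value.key?("stringValue")
    value["stringValue"]
  elsif value.key?("boolValue")
    value["boolValue"]
  elsif value.key?("intValue")
    # OTLP/JSON carries int64 values as strings.
    Integer(value["intValue"].to_s, 10)
  elsif value.key?("doubleValue")
    value["doubleValue"].to_f
  elsif value.key?("arrayValue")
    Array(value.dig("arrayValue", "values")).map { |v| decode_any_value(v) }
  end
end

# Same body as the documented method.
def flatten_attributes(kv_array)
  result = {}
  Array(kv_array).each do |kv|
    next unless kv.is_a?(Hash)
    key = kv["key"]
    next if key.nil? || key.empty?
    result[key] = decode_any_value(kv["value"])
  end
  result
end

attrs = flatten_attributes([
  {"key" => "session.id", "value" => {"stringValue" => "abc"}},
  {"key" => "tokens", "value" => {"intValue" => "42"}},
  {"key" => "", "value" => {"stringValue" => "skipped: empty key"}},
  "skipped: not a hash"
])
```

Entries that are not hashes, or whose key is nil or empty, are dropped silently, and `flatten_attributes(nil)` yields an empty Hash because `Array(nil)` is `[]`.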
.parse_logs(payload, clock: Time) ⇒ Array<Hash>
Returns rows for SQLiteStore#insert_otel_event.
    # File 'lib/claude_memory/otel/otlp_json_envelope.rb', line 59

    def parse_logs(payload, clock: Time)
      rows = []
      Array(payload["resourceLogs"]).each do |resource_log|
        resource = flatten_attributes(dig_attributes(resource_log["resource"]))
        Array(resource_log["scopeLogs"]).each do |scope_log|
          Array(scope_log["logRecords"]).each do |record|
            attributes = flatten_attributes(record["attributes"])
            rows << {
              event_name: event_name_for(record, attributes),
              occurred_at: (record, clock),
              session_id: attributes["session.id"],
              prompt_id: attributes["prompt.id"],
              attributes: attributes,
              resource: resource
            }
          end
        end
      end
      rows
    end
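The envelope walk in parse_logs relies on `Array(nil)` returning `[]`, so missing containers are skipped rather than raising. The sketch below isolates that traversal; `log_records` is a hypothetical helper, and the sample payload is made up for illustration.

```ruby
# Hypothetical helper: collect every logRecord in a resourceLogs envelope.
# Array(nil) is [], so absent scopeLogs or logRecords contribute nothing.
def log_records(payload)
  Array(payload["resourceLogs"]).flat_map do |resource_log|
    Array(resource_log["scopeLogs"]).flat_map do |scope_log|
      Array(scope_log["logRecords"])
    end
  end
end

# Illustrative payload (made-up values) with some containers left out.
sample_payload = {
  "resourceLogs" => [
    {
      "resource" => {"attributes" => []},
      "scopeLogs" => [
        {"logRecords" => [{"body" => {"stringValue" => "first"}},
                          {"body" => {"stringValue" => "second"}}]},
        {} # scope with no logRecords key
      ]
    },
    {"resource" => {}} # resource with no scopeLogs key
  ]
}

records = log_records(sample_payload)
none = log_records({})
```

An empty or partial payload therefore produces an empty Array of rows instead of an exception.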
.parse_metrics(payload, clock: Time) ⇒ Array<Hash>
Returns rows for SQLiteStore#insert_otel_metric.
    # File 'lib/claude_memory/otel/otlp_json_envelope.rb', line 26

    def parse_metrics(payload, clock: Time)
      rows = []
      Array(payload["resourceMetrics"]).each do |resource_metric|
        resource = flatten_attributes(dig_attributes(resource_metric["resource"]))
        Array(resource_metric["scopeMetrics"]).each do |scope_metric|
          Array(scope_metric["metrics"]).each do |metric|
            metric_name = metric.fetch("name")
            unit = metric["unit"]
            data_points = collect_data_points(metric)
            data_points.each do |point|
              value_type, value_int, value_float = decode_metric_value(point)
              next if value_type.nil?
              rows << {
                name: metric_name,
                value_type: value_type,
                value_int: value_int,
                value_float: value_float,
                unit: unit,
                attributes: flatten_attributes(point["attributes"]),
                resource: resource,
                recorded_at: (point, clock)
              }
            end
          end
        end
      end
      rows
    end
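parse_metrics depends on two private helpers that this listing does not show. The sketch below is one plausible shape for them, written only to illustrate the `next if value_type.nil?` skip: the bodies, the `"int"`/`"double"` type labels, and the set of metric kinds consulted are all assumptions. The `asInt`/`asDouble` field names come from the OTLP NumberDataPoint message.

```ruby
# Hypothetical: gather dataPoints from whichever metric kind is present.
def collect_data_points(metric)
  %w[gauge sum histogram].flat_map do |kind|
    Array(metric.dig(kind, "dataPoints"))
  end
end

# Hypothetical: return [value_type, value_int, value_float]; all nil when
# the point carries neither asInt nor asDouble (e.g. a histogram point).
def decode_metric_value(point)
  if point.key?("asInt")
    # OTLP/JSON carries int64 values as strings.
    ["int", Integer(point["asInt"].to_s, 10), nil]
  elsif point.key?("asDouble")
    ["double", nil, point["asDouble"].to_f]
  else
    [nil, nil, nil]
  end
end

# Illustrative metrics (made-up values).
sample_metrics = [
  {"name" => "tokens", "sum" => {"dataPoints" => [{"asInt" => "7"}, {"asDouble" => 1.5}]}},
  {"name" => "latency", "histogram" => {"dataPoints" => [{"count" => "3"}]}}
]

decoded = sample_metrics.flat_map do |metric|
  collect_data_points(metric).map { |point| decode_metric_value(point) }
end
kept = decoded.reject { |value_type, _int, _float| value_type.nil? }
```

Under these assumptions, points without a scalar value are decoded to a nil type and never become rows, which matches the skip in the listing above.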
.parse_traces(payload, clock: Time) ⇒ Array<Hash>
Returns rows for SQLiteStore#insert_otel_trace_span.
    # File 'lib/claude_memory/otel/otlp_json_envelope.rb', line 83

    def parse_traces(payload, clock: Time)
      rows = []
      Array(payload["resourceSpans"]).each do |resource_span|
        resource = flatten_attributes(dig_attributes(resource_span["resource"]))
        Array(resource_span["scopeSpans"]).each do |scope_span|
          Array(scope_span["spans"]).each do |span|
            attributes = flatten_attributes(span["attributes"])
            start_nano = parse_unix_nano(span["startTimeUnixNano"])
            end_nano = parse_unix_nano(span["endTimeUnixNano"])
            rows << {
              trace_id: span.fetch("traceId"),
              span_id: span.fetch("spanId"),
              parent_span_id: span["parentSpanId"],
              name: span.fetch("name"),
              session_id: attributes["session.id"],
              prompt_id: attributes["prompt.id"],
              start_unix_nano: start_nano,
              end_unix_nano: end_nano,
              duration_ms: duration_ms_from(start_nano, end_nano),
              status_code: span.dig("status", "code")&.to_s,
              attributes: attributes,
              resource: resource,
              recorded_at: (start_nano, clock)
            }
          end
        end
      end
      rows
    end
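The span timing fields and the required-key behaviour can be illustrated in isolation. `parse_unix_nano` and `duration_ms_from` below are hypothetical re-implementations of private helpers this listing does not show; in particular, returning a Float number of milliseconds and nil for missing input are assumptions.

```ruby
# Hypothetical: parse a string-encoded nanosecond timestamp, nil if absent
# or malformed.
def parse_unix_nano(raw)
  return nil if raw.nil? || raw.to_s.empty?

  Integer(raw.to_s, 10)
rescue ArgumentError
  nil
end

# Hypothetical: span duration in milliseconds, nil if either end is unknown.
def duration_ms_from(start_nano, end_nano)
  return nil if start_nano.nil? || end_nano.nil?

  (end_nano - start_nano) / 1_000_000.0
end

start_nano = parse_unix_nano("1700000000000000000")
end_nano = parse_unix_nano("1700000000250000000")
duration = duration_ms_from(start_nano, end_nano)
open_ended = duration_ms_from(start_nano, parse_unix_nano(nil))

# Required keys use Hash#fetch, so a span without traceId raises KeyError
# instead of producing a row with a nil trace_id.
missing_key_error =
  begin
    {"spanId" => "b7ad6b7169203331", "name" => "tool.call"}.fetch("traceId")
    nil
  rescue KeyError => e
    e.class
  end
```

Optional fields such as parentSpanId use plain `[]` lookups and come back as nil, while a malformed span fails loudly at the first missing required key.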