Module: Vivarium::OtelExporter
- Defined in:
- lib/vivarium/otel_exporter.rb
Overview
Converts a captured event stream into an OTLP/JSON ResourceSpans document.
Two span layers are emitted (see plan): a base thread/process span per tid (identified by the BPF-issued span_id) and method-call spans reconstructed from span_start/span_stop. Every other event becomes an OTel span event on the innermost active span. Method span ids come from Vivarium.synth_span_id so they match the report –dump-otel view.
Constant Summary collapse
- SPAN_KIND_INTERNAL =
1- SERVICE_NAME =
"vivarium"- INTERNAL_COMM_MATCH =
[/otel_stream\.rb/, /correlator\.rb/].freeze
- CONTROL_EVENT_NAMES =
%w[proc_exit].freeze
Class Method Summary collapse
- .build_document(events:, meta:) ⇒ Object
- .build_span_event(ev, to_unix) ⇒ Object
-
.build_spans(events:, meta:) ⇒ Object
Returns an array of OTLP span hashes.
- .control_event?(event_name) ⇒ Boolean
-
.dump(io, events:, meta:) ⇒ Object
io: a writable IO.
-
.event_attributes(ev) ⇒ Object
OTLP attribute list describing an event (target/severity/uid/gid/comm/…).
- .hex16(value) ⇒ Object
- .hex32(hi, lo) ⇒ Object
- .int_attr(key, value) ⇒ Object
-
.internal_comm?(comm) ⇒ Boolean
— helpers ————————————————————-.
- .iso_to_unix_ns(iso) ⇒ Object
- .method_span_hash(rec, to_unix) ⇒ Object
-
.new_thread_span(ev, main_tid) ⇒ Object
— span record construction ——————————————-.
- .read_span_payload(payload) ⇒ Object
- .span_hash(trace_hi:, trace_lo:, span_id:, parent:, name:, start_k:, stop_k:, to_unix:, attributes:, events:) ⇒ Object
- .str_attr(key, value) ⇒ Object
- .thread_span_hash(ts, start_ktime, stop_ktime, to_unix) ⇒ Object
-
.wrap_document(spans) ⇒ Object
Wraps a list of OTLP span hashes in the ResourceSpans envelope.
Class Method Details
.build_document(events:, meta:) ⇒ Object
27 28 29 |
# File 'lib/vivarium/otel_exporter.rb', line 27 def build_document(events:, meta:) wrap_document(build_spans(events: events, meta: )) end |
.build_span_event(ev, to_unix) ⇒ Object
161 162 163 |
# File 'lib/vivarium/otel_exporter.rb', line 161 def build_span_event(ev, to_unix) { timeUnixNano: to_unix.call(ev.ktime_ns), name: ev.event_name, attributes: event_attributes(ev) } end |
.build_spans(events:, meta:) ⇒ Object
Returns an array of OTLP span hashes.
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/vivarium/otel_exporter.rb', line 51 def build_spans(events:, meta:) sorted = events.sort_by { |e| [e.ktime_ns, e.pid, e.tid] } start_ktime = [:session_start_ktime].to_i stop_ktime = [:session_stop_ktime].to_i start_unix = iso_to_unix_ns([:session_start_iso]) main_tid = [:main_tid] to_unix = ->(k) { (start_unix + (k.to_i - start_ktime)).to_s } thread_spans = {} # tid => mutable span record method_spans = [] # all method span records stacks = Hash.new { |h, k| h[k] = [] } # tid => [method span record, ...] sorted.each do |ev| next if internal_comm?(ev.comm) next if control_event?(ev.event_name) ts = (thread_spans[ev.tid] ||= new_thread_span(ev, main_tid)) ts[:comm] = ev.comm.to_s unless ev.comm.to_s.empty? ts[:min_k] = ev.ktime_ns if ev.ktime_ns < ts[:min_k] ts[:max_k] = ev.ktime_ns if ev.ktime_ns > ts[:max_k] stack = stacks[ev.tid] case ev.event_name when "span_start" name, file, lineno = read_span_payload(ev.payload) parent = stack.empty? ? ts[:span_id] : stack.last[:span_id] rec = { tid: ev.tid, pid: ev.pid, span_id: Vivarium.synth_span_id(ev.trace_hi.to_i, ev.trace_lo.to_i, ev.tid, ev.ktime_ns), trace_hi: ev.trace_hi.to_i, trace_lo: ev.trace_lo.to_i, parent: parent, name: (name.nil? || name.empty? ? "<anonymous>" : name), file: file, lineno: lineno, start_k: ev.ktime_ns, stop_k: nil, events: [] } method_spans << rec stack.push(rec) when "span_stop" rec = stack.pop rec[:stop_k] = ev.ktime_ns if rec else host = stack.empty? ? ts : stack.last host[:events] << build_span_event(ev, to_unix) end end method_spans.each { |rec| rec[:stop_k] ||= stop_ktime } out = [] thread_spans.each_value { |ts| out << thread_span_hash(ts, start_ktime, stop_ktime, to_unix) } method_spans.each { |rec| out << method_span_hash(rec, to_unix) } out end |
.control_event?(event_name) ⇒ Boolean
193 194 195 |
# File 'lib/vivarium/otel_exporter.rb', line 193 def control_event?(event_name) CONTROL_EVENT_NAMES.include?(event_name.to_s) end |
.dump(io, events:, meta:) ⇒ Object
io: a writable IO. Writes a single-line OTLP/JSON document.
23 24 25 |
# File 'lib/vivarium/otel_exporter.rb', line 23 def dump(io, events:, meta:) io.write(JSON.generate(build_document(events: events, meta: ))) end |
.event_attributes(ev) ⇒ Object
OTLP attribute list describing an event (target/severity/uid/gid/comm/…). Reused for both span events and standalone single-event spans (streaming).
167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 |
# File 'lib/vivarium/otel_exporter.rb', line 167 def event_attributes(ev) target = begin Vivarium.render_event_payload(ev).to_s.gsub(/\s+/, " ").strip rescue StandardError "" end attrs = [ int_attr("thread.id", ev.tid), int_attr("process.pid", ev.pid), int_attr("user.id", ev.uid), int_attr("group.id", ev.gid), str_attr("severity", Vivarium.event_severity(ev.event_name)) ] attrs << str_attr("process.command", ev.comm.to_s) unless ev.comm.to_s.empty? attrs << str_attr("target", target) unless target.empty? attrs end |
.hex16(value) ⇒ Object
219 220 221 |
# File 'lib/vivarium/otel_exporter.rb', line 219 def hex16(value) format("%016x", value.to_i & Vivarium::U64_MASK) end |
.hex32(hi, lo) ⇒ Object
215 216 217 |
# File 'lib/vivarium/otel_exporter.rb', line 215 def hex32(hi, lo) format("%016x%016x", hi.to_i & Vivarium::U64_MASK, lo.to_i & Vivarium::U64_MASK) end |
.int_attr(key, value) ⇒ Object
227 228 229 |
# File 'lib/vivarium/otel_exporter.rb', line 227 def int_attr(key, value) { key: key, value: { intValue: value.to_i.to_s } } end |
.internal_comm?(comm) ⇒ Boolean
— helpers ————————————————————-
188 189 190 191 |
# File 'lib/vivarium/otel_exporter.rb', line 188 def internal_comm?(comm) value = comm.to_s INTERNAL_COMM_MATCH.any? { |regex| value.match?(regex) } end |
.iso_to_unix_ns(iso) ⇒ Object
207 208 209 210 211 212 213 |
# File 'lib/vivarium/otel_exporter.rb', line 207 def iso_to_unix_ns(iso) return 0 if iso.nil? || iso.to_s.empty? (Time.iso8601(iso).to_r * 1_000_000_000).to_i rescue ArgumentError 0 end |
.method_span_hash(rec, to_unix) ⇒ Object
134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/vivarium/otel_exporter.rb', line 134 def method_span_hash(rec, to_unix) attrs = [int_attr("thread.id", rec[:tid]), int_attr("process.pid", rec[:pid])] attrs << str_attr("code.filepath", rec[:file]) if rec[:file] && !rec[:file].empty? attrs << int_attr("code.lineno", rec[:lineno]) if rec[:lineno] && rec[:lineno] > 0 span_hash( trace_hi: rec[:trace_hi], trace_lo: rec[:trace_lo], span_id: rec[:span_id], parent: rec[:parent], name: rec[:name], start_k: rec[:start_k], stop_k: rec[:stop_k], to_unix: to_unix, attributes: attrs, events: rec[:events] ) end |
.new_thread_span(ev, main_tid) ⇒ Object
— span record construction ——————————————-
105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/vivarium/otel_exporter.rb', line 105 def new_thread_span(ev, main_tid) span_id = ev.span_id.to_i span_id = Vivarium.synth_span_id(ev.trace_hi.to_i, ev.trace_lo.to_i, ev.tid, ev.ktime_ns) if span_id.zero? { tid: ev.tid, pid: ev.pid, span_id: span_id, trace_hi: ev.trace_hi.to_i, trace_lo: ev.trace_lo.to_i, parent: ev.parent_span_id.to_i, comm: ev.comm.to_s, root: ev.tid == main_tid, min_k: ev.ktime_ns, max_k: ev.ktime_ns, events: [] } end |
.read_span_payload(payload) ⇒ Object
197 198 199 200 201 202 203 204 205 |
# File 'lib/vivarium/otel_exporter.rb', line 197 def read_span_payload(payload) bytes = payload.to_s.b return [nil, nil, -1] if bytes.empty? name = Vivarium.c_string(bytes[0, Vivarium::SPAN_METHOD_SIZE]) file = Vivarium.c_string(bytes[Vivarium::SPAN_METHOD_SIZE, Vivarium::SPAN_FILE_SIZE]) lineno = bytes.bytesize > Vivarium::SPAN_LINENO_OFFSET ? bytes[Vivarium::SPAN_LINENO_OFFSET, 8].unpack1("q<") : -1 [name, file, lineno] end |
.span_hash(trace_hi:, trace_lo:, span_id:, parent:, name:, start_k:, stop_k:, to_unix:, attributes:, events:) ⇒ Object
146 147 148 149 150 151 152 153 154 155 156 157 158 159 |
# File 'lib/vivarium/otel_exporter.rb', line 146 def span_hash(trace_hi:, trace_lo:, span_id:, parent:, name:, start_k:, stop_k:, to_unix:, attributes:, events:) hash = { traceId: hex32(trace_hi, trace_lo), spanId: hex16(span_id), name: name, kind: SPAN_KIND_INTERNAL, startTimeUnixNano: to_unix.call(start_k), endTimeUnixNano: to_unix.call(stop_k), attributes: attributes } hash[:parentSpanId] = hex16(parent) unless parent.to_i.zero? hash[:events] = events unless events.empty? hash end |
.str_attr(key, value) ⇒ Object
223 224 225 |
# File 'lib/vivarium/otel_exporter.rb', line 223 def str_attr(key, value) { key: key, value: { stringValue: value.to_s } } end |
.thread_span_hash(ts, start_ktime, stop_ktime, to_unix) ⇒ Object
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/vivarium/otel_exporter.rb', line 117 def thread_span_hash(ts, start_ktime, stop_ktime, to_unix) start_k = ts[:root] ? start_ktime : ts[:min_k] stop_k = ts[:root] ? stop_ktime : ts[:max_k] name = ts[:comm].empty? ? "tid=#{ts[:tid]}" : ts[:comm] span_hash( trace_hi: ts[:trace_hi], trace_lo: ts[:trace_lo], span_id: ts[:span_id], parent: ts[:parent], name: name, start_k: start_k, stop_k: stop_k, to_unix: to_unix, attributes: [ int_attr("thread.id", ts[:tid]), int_attr("process.pid", ts[:pid]), str_attr("process.command", ts[:comm]) ], events: ts[:events] || [] ) end |
.wrap_document(spans) ⇒ Object
Wraps a list of OTLP span hashes in the ResourceSpans envelope. Shared by the batch file exporter and the streaming HTTP exporter so both emit the same resource/scope.
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/vivarium/otel_exporter.rb', line 34 def wrap_document(spans) { resourceSpans: [ { resource: { attributes: [str_attr("service.name", SERVICE_NAME)] }, scopeSpans: [ { scope: { name: SERVICE_NAME, version: Vivarium::VERSION }, spans: spans } ] } ] } end |