Module: Sashiko::Ractor
- Defined in:
- lib/sashiko/ractor.rb
Overview
Ractor-based parallel execution with span replay.
The problem: OpenTelemetry Ruby cannot emit spans inside a Ractor because its module state carries unshareable instance variables (mutexes, propagation). Upstream OTel has acknowledged this as a blocker for Ractor adoption.
The workaround: inside the Ractor, record spans as plain frozen Data values (no OTel dependency), send them via Ractor::Port to the main Ractor, and replay them there as real OTel spans with their original start/end timestamps and parent linkage.
Caveats: trace_id / span_id are assigned at replay time on the main side; OpenTelemetry::Baggage set inside the Ractor is not propagated out; sampling decisions happen at replay time.
Defined Under Namespace
Modules: Sink Classes: NonShareableReceiverError, Recorder, SpanEvent
Class Method Summary collapse
-
.parallel_map(items, via:, tracer: nil) ⇒ Object
Map ‘method` over `items` in parallel Ractors.
-
.span(name, kind: :internal, attributes: nil) ⇒ Object
Record a nested span inside a Ractor worker.
Class Method Details
.parallel_map(items, via:, tracer: nil) ⇒ Object
Map ‘method` over `items` in parallel Ractors. Each Ractor’s call is recorded as a root span (named after the method), plus any nested Sashiko::Ractor.span calls inside. All events are shipped back via Ractor::Port and replayed on the main Ractor under the current trace context — so the whole tree shows up as children of the span wrapping this parallel_map call.
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 |
# File 'lib/sashiko/ractor.rb', line 109 def parallel_map(items, via:, tracer: nil) raise ArgumentError, "via: must be a Method object" unless via.is_a?(Method) receiver = via.receiver method_name = via.name unless ::Ractor.shareable?(receiver) raise NonShareableReceiverError, "method receiver #{receiver.inspect} must be Ractor-shareable (a Module or frozen class)" end root_name = "#{receiver}.#{method_name}" carrier = Sashiko::Context.carrier # Resolve once on the main side so every replayed batch lands on # the same tracer (Box-local if the caller passed one). emit_tracer = tracer || Sashiko.tracer ports = items.each_with_index.map do |item, i| port = ::Ractor::Port.new ::Ractor.new(port, receiver, method_name, item, i, root_name, carrier) do |p, r, m, it, idx, rn, _c| Sashiko::Ractor::Recorder.install(Sashiko::Ractor::Recorder.new) result = nil error = nil begin result = Sashiko::Ractor.span(rn, attributes: { "item.index" => idx }) do r.public_send(m, it) end rescue => e error = "#{e.class}: #{e.}" end p.send([idx, result, Sashiko::Ractor::Recorder.drain_events!, error]) end port end results = Array.new(items.size) errors = [] #: Array[String] ports.size.times do idx, value, events, error = ports.shift.receive Sink.replay(events, parent_carrier: carrier, tracer: emit_tracer) if error errors << "item[#{idx}]: #{error}" else results[idx] = value end end raise "Ractor worker failures: #{errors.join("; ")}" unless errors.empty? results end |
.span(name, kind: :internal, attributes: nil) ⇒ Object
Record a nested span inside a Ractor worker. Produces a SpanEvent that will be replayed as an OTel span on the main Ractor. The API mirrors tracer.in_span, but it runs without the OTel runtime —see the module-level caveats about trace_id, baggage, and sampling.
module Compute
def self.run(n)
Sashiko::Ractor.span("phase1") { work1(n) }
Sashiko::Ractor.span("phase2") { work2(n) }
end
end
101 |
# File 'lib/sashiko/ractor.rb', line 101 def span(name, kind: :internal, attributes: nil, &) = Recorder.current.span(name, kind:, attributes:, &) |