Module: Sashiko::Ractor

Defined in:
lib/sashiko/ractor.rb

Overview

Ractor-based parallel execution with span replay.

The problem: OpenTelemetry Ruby cannot emit spans inside a Ractor because its module state carries unshareable instance variables (mutexes, propagation). Upstream OTel has acknowledged this as a blocker for Ractor adoption.

The workaround: inside the Ractor, record spans as plain frozen Data values (no OTel dependency), send them via Ractor::Port to the main Ractor, and replay them there as real OTel spans with their original start/end timestamps and parent linkage.

Caveats: trace_id / span_id are assigned at replay time on the main side; OpenTelemetry::Baggage set inside the Ractor is not propagated out; sampling decisions happen at replay time.
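The record-then-replay idea described above can be sketched in plain Ruby. This is a minimal illustration, not Sashiko's implementation: `SketchEvent`, `SketchRecorder`, and `sketch_replay` are hypothetical names, the "replay" step only rebuilds the parent/child tree as indented text instead of creating OTel spans, and no Ractor is started. It assumes Ruby 3.2+ for `Data`.

```ruby
# Hypothetical names throughout; a sketch of the technique, not the library's code.
SketchEvent = Data.define(:id, :parent_id, :name, :start_ns, :end_ns, :attributes)

class SketchRecorder
  def initialize
    @events  = []
    @stack   = [] # ids of currently open spans, innermost last
    @next_id = 0
  end

  # Time a block and record it as a plain value; no tracing library involved.
  def span(name, attributes: {})
    id        = (@next_id += 1)
    parent_id = @stack.last
    @stack.push(id)
    start_ns = now_ns
    yield
  ensure
    @stack.pop
    # make_shareable deep-freezes the event so it could cross a Ractor boundary.
    @events << Ractor.make_shareable(
      SketchEvent.new(id:, parent_id:, name: name.to_s.dup,
                      start_ns:, end_ns: now_ns, attributes: attributes.dup)
    )
  end

  # Hand over everything recorded so far and start a fresh buffer.
  def drain
    drained = @events
    @events = []
    drained
  end

  private

  def now_ns = Process.clock_gettime(Process::CLOCK_REALTIME, :nanosecond)
end

# Stand-in for replay: a real sink would open spans with the recorded
# timestamps; here we only reconstruct the tree from the parent links.
def sketch_replay(events)
  by_parent = events.group_by(&:parent_id)
  lines = []
  walk = lambda do |parent_id, depth|
    (by_parent[parent_id] || []).sort_by(&:id).each do |ev|
      lines << "#{'  ' * depth}#{ev.name}"
      walk.call(ev.id, depth + 1)
    end
  end
  walk.call(nil, 0)
  lines
end

recorder = SketchRecorder.new
recorder.span("Compute.run", attributes: { "item.index" => 0 }) do
  recorder.span("phase1") { :work }
  recorder.span("phase2") { :work }
end
puts sketch_replay(recorder.drain)
```

Because the events carry only local ids and timestamps, any real trace_id or span_id would have to be assigned by whatever performs the replay, which is consistent with the first caveat above.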

Defined Under Namespace

Modules: Sink

Classes: NonShareableReceiverError, Recorder, SpanEvent

Class Method Summary

Class Method Details

.parallel_map(items, via:, tracer: nil) ⇒ Object

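Maps the method passed as `via:` over `items`, running one Ractor per item. Each Ractor's call is recorded as a root span named `Receiver.method_name` and tagged with an `item.index` attribute, together with any nested Sashiko::Ractor.span calls made inside it. All events are shipped back via Ractor::Port and replayed on the main Ractor under the current trace context, so the whole tree shows up as children of the span wrapping this parallel_map call. Results are returned in the same order as `items`; if any worker raises, the failures are collected and re-raised together after every worker has reported.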

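Raises:

  • (ArgumentError): if `via:` is not a Method object
  • (NonShareableReceiverError): if the method's receiver is not Ractor-shareable
  • (RuntimeError): if one or more workers raised; the message lists each failing item index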


# File 'lib/sashiko/ractor.rb', line 109

def parallel_map(items, via:, tracer: nil)
  raise ArgumentError, "via: must be a Method object" unless via.is_a?(Method)
  receiver    = via.receiver
  method_name = via.name
  unless ::Ractor.shareable?(receiver)
    raise NonShareableReceiverError,
      "method receiver #{receiver.inspect} must be Ractor-shareable (a Module or frozen class)"
  end
  root_name = "#{receiver}.#{method_name}"
  carrier   = Sashiko::Context.carrier
  # Resolve once on the main side so every replayed batch lands on
  # the same tracer (Box-local if the caller passed one).
  emit_tracer = tracer || Sashiko.tracer

  ports = items.each_with_index.map do |item, i|
    port = ::Ractor::Port.new
    ::Ractor.new(port, receiver, method_name, item, i, root_name, carrier) do |p, r, m, it, idx, rn, _c|
      Sashiko::Ractor::Recorder.install(Sashiko::Ractor::Recorder.new)
      result = nil
      error  = nil
      begin
        result = Sashiko::Ractor.span(rn, attributes: { "item.index" => idx }) do
          r.public_send(m, it)
        end
      rescue => e
        error = "#{e.class}: #{e.message}"
      end
      p.send([idx, result, Sashiko::Ractor::Recorder.drain_events!, error])
    end
    port
  end

  results = Array.new(items.size)
  errors  = [] #: Array[String]
  ports.size.times do
    idx, value, events, error = ports.shift.receive
    Sink.replay(events, parent_carrier: carrier, tracer: emit_tracer)
    if error
      errors << "item[#{idx}]: #{error}"
    else
      results[idx] = value
    end
  end
  raise "Ractor worker failures: #{errors.join("; ")}" unless errors.empty?
  results
end
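The receiver guard at the top of parallel_map can be tried in isolation. The following is a minimal sketch under stated assumptions: `SketchCompute`, `SketchCalc`, and `sketch_shareable_receiver?` are hypothetical names introduced for illustration, and only `Method#receiver`, `Ractor.shareable?`, and `Ractor.make_shareable` from core Ruby are used.

```ruby
# Hypothetical worker module: modules are always Ractor-shareable.
module SketchCompute
  def self.double(n) = n * 2
end

# Hypothetical class whose instances hold mutable state.
class SketchCalc
  def initialize
    @calls = []
  end

  def double(n) = n * 2
end

# Mirrors the two checks shown in parallel_map, returning a boolean
# instead of raising NonShareableReceiverError.
def sketch_shareable_receiver?(via)
  raise ArgumentError, "via: must be a Method object" unless via.is_a?(Method)

  Ractor.shareable?(via.receiver)
end

module_method   = SketchCompute.method(:double)
instance_method = SketchCalc.new.method(:double)
frozen_method   = Ractor.make_shareable(SketchCalc.new).method(:double)

puts sketch_shareable_receiver?(module_method)   # module receiver
puts sketch_shareable_receiver?(instance_method) # unfrozen instance receiver
puts sketch_shareable_receiver?(frozen_method)   # deep-frozen instance receiver
```

In practice this suggests defining worker methods on a module (as in the `Compute` example for `.span`), since a module passes the guard without any freezing.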

.span(name, kind: :internal, attributes: nil) ⇒ Object

Records a nested span inside a Ractor worker. Produces a SpanEvent that will be replayed as an OTel span on the main Ractor. The API mirrors tracer.in_span, but it runs without the OTel runtime; see the module-level caveats about trace_id, baggage, and sampling.

module Compute
  def self.run(n)
    Sashiko::Ractor.span("phase1") { work1(n) }
    Sashiko::Ractor.span("phase2") { work2(n) }
  end
end


# File 'lib/sashiko/ractor.rb', line 101

def span(name, kind: :internal, attributes: nil, &) = Recorder.current.span(name, kind:, attributes:, &)