Class: OpenTrace::Pipeline

Inherits:
Object
Defined in:
lib/opentrace/pipeline.rb

Overview

Multi-worker dispatch pipeline that processes batches concurrently. Each worker thread has its own HTTP connection and takes batches from a shared queue independently of the other workers.

Why threads instead of Ractors:

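  • Ruby’s GVL is released during blocking I/O (the HTTP send), so worker threads can wait on several round-trips at once, even though only one thread executes Ruby code at any moment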

  • Ractors are still experimental and most gems aren’t Ractor-safe

  • The bottleneck is I/O (HTTP round-trips), not CPU (serialization)
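A minimal sketch of the I/O argument, assuming `sleep` as a stand-in for an HTTP round-trip (no network call is made). Like a blocking socket read, `sleep` releases the GVL, so four threads that each wait 0.2 s should finish in roughly 0.2 s of wall time instead of 0.8 s. `fake_http_send`, `LATENCY`, and `THREADS` are illustrative names, not part of OpenTrace.

```ruby
# Hypothetical stand-in for one HTTP round-trip. sleep releases the GVL
# while it waits, as a blocking socket read or write does.
def fake_http_send(latency)
  sleep(latency)
end

LATENCY = 0.2 # seconds per simulated round-trip
THREADS = 4

started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
threads = Array.new(THREADS) { Thread.new { fake_http_send(LATENCY) } }
threads.each(&:join)
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started

# The waits overlap, so elapsed is close to LATENCY rather than THREADS * LATENCY.
puts format("elapsed %.2fs, sequential would take %.2fs", elapsed, THREADS * LATENCY)
```

Replacing the `sleep` with a CPU-bound loop would remove most of the speedup under the GVL, which is why this design depends on the workload being I/O-bound.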

Constant Summary

DEFAULT_WORKERS = 2
MAX_WORKERS = 8


Constructor Details

#initialize(config, worker_count: DEFAULT_WORKERS) ⇒ Pipeline

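Returns a new instance of Pipeline. worker_count is clamped to the range 1..MAX_WORKERS. No threads are started until #start is called.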



# File 'lib/opentrace/pipeline.rb', line 25

def initialize(config, worker_count: DEFAULT_WORKERS)
  @config = config
  @worker_count = [[worker_count, 1].max, MAX_WORKERS].min
  @queue = Queue.new  # Thread::Queue for distributing batches to workers
  @workers = []
  @running = false
  @mutex = Mutex.new
  @stats = PipelineStats.new
end
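The constructor's clamping can be checked in isolation. The sketch below copies the expression from #initialize into a hypothetical helper, `effective_worker_count`, using the MAX_WORKERS value listed above, and compares it with `Integer#clamp`, which gives the same result for integer input.

```ruby
MAX_WORKERS = 8 # value listed under Constant Summary

# Hypothetical helper holding the same expression as Pipeline#initialize:
# raise the request to at least 1, then cap it at MAX_WORKERS.
def effective_worker_count(requested)
  [[requested, 1].max, MAX_WORKERS].min
end

[-5, 0, 1, 3, 8, 20].each do |n|
  # Integer#clamp(min, max) states the same bounds in a single call.
  raise "mismatch for #{n}" unless effective_worker_count(n) == n.clamp(1, MAX_WORKERS)
  puts "#{n} -> #{effective_worker_count(n)}"
end
```

So `worker_count: 0` still yields one worker, and `worker_count: 20` yields eight.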

Instance Attribute Details

#worker_count ⇒ Object (readonly)

Returns the number of worker threads, i.e. the requested count after clamping to 1..MAX_WORKERS.



# File 'lib/opentrace/pipeline.rb', line 23

def worker_count
  @worker_count
end

Instance Method Details

#running? ⇒ Boolean

Returns:

  • (Boolean) true after #start and before #stop, false otherwise


# File 'lib/opentrace/pipeline.rb', line 67

def running?
  @running
end

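#start ⇒ Object

Starts the worker pool by spawning worker_count threads. The running check and the spawn both happen under the mutex, so calling #start on a pipeline that is already running is a no-op.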



# File 'lib/opentrace/pipeline.rb', line 36

def start
  @mutex.synchronize do
    return if @running
    @running = true
    @workers = @worker_count.times.map { |i| spawn_worker(i) }
  end
end
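The guard in #start makes the check-and-set atomic. The sketch below copies it into a hypothetical `MiniPool` class (the real `spawn_worker` is private and not documented here, so a placeholder thread stands in for it) and calls `start` from five threads at once; only the first call to acquire the mutex spawns workers.

```ruby
# Hypothetical stand-in that copies Pipeline#start's guard.
class MiniPool
  attr_reader :workers

  def initialize(worker_count)
    @worker_count = worker_count
    @workers = []
    @running = false
    @mutex = Mutex.new
  end

  def start
    @mutex.synchronize do
      return if @running # later calls are no-ops; synchronize still unlocks
      @running = true
      @workers = @worker_count.times.map { |i| spawn_worker(i) }
    end
  end

  private

  # Placeholder worker: a real one would loop over the batch queue.
  def spawn_worker(i)
    Thread.new { i }
  end
end

pool = MiniPool.new(2)
# Race several callers; the mutex serializes the check and the spawn.
callers = Array.new(5) { Thread.new { pool.start } }
callers.each(&:join)
puts pool.workers.size # 2
```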

#stats ⇒ Object

Returns the pipeline's counters, such as :batches_submitted, converted with PipelineStats#to_h.


# File 'lib/opentrace/pipeline.rb', line 71

def stats
  @stats.to_h
end
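PipelineStats itself is not documented on this page; only `increment` and `to_h` are visible from the calls in #submit and #stats. The following is a hypothetical sketch of a thread-safe counter with that shape, assuming counters are keyed by symbol and that `to_h` should return a copy rather than the live hash.

```ruby
# Hypothetical PipelineStats: only the two methods Pipeline calls are sketched.
class PipelineStats
  def initialize
    @counts = Hash.new(0) # missing counters read as 0
    @mutex = Mutex.new
  end

  # Add to a named counter; the mutex keeps concurrent updates from racing.
  def increment(name, by = 1)
    @mutex.synchronize { @counts[name] += by }
  end

  # Return a copy so callers cannot change the internal counters.
  def to_h
    @mutex.synchronize { @counts.dup }
  end
end

stats = PipelineStats.new
threads = Array.new(4) do
  Thread.new { 1_000.times { stats.increment(:batches_submitted) } }
end
threads.each(&:join)
puts stats.to_h[:batches_submitted] # 4000
```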

#stop(timeout: 5) ⇒ Object

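Graceful shutdown: pushes one :shutdown sentinel per worker behind any batches already queued, so those batches are processed first, then waits for the workers to exit. timeout is a single budget in seconds shared by all the joins, not a per-worker limit. Workers still busy when it expires are not killed; #stop returns anyway and drops its references to them.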



# File 'lib/opentrace/pipeline.rb', line 53

def stop(timeout: 5)
  @mutex.synchronize do
    return unless @running
    @running = false
    @worker_count.times { @queue.push(:shutdown) }
  end
  deadline = Time.now + timeout
  @workers.each do |w|
    remaining = deadline - Time.now
    w.join([remaining, 0].max) if remaining > 0
  end
  @workers = []
end
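The shutdown in #stop is the sentinel (poison-pill) pattern combined with a shared deadline. This sketch reproduces it with a hypothetical worker loop, since the real worker body is private and not shown here, and assumes each worker exits as soon as it pops `:shutdown`.

```ruby
queue = Queue.new
processed = Queue.new # thread-safe collector for per-batch results
worker_count = 2

# Hypothetical worker loop: process batches until a :shutdown sentinel arrives.
workers = Array.new(worker_count) do
  Thread.new do
    loop do
      item = queue.pop
      break if item == :shutdown
      processed << item.sum # stand-in for serializing and sending the batch
    end
  end
end

# Batches queued before shutdown...
5.times { |i| queue.push([i, i]) }
# ...sit ahead of the sentinels in the FIFO, so they are processed first.
worker_count.times { queue.push(:shutdown) }

# One deadline shared by all joins, as in Pipeline#stop.
timeout = 5
deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
workers.each do |w|
  remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
  w.join(remaining) if remaining > 0 # returns nil on timeout; the thread is not killed
end

puts processed.size # 5
```

The sketch reads `Process::CLOCK_MONOTONIC` rather than `Time.now`; a monotonic clock is unaffected by wall-clock adjustments, which makes it the usual choice for timeouts.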

#submit(batch) ⇒ Object

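Submits a batch of raw entries for processing. Each batch is an array: [raw_entry1, raw_entry2, …]. If the pipeline is not running, the batch is dropped silently: nothing is queued, :batches_submitted is not incremented, and the method returns nil.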



# File 'lib/opentrace/pipeline.rb', line 46

def submit(batch)
  return unless @running
  @queue.push(batch)
  @stats.increment(:batches_submitted)
end
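The guard in #submit can be seen with a hypothetical stand-in, `SubmitGate`, which keeps only the `@running` check and the queue push, with a plain integer counter in place of PipelineStats.

```ruby
# Hypothetical stand-in mirroring Pipeline#submit's guard; config handling
# and worker threads are omitted.
class SubmitGate
  attr_reader :queue, :submitted

  def initialize
    @queue = Queue.new
    @running = false
    @submitted = 0
  end

  def start
    @running = true
  end

  def stop
    @running = false
  end

  # Same guard as Pipeline#submit: when not running, return nil and drop the batch.
  def submit(batch)
    return unless @running
    @queue.push(batch)
    @submitted += 1
  end
end

gate = SubmitGate.new
p gate.submit([:early]) # not started yet: prints nil, batch dropped
gate.start
gate.submit([:a, :b])
gate.stop
gate.submit([:late])    # stopped: dropped as well
puts gate.queue.size    # 1
```

One consequence of reading `@running` outside the mutex: a #submit that passes the check just before a concurrent #stop can push its batch behind the `:shutdown` sentinels. Assuming workers exit on their sentinel, that batch is never sent.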