Class: OpenTrace::Pipeline
- Inherits: Object
- Defined in: lib/opentrace/pipeline.rb
Overview
Multi-worker dispatch pipeline that processes batches in parallel. Each worker thread has its own HTTP connection and processes batches independently.
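A minimal sketch of the dispatch pattern described above, under stated assumptions. The worker loop (spawn_worker) is private and not shown on this page, so the code below is a hypothetical model, not OpenTrace's implementation. FakeConnection is an invented stand-in for a per-thread HTTP client, and :shutdown mirrors the sentinel that #stop pushes onto the queue.

```ruby
# Hypothetical model of a multi-worker pipeline: each worker thread owns its
# own "connection" object and pulls batches from a shared Thread::Queue.
# FakeConnection stands in for a per-thread HTTP client; it is NOT part of OpenTrace.
class FakeConnection
  attr_reader :sent

  def initialize(id)
    @id = id
    @sent = []
  end

  # Pretend to send a batch over the network.
  def post(batch)
    @sent << batch
  end
end

queue = Queue.new
connections = Array.new(2) { |i| FakeConnection.new(i) }

workers = connections.map do |conn|
  Thread.new do
    loop do
      item = queue.pop
      break if item == :shutdown # sentinel tells this worker to exit
      conn.post(item)            # each thread uses only its own connection
    end
  end
end

5.times { |n| queue.push([n]) }               # five one-entry batches
workers.size.times { queue.push(:shutdown) }  # one sentinel per worker
workers.each(&:join)

total = connections.sum { |c| c.sent.size }
puts total # => 5
```

Because the sentinels are queued after all five batches, every batch is consumed before any worker exits, regardless of how the batches are split between the two threads.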
Why threads instead of Ractors:
- Ruby's GVL is released during blocking I/O (the HTTP send), so several threads can have requests in flight at the same time.
- Ractors are still experimental, and most gems aren't Ractor-safe.
- The bottleneck is I/O (HTTP round-trips), not CPU (serialization).
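To illustrate the first point, this sketch uses sleep as a stand-in for a blocking HTTP round-trip (an assumption; no network is involved). A thread blocked in sleep does not hold the GVL, so four 0.2 s waits finish in roughly 0.2 s rather than 0.8 s.

```ruby
# Minimal sketch: Ruby releases the GVL while a thread blocks, so blocking
# waits overlap across threads. `sleep` stands in for an HTTP round-trip here;
# real network I/O behaves similarly but is not exercised by this example.
def simulated_round_trip
  sleep 0.2 # blocks without holding the GVL
end

start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
threads = 4.times.map { Thread.new { simulated_round_trip } }
threads.each(&:join)
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start

# Four sequential calls would take about 0.8 s; in threads they overlap.
puts format("elapsed: %.2f s", elapsed)
```

A CPU-bound loop in place of sleep would not overlap this way, which is why the argument rests on the pipeline being I/O-bound.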
Constant Summary
- DEFAULT_WORKERS = 2
- MAX_WORKERS = 8
Instance Attribute Summary
- #worker_count ⇒ Object (readonly)
  Returns the value of attribute worker_count.
Instance Method Summary
- #initialize(config, worker_count: DEFAULT_WORKERS) ⇒ Pipeline (constructor)
  A new instance of Pipeline.
- #running? ⇒ Boolean
- #start ⇒ Object
  Start the worker pool.
- #stats ⇒ Object
- #stop(timeout: 5) ⇒ Object
  Graceful shutdown: process remaining batches, then stop.
- #submit(batch) ⇒ Object
  Submit a batch of raw entries for processing. Each batch is an array: [raw_entry1, raw_entry2, …].
Constructor Details
#initialize(config, worker_count: DEFAULT_WORKERS) ⇒ Pipeline
Returns a new instance of Pipeline. The requested worker_count is clamped to the range 1..MAX_WORKERS.
```ruby
# File 'lib/opentrace/pipeline.rb', line 25
def initialize(config, worker_count: DEFAULT_WORKERS)
  @config = config
  # Clamp to at least 1 and at most MAX_WORKERS
  @worker_count = [[worker_count, 1].max, MAX_WORKERS].min
  @queue = Queue.new # Thread::Queue for distributing batches to workers
  @workers = []
  @running = false
  @mutex = Mutex.new
  @stats = PipelineStats.new
end
```
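The clamping expression in the constructor reads as "at least 1, at most MAX_WORKERS". This sketch checks it against Comparable#clamp, which expresses the same bound; the helper method names are hypothetical, and MAX_WORKERS is redefined locally with the documented value.

```ruby
# Compare the constructor's clamping expression with Comparable#clamp (Ruby 2.4+).
MAX_WORKERS = 8 # documented value from the Constant Summary

# Hypothetical helper: the expression used in Pipeline#initialize
def clamp_original(worker_count)
  [[worker_count, 1].max, MAX_WORKERS].min
end

# Hypothetical helper: the same bound written with Integer#clamp
def clamp_idiomatic(worker_count)
  worker_count.clamp(1, MAX_WORKERS)
end

[-3, 0, 1, 2, 8, 20].each do |n|
  puts "#{n} -> #{clamp_original(n)}"
end
# prints one line per value, e.g. "0 -> 1" and "20 -> 8"
```

So Pipeline.new(config, worker_count: 0) would run one worker and worker_count: 20 would run eight, with no error raised in either case.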
Instance Attribute Details
#worker_count ⇒ Object (readonly)
Returns the value of attribute worker_count.
```ruby
# File 'lib/opentrace/pipeline.rb', line 23
def worker_count
  @worker_count
end
```
Instance Method Details
#running? ⇒ Boolean
```ruby
# File 'lib/opentrace/pipeline.rb', line 67
def running?
  @running
end
```
#start ⇒ Object
Start the worker pool. Calling #start while the pool is already running does nothing.
```ruby
# File 'lib/opentrace/pipeline.rb', line 36
def start
  @mutex.synchronize do
    return if @running
    @running = true
    @workers = @worker_count.times.map { |i| spawn_worker(i) }
  end
end
```
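#start checks and sets @running inside a single Mutex#synchronize block, which makes repeated or concurrent calls safe. The sketch below models that pattern with a hypothetical MiniPool class whose workers just sleep; spawn_worker itself is not shown on this page.

```ruby
# Sketch of the idempotent-start pattern: the flag check and the worker spawn
# happen inside one Mutex#synchronize, so concurrent callers cannot both spawn
# a pool. MiniPool is hypothetical, not OpenTrace's class.
class MiniPool
  attr_reader :workers

  def initialize(size)
    @size = size
    @mutex = Mutex.new
    @running = false
    @workers = []
  end

  def start
    @mutex.synchronize do
      return if @running # later calls are no-ops; synchronize still unlocks
      @running = true
      @workers = @size.times.map { Thread.new { sleep 0.05 } }
    end
  end
end

pool = MiniPool.new(3)
callers = 10.times.map { Thread.new { pool.start } } # ten racing callers
callers.each(&:join)
pool.workers.each(&:join)
puts pool.workers.size # => 3
```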
#stats ⇒ Object
```ruby
# File 'lib/opentrace/pipeline.rb', line 71
def stats
  @stats.to_h
end
```
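PipelineStats is not documented on this page; #increment and #to_h are the only methods the listings here call on it. Under that assumption, a plausible thread-safe stand-in is a Mutex-guarded hash of counters. CounterStats is a hypothetical name, and the real class may differ.

```ruby
# Hypothetical stand-in for PipelineStats: a Mutex-guarded counter hash.
# #to_h returns a copy so callers cannot mutate internal state.
class CounterStats
  def initialize
    @mutex = Mutex.new
    @counts = Hash.new(0)
  end

  def increment(key, by = 1)
    @mutex.synchronize { @counts[key] += by }
  end

  def to_h
    @mutex.synchronize { @counts.dup }
  end
end

stats = CounterStats.new
threads = 4.times.map do
  Thread.new { 1_000.times { stats.increment(:batches_submitted) } }
end
threads.each(&:join)
puts stats.to_h[:batches_submitted] # => 4000
```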
#stop(timeout: 5) ⇒ Object
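Graceful shutdown: marks the pipeline as stopped, enqueues one :shutdown marker per worker behind any pending batches, and waits up to timeout seconds in total for the workers to finish. Workers still running at the deadline are not killed; their references are simply dropped.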
```ruby
# File 'lib/opentrace/pipeline.rb', line 53
def stop(timeout: 5)
  @mutex.synchronize do
    return unless @running
    @running = false
    # One sentinel per worker, queued behind any pending batches
    @worker_count.times { @queue.push(:shutdown) }
  end
  # A single deadline bounds the total wait across all workers
  deadline = Time.now + timeout
  @workers.each do |w|
    remaining = deadline - Time.now
    w.join([remaining, 0].max) if remaining > 0
  end
  @workers = []
end
```
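The timeout in #stop is shared: each join waits only for whatever is left of a single deadline, so the total wait is bounded by timeout rather than timeout multiplied by the number of workers. This sketch reproduces that pattern with hypothetical slow threads. It reads Process::CLOCK_MONOTONIC instead of Time.now, a deliberate substitution that keeps the deadline unaffected by wall-clock adjustments.

```ruby
# Shared-deadline join: total wait is bounded by `timeout`, not
# `timeout * threads.size`. Threads that outlive the deadline keep running.
def join_all(threads, timeout:)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  threads.each do |t|
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    t.join(remaining) if remaining > 0 # Thread#join(limit) returns nil on timeout
  end
end

# Hypothetical workers stuck on a slow request
slow = 3.times.map { Thread.new { sleep 2.0 } }
t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC)
join_all(slow, timeout: 0.3)
waited = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0
puts format("waited %.2f s; still alive: %d", waited, slow.count(&:alive?))
```

With a per-thread timeout instead, the same call would block for about 0.9 s; the shared deadline keeps it near 0.3 s.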
#submit(batch) ⇒ Object
Submit a batch of raw entries for processing. Each batch is an array: [raw_entry1, raw_entry2, …]. If the pipeline is not running, the batch is dropped and nil is returned.
```ruby
# File 'lib/opentrace/pipeline.rb', line 46
def submit(batch)
  return unless @running
  @queue.push(batch)
  @stats.increment(:batches_submitted)
end
```
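Thread::Queue is FIFO, so the :shutdown markers pushed by #stop sit behind any batches already submitted, and workers drain those batches first. This sketch models that contract with a hypothetical single-worker TinyPipeline, not OpenTrace's class. Note that in the listing above, #submit appears to read @running without taking @mutex, so a batch submitted concurrently with #stop could land behind the markers and go unprocessed; the sketch does not model that race.

```ruby
# Hypothetical single-worker model of the submit/stop contract.
class TinyPipeline
  attr_reader :processed

  def initialize
    @queue = Queue.new
    @processed = []
    @running = true
    @worker = Thread.new do
      # FIFO: batches pushed before :shutdown are handled first
      while (item = @queue.pop) != :shutdown
        @processed << item
      end
    end
  end

  def submit(batch)
    return unless @running # dropped silently once stopped
    @queue.push(batch)
  end

  def stop
    @running = false
    @queue.push(:shutdown)
    @worker.join
  end
end

pipe = TinyPipeline.new
pipe.submit([:a])
pipe.submit([:b, :c])
pipe.stop
pipe.submit([:late]) # ignored: returns nil
puts pipe.processed.size # => 2
```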