Class: Brute::Queue::BaseQueue
- Inherits:
-
Object
- Object
- Brute::Queue::BaseQueue
- Defined in:
- lib/brute/queue/base_queue.rb
Overview
A queue that dequeues Step objects and runs them, honoring cancellation.
Composes four async primitives:
- An inbox (Async::Queue) that holds pending steps
- A barrier (Async::Barrier) that tracks every task the queue spawns
- A semaphore (Async::Semaphore) parented to the barrier, limiting concurrency
- Workers — long-lived tasks that dequeue from the inbox and run steps
The barrier-semaphore composition via parent: means every task the semaphore spawns is also tracked by the barrier. One call site (semaphore.async), two guarantees (scoped lifetime + bounded concurrency).
Direct Known Subclasses
Instance Attribute Summary collapse
-
#steps ⇒ Object
readonly
Returns the value of attribute steps.
Instance Method Summary collapse
- #<<(step) ⇒ Object
-
#cancel ⇒ Object
Hard: close inbox, cancel pending steps, cancel running work.
-
#drain ⇒ Object
Graceful: stop accepting, wait for running work to finish.
- #first ⇒ Object
-
#initialize(concurrency:, worker_count:, parent: Async::Task.current) ⇒ BaseQueue
constructor
A new instance of BaseQueue.
- #last ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize(concurrency:, worker_count:, parent: Async::Task.current) ⇒ BaseQueue
Returns a new instance of BaseQueue.
28 29 30 31 32 33 34 35 |
# File 'lib/brute/queue/base_queue.rb', line 28 def initialize(concurrency:, worker_count:, parent: Async::Task.current) @steps = [] @inbox = Async::Queue.new @barrier = Async::Barrier.new(parent: parent) @semaphore = Async::Semaphore.new(concurrency, parent: @barrier) @worker_count = worker_count @started = false end |
Instance Attribute Details
#steps ⇒ Object (readonly)
Returns the value of attribute steps.
26 27 28 |
# File 'lib/brute/queue/base_queue.rb', line 26 def steps @steps end |
Instance Method Details
#<<(step) ⇒ Object
37 38 39 40 41 |
# File 'lib/brute/queue/base_queue.rb', line 37 def <<(step) @steps << step @inbox.push(step) self end |
#cancel ⇒ Object
Hard: close inbox, cancel pending steps, cancel running work.
69 70 71 72 73 74 75 |
# File 'lib/brute/queue/base_queue.rb', line 69 def cancel @inbox.close @steps.each do |step| step.cancel if step.state == :pending end @barrier.cancel end |
#drain ⇒ Object
Graceful: stop accepting, wait for running work to finish.
63 64 65 66 |
# File 'lib/brute/queue/base_queue.rb', line 63 def drain @inbox.close @barrier.wait end |
#first ⇒ Object
43 |
# File 'lib/brute/queue/base_queue.rb', line 43 def first = @steps.first |
#last ⇒ Object
44 |
# File 'lib/brute/queue/base_queue.rb', line 44 def last = @steps.last |
#start ⇒ Object
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/brute/queue/base_queue.rb', line 46 def start return self if @started @started = true @worker_count.times do @barrier.async do while (step = @inbox.dequeue) @semaphore.async do |task| step.call(task) end end end end self end |