Class: Brute::Queue::BaseQueue

Inherits:
Object
Defined in:
lib/brute/queue/base_queue.rb

Overview

A queue that dequeues Step objects and runs them, honoring cancellation.

Composes four async primitives:

- An inbox (Async::Queue) that holds pending steps
- A barrier (Async::Barrier) that tracks every task the queue spawns
- A semaphore (Async::Semaphore) parented to the barrier, limiting concurrency
- Workers — long-lived tasks that dequeue from the inbox and run steps

Because the semaphore is created with parent: set to the barrier, every task the semaphore spawns is also tracked by the barrier. A single call site (semaphore.async) therefore gives two guarantees: scoped lifetime and bounded concurrency.
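The same shape can be sketched with the Ruby standard library alone. This is only an analogue that uses threads rather than Async fibers: a Thread::Queue plays the inbox, a Thread::SizedQueue of permits plays the semaphore, and a plain array of threads plays the barrier. All variable names are illustrative and are not part of Brute.

```ruby
# Stdlib-only analogue (threads, not Async tasks) of
# inbox + bounded concurrency + tracked lifetime.
inbox   = Thread::Queue.new          # pending jobs (the "inbox")
permits = Thread::SizedQueue.new(2)  # at most 2 jobs in flight (the "semaphore")
tracked = []                         # every spawned thread (the "barrier")
lock    = Mutex.new
running = 0
peak    = 0
results = []

dispatcher = Thread.new do
  while (job = inbox.pop)            # pop returns nil once the queue is closed and empty
    permits.push(:permit)            # blocks while 2 jobs are already running
    thread = Thread.new(job) do |n|
      begin
        lock.synchronize do
          running += 1
          peak = [peak, running].max
        end
        sleep 0.01                   # simulated work
        lock.synchronize { results << n * n }
      ensure
        lock.synchronize { running -= 1 }
        permits.pop                  # release the permit
      end
    end
    tracked << thread
  end
end

5.times { |i| inbox << i }
inbox.close                          # no more input
dispatcher.join
tracked.each(&:join)                 # wait for everything that was spawned
```

In the real class the permit bookkeeping and the tracking happen in one call, because the semaphore spawns its tasks through its parent barrier.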

Direct Known Subclasses

ParallelQueue, SequentialQueue

Constructor Details

#initialize(concurrency:, worker_count:, parent: Async::Task.current) ⇒ BaseQueue

Returns a new instance of BaseQueue.



# File 'lib/brute/queue/base_queue.rb', line 28

def initialize(concurrency:, worker_count:, parent: Async::Task.current)
  @steps        = []
  @inbox        = Async::Queue.new
  @barrier      = Async::Barrier.new(parent: parent)
  @semaphore    = Async::Semaphore.new(concurrency, parent: @barrier)
  @worker_count = worker_count
  @started      = false
end

Instance Attribute Details

#steps ⇒ Object (readonly)

Returns the value of attribute steps.



# File 'lib/brute/queue/base_queue.rb', line 26

def steps
  @steps
end

Instance Method Details

#<<(step) ⇒ Object



# File 'lib/brute/queue/base_queue.rb', line 37

def <<(step)
  @steps << step
  @inbox.push(step)
  self
end
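Since #<< records the step in @steps, pushes it to the inbox, and returns self, pushes can be chained, and steps keeps a full record even after workers have consumed the inbox. A minimal sketch of that bookkeeping, using a hypothetical MiniQueue class with a stdlib Thread::Queue standing in for Async::Queue (pending_count and take are helpers invented for this illustration):

```ruby
# Hypothetical stand-in that mirrors only the bookkeeping of BaseQueue#<<.
class MiniQueue
  attr_reader :steps

  def initialize
    @steps = []
    @inbox = Thread::Queue.new   # stdlib queue standing in for Async::Queue
  end

  def <<(step)
    @steps << step     # permanent record, in insertion order
    @inbox.push(step)  # pending work, consumed by workers
    self               # returning self enables chaining
  end

  # Illustration-only helpers, not part of BaseQueue.
  def pending_count = @inbox.size
  def take = @inbox.pop
end

queue = MiniQueue.new
queue << :a << :b << :c
taken = queue.take     # simulate a worker consuming one step
```

After the take, steps still lists all three entries while only two remain pending.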

#cancel ⇒ Object

Hard stop: closes the inbox, cancels every step still in the :pending state, then cancels running work through the barrier.



# File 'lib/brute/queue/base_queue.rb', line 69

def cancel
  @inbox.close
  @steps.each do |step|
    step.cancel if step.state == :pending
  end
  @barrier.cancel
end
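The step-level part of #cancel only touches steps whose state is :pending; anything already running is left to the barrier. A small sketch of that filter, assuming a hypothetical FakeStep with just #state and #cancel (the :completed, :running and :cancelled state names are invented for the example; only :pending appears in the source):

```ruby
# Hypothetical stand-in for a Step. Only #state, #cancel and the :pending
# state come from the source; the other state names are assumptions.
FakeStep = Struct.new(:name, :state) do
  def cancel
    self.state = :cancelled
  end
end

steps = [
  FakeStep.new("a", :completed),
  FakeStep.new("b", :running),
  FakeStep.new("c", :pending)
]

# Same filter as BaseQueue#cancel: only steps that have not started are cancelled here.
steps.each { |step| step.cancel if step.state == :pending }

states = steps.map(&:state)
```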

#drain ⇒ Object

Graceful shutdown: closes the inbox so no new steps are accepted, then waits for all work tracked by the barrier to finish.



# File 'lib/brute/queue/base_queue.rb', line 63

def drain
  @inbox.close
  @barrier.wait
end
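The close-then-wait idea can be illustrated with the standard library's Thread::Queue, which is used here purely as an analogue: once closed it rejects new items, still hands out what was already queued, and then returns nil so a `while (item = queue.pop)` loop ends on its own. How Async::Queue behaves after close depends on the installed async version and is not shown here.

```ruby
inbox = Thread::Queue.new
inbox << :step_1 << :step_2
inbox.close                      # stop accepting new work

# Pushing to a closed stdlib queue raises ClosedQueueError.
rejected = begin
  inbox << :step_3
  false
rescue ClosedQueueError
  true
end

# Items queued before the close are still delivered; pop then returns nil.
drained = []
while (item = inbox.pop)
  drained << item
end
```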

#first ⇒ Object



# File 'lib/brute/queue/base_queue.rb', line 43

def first = @steps.first

#last ⇒ Object



# File 'lib/brute/queue/base_queue.rb', line 44

def last  = @steps.last

#start ⇒ Object



# File 'lib/brute/queue/base_queue.rb', line 46

def start
  return self if @started
  @started = true

  @worker_count.times do
    @barrier.async do
      while (step = @inbox.dequeue)
        @semaphore.async do |task|
          step.call(task)
        end
      end
    end
  end
  self
end
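The @started guard makes #start idempotent: a second call returns self without spawning more workers. A minimal sketch of just that guard, using a hypothetical MiniStarter class in which a counter stands in for the worker tasks:

```ruby
# Hypothetical class that mirrors only the guard logic of BaseQueue#start.
class MiniStarter
  attr_reader :spawned

  def initialize(worker_count)
    @worker_count = worker_count
    @started      = false
    @spawned      = 0            # stands in for the worker tasks
  end

  def start
    return self if @started      # later calls do nothing
    @started = true

    @worker_count.times { @spawned += 1 }
    self
  end
end

starter = MiniStarter.new(3)
same = starter.start.start       # chaining works because start returns self
```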