Class: Philiprehberger::TaskQueue::Queue
- Inherits: Object
- Defined in: lib/philiprehberger/task_queue/queue.rb
Overview
In-process async job queue with concurrency control.
Tasks are enqueued as blocks or callable objects and executed by a pool of worker threads. The queue is fully thread-safe.
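The worker loop itself is not reproduced on this page, so the following is only a minimal sketch of the general technique the overview describes: blocks pushed onto a shared array and executed by a fixed pool of threads coordinated with a Mutex and a ConditionVariable. MiniPool and its methods are hypothetical stand-ins, not the library's implementation.

```ruby
# Hypothetical stand-in, NOT the library's code: a tiny worker pool.
class MiniPool
  def initialize(concurrency: 2)
    @tasks = []
    @mutex = Mutex.new
    @condition = ConditionVariable.new
    @running = true
    @workers = Array.new(concurrency) { Thread.new { work_loop } }
  end

  # Enqueue a block and wake one sleeping worker.
  def push(&block)
    @mutex.synchronize do
      @tasks << block
      @condition.signal
    end
    self
  end

  # Stop the pool, let workers finish what is still queued, then join them.
  def shutdown
    @mutex.synchronize do
      @running = false
      @condition.broadcast
    end
    @workers.each(&:join)
    nil
  end

  private

  def work_loop
    loop do
      task = @mutex.synchronize do
        # Sleep while there is nothing to do and the pool is still running.
        @condition.wait(@mutex) while @tasks.empty? && @running
        @tasks.shift
      end
      break if task.nil? # queue is empty and the pool was shut down

      task.call
    end
  end
end

results = []
results_lock = Mutex.new

pool = MiniPool.new(concurrency: 2)
5.times { |i| pool.push { results_lock.synchronize { results << i * i } } }
pool.shutdown

squares = results.sort
```

The tasks may finish in any order, which is why the sketch sorts the collected results before using them.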
Instance Attribute Summary
- #concurrency ⇒ Integer (readonly)
  The maximum number of concurrent worker threads.
Instance Method Summary
- #clear ⇒ Integer
  Remove all pending tasks from the queue.
- #drain(timeout: 30) ⇒ void
  Block until all pending tasks are complete without shutting down.
- #empty? ⇒ Boolean
  Whether there are no pending tasks waiting to be started.
- #initialize(concurrency: 4) ⇒ Queue (constructor)
  A new instance of Queue.
- #on_complete {|result| ... } ⇒ self
  Register a callback invoked after each successful task completion.
- #on_error {|exception, task| ... } ⇒ self
  Register a callback invoked when a task raises an exception.
- #pause ⇒ self
  Pause the queue so workers stop dequeuing new tasks.
- #paused? ⇒ Boolean
  Whether the queue is currently paused.
- #push(callable = nil) { ... } ⇒ self (also: #<<)
  Enqueue a task to be processed asynchronously.
- #resume ⇒ self
  Resume a paused queue, waking workers to continue processing.
- #running? ⇒ Boolean
  Whether the queue is accepting new tasks.
- #shutdown(timeout: 30) ⇒ void
  Gracefully shut down the queue.
- #size ⇒ Integer
  Number of pending (not yet started) tasks.
- #stats ⇒ Hash{Symbol => Integer}
  Return statistics about processed tasks.
- #stats_reset! ⇒ self
  Atomically zero the completed and failed counters.
Constructor Details
#initialize(concurrency: 4) ⇒ Queue
Returns a new instance of Queue.
# File 'lib/philiprehberger/task_queue/queue.rb', line 16
def initialize(concurrency: 4)
  @concurrency = concurrency
  @tasks = []
  @mutex = Mutex.new
  @condition = ConditionVariable.new
  @drain_condition = ConditionVariable.new
  @workers = []
  @running = true
  @started = false
  @paused = false
  @pause_condition = ConditionVariable.new
  @error_handler = nil
  @complete_handler = nil
  @stats = { completed: 0, failed: 0, in_flight: 0 }
end
Instance Attribute Details
#concurrency ⇒ Integer (readonly)
Returns the maximum number of concurrent worker threads.
# File 'lib/philiprehberger/task_queue/queue.rb', line 13
def concurrency
  @concurrency
end
Instance Method Details
#clear ⇒ Integer
Remove all pending tasks from the queue.
# File 'lib/philiprehberger/task_queue/queue.rb', line 98
def clear
  @mutex.synchronize do
    count = @tasks.size
    @tasks.clear
    count
  end
end
#drain(timeout: 30) ⇒ void
This method returns an undefined value.
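Block until all pending and in-flight tasks are complete, or until timeout seconds have elapsed, without shutting down. The method returns nil in both cases, so the return value does not indicate whether the timeout was hit.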
# File 'lib/philiprehberger/task_queue/queue.rb', line 125
def drain(timeout: 30)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  @mutex.synchronize do
    while !@tasks.empty? || @stats[:in_flight].positive?
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      break if remaining <= 0

      @drain_condition.wait(@mutex, remaining)
    end
  end
  nil
end
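The deadline loop in drain is a common pattern for a bounded wait on a condition variable: compute a monotonic deadline once, then re-check the predicate after every wake-up with the remaining time. A minimal standalone sketch follows. wait_until is a hypothetical helper, not part of the library, and unlike drain it returns true or false so the caller can tell a timeout from completion.

```ruby
# Hypothetical helper (not in the library): wait until the block returns true
# or the timeout elapses. Returns true on success, false on timeout.
def wait_until(mutex, condition, timeout)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  mutex.synchronize do
    until yield
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      return false if remaining <= 0 # out of time; the mutex is released on return

      # May wake early (spurious wake-up or signal); the loop re-checks the predicate.
      condition.wait(mutex, remaining)
    end
  end
  true
end

mutex = Mutex.new
condition = ConditionVariable.new
done = false

worker = Thread.new do
  sleep 0.05
  mutex.synchronize do
    done = true
    condition.broadcast
  end
end

finished = wait_until(mutex, condition, 2) { done }           # predicate becomes true
worker.join
timed_out = !wait_until(mutex, condition, 0.05) { false }     # predicate never true
```

Using CLOCK_MONOTONIC rather than wall-clock time keeps the deadline stable if the system clock is adjusted while waiting.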
#empty? ⇒ Boolean
Whether there are no pending tasks waiting to be started.
In-flight tasks are not considered; use drain to wait for them.
# File 'lib/philiprehberger/task_queue/queue.rb', line 172
def empty?
  @mutex.synchronize { @tasks.empty? }
end
#on_complete {|result| ... } ⇒ self
Register a callback invoked after each successful task completion.
The callback receives the return value of the completed task.
# File 'lib/philiprehberger/task_queue/queue.rb', line 49
def on_complete(&block)
  @mutex.synchronize { @complete_handler = block }
  self
end
#on_error {|exception, task| ... } ⇒ self
Register a callback invoked when a task raises an exception.
The callback receives the exception and the task that raised it.
# File 'lib/philiprehberger/task_queue/queue.rb', line 38
def on_error(&block)
  @mutex.synchronize { @error_handler = block }
  self
end
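How the workers invoke these two handlers is not shown on this page. As a rough illustration of the documented contract only (the success handler receives the task's return value, the error handler receives the exception and the task), a dispatch step could look like the sketch below. run_with_callbacks and its keyword arguments are hypothetical names.

```ruby
# Hypothetical dispatch helper; the library's actual worker code may differ.
def run_with_callbacks(task, complete_handler: nil, error_handler: nil)
  begin
    result = task.call
  rescue StandardError => e
    # Failure path: pass both the exception and the task that raised it.
    error_handler&.call(e, task)
    return :failed
  end
  # Success path: pass the task's return value.
  complete_handler&.call(result)
  :completed
end

log = []
on_complete = ->(result) { log << [:ok, result] }
on_error = ->(error, _task) { log << [:error, error.message] }

first = run_with_callbacks(-> { 21 * 2 }, complete_handler: on_complete, error_handler: on_error)
second = run_with_callbacks(-> { raise 'boom' }, complete_handler: on_complete, error_handler: on_error)
```

In this sketch the success handler runs outside the rescue, so an exception raised by the handler itself is not reported as a task failure. Whether the library makes the same choice is an assumption.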
#pause ⇒ self
Pause the queue so workers stop dequeuing new tasks.
In-flight tasks will finish, but no new tasks will be picked up until resume is called.
# File 'lib/philiprehberger/task_queue/queue.rb', line 70
def pause
  @mutex.synchronize do
    @paused = true
  end
  self
end
#paused? ⇒ Boolean
Whether the queue is currently paused.
# File 'lib/philiprehberger/task_queue/queue.rb', line 91
def paused?
  @mutex.synchronize { @paused }
end
#push(callable = nil) { ... } ⇒ self Also known as: <<
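Enqueue a task to be processed asynchronously.
Accepts either a callable or a block; when both are given, the block is used. Raises ArgumentError if neither is given and RuntimeError if the queue has been shut down. Worker threads are started lazily, on the first push.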
# File 'lib/philiprehberger/task_queue/queue.rb', line 143
def push(callable = nil, &block)
  task = block || callable
  raise ArgumentError, 'a block is required' unless task

  @mutex.synchronize do
    raise 'queue is shut down' unless @running

    start_workers unless @started
    @tasks << task
    @condition.signal
  end

  self
end
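The task selection in push can be tried in isolation. The sketch below copies only the first two lines of the method into a hypothetical helper, pick_task, to show which argument is used in each case; it does not touch the queue itself.

```ruby
# Hypothetical helper mirroring the task selection at the top of #push.
def pick_task(callable = nil, &block)
  task = block || callable
  raise ArgumentError, 'a block is required' unless task

  task
end

from_callable = pick_task(-> { :from_callable }).call
from_block = pick_task { :from_block }.call
# When both are supplied, the block takes precedence over the callable.
both = pick_task(-> { :from_callable }) { :from_block }.call

missing = begin
  pick_task
rescue ArgumentError => e
  e.message
end
```

The error message mentions only a block, although a callable argument is accepted as well.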
#resume ⇒ self
Resume a paused queue, waking workers to continue processing.
# File 'lib/philiprehberger/task_queue/queue.rb', line 80
def resume
  @mutex.synchronize do
    @paused = false
    @pause_condition.broadcast
  end
  self
end
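pause only sets a flag, while resume clears it and broadcasts on @pause_condition, which suggests that workers wait on that condition while the flag is set. The waiting side is not shown on this page, so the sketch below is an assumption about how such a gate can work. PauseGate and wait_while_paused are hypothetical names.

```ruby
# Hypothetical pause gate; only pause/resume mirror the methods documented here.
class PauseGate
  def initialize
    @mutex = Mutex.new
    @pause_condition = ConditionVariable.new
    @paused = false
  end

  def pause
    @mutex.synchronize { @paused = true }
    self
  end

  def resume
    @mutex.synchronize do
      @paused = false
      @pause_condition.broadcast # wake every waiting worker
    end
    self
  end

  # Assumed worker-side check: block here until the gate is open.
  def wait_while_paused
    @mutex.synchronize do
      @pause_condition.wait(@mutex) while @paused
    end
  end
end

gate = PauseGate.new.pause
passed = false

worker = Thread.new do
  gate.wait_while_paused
  passed = true
end

sleep 0.05
blocked_while_paused = !passed # the worker has not got past the gate yet
gate.resume
worker.join
```

broadcast rather than signal is used on resume because every paused worker should continue, not just one.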
#running? ⇒ Boolean
Whether the queue is accepting new tasks.
# File 'lib/philiprehberger/task_queue/queue.rb', line 179
def running?
  @mutex.synchronize { @running }
end
#shutdown(timeout: 30) ⇒ void
This method returns an undefined value.
Gracefully shut down the queue.
Signals all workers to finish their current task and drain remaining tasks, then waits up to timeout seconds for threads to exit.
# File 'lib/philiprehberger/task_queue/queue.rb', line 190
def shutdown(timeout: 30)
  signal_shutdown
  wait_for_workers(timeout)
  nil
end
#size ⇒ Integer
Number of pending (not yet started) tasks.
# File 'lib/philiprehberger/task_queue/queue.rb', line 163
def size
  @mutex.synchronize { @tasks.size }
end
#stats ⇒ Hash{Symbol => Integer}
Return statistics about processed tasks.
# File 'lib/philiprehberger/task_queue/queue.rb', line 57
def stats
  @mutex.synchronize do
    { completed: @stats[:completed], failed: @stats[:failed],
      pending: @tasks.size, in_flight: @stats[:in_flight] }
  end
end
#stats_reset! ⇒ self
Atomically zero the completed and failed counters.
Leaves pending, in_flight, worker threads, and registered callbacks untouched. Useful for resetting metrics between reporting intervals without shutting down the pool.
# File 'lib/philiprehberger/task_queue/queue.rb', line 113
def stats_reset!
  @mutex.synchronize do
    @stats[:completed] = 0
    @stats[:failed] = 0
  end
  self
end
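To make the effect of the reset concrete, here is a small standalone sketch of the same counter handling. Counters and increment are hypothetical stand-ins used for illustration; only the body of stats_reset! follows the listing above.

```ruby
# Hypothetical stand-in for the queue's counter hash, not the library's class.
class Counters
  def initialize
    @mutex = Mutex.new
    @stats = { completed: 0, failed: 0, in_flight: 0 }
  end

  # Illustrative mutator; in the real queue the workers update these values.
  def increment(key, by = 1)
    @mutex.synchronize { @stats[key] += by }
    self
  end

  # Copy of the counters taken under the lock.
  def stats
    @mutex.synchronize { @stats.dup }
  end

  # Zero only the cumulative counters; in_flight is left as it is.
  def stats_reset!
    @mutex.synchronize do
      @stats[:completed] = 0
      @stats[:failed] = 0
    end
    self
  end
end

counters = Counters.new
counters.increment(:completed, 3).increment(:failed).increment(:in_flight, 2)

before = counters.stats
after = counters.stats_reset!.stats
```

Because stats and stats_reset! each take the lock separately, a task that completes between the two calls would be counted in neither interval's report. Whether that matters depends on how precise the metrics need to be.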