Class: Philiprehberger::TaskQueue::Queue
- Inherits: Object
- Hierarchy: Object, Philiprehberger::TaskQueue::Queue
- Defined in: lib/philiprehberger/task_queue/queue.rb
Overview
In-process async job queue with concurrency control.
Tasks are enqueued as blocks or callable objects and executed by a pool of worker threads. The queue is fully thread-safe.
Instance Method Summary
- #clear ⇒ Integer
  Remove all pending tasks from the queue.
- #drain(timeout: 30) ⇒ void
  Block until all pending tasks are complete without shutting down.
- #empty? ⇒ Boolean
  Whether there are no pending tasks waiting to be started.
- #initialize(concurrency: 4) ⇒ Queue (constructor)
  A new instance of Queue.
- #on_complete {|result| ... } ⇒ self
  Register a callback invoked after each successful task completion.
- #on_error {|exception, task| ... } ⇒ self
  Register a callback invoked when a task raises an exception.
- #pause ⇒ self
  Pause the queue so workers stop dequeuing new tasks.
- #paused? ⇒ Boolean
  Whether the queue is currently paused.
- #push(callable = nil) { ... } ⇒ self (also: #<<)
  Enqueue a task to be processed asynchronously.
- #resume ⇒ self
  Resume a paused queue, waking workers to continue processing.
- #running? ⇒ Boolean
  Whether the queue is accepting new tasks.
- #shutdown(timeout: 30) ⇒ void
  Gracefully shut down the queue.
- #size ⇒ Integer
  Number of pending (not yet started) tasks.
- #stats ⇒ Hash{Symbol => Integer}
  Return statistics about processed tasks.
- #stats_reset! ⇒ self
  Atomically zero the completed and failed counters.
Constructor Details
#initialize(concurrency: 4) ⇒ Queue
Returns a new instance of Queue.
    # File 'lib/philiprehberger/task_queue/queue.rb', line 13
    def initialize(concurrency: 4)
      @concurrency = concurrency
      @tasks = []
      @mutex = Mutex.new
      @condition = ConditionVariable.new
      @drain_condition = ConditionVariable.new
      @workers = []
      @running = true
      @started = false
      @paused = false
      @pause_condition = ConditionVariable.new
      @error_handler = nil
      @complete_handler = nil
      @stats = { completed: 0, failed: 0, in_flight: 0 }
    end
Instance Method Details
#clear ⇒ Integer
Remove all pending tasks from the queue.
    # File 'lib/philiprehberger/task_queue/queue.rb', line 95
    def clear
      @mutex.synchronize do
        count = @tasks.size
        @tasks.clear
        count
      end
    end
#drain(timeout: 30) ⇒ void
This method returns an undefined value.
Block until all pending tasks are complete without shutting down.
    # File 'lib/philiprehberger/task_queue/queue.rb', line 122
    def drain(timeout: 30)
      deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
      @mutex.synchronize do
        while !@tasks.empty? || @stats[:in_flight].positive?
          remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
          break if remaining <= 0

          @drain_condition.wait(@mutex, remaining)
        end
      end
      nil
    end
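The loop in drain follows a common deadline pattern for timed condition waits: an absolute monotonic deadline is computed once, and the remaining time is recomputed after every wakeup, so early or spurious wakeups do not extend the total wait. The sketch below reproduces that pattern with the standard library only. The helper name wait_until and its boolean return value are assumptions made for illustration and are not part of the library.

```ruby
# Hypothetical helper illustrating the deadline pattern; not part of the library.
# Must be called while holding `mutex`. Returns true if the block became
# truthy, false if the timeout elapsed first.
def wait_until(mutex, condition, timeout)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  until yield
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    return false if remaining <= 0

    # Releases the mutex while waiting and reacquires it before returning.
    condition.wait(mutex, remaining)
  end
  true
end

mutex = Mutex.new
condition = ConditionVariable.new
done = false

# Simulated worker that finishes shortly after the wait begins.
worker = Thread.new do
  sleep 0.05
  mutex.synchronize do
    done = true
    condition.broadcast
  end
end

finished = mutex.synchronize { wait_until(mutex, condition, 2) { done } }
worker.join

# A predicate that never becomes true runs into the deadline instead.
second_result = mutex.synchronize { wait_until(mutex, condition, 0.05) { false } }
```

Unlike this helper, drain returns nil in both cases, so a caller cannot tell from the return value whether the timeout expired. Checking stats afterwards is one way to find out.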
#empty? ⇒ Boolean
Whether there are no pending tasks waiting to be started.
In-flight tasks are not considered; use drain to wait for them.
    # File 'lib/philiprehberger/task_queue/queue.rb', line 169
    def empty?
      @mutex.synchronize { @tasks.empty? }
    end
#on_complete {|result| ... } ⇒ self
Register a callback invoked after each successful task completion.
The callback receives the return value of the completed task.
    # File 'lib/philiprehberger/task_queue/queue.rb', line 46
    def on_complete(&block)
      @mutex.synchronize { @complete_handler = block }
      self
    end
#on_error {|exception, task| ... } ⇒ self
Register a callback invoked when a task raises an exception.
The callback receives the exception and the task that raised it.
    # File 'lib/philiprehberger/task_queue/queue.rb', line 35
    def on_error(&block)
      @mutex.synchronize { @error_handler = block }
      self
    end
#pause ⇒ self
Pause the queue so workers stop dequeuing new tasks.
In-flight tasks will finish, but no new tasks will be picked up until resume is called.
    # File 'lib/philiprehberger/task_queue/queue.rb', line 67
    def pause
      @mutex.synchronize do
        @paused = true
      end
      self
    end
#paused? ⇒ Boolean
Whether the queue is currently paused.
    # File 'lib/philiprehberger/task_queue/queue.rb', line 88
    def paused?
      @mutex.synchronize { @paused }
    end
#push(callable = nil) { ... } ⇒ self Also known as: <<
Enqueue a task to be processed asynchronously.
    # File 'lib/philiprehberger/task_queue/queue.rb', line 140
    def push(callable = nil, &block)
      task = block || callable
      raise ArgumentError, 'a block is required' unless task

      @mutex.synchronize do
        raise 'queue is shut down' unless @running

        start_workers unless @started
        @tasks << task
        @condition.signal
      end

      self
    end
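Because the source selects the task with `block || callable`, a block takes precedence when both a block and a callable are supplied, and an ArgumentError is raised when neither is given. The sketch below isolates just that selection logic in a hypothetical stand-in method, pick_task, so it can run without the library. It does not enqueue anything.

```ruby
# Hypothetical stand-in mirroring only the task-selection lines of #push.
def pick_task(callable = nil, &block)
  task = block || callable
  raise ArgumentError, 'a block is required' unless task

  task
end

from_block    = pick_task { :block }
from_callable = pick_task(-> { :callable })
both          = pick_task(-> { :callable }) { :block } # the block wins

results = [from_block.call, from_callable.call, both.call]

# Neither a block nor a callable: the same ArgumentError as in #push.
error = begin
  pick_task
rescue ArgumentError => e
  e.message
end
```

Note that the error message mentions only a block, although a callable object is accepted as well.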
#resume ⇒ self
Resume a paused queue, waking workers to continue processing.
    # File 'lib/philiprehberger/task_queue/queue.rb', line 77
    def resume
      @mutex.synchronize do
        @paused = false
        @pause_condition.broadcast
      end
      self
    end
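The worker loop that reacts to pause and resume is not shown on this page. As a rough illustration of how a flag plus a condition variable can gate workers, the sketch below uses a hypothetical PauseGate class. The class name and the wait_if_paused method are assumptions, and the library's actual worker code may differ.

```ruby
# Hypothetical sketch of a pause gate; the library's worker loop is not shown
# on this page and may be implemented differently.
class PauseGate
  def initialize
    @mutex = Mutex.new
    @pause_condition = ConditionVariable.new
    @paused = false
  end

  def pause
    @mutex.synchronize { @paused = true }
    self
  end

  def resume
    @mutex.synchronize do
      @paused = false
      @pause_condition.broadcast # wake every waiting worker
    end
    self
  end

  # A worker would call this before picking up its next task.
  def wait_if_paused
    @mutex.synchronize do
      # Re-check the flag after every wakeup to tolerate spurious wakeups.
      @pause_condition.wait(@mutex) while @paused
    end
  end
end

gate = PauseGate.new.pause
ran = false

worker = Thread.new do
  gate.wait_if_paused
  ran = true
end

sleep 0.05
ran_while_paused = ran # the worker is still blocked at the gate
gate.resume
worker.join
ran_after_resume = ran
```

Using broadcast rather than signal in resume matters when several workers are waiting: all of them should continue, not just one.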
#running? ⇒ Boolean
Whether the queue is accepting new tasks.
    # File 'lib/philiprehberger/task_queue/queue.rb', line 176
    def running?
      @mutex.synchronize { @running }
    end
#shutdown(timeout: 30) ⇒ void
This method returns an undefined value.
Gracefully shut down the queue.
Signals all workers to finish their current task and drain remaining tasks, then waits up to timeout seconds for threads to exit.
    # File 'lib/philiprehberger/task_queue/queue.rb', line 187
    def shutdown(timeout: 30)
      signal_shutdown
      wait_for_workers(timeout)
      nil
    end
#size ⇒ Integer
Number of pending (not yet started) tasks.
    # File 'lib/philiprehberger/task_queue/queue.rb', line 160
    def size
      @mutex.synchronize { @tasks.size }
    end
#stats ⇒ Hash{Symbol => Integer}
Return statistics about processed tasks.
    # File 'lib/philiprehberger/task_queue/queue.rb', line 54
    def stats
      @mutex.synchronize do
        { completed: @stats[:completed], failed: @stats[:failed],
          pending: @tasks.size, in_flight: @stats[:in_flight] }
      end
    end
#stats_reset! ⇒ self
Atomically zero the completed and failed counters.
Leaves pending, in_flight, worker threads, and registered callbacks untouched. Useful for resetting metrics between reporting intervals without shutting down the pool.
    # File 'lib/philiprehberger/task_queue/queue.rb', line 110
    def stats_reset!
      @mutex.synchronize do
        @stats[:completed] = 0
        @stats[:failed] = 0
      end
      self
    end
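For interval reporting, a typical pattern is to take a snapshot with stats and then call stats_reset!. The sketch below shows the effect on the counters using a hypothetical stand-in class, Counters, that copies only the counter logic from the source above. The record method is an assumption added so the example has something to count.

```ruby
# Hypothetical stand-in that copies only the counter logic shown above.
class Counters
  def initialize
    @mutex = Mutex.new
    @stats = { completed: 0, failed: 0, in_flight: 0 }
  end

  # Assumed helper for the example; increments one counter under the lock.
  def record(key)
    @mutex.synchronize { @stats[key] += 1 }
  end

  def stats
    @mutex.synchronize { @stats.dup }
  end

  def stats_reset!
    @mutex.synchronize do
      @stats[:completed] = 0
      @stats[:failed] = 0
    end
    self
  end
end

counters = Counters.new
3.times { counters.record(:completed) }
counters.record(:failed)
counters.record(:in_flight)

interval_report = counters.stats          # snapshot for this reporting interval
after_reset = counters.stats_reset!.stats # in_flight is left untouched
```

Judging from the source shown, stats and stats_reset! each acquire the mutex separately, so a task that finishes between the two calls would be zeroed without appearing in the snapshot. Reports built this way are best treated as approximate.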