philiprehberger-task_queue
In-process async job queue with concurrency control
Requirements
- Ruby >= 3.1
Installation
Add to your Gemfile:
gem "philiprehberger-task_queue"
Or install directly:
gem install philiprehberger-task_queue
Usage
require "philiprehberger/task_queue"
queue = Philiprehberger::TaskQueue.new(concurrency: 4)
10.times do |i|
queue.push { puts "Processing job #{i}" }
end
puts queue.size # number of pending tasks
puts queue.running? # => true
queue.shutdown(timeout: 30)
Using the << alias
queue << -> { puts "Hello from a task!" }
Error handling
Register a callback to handle exceptions raised inside tasks. The callback receives the exception and the original task (callable) that failed. Unhandled errors are silently swallowed when no callback is registered.
queue = Philiprehberger::TaskQueue.new(concurrency: 2)
queue.on_error do |exception, task|
warn "[TaskQueue] #{exception.class}: #{exception.}"
warn exception.backtrace.first(5).join("\n")
end
queue.push { Integer("not_a_number") }
queue.push { File.read("/nonexistent") }
queue.drain(timeout: 5)
puts queue.stats
# => { completed: 0, failed: 2, pending: 0, in_flight: 0 }
Completion callback
Register a callback to run after each successful task completion. The callback receives the return value of the task.
queue = Philiprehberger::TaskQueue.new(concurrency: 2)
queue.on_complete do |result|
puts "Task finished with: #{result}"
end
queue.push { 42 }
queue.push { { status: "ok" } }
queue.drain(timeout: 5)
# Task finished with: 42
# Task finished with: {:status=>"ok"}
Statistics
stats returns a snapshot of completed, failed, pending, and in-flight counts. All counters are thread-safe and updated atomically after each task finishes.
queue = Philiprehberger::TaskQueue.new(concurrency: 4)
20.times { |i| queue.push { sleep(0.01); raise "boom" if i == 5 } }
queue.drain(timeout: 10)
stats = queue.stats
puts "Completed: #{stats[:completed]}"
puts "Failed: #{stats[:failed]}"
puts "Pending: #{stats[:pending]}"
puts "In-flight: #{stats[:in_flight]}"
# Completed: 19
# Failed: 1
# Pending: 0
# In-flight: 0
Pause and resume
Temporarily suspend task consumption without shutting down. In-flight tasks will finish, but no new tasks are picked up until the queue is resumed.
queue = Philiprehberger::TaskQueue.new(concurrency: 4)
10.times { |i| queue.push { process(i) } }
queue.pause
puts queue.paused? # => true
# Tasks already in flight will complete, but pending tasks wait.
queue.resume
puts queue.paused? # => false
queue.shutdown(timeout: 10)
Clear pending tasks
Discard all pending tasks from the queue. Returns the number of tasks removed.
queue = Philiprehberger::TaskQueue.new(concurrency: 2)
100.times { |i| queue.push { process(i) } }
cleared = queue.clear
puts "Cleared #{cleared} tasks"
queue.shutdown(timeout: 5)
Reset counters
queue = Philiprehberger::TaskQueue.new(concurrency: 4)
10.times { queue.push { do_work } }
queue.drain
queue.stats_reset!
queue.stats[:completed] # => 0
FIFO ordering guarantees
Tasks are stored in an internal array and dequeued in FIFO order. When concurrency is 1, tasks execute strictly in the order they were pushed. With higher concurrency, dequeue order is still FIFO but tasks may complete out of order depending on individual execution time.
results = Queue.new # stdlib thread-safe queue for collecting output
queue = Philiprehberger::TaskQueue.new(concurrency: 1)
5.times { |i| queue.push { results << i } }
queue.drain(timeout: 5)
puts results.size.times.map { results.pop }
# => [0, 1, 2, 3, 4]
Graceful shutdown
shutdown signals all worker threads to stop accepting new tasks, lets in-flight tasks finish, then drains any remaining enqueued tasks before joining threads. The timeout parameter caps total wait time; workers that exceed the deadline are abandoned.
queue = Philiprehberger::TaskQueue.new(concurrency: 4)
100.times { |i| queue.push { sleep(0.05) } }
queue.shutdown(timeout: 10)
puts queue.running? # => false
# queue.push { ... } would now raise "queue is shut down"
Draining
drain blocks the calling thread until all pending and in-flight tasks finish, but keeps the queue running so new tasks can still be pushed afterwards.
queue = Philiprehberger::TaskQueue.new(concurrency: 4)
10.times { |i| queue.push { process(i) } }
queue.drain(timeout: 10) # waits for all tasks to finish
puts queue.running? # => true — still accepting new tasks
queue.push { process(:extra) }
queue.shutdown(timeout: 5)
API
| Method | Parameters | Returns | Description |
|---|---|---|---|
.new(concurrency:) |
concurrency — max worker threads (Integer, default 4) |
Queue |
Create a new queue with the given concurrency limit |
#push(&block) |
&block — the task to execute |
self |
Enqueue a block for async execution; raises ArgumentError if no block given, raises RuntimeError if the queue is shut down |
#<<(callable) |
callable — any object responding to #call |
self |
Alias for #push; convenient for lambdas and procs |
#size |
(none) | Integer |
Number of pending (not yet started) tasks |
#empty? |
(none) | Boolean |
Whether there are no pending tasks waiting to be started |
#running? |
(none) | Boolean |
Whether the queue is accepting new tasks |
#shutdown(timeout:) |
timeout — seconds to wait for workers (Numeric, default 30) |
nil |
Signal workers to stop, drain remaining tasks, join threads up to timeout seconds |
#on_complete(&block) |
&block — callback receiving (result) |
self |
Register a callback invoked after each successful task completion with the task's return value |
#on_error(&block) |
&block — callback receiving (exception, task) |
self |
Register an error callback invoked when a task raises a StandardError |
#stats |
(none) | Hash |
Returns { completed:, failed:, pending:, in_flight: } with Integer counts |
#drain(timeout:) |
timeout — seconds to wait (Numeric, default 30) |
nil |
Block until all pending and in-flight tasks complete without shutting down |
#pause |
(none) | self |
Suspend task consumption; in-flight tasks finish but no new tasks are picked up |
#resume |
(none) | self |
Resume a paused queue, waking workers to continue processing |
#paused? |
(none) | Boolean |
Whether the queue is currently paused |
#clear |
(none) | Integer |
Remove all pending tasks and return the number cleared |
#stats_reset! |
(none) | self |
Atomically zero the completed and failed counters while leaving pending, in-flight, workers, and callbacks untouched |
Development
bundle install
bundle exec rspec
bundle exec rubocop
Support
If you find this project useful: