Class: Philiprehberger::TaskQueue::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/philiprehberger/task_queue/queue.rb

Overview

In-process async job queue with concurrency control.

Tasks are enqueued as blocks or callable objects and executed by a pool of worker threads. The queue is fully thread-safe.

Instance Method Summary collapse

Constructor Details

#initialize(concurrency: 4) ⇒ Queue

Returns a new instance of Queue.

Parameters:

  • concurrency (Integer) (defaults to: 4)

    maximum number of concurrent worker threads



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/philiprehberger/task_queue/queue.rb', line 13

def initialize(concurrency: 4)
  @concurrency = concurrency
  @tasks = []
  @mutex = Mutex.new
  @condition = ConditionVariable.new
  @drain_condition = ConditionVariable.new
  @workers = []
  @running = true
  @started = false
  @paused = false
  @pause_condition = ConditionVariable.new
  @error_handler = nil
  @complete_handler = nil
  @stats = { completed: 0, failed: 0, in_flight: 0 }
end

Instance Method Details

#clearInteger

Remove all pending tasks from the queue.

Returns:

  • (Integer)

    number of tasks cleared



95
96
97
98
99
100
101
# File 'lib/philiprehberger/task_queue/queue.rb', line 95

def clear
  @mutex.synchronize do
    count = @tasks.size
    @tasks.clear
    count
  end
end

#drain(timeout: 30) ⇒ void

This method returns an undefined value.

Block until all pending tasks are complete without shutting down.

Parameters:

  • timeout (Numeric) (defaults to: 30)

    seconds to wait before returning



122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/philiprehberger/task_queue/queue.rb', line 122

def drain(timeout: 30)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  @mutex.synchronize do
    while !@tasks.empty? || @stats[:in_flight].positive?
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      break if remaining <= 0

      @drain_condition.wait(@mutex, remaining)
    end
  end
  nil
end

#empty?Boolean

Whether there are no pending tasks waiting to be started.

In-flight tasks are not considered; use drain to wait for them.

Returns:

  • (Boolean)


169
170
171
# File 'lib/philiprehberger/task_queue/queue.rb', line 169

def empty?
  @mutex.synchronize { @tasks.empty? }
end

#on_complete {|result| ... } ⇒ self

Register a callback invoked after each successful task completion.

The callback receives the return value of the completed task.

Yields:

  • (result)

    called on task success

Returns:

  • (self)


46
47
48
49
# File 'lib/philiprehberger/task_queue/queue.rb', line 46

def on_complete(&block)
  @mutex.synchronize { @complete_handler = block }
  self
end

#on_error {|exception, task| ... } ⇒ self

Register a callback invoked when a task raises an exception.

The callback receives the exception and the task that raised it.

Yields:

  • (exception, task)

    called on task failure

Returns:

  • (self)


35
36
37
38
# File 'lib/philiprehberger/task_queue/queue.rb', line 35

def on_error(&block)
  @mutex.synchronize { @error_handler = block }
  self
end

#pauseself

Pause the queue so workers stop dequeuing new tasks.

In-flight tasks will finish, but no new tasks will be picked up until resume is called.

Returns:

  • (self)


67
68
69
70
71
72
# File 'lib/philiprehberger/task_queue/queue.rb', line 67

def pause
  @mutex.synchronize do
    @paused = true
  end
  self
end

#paused?Boolean

Whether the queue is currently paused.

Returns:

  • (Boolean)


88
89
90
# File 'lib/philiprehberger/task_queue/queue.rb', line 88

def paused?
  @mutex.synchronize { @paused }
end

#push(callable = nil) { ... } ⇒ self Also known as: <<

Enqueue a task to be processed asynchronously.

Parameters:

  • callable (#call, nil) (defaults to: nil)

    a callable object (used by <<)

Yields:

  • the block to execute (takes precedence over callable)

Returns:

  • (self)

Raises:

  • (ArgumentError)


140
141
142
143
144
145
146
147
148
149
150
151
152
153
# File 'lib/philiprehberger/task_queue/queue.rb', line 140

def push(callable = nil, &block)
  task = block || callable
  raise ArgumentError, 'a block is required' unless task

  @mutex.synchronize do
    raise 'queue is shut down' unless @running

    start_workers unless @started
    @tasks << task
    @condition.signal
  end

  self
end

#resumeself

Resume a paused queue, waking workers to continue processing.

Returns:

  • (self)


77
78
79
80
81
82
83
# File 'lib/philiprehberger/task_queue/queue.rb', line 77

def resume
  @mutex.synchronize do
    @paused = false
    @pause_condition.broadcast
  end
  self
end

#running?Boolean

Whether the queue is accepting new tasks.

Returns:

  • (Boolean)


176
177
178
# File 'lib/philiprehberger/task_queue/queue.rb', line 176

def running?
  @mutex.synchronize { @running }
end

#shutdown(timeout: 30) ⇒ void

This method returns an undefined value.

Gracefully shut down the queue.

Signals all workers to finish their current task and drain remaining tasks, then waits up to timeout seconds for threads to exit.

Parameters:

  • timeout (Numeric) (defaults to: 30)

    seconds to wait for workers to finish



187
188
189
190
191
# File 'lib/philiprehberger/task_queue/queue.rb', line 187

def shutdown(timeout: 30)
  signal_shutdown
  wait_for_workers(timeout)
  nil
end

#sizeInteger

Number of pending (not yet started) tasks.

Returns:

  • (Integer)


160
161
162
# File 'lib/philiprehberger/task_queue/queue.rb', line 160

def size
  @mutex.synchronize { @tasks.size }
end

#statsHash{Symbol => Integer}

Return statistics about processed tasks.

Returns:

  • (Hash{Symbol => Integer})

    counts for :completed, :failed, :pending, :in_flight



54
55
56
57
58
59
# File 'lib/philiprehberger/task_queue/queue.rb', line 54

def stats
  @mutex.synchronize do
    { completed: @stats[:completed], failed: @stats[:failed], pending: @tasks.size,
      in_flight: @stats[:in_flight] }
  end
end

#stats_reset!self

Atomically zero the completed and failed counters.

Leaves pending, in_flight, worker threads, and registered callbacks untouched. Useful for resetting metrics between reporting intervals without shutting down the pool.

Returns:

  • (self)


110
111
112
113
114
115
116
# File 'lib/philiprehberger/task_queue/queue.rb', line 110

def stats_reset!
  @mutex.synchronize do
    @stats[:completed] = 0
    @stats[:failed] = 0
  end
  self
end