Class: Philiprehberger::TaskQueue::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/philiprehberger/task_queue/queue.rb

Overview

In-process async job queue with concurrency control.

Tasks are enqueued as blocks or callable objects and executed by a pool of worker threads. The queue is fully thread-safe.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(concurrency: 4) ⇒ Queue

Returns a new instance of Queue.

Parameters:

  • concurrency (Integer) (defaults to: 4)

    maximum number of concurrent worker threads



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/philiprehberger/task_queue/queue.rb', line 16

def initialize(concurrency: 4)
  @concurrency = concurrency
  @tasks = []
  @mutex = Mutex.new
  @condition = ConditionVariable.new
  @drain_condition = ConditionVariable.new
  @workers = []
  @running = true
  @started = false
  @paused = false
  @pause_condition = ConditionVariable.new
  @error_handler = nil
  @complete_handler = nil
  @stats = { completed: 0, failed: 0, in_flight: 0 }
end

Instance Attribute Details

#concurrencyInteger (readonly)

Returns the maximum number of concurrent worker threads.

Returns:

  • (Integer)

    the maximum number of concurrent worker threads



13
14
15
# File 'lib/philiprehberger/task_queue/queue.rb', line 13

def concurrency
  @concurrency
end

Instance Method Details

#clearInteger

Remove all pending tasks from the queue.

Returns:

  • (Integer)

    number of tasks cleared



98
99
100
101
102
103
104
# File 'lib/philiprehberger/task_queue/queue.rb', line 98

def clear
  @mutex.synchronize do
    count = @tasks.size
    @tasks.clear
    count
  end
end

#drain(timeout: 30) ⇒ void

This method returns an undefined value.

Block until all pending tasks are complete without shutting down.

Parameters:

  • timeout (Numeric) (defaults to: 30)

    seconds to wait before returning



125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/philiprehberger/task_queue/queue.rb', line 125

def drain(timeout: 30)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  @mutex.synchronize do
    while !@tasks.empty? || @stats[:in_flight].positive?
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      break if remaining <= 0

      @drain_condition.wait(@mutex, remaining)
    end
  end
  nil
end

#empty?Boolean

Whether there are no pending tasks waiting to be started.

In-flight tasks are not considered; use drain to wait for them.

Returns:

  • (Boolean)


172
173
174
# File 'lib/philiprehberger/task_queue/queue.rb', line 172

def empty?
  @mutex.synchronize { @tasks.empty? }
end

#on_complete {|result| ... } ⇒ self

Register a callback invoked after each successful task completion.

The callback receives the return value of the completed task.

Yields:

  • (result)

    called on task success

Returns:

  • (self)


49
50
51
52
# File 'lib/philiprehberger/task_queue/queue.rb', line 49

def on_complete(&block)
  @mutex.synchronize { @complete_handler = block }
  self
end

#on_error {|exception, task| ... } ⇒ self

Register a callback invoked when a task raises an exception.

The callback receives the exception and the task that raised it.

Yields:

  • (exception, task)

    called on task failure

Returns:

  • (self)


38
39
40
41
# File 'lib/philiprehberger/task_queue/queue.rb', line 38

def on_error(&block)
  @mutex.synchronize { @error_handler = block }
  self
end

#pauseself

Pause the queue so workers stop dequeuing new tasks.

In-flight tasks will finish, but no new tasks will be picked up until resume is called.

Returns:

  • (self)


70
71
72
73
74
75
# File 'lib/philiprehberger/task_queue/queue.rb', line 70

def pause
  @mutex.synchronize do
    @paused = true
  end
  self
end

#paused?Boolean

Whether the queue is currently paused.

Returns:

  • (Boolean)


91
92
93
# File 'lib/philiprehberger/task_queue/queue.rb', line 91

def paused?
  @mutex.synchronize { @paused }
end

#push(callable = nil) { ... } ⇒ self Also known as: <<

Enqueue a task to be processed asynchronously.

Parameters:

  • callable (#call, nil) (defaults to: nil)

    a callable object (used by <<)

Yields:

  • the block to execute (takes precedence over callable)

Returns:

  • (self)

Raises:

  • (ArgumentError)


143
144
145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/philiprehberger/task_queue/queue.rb', line 143

def push(callable = nil, &block)
  task = block || callable
  raise ArgumentError, 'a block is required' unless task

  @mutex.synchronize do
    raise 'queue is shut down' unless @running

    start_workers unless @started
    @tasks << task
    @condition.signal
  end

  self
end

#resumeself

Resume a paused queue, waking workers to continue processing.

Returns:

  • (self)


80
81
82
83
84
85
86
# File 'lib/philiprehberger/task_queue/queue.rb', line 80

def resume
  @mutex.synchronize do
    @paused = false
    @pause_condition.broadcast
  end
  self
end

#running?Boolean

Whether the queue is accepting new tasks.

Returns:

  • (Boolean)


179
180
181
# File 'lib/philiprehberger/task_queue/queue.rb', line 179

def running?
  @mutex.synchronize { @running }
end

#shutdown(timeout: 30) ⇒ void

This method returns an undefined value.

Gracefully shut down the queue.

Signals all workers to finish their current task and drain remaining tasks, then waits up to timeout seconds for threads to exit.

Parameters:

  • timeout (Numeric) (defaults to: 30)

    seconds to wait for workers to finish



190
191
192
193
194
# File 'lib/philiprehberger/task_queue/queue.rb', line 190

def shutdown(timeout: 30)
  signal_shutdown
  wait_for_workers(timeout)
  nil
end

#sizeInteger

Number of pending (not yet started) tasks.

Returns:

  • (Integer)


163
164
165
# File 'lib/philiprehberger/task_queue/queue.rb', line 163

def size
  @mutex.synchronize { @tasks.size }
end

#statsHash{Symbol => Integer}

Return statistics about processed tasks.

Returns:

  • (Hash{Symbol => Integer})

    counts for :completed, :failed, :pending, :in_flight



57
58
59
60
61
62
# File 'lib/philiprehberger/task_queue/queue.rb', line 57

def stats
  @mutex.synchronize do
    { completed: @stats[:completed], failed: @stats[:failed], pending: @tasks.size,
      in_flight: @stats[:in_flight] }
  end
end

#stats_reset!self

Atomically zero the completed and failed counters.

Leaves pending, in_flight, worker threads, and registered callbacks untouched. Useful for resetting metrics between reporting intervals without shutting down the pool.

Returns:

  • (self)


113
114
115
116
117
118
119
# File 'lib/philiprehberger/task_queue/queue.rb', line 113

def stats_reset!
  @mutex.synchronize do
    @stats[:completed] = 0
    @stats[:failed] = 0
  end
  self
end