Class: Takagi::Controller::ThreadPool

Inherits:
Object
Defined in:
lib/takagi/controller/thread_pool.rb

Overview

Thread pool for processing controller requests

Each controller can have its own dedicated thread pool, allowing resource allocation based on expected load. This enables:

  • IngressController with 30 threads for high throughput

  • ConfigController with 2 threads for low traffic

  • Fine-grained resource control per controller

Examples:

pool = ThreadPool.new(size: 10, name: 'IngressController')
pool.schedule { handle_request(data) }
pool.shutdown

Constructor Details

#initialize(size:, name: 'controller-pool') ⇒ ThreadPool

Initializes a new thread pool and immediately spawns its worker threads.

Parameters:

  • size (Integer)

    Number of worker threads

  • name (String) (defaults to: 'controller-pool')

    Pool name for debugging/monitoring



# File 'lib/takagi/controller/thread_pool.rb', line 24

def initialize(size:, name: 'controller-pool')
  @size = size
  @name = name
  @queue = Queue.new
  @workers = []
  @shutdown = false
  @mutex = Mutex.new
  @stats = {
    processed: 0,
    errors: 0,
    queue_size: 0,
    created_at: Time.now
  }

  spawn_workers
end
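
The constructor ends by calling `spawn_workers`, whose source is not shown on this page. As a rough sketch of what such a worker loop commonly looks like, the stand-in class below (`SketchPool` and `spawn_worker` are hypothetical names, not part of Takagi) pops jobs until it receives a `nil` poison pill and updates its counters under a mutex. The real implementation may differ.

```ruby
# Hypothetical stand-in for illustration only; not Takagi's actual code.
class SketchPool
  attr_reader :stats

  def initialize(size:, name: 'sketch-pool')
    @size = size
    @name = name
    @queue = Queue.new
    @mutex = Mutex.new
    @stats = { processed: 0, errors: 0 }
    @workers = Array.new(size) { |i| spawn_worker(i) }
  end

  def schedule(&block)
    @queue << block
  end

  def shutdown
    @size.times { @queue << nil } # one poison pill per worker
    @workers.each(&:join)
  end

  private

  def spawn_worker(index)
    Thread.new do
      Thread.current.name = "#{@name}-#{index}"
      # Queue#pop blocks until a job (or a nil poison pill) arrives.
      while (job = @queue.pop)
        begin
          job.call
          @mutex.synchronize { @stats[:processed] += 1 }
        rescue StandardError
          # A failing job must not kill the worker thread.
          @mutex.synchronize { @stats[:errors] += 1 }
        end
      end
    end
  end
end

pool = SketchPool.new(size: 2)
3.times { pool.schedule { :ok } }
pool.schedule { raise 'boom' }
pool.shutdown
sketch_stats = pool.stats
```

Rescuing `StandardError` inside the loop is one way to keep a worker alive after a failing job, which would be consistent with the `errors` counter kept in `@stats`.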

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



# File 'lib/takagi/controller/thread_pool.rb', line 18

def name
  @name
end

#size ⇒ Object (readonly)

Returns the value of attribute size.



# File 'lib/takagi/controller/thread_pool.rb', line 18

def size
  @size
end

#stats ⇒ Object (readonly)

Returns the value of attribute stats.



# File 'lib/takagi/controller/thread_pool.rb', line 18

def stats
  @stats
end

Instance Method Details

#current_stats ⇒ Hash

Get current pool statistics

Returns:

  • (Hash)

    Snapshot containing processed, errors, queue_size (live value), created_at, size, shutdown, and uptime (seconds since creation)



# File 'lib/takagi/controller/thread_pool.rb', line 81

def current_stats
  @mutex.synchronize do
    @stats.merge(
      size: @size,
      queue_size: @queue.size,
      shutdown: @shutdown,
      uptime: Time.now - @stats[:created_at]
    )
  end
end
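
`current_stats` builds its result with `Hash#merge`, which returns a new hash, whereas the `stats` reader hands back the pool's internal hash. The standalone sketch below uses a plain hash to show the difference; `internal` and `snapshot` are illustrative names, and the values are made up.

```ruby
# Plain-Ruby illustration; `internal` stands in for the pool's @stats hash.
internal = { processed: 5, errors: 0, queue_size: 0, created_at: Time.now }

# Hash#merge returns a new hash, so the snapshot is detached from `internal`.
snapshot = internal.merge(size: 10, queue_size: 3, shutdown: false)

internal[:processed] += 1 # simulate a worker finishing another job

snapshot[:processed]  # still 5: the snapshot does not follow later updates
internal[:processed]  # now 6
internal.key?(:size)  # false: merge did not modify the original hash
snapshot[:queue_size] # 3: the merged value overrides the stored 0
```

For monitoring code this suggests preferring `current_stats`, since its result can be inspected or serialized without touching the hash the workers update.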

#empty? ⇒ Boolean

Check if the queue is empty

Returns:

  • (Boolean)

    true if queue is empty



# File 'lib/takagi/controller/thread_pool.rb', line 102

def empty?
  @queue.empty?
end
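
Note that `empty?` only inspects the queue, so it can return true while a worker is still executing a job it has already popped. The sketch below demonstrates this with a bare `Queue` and a single thread standing in for the pool; the `started` and `release` queues are test scaffolding introduced here, not part of Takagi.

```ruby
queue = Queue.new
started = Queue.new # signals that the job has begun
release = Queue.new # lets the main thread decide when the job may finish

worker = Thread.new do
  job = queue.pop
  job.call
end

queue << proc do
  started << true
  release.pop # keep the job "in flight" until released
end

started.pop # wait until the worker is inside the job

empty_while_running = queue.empty?  # the job has left the queue
alive_while_running = worker.alive? # yet the worker is still busy

release << true
worker.join
```

An empty queue therefore means "no jobs waiting", not "no work in progress".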

#queue_size ⇒ Integer

Get the number of jobs currently in the queue

Returns:

  • (Integer)

    Queue size



# File 'lib/takagi/controller/thread_pool.rb', line 95

def queue_size
  @queue.size
end

#schedule { ... } ⇒ Object

Schedule a job to be executed by the thread pool

Examples:

pool.schedule do
  result = process_request(data)
  send_response(result)
end

Yields:

  • Block to execute in a worker thread

Raises:

  • (RuntimeError)

    if the pool has already been shut down



# File 'lib/takagi/controller/thread_pool.rb', line 51

def schedule(&block)
  raise "ThreadPool '#{@name}' is shutdown" if @shutdown

  @queue << block
  update_queue_size
end
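
Because a scheduled block runs later on a worker thread, its return value is not handed back to the caller. One common pattern is to have the block push its result onto a second `Queue`. The sketch below uses a bare job queue and one worker thread as a stand-in for the pool; with the real class the `jobs << proc { ... }` line would presumably become `pool.schedule { ... }`.

```ruby
# Stand-in for the pool: one job queue and one worker thread.
jobs = Queue.new
worker = Thread.new do
  while (job = jobs.pop) # nil acts as the stop signal
    job.call
  end
end

# Results travel back to the caller through a second thread-safe queue.
results = Queue.new

[1, 2, 3].each do |n|
  jobs << proc { results << n * n }
end

jobs << nil
worker.join

squares = Array.new(3) { results.pop }
```

With a single worker the results arrive in submission order; with several workers the order is not guaranteed, so each result would need to carry its own identifier.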

#shutdown ⇒ Object

Gracefully shut down the thread pool

Marks the pool as shut down, enqueues one poison pill (nil) per worker, and blocks until every worker thread has terminated. Calling it again on a pool that is already shut down is a no-op.



# File 'lib/takagi/controller/thread_pool.rb', line 62

def shutdown
  return if @shutdown

  @shutdown = true
  @size.times { @queue << nil }  # Poison pills
  @workers.each(&:join)
  Takagi.logger.info "ThreadPool '#{@name}' shutdown complete"
end
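
Since the pills are appended to the same FIFO queue as the jobs, any job scheduled before `shutdown` sits ahead of them and still gets a chance to run. The sketch below checks that property with plain `Queue` and `Thread` objects rather than the real class; the worker loop is an assumption about how the pool consumes pills.

```ruby
queue = Queue.new
done = Queue.new # thread-safe record of finished jobs

workers = Array.new(3) do
  Thread.new do
    while (job = queue.pop) # a nil pill ends this worker's loop
      job.call
    end
  end
end

# Ten jobs are queued first...
10.times do |i|
  queue << proc do
    sleep 0.01
    done << i
  end
end

# ...then one pill per worker, so every pill sits behind every job.
workers.size.times { queue << nil }
workers.each(&:join) # blocks until all three workers have exited

finished = done.size
leftover = queue.size
```

Each worker consumes exactly one pill, which is why the number of pills has to match the number of workers.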

#shutdown? ⇒ Boolean

Check whether the pool has been shut down

Returns:

  • (Boolean)

    true if shutdown has been initiated



# File 'lib/takagi/controller/thread_pool.rb', line 74

def shutdown?
  @shutdown
end