Class: Utilrb::ThreadPool

Inherits:
Object show all
Defined in:
lib/utilrb/thread_pool.rb

Overview

ThreadPool implementation inspired by github.com/meh/ruby-threadpool

Examples:

Using a thread pool of 10 threads

# Create the pool; the constructor spawns the ten worker threads right away.
pool = ThreadPool.new(10)
# Queue ten tasks, each of which sleeps for a second and then reports.
10.times do
   pool.process do
     sleep 1
     puts "done"
   end
end
# Request the shutdown first; join would otherwise block forever.
pool.shutdown
pool.join

Author:

  • Alexander Duda <Alexander.Duda@dfki.de>

Defined Under Namespace

Classes: Task

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(min = 5, max = min) ⇒ ThreadPool

Creates a new ThreadPool and immediately spawns min worker threads.

Parameters:

  • min (Fixnum) (defaults to: 5)

    the minimum number of threads

  • max (Fixnum) (defaults to: min)

    the maximum number of threads



297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
# File 'lib/utilrb/thread_pool.rb', line 297

# Sets up the pool's bookkeeping and starts the initial worker threads.
#
# @param [Fixnum] min the minimum number of threads; this many workers
#   are spawned before the constructor returns
# @param [Fixnum] max the maximum number of threads (defaults to min)
def initialize(min = 5, max = min)
    @min, @max = min, max

    # Synchronisation primitives
    @mutex = Mutex.new
    @cond = ConditionVariable.new           # signalled when work is queued or a trim is requested
    @cond_sync_key = ConditionVariable.new  # waited on by callers that need a sync key

    # Task queues
    @tasks_waiting = []         # tasks waiting for execution
    @tasks_running = []         # tasks which are currently running

    # Statistics
    @avg_run_time = 0           # average run time of a task in s [Float]
    @avg_wait_time = 0          # average time a task has to wait for execution in s [Float]

    # Worker bookkeeping
    @workers = []               # thread pool
    @spawned = 0
    @waiting = 0
    @trim_requests = 0
    @auto_trim = false
    @shutdown = false

    @callback_on_task_finished = nil
    @pipes = nil
    @sync_keys = Set.new

    # spawn_thread is always called with the mutex held
    @mutex.synchronize { min.times { spawn_thread } }
end

Instance Attribute Details

#auto_trim ⇒ Boolean

Auto trim automatically reduces the number of worker threads if there are too many threads waiting for work.

Returns:

  • (Boolean)


291
292
293
# File 'lib/utilrb/thread_pool.rb', line 291

# @return [Boolean] the current auto-trim setting
def auto_trim; @auto_trim; end

#avg_run_time ⇒ Float (readonly)

The average execution time of a (running) task.

Returns:

  • (Float)


281
282
283
# File 'lib/utilrb/thread_pool.rb', line 281

# @return [Float] the stored average run time of a task
def avg_run_time; @avg_run_time; end

#avg_wait_time ⇒ Float (readonly)

The average waiting time of a task before being executed.

Returns:

  • (Float)


286
287
288
# File 'lib/utilrb/thread_pool.rb', line 286

# @return [Float] the stored average time a task waited before execution
def avg_wait_time; @avg_wait_time; end

#max ⇒ Fixnum

The maximum number of worker threads.

Returns:

  • (Fixnum)


266
267
268
# File 'lib/utilrb/thread_pool.rb', line 266

# @return [Fixnum] the configured maximum number of worker threads
def max; @max; end

#min ⇒ Fixnum

The minimum number of worker threads.

Returns:

  • (Fixnum)


261
262
263
# File 'lib/utilrb/thread_pool.rb', line 261

# @return [Fixnum] the configured minimum number of worker threads
def min; @min; end

#spawned ⇒ Fixnum (readonly)

The real number of worker threads.

Returns:

  • (Fixnum)


271
272
273
# File 'lib/utilrb/thread_pool.rb', line 271

# @return [Fixnum] the counter of worker threads that currently exist
def spawned; @spawned; end

#waiting ⇒ Fixnum (readonly)

The number of worker threads waiting for work.

Returns:

  • (Fixnum)


276
277
278
# File 'lib/utilrb/thread_pool.rb', line 276

# @return [Fixnum] the counter of worker threads waiting for work
def waiting; @waiting; end

Class Method Details

.report_exception(msg, e) ⇒ Object



623
624
625
626
627
628
629
# File 'lib/utilrb/thread_pool.rb', line 623

# Writes an exception report to STDERR: the optional message, the
# exception's own message and, when one is available, its backtrace
# (one frame per line, indented by two spaces).
#
# @param [String, nil] msg text printed before the exception; skipped
#   when nil or false
# @param [Exception] e the exception to report
# @return [nil]
def self.report_exception(msg, e)
    STDERR.puts msg if msg
    STDERR.puts e.message
    # Exception#backtrace is nil for an exception object that was never
    # raised; there is nothing to print in that case.
    backtrace = e.backtrace
    STDERR.puts "  #{backtrace.join("\n  ")}" if backtrace
end

Instance Method Details

#<<(task) ⇒ Task

Processes the given Task as soon as the next thread is available

Parameters:

  • task (Task)

    The task.

Returns:

  • (Task)

    the task that was queued



490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
# File 'lib/utilrb/thread_pool.rb', line 490

# Queues a task for execution by the next free worker thread.
#
# A task that has already finished is reset before it is queued again.
#
# @param [Task] task the task to queue
# @return [Task] the task that was passed in
# @raise [RuntimeError] if the task is still attached to a thread or the
#   pool is shutting down
def <<(task)
    if task.thread
        raise "cannot add task #{task} it is still running"
    end
    task.reset if task.finished?

    @mutex.synchronize do
        raise "unable to add work while shutting down" if shutdown?

        task.queued_at = Time.now
        @tasks_waiting.push(task)

        # Grow the pool when the idle workers cannot cover the backlog
        # and the upper limit has not been reached yet.
        needs_worker = @waiting <= @tasks_waiting.size && @spawned < @max
        spawn_thread if needs_worker

        # wake one worker so it can pick the task up
        @cond.signal
    end
    task
end

#backlog ⇒ Fixnum

Number of tasks waiting for execution

Returns:

  • (Fixnum)

    the number of tasks



379
380
381
382
383
# File 'lib/utilrb/thread_pool.rb', line 379

# Counts the queued tasks while holding the pool mutex.
#
# @return [Fixnum] the number of tasks waiting for execution
def backlog
    @mutex.synchronize { @tasks_waiting.size }
end

#clear ⇒ Object



347
348
349
350
351
352
353
# File 'lib/utilrb/thread_pool.rb', line 347

# Shuts the pool down, waits for the worker threads to terminate and
# afterwards resets the shutdown flag so that the pool accepts work again.
#
# Failures raised while shutting down or joining are deliberately ignored
# (best effort). Process-control exceptions (SystemExit and
# SignalException, e.g. the Interrupt raised by Ctrl-C) are passed on so
# the program can still be stopped while this method blocks in join.
#
# The shutdown flag is reset in every case, including when an exception
# propagates.
def clear
    shutdown
    join
rescue SystemExit, SignalException
    raise
rescue Exception
    # best effort: any other failure is ignored
ensure
    @shutdown = false
end

#join ⇒ Object

Blocks until all threads have terminated. This does not terminate any thread by itself and will block forever if shutdown was not called.



533
534
535
536
537
538
539
540
541
542
# File 'lib/utilrb/thread_pool.rb', line 533

# Waits for the worker threads one after another until the worker list
# is empty.
#
# The head of the list is re-read under the mutex on every round.
# NOTE(review): this relies on workers being removed from @workers when
# they exit, which happens outside this method - confirm in the worker code.
#
# @return [ThreadPool] self
def join
    while (worker = @mutex.synchronize { @workers.first })
        worker.join
    end
    self
end

#on_task_finished {|Task| ... } ⇒ Object

The given code block is called for every task that has finished, even if it was terminated.

This can be used to store the result for an event loop

Yields:

  • (Task)

    the code block



550
551
552
553
554
# File 'lib/utilrb/thread_pool.rb', line 550

# Stores the block that is used as the task-finished callback, replacing
# any block stored before. The assignment is done under the pool mutex.
#
# @yield [Task] the code block
# @return [Proc, nil] the stored block
def on_task_finished(&block)
    @mutex.synchronize { @callback_on_task_finished = block }
end

#process(*args) {|*args| ... } ⇒ Task

Processes the given block as soon as the next thread is available.

Parameters:

  • args (Array)

    the block arguments

Yields:

  • (*args)

    the block

Returns:

  • (Task)

    the task created for the block



399
400
401
# File 'lib/utilrb/thread_pool.rb', line 399

# Queues the given block without any task options; shorthand for
# process_with_options with nil as options.
#
# @param [Array] args the block arguments
# @yield [*args] the block
# @return [Task] whatever process_with_options returns
def process(*args, &block)
    process_with_options(nil, *args, &block)
end

#process? ⇒ Boolean

Returns true if at least one worker thread is currently processing a task or if tasks are still queued.

Returns:

  • (Boolean)


407
408
409
410
411
# File 'lib/utilrb/thread_pool.rb', line 407

# Tells whether the pool still has something to do.
#
# @return [Boolean] true if not every spawned worker is waiting or if at
#   least one task is still queued
def process?
    @mutex.synchronize do
        busy_workers = waiting != spawned
        busy_workers || !@tasks_waiting.empty?
    end
end

#process_with_options(options, *args, &block) ⇒ Task

Processes the given block as soon as the next thread is available with the given options.

Parameters:

  • options (Hash)

    The options of the task.

  • args (Array)

    The arguments for the code block

  • block (#call)

    The code block

Options Hash (options):

  • :sync_key (Object)

    The sync key

  • :callback (Proc)

    The callback

  • :default (Object)

    Default value returned when an error occurred which was handled.

Returns:

  • (Task)

    the task created for the block



419
420
421
422
423
# File 'lib/utilrb/thread_pool.rb', line 419

# Wraps the block in a new Task built from the given options and
# arguments and queues it on this pool.
#
# @param [Hash] options the options of the task
# @param [Array] args the arguments for the code block
# @param [#call] block the code block
# @return [Task] the newly created task
def process_with_options(options, *args, &block)
    Task.new(options, *args, &block).tap { |task| self << task }
end

#resize(min, max = nil) ⇒ Object

Changes the minimum and maximum number of threads

Parameters:

  • min (Fixnum)

    the minimum number of threads

  • max (Fixnum) (defaults to: nil)

    the maximum number of threads



364
365
366
367
368
369
370
371
372
373
374
# File 'lib/utilrb/thread_pool.rb', line 364

# Changes the minimum and maximum number of threads.
#
# Spawns additional workers for the tasks already queued, limited by the
# room left below the new maximum, and then requests a trim.
#
# @param [Fixnum] min the minimum number of threads
# @param [Fixnum] max the maximum number of threads (defaults to min)
# @return [Object] the result of the trim call
def resize(min, max = nil)
    @mutex.synchronize do
        @min = min
        @max = max || min
        # a negative value (pool already above the new maximum) spawns nothing
        free_slots = @max - @spawned
        [@tasks_waiting.size, free_slots].min.times { spawn_thread }
    end
    trim(true)
end

#shutdown ⇒ Object

Shuts down all threads.



521
522
523
524
525
526
527
# File 'lib/utilrb/thread_pool.rb', line 521

# Marks the pool as shutting down and wakes up every thread waiting on
# the work condition so that it can notice the flag.
#
# This only requests the shutdown; it does not wait for the threads.
def shutdown
    @mutex.synchronize do
        @shutdown = true
        @cond.broadcast
    end
end

#shutdown? ⇒ boolean

Checks if the thread pool is shutting down all threads.

Returns:

  • (boolean)


358
# File 'lib/utilrb/thread_pool.rb', line 358

# @return [boolean] the current value of the shutdown flag
def shutdown?
    @shutdown
end

#sync(sync_key, *args) {|*args| ... } ⇒ Object

Processes the given block from the current thread but ensures that during processing no worker thread is executing a task which has the same sync_key.

This is useful for instance member calls which are not thread safe.

Parameters:

  • sync_key (Object)

    The sync key

Yields:

  • (*args)

    the code block

Returns:

  • (Object)

    The result of the code block

Raises:

  • (ArgumentError)


435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
# File 'lib/utilrb/thread_pool.rb', line 435

# Runs the block in the calling thread while holding the given sync key.
#
# The call blocks until the key can be added to the set of keys in use
# and removes it again afterwards, also when the block raises.
#
# @param [Object] sync_key the sync key
# @param [Array] args the arguments passed to the block
# @yield [*args] the code block
# @return [Object] the result of the code block
# @raise [ArgumentError] if no sync key is given
def sync(sync_key,*args,&block)
    raise ArgumentError,"no sync key" unless sync_key

    @mutex.synchronize do
        while(!@sync_keys.add?(sync_key))
            @cond_sync_key.wait @mutex #wait until someone has removed a key
        end
    end
    begin
        result = block.call(*args)
    ensure
        @mutex.synchronize do
            @sync_keys.delete sync_key
        end
        # All key waiters share one condition variable but may wait for
        # different keys. Waking only one of them could pick a thread that
        # needs another key and leave the one waiting for this key asleep,
        # so wake them all and let each re-check its own key.
        @cond_sync_key.broadcast
        @cond.signal # worker threads are just waiting for work no matter if it is
        # because of a deletion of a sync_key or a task was added
    end
    result
end

#sync_keys ⇒ Object

Returns a copy of the sync keys currently in use.



341
342
343
344
345
# File 'lib/utilrb/thread_pool.rb', line 341

# Takes a snapshot of the sync keys while holding the pool mutex.
#
# @return [Set] a clone of the set of sync keys currently in use
def sync_keys
    @mutex.synchronize { @sync_keys.clone }
end

#sync_timeout(sync_key, timeout, *args) {|*args| ... } ⇒ Object

Same as sync but raises Timeout::Error if sync_key cannot be obtained after the given execution time.

Parameters:

  • sync_key (Object)

    The sync key

  • timeout (Float)

    The timeout

Yields:

  • (*args)

    the code block

Returns:

  • (Object)

    The result of the code block

Raises:

  • (ArgumentError)


463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
# File 'lib/utilrb/thread_pool.rb', line 463

# Same as sync, but gives up with Timeout::Error when the sync key cannot
# be obtained within the given time. The time limit only covers waiting
# for the key, not the execution of the block.
#
# The wait uses a deadline together with a timed condition wait instead
# of Timeout.timeout: an asynchronous timeout could fire right after the
# key was taken, and the key would then never be released.
#
# @param [Object] sync_key the sync key
# @param [Float, nil] timeout maximum time in seconds to wait for the
#   key; nil or 0 waits without limit (as Timeout.timeout does)
# @param [Array] args the arguments passed to the block
# @yield [*args] the code block
# @return [Object] the result of the code block
# @raise [ArgumentError] if no sync key is given
# @raise [Timeout::Error] if the key is still in use when the time is up
def sync_timeout(sync_key,timeout,*args,&block)
    raise ArgumentError,"no sync key" unless sync_key

    unlimited = timeout.nil? || timeout == 0
    deadline = unlimited ? nil : Time.now + timeout
    @mutex.synchronize do
        until @sync_keys.add?(sync_key)
            if deadline
                remaining = deadline - Time.now
                raise Timeout::Error, "execution expired" if remaining <= 0
                # wait until someone has removed a key or the time is up
                @cond_sync_key.wait(@mutex, remaining)
            else
                @cond_sync_key.wait(@mutex)
            end
        end
    end
    begin
        result = block.call(*args)
    ensure
        @mutex.synchronize do
            @sync_keys.delete sync_key
        end
        # Waiters for different keys share this condition variable, so all
        # of them are woken; each one re-checks its own key.
        @cond_sync_key.broadcast
        @cond.signal # worker threads are just waiting for work no matter if it is
        # because of a deletion of a sync_key or a task was added
    end
    result
end

#tasks ⇒ Array<Task>

Returns an array of the current waiting and running tasks

Returns:

  • (Array<Task>)

    the running tasks followed by the waiting tasks



388
389
390
391
392
# File 'lib/utilrb/thread_pool.rb', line 388

# Builds a snapshot of all tasks known to the pool while holding the mutex.
#
# @return [Array<Task>] a new array with the running tasks followed by
#   the waiting tasks
def tasks
    @mutex.synchronize do
        # Array#+ already allocates a fresh array
        @tasks_running + @tasks_waiting
    end
end

#trim(force = false) ⇒ Object

Trims the number of threads if threads are waiting for work and the number of spawned threads is higher than the minimum number.

Parameters:

  • force (boolean) (defaults to: false)

    Trim even if no thread is waiting.



511
512
513
514
515
516
517
# File 'lib/utilrb/thread_pool.rb', line 511

# Registers one trim request and wakes a thread waiting on the work
# condition so that the request can be handled.
#
# @param [boolean] force trim even if no thread is waiting
#   NOTE(review): this method body does not read force - confirm whether
#   it is meant to be evaluated elsewhere.
# @return [ThreadPool] self
def trim(force = false)
    @mutex.synchronize do
        @trim_requests = @trim_requests + 1
        @cond.signal
    end
    self
end