Class: Utilrb::ThreadPool
Overview
ThreadPool implementation inspired by github.com/meh/ruby-threadpool
Defined Under Namespace
Classes: Task
Instance Attribute Summary collapse
-
#auto_trim ⇒ Boolean
Auto trim automatically reduces the number of worker threads if there are too many threads waiting for work.
-
#avg_run_time ⇒ Float
readonly
The average execution time of a (running) task.
-
#avg_wait_time ⇒ Float
readonly
The average waiting time of a task before being executed.
-
#max ⇒ Fixnum
The maximum number of worker threads.
-
#min ⇒ Fixnum
The minimum number of worker threads.
-
#spawned ⇒ Fixnum
readonly
The real number of worker threads.
-
#waiting ⇒ Fixnum
readonly
The number of worker threads waiting for work.
Class Method Summary collapse
Instance Method Summary collapse
-
#<<(task) ⇒ Task
Processes the given Task as soon as the next thread is available.
-
#backlog ⇒ Fixnum
Number of tasks waiting for execution.
- #clear ⇒ Object
-
#initialize(min = 5, max = min) ⇒ ThreadPool
constructor
A ThreadPool.
-
#join ⇒ Object
Blocks until all threads were terminated.
-
#on_task_finished {|Task| ... } ⇒ Object
Given code block is called for every task which was finished even it was terminated.
-
#process(*args) {|*args| ... } ⇒ Task
Processes the given block as soon as the next thread is available.
-
#process? ⇒ Boolean
Returns true if a worker thread is currently processing a task and no work is queued.
-
#process_with_options(options, *args, &block) ⇒ Task
Processes the given block as soon as the next thread is available with the given options.
-
#resize(min, max = nil) ⇒ Object
Changes the minimum and maximum number of threads.
-
#shutdown ⇒ Object
Shuts down all threads.
-
#shutdown? ⇒ boolean
Checks if the thread pool is shutting down all threads.
-
#sync(sync_key, *args) {|*args| ... } ⇒ Object
Processes the given block from current thread but insures that during processing no worker thread is executing a task which has the same sync_key.
-
#sync_keys ⇒ Object
returns the current used sync_keys.
-
#sync_timeout(sync_key, timeout, *args) {|*args| ... } ⇒ Object
Same as sync but raises Timeout::Error if sync_key cannot be obtained after the given execution time.
-
#tasks ⇒ Array<Task>
Returns an array of the current waiting and running tasks.
-
#trim(force = false) ⇒ Object
Trims the number of threads if threads are waiting for work and the number of spawned threads is higher than the minimum number.
Constructor Details
#initialize(min = 5, max = min) ⇒ ThreadPool
A ThreadPool
297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 |
# File 'lib/utilrb/thread_pool.rb', line 297 def initialize (min = 5, max = min) @min = min @max = max @cond = ConditionVariable.new @cond_sync_key = ConditionVariable.new @mutex = Mutex.new @tasks_waiting = [] # tasks waiting for execution @tasks_running = [] # tasks which are currently running # Statistics @avg_run_time = 0 # average run time of a task in s [Float] @avg_wait_time = 0 # average time a task has to wait for execution in s [Float] @workers = [] # thread pool @spawned = 0 @waiting = 0 @shutdown = false @callback_on_task_finished = nil @pipes = nil @sync_keys = Set.new @trim_requests = 0 @auto_trim = false @mutex.synchronize do min.times do spawn_thread end end end |
Instance Attribute Details
#auto_trim ⇒ Boolean
Auto trim automatically reduces the number of worker threads if there are too many threads waiting for work.
291 292 293 |
# File 'lib/utilrb/thread_pool.rb', line 291 def auto_trim @auto_trim end |
#avg_run_time ⇒ Float (readonly)
The average execution time of a (running) task.
281 282 283 |
# File 'lib/utilrb/thread_pool.rb', line 281 def avg_run_time @avg_run_time end |
#avg_wait_time ⇒ Float (readonly)
The average waiting time of a task before being executed.
286 287 288 |
# File 'lib/utilrb/thread_pool.rb', line 286 def avg_wait_time @avg_wait_time end |
#max ⇒ Fixnum
The maximum number of worker threads.
266 267 268 |
# File 'lib/utilrb/thread_pool.rb', line 266 def max @max end |
#min ⇒ Fixnum
The minimum number of worker threads.
261 262 263 |
# File 'lib/utilrb/thread_pool.rb', line 261 def min @min end |
#spawned ⇒ Fixnum (readonly)
The real number of worker threads.
271 272 273 |
# File 'lib/utilrb/thread_pool.rb', line 271 def spawned @spawned end |
#waiting ⇒ Fixnum (readonly)
The number of worker threads waiting for work.
276 277 278 |
# File 'lib/utilrb/thread_pool.rb', line 276 def waiting @waiting end |
Class Method Details
.report_exception(msg, e) ⇒ Object
623 624 625 626 627 628 629 |
# File 'lib/utilrb/thread_pool.rb', line 623 def self.report_exception(msg, e) if msg STDERR.puts msg end STDERR.puts e. STDERR.puts " #{e.backtrace.join("\n ")}" end |
Instance Method Details
#<<(task) ⇒ Task
Processes the given Task as soon as the next thread is available
490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 |
# File 'lib/utilrb/thread_pool.rb', line 490 def <<(task) raise "cannot add task #{task} it is still running" if task.thread task.reset if task.finished? @mutex.synchronize do if shutdown? raise "unable to add work while shutting down" end task.queued_at = Time.now @tasks_waiting << task if @waiting <= @tasks_waiting.size && @spawned < @max spawn_thread end @cond.signal end task end |
#backlog ⇒ Fixnum
Number of tasks waiting for execution
379 380 381 382 383 |
# File 'lib/utilrb/thread_pool.rb', line 379 def backlog @mutex.synchronize do @tasks_waiting.length end end |
#clear ⇒ Object
347 348 349 350 351 352 353 |
# File 'lib/utilrb/thread_pool.rb', line 347 def clear shutdown join rescue Exception ensure @shutdown = false end |
#join ⇒ Object
Blocks until all threads were terminated. This does not terminate any thread by itself and will block for ever if shutdown was not called.
533 534 535 536 537 538 539 540 541 542 |
# File 'lib/utilrb/thread_pool.rb', line 533 def join while true if w = @mutex.synchronize { @workers.first } w.join else break end end self end |
#on_task_finished {|Task| ... } ⇒ Object
Given code block is called for every task which was finished even it was terminated.
This can be used to store the result for an event loop
550 551 552 553 554 |
# File 'lib/utilrb/thread_pool.rb', line 550 def on_task_finished (&block) @mutex.synchronize do @callback_on_task_finished = block end end |
#process(*args) {|*args| ... } ⇒ Task
Processes the given block as soon as the next thread is available.
399 400 401 |
# File 'lib/utilrb/thread_pool.rb', line 399 def process (*args, &block) (nil,*args,&block) end |
#process? ⇒ Boolean
Returns true if a worker thread is currently processing a task and no work is queued
407 408 409 410 411 |
# File 'lib/utilrb/thread_pool.rb', line 407 def process? @mutex.synchronize do waiting != spawned || @tasks_waiting.length > 0 end end |
#process_with_options(options, *args, &block) ⇒ Task
Processes the given block as soon as the next thread is available with the given options.
419 420 421 422 423 |
# File 'lib/utilrb/thread_pool.rb', line 419 def (,*args, &block) task = Task.new(,*args, &block) self << task task end |
#resize(min, max = nil) ⇒ Object
Changes the minimum and maximum number of threads
364 365 366 367 368 369 370 371 372 373 374 |
# File 'lib/utilrb/thread_pool.rb', line 364 def resize (min, max = nil) @mutex.synchronize do @min = min @max = max || min count = [@tasks_waiting.size,@max - @spawned].min count.times do spawn_thread end end trim true end |
#shutdown ⇒ Object
Shuts down all threads.
521 522 523 524 525 526 527 |
# File 'lib/utilrb/thread_pool.rb', line 521 def shutdown tasks = nil @mutex.synchronize do @shutdown = true @cond.broadcast end end |
#shutdown? ⇒ boolean
Checks if the thread pool is shutting down all threads.
358 |
# File 'lib/utilrb/thread_pool.rb', line 358 def shutdown?; @shutdown; end |
#sync(sync_key, *args) {|*args| ... } ⇒ Object
Processes the given block from current thread but insures that during processing no worker thread is executing a task which has the same sync_key.
This is useful for instance member calls which are not thread safe.
435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 |
# File 'lib/utilrb/thread_pool.rb', line 435 def sync(sync_key,*args,&block) raise ArgumentError,"no sync key" unless sync_key @mutex.synchronize do while(!@sync_keys.add?(sync_key)) @cond_sync_key.wait @mutex #wait until someone has removed a key end end begin result = block.call(*args) ensure @mutex.synchronize do @sync_keys.delete sync_key end @cond_sync_key.signal @cond.signal # worker threads are just waiting for work no matter if it is # because of a deletion of a sync_key or a task was added end result end |
#sync_keys ⇒ Object
returns the current used sync_keys
341 342 343 344 345 |
# File 'lib/utilrb/thread_pool.rb', line 341 def sync_keys @mutex.synchronize do @sync_keys.clone end end |
#sync_timeout(sync_key, timeout, *args) {|*args| ... } ⇒ Object
Same as sync but raises Timeout::Error if sync_key cannot be obtained after the given execution time.
463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 |
# File 'lib/utilrb/thread_pool.rb', line 463 def sync_timeout(sync_key,timeout,*args,&block) raise ArgumentError,"no sync key" unless sync_key Timeout::timeout(timeout) do @mutex.synchronize do while(!@sync_keys.add?(sync_key)) @cond_sync_key.wait @mutex #wait until someone has removed a key end end end begin result = block.call(*args) ensure @mutex.synchronize do @sync_keys.delete sync_key end @cond_sync_key.signal @cond.signal # worker threads are just waiting for work no matter if it is # because of a deletion of a sync_key or a task was added end result end |
#tasks ⇒ Array<Task>
Returns an array of the current waiting and running tasks
388 389 390 391 392 |
# File 'lib/utilrb/thread_pool.rb', line 388 def tasks @mutex.synchronize do @tasks_running.dup + @tasks_waiting.dup end end |
#trim(force = false) ⇒ Object
Trims the number of threads if threads are waiting for work and the number of spawned threads is higher than the minimum number.
511 512 513 514 515 516 517 |
# File 'lib/utilrb/thread_pool.rb', line 511 def trim (force = false) @mutex.synchronize do @trim_requests += 1 @cond.signal end self end |