Class: Charming::Tasks::ThreadedExecutor
- Inherits: Object
  - Object
  - Charming::Tasks::ThreadedExecutor
- Defined in: lib/charming/tasks/threaded_executor.rb
Overview
ThreadedExecutor runs submitted tasks on background Ruby threads. Each submission creates a new thread that invokes the block and pushes the resulting TaskEvent onto the shared queue. Threads are tracked so `shutdown` can wait for (or kill) in-flight work, and indexed by name so `cancel` can interrupt a specific task.
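The thread-per-task pattern described above can be sketched in plain Ruby. This is a minimal illustration, not the library's implementation: `SketchEvent` and `spawn_task` are hypothetical stand-ins, and the real TaskEvent may carry different fields.

```ruby
# Hypothetical stand-in for the library's TaskEvent; the real fields may differ.
SketchEvent = Struct.new(:name, :result, :error)

# Hypothetical helper: run the block on its own thread, capture its value or
# its exception, and push exactly one event onto the shared queue.
def spawn_task(queue, name, &block)
  Thread.new do
    event =
      begin
        SketchEvent.new(name, block.call, nil)
      rescue StandardError => e
        SketchEvent.new(name, nil, e)
      end
    queue << event
  end
end

queue = Queue.new # thread-safe FIFO from Ruby core

threads = [
  spawn_task(queue, :sum) { 1 + 2 },
  spawn_task(queue, :boom) { raise ArgumentError, "bad input" }
]
threads.each(&:join)

# Completion order is not deterministic, so sort before inspecting.
events = Array.new(2) { queue.pop }.sort_by { |e| e.name.to_s }
events.each do |e|
  puts "#{e.name}: result=#{e.result.inspect} error=#{e.error.inspect}"
end
```

The consumer only ever touches the queue, so it does not need to know which thread produced an event or in what order the tasks finished.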
Instance Method Summary
- #cancel(name) ⇒ Object
  Cancels the named in-flight task by raising Tasks::Cancelled in its thread.
- #initialize(queue) ⇒ ThreadedExecutor (constructor)
  queue is the thread-safe Queue (typically `runtime.@task_queue`) into which completed TaskEvents are pushed.
- #shutdown(timeout: 2.0) ⇒ Object
  Waits up to timeout seconds for each in-flight thread to finish, then kills any remaining live threads.
- #submit(name, timeout: nil, &block) ⇒ Object
  Wraps block in a Task and spawns a new thread to invoke it.
Constructor Details
#initialize(queue) ⇒ ThreadedExecutor
queue is the thread-safe Queue (typically `runtime.@task_queue`) into which completed TaskEvents are pushed.

    # File 'lib/charming/tasks/threaded_executor.rb', line 12

    def initialize(queue)
      @queue = queue
      @threads = []
      @threads_by_name = {}
      @mutex = Mutex.new
      @shutting_down = false
    end
Instance Method Details
#cancel(name) ⇒ Object
Cancels the named in-flight task by raising Tasks::Cancelled in its thread. The task completes with a TaskEvent whose error is the Cancelled exception. No-op when the task isn’t running.

    # File 'lib/charming/tasks/threaded_executor.rb', line 40

    def cancel(name)
      thread = @mutex.synchronize { @threads_by_name[name.to_sym] }
      return unless thread&.alive?

      thread.raise(Cancelled, "task #{name} cancelled")
      nil
    end
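The cancellation mechanism relies on Ruby's `Thread#raise`, which injects an exception into another thread. The sketch below shows that mechanism in isolation. `SketchCancelled` is a hypothetical stand-in for Tasks::Cancelled, and the `[status, error]` pair stands in for a TaskEvent; how the library's own task runner rescues the exception is not shown in this page, so the rescue here is an assumption.

```ruby
# Hypothetical stand-in for Charming::Tasks::Cancelled.
class SketchCancelled < StandardError; end

queue = Queue.new   # receives the outcome, like the executor's shared queue
started = Queue.new # handshake so the main thread knows the task body is running

worker = Thread.new do
  begin
    started << true        # signal that we are inside the begin block
    sleep                  # stands in for long-running work
    queue << [:done, nil]
  rescue SketchCancelled => e
    queue << [:cancelled, e] # the task still completes, with the error attached
  end
end

started.pop # wait until the worker can rescue the injected exception
worker.raise(SketchCancelled, "task demo cancelled") if worker.alive?
worker.join

status, error = queue.pop
puts "#{status}: #{error.message}"
```

Because the exception can arrive at any point in the task body, blocks that hold resources should release them in `ensure` clauses.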
#shutdown(timeout: 2.0) ⇒ Object
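Waits up to timeout seconds for each in-flight thread to finish, then kills any remaining live threads. Because the threads are joined one after another, the total wait can exceed timeout when several threads are slow. Refuses new submissions once called.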

    # File 'lib/charming/tasks/threaded_executor.rb', line 50

    def shutdown(timeout: 2.0)
      threads = @mutex.synchronize do
        @shutting_down = true
        @threads.dup
      end

      threads.each { |thread| thread.join(timeout) }
      threads.each do |thread|
        next unless thread.alive?

        thread.kill
        thread.join(0)
      end
    end
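The two phases of shutdown (bounded join, then kill) can be tried out with plain threads. This is an illustrative sketch only; the thread bodies and the 0.5-second timeout are made up for the example.

```ruby
fast  = Thread.new { sleep 0.01; :finished } # completes well within the timeout
stuck = Thread.new { sleep }                 # never finishes on its own

threads = [fast, stuck]
timeout = 0.5

# Phase 1: wait up to `timeout` seconds for each thread in turn.
# Thread#join returns nil when the wait times out.
threads.each { |t| t.join(timeout) }

# Phase 2: forcibly terminate whatever is still alive.
killed = threads.select(&:alive?).each do |t|
  t.kill
  t.join(1) # give the killed thread a moment to unwind
end

puts "killed #{killed.size} thread(s); fast returned #{fast.value.inspect}"
```

A caller that needs a hard upper bound on total shutdown time could compute a single deadline up front and pass each `join` only the time remaining; that is a variation, not what the method shown here does.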
#submit(name, timeout: nil, &block) ⇒ Object
Wraps block in a Task and spawns a new thread to invoke it. The block's return value (or rescued exception) is pushed onto the queue as a TaskEvent. Blocks that accept an argument receive a Progress reporter. timeout (in seconds) cancels the task when exceeded. Returns nil immediately. Raises a RuntimeError if called after shutdown has begun.

    # File 'lib/charming/tasks/threaded_executor.rb', line 25

    def submit(name, timeout: nil, &block)
      task = Task.new(name: name.to_sym, block: block, timeout: timeout)
      @mutex.synchronize do
        raise "cannot submit task after shutdown" if @shutting_down

        thread = Thread.new(task) { |t| @queue << run(t) }
        @threads << thread
        @threads_by_name[task.name] = thread
      end
      nil
    end
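The submission contract (name normalized to a symbol, nil returned immediately, RuntimeError after shutdown) can be exercised with a reduced stand-in. `SketchExecutor` below is hypothetical: it omits Task, Progress, and timeout handling, and pushes a plain `[name, value]` pair instead of a TaskEvent.

```ruby
# Hypothetical, reduced stand-in for ThreadedExecutor.
class SketchExecutor
  def initialize(queue)
    @queue = queue
    @threads_by_name = {}
    @mutex = Mutex.new
    @shutting_down = false
  end

  # Registers and starts the thread under the mutex so a concurrent
  # shutdown either sees the thread or rejects the submission.
  def submit(name, &block)
    key = name.to_sym
    @mutex.synchronize do
      raise "cannot submit task after shutdown" if @shutting_down

      @threads_by_name[key] = Thread.new { @queue << [key, block.call] }
    end
    nil
  end

  def shutdown
    threads = @mutex.synchronize do
      @shutting_down = true
      @threads_by_name.values
    end
    threads.each(&:join)
  end
end

queue = Queue.new
executor = SketchExecutor.new(queue)

returned = executor.submit("greet") { "hello" } # string names become symbols
name, value = queue.pop                         # blocks until the task finishes
executor.shutdown

rejected =
  begin
    executor.submit(:late) { :never }
    false
  rescue RuntimeError => e
    e.message
  end

puts "submit returned #{returned.inspect}; got #{name.inspect} => #{value.inspect}"
puts "after shutdown: #{rejected}"
```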