Class: Phronomy::TaskGroup
- Inherits:
-
Object
- Object
- Phronomy::TaskGroup
- Defined in:
- lib/phronomy/task_group.rb
Overview
Manages a bounded set of concurrent Tasks with structured concurrency.
Enforces an upper bound on simultaneously running tasks (+limit+). When the limit is reached, #spawn blocks the caller until a slot becomes available. Results are always returned in the order tasks were spawned, regardless of completion order.
A configurable +failure_policy+ controls how errors propagate:
- +:fail_fast+ (default) — cancels all remaining tasks on the first error
- +:collect_all+ — waits for every task to complete, then raises the first error
- +:skip_failed+ — ignores failed tasks and returns only successful results
#cancel_all! cancels every task in the group and joins them, guaranteeing that the active child task count reaches zero before returning.
Constant Summary collapse
- FAILURE_POLICIES =
Valid failure policies.
%i[fail_fast collect_all skip_failed].freeze
Instance Method Summary collapse
-
#active_task_count ⇒ Integer
private
Returns the number of currently executing child tasks.
-
#await_all ⇒ Array
private
Waits for all spawned tasks to complete.
-
#cancel_all! ⇒ self
private
Cancels all tasks currently in the group and waits for each to finish.
-
#initialize(limit: Float::INFINITY, failure_policy: :fail_fast, runtime: nil) ⇒ TaskGroup
constructor
private
A new instance of TaskGroup.
-
#spawn { ... } ⇒ Task
private
Spawns a new task within the group.
Constructor Details
#initialize(limit: Float::INFINITY, failure_policy: :fail_fast, runtime: nil) ⇒ TaskGroup
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a new instance of TaskGroup.
38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/phronomy/task_group.rb', line 38 def initialize(limit: Float::INFINITY, failure_policy: :fail_fast, runtime: nil) raise ArgumentError, "unknown failure_policy: #{failure_policy}" unless FAILURE_POLICIES.include?(failure_policy) @limit = limit @failure_policy = failure_policy @runtime = runtime @tasks = [] @mutex = Mutex.new @cond = ConditionVariable.new @active = 0 end |
Instance Method Details
#active_task_count ⇒ Integer
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns the number of currently executing child tasks.
227 228 229 |
# File 'lib/phronomy/task_group.rb', line 227 def active_task_count @mutex.synchronize { @active } end |
#await_all ⇒ Array
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Waits for all spawned tasks to complete. Returns results in spawn order.
Failure behaviour is controlled by the +failure_policy+ set at construction time:
- +:fail_fast+ — raises the first error after cancelling unfinished tasks
- +:collect_all+ — waits for all tasks, then raises the first error
- +:skip_failed+ — returns only the values of successful tasks
89 90 91 92 93 94 95 96 97 98 |
# File 'lib/phronomy/task_group.rb', line 89 def await_all tasks = @mutex.synchronize { @tasks.dup } return [] if tasks.empty? if Phronomy::Runtime::Scheduler.current _await_all_cooperative(tasks) else _await_all_threaded(tasks) end end |
#cancel_all! ⇒ self
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Cancels all tasks currently in the group and waits for each to finish. After this method returns, the active child task count is guaranteed to be zero.
Note: if a task is cancelled before its block has started executing, the internal +ensure+ clause inside the block may not run, so @active is reset explicitly after all tasks are joined.
201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 |
# File 'lib/phronomy/task_group.rb', line 201 def cancel_all! tasks = @mutex.synchronize { @tasks.dup } tasks.each(&:cancel!) tasks.each do |t| t.join rescue nil end # Force @active to zero: tasks cancelled before block execution starts # may not decrement @active via their ensure clause. scheduler = Phronomy::Runtime::Scheduler.current if scheduler && @coop_signal @active = 0 scheduler.raise_signal_all(@coop_signal) else @mutex.synchronize do @active = 0 @cond.broadcast end end self end |
#spawn { ... } ⇒ Task
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Spawns a new task within the group. Blocks if the number of currently active tasks equals +limit+.
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/phronomy/task_group.rb', line 56 def spawn(&block) wait_for_slot! task = if @runtime @runtime.spawn(name: "task-group-worker") do block.call ensure release_slot! end else Task.new do block.call ensure release_slot! end end @mutex.synchronize { @tasks << task } task end |