Class: Phronomy::TaskGroup

Inherits:
Object
  • Object
Defined in:
lib/phronomy/task_group.rb

Overview

Manages a bounded set of concurrent Tasks with structured concurrency.

Enforces an upper bound on simultaneously running tasks (+limit+). When the limit is reached, #spawn blocks the caller until a slot becomes available. #await_all returns results in the order the tasks were spawned, regardless of completion order.

A configurable +failure_policy+ controls how errors propagate:

  • +:fail_fast+ (default) — cancels all remaining tasks on the first error
  • +:collect_all+ — waits for every task to complete, then raises the first error
  • +:skip_failed+ — ignores failed tasks and returns only successful results

#cancel_all! cancels every task in the group and joins them, guaranteeing that the active child task count reaches zero before returning.

Examples:

Parallel tool calls with a concurrency cap

group = Phronomy::TaskGroup.new(limit: 5)
tasks = items.map { |item| group.spawn { process(item) } }
results = group.await_all   # Array in spawn order

Collect-all failure policy

group = Phronomy::TaskGroup.new(failure_policy: :collect_all)

Constant Summary

FAILURE_POLICIES = %i[fail_fast collect_all skip_failed].freeze

Valid failure policies.

Constructor Details

#initialize(limit: Float::INFINITY, failure_policy: :fail_fast, runtime: nil) ⇒ TaskGroup

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of TaskGroup.

Parameters:

  • limit (Integer, Float::INFINITY) (defaults to: Float::INFINITY)

    maximum simultaneous active tasks

  • failure_policy (Symbol) (defaults to: :fail_fast)

    one of FAILURE_POLICIES

  • runtime (Runtime, nil) (defaults to: nil)

    runtime used to spawn tasks via Runtime#spawn; when +nil+, tasks are created directly via +Task.new+ (backward-compatible mode). Pass +runtime: self+ from Runtime#task_group to keep task execution consistent with the configured scheduler backend.

Raises:

  • (ArgumentError)


# File 'lib/phronomy/task_group.rb', line 38

def initialize(limit: Float::INFINITY, failure_policy: :fail_fast, runtime: nil)
  raise ArgumentError, "unknown failure_policy: #{failure_policy}" unless FAILURE_POLICIES.include?(failure_policy)

  @limit = limit
  @failure_policy = failure_policy
  @runtime = runtime
  @tasks = []
  @mutex = Mutex.new
  @cond = ConditionVariable.new
  @active = 0
end

Instance Method Details

#active_task_count ⇒ Integer

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns the number of currently executing child tasks.

Returns:

  • (Integer)


# File 'lib/phronomy/task_group.rb', line 227

def active_task_count
  @mutex.synchronize { @active }
end

#await_all ⇒ Array

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Waits for all spawned tasks to complete. Returns results in spawn order.

Failure behaviour is controlled by the +failure_policy+ set at construction time:

  • +:fail_fast+ — raises the first error after cancelling unfinished tasks
  • +:collect_all+ — waits for all tasks, then raises the first error
  • +:skip_failed+ — returns only the values of successful tasks

Returns:

  • (Array)

    results in spawn order (or successful-only for :skip_failed)

Raises:

  • (Exception)

    when any task failed (except :skip_failed)



# File 'lib/phronomy/task_group.rb', line 89

def await_all
  tasks = @mutex.synchronize { @tasks.dup }
  return [] if tasks.empty?

  if Phronomy::Runtime::Scheduler.current
    _await_all_cooperative(tasks)
  else
    _await_all_threaded(tasks)
  end
end
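
The three failure policies described for #await_all can be illustrated with a standalone sketch built on plain Ruby threads. It is not Phronomy's implementation: +await_all_sketch+ and +make_threads+ are hypothetical helpers, and the sketch awaits tasks strictly in spawn order, so under +:fail_fast+ it notices an error only when it reaches the failed task.

```ruby
# Standalone sketch of the three failure policies using plain threads.
# await_all_sketch and make_threads are hypothetical names for this example.
def await_all_sketch(threads, failure_policy)
  outcomes = []
  threads.each_with_index do |thread, index|
    begin
      outcomes << [:ok, thread.value] # Thread#value joins and re-raises errors
    rescue StandardError => e
      if failure_policy == :fail_fast
        # Stop the tasks that have not been awaited yet, then re-raise.
        threads.drop(index + 1).each { |t| t.kill; t.join }
        raise e
      end
      outcomes << [:error, e]
    end
  end

  first_error = outcomes.find { |kind, _| kind == :error }
  raise first_error.last if first_error && failure_policy == :collect_all

  # :skip_failed (or no failures at all): keep successful values in spawn order.
  outcomes.select { |kind, _| kind == :ok }.map(&:last)
end

# Three tasks; the second one always fails.
def make_threads
  [1, 2, 3].map do |n|
    Thread.new do
      Thread.current.report_on_exception = false # keep stderr quiet
      raise ArgumentError, "task #{n} failed" if n == 2
      n * 10
    end
  end
end

skipped = await_all_sketch(make_threads, :skip_failed)

collected = begin
  await_all_sketch(make_threads, :collect_all)
rescue ArgumentError => e
  e.message
end

failed_fast = begin
  await_all_sketch(make_threads, :fail_fast)
rescue ArgumentError => e
  e.message
end
```

Here +skipped+ holds only the values of the first and third tasks, while the +:collect_all+ and +:fail_fast+ runs both surface the second task's error to the caller.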

#cancel_all! ⇒ self

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Cancels all tasks currently in the group and waits for each to finish. After this method returns, the active child task count is guaranteed to be zero.

Note: if a task is cancelled before its block has started executing, the internal +ensure+ clause inside the block may not run, so @active is reset explicitly after all tasks are joined.

Returns:

  • (self)


# File 'lib/phronomy/task_group.rb', line 201

def cancel_all!
  tasks = @mutex.synchronize { @tasks.dup }
  tasks.each(&:cancel!)
  tasks.each do |t|
    t.join
  rescue
    nil
  end
  # Force @active to zero: tasks cancelled before block execution starts
  # may not decrement @active via their ensure clause.
  scheduler = Phronomy::Runtime::Scheduler.current
  if scheduler && @coop_signal
    @active = 0
    scheduler.raise_signal_all(@coop_signal)
  else
    @mutex.synchronize do
      @active = 0
      @cond.broadcast
    end
  end
  self
end
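
The cancel, join, then reset sequence above can be mimicked with plain threads. This is only a sketch under stated assumptions: Thread#kill stands in for Task#cancel!, and the local +active+ counter stands in for @active. It shows why the final reset is useful: a thread killed before its body starts never reaches its +ensure+ clause, so the counter cannot be trusted to reach zero by itself.

```ruby
# Standalone sketch: cancel every worker, join it, then force the counter
# to zero. Thread#kill is a stand-in for Task#cancel! (an assumption).
active = 0
mutex = Mutex.new

threads = 3.times.map do
  mutex.synchronize { active += 1 }    # slot is taken before the thread starts
  Thread.new do
    sleep                              # stand-in for long-running work
  ensure
    mutex.synchronize { active -= 1 }  # skipped if killed before the body runs
  end
end

threads.each(&:kill)                   # request cancellation of every worker
threads.each(&:join)                   # wait until each one has finished
mutex.synchronize { active = 0 }       # reset explicitly, as cancel_all! does
still_alive = threads.count(&:alive?)
```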

#spawn { ... } ⇒ Task

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Spawns a new task within the group. Blocks if the number of currently active tasks equals +limit+.

Yields:

  • block to execute concurrently

Returns:

  • (Task)

    the spawned task



# File 'lib/phronomy/task_group.rb', line 56

def spawn(&block)
  wait_for_slot!

  task = if @runtime
    @runtime.spawn(name: "task-group-worker") do
      block.call
    ensure
      release_slot!
    end
  else
    Task.new do
      block.call
    ensure
      release_slot!
    end
  end

  @mutex.synchronize { @tasks << task }
  task
end
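
The source above calls +wait_for_slot!+ and +release_slot!+ without showing them. One plausible way to implement such helpers with the Mutex and ConditionVariable created in the constructor is sketched below. +BoundedSpawner+ and its +peak+ reader are hypothetical and exist only for this example; the real Phronomy helpers may differ, for instance when a cooperative scheduler is active.

```ruby
# Standalone sketch: a slot-limited spawner built on plain Ruby threads.
# BoundedSpawner and #peak are illustrative names, not part of Phronomy.
class BoundedSpawner
  attr_reader :peak

  def initialize(limit)
    @limit = limit
    @active = 0
    @peak = 0
    @mutex = Mutex.new
    @cond = ConditionVariable.new
  end

  # Blocks the caller while every slot is taken, then runs the block on a
  # new thread. The slot is released even if the block raises.
  def spawn(&block)
    wait_for_slot!
    Thread.new do
      block.call
    ensure
      release_slot!
    end
  end

  private

  def wait_for_slot!
    @mutex.synchronize do
      # Re-check the condition in a loop to tolerate spurious wakeups.
      @cond.wait(@mutex) while @active >= @limit
      @active += 1
      @peak = @active if @active > @peak
    end
  end

  def release_slot!
    @mutex.synchronize do
      @active -= 1
      @cond.signal # wake one caller blocked in wait_for_slot!
    end
  end
end

spawner = BoundedSpawner.new(2)
threads = 6.times.map { |i| spawner.spawn { sleep 0.01; i * 10 } }
values = threads.map(&:value) # values come back in spawn order
```

Because the slot is claimed in the caller before the thread is created, the number of running blocks never exceeds the limit, and reading the threads in creation order yields results in spawn order.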