Class: Phronomy::Task
- Inherits:
- Object
- Defined in:
- lib/phronomy/task.rb,
lib/phronomy/task/backend.rb,
lib/phronomy/task/fiber_backend.rb,
lib/phronomy/task/mapped_backend.rb,
lib/phronomy/task/thread_backend.rb,
lib/phronomy/task/immediate_backend.rb
Overview
A single unit of concurrent work.
Decouples task semantics from the underlying execution primitive via a pluggable Backend. The default backend is ThreadBackend; a cooperative or test-double backend can be substituted via Task.default_backend_class= or by passing +backend_class:+ to Task.spawn.
+Task.spawn+ is an internal API used by schedulers and the framework itself. Application code and framework components should use +Runtime.instance.spawn+ instead, which routes through the configured scheduler and respects the concurrency model.
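The pluggable-backend idea described above can be sketched in plain Ruby. This is a minimal illustration only: MiniTask, InlineBackend and ThreadedBackend are hypothetical stand-ins, not Phronomy classes, and the real backends take more arguments (for example +task:+) than shown here.

```ruby
# Hypothetical stand-ins for illustration; none of these are Phronomy classes.

# Runs the block to completion on the calling thread.
class InlineBackend
  def initialize(&block)
    @value = block.call
  end

  def await
    @value
  end
end

# Runs the block on a new thread.
class ThreadedBackend
  def initialize(&block)
    @thread = Thread.new(&block)
  end

  # Thread#value joins the thread and returns the block's result.
  def await
    @thread.value
  end
end

class MiniTask
  class << self
    # Process-wide override, analogous in spirit to default_backend_class=.
    attr_writer :default_backend_class

    def default_backend_class
      @default_backend_class || ThreadedBackend
    end
  end

  # The task only talks to the backend interface, never to Thread directly.
  def initialize(backend_class: self.class.default_backend_class, &block)
    @backend = backend_class.new(&block)
  end

  def await
    @backend.await
  end
end

threaded = MiniTask.new { 6 * 7 }
inline = MiniTask.new(backend_class: InlineBackend) { 6 * 7 }
puts threaded.await == inline.await
```

Because the task depends only on the backend interface, a test can swap in the inline variant and run without real concurrency.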
Defined Under Namespace
Classes: Backend, FiberBackend, ImmediateBackend, MappedBackend, ThreadBackend
Constant Summary
- STATES =
Valid task lifecycle states.
%i[pending running completed failed cancelled].freeze
- SCHEDULER_KEY =
Cooperative task backend using Ruby Fibers.
Unlike ImmediateBackend (which runs the block to completion on the calling thread) or ThreadBackend (which runs the block on a new OS thread), +FiberBackend+ wraps the block in a +Fiber+ that is NOT started immediately. The owning scheduler calls #step to advance execution one cooperative step at a time.
This backend is used exclusively by Runtime::DeterministicScheduler to enable deterministic, wall-clock-free testing of concurrent logic.
Thread-local key under which the currently active DeterministicScheduler is stored so that #await can suspend cooperatively.
:phronomy_deterministic_scheduler
Instance Attribute Summary
-
#backend ⇒ Backend
readonly
private
The execution backend for this task.
-
#name ⇒ String?
readonly
Optional human-readable label.
-
#parent ⇒ Task?
readonly
Parent task in the task tree, if any.
Class Method Summary
-
.checkpoint! ⇒ void
private
Cooperative cancellation checkpoint.
-
.current ⇒ Task?
private
Returns the Task currently executing on this thread, or +nil+.
-
.current_cpu_slice_start_ms ⇒ Integer?
private
Returns the monotonic clock value (ms) when the current task last recorded a yield (or when the task started), or +nil+ when not inside a task context.
-
.default_backend_class ⇒ Class<Backend>
private
Returns the process-wide default backend class.
-
.default_backend_class=(klass) ⇒ Object
private
Sets the process-wide default backend class.
-
.increment_yield_counter! ⇒ Integer
private
Increments a per-thread yield-if-needed counter and returns the new value.
-
.record_yield! ⇒ Object
private
Resets the CPU slice start clock for the current task to +now+.
-
.spawn(name: nil, parent: current, backend_class: default_backend_class, &block) ⇒ Task
private
Spawns a new task executing +block+ concurrently.
Instance Method Summary
-
#alive? ⇒ Boolean
private
Returns +true+ while the task's block is still executing.
-
#await ⇒ Object
private
Blocks until the task completes and returns its value.
-
#cancel! ⇒ self
private
Requests cancellation.
-
#done? ⇒ Boolean
private
Returns +true+ once the task has finished (success, error, or cancellation).
-
#initialize(name: nil, parent: nil, backend_class: self.class.default_backend_class, &block) ⇒ Task
constructor
A new instance of Task.
-
#join(limit = nil) ⇒ Object?
private
Joins the underlying execution context, optionally with a timeout.
-
#map {|value| ... } ⇒ Task
Returns a new Task whose completed value is the result of applying +block+ to this task's completed value.
-
#on_complete {|value, error| ... } ⇒ self
private
Registers a callback to be invoked when the task reaches a terminal state (+:completed+, +:failed+, or +:cancelled+).
-
#register_child(child) ⇒ Object
private
Registers +child+ as a child task for cancellation propagation.
-
#status ⇒ Symbol
private
Returns the current lifecycle state.
-
#transition!(new_status, value: nil, error: nil) ⇒ Object
private
Updates the task lifecycle state.
Constructor Details
#initialize(name: nil, parent: nil, backend_class: self.class.default_backend_class, &block) ⇒ Task
Returns a new instance of Task.
# File 'lib/phronomy/task.rb', line 138
def initialize(name: nil, parent: nil, backend_class: self.class.default_backend_class, &block)
  @name = name
  @parent = parent
  @status = :pending
  @mutex = Mutex.new
  @children = []
  @on_complete_callbacks = []
  @completed_value = nil
  @completed_error = nil
  parent&.register_child(self)
  @backend = backend_class.new(task: self, &block)
end
Instance Attribute Details
#backend ⇒ Backend (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns the execution backend for this task.
# File 'lib/phronomy/task.rb', line 132
def backend
  @backend
end
#name ⇒ String? (readonly)
Returns optional human-readable label.
# File 'lib/phronomy/task.rb', line 125
def name
  @name
end
#parent ⇒ Task? (readonly)
Returns parent task in the task tree, if any.
# File 'lib/phronomy/task.rb', line 128
def parent
  @parent
end
Class Method Details
.checkpoint! ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Cooperative cancellation checkpoint.
Raises CancellationError if the current task's status is +:cancelled+. On ThreadBackend, cancellation is delivered via +Thread#raise+ so this is a no-op in practice; on future cooperative backends this will be the primary cancellation mechanism.
Safe to call from outside a task context (no-op when no current task).
# File 'lib/phronomy/task.rb', line 104
def self.checkpoint!
  ct = current
  return unless ct

  raise CancellationError, "Task cancelled" if ct.status == :cancelled
end
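To illustrate how a cooperative checkpoint is typically used, here is a minimal sketch in which a loop checks for cancellation between units of work. SketchTask, SketchCancelled, sketch_checkpoint! and process_items are hypothetical names; the task is passed explicitly instead of being looked up from thread-local state.

```ruby
# Hypothetical stand-ins; not part of Phronomy.
class SketchCancelled < StandardError; end

# A task reduced to the one thing the checkpoint inspects: its status.
class SketchTask
  attr_accessor :status

  def initialize
    @status = :running
  end
end

# Same shape as the checkpoint above: no-op without a task,
# raise once the task has been marked cancelled.
def sketch_checkpoint!(task)
  return unless task

  raise SketchCancelled, "Task cancelled" if task.status == :cancelled
end

# Processes items one by one, checking for cancellation before each item.
# cancel_after simulates an external cancel request arriving mid-loop.
def process_items(task, items, cancel_after:)
  processed = []
  items.each do |item|
    sketch_checkpoint!(task)
    processed << item
    task.status = :cancelled if item == cancel_after
  end
  processed
rescue SketchCancelled
  processed << :stopped
end

p process_items(SketchTask.new, [1, 2, 3, 4], cancel_after: 2)
# prints [1, 2, :stopped]
```

The work already done before the cancel request is kept; only the remaining items are skipped.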
.current ⇒ Task?
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns the Phronomy::Task currently executing on this thread, or +nil+. Returns +nil+ when called from outside a task-managed execution context.
# File 'lib/phronomy/task.rb', line 58
def self.current
  Thread.current[:phronomy_current_task]
end
.current_cpu_slice_start_ms ⇒ Integer?
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns the monotonic clock value (ms) when the current task last recorded a yield (or when the task started), or +nil+ when not inside a task context. Used by Runtime#yield for CPU-bound detection without placing +Thread.current+ in files outside the allowlist.
# File 'lib/phronomy/task.rb', line 68
def self.current_cpu_slice_start_ms
  Thread.current[:phronomy_task_cpu_slice_start_ms]
end
.default_backend_class ⇒ Class<Backend>
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns the process-wide default backend class. Defaults to ThreadBackend. Override in tests or to enable a cooperative scheduler backend.
# File 'lib/phronomy/task.rb', line 43
def self.default_backend_class
  @default_backend_class || ThreadBackend
end
.default_backend_class=(klass) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Sets the process-wide default backend class.
# File 'lib/phronomy/task.rb', line 50
def self.default_backend_class=(klass)
  @default_backend_class = klass
end
.increment_yield_counter! ⇒ Integer
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Increments a per-thread yield-if-needed counter and returns the new value. Used by Runtime#yield_if_needed so that the counter is thread-local without putting +Thread.current+ in runtime.rb (which is outside the Thread.current allowlist).
# File 'lib/phronomy/task.rb', line 87
def self.increment_yield_counter!
  count = (Thread.current[:phronomy_yield_if_needed_counter] || 0) + 1
  Thread.current[:phronomy_yield_if_needed_counter] = count
  count
end
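The per-thread behaviour can be checked with a small sketch that uses the same +Thread.current[]+ pattern under a hypothetical key (+:sketch_counter+) and a hypothetical helper name, so it does not touch Phronomy's own state.

```ruby
# Hypothetical helper mirroring the counter pattern above.
def sketch_increment!
  count = (Thread.current[:sketch_counter] || 0) + 1
  Thread.current[:sketch_counter] = count
  count
end

# Each new thread starts with no value under the key, so its count begins at 1.
first_thread = Thread.new { [sketch_increment!, sketch_increment!] }.value
second_thread = Thread.new { [sketch_increment!] }.value

p first_thread  # prints [1, 2]
p second_thread # prints [1]
```

No lock is needed because no two threads ever read or write the same slot.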
.record_yield! ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Resets the CPU slice start clock for the current task to +now+. Call this immediately after the cooperative yield has been performed so that the next yield correctly measures only the time since the last yield.
# File 'lib/phronomy/task.rb', line 76
def self.record_yield!
  Thread.current[:phronomy_task_cpu_slice_start_ms] =
    Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond)
end
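A rough sketch of how a slice start timestamp can drive a "has this task run too long without yielding?" check follows. SliceClock and its +budget_ms+ parameter are hypothetical; how Runtime#yield actually uses the value is not shown on this page.

```ruby
# Hypothetical helper; only the clock call matches the source above.
class SliceClock
  def initialize(budget_ms:)
    @budget_ms = budget_ms
    @slice_start_ms = now_ms
  end

  # Milliseconds spent since the last recorded yield (or since creation).
  def elapsed_ms
    now_ms - @slice_start_ms
  end

  def over_budget?
    elapsed_ms >= @budget_ms
  end

  # Restart the slice so the next measurement covers only new work.
  def record_yield!
    @slice_start_ms = now_ms
  end

  private

  # Monotonic clock: unaffected by wall-clock adjustments.
  def now_ms
    Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond)
  end
end

clock = SliceClock.new(budget_ms: 10)
sleep 0.03                 # stand-in for CPU-bound work
puts clock.over_budget?    # the slice has exceeded its budget
clock.record_yield!        # called right after yielding
puts clock.over_budget?
```

Resetting the start time immediately after the yield keeps the time spent waiting out of the next measurement.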
.spawn(name: nil, parent: current, backend_class: default_backend_class, &block) ⇒ Task
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Spawns a new task executing +block+ concurrently.
# File 'lib/phronomy/task.rb', line 120
def self.spawn(name: nil, parent: current, backend_class: default_backend_class, &block)
  new(name: name, parent: parent, backend_class: backend_class, &block)
end
Instance Method Details
#alive? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns +true+ while the task's block is still executing.
# File 'lib/phronomy/task.rb', line 289
def alive?
  @backend.alive?
end
#await ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Blocks until the task completes and returns its value. Re-raises any exception raised inside the block.
# File 'lib/phronomy/task.rb', line 164
def await
  @backend.await
end
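The "return the value or re-raise" contract matches what Ruby's own +Thread#value+ does. The sketch below shows that behaviour with plain threads; whether ThreadBackend is implemented on top of +Thread#value+ is an assumption, and sketch_await is a hypothetical helper.

```ruby
# Hypothetical helper: waits for the thread and returns its result,
# re-raising any exception that terminated it (standard Thread#value behaviour).
def sketch_await(thread)
  thread.value
end

ok = Thread.new { 6 * 7 }

failing = Thread.new do
  Thread.current.report_on_exception = false # keep stderr quiet for the demo
  raise ArgumentError, "bad input"
end

puts sketch_await(ok) # prints 42

begin
  sketch_await(failing)
rescue ArgumentError => e
  puts "re-raised: #{e.message}" # prints re-raised: bad input
end
```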
#cancel! ⇒ self
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Requests cancellation. Propagates to all registered child tasks. Sets status to :cancelled immediately so that even tasks that have not started executing yet are correctly marked as cancelled after join. Passes a CancellationError to on_complete callbacks so callers do not need to call await to discover the error.
# File 'lib/phronomy/task.rb', line 265
def cancel!
  transition!(:cancelled, error: CancellationError.new("Task cancelled"))
  # @backend may be nil if cancel! is called while ImmediateBackend is still
  # initializing (the block runs synchronously inside .new, so register_child
  # fires before @backend is assigned). Safe-navigate to avoid NoMethodError.
  @backend&.cancel!
  children = @mutex.synchronize { @children.dup }
  children.each(&:cancel!)
  self
end
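Cancellation propagation through the task tree can be sketched with a stripped-down node class. SketchNode is hypothetical and omits backends, callbacks and the terminal-state guard; it only shows the "copy the children under the lock, recurse outside it" shape used above.

```ruby
# Hypothetical stand-in for a task in a parent/child tree.
class SketchNode
  attr_reader :status

  def initialize(parent: nil)
    @status = :pending
    @mutex = Mutex.new
    @children = []
    parent&.register_child(self) # child registers itself, as in the constructor above
  end

  def register_child(child)
    @mutex.synchronize { @children << child }
  end

  def cancel!
    # Snapshot the children while holding the lock...
    children = @mutex.synchronize do
      @status = :cancelled
      @children.dup
    end
    # ...then recurse without holding it, so no two locks are held at once.
    children.each(&:cancel!)
    self
  end
end

root = SketchNode.new
child = SketchNode.new(parent: root)
grandchild = SketchNode.new(parent: child)
unrelated = SketchNode.new

root.cancel!
p [child.status, grandchild.status, unrelated.status]
# prints [:cancelled, :cancelled, :pending]
```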
#done? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns +true+ once the task has finished (success, error, or cancellation).
# File 'lib/phronomy/task.rb', line 254
def done?
  %i[completed failed cancelled].include?(status)
end
#join(limit = nil) ⇒ Object?
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Joins the underlying execution context, optionally with a timeout. Returns +nil+ when the timeout expires before completion.
# File 'lib/phronomy/task.rb', line 282
def join(limit = nil)
  @backend.join(limit)
end
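The +nil+-on-timeout convention is the same one Ruby's +Thread#join+ uses. The sketch below demonstrates it with plain threads; that ThreadBackend forwards +limit+ to +Thread#join+ is an assumption, and join_outcome is a hypothetical helper.

```ruby
# Hypothetical helper: reports whether the thread finished within the limit.
# Thread#join(limit) returns nil on timeout and the thread itself otherwise.
def join_outcome(thread, limit)
  thread.join(limit).nil? ? :timed_out : :finished
end

slow = Thread.new { sleep 0.5 }
fast = Thread.new { :done }

puts join_outcome(slow, 0.01) # prints timed_out
puts join_outcome(fast, 1)    # prints finished

slow.kill # clean up the still-sleeping thread
```

A timeout does not stop the work; the caller decides whether to keep waiting or cancel.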
#map {|value| ... } ⇒ Task
Returns a new Phronomy::Task whose completed value is the result of applying +block+ to this task's completed value.
If this task fails or is cancelled, the block is never called and the mapped task receives the same error. In the source shown for this method, the mapped task transitions to +:failed+ in both cases; a cancelled source task surfaces as a CancellationError on the mapped task.
The primary use case is transforming an agent result into a WorkflowContext so that a Workflow entry action can return a Task whose value is picked up by Workflow::FSMSession via the existing +:action_completed+ path.
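A self-contained sketch of these mapping semantics follows. SketchFuture is a hypothetical, single-threaded stand-in (no backend, no mutex), not Phronomy::Task; it only shows how a mapped result is derived from a completion callback.

```ruby
# Hypothetical single-threaded stand-in; not thread-safe.
class SketchFuture
  attr_reader :value, :error

  def initialize
    @callbacks = []
    @done = false
  end

  # Completes the future once; later calls are ignored.
  def resolve(value, error = nil)
    return if @done

    @done = true
    @value = value
    @error = error
    @callbacks.each { |cb| cb.call(value, error) }
  end

  def on_complete(&callback)
    if @done
      callback.call(@value, @error)
    else
      @callbacks << callback
    end
    self
  end

  # Returns a new future holding block.call(value), or the original error.
  def map(&block)
    mapped = SketchFuture.new
    on_complete do |value, error|
      mapped_value = nil
      unless error
        begin
          mapped_value = block.call(value)
        rescue => e
          error = e # a raising block fails the mapped future
        end
      end
      mapped.resolve(mapped_value, error)
    end
    mapped
  end
end

source = SketchFuture.new
doubled = source.map { |v| v * 2 }
source.resolve(21)
p doubled.value # prints 42
```

The block runs only on success, so callers can chain transformations without checking for errors at each step.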
# File 'lib/phronomy/task.rb', line 221
def map(&block)
  # MappedBackend drives the task lifecycle entirely via on_complete;
  # it never spawns a thread of its own.
  mapped = self.class.spawn(
    name: "#{@name}-mapped",
    parent: @parent,
    backend_class: MappedBackend
  ) {}

  on_complete do |value, error|
    mapped_value = nil
    mapped_error = error
    unless error
      begin
        mapped_value = block.call(value)
      rescue => e
        mapped_error = e
      end
    end
    if mapped_error
      mapped.transition!(:failed, error: mapped_error)
    else
      mapped.transition!(:completed, value: mapped_value)
    end
    # Unblock mapped.await / mapped.join after the terminal transition.
    mapped.backend.unblock(mapped_value, mapped_error)
  end
  mapped
end
#on_complete {|value, error| ... } ⇒ self
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Registers a callback to be invoked when the task reaches a terminal state (+:completed+, +:failed+, or +:cancelled+).
The callback receives two arguments: +value+ (the task's return value, or +nil+) and +error+ (the exception, or +nil+). These are provided directly so the callback does not need to call +task.await+, which would risk a self-join error when the callback runs inside the task's own thread.
If the task is already done when this method is called, the callback is invoked immediately (synchronously, on the calling thread).
# File 'lib/phronomy/task.rb', line 182
def on_complete(&callback)
  fire_now = false
  fire_args = nil
  @mutex.synchronize do
    # Check @status directly to avoid re-entering the mutex (done? calls
    # status, which also takes @mutex).
    if %i[completed failed cancelled].include?(@status)
      fire_now = true
      fire_args = [@completed_value, @completed_error]
    else
      @on_complete_callbacks << callback
    end
  end
  callback.call(*fire_args) if fire_now
  self
end
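The self-join risk mentioned above is standard Ruby behaviour: a thread that tries to join itself gets a +ThreadError+. The sketch below reproduces it with a plain thread; self_join_error is a hypothetical helper, and the assumption is that a thread-based +await+ would join the task's thread in the same way.

```ruby
# Hypothetical helper: attempts a self-join inside a thread and returns
# the resulting exception (or nil if none was raised).
def self_join_error
  Thread.new do
    begin
      Thread.current.join # a thread cannot wait for itself
      nil
    rescue ThreadError => e
      e
    end
  end.value
end

puts self_join_error.class # prints ThreadError
```

Passing +value+ and +error+ into the callback avoids this situation entirely, since the callback never has to wait on the task.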
#register_child(child) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Registers +child+ as a child task for cancellation propagation. Called automatically during child task initialization.
# File 'lib/phronomy/task.rb', line 324
def register_child(child)
  @mutex.synchronize { @children << child }
end
#status ⇒ Symbol
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns the current lifecycle state.
# File 'lib/phronomy/task.rb', line 154
def status
  @mutex.synchronize { @status }
end
#transition!(new_status, value: nil, error: nil) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Updates the task lifecycle state. Called by backends during execution transitions. Terminal states (completed/failed/cancelled) are never overwritten. When a terminal state is reached, fires on_complete callbacks (outside the mutex) passing the result value and error directly.
# File 'lib/phronomy/task.rb', line 303
def transition!(new_status, value: nil, error: nil)
  callbacks = nil
  @mutex.synchronize do
    # Check @status directly (not via #done?) to avoid re-entering the mutex.
    return if %i[completed failed cancelled].include?(@status)

    @status = new_status
    if %i[completed failed cancelled].include?(new_status)
      @completed_value = value
      @completed_error = error
      callbacks = @on_complete_callbacks.dup
      @on_complete_callbacks.clear
    end
  end
  callbacks&.each { |cb| cb.call(value, error) }
end
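Two properties of this method are easy to miss: terminal states are sticky, and callbacks run after the lock is released. The sketch below reproduces both with a hypothetical SketchState class that keeps only the state machine (no backend, no children). The callback calls +status+, which takes the same non-reentrant +Mutex+; that only works because the callback fires outside the synchronized section.

```ruby
# Hypothetical stand-in that keeps only the lifecycle state machine.
class SketchState
  TERMINAL = %i[completed failed cancelled].freeze

  def initialize
    @status = :pending
    @mutex = Mutex.new
    @callbacks = []
  end

  def status
    @mutex.synchronize { @status }
  end

  def on_complete(&callback)
    fire_args = nil
    @mutex.synchronize do
      if TERMINAL.include?(@status)
        fire_args = [@value, @error] # already done: fire immediately below
      else
        @callbacks << callback
      end
    end
    callback.call(*fire_args) if fire_args
    self
  end

  def transition!(new_status, value: nil, error: nil)
    callbacks = nil
    @mutex.synchronize do
      return if TERMINAL.include?(@status) # terminal states are never overwritten

      @status = new_status
      if TERMINAL.include?(new_status)
        @value = value
        @error = error
        callbacks = @callbacks.dup
        @callbacks.clear
      end
    end
    # Fired outside the lock, so callbacks may safely call #status.
    callbacks&.each { |cb| cb.call(value, error) }
  end
end

state = SketchState.new
state.on_complete { |value, _error| puts "#{state.status}: #{value}" }
state.transition!(:running)
state.transition!(:completed, value: 1) # prints completed: 1
state.transition!(:failed)              # ignored
puts state.status                       # prints completed
```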