Class: Phronomy::Task
- Inherits: Object
  - Object
    - Phronomy::Task
- Defined in: lib/phronomy/task.rb, lib/phronomy/task/backend.rb, lib/phronomy/task/fiber_backend.rb, lib/phronomy/task/thread_backend.rb, lib/phronomy/task/immediate_backend.rb
Overview
A single unit of concurrent work.
Decouples task semantics from the underlying execution primitive via a pluggable Backend. The default backend is ThreadBackend; a cooperative or test-double backend can be substituted via Task.default_backend_class= or by passing +backend_class:+ to Task.spawn.
+Task.spawn+ is an internal API used by schedulers and the framework itself. Application code and framework components should use +Runtime.instance.spawn+ instead, which routes through the configured scheduler and respects the concurrency model.
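To make the backend decoupling concrete, here is a minimal, self-contained sketch. It does not load Phronomy: SketchTask, InlineBackend and ThreadedBackend are hypothetical stand-ins that mirror only the shape described above (a task delegating execution to a backend class chosen per call or through a process-wide default), not the library's implementation.

```ruby
# Hypothetical stand-ins; not Phronomy's implementation.

# Runs the block to completion on the calling thread.
class InlineBackend
  def initialize(&block)
    @value = block.call
  end

  def await
    @value
  end
end

# Runs the block on a new thread; await blocks until it finishes.
class ThreadedBackend
  def initialize(&block)
    @thread = Thread.new(&block)
  end

  def await
    @thread.value
  end
end

class SketchTask
  class << self
    attr_writer :default_backend_class

    # Process-wide default, overridable through the writer above.
    def default_backend_class
      @default_backend_class || ThreadedBackend
    end

    def spawn(backend_class: default_backend_class, &block)
      new(backend_class: backend_class, &block)
    end
  end

  def initialize(backend_class: self.class.default_backend_class, &block)
    @backend = backend_class.new(&block)
  end

  def await
    @backend.await
  end
end

threaded = SketchTask.spawn { 1 + 1 }
inline   = SketchTask.spawn(backend_class: InlineBackend) { Thread.current }
puts threaded.await
puts inline.await == Thread.current
```

Swapping the default (here `SketchTask.default_backend_class = InlineBackend`) is the kind of substitution a test suite might make to run task bodies synchronously.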
Defined Under Namespace
Classes: Backend, FiberBackend, ImmediateBackend, ThreadBackend
Constant Summary
- STATES = %i[pending running completed failed cancelled].freeze
  Valid task lifecycle states.
- SCHEDULER_KEY = :phronomy_deterministic_scheduler
  Thread-local key under which the currently active DeterministicScheduler is stored so that #await can suspend cooperatively.
  Related backend: FiberBackend is a cooperative task backend using Ruby Fibers. Unlike ImmediateBackend (which runs the block to completion on the calling thread) or ThreadBackend (which runs the block on a new OS thread), +FiberBackend+ wraps the block in a +Fiber+ that is NOT started immediately. The owning scheduler calls #step to advance execution one cooperative step at a time.
  This backend is used exclusively by Runtime::DeterministicScheduler to enable deterministic, wall-clock-free testing of concurrent logic.
Instance Attribute Summary
- #backend ⇒ Backend (readonly, private): The execution backend for this task.
- #name ⇒ String? (readonly): Optional human-readable label.
- #parent ⇒ Task? (readonly): Parent task in the task tree, if any.
Class Method Summary
- .checkpoint! ⇒ void (private): Cooperative cancellation checkpoint.
- .current ⇒ Task? (private): Returns the Task currently executing on this thread, or +nil+.
- .current_cpu_slice_start_ms ⇒ Integer? (private): Returns the monotonic clock value (ms) when the current task last recorded a yield (or when the task started), or +nil+ when not inside a task context.
- .default_backend_class ⇒ Class<Backend> (private): Returns the process-wide default backend class.
- .default_backend_class=(klass) ⇒ Object (private): Sets the process-wide default backend class.
- .increment_yield_counter! ⇒ Integer (private): Increments a per-thread yield-if-needed counter and returns the new value.
- .record_yield! ⇒ Object (private): Resets the CPU slice start clock for the current task to +now+.
- .spawn(name: nil, parent: current, backend_class: default_backend_class, &block) ⇒ Task (private): Spawns a new task executing +block+ concurrently.
Instance Method Summary
- #alive? ⇒ Boolean (private): Returns +true+ while the task's block is still executing.
- #await ⇒ Object (private): Blocks until the task completes and returns its value.
- #cancel! ⇒ self (private): Requests cancellation.
- #done? ⇒ Boolean (private): Returns +true+ once the task has finished (success, error, or cancellation).
- #initialize(name: nil, parent: nil, backend_class: self.class.default_backend_class, &block) ⇒ Task (constructor): A new instance of Task.
- #join(limit = nil) ⇒ Object? (private): Joins the underlying execution context, optionally with a timeout.
- #on_complete {|value, error| ... } ⇒ self (private): Registers a callback to be invoked when the task reaches a terminal state (+:completed+, +:failed+, or +:cancelled+).
- #register_child(child) ⇒ Object (private): Registers +child+ as a child task for cancellation propagation.
- #status ⇒ Symbol (private): Returns the current lifecycle state.
- #transition!(new_status, value: nil, error: nil) ⇒ Object (private): Updates the task lifecycle state.
Constructor Details
#initialize(name: nil, parent: nil, backend_class: self.class.default_backend_class, &block) ⇒ Task
Returns a new instance of Task.
# File 'lib/phronomy/task.rb', line 137
def initialize(name: nil, parent: nil, backend_class: self.class.default_backend_class, &block)
  @name = name
  @parent = parent
  @status = :pending
  @mutex = Mutex.new
  @children = []
  @on_complete_callbacks = []
  @completed_value = nil
  @completed_error = nil
  parent&.register_child(self)
  @backend = backend_class.new(task: self, &block)
end
Instance Attribute Details
#backend ⇒ Backend (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns the execution backend for this task.
# File 'lib/phronomy/task.rb', line 131
def backend
  @backend
end
#name ⇒ String? (readonly)
Returns optional human-readable label.
# File 'lib/phronomy/task.rb', line 124
def name
  @name
end
#parent ⇒ Task? (readonly)
Returns parent task in the task tree, if any.
# File 'lib/phronomy/task.rb', line 127
def parent
  @parent
end
Class Method Details
.checkpoint! ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Cooperative cancellation checkpoint.
Raises CancellationError if the current task's status is +:cancelled+. On ThreadBackend, cancellation is delivered via +Thread#raise+ so this is a no-op in practice; on future cooperative backends this will be the primary cancellation mechanism.
Safe to call from outside a task context (no-op when no current task).
# File 'lib/phronomy/task.rb', line 103
def self.checkpoint!
  ct = current
  return unless ct

  raise CancellationError, "Task cancelled" if ct.status == :cancelled
end
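The checkpoint pattern can be illustrated without Phronomy. In this sketch, SketchCancelled, CancelFlag and process are hypothetical names; the point is only that a cooperative worker polls for cancellation at safe points between units of work, rather than being interrupted mid-operation.

```ruby
# Hypothetical names throughout; a sketch of the pattern only.
class SketchCancelled < StandardError; end

class CancelFlag
  def initialize
    @mutex = Mutex.new
    @cancelled = false
  end

  def cancel!
    @mutex.synchronize { @cancelled = true }
  end

  # Raises if cancellation was requested; otherwise does nothing.
  def checkpoint!
    raise SketchCancelled, "Task cancelled" if @mutex.synchronize { @cancelled }
  end
end

# Doubles each item until the list is exhausted or the flag is cancelled.
# Returns whatever was finished before cancellation was observed.
def process(items, flag)
  done = []
  items.each do |item|
    flag.checkpoint!          # safe point between units of work
    done << item * 2
    flag.cancel! if item == 3 # simulate an external cancel request
  end
  done
rescue SketchCancelled
  done
end

p process([1, 2, 3, 4, 5], CancelFlag.new)
```

The item being processed when the cancel request arrives still finishes; the request takes effect at the next checkpoint.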
.current ⇒ Task?
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns the Phronomy::Task currently executing on this thread, or +nil+ when called from outside a task-managed execution context.
# File 'lib/phronomy/task.rb', line 57
def self.current
  Thread.current[:phronomy_current_task]
end
.current_cpu_slice_start_ms ⇒ Integer?
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns the monotonic clock value (ms) when the current task last recorded a yield (or when the task started), or +nil+ when not inside a task context. Used by Runtime#yield for CPU-bound detection without placing +Thread.current+ in files outside the allowlist.
# File 'lib/phronomy/task.rb', line 67
def self.current_cpu_slice_start_ms
  Thread.current[:phronomy_task_cpu_slice_start_ms]
end
.default_backend_class ⇒ Class<Backend>
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns the process-wide default backend class. Defaults to ThreadBackend. Override in tests or to enable a cooperative scheduler backend.
# File 'lib/phronomy/task.rb', line 42
def self.default_backend_class
  @default_backend_class || ThreadBackend
end
.default_backend_class=(klass) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Sets the process-wide default backend class.
# File 'lib/phronomy/task.rb', line 49
def self.default_backend_class=(klass)
  @default_backend_class = klass
end
.increment_yield_counter! ⇒ Integer
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Increments a per-thread yield-if-needed counter and returns the new value. Used by Runtime#yield_if_needed so that the counter is thread-local without putting +Thread.current+ in runtime.rb (which is outside the Thread.current allowlist).
# File 'lib/phronomy/task.rb', line 86
def self.increment_yield_counter!
  count = (Thread.current[:phronomy_yield_if_needed_counter] || 0) + 1
  Thread.current[:phronomy_yield_if_needed_counter] = count
  count
end
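A standalone sketch of the same counting idiom shows why no lock is needed: each thread reads and writes only its own slot. The key :sketch_counter and the helper bump_counter! are hypothetical names, not part of Phronomy.

```ruby
# Hypothetical key and helper; same idiom as the method above.
def bump_counter!
  count = (Thread.current[:sketch_counter] || 0) + 1
  Thread.current[:sketch_counter] = count
  count
end

main_counts  = 3.times.map { bump_counter! }
other_counts = Thread.new { 2.times.map { bump_counter! } }.value

p main_counts  # counts on the main thread
p other_counts # a new thread starts again from 1
```

Note that `Thread#[]` storage is fiber-local in Ruby, so code running in a different Fiber on the same thread sees its own counter.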
.record_yield! ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Resets the CPU slice start clock for the current task to +now+. Call this immediately after the cooperative yield has been performed so that the next yield correctly measures only the time since the last yield.
# File 'lib/phronomy/task.rb', line 75
def self.record_yield!
  Thread.current[:phronomy_task_cpu_slice_start_ms] =
    Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond)
end
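The following sketch shows how a slice-start timestamp like this one could drive a "has this task run too long without yielding?" check. It is an assumption about usage, not Phronomy's Runtime code: SLICE_KEY, now_ms, slice_exceeded? and the budget values are all hypothetical.

```ruby
# Hypothetical key, helper names, and budget values.
SLICE_KEY = :sketch_cpu_slice_start_ms

def now_ms
  Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond)
end

# Marks the start of a new slice for the current thread.
def record_yield!
  Thread.current[SLICE_KEY] = now_ms
end

# True when the current slice has lasted at least budget_ms.
# False when no slice start has been recorded.
def slice_exceeded?(budget_ms)
  start = Thread.current[SLICE_KEY]
  return false unless start

  now_ms - start >= budget_ms
end

record_yield!
sleep 0.05
puts slice_exceeded?(10)     # the slice has outlived a 10 ms budget
record_yield!
puts slice_exceeded?(1_000)  # a fresh slice is well inside a 1 s budget
```

The monotonic clock is used so that wall-clock adjustments cannot make a slice appear negative or artificially long.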
.spawn(name: nil, parent: current, backend_class: default_backend_class, &block) ⇒ Task
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Spawns a new task executing +block+ concurrently.
# File 'lib/phronomy/task.rb', line 119
def self.spawn(name: nil, parent: current, backend_class: default_backend_class, &block)
  new(name: name, parent: parent, backend_class: backend_class, &block)
end
Instance Method Details
#alive? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns +true+ while the task's block is still executing.
# File 'lib/phronomy/task.rb', line 236
def alive?
  @backend.alive?
end
#await ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Blocks until the task completes and returns its value. Re-raises any exception raised inside the block.
# File 'lib/phronomy/task.rb', line 163
def await
  @backend.await
end
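The "returns the value or re-raises" contract matches what plain Ruby offers through Thread#value. Whether ThreadBackend is built on Thread#value is an assumption; the sketch below only demonstrates the contract, and await_sketch is a hypothetical helper.

```ruby
# Hypothetical helper: waits for the thread, returns its result, and
# re-raises any exception that terminated it.
def await_sketch(thread)
  thread.value
end

ok = Thread.new { 6 * 7 }
puts await_sketch(ok)

failing = Thread.new do
  Thread.current.report_on_exception = false # keep stderr quiet for the demo
  raise ArgumentError, "boom"
end

begin
  await_sketch(failing)
rescue ArgumentError => e
  puts "re-raised: #{e.message}"
end
```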
#cancel! ⇒ self
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Requests cancellation. Propagates to all registered child tasks. Sets status to :cancelled immediately so that even tasks that have not started executing yet are correctly marked as cancelled after join. Passes a CancellationError to on_complete callbacks so callers do not need to call await to discover the error.
# File 'lib/phronomy/task.rb', line 212
def cancel!
  transition!(:cancelled, error: CancellationError.new("Task cancelled"))
  # @backend may be nil if cancel! is called while ImmediateBackend is still
  # initializing (the block runs synchronously inside .new, so register_child
  # fires before @backend is assigned). Safe-navigate to avoid NoMethodError.
  @backend&.cancel!
  children = @mutex.synchronize { @children.dup }
  children.each(&:cancel!)
  self
end
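Cancellation propagation through the task tree can be modelled in isolation. SketchNode below is a hypothetical class that keeps only the parent/child bookkeeping and a cancelled flag; it has no backend and is not Phronomy's Task.

```ruby
# Hypothetical class; mirrors only the tree-propagation idea.
class SketchNode
  attr_reader :name

  def initialize(name, parent: nil)
    @name = name
    @mutex = Mutex.new
    @children = []
    @cancelled = false
    parent&.register_child(self)
  end

  def register_child(child)
    @mutex.synchronize { @children << child }
  end

  def cancelled?
    @mutex.synchronize { @cancelled }
  end

  def cancel!
    # Snapshot the children under the lock, then recurse outside it so a
    # child's cancel! never runs while this node's mutex is held.
    children = @mutex.synchronize do
      @cancelled = true
      @children.dup
    end
    children.each(&:cancel!)
    self
  end
end

root  = SketchNode.new("root")
child = SketchNode.new("child", parent: root)
leaf  = SketchNode.new("leaf", parent: child)
other = SketchNode.new("other")

child.cancel!
p [root, child, leaf, other].map(&:cancelled?)
```

Cancellation flows downward only: cancelling child marks leaf but leaves root and the unrelated node untouched.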
#done? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns +true+ once the task has finished (success, error, or cancellation).
# File 'lib/phronomy/task.rb', line 201
def done?
  %i[completed failed cancelled].include?(status)
end
#join(limit = nil) ⇒ Object?
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Joins the underlying execution context, optionally with a timeout. Returns +nil+ when the timeout expires before completion.
# File 'lib/phronomy/task.rb', line 229
def join(limit = nil)
  @backend.join(limit)
end
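The nil-on-timeout convention is the same one Ruby's own Thread#join(limit) uses. The sketch below demonstrates it with a bare Thread; timed_out? is a hypothetical helper, and how each Phronomy backend implements join is not shown in this page.

```ruby
# Hypothetical helper: true when +thread+ is still running after
# waiting up to +limit+ seconds.
def timed_out?(thread, limit)
  thread.join(limit).nil?
end

slow = Thread.new do
  sleep 0.2
  :finished
end

puts timed_out?(slow, 0.01)  # the thread is still sleeping
puts slow.join.equal?(slow)  # an untimed join returns the thread itself
puts slow.value
```

A nil result therefore means "not finished yet", not "finished with a nil value"; the value itself comes from await.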
#on_complete {|value, error| ... } ⇒ self
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Registers a callback to be invoked when the task reaches a terminal state (+:completed+, +:failed+, or +:cancelled+).
The callback receives two arguments: +value+ (the task's return value, or +nil+) and +error+ (the exception, or +nil+). These are provided directly so the callback does not need to call +task.await+, which would risk a self-join error when the callback runs inside the task's own thread.
If the task is already done when this method is called, the callback is invoked immediately (synchronously, on the calling thread).
# File 'lib/phronomy/task.rb', line 181
def on_complete(&callback)
  fire_now = false
  fire_args = nil
  @mutex.synchronize do
    # Check @status directly to avoid re-entering the mutex (done? calls
    # status, which also takes @mutex).
    if %i[completed failed cancelled].include?(@status)
      fire_now = true
      fire_args = [@completed_value, @completed_error]
    else
      @on_complete_callbacks << callback
    end
  end
  callback.call(*fire_args) if fire_now
  self
end
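The self-join hazard mentioned above is a property of Ruby threads themselves and can be reproduced without Phronomy. In this sketch, self_join_error is a hypothetical helper that joins a thread from inside that same thread and captures what Ruby raises.

```ruby
# Hypothetical helper: attempts to join the current thread from inside
# itself and returns the exception Ruby raises.
def self_join_error
  Thread.new do
    begin
      Thread.current.join
      nil
    rescue ThreadError => e
      e
    end
  end.value
end

p self_join_error.class
```

Passing +value+ and +error+ straight to the callback avoids this situation, because a callback running on the task's own thread never has to wait on that thread.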
#register_child(child) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Registers +child+ as a child task for cancellation propagation. Called automatically during child task initialization.
# File 'lib/phronomy/task.rb', line 271
def register_child(child)
  @mutex.synchronize { @children << child }
end
#status ⇒ Symbol
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns the current lifecycle state.
# File 'lib/phronomy/task.rb', line 153
def status
  @mutex.synchronize { @status }
end
#transition!(new_status, value: nil, error: nil) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Updates the task lifecycle state. Called by backends during execution transitions. Terminal states (completed/failed/cancelled) are never overwritten. When a terminal state is reached, fires on_complete callbacks (outside the mutex) passing the result value and error directly.
# File 'lib/phronomy/task.rb', line 250
def transition!(new_status, value: nil, error: nil)
  callbacks = nil
  @mutex.synchronize do
    # Check @status directly (not via #done?) to avoid re-entering the mutex.
    return if %i[completed failed cancelled].include?(@status)

    @status = new_status
    if %i[completed failed cancelled].include?(new_status)
      @completed_value = value
      @completed_error = error
      callbacks = @on_complete_callbacks.dup
      @on_complete_callbacks.clear
    end
  end

  callbacks&.each { |cb| cb.call(value, error) }
end
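The lifecycle rules described here (terminal states are final, callbacks run outside the mutex, late registrations fire immediately) can be exercised with a reduced model. SketchLifecycle is a hypothetical class written for illustration; it follows the listing above in spirit but has no backend and is not the library's code.

```ruby
# Hypothetical class; a reduced model of the lifecycle rules above.
class SketchLifecycle
  TERMINAL = %i[completed failed cancelled].freeze

  def initialize
    @mutex = Mutex.new
    @status = :pending
    @callbacks = []
    @value = nil
    @error = nil
  end

  def status
    @mutex.synchronize { @status }
  end

  # Queues the callback, or fires it at once if already terminal.
  def on_complete(&callback)
    fire_args = @mutex.synchronize do
      if TERMINAL.include?(@status)
        [@value, @error]
      else
        @callbacks << callback
        nil
      end
    end
    callback.call(*fire_args) if fire_args
    self
  end

  def transition!(new_status, value: nil, error: nil)
    callbacks = @mutex.synchronize do
      next nil if TERMINAL.include?(@status) # terminal states are final

      @status = new_status
      next nil unless TERMINAL.include?(new_status)

      @value = value
      @error = error
      @callbacks.shift(@callbacks.size) # take and clear pending callbacks
    end
    callbacks&.each { |cb| cb.call(value, error) } # run outside the mutex
  end
end

seen = []
life = SketchLifecycle.new
life.on_complete { |value, error| seen << [:early, value, error] }
life.transition!(:running)
life.transition!(:completed, value: 42)
life.transition!(:failed, error: RuntimeError.new("late")) # ignored
life.on_complete { |value, error| seen << [:late, value, error] }

p life.status
p seen
```

Running callbacks after releasing the mutex means a callback may safely call status or register further callbacks without deadlocking on a non-reentrant Mutex.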