Class: Phronomy::Task

Inherits:
Object
  • Object
show all
Defined in:
lib/phronomy/task.rb,
lib/phronomy/task/backend.rb,
lib/phronomy/task/fiber_backend.rb,
lib/phronomy/task/thread_backend.rb,
lib/phronomy/task/immediate_backend.rb

Overview

A single unit of concurrent work.

Decouples task semantics from the underlying execution primitive via a pluggable Backend. The default backend is ThreadBackend; a cooperative or test-double backend can be substituted via Task.default_backend_class= or by passing +backend_class:+ to Task.spawn.

+Task.spawn+ is an internal API used by schedulers and the framework itself. Application code and framework components should use +Runtime.instance.spawn+ instead, which routes through the configured scheduler and respects the concurrency model.

Examples:

Basic usage (framework/test code only — prefer Runtime.instance.spawn in app code)

task = Phronomy::Task.spawn { expensive_io() }
result = task.await   # blocks until done, re-raises errors

Cancel a running task

task = Phronomy::Task.spawn { loop { Phronomy::Task.checkpoint! } }
task.cancel!

Task tree — cancel parent cancels children

parent = Phronomy::Task.spawn { sleep 10 }
child  = Phronomy::Task.spawn(parent: parent) { sleep 10 }
parent.cancel!   # child is also cancelled

Defined Under Namespace

Classes: Backend, FiberBackend, ImmediateBackend, ThreadBackend

Constant Summary collapse

STATES =

Valid task lifecycle states.

%i[pending running completed failed cancelled].freeze
SCHEDULER_KEY =

Cooperative task backend using Ruby Fibers.

Unlike ImmediateBackend (which runs the block to completion on the calling thread) or ThreadBackend (which runs the block on a new OS thread), +FiberBackend+ wraps the block in a +Fiber+ that is NOT started immediately. The owning scheduler calls #step to advance execution one cooperative step at a time.

This backend is used exclusively by Runtime::DeterministicScheduler to enable deterministic, wall-clock-free testing of concurrent logic.

Thread-local key under which the currently active DeterministicScheduler is stored so that #await can suspend cooperatively.

:phronomy_deterministic_scheduler

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name: nil, parent: nil, backend_class: self.class.default_backend_class, &block) ⇒ Task

Returns a new instance of Task.

Parameters:

  • name (String, nil) (defaults to: nil)
  • parent (Task, nil) (defaults to: nil)
  • backend_class (Class<Backend>) (defaults to: self.class.default_backend_class)


137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/phronomy/task.rb', line 137

def initialize(name: nil, parent: nil, backend_class: self.class.default_backend_class, &block)
  @name = name
  @parent = parent
  @status = :pending
  @mutex = Mutex.new
  @children = []
  @on_complete_callbacks = []
  @completed_value = nil
  @completed_error = nil
  parent&.register_child(self)
  @backend = backend_class.new(task: self, &block)
end

Instance Attribute Details

#backendBackend (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns the execution backend for this task.

Returns:

  • (Backend)

    the execution backend for this task



131
132
133
# File 'lib/phronomy/task.rb', line 131

def backend
  @backend
end

#nameString? (readonly)

Returns optional human-readable label.

Returns:

  • (String, nil)

    optional human-readable label



124
125
126
# File 'lib/phronomy/task.rb', line 124

def name
  @name
end

#parentTask? (readonly)

Returns parent task in the task tree, if any.

Returns:

  • (Task, nil)

    parent task in the task tree, if any



127
128
129
# File 'lib/phronomy/task.rb', line 127

def parent
  @parent
end

Class Method Details

.checkpoint!void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Cooperative cancellation checkpoint.

Raises CancellationError if the current task's status is +:cancelled+. On ThreadBackend, cancellation is delivered via +Thread#raise+ so this is a no-op in practice; on future cooperative backends this will be the primary cancellation mechanism.

Safe to call from outside a task context (no-op when no current task).

Raises:



103
104
105
106
107
108
# File 'lib/phronomy/task.rb', line 103

def self.checkpoint!
  ct = current
  return unless ct

  raise CancellationError, "Task cancelled" if ct.status == :cancelled
end

.currentTask?

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns the Phronomy::Task currently executing on this thread, or +nil+. Returns +nil+ when called from outside a task-managed execution context.

Returns:



57
58
59
# File 'lib/phronomy/task.rb', line 57

def self.current
  Thread.current[:phronomy_current_task]
end

.current_cpu_slice_start_msInteger?

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns the monotonic clock value (ms) when the current task last recorded a yield (or when the task started), or +nil+ when not inside a task context. Used by Runtime#yield for CPU-bound detection without placing +Thread.current+ in files outside the allowlist.

Returns:

  • (Integer, nil)


67
68
69
# File 'lib/phronomy/task.rb', line 67

def self.current_cpu_slice_start_ms
  Thread.current[:phronomy_task_cpu_slice_start_ms]
end

.default_backend_classClass<Backend>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns the process-wide default backend class. Defaults to ThreadBackend. Override in tests or to enable a cooperative scheduler backend.

Returns:



42
43
44
# File 'lib/phronomy/task.rb', line 42

def self.default_backend_class
  @default_backend_class || ThreadBackend
end

.default_backend_class=(klass) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Sets the process-wide default backend class.

Parameters:



49
50
51
# File 'lib/phronomy/task.rb', line 49

def self.default_backend_class=(klass)
  @default_backend_class = klass
end

.increment_yield_counter!Integer

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns and increments a per-thread yield-if-needed counter. Used by Runtime#yield_if_needed so that the counter is thread-local without putting +Thread.current+ in runtime.rb (which is outside the Thread.current allowlist).

Returns:

  • (Integer)

    the new counter value



86
87
88
89
90
# File 'lib/phronomy/task.rb', line 86

def self.increment_yield_counter!
  count = (Thread.current[:phronomy_yield_if_needed_counter] || 0) + 1
  Thread.current[:phronomy_yield_if_needed_counter] = count
  count
end

.record_yield!Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Resets the CPU slice start clock for the current task to +now+. Call this immediately after the cooperative yield has been performed so that the next yield correctly measures only the time since the last yield.



75
76
77
78
# File 'lib/phronomy/task.rb', line 75

def self.record_yield!
  Thread.current[:phronomy_task_cpu_slice_start_ms] =
    Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond)
end

.spawn(name: nil, parent: current, backend_class: default_backend_class, &block) ⇒ Task

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Spawns a new task executing +block+ concurrently.

Parameters:

  • name (String, nil) (defaults to: nil)

    optional human-readable label

  • parent (Task, nil) (defaults to: current)

    parent task; cancelling the parent also cancels this task (default: currently running task)

  • backend_class (Class<Backend>) (defaults to: default_backend_class)

    backend to use

Yield Returns:

  • (Object)

    the task result

Returns:



119
120
121
# File 'lib/phronomy/task.rb', line 119

def self.spawn(name: nil, parent: current, backend_class: default_backend_class, &block)
  new(name: name, parent: parent, backend_class: backend_class, &block)
end

Instance Method Details

#alive?Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns +true+ while the task's block is still executing.

Returns:

  • (Boolean)


236
237
238
# File 'lib/phronomy/task.rb', line 236

def alive?
  @backend.alive?
end

#awaitObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Blocks until the task completes and returns its value. Re-raises any exception raised inside the block.

Returns:

  • (Object)

    the result produced by the block

Raises:

  • (Exception)

    if the block raised an error



163
164
165
# File 'lib/phronomy/task.rb', line 163

def await
  @backend.await
end

#cancel!self

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Requests cancellation. Propagates to all registered child tasks. Sets status to :cancelled immediately so that even tasks that have not started executing yet are correctly marked as cancelled after join. Passes a CancellationError to on_complete callbacks so callers do not need to call await to discover the error.

Returns:

  • (self)


212
213
214
215
216
217
218
219
220
221
# File 'lib/phronomy/task.rb', line 212

def cancel!
  transition!(:cancelled, error: CancellationError.new("Task cancelled"))
  # @backend may be nil if cancel! is called while ImmediateBackend is still
  # initializing (the block runs synchronously inside .new, so register_child
  # fires before @backend is assigned).  Safe-navigate to avoid NoMethodError.
  @backend&.cancel!
  children = @mutex.synchronize { @children.dup }
  children.each(&:cancel!)
  self
end

#done?Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns +true+ once the task has finished (success, error, or cancellation).

Returns:

  • (Boolean)


201
202
203
# File 'lib/phronomy/task.rb', line 201

def done?
  %i[completed failed cancelled].include?(status)
end

#join(limit = nil) ⇒ Object?

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Joins the underlying execution context, optionally with a timeout. Returns +nil+ when the timeout expires before completion.

Parameters:

  • limit (Numeric, nil) (defaults to: nil)

    seconds to wait; nil waits indefinitely

Returns:

  • (Object, nil)


229
230
231
# File 'lib/phronomy/task.rb', line 229

def join(limit = nil)
  @backend.join(limit)
end

#on_complete {|value, error| ... } ⇒ self

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Registers a callback to be invoked when the task reaches a terminal state (+:completed+, +:failed+, or +:cancelled+).

The callback receives two arguments: +value+ (the task's return value, or +nil+) and +error+ (the exception, or +nil+). These are provided directly so the callback does not need to call +task.await+, which would risk a self-join error when the callback runs inside the task's own thread.

If the task is already done when this method is called, the callback is invoked immediately (synchronously, on the calling thread).

Yields:

  • (value, error)

    called when the task finishes

Returns:

  • (self)


181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# File 'lib/phronomy/task.rb', line 181

def on_complete(&callback)
  fire_now = false
  fire_args = nil
  @mutex.synchronize do
    # Check @status directly to avoid re-entering the mutex (done? calls
    # status, which also takes @mutex).
    if %i[completed failed cancelled].include?(@status)
      fire_now = true
      fire_args = [@completed_value, @completed_error]
    else
      @on_complete_callbacks << callback
    end
  end
  callback.call(*fire_args) if fire_now
  self
end

#register_child(child) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Registers +child+ as a child task for cancellation propagation. Called automatically during child task initialization.

Parameters:



271
272
273
# File 'lib/phronomy/task.rb', line 271

def register_child(child)
  @mutex.synchronize { @children << child }
end

#statusSymbol

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns the current lifecycle state.

Returns:



153
154
155
# File 'lib/phronomy/task.rb', line 153

def status
  @mutex.synchronize { @status }
end

#transition!(new_status, value: nil, error: nil) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Updates the task lifecycle state. Called by backends during execution transitions. Terminal states (completed/failed/cancelled) are never overwritten. When a terminal state is reached, fires on_complete callbacks (outside the mutex) passing the result value and error directly.

Parameters:

  • new_status (Symbol)
  • value (Object, nil) (defaults to: nil)

    task return value (terminal states only)

  • error (Exception, nil) (defaults to: nil)

    exception raised by the block, if any



250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
# File 'lib/phronomy/task.rb', line 250

def transition!(new_status, value: nil, error: nil)
  callbacks = nil
  @mutex.synchronize do
    # Check @status directly (not via #done?) to avoid re-entering the mutex.
    return if %i[completed failed cancelled].include?(@status)

    @status = new_status
    if %i[completed failed cancelled].include?(new_status)
      @completed_value = value
      @completed_error = error
      callbacks = @on_complete_callbacks.dup
      @on_complete_callbacks.clear
    end
  end
  callbacks&.each { |cb| cb.call(value, error) }
end