Class: Rooibos::Command::Lifecycle
- Inherits:
-
Object
- Object
- Rooibos::Command::Lifecycle
- Defined in:
- lib/rooibos/command/lifecycle.rb
Overview
Coordinates command execution across the runtime.
Commands run off the main thread. Both the runtime and nested commands via Outlet#source share cancellation tokens. Racing results against cancellation is repetitive. Tracking active commands is tedious.
This class centralizes that logic. It races results against cancellation and timeout. Commands that ignore cancellation are orphaned until process exit. Cooperative cancellation is the only way to exit cleanly.
The framework creates one instance at startup. All outlets share it.
Defined Under Namespace
Classes: Entry
Instance Method Summary collapse
-
#cancel(command) ⇒ Object
Cancels a running command, waiting for its grace period.
-
#initialize ⇒ Lifecycle
constructor
Creates a lifecycle manager.
-
#run_async(command, channel) ⇒ Object
Runs a command asynchronously, tracking it for later cancellation.
-
#run_sync(command, token, timeout:) ⇒ Object
Runs a command synchronously, returning its result.
-
#shutdown ⇒ Object
Cancels all active commands and waits for them to complete.
Constructor Details
#initialize ⇒ Lifecycle
Creates a lifecycle manager.
The runtime creates one at startup. All outlets share it. Child commands from Outlet#source inherit the same lifecycle for consistent thread management.
29 30 31 |
# File 'lib/rooibos/command/lifecycle.rb', line 29 def initialize @active = Concurrent::Map.new end |
Instance Method Details
#cancel(command) ⇒ Object
Cancels a running command, waiting for its grace period.
Signals cancellation, waits for the command’s grace period, then removes it from tracking. Does nothing if the command isn’t tracked.
- command
-
The command to cancel (must be the same object passed to run_async).
107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/rooibos/command/lifecycle.rb', line 107 def cancel(command) entry = @active[command] return unless entry&.future&.pending? entry.origin.resolve # Signal cancellation grace = command.respond_to?(:rooibos_cancellation_grace_period) ? command.rooibos_cancellation_grace_period : 0.1 entry.future.wait(grace.finite? ? grace : nil) @active.delete(command) entry # Return so caller can remove from pending_futures end |
#run_async(command, channel) ⇒ Object
Runs a command asynchronously, tracking it for later cancellation.
Spawns a future that executes the command. Tracks the command in the active map for cancellation support. Errors are pushed to the channel as Message::Error messages.
- command
-
Callable with
call(out, token). - channel
-
Channel to push results and errors to.
Returns a hash with :future and :origin for tracking.
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/rooibos/command/lifecycle.rb', line 84 def run_async(command, channel) cancellation, origin = Concurrent::Cancellation.new outlet = Outlet.new(channel, lifecycle: self) future = Concurrent::Promises.future do command.call(outlet, cancellation) rescue => e channel.push Message::Error.new(command:, exception: e) ensure outlet.wait # Don't resolve until children from standing complete end entry = Entry.new(future:, origin:) @active[command] = entry entry end |
#run_sync(command, token, timeout:) ⇒ Object
Runs a command synchronously, returning its result.
Spawns a thread, races the result against cancellation and timeout. On cancellation, waits the grace period then orphans the thread if needed.
- command
-
Callable with
call(out, token). - token
-
Parent’s cancellation token.
- timeout
-
Max wait seconds for the result.
Returns the child’s message, or nil if canceled or timed out. Raises if the child raised.
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/rooibos/command/lifecycle.rb', line 44 def run_sync(command, token, timeout:) return nil if token.canceled? child_channel = Concurrent::Promises::Channel.new child_outlet = Outlet.new(child_channel, lifecycle: self) exception = nil Concurrent::Promises.future do command.call(child_outlet, token) rescue => e exception = e end # Race: pop result vs cancellation vs timeout pop_future = Concurrent::Promises.future { child_channel.pop(timeout, :timeout) } Concurrent::Promises.any_event(pop_future, token.origin).wait # Cooperative cancellation only — misbehaving commands are orphaned return nil if token.canceled? if exception raise exception.is_a?(Exception) ? exception : RuntimeError.new(exception.to_s) end result = pop_future.value return nil if result == :timeout result end |
#shutdown ⇒ Object
Cancels all active commands and waits for them to complete.
Iterates through all tracked commands, signals cancellation, and waits for each command’s grace period. Called at runtime shutdown.
125 126 127 128 129 130 131 132 133 |
# File 'lib/rooibos/command/lifecycle.rb', line 125 def shutdown @active.each do |command, entry| entry.origin.resolve # Signal cancellation grace = command.respond_to?(:rooibos_cancellation_grace_period) ? command.rooibos_cancellation_grace_period : 0.1 entry.future.wait(grace.finite? ? grace : nil) end end |