Class: Rooibos::Command::Lifecycle

Inherits:
Object
  • Object
show all
Defined in:
lib/rooibos/command/lifecycle.rb

Overview

Coordinates command execution across the runtime.

Commands run off the main thread. Both the runtime and nested commands via Outlet#source share cancellation tokens. Racing results against cancellation is repetitive. Tracking active commands is tedious.

This class centralizes that logic. It races results against cancellation and timeout. Commands that ignore cancellation are orphaned until process exit. Cooperative cancellation is the only way to exit cleanly.

The framework creates one instance at startup. All outlets share it.

Defined Under Namespace

Classes: Entry

Instance Method Summary collapse

Constructor Details

#initializeLifecycle

Creates a lifecycle manager.

The runtime creates one at startup. All outlets share it. Child commands from Outlet#source inherit the same lifecycle for consistent thread management.



29
30
31
# File 'lib/rooibos/command/lifecycle.rb', line 29

def initialize
  @active = Concurrent::Map.new
end

Instance Method Details

#cancel(command) ⇒ Object

Cancels a running command, waiting for its grace period.

Signals cancellation, waits for the command’s grace period, then removes it from tracking. Does nothing if the command isn’t tracked.

command

The command to cancel (must be the same object passed to run_async).



107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/rooibos/command/lifecycle.rb', line 107

def cancel(command)
  entry = @active[command]
  return unless entry&.future&.pending?

  entry.origin.resolve # Signal cancellation

  grace = command.respond_to?(:rooibos_cancellation_grace_period) ?
    command.rooibos_cancellation_grace_period : 0.1
  entry.future.wait(grace.finite? ? grace : nil)

  @active.delete(command)
  entry # Return so caller can remove from pending_futures
end

#run_async(command, channel) ⇒ Object

Runs a command asynchronously, tracking it for later cancellation.

Spawns a future that executes the command. Tracks the command in the active map for cancellation support. Errors are pushed to the channel as Message::Error messages.

command

Callable with call(out, token).

channel

Channel to push results and errors to.

Returns a hash with :future and :origin for tracking.



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/rooibos/command/lifecycle.rb', line 84

def run_async(command, channel)
  cancellation, origin = Concurrent::Cancellation.new
  outlet = Outlet.new(channel, lifecycle: self)

  future = Concurrent::Promises.future do
    command.call(outlet, cancellation)
  rescue => e
    channel.push Message::Error.new(command:, exception: e)
  ensure
    outlet.wait # Don't resolve until children from standing complete
  end

  entry = Entry.new(future:, origin:)
  @active[command] = entry
  entry
end

#run_sync(command, token, timeout:) ⇒ Object

Runs a command synchronously, returning its result.

Spawns a thread, races the result against cancellation and timeout. On cancellation, waits the grace period then orphans the thread if needed.

command

Callable with call(out, token).

token

Parent’s cancellation token.

timeout

Max wait seconds for the result.

Returns the child’s message, or nil if canceled or timed out. Raises if the child raised.



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/rooibos/command/lifecycle.rb', line 44

def run_sync(command, token, timeout:)
  return nil if token.canceled?

  child_channel = Concurrent::Promises::Channel.new
  child_outlet = Outlet.new(child_channel, lifecycle: self)

  exception = nil
  Concurrent::Promises.future do
    command.call(child_outlet, token)
  rescue => e
    exception = e
  end

  # Race: pop result vs cancellation vs timeout
  pop_future = Concurrent::Promises.future { child_channel.pop(timeout, :timeout) }
  Concurrent::Promises.any_event(pop_future, token.origin).wait

  # Cooperative cancellation only — misbehaving commands are orphaned
  return nil if token.canceled?

  if exception
    raise exception.is_a?(Exception) ? exception : RuntimeError.new(exception.to_s)
  end

  result = pop_future.value
  return nil if result == :timeout

  result
end

#shutdownObject

Cancels all active commands and waits for them to complete.

Iterates through all tracked commands, signals cancellation, and waits for each command’s grace period. Called at runtime shutdown.



125
126
127
128
129
130
131
132
133
# File 'lib/rooibos/command/lifecycle.rb', line 125

def shutdown
  @active.each do |command, entry|
    entry.origin.resolve # Signal cancellation

    grace = command.respond_to?(:rooibos_cancellation_grace_period) ?
      command.rooibos_cancellation_grace_period : 0.1
    entry.future.wait(grace.finite? ? grace : nil)
  end
end