Class: Rooibos::Command::All

Inherits:
Object
Includes:
Custom
Defined in:
lib/rooibos/command/all.rb

Overview

Aggregates parallel commands and returns all results together.

Dashboards load user profiles, settings, and stats before rendering. Fetching sequentially is slow. Fire-and-forget batches lose correlation between commands and their results.

This command runs its child commands in parallel and collects their results into a single Message::All response. Pattern-match on the envelope to correlate results. Results appear in the same order as the commands that produced them.

Use it for coordinated fetches where you need all results before proceeding.

Prefer the Command.all factory method for convenience.

Example

# Using the factory method (recommended)
Command.all(:dashboard,
  Command.http(:get, "/users", :_),
  Command.http(:get, "/stats", :_),
)

# Using the class directly
All.new(:dashboard,
  Command.http(:get, "/users", :_),
  Command.http(:get, "/stats", :_),
)

# Pattern-match on the aggregated result
def update(message, model)
  case message
  in { type: :all, envelope: :dashboard, results: [users, stats] }
    model.with(users:, stats:, loading: false)
  end
end
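
The pattern match above can be tried without the library. The following is a minimal sketch, assuming a message that exposes type, envelope, and results keys to pattern matching as the example suggests. FakeAll and describe are hypothetical names used only for illustration; they are not part of Rooibos.

```ruby
# Hypothetical stand-in for Message::All; the real class lives in Rooibos.
# deconstruct_keys is what lets `case ... in { ... }` hash patterns match.
FakeAll = Data.define(:envelope, :results) do
  # Expose a :type key so the hash pattern from the example applies.
  def deconstruct_keys(_keys)
    { type: :all, envelope:, results: }
  end
end

def describe(message)
  case message
  in { type: :all, envelope: :dashboard, results: [users, stats] }
    # The array pattern binds results by position, matching command order.
    "dashboard: users=#{users.inspect} stats=#{stats.inspect}"
  in { type: :all, envelope:, results: }
    "unhandled #{envelope.inspect} with #{results.size} result(s)"
  end
end

puts describe(FakeAll.new(envelope: :dashboard, results: [1, 2]))
puts describe(FakeAll.new(envelope: :sidebar, results: [3]))
```

The array pattern only matches when the number of results equals the number of names, so a fallback branch like the second one keeps an unexpected batch from raising NoMatchingPatternError.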

Class Method Summary

Instance Method Summary

Methods included from Custom

#deconstruct_keys, #rooibos_cancellation_grace_period, #rooibos_command?

Class Method Details

.new(tag, *args) ⇒ Object

Creates an aggregating parallel command.

tag

Symbol to tag the result message. It becomes the envelope of the Message::All response.

args

Commands to run in parallel. Pass as multiple arguments or a single array.
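
Both call patterns end up as the same flat command list. The sketch below reproduces only the argument handling, using the same two expressions as the source listing. normalize_commands is a hypothetical helper, and symbols stand in for real command objects.

```ruby
# Hypothetical helper mirroring the argument handling documented for .new.
# Returns the flat command list plus a flag recording the single-array form.
def normalize_commands(*args)
  nested = args.size == 1 && args.first.is_a?(Array)
  [[args].flatten(2), nested]
end

# Symbols stand in for real command objects.
p normalize_commands(:users, :stats)    # → [[:users, :stats], false]
p normalize_commands([:users, :stats])  # → [[:users, :stats], true]
```

The nested flag is the only difference between the two forms; the command list itself is identical.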

Example

All.new(:dashboard,
  Command.http(:get, "/users", :_),
  Command.http(:get, "/stats", :_),
)


# File 'lib/rooibos/command/all.rb', line 63

def new(tag, *args)
  # DWIM: flatten single-array arg to support both call patterns
  nested = args.size == 1 && args.first.is_a?(Array)
  commands = [args].flatten(2)

  if RatatuiRuby::Debug.enabled?
    commands.each do |cmd|
      unless Ractor.shareable?(cmd)
        raise Rooibos::Error::Invariant,
          "Command is not Ractor-shareable: #{cmd.inspect}\n" \
            "Use Ractor.make_shareable or a Data.define command."
      end
    end
  end

  instance = allocate
  instance.__send__(:initialize, envelope: tag, commands: commands.freeze, nested:)
  instance
end
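
The debug-mode check in the listing rejects commands that are not Ractor-shareable. As a rough illustration of what passes and what fails, here is a sketch using only core Ruby. FetchUsers and unshareable are hypothetical names, not Rooibos APIs.

```ruby
# Hypothetical command type; Data.define instances are frozen on creation,
# but they are only shareable when every member is shareable too.
FetchUsers = Data.define(:path)

frozen_cmd  = FetchUsers.new(path: "/users".freeze)
mutable_cmd = FetchUsers.new(path: String.new("/users"))

# Mirrors the debug-mode loop: collect commands that fail Ractor.shareable?
def unshareable(commands)
  commands.reject { |cmd| Ractor.shareable?(cmd) }
end

p unshareable([frozen_cmd, mutable_cmd]).size  # → 1

# make_shareable deep-freezes in place, so the same object now passes.
Ractor.make_shareable(mutable_cmd)
p unshareable([frozen_cmd, mutable_cmd]).size  # → 0
```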

Instance Method Details

#call(out, token) ⇒ Object

Executes all child commands in parallel and aggregates results.

Sends Message::All when all children complete. Results appear in the same order as commands. If the token is canceled first, sends Message::Canceled instead. With no commands, sends Message::All with empty results immediately.

out

Outlet for sending messages.

token

Cancellation token from the runtime.
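
The ordering guarantee can be illustrated without the library. This sketch swaps in plain Ruby threads for the futures the real command uses, so it shows the idea rather than the implementation. run_all is a hypothetical helper.

```ruby
# Plain Ruby threads stand in for the futures used by the real command.
def run_all(jobs)
  threads = jobs.map { |job| Thread.new { job.call } }
  # Thread#value joins each thread; mapping in input order keeps each
  # result aligned with its job regardless of which one finishes first.
  threads.map(&:value).freeze
end

# Jobs sleep for different durations so completion order differs from input order.
jobs = [
  -> { sleep 0.05; :users },  # slowest, listed first
  -> { sleep 0.01; :stats },
  -> { :settings },           # fastest, listed last
]

p run_all(jobs)  # → [:users, :stats, :settings]
```

Collecting by position rather than by completion time is what lets a caller destructure the results with a fixed array pattern.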



# File 'lib/rooibos/command/all.rb', line 91

def call(out, token)
  # Early return for empty commands - prevents hang from zip_futures([])
  if commands.empty?
    results = [] #: Array[Object]
    response = Message::All.new(envelope:, results: results.freeze, nested:)
    out.put(Ractor.make_shareable(response))
    return
  end

  child_lifecycle = Lifecycle.new

  futures = commands.map do |command|
    Concurrent::Promises.future do
      child_channel = Concurrent::Promises::Channel.new
      child_outlet = Outlet.new(child_channel, lifecycle: child_lifecycle)
      command.call(child_outlet, token)
      child_channel.pop
    end
  end

  all_done = Concurrent::Promises.zip_futures(*futures)
  Concurrent::Promises.any_event(all_done, token.origin).wait

  return out.put(Message::Canceled.new(command: self)) if token.canceled?

  shareable_results = Ractor.make_shareable(all_done.value!)
  response = Message::All.new(envelope:, results: shareable_results, nested:)
  out.put(Ractor.make_shareable(response))
end
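
The wait in the listing is a race between "all children finished" and "the token was canceled". A simplified sketch of that race, assuming a fixed-delay timer in place of the cancellation token and threads in place of futures, might look like this. await_all_or_cancel and its cancel_after parameter are hypothetical.

```ruby
# Hypothetical helper: waits for every job, or gives up when the cancel
# timer fires first. A Queue carries whichever outcome arrives first.
def await_all_or_cancel(jobs, cancel_after: nil)
  outcome = Queue.new
  workers = jobs.map { |job| Thread.new { job.call } }

  # Reports completion once every worker has finished, in input order.
  Thread.new { outcome << [:done, workers.map(&:value)] }

  # Stand-in for the cancellation token: fires after a fixed delay.
  if cancel_after
    Thread.new do
      sleep cancel_after
      outcome << [:canceled, nil]
    end
  end

  # Blocks until either outcome is available.
  outcome.pop
end

p await_all_or_cancel([-> { :users }, -> { :stats }])
p await_all_or_cancel([-> { sleep 0.5; :slow }], cancel_after: 0.02)
```

The first call should report completion with both results; the second should report cancellation, because the timer fires well before the slow job returns. Unlike the real command, this sketch does not stop the abandoned workers.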