Class: RobotLab::Network

Inherits:
Object
  • Object
show all
Includes:
Utils
Defined in:
lib/robot_lab/network.rb

Overview

Orchestrates multiple robots in a pipeline workflow

Network is a thin wrapper around SimpleFlow::Pipeline that provides a clean DSL for defining robot workflows with sequential, parallel, and conditional execution.

Shared Memory

Networks provide a shared reactive memory that all robots can read and write. Robots can subscribe to memory keys and be notified when values change, or use blocking reads to wait for values from other robots.

Broadcast Messages

Networks support a broadcast channel for network-wide announcements. Use ‘broadcast` to send messages to all robots, and `on_broadcast` to register handlers for incoming broadcasts.

Examples:

Sequential execution

network = RobotLab.create_network(name: "pipeline") do
  task :analyst, analyst_robot, depends_on: :none
  task :writer, writer_robot, depends_on: [:analyst]
end

With per-task context

network = RobotLab.create_network(name: "support") do
  task :classifier, classifier_robot, depends_on: :none
  task :billing, billing_robot,
       context: { department: "billing" },
       tools: [RefundTool],
       depends_on: :optional
end

Parallel execution with shared memory

network = RobotLab.create_network(name: "analysis") do
  task :fetch, fetcher_robot, depends_on: :none
  task :sentiment, sentiment_robot, depends_on: [:fetch]
  task :entities, entity_robot, depends_on: [:fetch]
  task :summarize, summary_robot, depends_on: [:sentiment, :entities]
end

# In sentiment_robot:
memory.set(:sentiment, analyze_sentiment(text))

# In summarize_robot:
results = memory.get(:sentiment, :entities, wait: 60)

Broadcasting

network.on_broadcast do |message|
  puts "Received: #{message[:event]}"
end

network.broadcast(event: :pause, reason: "rate limit")

Constant Summary collapse

BROADCAST_KEY =

Reserved key for broadcast messages in memory

:_network_broadcast

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name:, concurrency: :auto, memory: nil, config: nil, parallel_mode: :async) { ... } ⇒ Network

Creates a new Network instance.

Examples:

network = Network.new(name: "support") do
  task :classifier, classifier, depends_on: :none
  task :billing, billing_robot, context: { dept: "billing" }, depends_on: :optional
end

Parameters:

  • name (String)

    unique identifier for the network

  • concurrency (Symbol) (defaults to: :auto)

    concurrency model (:auto, :threads, :async)

  • memory (Memory, nil) (defaults to: nil)

    optional pre-configured memory instance

Yields:

  • Block for defining pipeline tasks



89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/robot_lab/network.rb', line 89

def initialize(name:, concurrency: :auto, memory: nil, config: nil, parallel_mode: :async, &)
  @name = name.to_s
  @robots = {}
  @tasks = {}
  @pipeline = SimpleFlow::Pipeline.new(concurrency: concurrency)
  @memory = memory || Memory.new(network_name: @name)
  @config = config || RunConfig.new
  @parallel_mode = parallel_mode
  @broadcast_handlers = []
  @bus_poller = BusPoller.new.start

  instance_eval(&) if block_given?
end

Instance Attribute Details

#configObject (readonly)

Returns the value of attribute config.



74
75
76
# File 'lib/robot_lab/network.rb', line 74

def config
  @config
end

#memoryMemory (readonly)

Returns shared memory for all robots in the network.

Returns:

  • (Memory)

    shared memory for all robots in the network



74
# File 'lib/robot_lab/network.rb', line 74

attr_reader :name, :pipeline, :robots, :memory, :config, :parallel_mode

#nameString (readonly)

Returns unique identifier for the network.

Returns:

  • (String)

    unique identifier for the network



74
75
76
# File 'lib/robot_lab/network.rb', line 74

def name
  @name
end

#parallel_modeObject (readonly)

Returns the value of attribute parallel_mode.



74
75
76
# File 'lib/robot_lab/network.rb', line 74

def parallel_mode
  @parallel_mode
end

#pipelineSimpleFlow::Pipeline (readonly)

Returns the underlying pipeline.

Returns:

  • (SimpleFlow::Pipeline)

    the underlying pipeline



74
# File 'lib/robot_lab/network.rb', line 74

attr_reader :name, :pipeline, :robots, :memory, :config, :parallel_mode

#robotsHash<String, Robot> (readonly)

Returns robots in this network, keyed by name.

Returns:

  • (Hash<String, Robot>)

    robots in this network, keyed by name



74
# File 'lib/robot_lab/network.rb', line 74

attr_reader :name, :pipeline, :robots, :memory, :config, :parallel_mode

Instance Method Details

#add_robot(robot) ⇒ self

Add a robot to the network without adding it as a task

Useful for dynamically adding robots that will be referenced later.

Parameters:

  • robot (Robot)

    the robot instance to add

Returns:

  • (self)

Raises:

  • (ArgumentError)

    if a robot with the same name already exists



297
298
299
300
301
302
303
304
# File 'lib/robot_lab/network.rb', line 297

def add_robot(robot)
  if @robots.key?(robot.name)
    raise ArgumentError, "Robot '#{robot.name}' already exists in network '#{@name}'"
  end

  @robots[robot.name] = robot
  self
end

#available_robotsArray<Robot>

Get all robots in the network

Returns:



285
286
287
# File 'lib/robot_lab/network.rb', line 285

def available_robots
  @robots.values
end

#broadcast(payload) ⇒ self

Broadcast a message to all robots in the network.

This sends a network-wide message that all robots subscribed via ‘on_broadcast` will receive asynchronously.

Examples:

Pause all robots

network.broadcast(event: :pause, reason: "rate limit hit")

Signal completion

network.broadcast(event: :phase_complete, phase: "analysis")

Parameters:

  • payload (Hash)

    the message payload

Returns:

  • (self)


212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
# File 'lib/robot_lab/network.rb', line 212

def broadcast(payload)
  message = {
    payload: payload,
    network: @name,
    timestamp: Time.now
  }

  # Notify handlers asynchronously
  @broadcast_handlers.each do |handler|
    dispatch_async { handler.call(message) }
  end

  # Also set in memory so robots can subscribe via memory.subscribe
  @memory.set(BROADCAST_KEY, message)

  self
end

#execution_planString?

Get the execution plan

Returns:

  • (String, nil)


334
335
336
# File 'lib/robot_lab/network.rb', line 334

def execution_plan
  @pipeline.execution_plan
end

#on_broadcast {|Hash| ... } ⇒ self

Register a handler for broadcast messages.

The handler is called asynchronously whenever ‘broadcast` is called.

Examples:

network.on_broadcast do |message|
  case message[:payload][:event]
  when :pause
    pause_current_work
  when :resume
    resume_work
  end
end

Yields:

  • (Hash)

    the broadcast message with :payload, :network, :timestamp

Returns:

  • (self)

Raises:

  • (ArgumentError)


247
248
249
250
251
252
# File 'lib/robot_lab/network.rb', line 247

def on_broadcast(&block)
  raise ArgumentError, "Block required for on_broadcast" unless block_given?

  @broadcast_handlers << block
  self
end

#parallel(name = nil, depends_on: :none) { ... } ⇒ self

Define a parallel execution block

Examples:

Named parallel group

parallel :fetch_data, depends_on: :validate do
  task :fetch_orders, orders_robot
  task :fetch_products, products_robot
end
task :process, processor, depends_on: :fetch_data

Parameters:

  • name (Symbol, nil) (defaults to: nil)

    optional name for the parallel group

  • depends_on (Symbol, Array) (defaults to: :none)

    dependencies for this group

Yields:

  • Block containing task definitions

Returns:

  • (self)


162
163
164
165
# File 'lib/robot_lab/network.rb', line 162

def parallel(name = nil, depends_on: :none, &)
  @pipeline.parallel(name, depends_on: depends_on, &)
  self
end

#reset_memoryself

Reset the shared memory.

Clears all values in the network’s shared memory. This is useful between runs if you want to start with a fresh memory state.

Returns:

  • (self)


261
262
263
264
# File 'lib/robot_lab/network.rb', line 261

def reset_memory
  @memory.reset
  self
end

#robot(name) ⇒ Robot? Also known as: []

Get a robot by name

Parameters:

  • name (String, Symbol)

Returns:



271
272
273
# File 'lib/robot_lab/network.rb', line 271

def robot(name)
  @robots[name.to_s]
end

#run(**run_context) ⇒ SimpleFlow::Result

Run the network with the given context

All robots share the network’s memory during execution. The memory is passed to each robot and can be used for inter-robot communication.

Examples:

result = network.run(message: "I need help with billing", user_id: 123)
result.value  # => RobotResult from last robot
result.context[:classifier]  # => RobotResult from classifier

Parameters:

  • run_context (Hash)

    context passed to all robots (message:, user_id:, etc.)

Returns:

  • (SimpleFlow::Result)

    final pipeline result



180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# File 'lib/robot_lab/network.rb', line 180

def run(**run_context)
  # Include shared memory in run params so robots can access it
  run_context[:network_memory] = @memory

  # Pass network's config so robots can inherit it
  run_context[:network_config] = @config unless @config.empty?

  if @parallel_mode == :ractor
    run_with_ractor_scheduler(run_context)
  else
    initial_result = SimpleFlow::Result.new(
      run_context,
      context: { run_params: run_context }
    )
    @pipeline.call_parallel(initial_result, max_concurrent: @config.max_concurrent_robots)
  end
end

#task(name, robot, context: {}, mcp: :none, tools: :none, memory: nil, config: nil, depends_on: :none, poller_group: :default) ⇒ self

Add a robot as a pipeline task with optional per-task configuration

Examples:

Entry point task

task :classifier, classifier_robot, depends_on: :none

Task with context and tools

task :billing, billing_robot,
     context: { department: "billing", escalation: 2 },
     tools: [RefundTool, InvoiceTool],
     depends_on: :optional

Task with dependencies

task :writer, writer_robot, depends_on: [:analyst]

Parameters:

  • name (Symbol)

    task name

  • robot (Robot)

    the robot instance

  • context (Hash) (defaults to: {})

    task-specific context (deep-merged with run params)

  • mcp (Symbol, Array) (defaults to: :none)

    MCP server config (:none, :inherit, or array)

  • tools (Symbol, Array) (defaults to: :none)

    tools config (:none, :inherit, or array)

  • memory (Memory, Hash, nil) (defaults to: nil)

    task-specific memory

  • depends_on (Symbol, Array<Symbol>) (defaults to: :none)

    dependencies (:none, :optional, or task names)

Returns:

  • (self)


126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/robot_lab/network.rb', line 126

def task(name, robot, context: {}, mcp: :none, tools: :none, memory: nil, config: nil, depends_on: :none,
         poller_group: :default)
  task_wrapper = Task.new(
    name: name,
    robot: robot,
    context: context,
    mcp: mcp,
    tools: tools,
    memory: memory,
    config: config
  )

  # Register the group and assign the shared poller to the robot
  @bus_poller.add_group(poller_group)
  robot.assign_bus_poller(@bus_poller, group: poller_group) if robot.respond_to?(:assign_bus_poller, true)

  @robots[name.to_s] = robot
  @tasks[name.to_s] = task_wrapper
  @pipeline.step(name, task_wrapper, depends_on: depends_on)
  self
end

#to_dotString?

Export pipeline to DOT format (Graphviz)

Returns:

  • (String, nil)


326
327
328
# File 'lib/robot_lab/network.rb', line 326

def to_dot
  @pipeline.visualize_dot
end

#to_hHash

Converts the network to a hash representation

Returns:

  • (Hash)


342
343
344
345
346
347
348
349
350
# File 'lib/robot_lab/network.rb', line 342

def to_h
  {
    name: name,
    robots: @robots.keys,
    tasks: @tasks.keys,
    optional_tasks: @pipeline.optional_steps.to_a,
    config: (@config.empty? ? nil : @config.to_json_hash)
  }.compact
end

#to_mermaidString?

Export pipeline to Mermaid format

Returns:

  • (String, nil)


318
319
320
# File 'lib/robot_lab/network.rb', line 318

def to_mermaid
  @pipeline.visualize_mermaid
end

#visualizeString?

Visualize the pipeline as ASCII

Returns:

  • (String, nil)


310
311
312
# File 'lib/robot_lab/network.rb', line 310

def visualize
  @pipeline.visualize_ascii
end