Class: RobotLab::Network

Inherits:
Object
  • Object
show all
Includes:
Utils
Defined in:
lib/robot_lab/network.rb

Overview

Orchestrates multiple robots in a pipeline workflow

Network is a thin wrapper around SimpleFlow::Pipeline that provides a clean DSL for defining robot workflows with sequential, parallel, and conditional execution.

Shared Memory

Networks provide a shared reactive memory that all robots can read and write. Robots can subscribe to memory keys and be notified when values change, or use blocking reads to wait for values from other robots.

Broadcast Messages

Networks support a broadcast channel for network-wide announcements. Use `broadcast` to send messages to all robots, and `on_broadcast` to register handlers for incoming broadcasts.

Examples:

Sequential execution

network = RobotLab.create_network(name: "pipeline") do
  task :analyst, analyst_robot, depends_on: :none
  task :writer, writer_robot, depends_on: [:analyst]
end

With per-task context

network = RobotLab.create_network(name: "support") do
  task :classifier, classifier_robot, depends_on: :none
  task :billing, billing_robot,
       context: { department: "billing" },
       tools: [RefundTool],
       depends_on: :optional
end

Parallel execution with shared memory

network = RobotLab.create_network(name: "analysis") do
  task :fetch, fetcher_robot, depends_on: :none
  task :sentiment, sentiment_robot, depends_on: [:fetch]
  task :entities, entity_robot, depends_on: [:fetch]
  task :summarize, summary_robot, depends_on: [:sentiment, :entities]
end

# In sentiment_robot:
memory.set(:sentiment, analyze_sentiment(text))

# In summary_robot:
results = memory.get(:sentiment, :entities, wait: 60)

Broadcasting

network.on_broadcast do |message|
  puts "Received: #{message[:payload][:event]}"
end

network.broadcast(event: :pause, reason: "rate limit")

Constant Summary collapse

BROADCAST_KEY =

Reserved key for broadcast messages in memory

:_network_broadcast

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name:, concurrency: :auto, memory: nil, config: nil, parallel_mode: :async) { ... } ⇒ Network

Creates a new Network instance.

Examples:

network = Network.new(name: "support") do
  task :classifier, classifier, depends_on: :none
  task :billing, billing_robot, context: { dept: "billing" }, depends_on: :optional
end

Parameters:

  • name (String)

    unique identifier for the network

  • concurrency (Symbol) (defaults to: :auto)

    concurrency model (:auto, :threads, :async)

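  • memory (Memory, nil) (defaults to: nil)

    optional pre-configured memory instance

  • config (RunConfig, nil) (defaults to: nil)

    optional network configuration; a new RunConfig is created when omitted

  • parallel_mode (Symbol) (defaults to: :async)

    how the pipeline is executed; :ractor runs through the Ractor scheduler, any other value uses the pipeline's parallel call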

Yields:

  • Block for defining pipeline tasks



89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/robot_lab/network.rb', line 89

# Builds a network and, when a block is supplied, evaluates it against the
# new instance so the DSL methods can be called without a receiver.
#
# @param name [#to_s] identifier for the network (stored as a String)
# @param concurrency [Symbol] forwarded to SimpleFlow::Pipeline.new
# @param memory [Memory, nil] shared memory; a fresh Memory named after the
#   network is created when nil
# @param config [RunConfig, nil] network config; RunConfig.new when nil
# @param parallel_mode [Symbol] execution mode, stored as given
# @yield optional block evaluated via instance_eval
def initialize(name:, concurrency: :auto, memory: nil, config: nil, parallel_mode: :async, &block)
  # Plain state first.
  @name = name.to_s
  @parallel_mode = parallel_mode
  @robots = {}
  @tasks = {}
  @broadcast_handlers = []

  # Collaborators, created in the same relative order as before.
  @pipeline = SimpleFlow::Pipeline.new(concurrency: concurrency)
  @memory = memory || Memory.new(network_name: @name)
  @config = config || RunConfig.new
  # NOTE(review): keeps the return value of #start — assumes it returns the poller itself; confirm.
  @bus_poller = BusPoller.new.start

  instance_eval(&block) if block
end

Instance Attribute Details

#config ⇒ Object (readonly)

Returns the value of attribute config.



74
75
76
# File 'lib/robot_lab/network.rb', line 74

# Reader for the network's configuration object.
#
# @return [Object] the value held in @config
def config
  @config
end

#memory ⇒ Memory (readonly)

Returns shared memory for all robots in the network.

Returns:

  • (Memory)

    shared memory for all robots in the network



74
# File 'lib/robot_lab/network.rb', line 74

# Read-only accessors; `memory` exposes the @memory instance variable.
attr_reader :name, :pipeline, :robots, :memory, :config, :parallel_mode

#name ⇒ String (readonly)

Returns unique identifier for the network.

Returns:

  • (String)

    unique identifier for the network



74
75
76
# File 'lib/robot_lab/network.rb', line 74

# Reader for the network's identifier.
#
# @return [String] the value held in @name
def name
  @name
end

#parallel_mode ⇒ Object (readonly)

Returns the value of attribute parallel_mode.



74
75
76
# File 'lib/robot_lab/network.rb', line 74

# Reader for the configured parallel execution mode.
#
# @return [Object] the value held in @parallel_mode
def parallel_mode
  @parallel_mode
end

#pipeline ⇒ SimpleFlow::Pipeline (readonly)

Returns the underlying pipeline.

Returns:

  • (SimpleFlow::Pipeline)

    the underlying pipeline



74
# File 'lib/robot_lab/network.rb', line 74

# Read-only accessors; `pipeline` exposes the @pipeline instance variable.
attr_reader :name, :pipeline, :robots, :memory, :config, :parallel_mode

#robots ⇒ Hash<String, Robot> (readonly)

Returns robots in this network, keyed by name.

Returns:

  • (Hash<String, Robot>)

    robots in this network, keyed by name



74
# File 'lib/robot_lab/network.rb', line 74

# Read-only accessors; `robots` exposes the @robots instance variable.
attr_reader :name, :pipeline, :robots, :memory, :config, :parallel_mode

Instance Method Details

#add_robot(robot) ⇒ self

Add a robot to the network without adding it as a task

Useful for dynamically adding robots that will be referenced later.

Parameters:

  • robot (Robot)

    the robot instance to add

Returns:

  • (self)

Raises:

  • (ArgumentError)

    if a robot with the same name already exists



296
297
298
299
300
301
302
303
# File 'lib/robot_lab/network.rb', line 296

# Registers a robot in the network without creating a pipeline task for it.
#
# The robot is stored under the String form of its name so that it lives in
# the same key space as String-keyed entries and lookups that call `to_s`
# on the requested name. A Symbol-named robot would otherwise be stored
# under a key those lookups can never reach.
#
# @param robot [#name] the robot instance to add
# @return [self]
# @raise [ArgumentError] if a robot is already registered under that name
def add_robot(robot)
  key = robot.name.to_s
  raise ArgumentError, "Robot '#{key}' already exists in network '#{@name}'" if @robots.key?(key)

  @robots[key] = robot
  self
end

#available_robots ⇒ Array<Robot>

Get all robots in the network

Returns:

  • (Array<Robot>)

    every robot currently registered in the network



284
285
286
# File 'lib/robot_lab/network.rb', line 284

# Lists every registered robot, in registration order.
#
# @return [Array] the values of the @robots hash
def available_robots
  @robots.each_value.to_a
end

#broadcast(payload) ⇒ self

Broadcast a message to all robots in the network.

This sends a network-wide message that all robots subscribed via `on_broadcast` will receive asynchronously.

Examples:

Pause all robots

network.broadcast(event: :pause, reason: "rate limit hit")

Signal completion

network.broadcast(event: :phase_complete, phase: "analysis")

Parameters:

  • payload (Hash)

    the message payload

Returns:

  • (self)


211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
# File 'lib/robot_lab/network.rb', line 211

# Sends a message to every registered broadcast handler and publishes it in
# shared memory under BROADCAST_KEY.
#
# The same envelope hash (:payload, :network, :timestamp) is handed to each
# handler and to memory.
#
# @param payload [Hash] the message payload
# @return [self]
def broadcast(payload)
  envelope = { payload: payload, network: @name, timestamp: Time.now }

  # Each handler runs through dispatch_async rather than inline.
  @broadcast_handlers.each { |listener| dispatch_async { listener.call(envelope) } }

  # Publishing to memory lets robots observe broadcasts via memory subscriptions.
  @memory.set(BROADCAST_KEY, envelope)

  self
end

#execution_plan ⇒ String?

Get the execution plan

Returns:

  • (String, nil)


333
334
335
# File 'lib/robot_lab/network.rb', line 333

# Delegates to the underlying pipeline's execution plan.
#
# @return [String, nil] whatever `@pipeline.execution_plan` returns
def execution_plan
  @pipeline.execution_plan
end

#on_broadcast {|Hash| ... } ⇒ self

Register a handler for broadcast messages.

The handler is called asynchronously whenever `broadcast` is called.

Examples:

network.on_broadcast do |message|
  case message[:payload][:event]
  when :pause
    pause_current_work
  when :resume
    resume_work
  end
end

Yields:

  • (Hash)

    the broadcast message with :payload, :network, :timestamp

Returns:

  • (self)

Raises:

  • (ArgumentError)

    if no block is given


246
247
248
249
250
251
# File 'lib/robot_lab/network.rb', line 246

# Registers a block to be invoked for every broadcast message.
#
# @yield [Hash] the broadcast message
# @return [self]
# @raise [ArgumentError] if called without a block
def on_broadcast(&block)
  raise ArgumentError, "Block required for on_broadcast" if block.nil?

  @broadcast_handlers.push(block)
  self
end

#parallel(name = nil, depends_on: :none) { ... } ⇒ self

Define a parallel execution block

Examples:

Named parallel group

parallel :fetch_data, depends_on: :validate do
  task :fetch_orders, orders_robot
  task :fetch_products, products_robot
end
task :process, processor, depends_on: :fetch_data

Parameters:

  • name (Symbol, nil) (defaults to: nil)

    optional name for the parallel group

  • depends_on (Symbol, Array) (defaults to: :none)

    dependencies for this group

Yields:

  • Block containing task definitions

Returns:

  • (self)


161
162
163
164
# File 'lib/robot_lab/network.rb', line 161

# Declares a parallel group by forwarding the name, dependencies and block
# to the underlying pipeline.
#
# @param name [Symbol, nil] optional name for the parallel group
# @param depends_on [Symbol, Array] dependencies for this group
# @yield block containing the group's task definitions
# @return [self]
def parallel(name = nil, depends_on: :none, &block)
  tap { @pipeline.parallel(name, depends_on: depends_on, &block) }
end

#reset_memory ⇒ self

Reset the shared memory.

Clears all values in the network’s shared memory. This is useful between runs if you want to start with a fresh memory state.

Returns:

  • (self)


260
261
262
263
# File 'lib/robot_lab/network.rb', line 260

# Resets the network's shared memory by calling `reset` on it.
#
# @return [self]
def reset_memory
  tap { @memory.reset }
end

#robot(name) ⇒ Robot? Also known as: []

Get a robot by name

Parameters:

  • name (String, Symbol)

Returns:

  • (Robot, nil)

    the robot registered under the given name, or nil if there is none



270
271
272
# File 'lib/robot_lab/network.rb', line 270

# Looks up a robot by name; the name is converted to a String before the
# lookup, so Symbols and Strings are interchangeable.
#
# @param name [String, Symbol]
# @return [Object, nil] the stored robot, or nil when the key is absent
def robot(name)
  key = name.to_s
  @robots[key]
end

#run(**run_context) ⇒ SimpleFlow::Result

Run the network with the given context

All robots share the network’s memory during execution. The memory is passed to each robot and can be used for inter-robot communication.

Examples:

result = network.run(message: "I need help with billing", user_id: 123)
result.value  # => RobotResult from last robot
result.context[:classifier]  # => RobotResult from classifier

Parameters:

  • run_context (Hash)

    context passed to all robots (message:, user_id:, etc.)

Returns:

  • (SimpleFlow::Result)

    final pipeline result



179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
# File 'lib/robot_lab/network.rb', line 179

# Executes the network with the given keyword context.
#
# The shared memory is always injected as :network_memory, and the network
# config as :network_config when it is not empty. In :ractor mode the
# context is handed to run_with_ractor_scheduler; otherwise it seeds a
# SimpleFlow::Result that is passed to the pipeline's call_parallel.
#
# @param run_context [Hash] context passed to all robots
# @return [Object] the value returned by the selected execution path
def run(**run_context)
  run_context[:network_memory] = @memory
  run_context[:network_config] = @config unless @config.empty?

  return run_with_ractor_scheduler(run_context) if @parallel_mode == :ractor

  # The same hash serves as the result value and as :run_params in its context.
  seed = SimpleFlow::Result.new(run_context, context: { run_params: run_context })
  @pipeline.call_parallel(seed)
end

#task(name, robot, context: {}, mcp: :none, tools: :none, memory: nil, config: nil, depends_on: :none, poller_group: :default) ⇒ self

Add a robot as a pipeline task with optional per-task configuration

Examples:

Entry point task

task :classifier, classifier_robot, depends_on: :none

Task with context and tools

task :billing, billing_robot,
     context: { department: "billing", escalation: 2 },
     tools: [RefundTool, InvoiceTool],
     depends_on: :optional

Task with dependencies

task :writer, writer_robot, depends_on: [:analyst]

Parameters:

  • name (Symbol)

    task name

  • robot (Robot)

    the robot instance

  • context (Hash) (defaults to: {})

    task-specific context (deep-merged with run params)

  • mcp (Symbol, Array) (defaults to: :none)

    MCP server config (:none, :inherit, or array)

  • tools (Symbol, Array) (defaults to: :none)

    tools config (:none, :inherit, or array)

  • memory (Memory, Hash, nil) (defaults to: nil)

    task-specific memory

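  • config (Object, nil) (defaults to: nil)

    task-specific configuration, forwarded to the task wrapper

  • depends_on (Symbol, Array<Symbol>) (defaults to: :none)

    dependencies (:none, :optional, or task names)

  • poller_group (Symbol) (defaults to: :default)

    bus-poller group the robot is registered under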

Returns:

  • (self)


126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/robot_lab/network.rb', line 126

# Adds a robot to the pipeline as a named task.
#
# Wraps the robot and its per-task options in a Task, registers the poller
# group on the shared bus poller, hands the poller to the robot when it
# supports that, records the robot and wrapper under the String form of
# +name+ (replacing any existing entry), and appends a pipeline step.
#
# @param name [Symbol] task name
# @param robot [Object] the robot instance
# @param context [Hash] task-specific context
# @param mcp [Symbol, Array] MCP server config
# @param tools [Symbol, Array] tools config
# @param memory [Object, nil] task-specific memory
# @param config [Object, nil] task-specific config
# @param depends_on [Symbol, Array<Symbol>] dependencies for the step
# @param poller_group [Symbol] bus-poller group for this robot
# @return [self]
def task(name, robot, context: {}, mcp: :none, tools: :none, memory: nil, config: nil, depends_on: :none, poller_group: :default)
  task_wrapper = Task.new(
    name: name,
    robot: robot,
    context: context,
    mcp: mcp,
    tools: tools,
    memory: memory,
    config: config
  )

  # Register the group and assign the shared poller to the robot.
  @bus_poller.add_group(poller_group)
  if robot.respond_to?(:assign_bus_poller, true)
    # The check above also matches private/protected methods, so the call
    # must not use an explicit-receiver dispatch (that would raise
    # NoMethodError). __send__ is used instead of send in case the robot
    # defines its own `send`.
    robot.__send__(:assign_bus_poller, @bus_poller, group: poller_group)
  end

  key = name.to_s
  @robots[key] = robot
  @tasks[key] = task_wrapper
  @pipeline.step(name, task_wrapper, depends_on: depends_on)
  self
end

#to_dot ⇒ String?

Export pipeline to DOT format (Graphviz)

Returns:

  • (String, nil)


325
326
327
# File 'lib/robot_lab/network.rb', line 325

# Delegates to the pipeline's DOT (Graphviz) export.
#
# @return [String, nil] whatever `@pipeline.visualize_dot` returns
def to_dot
  @pipeline.visualize_dot
end

#to_h ⇒ Hash

Converts the network to a hash representation

Returns:

  • (Hash)


341
342
343
344
345
346
347
348
349
# File 'lib/robot_lab/network.rb', line 341

# Summarises the network as a Hash.
#
# Always contains :name, :robots, :tasks and :optional_tasks; :config is
# included only when the config is non-empty. Keys whose value is nil are
# dropped from the result.
#
# @return [Hash]
def to_h
  summary = {
    name: name,
    robots: @robots.keys,
    tasks: @tasks.keys,
    optional_tasks: @pipeline.optional_steps.to_a
  }
  summary[:config] = @config.to_json_hash unless @config.empty?
  summary.compact
end

#to_mermaid ⇒ String?

Export pipeline to Mermaid format

Returns:

  • (String, nil)


317
318
319
# File 'lib/robot_lab/network.rb', line 317

# Delegates to the pipeline's Mermaid export.
#
# @return [String, nil] whatever `@pipeline.visualize_mermaid` returns
def to_mermaid
  @pipeline.visualize_mermaid
end

#visualize ⇒ String?

Visualize the pipeline as ASCII

Returns:

  • (String, nil)


309
310
311
# File 'lib/robot_lab/network.rb', line 309

# Delegates to the pipeline's ASCII visualisation.
#
# @return [String, nil] whatever `@pipeline.visualize_ascii` returns
def visualize
  @pipeline.visualize_ascii
end