Class: RobotLab::BusPoller Private

Inherits:
Object
  • Object
show all
Defined in:
lib/robot_lab/bus_poller.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

Centralizes bus delivery serialization for robots.

Each robot’s TypedBus subscription calls enqueue() instead of handling deliveries inline. BusPoller ensures per-robot sequential processing: if a robot is already processing a delivery, new ones are queued and drained after the current one completes.

Unlike the old per-robot @bus_processing flag, BusPoller is mutex- protected and shared across robots, giving a single point of control for delivery ordering. Delivery still happens in the caller’s execution context (Async fiber or OS thread), so synchronous test semantics are preserved.

Poller Groups

Robots can be assigned to named groups via the Network DSL:

task :fast_bot, fast_robot, depends_on: :none
task :slow_bot, slow_robot, depends_on: :none, poller_group: :slow

Groups are purely organizational — they share the same mutex-based drain mechanism. In Async contexts, slow robots naturally yield during LLM HTTP calls without blocking fast robots.

Constant Summary collapse

QUEUE_CAPACITY =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

Capacity of per-robot RactorQueue delivery queues.

512

Instance Method Summary collapse

Constructor Details

#initializeBusPoller

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Creates a new BusPoller.



36
37
38
39
40
41
# File 'lib/robot_lab/bus_poller.rb', line 36

def initialize
  @mutex        = Mutex.new
  @robot_busy   = {}   # robot_name => Boolean
  @robot_queues = {}   # robot_name => RactorQueue<delivery>
  @groups       = [:default]
end

Instance Method Details

#add_group(name) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Add a named poller group.

Idempotent. Groups are informational labels — they do not create separate queues or threads.

Parameters:

  • name (Symbol)


95
96
97
# File 'lib/robot_lab/bus_poller.rb', line 95

def add_group(name)
  @mutex.synchronize { @groups << name unless @groups.include?(name) }
end

#enqueue(robot:, delivery:, group: :default) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Enqueue a delivery for a robot.

If the robot is not currently processing, the delivery is handled immediately in the caller’s context (Async fiber or OS thread). If the robot is busy, the delivery is queued and will be drained after the current delivery completes.

Parameters:

  • robot (Robot)

    the robot that will process the delivery

  • delivery (Object)

    the TypedBus delivery object

  • group (Symbol) (defaults to: :default)

    poller group label (informational only)



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/robot_lab/bus_poller.rb', line 70

def enqueue(robot:, delivery:, group: :default)
  should_process = @mutex.synchronize do
    name = robot.name
    @robot_queues[name] ||= RactorQueue.new(capacity: QUEUE_CAPACITY)
    @robot_busy[name]   ||= false

    if @robot_busy[name]
      @robot_queues[name].push(delivery)
      false
    else
      @robot_busy[name] = true
      true
    end
  end

  process_and_drain(robot, delivery) if should_process
end

#groupsArray<Symbol>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Names of all registered poller groups.

Returns:

  • (Array<Symbol>)


102
103
104
# File 'lib/robot_lab/bus_poller.rb', line 102

def groups
  @mutex.synchronize { @groups.dup }
end

#running?Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Always true — BusPoller needs no background threads.

Returns:

  • (Boolean)


109
110
111
# File 'lib/robot_lab/bus_poller.rb', line 109

def running?
  true
end

#startself

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

No-op — BusPoller has no background threads to start. Kept for API symmetry with the old design.

Returns:

  • (self)


47
48
49
# File 'lib/robot_lab/bus_poller.rb', line 47

def start
  self
end

#stopself

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

No-op — no threads to stop.

Returns:

  • (self)


54
55
56
# File 'lib/robot_lab/bus_poller.rb', line 54

def stop(*)
  self
end