Class: RobotLab::Streaming::Context

Inherits:
Object
Defined in:
lib/robot_lab/streaming/context.rb

Overview

Context for managing streaming events during execution

Context provides methods for publishing events with automatic sequencing, timestamping, and ID generation.

Examples:

context = Context.new(
  run_id: "run_123",
  message_id: "msg_456",
  scope: "network",
  publish: ->(event) { broadcast(event) }
)

context.publish_event(event: "text.delta", data: { delta: "Hello" })

Constructor Details

#initialize(run_id:, message_id:, scope:, publish:, parent_run_id: nil, sequence_counter: nil) ⇒ Context

Creates a new streaming Context.

Parameters:

  • run_id (String)

    unique run identifier

  • message_id (String)

    current message identifier

  • scope (String, Symbol)

    context scope

  • publish (Proc)

    callback for publishing events

  • parent_run_id (String, nil) (defaults to: nil)

    parent run identifier

  • sequence_counter (SequenceCounter, nil) (defaults to: nil)

    shared sequence counter



# File 'lib/robot_lab/streaming/context.rb', line 39

def initialize(run_id:, message_id:, scope:, publish:, parent_run_id: nil, sequence_counter: nil)
  @run_id = run_id
  @parent_run_id = parent_run_id
  @message_id = message_id
  @scope = scope.to_s
  @publish = publish
  @sequence = sequence_counter || SequenceCounter.new
end

Instance Attribute Details

#message_id ⇒ String (readonly)

Returns the current message identifier.

Returns:

  • (String)

    the current message identifier



# File 'lib/robot_lab/streaming/context.rb', line 29

attr_reader :run_id, :parent_run_id, :message_id, :scope

#parent_run_id ⇒ String? (readonly)

Returns the parent run identifier for nested contexts.

Returns:

  • (String, nil)

    the parent run identifier for nested contexts



# File 'lib/robot_lab/streaming/context.rb', line 29

attr_reader :run_id, :parent_run_id, :message_id, :scope

#run_id ⇒ String (readonly)

Returns the unique run identifier.

Returns:

  • (String)

    the unique run identifier



# File 'lib/robot_lab/streaming/context.rb', line 29

def run_id
  @run_id
end

#scope ⇒ String (readonly)

Returns the context scope. The constructor normalizes it with scope.to_s, so the value is always a String.



# File 'lib/robot_lab/streaming/context.rb', line 29

attr_reader :run_id, :parent_run_id, :message_id, :scope

Instance Method Details

#create_child_context(robot_run_id) ⇒ Context

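Creates a child context for a nested robot run. The child records this context's run_id as its parent_run_id, gets a freshly generated message ID, uses the "robot" scope, and shares this context's publish callback and sequence counter.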

Parameters:

  • robot_run_id (String)

Returns:

  • (Context)


# File 'lib/robot_lab/streaming/context.rb', line 70

def create_child_context(robot_run_id)
  Context.new(
    run_id: robot_run_id,
    parent_run_id: @run_id,
    message_id: generate_message_id,
    scope: "robot",
    publish: @publish,
    sequence_counter: @sequence  # Share sequence counter
  )
end
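
Sharing the counter keeps parent and child events in one ordering. The sketch below illustrates that idea with hypothetical stand-ins (SketchCounter, SketchContext) rather than the real RobotLab classes. The counter's increment method and the chunk fields are assumptions, since SequenceCounter and build_chunk are not shown on this page.

```ruby
require "securerandom"

# Hypothetical stand-in for SequenceCounter. Only `new` and `current` are
# visible in the documented source; `increment` is an assumed method name.
class SketchCounter
  attr_reader :current

  def initialize
    @current = 0
  end

  # Bump the counter and return the new value.
  def increment
    @current += 1
  end
end

# Hypothetical, reduced stand-in for Context (message IDs omitted).
class SketchContext
  attr_reader :run_id, :parent_run_id, :scope

  def initialize(run_id:, scope:, publish:, parent_run_id: nil, sequence_counter: nil)
    @run_id = run_id
    @parent_run_id = parent_run_id
    @scope = scope.to_s
    @publish = publish
    @sequence = sequence_counter || SketchCounter.new
  end

  # Assumed chunk shape: the real one comes from the private build_chunk.
  def publish_event(event:, data: {})
    chunk = { event: event, data: data, run_id: @run_id, sequence: @sequence.increment }
    @publish.call(chunk)
    chunk
  end

  # Mirrors the documented method: same callback, same counter, new run_id.
  def create_child_context(robot_run_id)
    SketchContext.new(
      run_id: robot_run_id,
      parent_run_id: @run_id,
      scope: "robot",
      publish: @publish,
      sequence_counter: @sequence
    )
  end
end

events = []
parent = SketchContext.new(run_id: "run_123", scope: :network, publish: ->(e) { events << e })
child = parent.create_child_context("robot_1")

parent.publish_event(event: "run.started")
child.publish_event(event: "text.delta", data: { delta: "Hello" })
parent.publish_event(event: "run.completed")

sequences = events.map { |e| e[:sequence] }
```

Under these assumptions the three events carry sequence numbers 1, 2, 3 even though they were published through two different context objects.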

#create_context_with_shared_sequence(run_id:, message_id:, scope:) ⇒ Context

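Creates a context that shares this context's publish callback and sequence counter. Unlike #create_child_context, it takes the run_id, message_id, and scope from the caller and does not set parent_run_id.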

Parameters:

  • run_id (String)
  • message_id (String)
  • scope (String)

Returns:

  • (Context)


# File 'lib/robot_lab/streaming/context.rb', line 88

def create_context_with_shared_sequence(run_id:, message_id:, scope:)
  Context.new(
    run_id: run_id,
    message_id: message_id,
    scope: scope,
    publish: @publish,
    sequence_counter: @sequence
  )
end

#generate_message_id ⇒ String

Generate a new message ID

Returns:

  • (String)


# File 'lib/robot_lab/streaming/context.rb', line 122

def generate_message_id
  SecureRandom.uuid
end

#generate_part_id ⇒ String

Generate a part ID (OpenAI-compatible, max 40 chars)

Returns:

  • (String)


# File 'lib/robot_lab/streaming/context.rb', line 102

def generate_part_id
  short_msg_id = @message_id[0, 8]
  timestamp = (Time.now.to_f * 1000).to_i.to_s[-6..]
  random = SecureRandom.hex(4)
  "part_#{short_msg_id}_#{timestamp}_#{random}"
end
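
To see how the format stays under the 40-character limit, the same logic can be run as a standalone function. This is a sketch: sketch_part_id is a hypothetical name, and the message ID is passed in as an argument instead of being read from @message_id.

```ruby
require "securerandom"

# Standalone copy of the part-ID logic shown above.
def sketch_part_id(message_id)
  short_msg_id = message_id[0, 8]                      # at most 8 characters
  timestamp = (Time.now.to_f * 1000).to_i.to_s[-6..]   # last 6 digits of epoch milliseconds
  random = SecureRandom.hex(4)                         # 8 hex characters
  "part_#{short_msg_id}_#{timestamp}_#{random}"
end

part_id = sketch_part_id(SecureRandom.uuid)
short_part_id = sketch_part_id("msg_456")
```

The pieces add up to 5 + 8 + 1 + 6 + 1 + 8 = 29 characters when the message ID has at least 8 characters, and fewer for shorter message IDs such as "msg_456".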

#generate_step_id(base_name) ⇒ String

Generate a step ID for Inngest compatibility

Parameters:

  • base_name (String)

Returns:

  • (String)


# File 'lib/robot_lab/streaming/context.rb', line 114

def generate_step_id(base_name)
  "publish-#{@sequence.current}:#{base_name}"
end
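
The resulting ID combines the counter's current value with the caller's base name. A minimal sketch of the format, using a hypothetical SketchSequence struct that exposes only current, the one method generate_step_id calls:

```ruby
# Hypothetical stand-in for the sequence counter.
SketchSequence = Struct.new(:current)

# Standalone copy of the step-ID format shown above.
def sketch_step_id(sequence, base_name)
  "publish-#{sequence.current}:#{base_name}"
end

step_id = sketch_step_id(SketchSequence.new(7), "text.delta")
# step_id == "publish-7:text.delta"
```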

#publish_event(event:, data: {}) ⇒ Object

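Publishes an event. Builds a chunk from the event type and data, passes it to the publish callback, and returns the chunk. A StandardError raised by the callback is logged as a warning and is not re-raised.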

Parameters:

  • event (String)

    Event type

  • data (Hash) (defaults to: {})

    Event data



# File 'lib/robot_lab/streaming/context.rb', line 53

def publish_event(event:, data: {})
  chunk = build_chunk(event, data)

  begin
    @publish.call(chunk)
  rescue StandardError => e
    RobotLab.config.logger&.warn("Streaming error: #{e.message}")
  end

  chunk
end
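
The rescue block means a failing subscriber does not interrupt the run. The sketch below reproduces that behavior in isolation. The sketch_publish function, its explicit logger argument, and the two-key chunk hash are assumptions made for illustration; the real method uses the private build_chunk helper and RobotLab.config.logger.

```ruby
require "logger"
require "stringio"

# Capture log output in memory so the warning can be inspected.
log_output = StringIO.new
logger = Logger.new(log_output)

# Reduced, standalone version of publish_event.
def sketch_publish(publish, logger, event:, data: {})
  chunk = { event: event, data: data }   # assumed chunk shape
  begin
    publish.call(chunk)
  rescue StandardError => e
    # Same handling as above: log a warning, do not re-raise.
    logger&.warn("Streaming error: #{e.message}")
  end
  chunk
end

# A callback that always fails, e.g. because the client went away.
failing = ->(_chunk) { raise IOError, "client disconnected" }
result = sketch_publish(failing, logger, event: "text.delta", data: { delta: "Hello" })
```

Here result is still the chunk hash and the logger has recorded the warning. Exceptions outside the StandardError hierarchy, such as Interrupt, are not rescued and would still propagate.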