Class: OpenRouter::StreamingClient

Inherits:
Client
  • Object
show all
Defined in:
lib/open_router/streaming_client.rb

Overview

Enhanced streaming client with better event handling and response reconstruction

Instance Attribute Summary

Attributes inherited from Client

#callbacks, #configuration, #usage_tracker

Instance Method Summary collapse

Methods inherited from Client

#clear_callbacks, #complete, #models, #on, #query_generation_stats, #responses, #select_model, #smart_complete, #smart_complete_with_fallback, #trigger_callbacks

Methods included from HTTP

#delete, #get, #multipart_post, #post

Constructor Details

#initialize(*args, **kwargs, &block) ⇒ StreamingClient

Initialize streaming client with additional streaming-specific options



7
8
9
10
11
12
13
14
15
16
# File 'lib/open_router/streaming_client.rb', line 7

def initialize(*args, **kwargs, &block)
  super(*args, **kwargs, &block)
  @streaming_callbacks = {
    on_chunk: [],
    on_start: [],
    on_finish: [],
    on_tool_call_chunk: [],
    on_error: []
  }
end

Instance Method Details

#on_stream(event, &block) ⇒ self

Register streaming-specific callbacks

Parameters:

  • event (Symbol)

    The streaming event to register for

  • block (Proc)

    The callback to execute

Returns:

  • (self)

    Returns self for method chaining



23
24
25
26
27
28
29
30
31
# File 'lib/open_router/streaming_client.rb', line 23

def on_stream(event, &block)
  unless @streaming_callbacks.key?(event)
    valid_events = @streaming_callbacks.keys.join(", ")
    raise ArgumentError, "Invalid streaming event: #{event}. Valid events are: #{valid_events}"
  end

  @streaming_callbacks[event] << block
  self
end

#stream(messages, options = nil, **kwargs, &block) ⇒ Object

Stream with a simple block interface

Examples:

Simple usage (unchanged)

client.stream(messages, model: "openai/gpt-4o-mini") do |chunk|
  print chunk
end

With CompletionOptions

opts = CompletionOptions.new(model: "gpt-4", temperature: 0.7)
client.stream(messages, opts) { |chunk| print chunk }

Parameters:

  • messages (Array<Hash>)

    Array of message hashes

  • options (CompletionOptions, Hash, nil) (defaults to: nil)

    Options object or hash with configuration

  • kwargs (Hash)

    Additional options (merged with options parameter)

  • block (Proc)

    Block to call for each content chunk

Raises:

  • (ArgumentError)


95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/open_router/streaming_client.rb', line 95

def stream(messages, options = nil, **kwargs, &block)
  raise ArgumentError, "Block required for streaming" unless block_given?

  stream_complete(
    messages,
    options,
    accumulate_response: false,
    **kwargs
  ) do |chunk|
    content = extract_content_from_chunk(chunk)
    block.call(content) if content
  end
end

#stream_complete(messages, options = nil, accumulate_response: true, **kwargs, &block) ⇒ Response?

Enhanced streaming completion with better event handling and response reconstruction

Examples:

Simple usage (unchanged)

client.stream_complete(messages, model: "gpt-4")

With CompletionOptions

opts = CompletionOptions.new(model: "gpt-4", temperature: 0.7)
client.stream_complete(messages, opts)

Options with overrides

client.stream_complete(messages, base_opts, temperature: 0.9)

Parameters:

  • messages (Array<Hash>)

    Array of message hashes

  • options (CompletionOptions, Hash, nil) (defaults to: nil)

    Options object or hash with configuration

  • accumulate_response (Boolean) (defaults to: true)

    Whether to accumulate and return complete response

  • kwargs (Hash)

    Additional options (merged with options parameter)

  • block (Proc)

    Optional block to call for each chunk (in addition to registered callbacks)

Returns:

  • (Response, nil)

    Complete response if accumulate_response is true, nil otherwise



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/open_router/streaming_client.rb', line 51

def stream_complete(messages, options = nil, accumulate_response: true, **kwargs, &block)
  opts = normalize_options(options, kwargs)
  response_accumulator = ResponseAccumulator.new if accumulate_response

  # Set up streaming handler (pass optional per-call block)
  stream_handler = build_stream_handler(response_accumulator, &block)

  # Trigger start callback
  trigger_streaming_callbacks(:on_start, { model: opts.model, messages: messages })

  begin
    # Execute the streaming request using parent's complete method
    complete(messages, opts, stream: stream_handler)

    # Return accumulated response if requested
    if accumulate_response && response_accumulator
      final_response = response_accumulator.build_response
      trigger_streaming_callbacks(:on_finish, final_response)
      final_response
    else
      trigger_streaming_callbacks(:on_finish, nil)
      nil
    end
  rescue StandardError => e
    trigger_streaming_callbacks(:on_error, e)
    raise
  end
end