Class: OpenRouter::StreamingClient
Defined in: lib/open_router/streaming_client.rb
Overview
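Subclass of Client for streaming completions. It adds streaming-specific event callbacks (on_start, on_chunk, on_tool_call_chunk, on_finish, on_error) and can reconstruct a full response from the streamed chunks.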
Instance Attribute Summary
Attributes inherited from Client
#callbacks, #configuration, #usage_tracker
Instance Method Summary
- #initialize(*args, **kwargs, &block) ⇒ StreamingClient (constructor)
  Initialize streaming client with additional streaming-specific options.
- #on_stream(event, &block) ⇒ self
  Register streaming-specific callbacks.
- #stream(messages, options = nil, **kwargs, &block) ⇒ Object
  Stream with a simple block interface.
- #stream_complete(messages, options = nil, accumulate_response: true, **kwargs, &block) ⇒ Response?
  Enhanced streaming completion with better event handling and response reconstruction.
Methods inherited from Client
#clear_callbacks, #complete, #models, #on, #query_generation_stats, #responses, #select_model, #smart_complete, #smart_complete_with_fallback, #trigger_callbacks
Methods included from HTTP
#delete, #get, #multipart_post, #post
Constructor Details
#initialize(*args, **kwargs, &block) ⇒ StreamingClient
Initialize streaming client with additional streaming-specific options
    # File 'lib/open_router/streaming_client.rb', line 7

    def initialize(*args, **kwargs, &block)
      super(*args, **kwargs, &block)
      @streaming_callbacks = {
        on_chunk: [],
        on_start: [],
        on_finish: [],
        on_tool_call_chunk: [],
        on_error: []
      }
    end
Instance Method Details
#on_stream(event, &block) ⇒ self
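Registers a block as a handler for a streaming event. Valid events are the symbols :on_chunk, :on_start, :on_finish, :on_tool_call_chunk, and :on_error; any other value raises ArgumentError. Returns self, so calls can be chained.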
    # File 'lib/open_router/streaming_client.rb', line 23

    def on_stream(event, &block)
      unless @streaming_callbacks.key?(event)
        valid_events = @streaming_callbacks.keys.join(", ")
        raise ArgumentError, "Invalid streaming event: #{event}. Valid events are: #{valid_events}"
      end

      @streaming_callbacks[event] << block
      self
    end
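The registration behavior above can be tried without the gem or a network connection. The following is a minimal sketch, not the library's implementation: `CallbackRegistry` and its `trigger` method are hypothetical stand-ins (the real `trigger_streaming_callbacks` is not shown on this page), and the sketch assumes handlers run in registration order. It illustrates chaining and the fact that event names must be symbols, since the lookup is a plain `Hash#key?` against symbol keys.

```ruby
# Stand-in class, not the gem's implementation: it mirrors only the
# registry shape shown in #initialize and #on_stream.
class CallbackRegistry
  EVENTS = %i[on_chunk on_start on_finish on_tool_call_chunk on_error].freeze

  def initialize
    # One array of handlers per event, as in StreamingClient#initialize.
    @streaming_callbacks = EVENTS.to_h { |event| [event, []] }
  end

  def on_stream(event, &block)
    unless @streaming_callbacks.key?(event)
      valid_events = @streaming_callbacks.keys.join(", ")
      raise ArgumentError, "Invalid streaming event: #{event}. Valid events are: #{valid_events}"
    end

    @streaming_callbacks[event] << block
    self
  end

  # Hypothetical dispatcher (assumption): calls handlers in registration order.
  def trigger(event, data)
    @streaming_callbacks.fetch(event).each { |callback| callback.call(data) }
  end
end

log = []
registry = CallbackRegistry.new

# on_stream returns self, so registrations can be chained.
registry
  .on_stream(:on_start) { |data| log << "start #{data[:model]}" }
  .on_stream(:on_chunk) { |chunk| log << "chunk #{chunk}" }

registry.trigger(:on_start, { model: "example/model" })
registry.trigger(:on_chunk, "hi")

begin
  # A String is not among the Symbol keys, so this is rejected.
  registry.on_stream("on_chunk") { |chunk| log << chunk }
rescue ArgumentError => e
  log << e.message
end
```

Passing `"on_chunk"` as a String fails even though the name looks valid, which is worth keeping in mind when event names come from configuration.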
#stream(messages, options = nil, **kwargs, &block) ⇒ Object
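Streams a completion and yields only the extracted text content of each chunk to the block; chunks without content are skipped. Calls #stream_complete with accumulate_response: false, so no response is reconstructed. Raises ArgumentError when no block is given.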
    # File 'lib/open_router/streaming_client.rb', line 95

    def stream(messages, options = nil, **kwargs, &block)
      raise ArgumentError, "Block required for streaming" unless block_given?

      stream_complete(
        messages,
        options,
        accumulate_response: false,
        **kwargs
      ) do |chunk|
        content = extract_content_from_chunk(chunk)
        block.call(content) if content
      end
    end
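To make the block interface concrete, here is a small offline sketch of the same filtering logic. Everything in it is illustrative: `stream_chunks` and `extract_content` are hypothetical stand-ins for `#stream` and `extract_content_from_chunk` (whose body is not shown on this page), and the chunk layout is an assumption based on the OpenAI-style streaming delta format.

```ruby
# Assumed chunk shape: { "choices" => [{ "delta" => { "content" => "..." } }] }.
# Returns nil when any level is missing.
def extract_content(chunk)
  chunk.dig("choices", 0, "delta", "content")
end

# Hypothetical stand-in for #stream: walks an in-memory list of chunks
# instead of performing an HTTP request.
def stream_chunks(chunks, &block)
  raise ArgumentError, "Block required for streaming" unless block_given?

  chunks.each do |chunk|
    content = extract_content(chunk)
    block.call(content) if content # skip chunks that carry no text
  end
end

chunks = [
  { "choices" => [{ "delta" => { "role" => "assistant" } }] },     # no content
  { "choices" => [{ "delta" => { "content" => "Hello" } }] },
  { "choices" => [{ "delta" => { "content" => ", world" } }] },
  { "choices" => [{ "delta" => {}, "finish_reason" => "stop" }] }  # no content
]

text = +""
stream_chunks(chunks) { |content| text << content }
```

Because content-free chunks are dropped before the block is called, the caller never has to guard against nil.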
#stream_complete(messages, options = nil, accumulate_response: true, **kwargs, &block) ⇒ Response?
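Runs a streaming completion through the parent's #complete method. Fires on_start before the request and on_finish after it; on any StandardError it fires on_error and re-raises. Returns the response rebuilt by ResponseAccumulator when accumulate_response is true, otherwise nil.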
    # File 'lib/open_router/streaming_client.rb', line 51

    def stream_complete(messages, options = nil, accumulate_response: true, **kwargs, &block)
      opts = (, kwargs)
      response_accumulator = ResponseAccumulator.new if accumulate_response

      # Set up streaming handler (pass optional per-call block)
      stream_handler = build_stream_handler(response_accumulator, &block)

      # Trigger start callback
      trigger_streaming_callbacks(:on_start, { model: opts.model, messages: messages })

      begin
        # Execute the streaming request using parent's complete method
        complete(messages, opts, stream: stream_handler)

        # Return accumulated response if requested
        if accumulate_response && response_accumulator
          final_response = response_accumulator.build_response
          trigger_streaming_callbacks(:on_finish, final_response)
          final_response
        else
          trigger_streaming_callbacks(:on_finish, nil)
          nil
        end
      rescue StandardError => e
        trigger_streaming_callbacks(:on_error, e)
        raise
      end
    end
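The control flow of #stream_complete (start callback, request, finish or error callback, optional accumulation) can be sketched offline as follows. This is an illustration under stated assumptions, not the gem's code: `SketchAccumulator` is a hypothetical stand-in for `ResponseAccumulator`, whose internals are not shown here; `source` replaces the HTTP request with any enumerable of chunks; the chunk layout is the assumed OpenAI-style delta format; and the returned Hash is a placeholder for the gem's Response object.

```ruby
# Hypothetical accumulator: joins the text of every chunk it sees.
class SketchAccumulator
  def initialize
    @parts = []
  end

  def add(chunk)
    content = chunk.dig("choices", 0, "delta", "content")
    @parts << content if content
  end

  def build_response
    { "content" => @parts.join }
  end
end

# Mirrors the flow of #stream_complete; `callbacks` maps event names to lambdas.
def sketch_stream_complete(source, callbacks, accumulate_response: true)
  accumulator = SketchAccumulator.new if accumulate_response
  callbacks[:on_start]&.call

  begin
    source.each { |chunk| accumulator&.add(chunk) }

    # nil when accumulation is disabled, matching the Response? return type.
    final_response = accumulator&.build_response
    callbacks[:on_finish]&.call(final_response)
    final_response
  rescue StandardError => e
    callbacks[:on_error]&.call(e)
    raise
  end
end

events = []
callbacks = {
  on_start: -> { events << :start },
  on_finish: ->(response) { events << [:finish, response] },
  on_error: ->(error) { events << [:error, error.message] }
}

chunks = [
  { "choices" => [{ "delta" => { "content" => "Hi" } }] },
  { "choices" => [{ "delta" => { "content" => "!" } }] }
]

full = sketch_stream_complete(chunks, callbacks)
none = sketch_stream_complete(chunks, callbacks, accumulate_response: false)

# A source that fails midway, to exercise the error path.
failing = Enumerator.new do |yielder|
  yielder << chunks.first
  raise IOError, "connection dropped"
end

begin
  sketch_stream_complete(failing, callbacks)
rescue IOError
  # The error is re-raised after the on_error handler has run.
end
```

Note that on_finish fires in both modes, receiving nil when accumulation is off, and that on_error does not swallow the exception, so callers still need their own rescue.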