llm_gateway
llm_gateway provides a unified translation interface for LLM provider APIs while giving developers as much control as possible. This adds some complexity, because we don't want developers to be blocked from using something a provider supports. Over time the library will mature and support more responses.
Table of Contents
- Principles:
- Installation
- Supported Providers
- Quick Start: Streaming (all events)
- Migration guides
- Tools
- Image Input
- Thinking / Reasoning
- Cross-Provider Handoffs
- Context Serialization
- OAuth
Principles:
- Transcription integrity is most important
- Input messages must have bidirectional integrity
- Allow developers as much control as possible
Installation
gem install llm_gateway
Or add it to your Gemfile:
gem "llm_gateway"
Supported Providers
| Provider | Provider Key | Auth | API Surface |
|---|---|---|---|
| Anthropic | anthropic_messages | API key | Messages |
| OpenAI | openai_completions | API key | Chat Completions |
| OpenAI | openai_responses | API key | Responses |
| OpenAI Codex | openai_codex | OAuth | Responses |
| Groq | groq_completions | API key | Chat Completions |
Legacy keys (*_apikey_*, *_oauth_*) are still supported for backward compatibility.
Quick Start: Streaming (all events)
require "llm_gateway"
require "json"
# Build a provider adapter directly (not via prebuilt config)
adapter = LlmGateway.build_provider(
provider: "openai_responses", # or anthropic_messages, groq_completions, ...
api_key: ENV.fetch("OPENAI_API_KEY"),
model_key: "gpt-5.4"
)
tools = [
{
name: "get_time",
description: "Get the current time",
input_schema: {
type: "object",
properties: {
timezone: { type: "string", description: "Optional timezone, e.g. America/New_York" }
}
}
}
]
transcript = [
{ role: "user", content: "What time is it? Think briefly, then call get_time." }
]
streamed_tool_args = Hash.new { |h, k| h[k] = +"" }
response = adapter.stream(transcript, tools: tools, reasoning: "high") do |event|
case event.type
# AssistantStreamMessageEvent
when :message_start
puts "\n[message_start] #{event.delta.inspect}"
when :message_delta
puts "\n[message_delta] #{event.delta.inspect} usage+=#{event.usage_increment.inspect}"
when :message_end
puts "\n[message_end]"
# Text events
when :text_start
puts "\n[text_start] index=#{event.content_index}"
print event.delta unless event.delta.empty?
when :text_delta
print event.delta
when :text_end
puts "\n[text_end] index=#{event.content_index}"
# Tool-call events
when :tool_start
puts "\n[tool_start] id=#{event.id} name=#{event.name} index=#{event.content_index}"
when :tool_delta
streamed_tool_args[event.content_index] << event.delta
print event.delta
when :tool_end
puts "\n[tool_end] index=#{event.content_index}"
begin
puts "tool args: #{JSON.parse(streamed_tool_args[event.content_index])}"
rescue JSON::ParserError
puts "tool args (partial/raw): #{streamed_tool_args[event.content_index]}"
end
# Reasoning events
when :reasoning_start
puts "\n[reasoning_start] sig=#{event.respond_to?(:signature) ? event.signature : ""}"
print event.delta
when :reasoning_delta
print event.delta
when :reasoning_end
puts "\n[reasoning_end]"
end
end
# Final AssistantMessage (assembled from the stream)
puts "\n\n=== Final assistant message ==="
puts "id: #{response.id}"
puts "model: #{response.model}"
puts "provider/api: #{response.provider}/#{response.api}"
puts "role: #{response.role}"
puts "stop_reason: #{response.stop_reason}"
puts "error_message: #{response..inspect}" if response.
puts "usage: #{response.usage.inspect}"
response.content.each do |block|
case block.type
when "text"
puts "text: #{block.text}"
when "reasoning"
puts "reasoning: #{block.reasoning}"
puts "signature: #{block.signature}" if block.respond_to?(:signature) && block.signature
when "tool_use"
puts "tool_use: #{block.name}(#{block.input.inspect}) id=#{block.id}"
end
end
Stream callback event families:
- AssistantStreamMessageEvent: :message_start, :message_delta, :message_end
- AssistantStreamEvent (and subclasses):
  - Text: :text_start, :text_delta, :text_end
  - Tool call: :tool_start, :tool_delta, :tool_end
  - Reasoning: :reasoning_start, :reasoning_delta, :reasoning_end
Stream API without handling events (final result only)
If you only care about the final AssistantMessage, call stream without a block:
require "llm_gateway"
adapter = LlmGateway.build_provider(
provider: "openai_apikey_responses",
api_key: ENV.fetch("OPENAI_API_KEY"),
model_key: "gpt-5.4"
)
result = adapter.stream("Write one short sentence about Ruby.")
puts result.role # "assistant"
puts result.stop_reason # "stop" (usually)
puts result.usage.inspect
text = result.content
.select { |block| block.type == "text" }
.map(&:text)
.join
puts text
Migration guides
- Migrating from chat to stream: use stream without a block when you only need the final response.
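A minimal before/after sketch (assuming the legacy chat method accepted the same transcript argument, which may differ in your version):
# Before (legacy):
# response = adapter.chat(transcript, tools: tools)
# After: calling stream without a block returns only the final AssistantMessage
response = adapter.stream(transcript, tools: tools)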
Tools
Defining Tools
weather_tool = {
name: "get_weather",
description: "Get current weather for a location",
input_schema: {
type: "object",
properties: {
location: { type: "string", description: "City name or coordinates" },
units: {
type: "string",
enum: ["celsius", "fahrenheit"],
default: "celsius"
}
},
required: ["location"]
}
}
Handling Tool Calls
Use stream without a block, inspect returned tool_use blocks, execute tools, append tool_result, then continue:
require "llm_gateway"
require "json"
adapter = LlmGateway.build_provider(
provider: "openai_apikey_responses",
api_key: ENV.fetch("OPENAI_API_KEY"),
model_key: "gpt-5.4"
)
weather_tool = {
name: "get_weather",
description: "Get current weather for a location",
input_schema: {
type: "object",
properties: {
location: { type: "string" },
units: { type: "string", enum: ["celsius", "fahrenheit"], default: "celsius" }
},
required: ["location"]
}
}
def execute_weather_api(args)
# Replace with real API call
{
location: args[:location] || args["location"],
units: args[:units] || args["units"] || "celsius",
temperature: 14,
condition: "Cloudy"
}
end
transcript = [
{ role: "user", content: "What is the weather in London?" }
]
# 1) First model pass (stream API, no event block)
response = adapter.stream(transcript, tools: [weather_tool])
transcript << response.to_h
# 2) Execute tool calls returned by the model
response.content.each do |block|
next unless block.type == "tool_use"
tool_result = execute_weather_api(block.input)
transcript << {
role: "developer",
content: [
{
type: "tool_result",
tool_use_id: block.id,
content: JSON.generate(tool_result)
}
]
}
end
# 3) Continue the conversation after tool execution
if response.content.any? { |b| b.type == "tool_use" }
final_response = adapter.stream(transcript, tools: [weather_tool])
final_text = final_response.content
.select { |b| b.type == "text" }
.map(&:text)
.join
puts final_text
end
Notes:
- Tool calls are returned as ToolCall blocks with type: "tool_use", id, name, and input.
- Tool results are sent back in the transcript as { type: "tool_result", tool_use_id:, content: } blocks.
- For multimodal-capable models, tool_result content can include image blocks when supported by the provider/model.
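For models and providers that accept it, a tool result carrying an image could be appended roughly like this; the image block fields mirror the Image Input section below, and treating tool_result content as an array of blocks here is an assumption rather than a documented guarantee:
require "base64"
transcript << {
  role: "developer",
  content: [
    {
      type: "tool_result",
      tool_use_id: block.id, # id from the tool_use block being answered
      content: [
        { type: "text", text: "Rendered chart attached." },
        { type: "image", data: Base64.strict_encode64(File.binread("./chart.png")), media_type: "image/png" }
      ]
    }
  ]
}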
Image Input
Send images by including an image content block in a user message.
require "llm_gateway"
require "base64"
adapter = LlmGateway.build_provider(
provider: "openai_apikey_responses",
api_key: ENV.fetch("OPENAI_API_KEY"),
model_key: "gpt-5.4"
)
image_b64 = Base64.strict_encode64(File.binread("./chart.png"))
transcript = [
{
role: "user",
content: [
{ type: "text", text: "What do you see in this image?" },
{ type: "image", data: image_b64, media_type: "image/png" }
]
}
]
result = adapter.stream(transcript) # stream API, no event block
text = result.content
.select { |b| b.type == "text" }
.map(&:text)
.join
puts text
Tip: use a model/provider combination that supports vision input.
Thinking / Reasoning
You can request higher-effort reasoning by passing reasoning: to stream.
require "llm_gateway"
adapter = LlmGateway.build_provider(
provider: "openai_apikey_responses",
api_key: ENV.fetch("OPENAI_API_KEY"),
model_key: "gpt-5.4"
)
result = adapter.stream(
"Think step by step and then compute 482 * 17.",
reasoning: "high"
)
puts "stop_reason: #{result.stop_reason}"
puts "usage: #{result.usage.inspect}" # may include reasoning_tokens depending on provider
result.content.each do |block|
case block.type
when "reasoning"
puts "[reasoning] #{block.reasoning}"
puts "[signature] #{block.signature}" if block.respond_to?(:signature) && block.signature
when "text"
puts "[text] #{block.text}"
end
end
Streaming Thinking Content
If you want incremental thinking/reasoning tokens as they arrive, pass a block to stream and handle reasoning events:
reasoning_text = +""
result = adapter.stream("Solve 99 * 99 with brief reasoning.", reasoning: "high") do |event|
case event.type
when :reasoning_start
print "\n[thinking start]\n"
reasoning_text << event.delta
when :reasoning_delta
reasoning_text << event.delta
print event.delta
when :reasoning_end
print "\n[thinking end]\n"
end
end
puts "\nCollected reasoning chars: #{reasoning_text.length}"
puts "Final stop_reason: #{result.stop_reason}"
How reasoning values are mapped
llm_gateway normalizes provider-specific reasoning/thinking output into shared structures:
- Stream events: :reasoning_start / :reasoning_delta / :reasoning_end
- Final content block: ReasoningContent with type: "reasoning"
  - fields: reasoning and optional signature
- Usage accounting:
  - normalized in result.usage when provided by the upstream API
  - may include :reasoning_tokens plus standard token counters
In practice this means you can:
- listen to :reasoning_* stream event variants, and
- always read final reasoning text from result.content blocks where block.type == "reasoning".
Notes:
- Reasoning output appears as ReasoningContent blocks with type: "reasoning".
- Some providers/models expose explicit reasoning content; others may only reflect reasoning effort in usage fields.
- In streamed callbacks, reasoning events are emitted as :reasoning_* variants.
Cross-Provider Handoffs
Internally, llm_gateway handles handoffs by normalizing message history into a provider-agnostic shape, then remapping that shape to the target provider API on each request.
What happens under the hood on stream/chat:
Normalize input
- String input is converted to a user message.
- system is normalized into system message objects.
- Prior assistant turns (including response.to_h) are treated as structured transcript entries.
Map into canonical gateway format
- Provider-specific differences (content block names, tool-call shapes, reasoning/thinking variants) are unified into shared structs.
Sanitize for target provider/model
- Before sending, messages are sanitized for the destination provider/API/model.
- Unsupported or provider-specific fields are adjusted/translated where possible.
Map to outbound provider payload
- The adapter input mapper converts canonical messages/tools/options into the exact wire format expected by the selected provider endpoint.
Map response back to canonical output
- Stream chunks are mapped into normalized stream events.
- Final output is accumulated into a normalized AssistantMessage (id, model, usage, stop_reason, content, etc.).
Why this matters:
- A transcript produced by one provider can be reused with another provider without manually rewriting message structure.
- Tool calls/reasoning/text are exposed through a consistent API even when upstream event formats differ.
- Your app can keep one conversation state format while switching providers for cost, latency, capability, or reliability reasons.
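For example, a conversation started on Anthropic can be continued on OpenAI without touching the transcript shape. A sketch, assuming you hold valid keys for both providers (the Anthropic model key below is a placeholder):
require "llm_gateway"
anthropic = LlmGateway.build_provider(
  provider: "anthropic_messages",
  api_key: ENV.fetch("ANTHROPIC_API_KEY"),
  model_key: ENV.fetch("ANTHROPIC_MODEL") # any model key valid for your account
)
openai = LlmGateway.build_provider(
  provider: "openai_responses",
  api_key: ENV.fetch("OPENAI_API_KEY"),
  model_key: "gpt-5.4"
)
transcript = [
  { role: "user", content: "Suggest a name for a Ruby LLM gateway library." }
]
# First turn on Anthropic; append the normalized assistant message
first = anthropic.stream(transcript)
transcript << first.to_h
# Continue the same transcript on OpenAI; the gateway remaps the normalized
# history into the Responses API wire format on the way out
transcript << { role: "user", content: "Explain that name in one sentence." }
second = openai.stream(transcript)
puts second.content.select { |b| b.type == "text" }.map(&:text).join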
Context Serialization
llm_gateway contexts are plain Ruby hashes/arrays, so they can be serialized to JSON and restored later.
require "llm_gateway"
require "json"
adapter = LlmGateway.build_provider(
provider: "openai_apikey_responses",
api_key: ENV.fetch("OPENAI_API_KEY"),
model_key: "gpt-5.4"
)
# Build context (transcript)
transcript = [
{ role: "user", content: "Plan a 3-day trip to Tokyo." }
]
# Run one turn and persist assistant output
first = adapter.stream(transcript)
transcript << first.to_h
# Serialize (store in DB/file/cache)
json_context = JSON.generate(transcript)
# ...later / elsewhere...
restored_transcript = JSON.parse(json_context)
# Continue conversation from restored context
restored_transcript << { role: "user", content: "Now make it budget-friendly." }
second = adapter.stream(restored_transcript)
puts second.content.select { |b| b.type == "text" }.map(&:text).join
What to persist:
- full transcript array (including assistant messages from response.to_h)
- any tool result messages you appended
- optional app metadata (user id, conversation id, timestamps) alongside the transcript
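For example, a persisted record might bundle that metadata with the serialized transcript (field names and the storage call are illustrative, not part of llm_gateway):
record = {
  user_id: 42,                       # illustrative app metadata
  conversation_id: "conv_123",       # illustrative
  saved_at: Time.now.utc,
  transcript: JSON.generate(transcript)
}
# Conversation.create!(record) # or whatever persistence layer your app already uses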
Tip: if you serialize to JSON, keys become strings on parse; llm_gateway accepts standard hash input and normalizes internally.
OAuth
Use OAuth-capable providers (for example openai_codex and anthropic_oauth_messages) by supplying an access_token when building the adapter.
Get initial tokens (Codex / OpenAI OAuth)
require "llm_gateway"
flow = LlmGateway::Clients::OpenAI::OAuthFlow.new
# 1) Start flow (generate auth URL + PKCE verifier + state)
start = flow.start
puts "Open in browser: #{start[:authorization_url]}"
# 2) After user auth, paste redirect URL (or raw code)
# Example: http://localhost:1455/auth/callback?code=...&state=...
print "Paste callback URL or code: "
input = STDIN.gets&.strip
# 3) Exchange for initial tokens
tokens = flow.exchange_code(input, start[:code_verifier], expected_state: start[:state])
puts tokens
# => {
# access_token: "...",
# refresh_token: "...",
# expires_at: <Time>,
# account_id: "..."
# }
Get initial tokens (Anthropic OAuth)
require "llm_gateway"
flow = LlmGateway::Clients::ClaudeCode::OAuthFlow.new
# 1) Start flow (auth URL + PKCE verifier + state)
start = flow.start
puts "Open in browser: #{start[:authorization_url]}"
# 2) After user auth, paste callback URL (or code)
# Example callback contains ?code=...&state=...
print "Paste callback URL or code: "
input = STDIN.gets&.strip
# 3) Exchange for initial tokens
tokens = flow.exchange_code(input, start[:code_verifier], state: start[:state])
puts tokens
# => {
# access_token: "...",
# refresh_token: "...",
# expires_at: <Time>
# }
Get a refresh token
The refresh_token is returned by the initial exchange_code call in the sections above; store it so it can be exchanged for new access tokens later.
Exchange refresh token for access token
Use the built-in token managers in this repo. The on_token_refresh block is called whenever the tokens are refreshed; persist the updated values it receives.
OpenAI Codex OAuth:
require "llm_gateway"
manager = LlmGateway::Clients::OpenAI::TokenManager.new(
refresh_token: stored_refresh_token,
access_token: stored_access_token, # optional
expires_at: stored_expires_at # optional
)
manager.on_token_refresh = lambda do |new_access_token, new_refresh_token, new_expires_at|
# Persist updated credentials in your DB/secrets store
end
current_access_token = manager.access_token
Anthropic OAuth:
require "llm_gateway"
manager = LlmGateway::Clients::ClaudeCode::TokenManager.new(
refresh_token: stored_refresh_token,
access_token: stored_access_token, # optional
expires_at: stored_expires_at, # optional
client_id: ENV.fetch("ANTHROPIC_CLIENT_ID"),
client_secret: ENV["ANTHROPIC_CLIENT_SECRET"] # optional depending on app setup
)
manager.on_token_refresh = lambda do |new_access_token, new_refresh_token, new_expires_at|
# Persist updated credentials
end
current_access_token = manager.access_token
Pass access token in provider requests
Build the provider with the current access token:
adapter = LlmGateway.build_provider(
provider: "openai_codex",
access_token: current_access_token,
model_key: "gpt-5.4"
)
result = adapter.stream("Hello from OAuth auth")
puts result.content.select { |b| b.type == "text" }.map(&:text).join
If your app refreshes tokens in the background, rebuild the adapter (or recreate client state) with the newest access_token before subsequent calls.
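One way to put the pieces together, sketched using only the calls shown above, is to read the current access token from the token manager right before each request (save_tokens and the stored_* values are placeholders for your own persistence):
require "llm_gateway"
manager = LlmGateway::Clients::OpenAI::TokenManager.new(
  refresh_token: stored_refresh_token,
  access_token: stored_access_token, # optional
  expires_at: stored_expires_at      # optional
)
manager.on_token_refresh = lambda do |new_access_token, new_refresh_token, new_expires_at|
  save_tokens(new_access_token, new_refresh_token, new_expires_at) # persist in your store
end
def codex_adapter(manager)
  # Rebuild the adapter with the manager's current access token
  LlmGateway.build_provider(
    provider: "openai_codex",
    access_token: manager.access_token,
    model_key: "gpt-5.4"
  )
end
result = codex_adapter(manager).stream("Hello again from OAuth auth")
puts result.content.select { |b| b.type == "text" }.map(&:text).join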
Token refresh responsibility
Library’s role (llm_gateway)
- Provides token manager helpers.
- Detects expiry from expires_at.
- Refreshes access token when asked (ensure_valid_token / refresh methods).
- Returns updated token values and triggers on_token_refresh callback after successful refresh.
- Uses whatever access token you pass into provider requests.
User/app’s role
- Persist tokens securely (DB/secrets store).
- Store and pass access_token, refresh_token, expires_at into the token manager.
- Implement on_token_refresh to save updated credentials.
- Decide refresh/retry policy at app level (e.g., retry failed request after refresh when appropriate).
- Rebuild client/provider state with latest access token for future calls.
In short: library executes refresh mechanics; your app owns token lifecycle persistence and operational policy.