Ruby Pinot Client
A Ruby client library for Apache Pinot. Mirrors the API of the Go client and supports HTTP JSON transport, multiple broker discovery strategies, and parameterized queries.
Installation
Add to your Gemfile:
gem "pinot-client", "~> 1.0"
Or install directly:
gem install pinot-client
Quick Start
Start a local Pinot cluster using Docker:
docker run -d \
  --name pinot-quickstart \
  -p 8000:8000 \
  apachepinot/pinot:1.5.0 QuickStart -type BATCH
Then query it from Ruby:
require "pinot"
client = Pinot.from_broker_list(["localhost:8000"])
resp = client.execute_sql("baseballStats", "SELECT count(*) AS cnt FROM baseballStats")
puts resp.result_table.get_long(0, 0) # => 97889
Examples
The examples/ directory contains runnable scripts demonstrating common usage patterns:
- basic_query.rb: simple SQL query and result iteration
- prepared_statement.rb: parameterized queries with PreparedStatement
- controller_discovery.rb: automatic broker discovery via Controller
- instrumentation.rb: attaching metrics/logging hooks
- multistage_query.rb: multi-stage query engine usage
Run any example against a local Pinot quickstart cluster:
ruby examples/basic_query.rb
Creating a Connection
From a broker list
client = Pinot.from_broker_list(["localhost:8000"])
For HTTPS brokers, include the scheme:
client = Pinot.from_broker_list(["https://pinot-broker.example.com"])
From a controller (dynamic broker discovery)
The client periodically polls the controller's /v2/brokers/tables API and automatically picks up broker changes.
client = Pinot.from_controller("localhost:9000")
Via gRPC
Requires the grpc gem (gem "grpc", "~> 1.65" in your Gemfile).
grpc_config = Pinot::GrpcConfig.new(
  broker_list: ["localhost:8090"],
  timeout: 10,
  extra_metadata: { "Authorization" => "Bearer <token>" }
)
config = Pinot::ClientConfig.new(grpc_config: grpc_config)
client = Pinot.from_config(config)
From ZooKeeper (dynamic broker discovery)
Requires the zk gem (gem "zk" in your Gemfile). Watches /EXTERNALVIEW/brokerResource and automatically picks up broker changes.
zk_config = Pinot::ZookeeperConfig.new(zk_path: "localhost:2181")
config = Pinot::ClientConfig.new(zookeeper_config: zk_config)
client = Pinot.from_config(config)
From a ClientConfig
config = Pinot::ClientConfig.new(
  broker_list: ["localhost:8000"],
  http_timeout: 10, # seconds; sets open_timeout, read_timeout, and write_timeout on the underlying Net::HTTP connection
  extra_http_header: { "Authorization" => "Bearer <token>" },
  use_multistage_engine: false,
  controller_config: Pinot::ControllerConfig.new(
    controller_address: "localhost:9000",
    update_freq_ms: 1000,
    extra_controller_api_headers: { "Authorization" => "Bearer <token>" }
  )
)
client = Pinot.from_config(config)
ClientConfig#validate! raises Pinot::ConfigurationError early if the config is invalid (for example, a bad URL scheme or no broker source).
With TLS
tls = Pinot::TlsConfig.new(
  ca_cert_file: "/path/to/ca.pem",
  client_cert_file: "/path/to/client.crt",
  client_key_file: "/path/to/client.key",
  insecure_skip_verify: false # set true to skip cert verification
)
config = Pinot::ClientConfig.new(
  broker_list: ["https://pinot-broker.example.com:8000"],
  tls_config: tls
)
client = Pinot.from_config(config)
Logging
By default the client logs warnings to stdout. Configure a custom logger globally or per-client:
require "logger"
# Global logger
Pinot::Logging.logger = Logger.new("pinot.log", level: Logger::DEBUG)
# Per-client logger via ClientConfig
config = Pinot::ClientConfig.new(
  broker_list: ["localhost:8000"],
  logger: Logger.new($stdout, level: Logger::INFO)
)
client = Pinot.from_config(config)
Instrumentation
Hook into every query execution for metrics, tracing, or alerting:
Pinot::Instrumentation.on_query = lambda do |event|
  puts "#{event[:table]} #{event[:query][0..50]} " \
       "#{event[:duration_ms].round(1)}ms " \
       "success=#{event[:success]}"
end
The event hash contains: :table, :query, :duration_ms (Float), :success (Boolean), :error (Exception or nil).
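As an illustration of what a hook can do with this hash, the sketch below aggregates per-table query counts, failures, and average latency. QueryStats is a hypothetical helper, not part of the client; only the event keys listed above come from the library. The sample events at the end are hand-built so the snippet runs without a cluster.

```ruby
# Hypothetical helper (not part of pinot-client): aggregates per-table stats
# from event hashes shaped like the ones described above.
class QueryStats
  Entry = Struct.new(:count, :failures, :total_ms)

  def initialize
    @mutex = Mutex.new
    @tables = Hash.new { |hash, table| hash[table] = Entry.new(0, 0, 0.0) }
  end

  # Receives one event hash per executed query.
  def call(event)
    @mutex.synchronize do
      entry = @tables[event[:table]]
      entry.count += 1
      entry.failures += 1 unless event[:success]
      entry.total_ms += event[:duration_ms]
    end
  end

  # Returns { table => { count:, failures:, avg_ms: } }.
  def summary
    @mutex.synchronize do
      @tables.transform_values do |e|
        { count: e.count, failures: e.failures, avg_ms: e.total_ms / e.count }
      end
    end
  end
end

stats = QueryStats.new
# With the gem loaded, the hook would be registered as:
#   Pinot::Instrumentation.on_query = ->(event) { stats.call(event) }

# Hand-built sample events standing in for real queries.
stats.call(table: "baseballStats", query: "SELECT 1", duration_ms: 10.0, success: true, error: nil)
stats.call(table: "baseballStats", query: "SELECT 2", duration_ms: 30.0, success: false,
           error: RuntimeError.new("sample failure"))

p stats.summary
```

The mutex is there because hooks may be invoked from several threads if the client is shared; drop it if your application is single-threaded.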
Executing Queries
Simple SQL
resp = client.execute_sql("baseballStats", "SELECT playerName, homeRuns FROM baseballStats LIMIT 5")
Parameterized queries
Use ? placeholders — the client substitutes and quotes values safely:
resp = client.execute_sql_with_params(
  "baseballStats",
  "SELECT playerName FROM baseballStats WHERE teamID = ? AND yearID >= ?",
  ["SFN", 2000]
)
Multistage engine
client.use_multistage_engine = true
resp = client.execute_sql("baseballStats", "SELECT teamID, count(*) FROM baseballStats GROUP BY teamID")
Or enable it upfront via ClientConfig:
config = Pinot::ClientConfig.new(
  broker_list: ["localhost:8000"],
  use_multistage_engine: true
)
client = Pinot.from_config(config)
Per-request timeout
# Global default via config
config = Pinot::ClientConfig.new(
  broker_list: ["localhost:8000"],
  query_timeout_ms: 5000
)
# One-off override per query
resp = client.execute_sql_with_timeout("myTable", "SELECT * FROM myTable", 3000)
Trace
client.open_trace
resp = client.execute_sql("baseballStats", "SELECT count(*) FROM baseballStats")
client.close_trace
Error Handling
All errors raised by the client inherit from Pinot::Error < StandardError:
| Class | Raised when |
|---|---|
| Pinot::BrokerNotFoundError | No broker available (empty list or all offline) |
| Pinot::TableNotFoundError | Named table not found in broker map |
| Pinot::TransportError | Non-200 HTTP response from broker |
| Pinot::PreparedStatementClosedError | Operation on a closed prepared statement |
| Pinot::ConfigurationError | Invalid config (bad URL scheme, missing broker source) |
Example:
begin
  resp = client.execute_sql("myTable", "SELECT * FROM myTable")
rescue Pinot::TableNotFoundError => e
  puts "Table missing: #{e.message}"
rescue Pinot::TransportError => e
  puts "Broker error: #{e.message}"
rescue Pinot::Error => e
  puts "Pinot error: #{e.message}"
end
Retry
Configure automatic retry with exponential backoff for transient errors (connection reset, timeout, HTTP 503):
config = Pinot::ClientConfig.new(
  broker_list: ["localhost:8000"],
  max_retries: 3,
  retry_interval_ms: 200 # base interval; doubles each attempt (200ms, 400ms, 800ms)
)
client = Pinot.from_config(config)
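The doubling schedule described in the comment above can be written out as follows. This is a sketch of exponential backoff in general, not the client's internal code: backoff_delays_ms and with_retries are hypothetical names, and the injectable sleeper exists only so the example runs instantly.

```ruby
# Delay before each retry: base_ms * 2**attempt.
def backoff_delays_ms(base_ms, max_retries)
  Array.new(max_retries) { |attempt| base_ms * (2**attempt) }
end

# Runs the block, retrying up to max_retries times when one of `errors` is raised.
# `sleeper` receives the delay in seconds; pass a recorder or no-op to avoid real sleeping.
def with_retries(max_retries:, base_ms:, errors: [StandardError], sleeper: ->(s) { sleep(s) })
  delays = backoff_delays_ms(base_ms, max_retries)
  attempt = 0
  begin
    yield attempt
  rescue *errors
    raise if attempt >= max_retries # retries exhausted: re-raise the last error
    sleeper.call(delays[attempt] / 1000.0)
    attempt += 1
    retry
  end
end

p backoff_delays_ms(200, 3) # => [200, 400, 800]

# Simulated transient failure: the first two attempts raise, the third succeeds.
slept = []
result = with_retries(max_retries: 3, base_ms: 200, errors: [IOError],
                      sleeper: ->(s) { slept << s }) do |attempt|
  raise IOError, "simulated transient error" if attempt < 2
  "ok after #{attempt} retries"
end
puts result
```

With max_retries: 3 the block runs at most four times: the initial attempt plus three retries.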
Reading Results
execute_sql returns a Pinot::BrokerResponse. Results are in result_table:
rt = resp.result_table
rt.row_count # => number of rows
rt.column_count # => number of columns
rt.column_name(0) # => "playerName"
rt.column_data_type(0) # => "STRING"
rt.get(0, 0) # => raw cell value (Pinot::JsonNumber or String)
rt.get_string(0, 0) # => String
rt.get_int(0, 1) # => Integer (32-bit)
rt.get_long(0, 1) # => Integer (64-bit)
rt.get_float(0, 2) # => Float (32-bit range)
rt.get_double(0, 2) # => Float (64-bit)
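For row-by-row processing, the accessors above can be combined into a small helper that turns a result table into an array of hashes. The sketch below uses only row_count, column_count, column_name, and get; rows_as_hashes and FakeResultTable are hypothetical names, and the fake table with made-up rows exists only so the snippet runs without a cluster.

```ruby
# Converts a result table into an array of { column_name => raw_value } hashes,
# using only the accessors listed above.
def rows_as_hashes(result_table)
  names = (0...result_table.column_count).map { |col| result_table.column_name(col) }
  (0...result_table.row_count).map do |row|
    names.each_with_index.to_h { |name, col| [name, result_table.get(row, col)] }
  end
end

# Hypothetical stand-in for resp.result_table; sample data only.
class FakeResultTable
  def initialize(names, rows)
    @names = names
    @rows = rows
  end

  def row_count
    @rows.length
  end

  def column_count
    @names.length
  end

  def column_name(col)
    @names[col]
  end

  def get(row, col)
    @rows[row][col]
  end
end

table = FakeResultTable.new(%w[playerName homeRuns], [["Player A", 12], ["Player B", 7]])
rows = rows_as_hashes(table)
rows.each { |row| puts "#{row['playerName']}: #{row['homeRuns']}" }
```

Against a real response you would pass resp.result_table instead of the fake. Because get returns raw cell values, the typed getters (get_string, get_long, and so on) may be a better fit when you need specific Ruby types.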
Stat fields on BrokerResponse:
resp.time_used_ms
resp.num_docs_scanned
resp.total_docs
resp.exceptions # => Array<Pinot::PinotException>
resp.num_groups_limit_reached
Prepared Statements
Prepared statements are thread-safe and can be reused with different parameters.
stmt = client.prepare(
  "baseballStats",
  "SELECT playerName, sum(homeRuns) AS hr FROM baseballStats WHERE teamID = ? AND yearID >= ? GROUP BY playerName ORDER BY hr DESC LIMIT ?"
)
stmt.set_string(1, "NYA")
stmt.set_int(2, 2000)
stmt.set_int(3, 10)
resp = stmt.execute
# Or pass params inline:
resp = stmt.execute_with_params("BOS", 2000, 5)
# Reuse with new parameters:
stmt.clear_parameters
stmt.set_string(1, "LAA")
stmt.set_int(2, 2005)
stmt.set_int(3, 3)
resp = stmt.execute
stmt.close
Supported parameter types
| Ruby type | SQL rendering |
|---|---|
| String | 'value' (single-quote escaped) |
| Integer | 42 |
| Float | 3.14 |
| TrueClass / FalseClass | true / false |
| BigDecimal | '1234567890' |
| Time | '2023-01-01 12:00:00.000' |
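To make the table concrete, here is a minimal sketch of how such rendering and ? substitution could work. It is not the library's implementation: render_param and substitute_params are hypothetical names, escaping by doubling single quotes is an assumption (the table only says "single-quote escaped"), formatting Time values as-is without time zone conversion is an assumption, and BigDecimal is left out for brevity.

```ruby
# Renders one Ruby value as a SQL literal, following the table above.
def render_param(value)
  case value
  when String
    escaped = value.gsub("'", "''") # assumption: quotes are escaped by doubling
    "'#{escaped}'"
  when Integer, Float
    value.to_s
  when true, false
    value.to_s
  when Time
    "'#{value.strftime('%Y-%m-%d %H:%M:%S.%L')}'" # assumption: no zone conversion
  else
    raise ArgumentError, "unsupported parameter type: #{value.class}"
  end
end

# Naive substitution: replaces each ? in order. It does not skip ? characters
# inside SQL string literals, which a production implementation would have to handle.
def substitute_params(sql, params)
  expected = sql.count("?")
  unless expected == params.length
    raise ArgumentError, "expected #{expected} parameters, got #{params.length}"
  end

  remaining = params.dup
  sql.gsub("?") { render_param(remaining.shift) }
end

puts substitute_params(
  "SELECT playerName FROM baseballStats WHERE teamID = ? AND yearID >= ?",
  ["SFN", 2000]
)
# prints: SELECT playerName FROM baseballStats WHERE teamID = 'SFN' AND yearID >= 2000
```

The block form of gsub is used so that backslashes in a rendered value are never interpreted as replacement escapes.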
Running Tests
Unit tests
bundle install
bundle exec rspec spec/pinot/
Integration tests (requires a running Pinot cluster)
docker run -d --name pinot-quickstart -p 8000:8000 \
  apachepinot/pinot:1.5.0 QuickStart -type BATCH
# wait ~2 minutes for the cluster to load data, then:
bundle exec rspec spec/integration/ --format documentation
Environment variables:
| Variable | Default | Description |
|---|---|---|
| BROKER_HOST | 127.0.0.1 | Pinot broker hostname |
| BROKER_PORT | 8000 | Pinot broker HTTP port |
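If you want your own scripts to honor the same variables, something like the following would work. It is a sketch, not a copy of the spec helper: broker_address is a hypothetical name, and the env argument is injectable only so the defaults can be checked without touching the real environment.

```ruby
# Builds a "host:port" broker address from the variables in the table above,
# falling back to the documented defaults.
def broker_address(env = ENV)
  host = env.fetch("BROKER_HOST", "127.0.0.1")
  port = Integer(env.fetch("BROKER_PORT", "8000")) # raises ArgumentError on a non-numeric port
  "#{host}:#{port}"
end

puts broker_address({})                                   # prints: 127.0.0.1:8000
puts broker_address({ "BROKER_HOST" => "pinot.internal" }) # prints: pinot.internal:8000
```

The resulting string can be passed to Pinot.from_broker_list inside an array, as in the connection examples earlier in this document.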
Changelog
See CHANGELOG.md for a full history of releases.
License
Apache License 2.0 — see LICENSE.