Class: Pinot::Connection
- Inherits: Object (hierarchy: Object, Pinot::Connection)
- Defined in: lib/pinot/connection.rb
Overview
Main entry point for querying Apache Pinot over HTTP.
Build a Connection via the factory helpers rather than instantiating directly:
# Static broker list
conn = Pinot.from_broker_list(["broker1:8099", "broker2:8099"])
# Controller-managed broker discovery
conn = Pinot.from_controller("controller:9000")
# Full configuration
conn = Pinot.from_config(Pinot::ClientConfig.new(
  broker_list: ["broker:8099"],
  query_timeout_ms: 5_000,
  use_multistage_engine: true,
  max_retries: 2,
  circuit_breaker_enabled: true
))
Instance Attribute Summary
- #query_timeout_ms ⇒ Integer?
  Default query timeout in milliseconds; can be overridden per-call.
Instance Method Summary
- #close_trace ⇒ Object
- #execute_many(queries, max_concurrency: nil) ⇒ Array<QueryResult>
  Execute multiple queries in parallel and return results in the same order.
- #execute_sql(table, query, query_timeout_ms: nil, headers: {}) ⇒ BrokerResponse
  Execute a SQL query against table and return a BrokerResponse.
- #execute_sql_with_params(table, query_pattern, params, query_timeout_ms: nil, headers: {}) ⇒ BrokerResponse
  Execute a parameterised query by substituting params into query_pattern.
- #execute_sql_with_timeout(table, query, timeout_ms) ⇒ Object
  Convenience wrapper around execute_sql with an explicit timeout.
- #format_arg(value) ⇒ Object
- #format_query(pattern, params) ⇒ Object
- #initialize(transport:, broker_selector:, use_multistage_engine: false, logger: nil, query_timeout_ms: nil, circuit_breaker_registry: nil) ⇒ Connection (constructor)
  A new instance of Connection.
- #open_trace ⇒ Object
- #paginate(query, page_size: Paginator::DEFAULT_PAGE_SIZE, table: nil, extra_headers: {}) ⇒ Paginator
  Return a Paginator for cursor-based iteration over large result sets.
- #prepare(table, query_template) ⇒ PreparedStatementImpl
  Create a PreparedStatement from a query template with ? placeholders.
- #use_multistage_engine=(val) ⇒ Object
Constructor Details
#initialize(transport:, broker_selector:, use_multistage_engine: false, logger: nil, query_timeout_ms: nil, circuit_breaker_registry: nil) ⇒ Connection
Returns a new instance of Connection.
# File 'lib/pinot/connection.rb', line 26
def initialize(transport:, broker_selector:, use_multistage_engine: false, logger: nil,
               query_timeout_ms: nil, circuit_breaker_registry: nil)
  @transport = transport
  @broker_selector = broker_selector
  @use_multistage_engine = use_multistage_engine
  @trace = false
  @logger = logger
  @query_timeout_ms = query_timeout_ms
  @circuit_breaker_registry = circuit_breaker_registry
end
Instance Attribute Details
#query_timeout_ms ⇒ Integer?
Returns default query timeout in milliseconds; can be overridden per-call.
# File 'lib/pinot/connection.rb', line 24
def query_timeout_ms
  @query_timeout_ms
end
Instance Method Details
#close_trace ⇒ Object
# File 'lib/pinot/connection.rb', line 45
def close_trace
  @trace = false
end
#execute_many(queries, max_concurrency: nil) ⇒ Array<QueryResult>
Execute multiple queries in parallel and return results in the same order.
Each element of queries must be a Hash with keys :table and :query (Strings or Symbols). An optional :query_timeout_ms key overrides the per-query timeout.
Each slot in the returned Array is a QueryResult holding either a response or an error. Failures are isolated per query, so one bad query does not raise for the whole batch.
results = conn.execute_many([
  { table: "orders", query: "SELECT count(*) FROM orders" },
  { table: "products", query: "SELECT count(*) FROM products" }
], max_concurrency: 4)
results.each do |r|
  puts r.success? ? r.response.result_table.get_long(0, 0) : r.error.message
end
# File 'lib/pinot/connection.rb', line 121
def execute_many(queries, max_concurrency: nil)
  return [] if queries.empty?

  results = Array.new(queries.size)
  # Queue acts as a counting semaphore: pre-filled with N tokens.
  sem = max_concurrency ? build_semaphore(max_concurrency) : nil

  threads = queries.each_with_index.map do |item, idx|
    table = item[:table] || item["table"] || ""
    query = item[:query] || item["query"] || ""
    timeout_ms = item[:query_timeout_ms] || item["query_timeout_ms"]

    Thread.new do
      sem&.pop # acquire
      begin
        resp = execute_sql(table, query, query_timeout_ms: timeout_ms)
        results[idx] = QueryResult.new(table: table, query: query, response: resp, error: nil)
      rescue => e
        results[idx] = QueryResult.new(table: table, query: query, response: nil, error: e)
      ensure
        sem&.push(:token) # release
      end
    end
  end

  threads.each(&:join)
  results
end
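The bounded-concurrency pattern used by execute_many can be tried in isolation. The sketch below is a minimal stand-alone illustration, not the library's code: build_token_queue and run_bounded are hypothetical names, the private build_semaphore helper is not shown on this page, and plain Hashes stand in for QueryResult.

```ruby
# Hypothetical helper: a Queue pre-filled with `size` tokens behaves like a
# counting semaphore (pop = acquire, push = release).
def build_token_queue(size)
  queue = Queue.new
  size.times { queue.push(:token) }
  queue
end

# Run every job in its own thread, with at most `max_concurrency` running
# at once, and return the outcomes in the same order as `jobs`.
def run_bounded(jobs, max_concurrency: nil)
  return [] if jobs.empty?

  results = Array.new(jobs.size)
  sem = max_concurrency ? build_token_queue(max_concurrency) : nil

  threads = jobs.each_with_index.map do |job, idx|
    Thread.new do
      sem&.pop # acquire: blocks while no token is available
      begin
        results[idx] = { value: job.call, error: nil }
      rescue => e
        # A failing job only fills its own slot; other jobs are unaffected.
        results[idx] = { value: nil, error: e }
      ensure
        sem&.push(:token) # release
      end
    end
  end

  threads.each(&:join)
  results
end

jobs = [
  -> { 1 + 1 },
  -> { raise ArgumentError, "bad query" },
  -> { "ok" }
]

run_bounded(jobs, max_concurrency: 2).each_with_index do |outcome, i|
  if outcome[:error]
    puts "job #{i} failed: #{outcome[:error].message}"
  else
    puts "job #{i} => #{outcome[:value]}"
  end
end
```

Because each thread writes only to its own index, result order follows input order regardless of which job finishes first.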
#execute_sql(table, query, query_timeout_ms: nil, headers: {}) ⇒ BrokerResponse
Execute a SQL query against table and return a BrokerResponse.
# File 'lib/pinot/connection.rb', line 61
def execute_sql(table, query, query_timeout_ms: nil, headers: {})
  Pinot::Instrumentation.instrument(table: table, query: query) do
    logger.debug "Executing SQL on table=#{table}: #{query}"
    broker = @broker_selector.select_broker(table)
    effective_timeout = query_timeout_ms || @query_timeout_ms
    run_with_circuit_breaker(broker) do
      @transport.execute(broker, build_request(query, timeout_ms: effective_timeout),
                         extra_request_headers: headers)
    end
  end
end
#execute_sql_with_params(table, query_pattern, params, query_timeout_ms: nil, headers: {}) ⇒ BrokerResponse
Execute a parameterised query by substituting params into query_pattern. Each ? placeholder in the pattern is replaced by the corresponding value using safe type-aware formatting (strings are quoted and escaped).
# File 'lib/pinot/connection.rb', line 91
def execute_sql_with_params(table, query_pattern, params, query_timeout_ms: nil, headers: {})
  query = format_query(query_pattern, params)
  execute_sql(table, query, query_timeout_ms: query_timeout_ms, headers: headers)
rescue => e
  raise e
end
#execute_sql_with_timeout(table, query, timeout_ms) ⇒ Object
Convenience wrapper around execute_sql with an explicit timeout. Equivalent to execute_sql(table, query, query_timeout_ms: timeout_ms).
# File 'lib/pinot/connection.rb', line 75
def execute_sql_with_timeout(table, query, timeout_ms)
  execute_sql(table, query, query_timeout_ms: timeout_ms)
end
#format_arg(value) ⇒ Object
# File 'lib/pinot/connection.rb', line 212
def format_arg(value)
  case value
  when String
    "'#{value.gsub("'", "''")}'"
  when Integer
    value.to_s
  when Float
    value.to_s
  when TrueClass, FalseClass
    value.to_s
  when BigDecimal
    s = value.to_s("F")
    # Strip trailing .0 for whole numbers (mirrors Go's big.Int quoted format)
    s = s.sub(/\.0\z/, "") if s.end_with?(".0")
    "'#{s}'"
  when Time
    "'#{value.utc.strftime("%Y-%m-%d %H:%M:%S.") + format("%03d", value.utc.subsec * 1000)}'"
  else
    raise "unsupported type: #{value.class}"
  end
end
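The String branch above is the one that matters most for untrusted input. As a small stand-alone illustration (quote_sql_string is a hypothetical name, not a library method), the same expression doubles every embedded single quote before wrapping the value in quotes:

```ruby
# Illustrative helper, not part of the library: quote a Ruby String as a
# SQL string literal by doubling each embedded single quote.
def quote_sql_string(value)
  "'#{value.gsub("'", "''")}'"
end

puts quote_sql_string("Alice")         # prints 'Alice'
puts quote_sql_string("O'Brien")       # prints 'O''Brien'
puts quote_sql_string("x' OR '1'='1")  # prints 'x'' OR ''1''=''1'
```

With every quote doubled, the value stays inside a single string literal instead of closing it early.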
#format_query(pattern, params) ⇒ Object
# File 'lib/pinot/connection.rb', line 193
def format_query(pattern, params)
  params ||= []
  placeholders = pattern.count("?")
  if placeholders != params.length
    raise "failed to format query: number of placeholders in queryPattern (#{placeholders}) does not match number of params (#{params.length})"
  end
  parts = pattern.split("?", -1)
  result = ""
  params.each_with_index do |param, i|
    formatted = begin
      format_arg(param)
    rescue => e
      raise "failed to format query: failed to format parameter: #{e.message}"
    end
    result += parts[i] + formatted
  end
  result + parts.last
end
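The substitution step relies on two String behaviours that are easy to miss: split with a limit of -1 keeps trailing empty strings, and count("?") counts every question mark, including any inside quoted literals in the pattern. The following stand-alone sketch shows both; interleave is a hypothetical name, and its parameters are assumed to be already formatted as SQL literals.

```ruby
# Minimal stand-in for the substitution step (not the library's method).
# `formatted_params` are assumed to be ready-to-embed SQL literals.
def interleave(pattern, formatted_params)
  placeholders = pattern.count("?")
  unless placeholders == formatted_params.length
    raise ArgumentError,
          "expected #{placeholders} params, got #{formatted_params.length}"
  end

  # The -1 limit keeps trailing empty strings, so a pattern ending in "?"
  # still yields exactly one more part than there are placeholders.
  parts = pattern.split("?", -1)

  # Pair each part with the parameter that follows it; the final part has
  # no parameter, so its nil partner is dropped by compact.
  parts.zip(formatted_params).flatten.compact.join
end

puts interleave("SELECT * FROM t WHERE id = ? AND name = ?", ["42", "'Alice'"])
# prints SELECT * FROM t WHERE id = 42 AND name = 'Alice'

# Without the -1 limit the trailing empty part is lost:
p "id = ?".split("?")      # prints ["id = "]
p "id = ?".split("?", -1)  # prints ["id = ", ""]

# A question mark inside a quoted literal is still counted as a placeholder:
p "SELECT * FROM t WHERE q = 'why?' AND id = ?".count("?")  # prints 2
```

One consequence of counting this way is that a pattern containing a literal question mark expects an extra parameter, so such values are better passed as parameters than written into the pattern.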
#open_trace ⇒ Object
# File 'lib/pinot/connection.rb', line 41
def open_trace
  @trace = true
end
#paginate(query, page_size: Paginator::DEFAULT_PAGE_SIZE, table: nil, extra_headers: {}) ⇒ Paginator
Return a Paginator for cursor-based iteration over large result sets.
The query must include a LIMIT clause; the broker stores the result set and returns it in page_size row slices on demand.
paginator = conn.paginate("SELECT * FROM myTable LIMIT 50000", page_size: 500)
paginator.each_row { |row| puts row.map(&:to_s).join(", ") }
# File 'lib/pinot/connection.rb', line 163
def paginate(query, page_size: Paginator::DEFAULT_PAGE_SIZE, table: nil, extra_headers: {})
  broker = @broker_selector.select_broker(table || "")
  Paginator.new(
    @transport.http_client,
    broker,
    query,
    page_size: page_size,
    extra_headers: extra_headers
  )
end
#prepare(table, query_template) ⇒ PreparedStatementImpl
Create a PreparedStatement from a query template with ? placeholders.
stmt = conn.prepare("myTable", "SELECT * FROM myTable WHERE id = ? AND name = ?")
stmt.set(1, 42)
stmt.set(2, "Alice")
resp = stmt.execute
# File 'lib/pinot/connection.rb', line 185
def prepare(table, query_template)
  raise ArgumentError, "table name cannot be empty" if table.nil? || table.strip.empty?
  raise ArgumentError, "query template cannot be empty" if query_template.nil? || query_template.strip.empty?
  count = query_template.count("?")
  raise ArgumentError, "query template must contain at least one parameter placeholder (?)" if count == 0
  PreparedStatementImpl.new(connection: self, table: table, query_template: query_template)
end
#use_multistage_engine=(val) ⇒ Object
# File 'lib/pinot/connection.rb', line 37
def use_multistage_engine=(val)
  @use_multistage_engine = val
end