Class: Pinot::Connection

Inherits:
Object
Defined in:
lib/pinot/connection.rb

Overview

Main entry point for querying Apache Pinot over HTTP.

Build a Connection via the factory helpers rather than instantiating directly:

# Static broker list
conn = Pinot.from_broker_list(["broker1:8099", "broker2:8099"])

# Controller-managed broker discovery
conn = Pinot.from_controller("controller:9000")

# Full configuration
conn = Pinot.from_config(Pinot::ClientConfig.new(
  broker_list:             ["broker:8099"],
  query_timeout_ms:        5_000,
  use_multistage_engine:   true,
  max_retries:             2,
  circuit_breaker_enabled: true
))

Constructor Details

#initialize(transport:, broker_selector:, use_multistage_engine: false, logger: nil, query_timeout_ms: nil, circuit_breaker_registry: nil) ⇒ Connection

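Returns a new instance of Connection. Application code normally obtains a Connection from the Pinot.from_broker_list, Pinot.from_controller, or Pinot.from_config factory helpers rather than calling this constructor directly.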



# File 'lib/pinot/connection.rb', line 26

def initialize(transport:, broker_selector:, use_multistage_engine: false, logger: nil,
               query_timeout_ms: nil, circuit_breaker_registry: nil)
  @transport = transport
  @broker_selector = broker_selector
  @use_multistage_engine = use_multistage_engine
  @trace = false
  @logger = logger
  @query_timeout_ms = query_timeout_ms
  @circuit_breaker_registry = circuit_breaker_registry
end

Instance Attribute Details

#query_timeout_ms ⇒ Integer?

Returns the default query timeout in milliseconds; individual calls can override it.

Returns:

  • (Integer, nil)

    default query timeout in milliseconds, or nil if none was configured; a per-call query_timeout_ms argument takes precedence



# File 'lib/pinot/connection.rb', line 24

def query_timeout_ms
  @query_timeout_ms
end

Instance Method Details

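#close_trace ⇒ Object

Turns tracing off for subsequent queries by resetting the connection's @trace flag to false. Pairs with #open_trace.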



# File 'lib/pinot/connection.rb', line 45

def close_trace
  @trace = false
end

#execute_many(queries, max_concurrency: nil) ⇒ Array<QueryResult>

Execute multiple queries in parallel and return results in the same order.

Each element of queries must be a Hash with :table and :query entries; Symbol and String keys are both accepted, and a missing entry defaults to an empty string. An optional :query_timeout_ms entry overrides the timeout for that query only.

Each slot in the returned Array is a QueryResult holding either a response or an error. Failures are isolated: an exception raised by one query is captured in its own slot instead of being raised for the whole batch.

results = conn.execute_many([
  { table: "orders",   query: "SELECT count(*) FROM orders" },
  { table: "products", query: "SELECT count(*) FROM products" }
], max_concurrency: 4)

results.each do |r|
  puts r.success? ? r.response.result_table.get_long(0, 0) : r.error.message
end

Parameters:

  • queries (Array<Hash>)

    query descriptors

  • max_concurrency (Integer, nil) (defaults to: nil)

    maximum number of queries executing at once; nil means no limit. One Thread is still created per query, and threads beyond the limit wait for a free slot.

Returns:

  • (Array<QueryResult>)

    one QueryResult per element of queries, in input order



# File 'lib/pinot/connection.rb', line 121

def execute_many(queries, max_concurrency: nil)
  return [] if queries.empty?

  results  = Array.new(queries.size)
  # Queue acts as a counting semaphore: pre-filled with N tokens.
  sem = max_concurrency ? build_semaphore(max_concurrency) : nil

  threads = queries.each_with_index.map do |item, idx|
    table      = item[:table]           || item["table"]           || ""
    query      = item[:query]           || item["query"]           || ""
    timeout_ms = item[:query_timeout_ms] || item["query_timeout_ms"]

    Thread.new do
      sem&.pop   # acquire
      begin
        resp = execute_sql(table, query, query_timeout_ms: timeout_ms)
        results[idx] = QueryResult.new(table: table, query: query, response: resp, error: nil)
      rescue => e
        results[idx] = QueryResult.new(table: table, query: query, response: nil, error: e)
      ensure
        sem&.push(:token)  # release
      end
    end
  end

  threads.each(&:join)
  results
end
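The execute_many source calls a private build_semaphore helper that this page does not show; its comment describes a Queue pre-filled with N tokens. Assuming that design, the standalone sketch below (build_token_queue and bounded_parallel_map are hypothetical names, not part of the gem) shows how popping and pushing tokens caps concurrency while each thread writes to its own slot, so results keep input order and a failure stays isolated.

```ruby
# Hypothetical sketch of the Queue-as-semaphore pattern used by execute_many.

# A Thread::Queue holding n tokens behaves like a counting semaphore:
# pop takes a token (blocking when none are left) and push returns it.
def build_token_queue(n)
  raise ArgumentError, "limit must be a positive Integer" unless n.is_a?(Integer) && n.positive?

  queue = Thread::Queue.new
  n.times { queue.push(:token) }
  queue
end

# Runs the block on every item in its own Thread, at most `limit` at once
# (nil means no limit), and returns [value, error] pairs in input order.
def bounded_parallel_map(items, limit: nil, &block)
  sem = limit ? build_token_queue(limit) : nil
  results = Array.new(items.size)

  threads = items.each_with_index.map do |item, idx|
    Thread.new do
      sem&.pop # acquire a token
      begin
        results[idx] = [block.call(item), nil]
      rescue => e
        results[idx] = [nil, e] # the failure stays in this slot only
      ensure
        sem&.push(:token) # release the token
      end
    end
  end

  threads.each(&:join)
  results
end
```

Every item still gets its own Thread; the tokens only limit how many run at the same time, so very large batches still allocate many mostly idle threads. The sketch rejects a limit of 0 because no thread could ever acquire a token.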

#execute_sql(table, query, query_timeout_ms: nil, headers: {}) ⇒ BrokerResponse

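Execute a SQL query against table and return a BrokerResponse. The broker is chosen by the connection's broker selector for table, and the request runs inside the instrumentation hook and the circuit-breaker wrapper for that broker.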

Parameters:

  • table (String)

    Pinot table name (used for broker selection)

  • query (String)

    SQL query string

  • query_timeout_ms (Integer, nil) (defaults to: nil)

    per-call timeout (ms); when non-nil, takes precedence over the connection-level query_timeout_ms

  • headers (Hash) (defaults to: {})

    extra HTTP headers merged into this request only

Returns:

  • (BrokerResponse)

    the broker's response to the query

Raises:



# File 'lib/pinot/connection.rb', line 61

def execute_sql(table, query, query_timeout_ms: nil, headers: {})
  Pinot::Instrumentation.instrument(table: table, query: query) do
    logger.debug "Executing SQL on table=#{table}: #{query}"
    broker = @broker_selector.select_broker(table)
    effective_timeout = query_timeout_ms || @query_timeout_ms
    run_with_circuit_breaker(broker) do
      @transport.execute(broker, build_request(query, timeout_ms: effective_timeout),
                         extra_request_headers: headers)
    end
  end
end
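execute_sql runs the transport call through run_with_circuit_breaker(broker), and the constructor accepts a circuit_breaker_registry, but neither implementation appears on this page. To illustrate the general pattern only, here is a minimal per-broker breaker under assumed semantics: open after a run of consecutive failures, fail fast until a cooldown passes, then allow a trial call. CircuitOpenError, SimpleBreaker, and BreakerRegistry are hypothetical names, not the gem's classes.

```ruby
# Hypothetical sketch of a per-broker circuit breaker; not the gem's API.
class CircuitOpenError < StandardError; end

class SimpleBreaker
  def initialize(failure_threshold: 3, reset_timeout: 30.0,
                 clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @failure_threshold = failure_threshold
    @reset_timeout = reset_timeout # seconds to stay open before a trial call
    @clock = clock
    @failures = 0
    @opened_at = nil
    @mutex = Mutex.new
  end

  # Runs the block unless the breaker is open. Consecutive failures open it;
  # once reset_timeout has elapsed, the next call is let through as a trial.
  def call
    @mutex.synchronize do
      if @opened_at && @clock.call - @opened_at < @reset_timeout
        raise CircuitOpenError, "circuit open; failing fast"
      end
    end

    result = yield
    @mutex.synchronize do
      @failures = 0 # success closes the breaker
      @opened_at = nil
    end
    result
  rescue CircuitOpenError
    raise # fail-fast errors do not count as broker failures
  rescue StandardError
    @mutex.synchronize do
      @failures += 1
      @opened_at = @clock.call if @failures >= @failure_threshold
    end
    raise
  end
end

# One breaker per broker address, created lazily.
class BreakerRegistry
  def initialize(**breaker_options)
    @breaker_options = breaker_options
    @breakers = {}
    @mutex = Mutex.new
  end

  def breaker_for(broker)
    @mutex.synchronize { @breakers[broker] ||= SimpleBreaker.new(**@breaker_options) }
  end
end
```

The injectable clock keeps the sketch testable. A production breaker would also limit how many trial calls run concurrently after the cooldown, which this version does not.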

#execute_sql_with_params(table, query_pattern, params, query_timeout_ms: nil, headers: {}) ⇒ BrokerResponse

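Execute a parameterised query by substituting params into query_pattern. Each ? placeholder is replaced, in order, by the corresponding value formatted as a SQL literal: strings are single-quoted with embedded single quotes doubled, BigDecimal and Time values are quoted, and numbers and booleans are inserted as-is. Every ? character in the pattern counts as a placeholder, including one inside a quoted literal.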

Parameters:

  • table (String)

    Pinot table name

  • query_pattern (String)

    SQL template, e.g. “SELECT * FROM t WHERE id = ?”

  • params (Array)

    ordered values; supported types: String, Integer, Float, TrueClass, FalseClass, BigDecimal, Time

  • query_timeout_ms (Integer, nil) (defaults to: nil)

    per-call timeout override (ms)

  • headers (Hash) (defaults to: {})

    extra HTTP headers for this request

Returns:

  • (BrokerResponse)

    the broker's response to the formatted query

Raises:

  • (RuntimeError)

    if the number of ? placeholders differs from params.length, or a param has an unsupported type



# File 'lib/pinot/connection.rb', line 91

def execute_sql_with_params(table, query_pattern, params, query_timeout_ms: nil, headers: {})
  # format_query raises RuntimeError on a count mismatch or unsupported type;
  # the error propagates to the caller unchanged
  query = format_query(query_pattern, params)
  execute_sql(table, query, query_timeout_ms: query_timeout_ms, headers: headers)
end

#execute_sql_with_timeout(table, query, timeout_ms) ⇒ BrokerResponse

Convenience wrapper around execute_sql with an explicit timeout. Equivalent to execute_sql(table, query, query_timeout_ms: timeout_ms); it does not accept extra headers.



# File 'lib/pinot/connection.rb', line 75

def execute_sql_with_timeout(table, query, timeout_ms)
  execute_sql(table, query, query_timeout_ms: timeout_ms)
end

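#format_arg(value) ⇒ String

Formats a single parameter as a SQL literal for format_query. Strings, BigDecimal, and Time values are single-quoted; Integer, Float, true, and false are emitted as-is. Any other type raises RuntimeError.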



# File 'lib/pinot/connection.rb', line 212

def format_arg(value)
  case value
  when String
    "'#{value.gsub("'", "''")}'"
  when Integer
    value.to_s
  when Float
    value.to_s
  when TrueClass, FalseClass
    value.to_s
  when BigDecimal
    s = value.to_s("F")
    # Strip trailing .0 for whole numbers (mirrors Go's big.Int quoted format)
    s = s.sub(/\.0\z/, "") if s.end_with?(".0")
    "'#{s}'"
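  when Time
    # getutc returns a UTC copy; Time#utc would convert the caller's Time in place.
    # %L is milliseconds (000-999), truncated like the original subsec arithmetic.
    "'#{value.getutc.strftime("%Y-%m-%d %H:%M:%S.%L")}'"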
  else
    raise "unsupported type: #{value.class}"
  end
end
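For experimenting with these literal rules outside the gem, here is a standalone copy under a hypothetical module name (LiteralFormat). It requires bigdecimal explicitly, since the when BigDecimal branch needs that constant loaded, and it formats Time values from a UTC copy (Time#getutc) with %L for milliseconds, so the caller's Time object is left untouched.

```ruby
require "bigdecimal"

# Hypothetical standalone copy of the format_arg literal rules.
module LiteralFormat
  module_function

  def format_arg(value)
    case value
    when String
      "'#{value.gsub("'", "''")}'" # double embedded single quotes
    when Integer, Float, TrueClass, FalseClass
      value.to_s # emitted unquoted
    when BigDecimal
      s = value.to_s("F").sub(/\.0\z/, "") # "10.0" -> "10", "1.5" unchanged
      "'#{s}'"
    when Time
      # getutc copies; the caller's Time keeps its original offset
      "'#{value.getutc.strftime("%Y-%m-%d %H:%M:%S.%L")}'"
    else
      raise "unsupported type: #{value.class}"
    end
  end
end
```

Note that Float values are inserted unquoted while BigDecimal values are quoted strings, so callers who need exact decimal text should pass BigDecimal.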

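#format_query(pattern, params) ⇒ String

Replaces each ? in pattern with the corresponding element of params, formatted by format_arg. Raises RuntimeError if the number of ? characters differs from params.length (every ? counts, including one inside a quoted literal) or if a parameter cannot be formatted.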



# File 'lib/pinot/connection.rb', line 193

def format_query(pattern, params)
  params ||= []
  placeholders = pattern.count("?")
  if placeholders != params.length
    raise "failed to format query: number of placeholders in queryPattern (#{placeholders}) does not match number of params (#{params.length})"
  end
  parts = pattern.split("?", -1)
  result = ""
  params.each_with_index do |param, i|
    formatted = begin
      format_arg(param)
    rescue => e
      raise "failed to format query: failed to format parameter: #{e.message}"
    end
    result += parts[i] + formatted
  end
  result + parts.last
end
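Because format_query counts every ? character, a literal question mark inside a quoted string in the pattern is also treated as a placeholder. The simplified sketch below reproduces the split-and-interleave approach and demonstrates that caveat; splice_params and quote_literal are hypothetical names, and quote_literal handles only String and Integer as a stand-in for format_arg.

```ruby
# Simplified sketch of the split-and-interleave approach in format_query.
# quote_literal is a cut-down stand-in for format_arg (String and Integer only).
def quote_literal(value)
  case value
  when String  then "'#{value.gsub("'", "''")}'"
  when Integer then value.to_s
  else raise "unsupported type: #{value.class}"
  end
end

def splice_params(pattern, params)
  params ||= []
  placeholders = pattern.count("?") # counts every ?, even inside quoted text
  unless placeholders == params.length
    raise "placeholder count (#{placeholders}) does not match params (#{params.length})"
  end

  # split with -1 keeps a trailing empty segment, so a non-empty pattern
  # yields placeholders + 1 parts to interleave with the formatted values
  parts = pattern.split("?", -1)
  parts.each_with_index.map do |part, i|
    i < params.length ? part + quote_literal(params[i]) : part
  end.join
end
```

Splitting before substituting means a ? inside a parameter value is never re-read as a placeholder; only ? characters in the pattern itself are affected.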

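#open_trace ⇒ Object

Enables tracing for subsequent queries by setting the connection's @trace flag to true. Call #close_trace to turn it off again.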



# File 'lib/pinot/connection.rb', line 41

def open_trace
  @trace = true
end

#paginate(query, page_size: Paginator::DEFAULT_PAGE_SIZE, table: nil, extra_headers: {}) ⇒ Paginator

Return a Paginator for cursor-based iteration over large result sets.

The query must include a LIMIT clause; the broker stores the result set and returns it on demand in slices of page_size rows.

paginator = conn.paginate("SELECT * FROM myTable LIMIT 50000", page_size: 500)
paginator.each_row { |row| puts row.map(&:to_s).join(", ") }

Parameters:

  • query (String)

    SQL query (should include LIMIT)

  • page_size (Integer) (defaults to: Paginator::DEFAULT_PAGE_SIZE)

    rows per page (default Paginator::DEFAULT_PAGE_SIZE = 1000)

  • table (String, nil) (defaults to: nil)

    used only for broker selection; nil picks any broker

  • extra_headers (Hash) (defaults to: {})

    merged into every HTTP request of this cursor session

Returns:

  • (Paginator)

    a paginator bound to the broker selected when paginate is called



# File 'lib/pinot/connection.rb', line 163

def paginate(query, page_size: Paginator::DEFAULT_PAGE_SIZE, table: nil, extra_headers: {})
  broker = @broker_selector.select_broker(table || "")
  Paginator.new(
    @transport.http_client,
    broker,
    query,
    page_size:     page_size,
    extra_headers: extra_headers
  )
end
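The Paginator class itself is not documented on this page, so as an illustration of the on-demand idea only, the sketch below walks a result set one page at a time and fetches the next page only when the caller consumes past the current one. It assumes a simple offset/limit fetch; each_row_paged and fetch_page are hypothetical, and the real Paginator talks to the broker's cursor session through @transport.http_client instead.

```ruby
# Hypothetical sketch of on-demand paging; the real Paginator is not shown here.
# fetch_page stands in for one broker round trip: it receives (offset, limit)
# and returns an Array of rows, where a short page marks the end of the results.
def each_row_paged(page_size, fetch_page)
  return enum_for(:each_row_paged, page_size, fetch_page) unless block_given?

  offset = 0
  loop do
    rows = fetch_page.call(offset, page_size)
    rows.each { |row| yield row }
    break if rows.length < page_size

    offset += page_size
  end
end
```

Because the method returns an Enumerator when called without a block, a call such as .first(2) stops after the first page instead of fetching the whole result set.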

#prepare(table, query_template) ⇒ PreparedStatementImpl

Create a PreparedStatement from a query template with ? placeholders. Values are bound by position with set, counting from 1, as in the example below.

stmt = conn.prepare("myTable", "SELECT * FROM myTable WHERE id = ? AND name = ?")
stmt.set(1, 42)
stmt.set(2, "Alice")
resp = stmt.execute

Parameters:

  • table (String)

    Pinot table name (non-empty)

  • query_template (String)

    SQL with one or more ? placeholders

Returns:

  • (PreparedStatementImpl)

    a statement bound to this connection, table, and template

Raises:

  • (ArgumentError)

    if table or query_template is nil or blank, or if query_template contains no ? placeholder



# File 'lib/pinot/connection.rb', line 185

def prepare(table, query_template)
  raise ArgumentError, "table name cannot be empty" if table.nil? || table.strip.empty?
  raise ArgumentError, "query template cannot be empty" if query_template.nil? || query_template.strip.empty?
  count = query_template.count("?")
  raise ArgumentError, "query template must contain at least one parameter placeholder (?)" if count == 0
  PreparedStatementImpl.new(connection: self, table: table, query_template: query_template)
end
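PreparedStatementImpl is not shown on this page. The prepare example binds values with stmt.set(1, 42) and stmt.set(2, "Alice"), which suggests 1-based positions. A hypothetical sketch of that binding model (MiniStatement is an illustrative name, not the gem's class) tracks which placeholders have been set and refuses to hand out parameters while any are missing:

```ruby
# Hypothetical sketch of 1-based parameter binding; not PreparedStatementImpl.
class MiniStatement
  def initialize(query_template)
    @template = query_template
    @count = query_template.count("?")
    @params = Array.new(@count)
    @bound = Array.new(@count, false) # tracks set slots separately from values
  end

  # Bind value to the index-th placeholder (1-based, as in stmt.set(1, 42)).
  def set(index, value)
    unless index.is_a?(Integer) && index.between?(1, @count)
      raise ArgumentError, "parameter index #{index} out of range 1..#{@count}"
    end

    @params[index - 1] = value
    @bound[index - 1] = true
    self
  end

  # Returns the bound values in placeholder order, failing if any slot is unset.
  def bound_params
    missing = @bound.each_index.reject { |i| @bound[i] }.map { |i| i + 1 }
    raise ArgumentError, "unbound parameter(s): #{missing.join(", ")}" unless missing.empty?

    @params.dup
  end
end
```

Under this sketch, an execute method could pass bound_params and the template to execute_sql_with_params; whether PreparedStatementImpl actually works this way is an assumption.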

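#use_multistage_engine=(val) ⇒ Object

Enables (true) or disables (false) the multi-stage query engine for subsequent queries on this connection, replacing the use_multistage_engine value given at construction.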



# File 'lib/pinot/connection.rb', line 37

def use_multistage_engine=(val)
  @use_multistage_engine = val
end
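use_multistage_engine=, open_trace, close_trace, and the timeout resolution in execute_sql all feed a private build_request helper that this page does not show. Assuming a broker request body with a sql field, a semicolon-separated queryOptions string, and a trace flag (field names modeled on Pinot's broker query API; how this gem actually builds the body is not confirmed here), the mapping might look like this hypothetical build_request_sketch:

```ruby
# Hypothetical sketch of how connection flags could map onto a broker request
# body; build_request itself is not shown on this page.
def build_request_sketch(sql, timeout_ms: nil, use_multistage_engine: false, trace: false)
  options = []
  options << "timeoutMs=#{Integer(timeout_ms)}" if timeout_ms
  options << "useMultistageEngine=true" if use_multistage_engine

  body = { "sql" => sql }
  body["queryOptions"] = options.join(";") unless options.empty?
  body["trace"] = true if trace # assumed boolean field
  body
end
```

Under this assumption, a per-call query_timeout_ms would become timeoutMs for that request only, while toggling use_multistage_engine= or open_trace affects every later request on the connection.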