Module: Pinot

Defined in:
lib/pinot/config.rb,
lib/pinot/errors.rb,
lib/pinot/logger.rb,
lib/pinot/request.rb,
lib/pinot/version.rb,
lib/pinot/response.rb,
lib/pinot/paginator.rb,
lib/pinot/transport.rb,
lib/pinot/connection.rb,
lib/pinot/tls_config.rb,
lib/pinot/grpc_config.rb,
lib/pinot/query_result.rb,
lib/pinot/grpc_transport.rb,
lib/pinot/broker_selector.rb,
lib/pinot/circuit_breaker.rb,
lib/pinot/instrumentation.rb,
lib/pinot/connection_factory.rb,
lib/pinot/prepared_statement.rb,
lib/pinot/controller_response.rb,
lib/pinot/simple_broker_selector.rb,
lib/pinot/proto/broker_service_pb.rb,
lib/pinot/zookeeper_broker_selector.rb,
lib/pinot/table_aware_broker_selector.rb,
lib/pinot/active_support_notifications.rb,
lib/pinot/controller_based_broker_selector.rb,
lib/pinot/proto/broker_service_services_pb.rb

Defined Under Namespace

Modules: ActiveSupportNotifications, Broker, BrokerSelector, Instrumentation, Logging, PreparedStatement

Classes: AggregationResult, BrokerDto, BrokerNotFoundError, BrokerResponse, BrokerUnavailableError, CircuitBreaker, CircuitBreakerRegistry, ClientConfig, ConfigurationError, Connection, ControllerBasedBrokerSelector, ControllerConfig, ControllerResponse, Error, GrpcConfig, GrpcTransport, HttpClient, JsonHttpTransport, JsonNumber, Paginator, PinotException, PreparedStatementClosedError, PreparedStatementImpl, QueryResult, QueryTimeoutError, RateLimitError, Request, RespSchema, ResultTable, SelectionResults, SimpleBrokerSelector, TableAwareBrokerSelector, TableNotFoundError, TlsConfig, TransportError, ZookeeperBrokerSelector, ZookeeperConfig

Constant Summary collapse

VERSION =
"1.27.0"
INT32_MAX =
2_147_483_647
INT32_MIN =
-2_147_483_648
INT64_MAX =
9_223_372_036_854_775_807
INT64_MIN =
-9_223_372_036_854_775_808
FLOAT32_MAX =
3.4028235e+38

Class Method Summary collapse

Class Method Details

.from_broker_list(broker_list, http_client: nil) ⇒ Connection

Build a Connection from a static list of broker addresses.

conn = Pinot.from_broker_list(["broker1:8099", "broker2:8099"])

Parameters:

  • broker_list (Array<String>)

    broker host:port entries

  • http_client (HttpClient, nil) (defaults to: nil)

    optional pre-configured HTTP client

Returns:

  • (Connection)



9
10
11
12
# File 'lib/pinot/connection_factory.rb', line 9

# Build a Connection from a static list of broker addresses.
#
# Wraps the list in a ClientConfig and delegates to from_config.
#
# @param broker_list [Array<String>] broker host:port entries
# @param http_client [HttpClient, nil] optional pre-configured HTTP client
# @return [Connection]
def self.from_broker_list(broker_list, http_client: nil)
  from_config(ClientConfig.new(broker_list: broker_list), http_client: http_client)
end

.from_config(config, http_client: nil) ⇒ Connection

Build a Connection from a fully specified ClientConfig. This is the most flexible factory: it handles all transport types (HTTP, gRPC, ZooKeeper) and wires up the circuit breaker and retry logic from config flags.

config = Pinot::ClientConfig.new(
  broker_list:             ["broker:8099"],
  query_timeout_ms:        5_000,
  use_multistage_engine:   true,
  max_retries:             2,
  retry_interval_ms:       100,
  circuit_breaker_enabled: true
)
conn = Pinot.from_config(config)

Parameters:

  • config (ClientConfig)

    fully populated config object

  • http_client (HttpClient, nil) (defaults to: nil)

    optional pre-configured HTTP client

Returns:

  • (Connection)

Raises:

  • (ConfigurationError)



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/pinot/connection_factory.rb', line 48

# Build a Connection from a fully specified ClientConfig.
#
# The config is validated first, then one of three paths is taken, in
# priority order:
#
# 1. +config.grpc_config+ set: GrpcTransport plus a SimpleBrokerSelector over
#    the gRPC broker list. +http_client+ is not used and no circuit breaker
#    registry is attached on this path.
# 2. +config.zookeeper_config+ set: JsonHttpTransport plus a
#    ZookeeperBrokerSelector. The selector is initialised before the
#    Connection is constructed.
# 3. Otherwise: JsonHttpTransport plus whatever build_selector returns for
#    the config. The selector is initialised after the Connection is
#    constructed.
#
# Both HTTP paths forward +config.query_timeout_ms+ to the transport as
# +timeout_ms+, so the configured timeout applies regardless of how brokers
# are discovered.
#
# @param config [ClientConfig] fully populated config object
# @param http_client [HttpClient, nil] optional pre-configured HTTP client;
#   when nil, one is created with build_http_client(config)
# @return [Connection]
# @raise [ConfigurationError] when build_selector yields no selector, i.e.
#   neither a broker list nor a controller config was supplied
#   (config.validate! may also raise; its errors are defined outside this method)
def self.from_config(config, http_client: nil)
  config.validate!

  if config.grpc_config
    transport = GrpcTransport.new(config.grpc_config)
    selector  = SimpleBrokerSelector.new(config.grpc_config.broker_list)

    conn = Connection.new(
      transport: transport,
      broker_selector: selector,
      use_multistage_engine: config.use_multistage_engine || false,
      logger: config.logger,
      query_timeout_ms: config.query_timeout_ms
    )

    selector.init
    return conn
  end

  if config.zookeeper_config
    selector = ZookeeperBrokerSelector.new(zk_path: config.zookeeper_config.zk_path)
    selector.init

    inner = http_client || build_http_client(config)
    transport = JsonHttpTransport.new(
      http_client: inner,
      extra_headers: config.extra_http_header || {},
      logger: config.logger,
      timeout_ms: config.query_timeout_ms,
      max_retries: config.max_retries || 0,
      retry_interval_ms: config.retry_interval_ms || 200
    )

    return Connection.new(
      transport: transport,
      broker_selector: selector,
      use_multistage_engine: config.use_multistage_engine || false,
      logger: config.logger,
      query_timeout_ms: config.query_timeout_ms,
      circuit_breaker_registry: build_circuit_breaker_registry(config)
    )
  end

  inner = http_client || build_http_client(config)

  transport = JsonHttpTransport.new(
    http_client: inner,
    extra_headers: config.extra_http_header || {},
    logger: config.logger,
    # Keep in step with the ZooKeeper path: the transport must see the
    # configured query timeout here too.
    timeout_ms: config.query_timeout_ms,
    max_retries: config.max_retries || 0,
    retry_interval_ms: config.retry_interval_ms || 200
  )

  # The same HTTP client is handed to the selector builder.
  selector = build_selector(config, inner)
  raise ConfigurationError, "must specify broker_list or controller_config" unless selector

  conn = Connection.new(
    transport: transport,
    broker_selector: selector,
    use_multistage_engine: config.use_multistage_engine || false,
    logger: config.logger,
    query_timeout_ms: config.query_timeout_ms,
    circuit_breaker_registry: build_circuit_breaker_registry(config)
  )

  selector.init
  conn
end

.from_controller(controller_address, http_client: nil) ⇒ Connection

Build a Connection backed by a Pinot controller for automatic broker discovery. The controller is polled in the background to keep the broker list fresh.

conn = Pinot.from_controller("controller:9000")

Parameters:

  • controller_address (String)

    controller address as host:port (or a URL such as http://host:port)

  • http_client (HttpClient, nil) (defaults to: nil)

    optional pre-configured HTTP client

Returns:

  • (Connection)



22
23
24
25
26
27
# File 'lib/pinot/connection_factory.rb', line 22

# Build a Connection that discovers brokers through a Pinot controller.
#
# Wraps the address in a ControllerConfig, places that in a ClientConfig and
# delegates to from_config.
#
# @param controller_address [String] address of the controller
# @param http_client [HttpClient, nil] optional pre-configured HTTP client
# @return [Connection]
def self.from_controller(controller_address, http_client: nil)
  controller = ControllerConfig.new(controller_address: controller_address)
  from_config(ClientConfig.new(controller_config: controller), http_client: http_client)
end