Class: Pinot::Connection
- Inherits:
-
Object
- Object
- Pinot::Connection
- Defined in:
- lib/pinot/connection.rb
Instance Attribute Summary collapse
-
#query_timeout_ms ⇒ Object
Returns the value of attribute query_timeout_ms.
Instance Method Summary collapse
- #close_trace ⇒ Object
- #execute_many(queries, max_concurrency: nil) ⇒ Object
- #execute_sql(table, query, query_timeout_ms: nil, headers: {}) ⇒ Object
- #execute_sql_with_params(table, query_pattern, params, query_timeout_ms: nil, headers: {}) ⇒ Object
- #execute_sql_with_timeout(table, query, timeout_ms) ⇒ Object
- #format_arg(value) ⇒ Object
- #format_query(pattern, params) ⇒ Object
-
#initialize(transport:, broker_selector:, use_multistage_engine: false, logger: nil, query_timeout_ms: nil, circuit_breaker_registry: nil) ⇒ Connection
constructor
A new instance of Connection.
- #open_trace ⇒ Object
- #prepare(table, query_template) ⇒ Object
- #use_multistage_engine=(val) ⇒ Object
Constructor Details
#initialize(transport:, broker_selector:, use_multistage_engine: false, logger: nil, query_timeout_ms: nil, circuit_breaker_registry: nil) ⇒ Connection
Returns a new instance of Connection.
7 8 9 10 11 12 13 14 15 16 |
# File 'lib/pinot/connection.rb', line 7
#
# Stores the collaborators and settings for this connection. The trace flag
# always starts out false.
#
# @param transport [Object] stored in @transport
# @param broker_selector [Object] stored in @broker_selector
# @param use_multistage_engine [Boolean] initial value of the engine flag
# @param logger [Object, nil] stored in @logger
# @param query_timeout_ms [Object, nil] stored in @query_timeout_ms
#   (presumably milliseconds, going by the name)
# @param circuit_breaker_registry [Object, nil] stored in @circuit_breaker_registry
def initialize(transport:, broker_selector:, use_multistage_engine: false, logger: nil,
               query_timeout_ms: nil, circuit_breaker_registry: nil)
  # Required collaborators.
  @transport, @broker_selector = transport, broker_selector

  # Optional settings.
  @logger = logger
  @query_timeout_ms = query_timeout_ms
  @use_multistage_engine = use_multistage_engine
  @trace = false
  @circuit_breaker_registry = circuit_breaker_registry
end
Instance Attribute Details
#query_timeout_ms ⇒ Object
Returns the value of attribute query_timeout_ms.
5 6 7 |
# File 'lib/pinot/connection.rb', line 5
#
# Reader for the @query_timeout_ms instance variable.
#
# @return [Object, nil] the current value of @query_timeout_ms
def query_timeout_ms
  @query_timeout_ms
end
Instance Method Details
#close_trace ⇒ Object
26 27 28 |
# File 'lib/pinot/connection.rb', line 26
#
# Switches the @trace flag off.
#
# @return [false]
def close_trace
  @trace = false
end
#execute_many(queries, max_concurrency: nil) ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/pinot/connection.rb', line 53
#
# Runs every query on its own thread via #execute_sql and returns the
# outcomes in the same order as the input.
#
# @param queries [Array<Hash>] each item may carry :table, :query and
#   :query_timeout_ms under either symbol or string keys; a missing table or
#   query falls back to ""
# @param max_concurrency [Integer, nil] when given, handed to build_semaphore
#   (presumably a Queue pre-filled with that many tokens) so that each thread
#   takes a token before running and returns it afterwards
# @return [Array<QueryResult>] one entry per query; each holds either the
#   response or the StandardError raised while executing it
def execute_many(queries, max_concurrency: nil)
  return [] if queries.empty?

  # Accepts both symbol and string keys on a query item.
  lookup = ->(item, key) { item[key] || item[key.to_s] }
  outcomes = Array.new(queries.size)
  # Token pool used as a counting semaphore; nil when no cap was requested.
  tokens = build_semaphore(max_concurrency) if max_concurrency

  workers = queries.map.with_index do |item, slot|
    table = lookup.call(item, :table) || ""
    query = lookup.call(item, :query) || ""
    timeout_ms = lookup.call(item, :query_timeout_ms)

    Thread.new do
      tokens&.pop # acquire
      begin
        response = execute_sql(table, query, query_timeout_ms: timeout_ms)
        outcomes[slot] = QueryResult.new(table: table, query: query, response: response, error: nil)
      rescue => e
        outcomes[slot] = QueryResult.new(table: table, query: query, response: nil, error: e)
      ensure
        tokens&.push(:token) # release
      end
    end
  end

  workers.each(&:join)
  outcomes
end
#execute_sql(table, query, query_timeout_ms: nil, headers: {}) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/pinot/connection.rb', line 30
#
# Executes one SQL query against the broker chosen for +table+, inside an
# instrumentation block and a circuit-breaker wrapper.
#
# @param table [String] table name, passed to the broker selector
# @param query [String] SQL text
# @param query_timeout_ms [Object, nil] per-call timeout; falls back to
#   @query_timeout_ms when nil
# @param headers [Hash] forwarded to the transport as extra_request_headers
# @return [Object] whatever the instrumentation block yields back
#   (presumably the transport's response — confirm against Instrumentation)
def execute_sql(table, query, query_timeout_ms: nil, headers: {})
  Pinot::Instrumentation.instrument(table: table, query: query) do
    logger.debug("Executing SQL on table=#{table}: #{query}")

    broker = @broker_selector.select_broker(table)
    timeout_ms = query_timeout_ms || @query_timeout_ms

    run_with_circuit_breaker(broker) do
      # The request is built inside the breaker block, as in the original.
      request = build_request(query, timeout_ms: timeout_ms)
      @transport.execute(broker, request, extra_request_headers: headers)
    end
  end
end
#execute_sql_with_params(table, query_pattern, params, query_timeout_ms: nil, headers: {}) ⇒ Object
46 47 48 49 50 51 |
# File 'lib/pinot/connection.rb', line 46
#
# Substitutes +params+ into +query_pattern+ with #format_query and runs the
# resulting SQL through #execute_sql.
#
# Any error raised while formatting or executing propagates to the caller
# unchanged (the former rescue-and-re-raise was a no-op and has been removed).
#
# @param table [String] table name
# @param query_pattern [String] SQL text containing "?" placeholders
# @param params [Array, nil] values substituted for the placeholders, in order
# @param query_timeout_ms [Object, nil] forwarded to #execute_sql
# @param headers [Hash] forwarded to #execute_sql
# @return [Object] the result of #execute_sql
def execute_sql_with_params(table, query_pattern, params, query_timeout_ms: nil, headers: {})
  query = format_query(query_pattern, params)
  execute_sql(table, query, query_timeout_ms: query_timeout_ms, headers: headers)
end
#execute_sql_with_timeout(table, query, timeout_ms) ⇒ Object
42 43 44 |
# File 'lib/pinot/connection.rb', line 42
#
# Positional-argument shorthand for #execute_sql with an explicit timeout.
#
# @param table [String] table name
# @param query [String] SQL text
# @param timeout_ms [Object] passed to #execute_sql as query_timeout_ms:
# @return [Object] the result of #execute_sql
def execute_sql_with_timeout(table, query, timeout_ms)
  execute_sql(table, query, query_timeout_ms: timeout_ms)
end
#format_arg(value) ⇒ Object
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/pinot/connection.rb', line 109
#
# Renders a single Ruby value as a SQL literal.
#
# * String     -> single-quoted, with embedded single quotes doubled
# * Integer, Float, true, false -> bare +to_s+
# * BigDecimal -> single-quoted plain ("F") notation, with a trailing ".0"
#                 removed for whole numbers
# * Time       -> single-quoted UTC timestamp "YYYY-MM-DD HH:MM:SS.mmm"
#                 (milliseconds truncated, not rounded)
#
# The Time branch uses +getutc+, which returns a new Time; the previous
# +utc+ call converted the caller's object in place and raised FrozenError
# on a frozen Time.
#
# @param value [String, Integer, Float, Boolean, BigDecimal, Time]
# @return [String] the SQL literal
# @raise [RuntimeError] when +value+ is of any other class
def format_arg(value)
  case value
  when String
    "'#{value.gsub("'", "''")}'"
  when Integer, Float, TrueClass, FalseClass
    value.to_s
  when BigDecimal
    # Strip trailing .0 for whole numbers (mirrors Go's big.Int quoted format)
    plain = value.to_s("F").sub(/\.0\z/, "")
    "'#{plain}'"
  when Time
    # %L emits the millisecond part, truncating finer digits.
    value.getutc.strftime("'%Y-%m-%d %H:%M:%S.%L'")
  else
    raise "unsupported type: #{value.class}"
  end
end
#format_query(pattern, params) ⇒ Object
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/pinot/connection.rb', line 90
#
# Replaces each "?" in +pattern+ with the corresponding entry of +params+,
# rendered through #format_arg.
#
# Every "?" counts as a placeholder, including any that appear inside quoted
# text in the pattern.
#
# @param pattern [String] SQL text with "?" placeholders
# @param params [Array, nil] values to substitute, in order; nil means none
# @return [String] the pattern with all placeholders filled in
# @raise [RuntimeError] when the number of placeholders differs from the
#   number of params, or when #format_arg rejects a parameter
def format_query(pattern, params)
  params ||= []
  placeholders = pattern.count("?")
  if placeholders != params.length
    raise "failed to format query: number of placeholders in queryPattern (#{placeholders}) does not match number of params (#{params.length})"
  end

  # Limit -1 keeps trailing empty pieces, so there is one more piece than
  # there are placeholders — except for an empty pattern, which splits to [].
  parts = pattern.split("?", -1)
  result = +""
  params.each_with_index do |param, i|
    formatted = begin
      format_arg(param)
    rescue => e
      raise "failed to format query: failed to format parameter: #{e.message}"
    end
    result << parts[i] << formatted
  end
  # to_s covers the empty-pattern case, where parts.last is nil.
  result << parts.last.to_s
end
#open_trace ⇒ Object
22 23 24 |
# File 'lib/pinot/connection.rb', line 22
#
# Switches the @trace flag on.
#
# @return [true]
def open_trace
  @trace = true
end
#prepare(table, query_template) ⇒ Object
82 83 84 85 86 87 88 |
# File 'lib/pinot/connection.rb', line 82
#
# Validates its arguments and wraps them in a PreparedStatementImpl bound to
# this connection.
#
# @param table [String] table name; must not be nil or blank
# @param query_template [String] SQL text; must not be nil or blank and must
#   contain at least one "?" placeholder
# @return [PreparedStatementImpl]
# @raise [ArgumentError] when any of the checks above fails
def prepare(table, query_template)
  if table.nil? || table.strip.empty?
    raise ArgumentError, "table name cannot be empty"
  end

  if query_template.nil? || query_template.strip.empty?
    raise ArgumentError, "query template cannot be empty"
  end

  unless query_template.include?("?")
    raise ArgumentError, "query template must contain at least one parameter placeholder (?)"
  end

  PreparedStatementImpl.new(connection: self, table: table, query_template: query_template)
end
#use_multistage_engine=(val) ⇒ Object
18 19 20 |
# File 'lib/pinot/connection.rb', line 18
#
# Writer for the @use_multistage_engine flag.
#
# @param val [Object] new value, stored as given
# @return [Object] the assigned value
def use_multistage_engine=(val)
  @use_multistage_engine = val
end