Class: Pinot::Connection
- Inherits:
-
Object
- Object
- Pinot::Connection
- Defined in:
- lib/pinot/connection.rb
Instance Attribute Summary collapse
-
#query_timeout_ms ⇒ Object
Returns the value of attribute query_timeout_ms.
Instance Method Summary collapse
- #close_trace ⇒ Object
- #execute_sql(table, query) ⇒ Object
- #execute_sql_with_params(table, query_pattern, params) ⇒ Object
- #execute_sql_with_timeout(table, query, timeout_ms) ⇒ Object
- #format_arg(value) ⇒ Object
- #format_query(pattern, params) ⇒ Object
-
#initialize(transport:, broker_selector:, use_multistage_engine: false, logger: nil, query_timeout_ms: nil) ⇒ Connection
constructor
A new instance of Connection.
- #open_trace ⇒ Object
- #prepare(table, query_template) ⇒ Object
- #use_multistage_engine=(val) ⇒ Object
Constructor Details
#initialize(transport:, broker_selector:, use_multistage_engine: false, logger: nil, query_timeout_ms: nil) ⇒ Connection
Returns a new instance of Connection.
7 8 9 10 11 12 13 14 |
# File 'lib/pinot/connection.rb', line 7 def initialize(transport:, broker_selector:, use_multistage_engine: false, logger: nil, query_timeout_ms: nil) @transport = transport @broker_selector = broker_selector @use_multistage_engine = use_multistage_engine @trace = false @logger = logger @query_timeout_ms = query_timeout_ms end |
Instance Attribute Details
#query_timeout_ms ⇒ Object
Returns the value of attribute query_timeout_ms.
5 6 7 |
# File 'lib/pinot/connection.rb', line 5 def query_timeout_ms @query_timeout_ms end |
Instance Method Details
#close_trace ⇒ Object
24 25 26 |
# File 'lib/pinot/connection.rb', line 24 def close_trace @trace = false end |
#execute_sql(table, query) ⇒ Object
28 29 30 31 32 33 34 35 36 |
# File 'lib/pinot/connection.rb', line 28 def execute_sql(table, query) Pinot::Instrumentation.instrument(table: table, query: query) do logger.debug "Executing SQL on table=#{table}: #{query}" broker = @broker_selector.select_broker(table) @transport.execute(broker, build_request(query)) end rescue => e raise "unable to execute SQL on table #{table}: #{e.}" end |
#execute_sql_with_params(table, query_pattern, params) ⇒ Object
46 47 48 49 50 51 52 |
# File 'lib/pinot/connection.rb', line 46 def execute_sql_with_params(table, query_pattern, params) query = format_query(query_pattern, params) execute_sql(table, query) rescue => e # Re-raise format errors directly (they already have the right message) raise e end |
#execute_sql_with_timeout(table, query, timeout_ms) ⇒ Object
38 39 40 41 42 43 44 |
# File 'lib/pinot/connection.rb', line 38 def execute_sql_with_timeout(table, query, timeout_ms) logger.debug "Executing SQL with timeout=#{timeout_ms}ms on table=#{table}: #{query}" broker = @broker_selector.select_broker(table) @transport.execute(broker, build_request(query, timeout_ms: timeout_ms)) rescue => e raise "unable to execute SQL on table #{table}: #{e.}" end |
#format_arg(value) ⇒ Object
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/pinot/connection.rb', line 81 def format_arg(value) case value when String "'#{value.gsub("'", "''")}'" when Integer value.to_s when Float value.to_s when TrueClass, FalseClass value.to_s when BigDecimal s = value.to_s("F") # Strip trailing .0 for whole numbers (mirrors Go's big.Int quoted format) s = s.sub(/\.0\z/, "") if s.end_with?(".0") "'#{s}'" when Time "'#{value.utc.strftime("%Y-%m-%d %H:%M:%S.") + format("%03d", value.utc.subsec * 1000)}'" else raise "unsupported type: #{value.class}" end end |
#format_query(pattern, params) ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/pinot/connection.rb', line 62 def format_query(pattern, params) params ||= [] placeholders = pattern.count("?") if placeholders != params.length raise "failed to format query: number of placeholders in queryPattern (#{placeholders}) does not match number of params (#{params.length})" end parts = pattern.split("?", -1) result = "" params.each_with_index do |param, i| formatted = begin format_arg(param) rescue => e raise "failed to format query: failed to format parameter: #{e.}" end result += parts[i] + formatted end result + parts.last end |
#open_trace ⇒ Object
20 21 22 |
# File 'lib/pinot/connection.rb', line 20 def open_trace @trace = true end |
#prepare(table, query_template) ⇒ Object
54 55 56 57 58 59 60 |
# File 'lib/pinot/connection.rb', line 54 def prepare(table, query_template) raise ArgumentError, "table name cannot be empty" if table.nil? || table.strip.empty? raise ArgumentError, "query template cannot be empty" if query_template.nil? || query_template.strip.empty? count = query_template.count("?") raise ArgumentError, "query template must contain at least one parameter placeholder (?)" if count == 0 PreparedStatementImpl.new(connection: self, table: table, query_template: query_template) end |
#use_multistage_engine=(val) ⇒ Object
16 17 18 |
# File 'lib/pinot/connection.rb', line 16 def use_multistage_engine=(val) @use_multistage_engine = val end |