Class: Pinot::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/pinot/connection.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(transport:, broker_selector:, use_multistage_engine: false, logger: nil, query_timeout_ms: nil, circuit_breaker_registry: nil) ⇒ Connection

Returns a new instance of Connection.



7
8
9
10
11
12
13
14
15
16
# File 'lib/pinot/connection.rb', line 7

# Builds a connection from its injected collaborators and settings.
#
# Every argument is stored unchanged in the instance variable of the same
# name. The @trace flag is not configurable here and always starts as false.
#
# @param transport [Object] required; stored in @transport
# @param broker_selector [Object] required; stored in @broker_selector
# @param use_multistage_engine [Boolean] initial multistage-engine flag
# @param logger [Object, nil] optional logger
# @param query_timeout_ms [Object, nil] optional default query timeout
#   (presumably milliseconds, going by the name)
# @param circuit_breaker_registry [Object, nil] optional circuit breaker registry
def initialize(transport:, broker_selector:, use_multistage_engine: false, logger: nil,
               query_timeout_ms: nil, circuit_breaker_registry: nil)
  # Parallel assignment runs left to right, so the instance variables are
  # created in the order they are listed here.
  @transport, @broker_selector = transport, broker_selector
  @use_multistage_engine, @trace = use_multistage_engine, false
  @logger, @query_timeout_ms = logger, query_timeout_ms
  @circuit_breaker_registry = circuit_breaker_registry
end

Instance Attribute Details

#query_timeout_ms ⇒ Object

Returns the value of attribute query_timeout_ms.



5
6
7
# File 'lib/pinot/connection.rb', line 5

# Reader for the @query_timeout_ms instance variable.
#
# @return [Object, nil] the current value of @query_timeout_ms
def query_timeout_ms
  @query_timeout_ms
end

Instance Method Details

#close_trace ⇒ Object



26
27
28
# File 'lib/pinot/connection.rb', line 26

# Sets the @trace flag to false.
# NOTE(review): presumably this turns tracing off for later queries; the code
# that reads @trace is not visible here — confirm.
#
# @return [false]
def close_trace
  @trace = false
end

#execute_sql(table, query, query_timeout_ms: nil, headers: {}) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
# File 'lib/pinot/connection.rb', line 30

# Runs +query+ against the broker selected for +table+.
#
# The whole call is wrapped in Pinot::Instrumentation.instrument. Building the
# request and handing it to the transport both happen inside
# run_with_circuit_breaker for the selected broker.
#
# @param table [Object] passed to the broker selector and included in the debug log
# @param query [Object] SQL text passed to build_request and included in the debug log
# @param query_timeout_ms [Object, nil] per-call timeout; when nil or false the
#   connection's @query_timeout_ms is used instead
# @param headers [Hash] passed to the transport as extra_request_headers
# @return [Object] whatever Pinot::Instrumentation.instrument returns
#   (presumably the transport's result — confirm)
def execute_sql(table, query, query_timeout_ms: nil, headers: {})
  Pinot::Instrumentation.instrument(table: table, query: query) do
    logger.debug "Executing SQL on table=#{table}: #{query}"
    target_broker = @broker_selector.select_broker(table)
    # A per-call timeout wins over the connection-level default.
    timeout_ms = query_timeout_ms || @query_timeout_ms
    run_with_circuit_breaker(target_broker) do
      # Built inside the breaker block so request-building failures are seen
      # by the breaker exactly as before.
      request = build_request(query, timeout_ms: timeout_ms)
      @transport.execute(target_broker, request, extra_request_headers: headers)
    end
  end
end

#execute_sql_with_params(table, query_pattern, params, query_timeout_ms: nil, headers: {}) ⇒ Object



46
47
48
49
50
51
# File 'lib/pinot/connection.rb', line 46

# Substitutes +params+ into +query_pattern+ via format_query, then runs the
# resulting SQL through execute_sql.
#
# Errors raised by format_query or execute_sql propagate to the caller
# unchanged, so no rescue clause is needed here.
#
# @param table [Object] forwarded to execute_sql
# @param query_pattern [Object] pattern handed to format_query
# @param params [Object] values handed to format_query
# @param query_timeout_ms [Object, nil] forwarded to execute_sql
# @param headers [Hash] forwarded to execute_sql
# @return [Object] the result of execute_sql
def execute_sql_with_params(table, query_pattern, params, query_timeout_ms: nil, headers: {})
  query = format_query(query_pattern, params)
  execute_sql(table, query, query_timeout_ms: query_timeout_ms, headers: headers)
end

#execute_sql_with_timeout(table, query, timeout_ms) ⇒ Object



42
43
44
# File 'lib/pinot/connection.rb', line 42

# Positional-timeout wrapper: calls execute_sql with +timeout_ms+ passed as the
# query_timeout_ms keyword. The headers keyword is not passed, so execute_sql
# uses its own default for it.
#
# @param table [Object] forwarded to execute_sql
# @param query [Object] forwarded to execute_sql
# @param timeout_ms [Object] forwarded as query_timeout_ms
# @return [Object] the result of execute_sql
def execute_sql_with_timeout(table, query, timeout_ms)
  execute_sql(table, query, query_timeout_ms: timeout_ms)
end

#format_arg(value) ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/pinot/connection.rb', line 80

# Renders a single Ruby value as SQL literal text.
#
# * String     - single-quoted, embedded single quotes doubled
# * Integer, Float, true, false - bare +to_s+
# * BigDecimal - single-quoted plain notation, trailing ".0" removed
# * Time       - single-quoted UTC timestamp "YYYY-MM-DD HH:MM:SS.mmm"
#
# @param value [String, Integer, Float, true, false, BigDecimal, Time]
# @return [String] the literal text
# @raise [RuntimeError] "unsupported type: <Class>" for any other class
def format_arg(value)
  case value
  when String
    "'#{value.gsub("'", "''")}'"
  when Integer, Float, TrueClass, FalseClass
    value.to_s
  when BigDecimal
    # "F" forces plain (non-scientific) notation. Whole numbers lose the
    # trailing ".0" (mirrors Go's big.Int quoted format).
    "'#{value.to_s("F").sub(/\.0\z/, "")}'"
  when Time
    # getutc returns a converted copy. Time#utc would switch the caller's
    # object to UTC in place and fail on a frozen Time.
    # %L is the millisecond field, truncated rather than rounded.
    "'#{value.getutc.strftime("%Y-%m-%d %H:%M:%S.%L")}'"
  else
    raise "unsupported type: #{value.class}"
  end
end

#format_query(pattern, params) ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/pinot/connection.rb', line 61

# Substitutes each "?" in +pattern+ with the formatted literal of the
# corresponding entry in +params+, in order.
#
# Every "?" character in the pattern counts as a placeholder, including any
# that appear inside quoted text.
#
# @param pattern [String] query text containing "?" placeholders
# @param params [Array, nil] values to substitute; nil is treated as empty
# @return [String] the query with all placeholders replaced
# @raise [RuntimeError] when the placeholder count differs from the param
#   count, or when format_arg rejects a parameter
def format_query(pattern, params)
  params ||= []
  placeholders = pattern.count("?")
  if placeholders != params.length
    raise "failed to format query: number of placeholders in queryPattern (#{placeholders}) does not match number of params (#{params.length})"
  end

  # The -1 limit keeps trailing empty fields, so N placeholders give N + 1
  # segments. The one exception is an empty pattern, which splits to [].
  segments = pattern.split("?", -1)
  query = +""
  params.each_with_index do |param, index|
    literal = begin
      format_arg(param)
    rescue => e
      raise "failed to format query: failed to format parameter: #{e.message}"
    end
    query << segments[index] << literal
  end
  # to_s covers the empty-pattern case, where there is no last segment.
  query << segments.last.to_s
end

#open_trace ⇒ Object



22
23
24
# File 'lib/pinot/connection.rb', line 22

# Sets the @trace flag to true.
# NOTE(review): presumably this turns tracing on for later queries; the code
# that reads @trace is not visible here — confirm.
#
# @return [true]
def open_trace
  @trace = true
end

#prepare(table, query_template) ⇒ Object

Raises:

  • (ArgumentError)


53
54
55
56
57
58
59
# File 'lib/pinot/connection.rb', line 53

# Validates its arguments and wraps them in a PreparedStatementImpl bound to
# this connection.
#
# Checks run in order: table, then template presence, then placeholder
# presence. The first failing check raises.
#
# @param table [String, nil] table name; nil or whitespace-only is rejected
# @param query_template [String, nil] SQL template; nil or whitespace-only is
#   rejected, and it must contain at least one "?"
# @return [PreparedStatementImpl]
# @raise [ArgumentError] when any of the checks above fails
def prepare(table, query_template)
  if table.nil? || table.strip.empty?
    raise ArgumentError, "table name cannot be empty"
  end

  if query_template.nil? || query_template.strip.empty?
    raise ArgumentError, "query template cannot be empty"
  end

  unless query_template.include?("?")
    raise ArgumentError, "query template must contain at least one parameter placeholder (?)"
  end

  PreparedStatementImpl.new(connection: self, table: table, query_template: query_template)
end

#use_multistage_engine=(val) ⇒ Object



18
19
20
# File 'lib/pinot/connection.rb', line 18

# Writer for the @use_multistage_engine flag; stores +val+ unchanged.
#
# @param val [Object] new flag value (the constructor defaults it to false)
def use_multistage_engine=(val)
  @use_multistage_engine = val
end