Class: Trino::Client::StatementClient
- Inherits:
-
Object
- Object
- Trino::Client::StatementClient
- Defined in:
- lib/trino/client/statement_client.rb
Constant Summary collapse
- JSON_OPTIONS =
Trino can return too deep nested JSON
{ :max_nesting => false }
- RETRYABLE_STATUSES =
[502, 503, 504]
Instance Attribute Summary collapse
-
#query ⇒ Object
readonly
Returns the value of attribute query.
Instance Method Summary collapse
- #advance ⇒ Object
- #cancel_leaf_stage ⇒ Object
- #client_aborted? ⇒ Boolean
- #client_error? ⇒ Boolean
- #close ⇒ Object
- #current_results ⇒ Object
- #current_results_headers ⇒ Object
- #debug? ⇒ Boolean
- #exception!(e) ⇒ Object
- #faraday_get_with_retry(uri) ⇒ Object
- #finished? ⇒ Boolean
- #has_next? ⇒ Boolean
-
#initialize(faraday, query, options, next_uri = nil) ⇒ StatementClient
constructor
A new instance of StatementClient.
- #query_failed? ⇒ Boolean
- #query_id ⇒ Object
- #query_info ⇒ Object
- #query_succeeded? ⇒ Boolean
- #raise_if_timeout! ⇒ Object
- #raise_timeout_error! ⇒ Object
- #running? ⇒ Boolean
Constructor Details
#initialize(faraday, query, options, next_uri = nil) ⇒ StatementClient
Returns a new instance of StatementClient.
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/trino/client/statement_client.rb', line 31 def initialize(faraday, query, , next_uri=nil) @faraday = faraday @options = @query = query @state = :running @retry_timeout = [:retry_timeout] || 120 if model_version = @options[:model_version] @models = ModelVersions.const_get("V#{model_version.to_s.gsub(".", "_")}") else @models = Models end @plan_timeout = [:plan_timeout] @query_timeout = [:query_timeout] if @plan_timeout || @query_timeout # this is set before the first call of faraday_get_with_retry so that # resuming StatementClient with next_uri is also under timeout control. @started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) end if next_uri response = faraday_get_with_retry(next_uri) @results_headers = response.headers @results = @models::QueryResults.decode(parse_body(response)) else post_query_request! end end |
Instance Attribute Details
#query ⇒ Object (readonly)
Returns the value of attribute query.
104 105 106 |
# File 'lib/trino/client/statement_client.rb', line 104 def query @query end |
Instance Method Details
#advance ⇒ Object
155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
# File 'lib/trino/client/statement_client.rb', line 155 def advance return false unless running? unless has_next? @state = :finished return false end uri = @results.next_uri response = faraday_get_with_retry(uri) @results_headers = response.headers @results = decode_model(uri, parse_body(response), @models::QueryResults) raise_if_timeout! return true end |
#cancel_leaf_stage ⇒ Object
278 279 280 281 282 283 284 |
# File 'lib/trino/client/statement_client.rb', line 278 def cancel_leaf_stage if uri = @results.partial_cancel_uri @faraday.delete do |req| req.url uri end end end |
#client_aborted? ⇒ Boolean
114 115 116 |
# File 'lib/trino/client/statement_client.rb', line 114 def client_aborted? @state == :client_aborted end |
#client_error? ⇒ Boolean
118 119 120 |
# File 'lib/trino/client/statement_client.rb', line 118 def client_error? @state == :client_error end |
#close ⇒ Object
286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 |
# File 'lib/trino/client/statement_client.rb', line 286 def close return unless running? @state = :client_aborted begin if uri = @results.next_uri @faraday.delete do |req| req.url uri end end rescue => e end nil end |
#current_results ⇒ Object
134 135 136 |
# File 'lib/trino/client/statement_client.rb', line 134 def current_results @results end |
#current_results_headers ⇒ Object
138 139 140 |
# File 'lib/trino/client/statement_client.rb', line 138 def current_results_headers @results_headers end |
#debug? ⇒ Boolean
106 107 108 |
# File 'lib/trino/client/statement_client.rb', line 106 def debug? !!@options[:debug] end |
#exception!(e) ⇒ Object
150 151 152 153 |
# File 'lib/trino/client/statement_client.rb', line 150 def exception!(e) @state = :client_error raise e end |
#faraday_get_with_retry(uri) ⇒ Object
229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 |
# File 'lib/trino/client/statement_client.rb', line 229 def faraday_get_with_retry(uri) with_retry_loop do begin response = @faraday.get(uri) rescue Faraday::TimeoutError, Faraday::ConnectionFailed throw :retry_with_backoff rescue => e exception! e end if response.status == 200 && !response.body.to_s.empty? response elsif RETRYABLE_STATUSES.include?(response.status) throw :retry_with_backoff else exception! TrinoHttpError.new(response.status, "Trino API error at #{uri} returned #{response.status}: #{response.body}") end end end |
#finished? ⇒ Boolean
122 123 124 |
# File 'lib/trino/client/statement_client.rb', line 122 def finished? @state == :finished end |
#has_next? ⇒ Boolean
146 147 148 |
# File 'lib/trino/client/statement_client.rb', line 146 def has_next? !!@results.next_uri end |
#query_failed? ⇒ Boolean
126 127 128 |
# File 'lib/trino/client/statement_client.rb', line 126 def query_failed? @results.error != nil end |
#query_id ⇒ Object
142 143 144 |
# File 'lib/trino/client/statement_client.rb', line 142 def query_id @results.id end |
#query_info ⇒ Object
174 175 176 177 178 |
# File 'lib/trino/client/statement_client.rb', line 174 def query_info uri = "/v1/query/#{@results.id}" response = faraday_get_with_retry(uri) decode_model(uri, parse_body(response), @models::QueryInfo) end |
#query_succeeded? ⇒ Boolean
130 131 132 |
# File 'lib/trino/client/statement_client.rb', line 130 def query_succeeded? @results.error == nil && finished? end |
#raise_if_timeout! ⇒ Object
249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 |
# File 'lib/trino/client/statement_client.rb', line 249 def raise_if_timeout! if @started_at return if finished? elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - @started_at if @query_timeout && elapsed > @query_timeout close raise_timeout_error! end if @plan_timeout && (@results == nil || @results.columns == nil) && elapsed > @plan_timeout # @results is not set (even first faraday_get_with_retry isn't called yet) or # result from Trino doesn't include result schema. Query planning isn't done yet. close raise_timeout_error! end end end |
#raise_timeout_error! ⇒ Object
270 271 272 273 274 275 276 |
# File 'lib/trino/client/statement_client.rb', line 270 def raise_timeout_error! if query_id = @results && @results.id exception! TrinoQueryTimeoutError.new("Query #{query_id} timed out") else exception! TrinoQueryTimeoutError.new("Query timed out") end end |
#running? ⇒ Boolean
110 111 112 |
# File 'lib/trino/client/statement_client.rb', line 110 def running? @state == :running end |