Class: Pinot::Paginator
- Inherits:
- Object (ancestry: Object > Pinot::Paginator)
- Includes:
- Enumerable
- Defined in:
- lib/pinot/paginator.rb
Overview
Cursor-based pagination using Pinot’s native server-side cursor API.
The broker stores the full result set and returns slices on demand. All fetch requests after the first are pinned to the broker that owns the cursor (brokerHost:brokerPort from the initial response), ensuring correct broker affinity.
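As a rough illustration of the affinity rule, the sketch below derives a cursor base URL from the brokerHost and brokerPort fields of an initial response. The helper name cursor_base_from, the fallback argument, and the http scheme are assumptions made for this example, not part of the library's documented API.

```ruby
require "json"

# Hypothetical helper: builds the base URL that later cursor requests are
# pinned to. Only the field names brokerHost/brokerPort come from the text
# above; the fallback behaviour is an assumption.
def cursor_base_from(response_body, fallback)
  data = JSON.parse(response_body)
  host = data["brokerHost"]
  port = data["brokerPort"]
  return fallback if host.nil? || port.nil?

  "http://#{host}:#{port}"
end

body = '{"brokerHost":"broker-2","brokerPort":8099,"requestId":"abc"}'
pinned = cursor_base_from(body, "http://lb:8000")
unpinned = cursor_base_from("{}", "http://lb:8000")
```

Here pinned points at the broker that answered the first request, while unpinned falls back to the address the caller started with.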
Obtaining a Paginator
paginator = conn.paginate(
"SELECT * FROM myTable WHERE col > 0",
page_size: 500, # rows per page (default 1000)
table: nil, # used only for broker selection; omit for single-broker setups
extra_headers: {} # merged into every HTTP request
)
Iteration
# Page-by-page (each page is a BrokerResponse):
paginator.each_page { |resp| process(resp.result_table) }
# Row-by-row (each row is an Array of JsonNumber/String cells):
paginator.each_row { |row| puts row.map(&:to_s).join(", ") }
# Enumerable methods work because #each is defined (#each_row is an alias of #each):
rows = paginator.to_a
paginator.select { |row| row.first.to_s.to_i > 100 }
Cursor lifecycle
The cursor is deleted from the broker automatically after the last page is consumed. Call #delete explicitly for early cleanup, for example after breaking out of an iteration loop. DELETE failures are swallowed because cursors expire on their own on the broker side.
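One way to make early cleanup reliable is to put the #delete call in an ensure clause. The sketch below uses FakePaginator, a stand-in written for this example that mimics only #each_row and #delete, so it runs without a broker; first_match is likewise a hypothetical helper.

```ruby
# Stand-in for Pinot::Paginator so the sketch runs without a broker.
# Only #each_row and #delete are mimicked; the real class talks HTTP.
class FakePaginator
  attr_reader :deleted

  def initialize(rows)
    @rows = rows
    @deleted = false
  end

  def each_row(&block)
    @rows.each(&block)
  end

  def delete
    @deleted = true
  end
end

# Returns the first row for which the block is truthy, and releases the
# cursor whether iteration finishes, returns early, or raises.
def first_match(paginator)
  paginator.each_row do |row|
    return row if yield(row)
  end
  nil
ensure
  paginator.delete
end

paginator = FakePaginator.new([[1, "a"], [250, "b"], [900, "c"]])
hit = first_match(paginator) { |row| row.first > 100 }
```

The early return stops iteration at the second row, and the ensure clause still releases the cursor.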
Protocol
- POST /query/sql?getCursor=true&numRows=N (submit query, get first page + requestId)
- GET /responseStore/{requestId}/results?offset=K&numRows=N (fetch subsequent pages)
- DELETE /responseStore/{requestId} (release cursor, best-effort)
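The three requests can be sketched as URL builders. The helper names and the base URL below are illustrative assumptions; only the paths and query parameters are taken from the protocol list, with the path segment after /responseStore/ filled by the requestId from the first response.

```ruby
require "uri"

# Hypothetical helpers: each returns the URL for one protocol step.
def submit_url(base, num_rows)
  "#{base}/query/sql?" + URI.encode_www_form(getCursor: true, numRows: num_rows)
end

def page_url(base, request_id, offset, num_rows)
  "#{base}/responseStore/#{request_id}/results?" +
    URI.encode_www_form(offset: offset, numRows: num_rows)
end

def delete_url(base, request_id)
  "#{base}/responseStore/#{request_id}"
end

base = "http://broker:8099" # assumed address for illustration
first = submit_url(base, 500)
second = page_url(base, "abc123", 500, 500)
cleanup = delete_url(base, "abc123")
```

The offset of each subsequent request is the number of rows already fetched, so the second page of a 500-row page size starts at offset 500.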
Constant Summary
- DEFAULT_PAGE_SIZE = 1000
Instance Method Summary
- #delete ⇒ Object
  Delete the cursor from the broker early (also called automatically after exhaustion).
- #each(&block) ⇒ Object (also: #each_row)
  Yields each row Array across all pages.
- #each_page {|first| ... } ⇒ Object
  Yields each page as a BrokerResponse.
- #initialize(http_client, broker_address, query, page_size:, extra_headers: {}) ⇒ Paginator (constructor)
  A new instance of Paginator.
Constructor Details
#initialize(http_client, broker_address, query, page_size:, extra_headers: {}) ⇒ Paginator
Returns a new instance of Paginator.
# File 'lib/pinot/paginator.rb', line 46
def initialize(http_client, broker_address, query, page_size:, extra_headers: {})
  raise ArgumentError, "page_size must be a positive integer" unless page_size.is_a?(Integer) && page_size > 0

  @http_client = http_client
  @broker_address = broker_address
  @query = query
  @page_size = page_size
  @extra_headers = extra_headers
  @request_id = nil
  @cursor_base = nil # "http://host:port"; set after first response
  @exhausted = false
end
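The page_size guard accepts only positive Integers. The sketch below copies that condition into a standalone method (validate_page_size! is a name invented for this example) to show which inputs would be rejected.

```ruby
# Hypothetical standalone copy of the constructor's guard clause.
def validate_page_size!(page_size)
  unless page_size.is_a?(Integer) && page_size > 0
    raise ArgumentError, "page_size must be a positive integer"
  end

  page_size
end

accepted = validate_page_size!(500)

# Collect every candidate that the guard rejects.
rejected = [0, -1, 2.5, "500", nil].select do |candidate|
  begin
    validate_page_size!(candidate)
    false
  rescue ArgumentError
    true
  end
end
```

Note that a numeric String such as "500" is rejected rather than coerced, so callers reading the page size from configuration need to convert it first.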
Instance Method Details
#delete ⇒ Object
Delete the cursor from the broker early (also called automatically after exhaustion).
# File 'lib/pinot/paginator.rb', line 100
def delete
  return unless @request_id && @cursor_base

  url = "#{@cursor_base}/responseStore/#{@request_id}"
  @http_client.delete(url, headers: json_headers)
  @request_id = nil
rescue StandardError
  # best-effort; cursor will expire naturally
end
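The best-effort behaviour can be exercised with a failing HTTP stub. FlakyHttp and CursorHandle below are stand-ins written for this sketch; CursorHandle#delete follows the same structure as the method above but passes empty headers instead of calling json_headers.

```ruby
# Stub HTTP client whose DELETE always fails (assumption for the demo).
class FlakyHttp
  attr_reader :calls

  def initialize
    @calls = 0
  end

  def delete(_url, headers: {})
    @calls += 1
    raise IOError, "connection reset"
  end
end

# Minimal stand-in holding just the state that #delete depends on.
class CursorHandle
  def initialize(http, cursor_base, request_id)
    @http = http
    @cursor_base = cursor_base
    @request_id = request_id
  end

  def delete
    return unless @request_id && @cursor_base

    @http.delete("#{@cursor_base}/responseStore/#{@request_id}", headers: {})
    @request_id = nil
  rescue StandardError
    nil # swallowed; the broker expires the cursor on its own
  end
end

http = FlakyHttp.new
handle = CursorHandle.new(http, "http://broker:8099", "abc123")
first_result = handle.delete
handle.delete
```

Because the request id is cleared only after a successful DELETE, a failed attempt leaves it in place and the second call tries again.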
#each(&block) ⇒ Object Also known as: each_row
Yields each row Array across all pages. Returns an Enumerator when called without a block. Because the class defines #each (with #each_row as its alias), Enumerable methods (.map, .select, .to_a, etc.) work.
# File 'lib/pinot/paginator.rb', line 89
def each(&block)
  return enum_for(:each) unless block_given?

  each_page do |response|
    response.result_table.rows.each(&block)
  end
end
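The enum_for pattern and the effect of stopping early can be seen with an in-memory stand-in. PagedRows is a class invented for this sketch; it serves pages from an Array and counts how many were requested.

```ruby
# In-memory stand-in: pages are Arrays of rows, no HTTP involved.
class PagedRows
  include Enumerable

  attr_reader :pages_fetched

  def initialize(pages)
    @pages = pages
    @pages_fetched = 0
  end

  def each_page
    return enum_for(:each_page) unless block_given?

    @pages.each do |page|
      @pages_fetched += 1
      yield page
    end
  end

  def each(&block)
    return enum_for(:each) unless block_given?

    each_page { |page| page.each(&block) }
  end
  alias each_row each
end

rows = PagedRows.new([[[1], [2]], [[3], [4]], [[5]]])
first_three = rows.first(3)
fetched_after_first = rows.pages_fetched
blockless = rows.each_row
```

Enumerable#first stops iterating once it has enough rows, so only two of the three pages are requested. With the real Paginator an early stop like this skips the automatic cleanup, which is when an explicit #delete is useful.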
#each_page {|first| ... } ⇒ Object
Yields each page as a BrokerResponse. Returns an Enumerator when called without a block.
# File 'lib/pinot/paginator.rb', line 61
def each_page
  return enum_for(:each_page) unless block_given?

  # Submit the query and get the first page + cursor metadata
  first = submit_cursor
  return if first.result_table.nil? || first.result_table.rows.empty?

  yield first

  fetched = first.num_rows || first.result_table.rows.size
  total = first.num_rows_result_set || 0

  while fetched < total
    page = fetch_page(fetched)
    rows = page.result_table&.rows || []
    break if rows.empty?

    yield page
    fetched += rows.size
    break if rows.size < @page_size
  end

  delete
end
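To trace the offsets this loop requests, the sketch below replays the same control flow against an in-memory broker. FakeBroker, Page, and each_page_sketch are stand-ins invented for this example, and the final cursor deletion is left out.

```ruby
# Simplified page: rows plus the total size of the stored result set.
Page = Struct.new(:rows, :num_rows_result_set)

# In-memory stand-in for the broker's response store.
class FakeBroker
  attr_reader :requested_offsets

  def initialize(all_rows, page_size)
    @all_rows = all_rows
    @page_size = page_size
    @requested_offsets = []
  end

  def submit
    Page.new(@all_rows.first(@page_size), @all_rows.size)
  end

  def fetch(offset)
    @requested_offsets << offset
    Page.new(@all_rows[offset, @page_size] || [], @all_rows.size)
  end
end

# Same loop shape as #each_page, minus HTTP and cursor deletion.
def each_page_sketch(broker, page_size)
  first = broker.submit
  return if first.rows.empty?

  yield first
  fetched = first.rows.size
  total = first.num_rows_result_set || 0

  while fetched < total
    page = broker.fetch(fetched)
    break if page.rows.empty?

    yield page
    fetched += page.rows.size
    break if page.rows.size < page_size
  end
end

broker = FakeBroker.new((1..2500).map { |i| [i] }, 1000)
sizes = []
each_page_sketch(broker, 1000) { |page| sizes << page.rows.size }
```

With 2500 rows and a page size of 1000, the first page comes from the submit call and the two follow-up fetches use offsets 1000 and 2000; the short final page of 500 rows ends the loop.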