Class: Pinot::Paginator

Inherits:
Object
Includes:
Enumerable
Defined in:
lib/pinot/paginator.rb

Overview

Cursor-based pagination using Pinot’s native server-side cursor API.

The broker stores the full result set and returns slices on demand. All fetch requests after the first are pinned to the broker that owns the cursor (brokerHost:brokerPort from the initial response), ensuring correct broker affinity.

Obtaining a Paginator

paginator = conn.paginate(
  "SELECT * FROM myTable WHERE col > 0",
  page_size:     500,    # rows per page (default 1000)
  table:         nil,    # used only for broker selection; omit for single-broker setups
  extra_headers: {}      # merged into every HTTP request
)

Iteration

# Page-by-page (each page is a BrokerResponse):
paginator.each_page { |resp| process(resp.result_table) }

# Row-by-row (each row is an Array of JsonNumber/String cells):
paginator.each_row { |row| puts row.map(&:to_s).join(", ") }

# Enumerable methods work because Paginator defines #each (#each_row is an alias).
# Each Enumerable call starts a fresh pass, which submits the query again:
rows = paginator.to_a
paginator.select { |row| row.first.to_s.to_i > 100 }
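Every Enumerable call goes through #each and therefore #each_page, which calls submit_cursor at the start. Two calls such as `to_a` followed by `select` therefore run the query twice and open two cursors. The sketch below demonstrates this with `CountingPaginator`, a hypothetical stand-in (not part of the gem) that counts "submissions", and then shows the workaround of materializing the rows once:

```ruby
# Hypothetical stand-in: counts how often the query would be submitted,
# the way Pinot::Paginator#each_page opens a new cursor on every call.
class CountingPaginator
  include Enumerable

  attr_reader :submissions

  def initialize(rows)
    @rows = rows
    @submissions = 0
  end

  def each(&block)
    return enum_for(:each) unless block_given?

    @submissions += 1 # one POST /query/sql?getCursor=true per full pass
    @rows.each(&block)
  end
end

pager = CountingPaginator.new([[50], [150], [250]])

# Two Enumerable calls: two passes, two submissions.
pager.to_a
pager.select { |row| row.first > 100 }

# Materialize once, then use plain Array methods on the cached rows.
rows = pager.to_a
big = rows.select { |row| row.first > 100 }
```

After these three Enumerable calls `pager.submissions` is 3. Caching with `to_a` trades memory for a single round of broker requests, so it suits result sets that fit comfortably in memory.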

Cursor lifecycle

The cursor is deleted from the broker automatically after the last page is consumed. If iteration stops early (for example, after a `break` or `return` out of the loop), call #delete explicitly to release it. DELETE failures are swallowed; cursors expire naturally on the broker side.
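Because the automatic DELETE only runs when #each_page reaches the end of the result set, leaving the loop early skips it. A minimal sketch of guarding early exits with `ensure`; `FakePaginator` and `first_match` are hypothetical names used only so the snippet runs without a broker:

```ruby
# Hypothetical stand-in for Pinot::Paginator: yields rows, records cleanup.
class FakePaginator
  include Enumerable

  attr_reader :deleted

  def initialize(rows)
    @rows = rows
    @deleted = false
  end

  # Like the real class, cleanup only runs after a complete pass.
  def each(&block)
    return enum_for(:each) unless block_given?

    @rows.each(&block)
    delete
  end
  alias each_row each

  def delete
    @deleted = true
  end
end

# Returns the first row whose first cell exceeds 100 and always
# releases the cursor, even when the loop is left early via `return`.
def first_match(paginator)
  paginator.each_row do |row|
    return row if row.first > 100
  end
  nil
ensure
  paginator.delete
end

pager = FakePaginator.new([[5, "a"], [150, "b"], [300, "c"]])
match = first_match(pager)
```

Here `match` is `[150, "b"]` and `pager.deleted` is true even though iteration stopped at the second row. Without the `ensure`, the `return` would unwind past the cleanup at the end of #each.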

Protocol

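  1. POST /query/sql?getCursor=true&numRows=N: submit the query; the response carries the first page, the requestId, and the brokerHost/brokerPort of the broker that owns the cursor

  2. GET /responseStore/{requestId}/results?offset=K&numRows=N: fetch each subsequent page from the owning broker

  3. DELETE /responseStore/{requestId}: release the cursor (best-effort)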
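The three requests above can be sketched as URL builders. This is an illustration, not the gem's code: the helper names are hypothetical, and it assumes `broker_address` is a `host:port` string, that the first response is a Hash with `requestId`, `brokerHost`, and `brokerPort` keys, and that plain `http` is used:

```ruby
require "uri"

# Hypothetical helpers mirroring the three protocol steps.

# Step 1: initial submission, sent to whichever broker was selected.
def submit_url(broker_address, page_size)
  "http://#{broker_address}/query/sql?getCursor=true&numRows=#{page_size}"
end

# Broker affinity: later requests go to the broker named in the first response.
def cursor_base(first_response)
  "http://#{first_response.fetch('brokerHost')}:#{first_response.fetch('brokerPort')}"
end

# Step 2: fetch the page starting at `offset`.
def fetch_url(base, request_id, offset, page_size)
  query = URI.encode_www_form(offset: offset, numRows: page_size)
  "#{base}/responseStore/#{request_id}/results?#{query}"
end

# Step 3: release the cursor.
def delete_url(base, request_id)
  "#{base}/responseStore/#{request_id}"
end

first = { "requestId" => "abc123", "brokerHost" => "broker-2", "brokerPort" => 8099 }
base = cursor_base(first)
```

With this response, `fetch_url(base, "abc123", 500, 500)` yields `http://broker-2:8099/responseStore/abc123/results?offset=500&numRows=500`. The point is that steps 2 and 3 never reuse `broker_address`.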

Constant Summary

DEFAULT_PAGE_SIZE = 1000

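Instance Method Summary

  • #delete ⇒ Object
    Delete the cursor from the broker early (also called automatically after exhaustion).
  • #each(&block) ⇒ Object (also known as: each_row)
    Yields each row Array across all pages.
  • #each_page {|first| ... } ⇒ Object
    Yields each page as a BrokerResponse.
  • #initialize(http_client, broker_address, query, page_size:, extra_headers: {}) ⇒ Paginator (constructor)
    Returns a new instance of Paginator.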

Constructor Details

#initialize(http_client, broker_address, query, page_size:, extra_headers: {}) ⇒ Paginator

Returns a new instance of Paginator.

Raises:

  • (ArgumentError)


# File 'lib/pinot/paginator.rb', line 46

def initialize(http_client, broker_address, query, page_size:, extra_headers: {})
  raise ArgumentError, "page_size must be a positive integer" unless page_size.is_a?(Integer) && page_size > 0

  @http_client    = http_client
  @broker_address = broker_address
  @query          = query
  @page_size      = page_size
  @extra_headers  = extra_headers

  @request_id  = nil
  @cursor_base = nil  # "http://host:port" — set after first response
  @exhausted   = false
end

Instance Method Details

#delete ⇒ Object

Deletes the cursor from the broker early (also called automatically after exhaustion). Does nothing if no cursor is open, and swallows any error raised by the DELETE request.



# File 'lib/pinot/paginator.rb', line 100

def delete
  return unless @request_id && @cursor_base

  url = "#{@cursor_base}/responseStore/#{@request_id}"
  @http_client.delete(url, headers: json_headers)
  @request_id = nil
rescue StandardError
  # best-effort; cursor will expire naturally
end
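One consequence of the control flow above: `@request_id` is cleared only after a successful DELETE. A failed call therefore leaves it set, so a later explicit #delete retries, while calls after a success return immediately. The sketch below reproduces that flow outside the gem; `FlakyClient` and `CursorHandle` are hypothetical names, and the client's `delete(url, headers:)` signature is an assumption modeled on the call shown:

```ruby
# Hypothetical HTTP client stub whose first DELETE fails.
class FlakyClient
  attr_reader :calls

  def initialize
    @calls = 0
  end

  def delete(url, headers: {})
    @calls += 1
    raise IOError, "connection reset" if @calls == 1

    :ok
  end
end

# Same control flow as Paginator#delete, extracted for illustration.
class CursorHandle
  attr_reader :request_id

  def initialize(client, cursor_base, request_id)
    @client = client
    @cursor_base = cursor_base
    @request_id = request_id
  end

  def delete
    return unless @request_id && @cursor_base

    @client.delete("#{@cursor_base}/responseStore/#{@request_id}", headers: {})
    @request_id = nil
  rescue StandardError
    # best-effort: the error is swallowed and @request_id is kept
  end
end

client = FlakyClient.new
handle = CursorHandle.new(client, "http://broker-2:8099", "abc123")
handle.delete                    # fails internally; error swallowed
after_failure = handle.request_id
handle.delete                    # succeeds; request_id cleared
handle.delete                    # no-op: no HTTP call is made
```

After these calls, `after_failure` is `"abc123"`, `handle.request_id` is nil, and the client saw exactly two requests.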

#each(&block) ⇒ Object (also known as: each_row)

Yields each row Array across all pages. Returns an Enumerator without a block. Because the method is named #each (with #each_row as an alias), Enumerable methods (.map, .select, .to_a, etc.) work.



# File 'lib/pinot/paginator.rb', line 89

def each(&block)
  return enum_for(:each) unless block_given?

  each_page do |response|
    response.result_table.rows.each(&block)
  end
end
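#each is a thin flattening layer over #each_page. As a result, an Enumerable method that stops early, such as `first(n)`, never requests later pages. It also skips the automatic DELETE at the end of #each_page. A minimal sketch with hypothetical `Page`/`ResultTable` structs standing in for BrokerResponse, and a stub that counts fetched pages:

```ruby
# Hypothetical stand-ins for BrokerResponse and its result table.
ResultTable = Struct.new(:rows)
Page = Struct.new(:result_table)

class PagedStub
  include Enumerable

  attr_reader :pages_fetched

  def initialize(pages)
    @pages = pages
    @pages_fetched = 0
  end

  # Yields one Page at a time, counting how many were "fetched".
  def each_page
    return enum_for(:each_page) unless block_given?

    @pages.each do |rows|
      @pages_fetched += 1
      yield Page.new(ResultTable.new(rows))
    end
  end

  # Same shape as Pinot::Paginator#each: flatten pages into rows.
  def each(&block)
    return enum_for(:each) unless block_given?

    each_page do |response|
      response.result_table.rows.each(&block)
    end
  end
  alias each_row each
end

stub = PagedStub.new([[[1], [2]], [[3], [4]], [[5]]])
first_two = stub.first(2)
```

`first_two` is `[[1], [2]]` and `stub.pages_fetched` is 1: `Enumerable#first` breaks out of the block as soon as it has two rows, so the second page is never requested.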

#each_page {|first| ... } ⇒ Object

Yields each page as a BrokerResponse. Returns an Enumerator without a block.

Yields:

  • (first)


# File 'lib/pinot/paginator.rb', line 61

def each_page
  return enum_for(:each_page) unless block_given?

  # Submit the query and get the first page + cursor metadata
  first = submit_cursor
  return if first.result_table.nil? || first.result_table.rows.empty?

  yield first

  fetched = first.num_rows || first.result_table.rows.size
  total   = first.num_rows_result_set || 0

  while fetched < total
    page = fetch_page(fetched)
    rows = page.result_table&.rows || []
    break if rows.empty?

    yield page

    fetched += rows.size
    break if rows.size < @page_size
  end

  delete
end
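The paging loop above can be traced without a broker. This sketch replays it against an in-memory result set and records the `offset` values that would be sent to `/responseStore/{requestId}/results`. It assumes the broker returns exactly `page_size` rows per page (fewer only on the last), with the total playing the role of `num_rows_result_set`; `simulate_offsets` is a hypothetical helper:

```ruby
# Replays the #each_page loop and returns the offsets it would fetch.
def simulate_offsets(total_rows, page_size)
  all = (1..total_rows).to_a
  first = all.first(page_size) # first page comes back with the POST
  return [] if first.empty?    # #each_page returns early on an empty first page

  offsets = []
  fetched = first.size
  while fetched < total_rows
    offsets << fetched
    rows = all[fetched, page_size] || []
    break if rows.empty?

    fetched += rows.size
    break if rows.size < page_size # short page means the end was reached
  end
  offsets
end
```

For 2,500 rows at the default page size, `simulate_offsets(2500, 1000)` returns `[1000, 2000]`: the POST delivers rows 0 to 999, two GETs deliver the rest, and the short final page ends the loop. A result that fits in one page (`simulate_offsets(1000, 1000)`) issues no GET at all.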