Module: Pipeloader::Pipeliner

Defined in:
lib/pipeloader/pipeliner.rb

Overview

Runs a batch of SELECTs as one libpq pipelined burst on a PG::Connection.

Class Method Summary collapse

Class Method Details

.pipeline_batch(pg, queries) ⇒ Object

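queries: an array of [sql, params] pairs. Returns an array of [columns, rows] pairs (every value a raw string), in the same order as queries. All executions are sent as one pipelined round trip; any SQL not yet seen on this connection is prepared beforehand, synchronously, one statement at a time.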



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/pipeloader/pipeliner.rb', line 10

# Executes a batch of queries on +pg+ as prepared statements sent in one
# libpq pipeline, and returns their results as raw strings.
#
# Statement names are cached per SQL string in the +@pipeloader_prepared+
# instance variable stored on the connection object itself.
# NOTE(review): that cache outlives a connection reset, after which the
# server-side statements are presumably gone -- confirm callers never reuse a
# reset connection with this method.
#
# @param pg [PG::Connection] connection to run the batch on
# @param queries [Array<Array(String, Array)>] +[sql, params]+ pairs
# @return [Array<Array(Array<String>, Array<Array>)>] one +[columns, rows]+
#   pair per query, in the same order as +queries+
# @raise [PG::Error] if any query in the batch fails; raised only after the
#   pipeline has been drained and the connection has left pipeline mode
def pipeline_batch(pg, queries)
  # Nothing to run: skip the pipeline (and its network round trip) entirely.
  return [] if queries.empty?

  prepared = pg.instance_variable_get(:@pipeloader_prepared) ||
             pg.instance_variable_set(:@pipeloader_prepared, {})

  # Prepare any unseen SQL before entering pipeline mode (Parse can't be
  # issued synchronously mid-pipeline). GraphQL has a bounded set of query
  # shapes, so this amortizes to ~one parse per shape for the connection's
  # life; thereafter every execution reuses the named statement.
  queries.each do |sql, _params|
    # The name is built from the cache size *before* insertion, and is only
    # cached once prepare has succeeded.
    prepared[sql] ||= "pipeloader_#{prepared.size}".tap { |name| pg.prepare(name, sql) }
  end

  # Raw strings, so ActiveRecord casts via its own column types (and so we
  # never disturb the connection's type map that AR relies on). The map is
  # set per result, so a single instance is shared by the whole batch.
  string_map = PG::TypeMapAllStrings.new
  results = []
  failure = nil
  sync_sent = false

  pg.enter_pipeline_mode
  begin
    queries.each { |sql, params| pg.send_query_prepared(prepared[sql], params) }
    pg.pipeline_sync
    sync_sent = true

    loop do
      result = pg.get_result
      break if result.nil?
      break if result.result_status == PG::PGRES_PIPELINE_SYNC

      begin
        # get_result does not raise on a failed query; check does. Keep only
        # the first failure: later results of the same pipeline are merely
        # "aborted" echoes of it.
        result.check
      rescue PG::Error => e
        failure ||= e
      else
        result.type_map = string_map
        results << [result.fields, result.values]
      end
      pg.get_result # per-query nil delimiter
    end
  rescue StandardError
    # Sending or draining blew up part-way. Best effort: flush whatever was
    # queued up to a sync marker and leave pipeline mode, so the connection
    # is not handed back stuck mid-pipeline. The original exception is the
    # one worth reporting, so secondary PG errors are deliberately dropped.
    begin
      pg.pipeline_sync unless sync_sent
      # At most one result plus one nil delimiter per query, then the marker.
      ((2 * queries.size) + 1).times do
        leftover = pg.get_result
        break if leftover&.result_status == PG::PGRES_PIPELINE_SYNC
      end
      pg.exit_pipeline_mode
    rescue PG::Error
      nil
    end
    raise
  end

  pg.exit_pipeline_mode
  # Surface a failed query only now that the connection is usable again.
  raise failure if failure

  results
end