Module: Pipeloader::Pipeliner
- Defined in:
- lib/pipeloader/pipeliner.rb
Overview
Runs a batch of SELECTs as one libpq pipelined burst on a PG::Connection.
Class Method Summary
- .pipeline_batch(pg, queries) ⇒ Object
queries: array of [sql, params].
Class Method Details
.pipeline_batch(pg, queries) ⇒ Object
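queries: array of [sql, params]. Returns an array of [columns, rows] pairs (values as raw strings), in the same order as queries. All executions are sent in a single round trip; any SQL not yet prepared on this connection is prepared first, before pipeline mode is entered.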
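The input and output shapes described above can be sketched as follows. This is an illustration only: the table names, the sample values, and the `rows_as_hashes` helper are hypothetical and not part of Pipeloader, and the `results` literal stands in for what a live call would return.

```ruby
# Hypothetical helper (not part of Pipeloader): turn one [columns, rows]
# pair, as returned by pipeline_batch, into an array of hashes.
def rows_as_hashes(columns, rows)
  rows.map { |row| columns.zip(row).to_h }
end

# Input shape: one [sql, params] pair per query (hypothetical tables).
queries = [
  ["SELECT id, name FROM users WHERE id = $1", [1]],
  ["SELECT id, title FROM posts WHERE user_id = $1", [1]],
]

# With a live PG::Connection this would be:
#   results = Pipeloader::Pipeliner.pipeline_batch(pg, queries)
# Below is a hand-written stand-in with the same shape. Every value is a
# raw string, including the numeric ids.
results = [
  [["id", "name"], [["1", "Ada"]]],
  [["id", "title"], [["10", "Hello"], ["11", "World"]]],
]

# Results come back in the same order as the queries were given.
users, posts = results.map { |columns, rows| rows_as_hashes(columns, rows) }
```

Because the values are uncast strings, a caller outside ActiveRecord would need to convert them (for example with `Integer("10")`) before using them as numbers.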
    # File 'lib/pipeloader/pipeliner.rb', line 10

    def pipeline_batch(pg, queries)
      prepared = pg.instance_variable_get(:@pipeloader_prepared)
      unless prepared
        prepared = {}
        pg.instance_variable_set(:@pipeloader_prepared, prepared)
      end

      # Prepare any unseen SQL before entering pipeline mode (Parse can't be
      # issued synchronously mid-pipeline). GraphQL has a bounded set of query
      # shapes, so this amortizes to ~one parse per shape for the connection's
      # life; thereafter every execution reuses the named statement.
      queries.each do |sql, _params|
        prepared[sql] ||= begin
          name = "pipeloader_#{prepared.size}"
          pg.prepare(name, sql)
          name
        end
      end

      pg.enter_pipeline_mode
      queries.each { |sql, params| pg.send_query_prepared(prepared[sql], params) }
      pg.pipeline_sync

      results = []
      loop do
        result = pg.get_result
        break if result.nil?
        break if result.result_status == PG::PGRES_PIPELINE_SYNC

        # Raw strings, so ActiveRecord casts via its own column types (and so we
        # never disturb the connection's type map that AR relies on).
        result.type_map = PG::TypeMapAllStrings.new
        results << [result.fields, result.values]
        pg.get_result # per-query nil delimiter
      end
      pg.exit_pipeline_mode
      results
    end
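The result-reading loop in pipeline_batch depends on the order in which libpq hands back pipelined results: each query yields its result followed by a nil delimiter, and the sync point yields one result with the pipeline-sync status. The sketch below replays that sequence against stand-in objects so the control flow can be followed without a database. `FakeResult`, `FakeConnection`, `drain`, and the symbol statuses are all hypothetical; the real code uses PG::Connection, PG::Result, and PG::PGRES_PIPELINE_SYNC.

```ruby
# Stand-ins for illustration only; not part of pg or Pipeloader.
FakeResult = Struct.new(:status, :fields, :values)

class FakeConnection
  def initialize(script)
    @script = script.dup
  end

  # Returns the next scripted item, or nil once the script is exhausted.
  def get_result
    @script.shift
  end
end

# Same control flow as the loop in pipeline_batch, minus the type map.
def drain(conn)
  results = []
  loop do
    result = conn.get_result
    break if result.nil?
    break if result.status == :pipeline_sync

    results << [result.fields, result.values]
    conn.get_result # consume the nil that closes this query's results
  end
  results
end

# Two queries, then the sync marker: result, nil, result, nil, sync.
script = [
  FakeResult.new(:tuples_ok, ["id"], [["1"]]), nil,
  FakeResult.new(:tuples_ok, ["n"], [["2"], ["3"]]), nil,
  FakeResult.new(:pipeline_sync, [], []),
]

drained = drain(FakeConnection.new(script))
```

Note that this loop, like the original, records every non-sync result without inspecting its status, so a failed query would not raise here; whether pipeline_batch's callers handle that elsewhere is not shown in the source.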