Module: Pipeloader::Pipeliner
- Defined in: lib/pipeloader/pipeliner.rb
Overview
Runs a batch of SELECTs as one libpq pipelined burst on a PG::Connection.
Class Method Summary
- .drain_block(pg, results, capture_error) ⇒ Object
  Reads one pipeline sync block.
- .finish_pipeline(pg) ⇒ Object
  Leave the connection usable no matter how the batch ended.
- .pipeline_batch(pg, queries) ⇒ Object
  queries: array of [sql, params].
- .prepared_cache(pg) ⇒ Object
  The per-request name cache, lazily created (Trace normally seeds it).
- .reset_connection(pg) ⇒ Object
- .take_garbage(pg) ⇒ Object
  Names left by previous requests, to DEALLOCATE on the next burst (cleared).
Class Method Details
.drain_block(pg, results, capture_error) ⇒ Object
Reads one pipeline sync block. Collects PGRES_TUPLES_OK results into `results` (when given) and, when capturing, records the first query error via result.check. Returns that error, or nil if none was captured.
    # File 'lib/pipeloader/pipeliner.rb', line 64
    def drain_block(pg, results, capture_error)
      error = nil
      loop do
        result = pg.get_result
        break if result.nil?
        status = result.result_status
        break if status == PG::PGRES_PIPELINE_SYNC
        if status == PG::PGRES_TUPLES_OK && results
          # Raw strings, so ActiveRecord casts via its own column types (and so we
          # never disturb the connection's type map that AR relies on).
          result.type_map = PG::TypeMapAllStrings.new
          results << [result.fields, result.values]
        elsif status != PG::PGRES_COMMAND_OK && capture_error
          begin
            result.check
          rescue PG::Error => e
            error ||= e
          end
        end
        pg.get_result # consume this result's nil delimiter
      end
      error
    end
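The result framing that drain_block relies on (each query's result followed by a nil delimiter, and the block closed by a sync result) can be tried without a database. The sketch below replays a scripted stream through a loop of the same shape. FakeResult, ScriptedConnection, drain_scripted, and the status symbols are hypothetical stand-ins for PG::Result, PG::Connection, and the PG status constants; they are not part of the pg gem or of Pipeloader.

```ruby
# Hypothetical stand-ins: not part of the pg gem or Pipeloader.
FakeResult = Struct.new(:status, :fields, :values, :message)

class ScriptedConnection
  def initialize(stream)
    @stream = stream.dup
  end

  # Mimics get_result: hands back the next scripted item (nil is a delimiter).
  def get_result
    @stream.shift
  end
end

# Same loop shape as drain_block, with symbols in place of PG status constants.
def drain_scripted(conn, results)
  first_error = nil
  loop do
    result = conn.get_result
    break if result.nil?
    break if result.status == :pipeline_sync

    case result.status
    when :tuples_ok
      results << [result.fields, result.values]
    when :command_ok
      # nothing to collect (for example, the reply to a Parse)
    else
      first_error ||= result.message # keep only the first failure
    end
    conn.get_result # consume the nil that ends this query's results
  end
  first_error
end

stream = [
  FakeResult.new(:command_ok), nil,
  FakeResult.new(:tuples_ok, ["id"], [["1"], ["2"]]), nil,
  FakeResult.new(:fatal_error, nil, nil, "relation missing"), nil,
  FakeResult.new(:pipeline_aborted, nil, nil, "aborted"), nil,
  FakeResult.new(:pipeline_sync)
]

rows = []
err = drain_scripted(ScriptedConnection.new(stream), rows)
```

Here rows ends up holding the single successful result set and err holds the message of the first failing query; the aborted result that follows it does not overwrite it.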
.finish_pipeline(pg) ⇒ Object
Leave the connection usable no matter how the batch ended. If the pipeline can’t be drained cleanly (e.g. the connection dropped mid-burst), reset it so the pool gets a healthy connection.
    # File 'lib/pipeloader/pipeliner.rb', line 112
    def finish_pipeline(pg)
      return if pg.pipeline_status == PG::PQ_PIPELINE_OFF
      loop { break if pg.get_result.nil? }
      pg.exit_pipeline_mode
    rescue PG::Error
      reset_connection(pg)
    end
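The two exits of finish_pipeline (clean drain versus fallback reset) can be illustrated with a stub. This is a minimal sketch under stated assumptions: StubConnection, its Dropped error (playing the role of PG::Error), in_pipeline?, and the log array are all hypothetical and exist only to make the control flow observable.

```ruby
# Hypothetical stand-in for a connection that is still in pipeline mode.
class StubConnection
  class Dropped < StandardError; end # plays the role of PG::Error

  attr_reader :log

  def initialize(pending, drop: false)
    @pending = pending.dup
    @drop = drop
    @in_pipeline = true
    @log = []
  end

  def in_pipeline?
    @in_pipeline
  end

  def get_result
    raise Dropped, "connection lost" if @drop
    @pending.shift
  end

  def exit_pipeline_mode
    @in_pipeline = false
    @log << :exit_pipeline_mode
  end

  def reset
    @drop = false
    @in_pipeline = false
    @log << :reset
  end
end

# Same control flow as finish_pipeline: drain, exit, and reset on failure.
def finish(conn)
  return unless conn.in_pipeline?
  loop { break if conn.get_result.nil? }
  conn.exit_pipeline_mode
rescue StubConnection::Dropped
  conn.reset
end

healthy = StubConnection.new([:leftover, nil])
finish(healthy)

broken = StubConnection.new([:leftover], drop: true)
finish(broken)
```

The healthy stub records only :exit_pipeline_mode, while the broken one records only :reset, which mirrors the "no matter how the batch ended" guarantee described above.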
.pipeline_batch(pg, queries) ⇒ Object
queries: array of [sql, params]. Returns array of [columns, rows] (raw strings), in the same order, having sent them all in a single round trip.
Prepared statements are cached for the lifetime of the request (the cache and name space are set up by Pipeloader::Trace per multiplex), so a query shape is planned once per request and reused across every burst rather than re-planned each burst. The statements are discarded when the next request begins: that request's first burst DEALLOCATEs the previous request's statements, piggybacked into the same pipeline so cleanup costs no extra round trip. No statement is reused beyond the request that prepared it, so no plan goes stale across a reconnect or a migration.
If any query errors, the pipeline is drained to its sync point, the connection is restored to a usable state, and the first error is raised. Errors are never swallowed.
    # File 'lib/pipeloader/pipeliner.rb', line 21
    def pipeline_batch(pg, queries)
      cache = prepared_cache(pg)
      seq = pg.instance_variable_get(:@pipeloader_seq) || 0
      garbage = take_garbage(pg) # previous request's statements, to DEALLOCATE now

      # Name + pipelined Parse for each shape not yet prepared this request.
      to_prepare = []
      queries.each do |sql, _params|
        next if cache.key?(sql)
        name = "pipeloader_#{seq}_#{cache.size}"
        cache[sql] = name
        to_prepare << [name, sql]
      end

      error = nil
      results = []
      begin
        pg.enter_pipeline_mode

        # Block 1: clean up the previous request, in its own sync so a stale name
        # (after a reconnect) can't abort this burst's real queries. Same round trip.
        unless garbage.empty?
          garbage.each { |name| pg.send_query_params("DEALLOCATE #{name}", []) }
          pg.pipeline_sync
        end

        # Block 2: prepare new shapes, run every query.
        to_prepare.each { |name, sql| pg.send_prepare(name, sql) }
        queries.each { |sql, params| pg.send_query_prepared(cache[sql], params) }
        pg.pipeline_sync

        drain_block(pg, nil, false) unless garbage.empty? # cleanup results, ignore failures
        error = drain_block(pg, results, true) # this burst's query results
      ensure
        finish_pipeline(pg)
      end
      raise error if error
      results
    end
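The naming step at the top of pipeline_batch is what makes a shape get prepared once per request. It can be sketched in isolation with a plain Hash as the cache; plan_prepares is a hypothetical helper name, and the SQL strings and the seq value 7 are made up for illustration.

```ruby
# Hypothetical helper mirroring the naming loop in pipeline_batch.
# Returns the [name, sql] pairs that still need a Parse, and records
# each new shape in the cache so later bursts skip it.
def plan_prepares(cache, seq, queries)
  to_prepare = []
  queries.each do |sql, _params|
    next if cache.key?(sql) # shape already prepared this request
    name = "pipeloader_#{seq}_#{cache.size}"
    cache[sql] = name
    to_prepare << [name, sql]
  end
  to_prepare
end

users_sql = 'SELECT * FROM users WHERE id = $1'
posts_sql = 'SELECT * FROM posts WHERE user_id = $1'

cache = {}
first_burst = plan_prepares(cache, 7, [[users_sql, [1]], [users_sql, [2]], [posts_sql, [1]]])
second_burst = plan_prepares(cache, 7, [[users_sql, [3]]])
```

The first burst names two statements even though it carries three queries, because two of them share a shape; the second burst has nothing left to prepare. With a live PG::Connection, the call documented above would be Pipeloader::Pipeliner.pipeline_batch(pg, queries), with queries in the same [sql, params] form.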
.prepared_cache(pg) ⇒ Object
The per-request name cache, lazily created (Trace normally seeds it).
    # File 'lib/pipeloader/pipeliner.rb', line 91
    def prepared_cache(pg)
      cache = pg.instance_variable_get(:@pipeloader_prepared)
      return cache if cache
      cache = {}
      pg.instance_variable_set(:@pipeloader_prepared, cache)
      cache
    end
.reset_connection(pg) ⇒ Object
Resets the connection, ignoring any PG::Error raised by the reset itself. Whether or not the reset succeeds, the request's prepared-name cache and the pending garbage list are cleared, because a reset session has no prepared statements.
    # File 'lib/pipeloader/pipeliner.rb', line 121
    def reset_connection(pg)
      pg.reset
    rescue PG::Error
      nil
    ensure
      # The reset session has no prepared statements: drop the request's name cache
      # so later bursts re-prepare, and the pending garbage that's now gone with it.
      pg.instance_variable_get(:@pipeloader_prepared)&.clear
      pg.instance_variable_set(:@pipeloader_garbage, [])
    end
.take_garbage(pg) ⇒ Object
Names left by previous requests, to DEALLOCATE on the next burst (cleared).
    # File 'lib/pipeloader/pipeliner.rb', line 101
    def take_garbage(pg)
      garbage = pg.instance_variable_get(:@pipeloader_garbage)
      return [] unless garbage && !garbage.empty?
      pg.instance_variable_set(:@pipeloader_garbage, [])
      garbage
    end
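The "(cleared)" part of take_garbage means the list is handed over exactly once. A minimal sketch of that take-and-clear behavior, using a bare Object in place of a PG::Connection; take_pending is a hypothetical name, the statement names are made up, and the code that fills @pipeloader_garbage is not shown on this page, so the example seeds it by hand.

```ruby
# Hypothetical mirror of take_garbage: return the pending names and
# leave an empty list behind so the next caller gets nothing.
def take_pending(obj)
  pending = obj.instance_variable_get(:@pipeloader_garbage)
  return [] unless pending && !pending.empty?
  obj.instance_variable_set(:@pipeloader_garbage, [])
  pending
end

conn = Object.new # stand-in for a PG::Connection
conn.instance_variable_set(:@pipeloader_garbage, %w[pipeloader_6_0 pipeloader_6_1])

first_take = take_pending(conn)  # the previous request's statement names
second_take = take_pending(conn) # nothing left to deallocate
```

Only the first burst after a request boundary sees names to DEALLOCATE; later bursts in the same request receive an empty array and skip the cleanup block.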