Module: Pipeloader::Pipeliner
Defined in: lib/pipeloader/pipeliner.rb
Overview
Runs a batch of SELECTs as one libpq pipelined burst on a PG::Connection.
Class Method Summary
- .drain_block(pg, results, capture_error) ⇒ Object
  Reads one pipeline sync block.
- .finish_pipeline(pg) ⇒ Object
  Leave the connection usable no matter how the batch ended.
- .pipeline_batch(pg, queries) ⇒ Object
  Sends queries, an array of [sql, params], as one pipelined burst and returns their results in order.
- .prepared_cache(pg) ⇒ Object
  The per-request name cache, lazily created (Trace normally seeds it).
- .reset_connection(pg) ⇒ Object
  Resets the connection and forgets its prepared-statement names and pending garbage.
- .take_garbage(pg) ⇒ Object
  Returns the names left by previous requests, to DEALLOCATE on the next burst, and clears the stored list.
Class Method Details
.drain_block(pg, results, capture_error) ⇒ Object
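Reads one pipeline sync block, up to and including its PGRES_PIPELINE_SYNC result. Collects each PGRES_TUPLES_OK result into `results` (when it is non-nil) and, when capture_error is true, records the first query error via result.check. Returns that error, or nil.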
# File 'lib/pipeloader/pipeliner.rb', line 65
def drain_block(pg, results, capture_error)
  error = nil
  loop do
    result = pg.get_result
    break if result.nil?

    status = result.result_status
    break if status == PG::PGRES_PIPELINE_SYNC

    if status == PG::PGRES_TUPLES_OK && results
      # Read values as strings (so we never disturb the connection's type map
      # that AR relies on), and capture each column's result OID + modifier so
      # the Source can resolve the real type via the adapter's get_oid_type,
      # including computed/aliased columns the model has no column type for.
      # ftype/fmod are result metadata, unaffected by the all-strings map.
      result.type_map = PG::TypeMapAllStrings.new
      oids = Array.new(result.nfields) { |i| [result.ftype(i), result.fmod(i)] }
      results << [result.fields, result.values, oids]
    elsif status != PG::PGRES_COMMAND_OK && capture_error
      begin
        result.check
      rescue PG::Error => e
        error ||= e
      end
    end
    pg.get_result # consume this result's nil delimiter
  end
  error
end
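The loop above leans on the way libpq reports pipelined results: every queued command yields its result followed by a nil delimiter, a failed command makes libpq report the rest of its sync block as PGRES_PIPELINE_ABORTED, and each pipeline_sync shows up as a single PGRES_PIPELINE_SYNC result, which the code treats as the end of the block. Below is a minimal sketch of that contract that runs without a database. `FakeResult`, `FakeConn`, and `drain` are hypothetical stand-ins (for PG::Result, PG::Connection, and a simplified drain_block), and the status symbols replace the PG::PGRES_* constants.

```ruby
# Hypothetical stand-ins: FakeResult carries only what the sketch needs,
# FakeConn replays a scripted get_result stream (nil marks a delimiter).
FakeResult = Struct.new(:status, :rows, :message)

class FakeConn
  def initialize(stream)
    @stream = stream.dup
  end

  # Like PG::Connection#get_result: the next queued result, or nil.
  def get_result
    @stream.shift
  end
end

# Simplified drain: keep :tuples_ok rows, remember the first error,
# and stop at the sync marker without reading past it.
def drain(conn)
  rows = []
  error = nil
  loop do
    result = conn.get_result
    break if result.nil?
    break if result.status == :pipeline_sync

    case result.status
    when :tuples_ok then rows << result.rows
    when :fatal_error, :pipeline_aborted then error ||= result.message
    end
    conn.get_result # consume the nil delimiter that follows each result
  end
  [rows, error]
end

stream = [
  FakeResult.new(:command_ok), nil,                 # Parse of a new statement
  FakeResult.new(:tuples_ok, [["1"]]), nil,         # first query's rows
  FakeResult.new(:fatal_error, nil, "relation does not exist"), nil,
  FakeResult.new(:pipeline_aborted, nil, "aborted"), nil, # skipped after the error
  FakeResult.new(:pipeline_sync),                   # end of this sync block
  FakeResult.new(:tuples_ok, [["next block"]])      # belongs to the next block
]
conn = FakeConn.new(stream)
rows, error = drain(conn)
p rows  # => [[["1"]]]
p error # => "relation does not exist"
```

Because the first error is kept with `||=`, the aborted results that follow it cannot overwrite the real cause, and the next sync block's results are left untouched for the next drain.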
.finish_pipeline(pg) ⇒ Object
Leave the connection usable no matter how the batch ended. If the pipeline can’t be drained cleanly (e.g. the connection dropped mid-burst), reset it so the pool gets a healthy connection.
# File 'lib/pipeloader/pipeliner.rb', line 117
def finish_pipeline(pg)
  return if pg.pipeline_status == PG::PQ_PIPELINE_OFF

  loop { break if pg.get_result.nil? }
  pg.exit_pipeline_mode
rescue PG::Error
  reset_connection(pg)
end
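The cleanup path is "drain, exit pipeline mode, and reset if either raises", and the reset in turn drops the connection's statement names. A runnable sketch of that control flow follows. `FlakyConn`, `PipelineError`, `finish_or_reset`, and `reset_and_forget` are hypothetical (standing in for PG::Connection, PG::Error, finish_pipeline, and reset_connection), and `pipeline_on?` replaces the real pipeline_status check.

```ruby
# Hypothetical stand-in for PG::Error.
class PipelineError < StandardError; end

# Hypothetical connection double: scripted leftover results, an exit that can
# fail, and a name cache like the one Pipeliner keeps on PG::Connection.
class FlakyConn
  attr_reader :resets, :prepared

  def initialize(pending:, exit_fails:)
    @pending = pending
    @exit_fails = exit_fails
    @pipeline_on = true
    @resets = 0
    @prepared = { "SELECT 1" => "pipeloader_0_0" }
  end

  def pipeline_on?
    @pipeline_on
  end

  def get_result
    @pending.shift
  end

  def exit_pipeline_mode
    raise PipelineError, "results still pending" if @exit_fails

    @pipeline_on = false
  end

  def reset
    @resets += 1
    @pipeline_on = false
  end
end

# Same shape as reset_connection: reset, swallow a failed reset, and always
# forget the statement names (a fresh session has none).
def reset_and_forget(conn)
  conn.reset
rescue PipelineError
  nil
ensure
  conn.prepared.clear
end

# Same shape as finish_pipeline: drain, exit, and fall back to a reset.
def finish_or_reset(conn)
  return unless conn.pipeline_on?

  loop { break if conn.get_result.nil? }
  conn.exit_pipeline_mode
rescue PipelineError
  reset_and_forget(conn)
end

clean = FlakyConn.new(pending: [], exit_fails: false)
finish_or_reset(clean)   # exits pipeline mode, cache kept
broken = FlakyConn.new(pending: [:leftover, nil], exit_fails: true)
finish_or_reset(broken)  # exit raises, so the connection is reset
```

Putting the cache clearing in an `ensure` means the names are forgotten even when the reset itself fails, so a later burst never executes a statement name the new session does not have.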
.pipeline_batch(pg, queries) ⇒ Object
queries is an array of [sql, params] pairs. Returns an array of [columns, rows, oids] triples, one per query and in the same order: the column names, the rows as raw string values, and a per-column [oid, fmod] pair so the Source can type them. All queries are sent in a single round trip.
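A consumer has to pair each raw string with its column's OID before it can type the value. The sketch below shows that pairing under assumptions: `DECODERS` and `rows_as_hashes` are hypothetical, and the real Source resolves types through the adapter's get_oid_type rather than a hand-written table. The OIDs 16, 23, and 25 are PostgreSQL's built-in bool, int4, and text types, and an fmod of -1 means no type modifier.

```ruby
# Hypothetical decoder table keyed by PostgreSQL type OID
# (16 = bool, 23 = int4, 25 = text in pg_type). Unknown OIDs stay strings.
DECODERS = {
  16 => ->(s) { s == "t" },
  23 => ->(s) { Integer(s) },
  25 => ->(s) { s }
}.freeze
IDENTITY = ->(s) { s }

# Turns one [columns, rows, oids] triple into an array of row hashes.
# SQL NULLs arrive as nil and stay nil.
def rows_as_hashes(triple)
  columns, rows, oids = triple
  decoders = oids.map { |oid, _fmod| DECODERS.fetch(oid, IDENTITY) }
  rows.map do |row|
    columns.zip(row, decoders).to_h { |name, value, decode| [name, value && decode.call(value)] }
  end
end

triple = [
  %w[id title published],
  [["1", "Hello", "t"], ["2", nil, "f"]],
  [[23, -1], [25, -1], [16, -1]]
]
rows_as_hashes(triple).each { |h| p h }
```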
Prepared statements are cached for the lifetime of the REQUEST (Pipeloader::Trace sets up the cache and name space per multiplex), so each query shape is planned once per request and reused across every burst rather than re-planned each time. The statements are discarded when the next request on the same connection begins: that request's first burst DEALLOCATEs them, piggybacked onto the same pipeline so cleanup costs no extra round trip. Because no statement is reused beyond its request, no plan goes stale across a reconnect or a migration.
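The per-request lifecycle can be modeled in plain Ruby. In this sketch, `ConnState` stands in for the instance variables Pipeliner keeps on PG::Connection (@pipeloader_prepared, @pipeloader_garbage, @pipeloader_seq), and `begin_request` is a hypothetical guess at what Pipeloader::Trace does per multiplex: queue the previous request's names for DEALLOCATE and open a fresh name space. The name format and the take-and-clear behaviour come from the listings; how Trace actually advances the sequence number is an assumption.

```ruby
# Hypothetical model of the state Pipeliner keeps on a PG::Connection in
# @pipeloader_prepared, @pipeloader_garbage and @pipeloader_seq.
class ConnState
  attr_accessor :prepared, :garbage, :seq

  def initialize
    @prepared = {}
    @garbage = []
    @seq = 0
  end
end

# Hypothetical per-request hook (the real setup lives in Pipeloader::Trace):
# queue the previous request's names for DEALLOCATE, then open a fresh name space.
def begin_request(state)
  state.garbage.concat(state.prepared.values)
  state.prepared = {}
  state.seq += 1
end

# Take-and-clear, as take_garbage does.
def take_queued_names(state)
  names = state.garbage
  state.garbage = []
  names
end

# The naming step of pipeline_batch: one new name per unseen SQL shape.
def name_new_shapes(state, sqls)
  sqls.each_with_object([]) do |sql, fresh|
    next if state.prepared.key?(sql)

    name = "pipeloader_#{state.seq}_#{state.prepared.size}"
    state.prepared[sql] = name
    fresh << name
  end
end

state = ConnState.new
begin_request(state)
first = name_new_shapes(state, ["SELECT a", "SELECT b", "SELECT a"])
again = name_new_shapes(state, ["SELECT b"])  # same request: reused, nothing new
begin_request(state)
to_deallocate = take_queued_names(state)       # sent with the next burst
second = name_new_shapes(state, ["SELECT a"])  # new request: planned again
p first, to_deallocate, second
```

Since names embed the sequence number, a new request's statements cannot collide with the previous request's names that are still waiting to be deallocated.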
If any query errors, the pipeline is drained to its sync point, the connection is restored to a usable state, and the first error is raised — never swallowed.
# File 'lib/pipeloader/pipeliner.rb', line 22
def pipeline_batch(pg, queries)
  cache = prepared_cache(pg)
  seq = pg.instance_variable_get(:@pipeloader_seq) || 0
  garbage = take_garbage(pg) # previous request's statements, to DEALLOCATE now

  # Name + pipelined Parse for each shape not yet prepared this request.
  to_prepare = []
  queries.each do |sql, _params|
    next if cache.key?(sql)
    name = "pipeloader_#{seq}_#{cache.size}"
    cache[sql] = name
    to_prepare << [name, sql]
  end

  error = nil
  results = []
  begin
    pg.enter_pipeline_mode

    # Block 1: clean up the previous request, in its own sync so a stale name
    # (after a reconnect) can't abort this burst's real queries. Same round trip.
    unless garbage.empty?
      garbage.each { |name| pg.send_query_params("DEALLOCATE #{name}", []) }
      pg.pipeline_sync
    end

    # Block 2: prepare new shapes, run every query.
    to_prepare.each { |name, sql| pg.send_prepare(name, sql) }
    queries.each { |sql, params| pg.send_query_prepared(cache[sql], params) }
    pg.pipeline_sync

    drain_block(pg, nil, false) unless garbage.empty? # cleanup results; ignore failures
    error = drain_block(pg, results, true) # this burst's query results
  ensure
    finish_pipeline(pg)
  end

  raise error if error
  results
end
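The command stream a burst produces can be checked without a server by recording the calls. In the sketch below, `RecordingConn` is a hypothetical double whose method names match the PG::Connection calls in the listing but which only logs its arguments, and `send_burst` is a hypothetical condensation of the send phase above (names already assigned, no draining).

```ruby
# Hypothetical recorder with the same method names as the PG::Connection
# calls used by pipeline_batch; it logs instead of talking to a server.
class RecordingConn
  attr_reader :log

  def initialize
    @log = []
  end

  def send_query_params(sql, params)
    @log << [:query, sql, params]
  end

  def send_prepare(name, sql)
    @log << [:prepare, name, sql]
  end

  def send_query_prepared(name, params)
    @log << [:execute, name, params]
  end

  def pipeline_sync
    @log << [:sync]
  end
end

# Same two sync blocks as pipeline_batch: DEALLOCATEs in their own block,
# then Parse for new shapes and Execute for every query.
def send_burst(conn, garbage, to_prepare, cache, queries)
  unless garbage.empty?
    garbage.each { |name| conn.send_query_params("DEALLOCATE #{name}", []) }
    conn.pipeline_sync
  end
  to_prepare.each { |name, sql| conn.send_prepare(name, sql) }
  queries.each { |sql, params| conn.send_query_prepared(cache[sql], params) }
  conn.pipeline_sync
end

sql = "SELECT * FROM posts WHERE id = $1"
conn = RecordingConn.new
send_burst(conn, ["pipeloader_1_0"], [["pipeloader_2_0", sql]],
           { sql => "pipeloader_2_0" }, [[sql, [1]], [sql, [2]]])
conn.log.each { |entry| p entry }
```

One Parse serves both Executes, and because the DEALLOCATE has its own sync, a failure there (for example, a name already gone after a reconnect) aborts only the first block; libpq resumes normal processing at the sync, so the second block still runs.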
.prepared_cache(pg) ⇒ Object
Returns the per-request cache that maps each SQL string to its prepared-statement name, creating it on first use if Pipeloader::Trace has not already seeded it.
# File 'lib/pipeloader/pipeliner.rb', line 96
def prepared_cache(pg)
  cache = pg.instance_variable_get(:@pipeloader_prepared)
  return cache if cache

  cache = {}
  pg.instance_variable_set(:@pipeloader_prepared, cache)
  cache
end
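.reset_connection(pg) ⇒ Object
Resets the connection (ignoring a failed reset) and, because a reset session has no prepared statements, clears the request's name cache and the pending garbage.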
# File 'lib/pipeloader/pipeliner.rb', line 126
def reset_connection(pg)
  pg.reset
rescue PG::Error
  nil
ensure
  # The reset session has no prepared statements: drop the request's name cache
  # so later bursts re-prepare, and the pending garbage that's now gone with it.
  pg.instance_variable_get(:@pipeloader_prepared)&.clear
  pg.instance_variable_set(:@pipeloader_garbage, [])
end
.take_garbage(pg) ⇒ Object
Returns the statement names left by previous requests, to DEALLOCATE on the next burst, and clears the stored list.
# File 'lib/pipeloader/pipeliner.rb', line 106
def take_garbage(pg)
  garbage = pg.instance_variable_get(:@pipeloader_garbage)
  return [] unless garbage && !garbage.empty?

  pg.instance_variable_set(:@pipeloader_garbage, [])
  garbage
end