Module: Pipeloader::Pipeliner

Defined in:
lib/pipeloader/pipeliner.rb

Overview

Runs a batch of SELECTs as one libpq pipelined burst on a PG::Connection.

Class Method Details

.drain_block(pg, results, capture_error) ⇒ Object

Reads one pipeline sync block. Collects each PGRES_TUPLES_OK result into `results` (when given) as [fields, values, oids] and, when capture_error is true, records the first query error via result.check. Returns that error, or nil if there was none.



# File 'lib/pipeloader/pipeliner.rb', line 65

def drain_block(pg, results, capture_error)
  error = nil
  loop do
    result = pg.get_result
    break if result.nil?

    status = result.result_status
    break if status == PG::PGRES_PIPELINE_SYNC

    if status == PG::PGRES_TUPLES_OK && results
      # Read values as strings (so we never disturb the connection's type map
      # that AR relies on), and capture each column's result OID + modifier so
      # the Source can resolve the real type via the adapter's get_oid_type —
      # including computed/aliased columns the model has no column type for.
      # ftype/fmod are result metadata, unaffected by the all-strings map.
      result.type_map = PG::TypeMapAllStrings.new
      oids = Array.new(result.nfields) { |i| [result.ftype(i), result.fmod(i)] }
      results << [result.fields, result.values, oids]
    elsif status != PG::PGRES_COMMAND_OK && capture_error
      begin
        result.check
      rescue PG::Error => e
        error ||= e
      end
    end
    pg.get_result # consume this result's nil delimiter
  end
  error
end
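To make the result-stream protocol concrete, here is a self-contained sketch of the same draining loop run against a scripted stand-in for the connection. FakeConn, FakeResult and the status symbols are hypothetical substitutes for PG::Connection, PG::Result and the PG::PGRES_* constants, so no server or pg gem is needed. The stub assumes, as the loop above does, that every query result is followed by a nil and the sync marker is not. For brevity each collected entry is just the rows rather than [fields, values, oids].

```ruby
# Hypothetical stand-ins for the pg gem's PG::PGRES_* status constants.
TUPLES_OK        = :tuples_ok
COMMAND_OK       = :command_ok
FATAL_ERROR      = :fatal_error
PIPELINE_ABORTED = :pipeline_aborted
PIPELINE_SYNC    = :pipeline_sync

# Stand-in for PG::Result: #check raises for the error statuses, else returns self.
FakeResult = Struct.new(:result_status, :rows, :message) do
  def check
    raise StandardError, message if [FATAL_ERROR, PIPELINE_ABORTED].include?(result_status)

    self
  end
end

# Stand-in for PG::Connection that replays a scripted pipeline: every query
# result is followed by nil, the sync marker is not.
class FakeConn
  def initialize(results)
    @stream = results.flat_map do |r|
      r.result_status == PIPELINE_SYNC ? [r] : [r, nil]
    end
  end

  def get_result
    @stream.shift
  end
end

# Same loop as drain_block above, minus the type-map and OID handling.
def drain_block(conn, results, capture_error)
  error = nil
  loop do
    result = conn.get_result
    break if result.nil?

    status = result.result_status
    break if status == PIPELINE_SYNC

    if status == TUPLES_OK && results
      results << result.rows
    elsif status != COMMAND_OK && capture_error
      begin
        result.check
      rescue StandardError => e
        error ||= e # keep only the first failure
      end
    end
    conn.get_result # consume this result's nil delimiter
  end
  error
end

conn = FakeConn.new([
  FakeResult.new(COMMAND_OK),                       # ack for a send_prepare
  FakeResult.new(TUPLES_OK, [["1"]]),
  FakeResult.new(FATAL_ERROR, nil, "boom"),
  FakeResult.new(PIPELINE_ABORTED, nil, "aborted"), # skipped after the failure
  FakeResult.new(PIPELINE_SYNC)
])
rows = []
err = drain_block(conn, rows, true)
p rows        # prints [[["1"]]]
p err.message # prints "boom"
```

Only the first error is kept (error ||= e): the PIPELINE_ABORTED results that follow a failure are still drained, but they do not overwrite it.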

.finish_pipeline(pg) ⇒ Object

Leaves the connection usable no matter how the batch ended: if it is still in pipeline mode, drains pending results and exits pipeline mode. If the pipeline can’t be drained cleanly (e.g. the connection dropped mid-burst), resets the connection so the pool gets a healthy one.



# File 'lib/pipeloader/pipeliner.rb', line 117

def finish_pipeline(pg)
  return if pg.pipeline_status == PG::PQ_PIPELINE_OFF

  loop { break if pg.get_result.nil? }
  pg.exit_pipeline_mode
rescue PG::Error
  reset_connection(pg)
end
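The cleanup path can be exercised the same way. In this hypothetical sketch, StubConn, ConnError and the PIPELINE_* symbols stand in for PG::Connection, PG::Error and PG::PQ_PIPELINE_ON / PQ_PIPELINE_OFF so it runs without a database, and it assumes (as the rescue above implies) that exiting pipeline mode raises when it cannot complete.

```ruby
# Hypothetical stand-ins for PG::Error and PG::PQ_PIPELINE_ON / PQ_PIPELINE_OFF.
class ConnError < StandardError; end
PIPELINE_ON  = :on
PIPELINE_OFF = :off

# Stand-in for PG::Connection: exiting pipeline mode can be made to fail,
# and reset counts how often the last-resort path ran.
class StubConn
  attr_reader :pipeline_status, :resets

  def initialize(pending: [], exit_fails: false)
    @pipeline_status = PIPELINE_ON
    @pending = pending.dup
    @exit_fails = exit_fails
    @resets = 0
  end

  def get_result
    @pending.shift # nil once nothing is pending
  end

  def exit_pipeline_mode
    raise ConnError, "cannot exit pipeline mode" if @exit_fails

    @pipeline_status = PIPELINE_OFF
  end

  def reset
    @resets += 1
    @pipeline_status = PIPELINE_OFF
  end
end

# Same shape as finish_pipeline above; the real version calls reset_connection.
def finish_pipeline(conn)
  return if conn.pipeline_status == PIPELINE_OFF

  loop { break if conn.get_result.nil? }
  conn.exit_pipeline_mode
rescue ConnError
  conn.reset
end

healthy = StubConn.new(pending: [:leftover])
finish_pipeline(healthy)
p healthy.pipeline_status # prints :off
p healthy.resets          # prints 0

broken = StubConn.new(exit_fails: true)
finish_pipeline(broken)
p broken.resets           # prints 1
```

Because the early return checks pipeline_status first, calling it on a connection that is no longer in pipeline mode (for example one that was just reset) is a no-op.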

.pipeline_batch(pg, queries) ⇒ Object

queries: an array of [sql, params] pairs. Returns an array of [columns, rows, oids] triples, one per query and in the same order: rows holds the raw string values, and oids holds one [oid, fmod] pair per column so the Source can type them. All queries are sent in a single round trip.

Prepared statements are cached for the lifetime of the REQUEST (the cache and name space are set up by Pipeloader::Trace per multiplex), so each query shape is prepared once per request and reused by every later burst in that request rather than re-prepared each burst. They are thrown out when the next request begins: that request’s first burst DEALLOCATEs the previous request’s statements, piggybacked onto the same pipeline so cleanup costs no extra round trip, and placed in its own sync block so a stale name (e.g. after a reconnect) cannot abort the burst’s real queries. No cached statement is used beyond the request that prepared it, so none goes stale across a reconnect or a migration.

If any query errors, the pipeline is drained to its sync point, the connection is restored to a usable state, and the first error is raised — never swallowed.
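The request-scoped lifecycle can be modelled in a few lines. This is a hypothetical sketch: StatementLifecycle, begin_request and name_for are invented names, and the assumption that Pipeloader::Trace retires names by bumping a sequence number and moving the old names onto a garbage list illustrates the behaviour described above; it is not Trace's actual code.

```ruby
# Hypothetical model of the request-scoped statement lifecycle. In
# Pipeloader this state lives on the PG::Connection and is set up by
# Pipeloader::Trace; the mechanics below are an illustrative assumption.
class StatementLifecycle
  attr_reader :seq, :cache, :garbage

  def initialize
    @seq = 0
    @cache = {}   # sql => prepared-statement name, valid for one request
    @garbage = [] # names from earlier requests, awaiting DEALLOCATE
  end

  # Request boundary: retire every current name and open a new namespace.
  def begin_request
    @garbage.concat(@cache.values)
    @cache = {}
    @seq += 1
  end

  # Name a shape on first use in this request; reuse the name afterwards.
  # The right-hand side is evaluated before insertion, so size is pre-insert.
  def name_for(sql)
    @cache[sql] ||= "pipeloader_#{@seq}_#{@cache.size}"
  end
end

life = StatementLifecycle.new
life.begin_request
p life.name_for("SELECT * FROM users WHERE id = $1") # prints "pipeloader_1_0"
p life.name_for("SELECT * FROM users WHERE id = $1") # prints "pipeloader_1_0"
p life.name_for("SELECT * FROM posts WHERE id = $1") # prints "pipeloader_1_1"

life.begin_request
p life.garbage # prints ["pipeloader_1_0", "pipeloader_1_1"]
p life.name_for("SELECT * FROM users WHERE id = $1") # prints "pipeloader_2_0"
```

In the second request the users query is prepared again under a fresh name while its old name waits on the garbage list, so the cleanup and the new preparation can share one burst without the names colliding.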



# File 'lib/pipeloader/pipeliner.rb', line 22

def pipeline_batch(pg, queries)
  cache = prepared_cache(pg)
  seq = pg.instance_variable_get(:@pipeloader_seq) || 0
  garbage = take_garbage(pg) # previous request's statements, to DEALLOCATE now

  # Name + pipelined Parse for each shape not yet prepared this request.
  to_prepare = []
  queries.each do |sql, _params|
    next if cache.key?(sql)
    name = "pipeloader_#{seq}_#{cache.size}"
    cache[sql] = name
    to_prepare << [name, sql]
  end

  error = nil
  results = []
  begin
    pg.enter_pipeline_mode

    # Block 1 — clean up the previous request, in its own sync so a stale name
    # (after a reconnect) can't abort this burst's real queries. Same round trip.
    unless garbage.empty?
      garbage.each { |name| pg.send_query_params("DEALLOCATE #{name}", []) }
      pg.pipeline_sync
    end

    # Block 2 — prepare new shapes, run every query.
    to_prepare.each { |name, sql| pg.send_prepare(name, sql) }
    queries.each { |sql, params| pg.send_query_prepared(cache[sql], params) }
    pg.pipeline_sync

    drain_block(pg, nil, false) unless garbage.empty? # cleanup results — ignore failures
    error = drain_block(pg, results, true)            # this burst's query results
  ensure
    finish_pipeline(pg)
  end

  raise error if error
  results
end
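Rewriting the sending half of pipeline_batch as a pure function makes the burst layout easy to inspect. plan_burst and the command tuples it returns are hypothetical; each tuple names the PG::Connection call the method above makes at that point (send_query_params, pipeline_sync, send_prepare, send_query_prepared), and the naming scheme is copied from the code.

```ruby
# Hypothetical pure version of the sending half of pipeline_batch: returns the
# calls the real method makes on the connection, in order, as tuples.
#   [:query_params, sql]            ~ pg.send_query_params(sql, [])
#   [:sync]                         ~ pg.pipeline_sync
#   [:prepare, name, sql]           ~ pg.send_prepare(name, sql)
#   [:query_prepared, name, params] ~ pg.send_query_prepared(name, params)
# Mutates `cache` exactly as pipeline_batch does.
def plan_burst(cache, seq, garbage, queries)
  to_prepare = []
  queries.each do |sql, _params|
    next if cache.key?(sql)

    name = "pipeloader_#{seq}_#{cache.size}"
    cache[sql] = name
    to_prepare << [name, sql]
  end

  commands = []
  unless garbage.empty? # block 1: previous request's cleanup, own sync
    garbage.each { |name| commands << [:query_params, "DEALLOCATE #{name}"] }
    commands << [:sync]
  end
  to_prepare.each { |name, sql| commands << [:prepare, name, sql] } # block 2
  queries.each { |sql, params| commands << [:query_prepared, cache[sql], params] }
  commands << [:sync]
  commands
end

cache = {}
users = "SELECT * FROM users WHERE id = $1"
posts = "SELECT * FROM posts WHERE user_id = $1"

# First burst of a request: clean up, prepare two shapes, run three queries.
plan_burst(cache, 7, ["pipeloader_6_0"], [[users, [1]], [users, [2]], [posts, [1]]]).each { |cmd| p cmd }
# Prints:
# [:query_params, "DEALLOCATE pipeloader_6_0"]
# [:sync]
# [:prepare, "pipeloader_7_0", "SELECT * FROM users WHERE id = $1"]
# [:prepare, "pipeloader_7_1", "SELECT * FROM posts WHERE user_id = $1"]
# [:query_prepared, "pipeloader_7_0", [1]]
# [:query_prepared, "pipeloader_7_0", [2]]
# [:query_prepared, "pipeloader_7_1", [1]]
# [:sync]

# A later burst in the same request reuses the cached name.
p plan_burst(cache, 7, [], [[posts, [2]]])
# Prints: [[:query_prepared, "pipeloader_7_1", [2]], [:sync]]
```

Repeated shapes share one prepared statement, and the DEALLOCATEs get their own sync, so a failure there aborts only block 1 and the server resumes processing at block 2.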

.prepared_cache(pg) ⇒ Object

Returns the per-request cache mapping each SQL string to its prepared-statement name, creating it lazily if Pipeloader::Trace has not already seeded it.



# File 'lib/pipeloader/pipeliner.rb', line 96

def prepared_cache(pg)
  cache = pg.instance_variable_get(:@pipeloader_prepared)
  return cache if cache

  cache = {}
  pg.instance_variable_set(:@pipeloader_prepared, cache)
  cache
end
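Storing the cache as an instance variable on the connection ties it to that connection: each pooled connection gets its own, and it is garbage-collected along with the connection. A runnable sketch of the same memoization, with a plain Object standing in for PG::Connection (any Ruby object can carry instance variables):

```ruby
# Same lazy, per-object memoization as prepared_cache above. Object.new stands
# in for a PG::Connection.
def prepared_cache(conn)
  cache = conn.instance_variable_get(:@pipeloader_prepared)
  return cache if cache

  cache = {}
  conn.instance_variable_set(:@pipeloader_prepared, cache)
  cache
end

conn_a = Object.new
conn_b = Object.new

prepared_cache(conn_a)["SELECT 1"] = "pipeloader_0_0"
p prepared_cache(conn_a)["SELECT 1"] # prints "pipeloader_0_0" (same hash on every call)
p prepared_cache(conn_b).empty?      # prints true (each connection has its own cache)
```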

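.reset_connection(pg) ⇒ Object

Resets the connection when the pipeline can’t be drained (called from finish_pipeline), swallowing any PG::Error from the reset itself. Whether or not the reset succeeds, it discards the request's prepared-statement name cache and the pending DEALLOCATE list, since a fresh session has no prepared statements.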


# File 'lib/pipeloader/pipeliner.rb', line 126

def reset_connection(pg)
  pg.reset
rescue PG::Error
  nil
ensure
  # The reset session has no prepared statements: drop the request's name cache
  # so later bursts re-prepare, and the pending garbage that's now gone with it.
  pg.instance_variable_get(:@pipeloader_prepared)&.clear
  pg.instance_variable_set(:@pipeloader_garbage, [])
end
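A runnable sketch of the same reset-then-forget pattern. ResettableConn and ConnError are hypothetical stand-ins for PG::Connection and PG::Error; the point is that the ensure clause runs on both paths, so stale names are dropped even when the reconnect itself fails.

```ruby
class ConnError < StandardError; end # stand-in for PG::Error

# Hypothetical stub: reset either succeeds or raises, like a reconnect
# attempt against a server that may be down.
class ResettableConn
  def initialize(reset_fails:)
    @reset_fails = reset_fails
  end

  def reset
    raise ConnError, "could not reconnect" if @reset_fails
  end
end

def reset_connection(conn)
  conn.reset
rescue ConnError
  nil
ensure
  # A new session has no prepared statements: forget the names and the
  # pending DEALLOCATEs, whether or not the reset worked.
  conn.instance_variable_get(:@pipeloader_prepared)&.clear
  conn.instance_variable_set(:@pipeloader_garbage, [])
end

conn = ResettableConn.new(reset_fails: true)
conn.instance_variable_set(:@pipeloader_prepared, { "SELECT 1" => "pipeloader_3_0" })
conn.instance_variable_set(:@pipeloader_garbage, ["pipeloader_2_0"])

p reset_connection(conn)                           # prints nil (the error is swallowed)
p conn.instance_variable_get(:@pipeloader_prepared) # prints {}
p conn.instance_variable_get(:@pipeloader_garbage)  # prints []
```

Clearing the hash in place, rather than assigning a new one, means any code already holding a reference to it sees the reset too, not just later calls to prepared_cache.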

.take_garbage(pg) ⇒ Object

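Returns the statement names left by previous requests so the next burst can DEALLOCATE them, and clears the pending list. Returns an empty array when there is nothing to clean up.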



# File 'lib/pipeloader/pipeliner.rb', line 106

def take_garbage(pg)
  garbage = pg.instance_variable_get(:@pipeloader_garbage)
  return [] unless garbage && !garbage.empty?

  pg.instance_variable_set(:@pipeloader_garbage, [])
  garbage
end
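The take-and-clear semantics can be checked directly. In this sketch a plain Object stands in for PG::Connection, and the names placed on the garbage list are illustrative.

```ruby
# Same take-and-clear as take_garbage above: hand back the pending names and
# leave an empty list behind, so each batch of names is DEALLOCATEd at most once.
def take_garbage(conn)
  garbage = conn.instance_variable_get(:@pipeloader_garbage)
  return [] unless garbage && !garbage.empty?

  conn.instance_variable_set(:@pipeloader_garbage, [])
  garbage
end

conn = Object.new # stands in for a PG::Connection
p take_garbage(conn) # prints [] (nothing recorded yet)

conn.instance_variable_set(:@pipeloader_garbage, %w[pipeloader_1_0 pipeloader_1_1])
p take_garbage(conn) # prints ["pipeloader_1_0", "pipeloader_1_1"]
p take_garbage(conn) # prints [] (already taken)
```

Swapping in a fresh array, rather than calling garbage.clear, matters here: clearing in place would also empty the array just returned to the caller, whereas swapping leaves the caller's list intact while the connection's pending list starts empty.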