Module: Pipeloader::Pipeliner

Defined in:
lib/pipeloader/pipeliner.rb

Overview

Runs a batch of SELECTs as one libpq pipelined burst on a PG::Connection.

Class Method Summary

Class Method Details

.drain_block(pg, results, capture_error) ⇒ Object

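Reads the results of one pipeline sync block, up to and including its PGRES_PIPELINE_SYNC marker. Appends each PGRES_TUPLES_OK result to `results` (when given) as [fields, values]. When `capture_error` is true, it records the first query error raised by `result.check` and returns it; it returns nil if there was no error or `capture_error` is false.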



# File 'lib/pipeloader/pipeliner.rb', line 64

def drain_block(pg, results, capture_error)
  error = nil
  loop do
    result = pg.get_result
    break if result.nil?

    status = result.result_status
    break if status == PG::PGRES_PIPELINE_SYNC

    if status == PG::PGRES_TUPLES_OK && results
      # Raw strings, so ActiveRecord casts via its own column types (and so we
      # never disturb the connection's type map that AR relies on).
      result.type_map = PG::TypeMapAllStrings.new
      results << [result.fields, result.values]
    elsif status != PG::PGRES_COMMAND_OK && capture_error
      begin
        result.check
      rescue PG::Error => e
        error ||= e
      end
    end
    pg.get_result # consume this result's nil delimiter
  end
  error
end
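
To make the stream shape concrete, here is a pure-Ruby model of what drain_block walks, runnable without a database. It is a sketch under assumptions: ModelResult, drain_model and the status symbols are hypothetical stand-ins for PG::Result and the PG::PGRES_* constants, and the sample stream follows the order drain_block expects (each query result followed by a nil delimiter, queries after a failure reported as aborted, and the block closed by a sync result with no nil after it).

```ruby
# Hypothetical stand-in for PG::Result: a status symbol plus a payload
# ([fields, values] for rows, a message for errors).
ModelResult = Struct.new(:status, :payload)

# Walks one sync block the way drain_block does. `stream` yields ModelResult
# objects and nil delimiters. Returns [collected_rows, first_error].
def drain_model(stream, capture_error)
  rows = []
  error = nil
  loop do
    result = stream.next
    break if result.nil?
    break if result.status == :pipeline_sync # end of this sync block

    case result.status
    when :tuples_ok
      rows << result.payload
    when :command_ok
      # e.g. the reply to a Parse sent by send_prepare: nothing to collect
    else
      # Errors and aborted queries: keep only the first error, if asked to.
      error ||= result.payload if capture_error
    end
    stream.next # consume the nil delimiter that follows every query result
  end
  [rows, error]
end

# One block: a Parse, a successful query, a failing query, and a query the
# server skipped because the pipeline was already aborted.
stream = [
  ModelResult.new(:command_ok, nil), nil,
  ModelResult.new(:tuples_ok, [["id"], [["1"]]]), nil,
  ModelResult.new(:fatal_error, "boom"), nil,
  ModelResult.new(:pipeline_aborted, nil), nil,
  ModelResult.new(:pipeline_sync, nil)
].each

rows, error = drain_model(stream, true)
puts rows.inspect # [[["id"], [["1"]]]]
puts error        # boom
```

Only the first error survives: the aborted query that follows it carries no payload and cannot overwrite it, which is why drain_block uses `error ||= e`.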

.finish_pipeline(pg) ⇒ Object

Leaves the connection usable however the batch ended: drains any remaining results and exits pipeline mode. If that raises a PG::Error (e.g. the connection dropped mid-burst), the connection is reset so the pool gets a healthy one.



# File 'lib/pipeloader/pipeliner.rb', line 112

def finish_pipeline(pg)
  return if pg.pipeline_status == PG::PQ_PIPELINE_OFF

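  # Read until libpq reports nothing more (nil). If results are still queued
  # behind that nil, exit_pipeline_mode raises and the rescue resets instead.
  loop { break if pg.get_result.nil? }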
  pg.exit_pipeline_mode
rescue PG::Error
  reset_connection(pg)
end
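
The recover-or-reset path can be exercised without a server. StubConn and StubPGError below are hypothetical test doubles, not pg gem classes; the two functions copy the control flow of finish_pipeline and reset_connection with those doubles (and a :off symbol) swapped in for PG::Connection, PG::Error and PG::PQ_PIPELINE_OFF.

```ruby
# Hypothetical stand-in for PG::Error (test double only).
class StubPGError < StandardError; end

# Hypothetical stand-in for a PG::Connection in pipeline mode. It hands out the
# queued results, then nil; exit_pipeline_mode fails when told to, as it would
# if the connection had dropped mid-burst.
class StubConn
  attr_reader :pipeline_status, :resets

  def initialize(pending:, exit_fails:)
    @pending = pending
    @exit_fails = exit_fails
    @pipeline_status = :on
    @resets = 0
    @pipeloader_prepared = { "SELECT 1" => "pipeloader_0_0" }
    @pipeloader_garbage = ["pipeloader_old_0"]
  end

  def get_result
    @pending.shift # nil once nothing is queued
  end

  def exit_pipeline_mode
    raise StubPGError, "cannot exit pipeline mode" if @exit_fails
    @pipeline_status = :off
  end

  def reset
    @resets += 1
    @pipeline_status = :off
  end
end

# Same control flow as finish_pipeline above.
def finish_pipeline_sketch(conn)
  return if conn.pipeline_status == :off

  loop { break if conn.get_result.nil? }
  conn.exit_pipeline_mode
rescue StubPGError
  reset_connection_sketch(conn)
end

# Same control flow as reset_connection: reset, then forget the old session's names.
def reset_connection_sketch(conn)
  conn.reset
rescue StubPGError
  nil
ensure
  conn.instance_variable_get(:@pipeloader_prepared)&.clear
  conn.instance_variable_set(:@pipeloader_garbage, [])
end

clean = StubConn.new(pending: [:sync_result], exit_fails: false)
finish_pipeline_sketch(clean)
p [clean.pipeline_status, clean.resets] # [:off, 0]

broken = StubConn.new(pending: [], exit_fails: true)
finish_pipeline_sketch(broken)
p broken.resets                                     # 1
p broken.instance_variable_get(:@pipeloader_prepared) # {}
```

A connection that drains and leaves pipeline mode cleanly is handed back untouched; one that cannot is reset, and its name cache and garbage list are emptied because the new session has no prepared statements.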

.pipeline_batch(pg, queries) ⇒ Object

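queries: an array of [sql, params] pairs. Returns an array of [columns, rows] pairs (values as raw strings) in the same order as queries, with every query sent in a single round trip. Only results that carry rows are collected, so each query is expected to be a SELECT.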

Prepared statements are cached for the lifetime of the request (the cache and the statement-name namespace are set up by Pipeloader::Trace per multiplex), so each distinct SQL shape is prepared once per request and reused across every burst rather than re-prepared in each one. They are discarded when the next request begins: that request's first burst DEALLOCATEs the previous request's statements, piggybacked onto the same pipeline so the cleanup costs no extra round trip. Because nothing outlives a request, no prepared statement can go stale across a reconnect or a migration.
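
The naming step at the top of pipeline_batch can be read as a pure function. plan_prepares is a hypothetical helper that copies that loop, and the seq value 7 is an assumed example (in the library it comes from @pipeloader_seq). It shows a shape being named once and then reused, with nothing to prepare, by a later burst of the same request.

```ruby
# Hypothetical helper mirroring pipeline_batch's naming loop: names every SQL
# shape not yet in the request's cache and returns the [name, sql] pairs that
# still need a pipelined Parse.
def plan_prepares(cache, seq, queries)
  to_prepare = []
  queries.each do |sql, _params|
    next if cache.key?(sql)

    name = "pipeloader_#{seq}_#{cache.size}"
    cache[sql] = name
    to_prepare << [name, sql]
  end
  to_prepare
end

cache = {}
seq = 7 # assumed per-request sequence number

# First burst of the request: two shapes, one of them used twice.
first = plan_prepares(cache, seq, [
  ['SELECT * FROM users WHERE id = $1', [1]],
  ['SELECT * FROM posts WHERE user_id = $1', [1]],
  ['SELECT * FROM users WHERE id = $1', [2]]
])
# first  => [["pipeloader_7_0", "SELECT * FROM users WHERE id = $1"],
#            ["pipeloader_7_1", "SELECT * FROM posts WHERE user_id = $1"]]

# A later burst in the same request reuses the cached name.
second = plan_prepares(cache, seq, [['SELECT * FROM posts WHERE user_id = $1', [3]]])
# second => []
```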

If any query fails, the pipeline is drained to its sync point, the connection is returned to a usable state, and the first error is raised rather than swallowed.



# File 'lib/pipeloader/pipeliner.rb', line 21

def pipeline_batch(pg, queries)
  cache = prepared_cache(pg)
  seq = pg.instance_variable_get(:@pipeloader_seq) || 0
  garbage = take_garbage(pg) # previous request's statements, to DEALLOCATE now

  # Name + pipelined Parse for each shape not yet prepared this request.
  to_prepare = []
  queries.each do |sql, _params|
    next if cache.key?(sql)
    name = "pipeloader_#{seq}_#{cache.size}"
    cache[sql] = name
    to_prepare << [name, sql]
  end

  error = nil
  results = []
  begin
    pg.enter_pipeline_mode

    # Block 1 — clean up the previous request, in its own sync so a stale name
    # (after a reconnect) can't abort this burst's real queries. Same round trip.
    unless garbage.empty?
      garbage.each { |name| pg.send_query_params("DEALLOCATE #{name}", []) }
      pg.pipeline_sync
    end

    # Block 2 — prepare new shapes, run every query.
    to_prepare.each { |name, sql| pg.send_prepare(name, sql) }
    queries.each { |sql, params| pg.send_query_prepared(cache[sql], params) }
    pg.pipeline_sync

    drain_block(pg, nil, false) unless garbage.empty? # cleanup results — ignore failures
    error = drain_block(pg, results, true)            # this burst's query results
  ensure
    finish_pipeline(pg)
  end

  raise error if error
  results
end
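
To see the two sync blocks side by side, burst_plan below (a hypothetical helper, not part of Pipeloader) records the PG::Connection calls pipeline_batch would make for one burst, in send order, instead of making them. Because each block ends in its own pipeline_sync, a failed DEALLOCATE aborts only block 1; the server resumes processing at that sync and still runs block 2.

```ruby
# Records, in send order, the connection calls pipeline_batch makes for one burst.
def burst_plan(garbage, to_prepare, cache, queries)
  plan = []
  unless garbage.empty?
    # Block 1: DEALLOCATE the previous request's statements, closed by its own sync.
    garbage.each { |name| plan << [:send_query_params, "DEALLOCATE #{name}"] }
    plan << [:pipeline_sync]
  end
  # Block 2: Parse each new shape, then execute every query by statement name.
  to_prepare.each { |name, sql| plan << [:send_prepare, name, sql] }
  queries.each { |sql, params| plan << [:send_query_prepared, cache[sql], params] }
  plan << [:pipeline_sync]
  plan
end

cache = { 'SELECT $1::int' => 'pipeloader_8_0' }
plan = burst_plan(
  ['pipeloader_7_0'],                     # left over from the previous request
  [['pipeloader_8_0', 'SELECT $1::int']], # new this request
  cache,
  [['SELECT $1::int', [1]], ['SELECT $1::int', [2]]]
)
plan.each { |step| p step }
# [:send_query_params, "DEALLOCATE pipeloader_7_0"]
# [:pipeline_sync]
# [:send_prepare, "pipeloader_8_0", "SELECT $1::int"]
# [:send_query_prepared, "pipeloader_8_0", [1]]
# [:send_query_prepared, "pipeloader_8_0", [2]]
# [:pipeline_sync]
```

With no garbage the plan collapses to a single block, matching the `unless garbage.empty?` guard; drain_block is then called once per sync block, in the same order the blocks were sent.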

.prepared_cache(pg) ⇒ Object

Returns the per-request cache mapping SQL text to prepared-statement name, creating it lazily if Pipeloader::Trace has not already seeded it.



# File 'lib/pipeloader/pipeliner.rb', line 91

def prepared_cache(pg)
  cache = pg.instance_variable_get(:@pipeloader_prepared)
  return cache if cache

  cache = {}
  pg.instance_variable_set(:@pipeloader_prepared, cache)
  cache
end

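.reset_connection(pg) ⇒ Object

Resets the connection, ignoring any PG::Error the reset itself raises. Since the reset session has no prepared statements, it then clears the request's name cache (so later bursts re-prepare) and empties the pending DEALLOCATE list.
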
# File 'lib/pipeloader/pipeliner.rb', line 121

def reset_connection(pg)
  pg.reset
rescue PG::Error
  nil
ensure
  # The reset session has no prepared statements: drop the request's name cache
  # so later bursts re-prepare, and the pending garbage that's now gone with it.
  pg.instance_variable_get(:@pipeloader_prepared)&.clear
  pg.instance_variable_set(:@pipeloader_garbage, [])
end

.take_garbage(pg) ⇒ Object

Returns the statement names left by previous requests, to be DEALLOCATEd in the next burst, and resets the pending list to empty.



# File 'lib/pipeloader/pipeliner.rb', line 101

def take_garbage(pg)
  garbage = pg.instance_variable_get(:@pipeloader_garbage)
  return [] unless garbage && !garbage.empty?

  pg.instance_variable_set(:@pipeloader_garbage, [])
  garbage
end