Module: Pipeloader

Defined in:
lib/pipeloader.rb,
lib/pipeloader/batch.rb,
lib/pipeloader/source.rb,
lib/pipeloader/version.rb,
lib/pipeloader/ar_patch.rb,
lib/pipeloader/pipeliner.rb,
lib/pipeloader/batch/model.rb,
lib/pipeloader/field_exact.rb,
lib/pipeloader/batch/context.rb,
lib/pipeloader/batch/fetcher.rb,
lib/pipeloader/batch/batch_proxy.rb,
lib/pipeloader/batch/batch_loader.rb,
lib/pipeloader/batch/relationship.rb,
lib/pipeloader/batch/fetcher_state.rb,
lib/pipeloader/batch/load_grouping.rb,
lib/pipeloader/batch/load_interceptor.rb

Overview

Pipeloader makes a graphql-ruby query resolve its ActiveRecord SELECTs through a libpq pipeline: one round trip per tree level, transparently. Resolvers stay plain AR — no Futures, no dataloader.load, no field changes — because AR’s own query path is intercepted during response building.

class AppSchema < GraphQL::Schema
  use Pipeloader     # adds GraphQL::Dataloader (fibers), the AR patch and the trace
end
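
The gathering idea can be illustrated with a toy model in plain Ruby. This is not Pipeloader's implementation: ToyPipeline, query and run_level are hypothetical, and a Hash stands in for the database. Each resolver runs in its own Fiber and calls what looks like a synchronous query. The call parks the fiber, and the scheduler answers everything parked as one simulated round trip.

```ruby
# Toy model (hypothetical, not Pipeloader's code): resolvers look synchronous,
# but each query parks the calling fiber so one level's queries share a burst.
class ToyPipeline
  attr_reader :round_trips, :queries

  def initialize(db)
    @db = db # Hash of sql => result, standing in for the database
    @round_trips = 0
    @queries = 0
  end

  # Called from inside a resolver fiber; returns once the burst has been answered.
  def query(sql)
    Fiber.yield(sql)
  end

  # Runs one tree level's resolvers and returns their results in order.
  def run_level(resolvers)
    pending = {}
    results = {}
    resolvers.each_with_index do |resolver, i|
      fiber = Fiber.new { resolver.call(self) }
      settle(fiber, i, fiber.resume, pending, results)
    end
    until pending.empty?
      @round_trips += 1 # every parked query goes out together
      @queries += pending.size
      burst = pending
      pending = {}
      burst.each do |i, (fiber, sql)|
        settle(fiber, i, fiber.resume(@db.fetch(sql)), pending, results)
      end
    end
    Array.new(resolvers.size) { |i| results[i] }
  end

  private

  # A live fiber is parked on a query (value is its SQL); a finished one returned its result.
  def settle(fiber, index, value, pending, results)
    if fiber.alive?
      pending[index] = [fiber, value]
    else
      results[index] = value
    end
  end
end
```

In the toy, a resolver that issues two queries in sequence pays for two bursts, but every other resolver's first query rides along in the first burst, so the round-trip count tracks the depth of dependent queries rather than the number of resolvers.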

Defined Under Namespace

Modules: ARPatch, Batch, FieldSelects, Pipeliner, Trace, TypeOptIn
Classes: FusionExtension, FusionSource, ProjectionExtension, Source

Constant Summary

VERSION = "0.0.2"
ARRAY_ENCODER = PG::TextEncoder::Array.new

Class Attribute Details

.auto_fuse ⇒ Object

Returns the value of attribute auto_fuse.



# File 'lib/pipeloader/field_exact.rb', line 8

def auto_fuse
  @auto_fuse
end

.field_exact ⇒ Object

Returns the value of attribute field_exact.



# File 'lib/pipeloader/field_exact.rb', line 8

def field_exact
  @field_exact
end

.queries ⇒ Object

Per-query stats (single-threaded; reset at the start of each query).



# File 'lib/pipeloader.rb', line 23

def queries
  @queries
end

.round_trips ⇒ Object

Per-query stats (single-threaded; reset at the start of each query).



# File 'lib/pipeloader.rb', line 23

def round_trips
  @round_trips
end

Class Method Details

.any_relation(model, key, values, columns) ⇒ Object

Build `<key> = ANY($1)` with a single array bind, so a fused query is one stable prepared statement regardless of batch size. An IN-list is a distinct statement for every list length and re-plans with a custom plan on each execution; ANY(array) plans once as a generic array scan. This is PostgreSQL-specific, which is fine: fusion is the gathering side of pipelining, and only PostgreSQL pipelines (the fusable_* predicates gate it). The fused query flows through the AR patch, so it is pipelined.



# File 'lib/pipeloader/field_exact.rb', line 165

def self.any_relation(model, key, values, columns)
  qualified = "#{model.quoted_table_name}.#{model.connection.quote_column_name(key)}"
  model.where("#{qualified} = ANY(?)", ARRAY_ENCODER.encode(values)).select(*columns)
end
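
To see why a single array bind keeps the statement text stable, here is a plain-Ruby sketch. pg_array_literal and in_list_sql are hypothetical helpers, not part of Pipeloader or the pg gem, and the literal they build is valid PostgreSQL array syntax but is not claimed to be byte-identical to what PG::TextEncoder::Array emits.

```ruby
# Hypothetical helper: encode values as one PostgreSQL array literal (text format).
# Integers go in bare, nil becomes NULL, everything else is double-quoted with
# backslashes and double quotes escaped.
def pg_array_literal(values)
  elements = values.map do |v|
    case v
    when nil then "NULL"
    when Integer then v.to_s
    else
      escaped = v.to_s.gsub(/[\\"]/) { |c| "\\#{c}" }
      %("#{escaped}")
    end
  end
  "{#{elements.join(',')}}"
end

# An IN-list needs one placeholder per value, so its SQL changes with batch size.
def in_list_sql(count)
  placeholders = (1..count).map { |i| "$#{i}" }.join(", ")
  "SELECT * FROM posts WHERE author_id IN (#{placeholders})"
end

# ANY with one array bind: the same SQL text for every batch size.
ANY_SQL = "SELECT * FROM posts WHERE author_id = ANY($1)"
```

For three parents the ANY form sends ANY_SQL with the single parameter pg_array_literal([1, 2, 3]); for four parents the SQL is unchanged and only the parameter grows, whereas in_list_sql(3) and in_list_sql(4) are two different statements to prepare and plan.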

.bare_association?(reflection, relation) ⇒ Boolean

The relation must be the bare association — nothing chained onto it. The fused query is rebuilt from the reflection, so any chained order / where / limit would be silently dropped; comparing the relation’s SQL to the untouched association scope catches all of them at once (order, where, limit, joins, group, …). Ordered or filtered collections fall back to the per-parent query, which keeps them.

Returns:

  • (Boolean)


# File 'lib/pipeloader/field_exact.rb', line 289

def self.bare_association?(reflection, relation)
  owner_key = relation.where_values_hash[reflection.foreign_key]
  !owner_key.nil? &&
    relation.to_sql == reflection.klass.where(reflection.foreign_key => owner_key).to_sql
end
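
The SQL-comparison trick is easy to see with a toy relation. ToyRelation and toy_bare_association? are hypothetical stand-ins, not ActiveRecord; they only mirror the shape of the check: rebuild the bare scope from the FK value the relation already carries, then compare rendered SQL, so any chained clause shows up as a difference.

```ruby
# Hypothetical stand-in for an AR relation: immutable, renders its own SQL.
class ToyRelation
  attr_reader :wheres

  def initialize(table, wheres = {}, tail = [])
    @table = table
    @wheres = wheres
    @tail = tail # ORDER BY / LIMIT fragments, in call order
  end

  def where(conds)
    ToyRelation.new(@table, @wheres.merge(conds), @tail)
  end

  def order(column)
    ToyRelation.new(@table, @wheres, @tail + ["ORDER BY #{column}"])
  end

  def limit(count)
    ToyRelation.new(@table, @wheres, @tail + ["LIMIT #{count}"])
  end

  def to_sql
    conds = @wheres.map { |col, val| "#{col} = #{val}" }.join(" AND ")
    ["SELECT * FROM #{@table} WHERE #{conds}", *@tail].join(" ")
  end
end

# Same shape as bare_association?: anything chained changes the rendered SQL.
def toy_bare_association?(table, foreign_key, relation)
  owner_key = relation.wheres[foreign_key]
  !owner_key.nil? &&
    relation.to_sql == ToyRelation.new(table).where(foreign_key => owner_key).to_sql
end
```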

.begin_request!(pg) ⇒ Object

Prepared statements are scoped to one request. Each multiplex gets a fresh, uniquely prefixed namespace; pipeline_batch fills it in and reuses it across bursts, so a shape is planned once per request rather than once per burst.



# File 'lib/pipeloader.rb', line 60

def self.begin_request!(pg)
  pg.instance_variable_set(:@pipeloader_seq, (pg.instance_variable_get(:@pipeloader_seq) || 0) + 1)
  pg.instance_variable_set(:@pipeloader_prepared, {})
end

.end_request!(pg) ⇒ Object

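Hand the request’s statements to the next request on this connection, which DEALLOCATEs them piggybacked onto its first burst, so no extra round trip is spent here. A plan is therefore never reused outside the request that made it; it only lingers until the next request’s first burst.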



# File 'lib/pipeloader.rb', line 68

def self.end_request!(pg)
  cache = pg.instance_variable_get(:@pipeloader_prepared)
  pg.remove_instance_variable(:@pipeloader_prepared) if pg.instance_variable_defined?(:@pipeloader_prepared)
  return if cache.nil? || cache.empty?

  garbage = pg.instance_variable_get(:@pipeloader_garbage)
  unless garbage
    garbage = []
    pg.instance_variable_set(:@pipeloader_garbage, garbage)
  end
  garbage.concat(cache.values)
end
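
Because begin_request! and end_request! only read and write instance variables on the connection, their lifecycle can be exercised in plain Ruby. Below they are copied verbatim as top-level methods and run against a bare Object instead of a PG::Connection. statement_name and first_burst_deallocations are hypothetical helpers standing in for what pipeline_batch does; its real naming scheme is not documented here.

```ruby
# Copied from the documented methods; the connection is just an object with ivars.
def begin_request!(pg)
  pg.instance_variable_set(:@pipeloader_seq, (pg.instance_variable_get(:@pipeloader_seq) || 0) + 1)
  pg.instance_variable_set(:@pipeloader_prepared, {})
end

def end_request!(pg)
  cache = pg.instance_variable_get(:@pipeloader_prepared)
  pg.remove_instance_variable(:@pipeloader_prepared) if pg.instance_variable_defined?(:@pipeloader_prepared)
  return if cache.nil? || cache.empty?

  garbage = pg.instance_variable_get(:@pipeloader_garbage)
  unless garbage
    garbage = []
    pg.instance_variable_set(:@pipeloader_garbage, garbage)
  end
  garbage.concat(cache.values)
end

# Hypothetical: one prepared-statement name per SQL shape, prefixed by the request seq.
def statement_name(pg, sql)
  cache = pg.instance_variable_get(:@pipeloader_prepared)
  cache[sql] ||= "pl#{pg.instance_variable_get(:@pipeloader_seq)}_#{cache.size}"
end

# Hypothetical: the next request's first burst drains the garbage into DEALLOCATEs.
def first_burst_deallocations(pg)
  garbage = pg.instance_variable_get(:@pipeloader_garbage) || []
  pg.instance_variable_set(:@pipeloader_garbage, [])
  garbage.map { |name| "DEALLOCATE #{name}" }
end
```

A first request that names "SELECT 1" twice and "SELECT 2" once prepares two statements; after end_request! and the next begin_request!, the first burst carries their DEALLOCATEs and the new request starts its own namespace.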

.fusable_belongs_to?(assoc) ⇒ Boolean

Only fuse when each returned row maps back to exactly one parent with zero ambiguity: a non-polymorphic, unscoped belongs_to keyed by a single-column primary key (unique), on PostgreSQL (the only adapter that pipelines/fuses). Anything else keeps the per-parent projected query.

Returns:

  • (Boolean)


# File 'lib/pipeloader/field_exact.rb', line 238

def self.fusable_belongs_to?(assoc)
  assoc.belongs_to? && !assoc.polymorphic? && assoc.scope.nil? &&
    assoc.klass.primary_key.is_a?(String) &&
    assoc.klass.connection.adapter_name == "PostgreSQL"
rescue StandardError
  false
end
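
Since the predicate only sends a handful of messages, it can be exercised against stubs. BtConn, BtKlass and BtAssoc are hypothetical Structs, not ActiveRecord classes; the predicate body is copied verbatim. The last case shows the rescue: any error while inspecting the reflection (here a NameError from a missing class) means "don't fuse" rather than a failed query.

```ruby
BtConn  = Struct.new(:adapter_name)
BtKlass = Struct.new(:primary_key, :connection)

# Answers only what fusable_belongs_to? asks; klass raises when no target is set,
# roughly what resolving an unknown class_name would do.
BtAssoc = Struct.new(:macro, :polymorphic, :scope, :target) do
  def belongs_to?
    macro == :belongs_to
  end

  def polymorphic?
    polymorphic
  end

  def klass
    target or raise NameError, "uninitialized constant"
  end
end

def fusable_belongs_to?(assoc)
  assoc.belongs_to? && !assoc.polymorphic? && assoc.scope.nil? &&
    assoc.klass.primary_key.is_a?(String) &&
    assoc.klass.connection.adapter_name == "PostgreSQL"
rescue StandardError
  false
end
```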

.fusable_has_many?(reflection, relation) ⇒ Boolean

Fuse only a plain has_many: no scope, no `through`, and no per-parent limit/offset, since those would need a lateral join rather than a flat ANY lookup (row order within each parent, by contrast, survives group_by). The relation must also be the bare association, and both keys must be single columns, so group_by(fk) is exact.

Returns:

  • (Boolean)


# File 'lib/pipeloader/field_exact.rb', line 272

def self.fusable_has_many?(reflection, relation)
  reflection.macro == :has_many &&
    reflection.through_reflection.nil? &&
    reflection.scope.nil? &&
    reflection.foreign_key.is_a?(String) &&
    reflection.active_record_primary_key.is_a?(String) &&
    reflection.klass.connection.adapter_name == "PostgreSQL" &&
    bare_association?(reflection, relation)
rescue StandardError
  false
end
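
The limit/offset restriction is easiest to see on data. In this plain-Ruby illustration (the rows are made up, and the helpers are hypothetical), each author wants "its first two posts". Grouping the fused rows by FK and then taking two per group gives the right answer, but a LIMIT 2 on the single fused query caps the total, so author 2 silently gets nothing. group_by also keeps each group's rows in their original order.

```ruby
# Rows a fused "author_id = ANY($1)" query might return for authors 1 and 2.
ROWS = [
  { author_id: 1, id: 10 }, { author_id: 1, id: 11 }, { author_id: 1, id: 12 },
  { author_id: 2, id: 20 }, { author_id: 2, id: 21 }
].freeze

# Demux by FK: exact for a plain has_many, because each row names its one parent.
def demux_many(rows, parent_ids)
  groups = rows.group_by { |row| row[:author_id] }
  parent_ids.to_h { |id| [id, groups.fetch(id, []).map { |row| row[:id] }] }
end

# Per-parent LIMIT: what each author's own query would return.
def per_parent_limit(rows, parent_ids, limit)
  demux_many(rows, parent_ids).transform_values { |ids| ids.first(limit) }
end

# LIMIT applied to the one fused query: caps the total, not each parent.
def fused_limit(rows, parent_ids, limit)
  demux_many(rows.first(limit), parent_ids)
end
```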

.fusable_has_one?(assoc) ⇒ Boolean

has_one fuses like has_many but with a single-row demux, so it’s exact only when the FK is genuinely 1:1 — proven by a unique index on the FK. Without it, “first” is ambiguous and we keep the per-parent query.

Returns:

  • (Boolean)


# File 'lib/pipeloader/field_exact.rb', line 249

def self.fusable_has_one?(assoc)
  assoc.macro == :has_one && assoc.through_reflection.nil? && assoc.scope.nil? &&
    assoc.foreign_key.is_a?(String) && assoc.active_record_primary_key.is_a?(String) &&
    assoc.klass.connection.adapter_name == "PostgreSQL" &&
    unique_fk_index?(assoc)
rescue StandardError
  false
end

.fuse(dataloader, model, kind, key, columns, value) ⇒ Object

Issue one fused association lookup. `kind` is :by_pk (belongs_to: demux by primary key, single record), :by_fk_one (has_one: demux by FK, first record) or :by_fk_many (has_many: demux by FK, array). All fused lookups on a connection share a single FusionSource.



# File 'lib/pipeloader/field_exact.rb', line 182

def self.fuse(dataloader, model, kind, key, columns, value)
  dataloader.with(FusionSource, model.connection.raw_connection)
            .load([kind, model, key, columns, value].freeze)
end
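
How one FusionSource might turn many load keys into few queries can be sketched in plain Ruby. The batching rule here is an assumption, not documented behaviour: keys are grouped by everything except the value (kind, model, key, columns), each group would become one any_relation query over its values, and the rows are then demultiplexed per kind as described above. batch_keys and demux are hypothetical helpers, and models are plain strings.

```ruby
# Load keys have the documented shape [kind, model, key, columns, value].
# Hypothetical batching: one fused query per (kind, model, key, columns).
def batch_keys(load_keys)
  load_keys.group_by { |k| k[0, 4] }.transform_values { |keys| keys.map(&:last).uniq }
end

# Demultiplex one fused query's rows back to the values that asked for them.
#   :by_pk      - belongs_to: the row whose primary key matches, or nil
#   :by_fk_one  - has_one:    the first row with that FK, or nil
#   :by_fk_many - has_many:   every row with that FK, possibly []
def demux(kind, key, rows, values)
  groups = rows.group_by { |row| row[key] }
  values.to_h do |v|
    group = groups.fetch(v, [])
    case kind
    when :by_pk, :by_fk_one then [v, group.first]
    when :by_fk_many then [v, group]
    else raise ArgumentError, "unknown kind #{kind.inspect}"
    end
  end
end
```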

.pipelining_supported?(conn) ⇒ Boolean

Pipelining is libpq-specific. PostgreSQL pipelines; SQLite can’t, but the opt-in column projection is plain ActiveRecord and still applies, so SQLite is allowed with pipelining disabled. Any other adapter is unsupported.

Running SQLite un-pipelined is safe because SQLite is embedded: its queries are in-process calls with no network round trip, so there’s nothing for a dataloader or pipeline to collapse. N+1 there is just cheap local calls, not the latency amplification pipelining exists to remove.

Returns:

  • (Boolean)


# File 'lib/pipeloader.rb', line 47

def self.pipelining_supported?(conn)
  case conn.adapter_name
  when "PostgreSQL" then true
  when "SQLite"     then false
  else
    raise "Pipeloader supports PostgreSQL (pipelined) and SQLite " \
          "(field narrowing only); #{conn.adapter_name} is not supported."
  end
end
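
A caller can use the predicate to choose between pipelined and plain execution while letting unsupported adapters fail fast. The sketch copies the method verbatim and drives it with a hypothetical Struct in place of an ActiveRecord connection; execution_mode is an illustrative helper, not part of Pipeloader.

```ruby
StubConnection = Struct.new(:adapter_name)

def pipelining_supported?(conn)
  case conn.adapter_name
  when "PostgreSQL" then true
  when "SQLite"     then false
  else
    raise "Pipeloader supports PostgreSQL (pipelined) and SQLite " \
          "(field narrowing only); #{conn.adapter_name} is not supported."
  end
end

# Hypothetical caller: pipeline on PostgreSQL, run plain AR (with projection) on SQLite.
def execution_mode(conn)
  pipelining_supported?(conn) ? :pipelined : :projection_only
end
```

Raising, rather than returning false for unknown adapters, keeps a misconfigured app from silently running without the behaviour it installed the plugin for.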

.project_columns(model, lookahead) ⇒ Object

Returns the exact column list for a model + selection, or nil meaning “can’t prove it’s safe — fetch whole rows.”



# File 'lib/pipeloader/field_exact.rb', line 297

def self.project_columns(model, lookahead)
  columns = model.column_names
  needed = [model.primary_key]
  lookahead.selections.each do |sel|
    field = sel.field
    if field.respond_to?(:pipeloader_selects) && field.pipeloader_selects
      needed.concat(field.pipeloader_selects)                 # explicit escape hatch
    elsif field.owner.instance_methods(false).include?(field.resolver_method)
      return nil                                              # custom resolver: opaque
    elsif columns.include?(field.method_str)
      needed << field.method_str                              # plain column
    elsif (assoc = model.reflect_on_association(field.method_str.to_sym))
      needed << assoc.foreign_key if assoc.belongs_to?        # FK for a belongs_to
      # has_many keys off the model's PK, already included
    else
      return nil                                              # unknown accessor: opaque
    end
  end
  needed.compact.uniq.map(&:to_s)
end
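
The decision order (escape hatch, custom resolver, plain column, association, otherwise give up) can be exercised without GraphQL or ActiveRecord. Everything except the copied project_columns body is a hypothetical stub: Struct stand-ins for the model, reflection, field, selection and lookahead that answer only the messages the method sends, plus two empty classes playing the GraphQL type that owns the fields.

```ruby
# Hypothetical stubs: each answers only the messages project_columns sends.
PcAssoc = Struct.new(:foreign_key, :macro) do
  def belongs_to?
    macro == :belongs_to
  end
end

PcModel = Struct.new(:column_names, :primary_key, :associations) do
  def reflect_on_association(name)
    associations[name]
  end
end

PcField = Struct.new(:method_str, :owner, :pipeloader_selects) do
  def resolver_method
    method_str.to_sym
  end
end

PcSelection = Struct.new(:field)
PcLookahead = Struct.new(:selections)

class PostType; end # a type with no hand-written resolver methods

class PostTypeWithResolver # a type that defines its own `summary` resolver
  def summary; end
end

def project_columns(model, lookahead)
  columns = model.column_names
  needed = [model.primary_key]
  lookahead.selections.each do |sel|
    field = sel.field
    if field.respond_to?(:pipeloader_selects) && field.pipeloader_selects
      needed.concat(field.pipeloader_selects)                 # explicit escape hatch
    elsif field.owner.instance_methods(false).include?(field.resolver_method)
      return nil                                              # custom resolver: opaque
    elsif columns.include?(field.method_str)
      needed << field.method_str                              # plain column
    elsif (assoc = model.reflect_on_association(field.method_str.to_sym))
      needed << assoc.foreign_key if assoc.belongs_to?        # FK for a belongs_to
      # has_many keys off the model's PK, already included
    else
      return nil                                              # unknown accessor: opaque
    end
  end
  needed.compact.uniq.map(&:to_s)
end
```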

.reset_stats! ⇒ Object

Resets the per-query counters round_trips and queries to zero.



# File 'lib/pipeloader.rb', line 28

def self.reset_stats!
  self.round_trips = 0
  self.queries = 0
end

.sql_and_params(relation) ⇒ Object

The [sql, params] that an `any_relation` would send through the AR patch, pulled out so FusionSource can enqueue it on Pipeloader::Source directly (via `request`, without forcing) instead of letting `.to_a` force one query at a time.



# File 'lib/pipeloader/field_exact.rb', line 173

def self.sql_and_params(relation)
  conn = relation.klass.connection
  sql, binds = conn.send(:to_sql_and_binds, relation.arel)
  [sql, binds.map { |b| b.respond_to?(:value_for_database) ? b.value_for_database : b }]
end
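
The difference between enqueueing and forcing can be shown with a toy queue. ToyQueue, its request/flush methods and the Bind struct are hypothetical, loosely modelled on the description above rather than on Pipeloader::Source's real interface: request returns a handle immediately, and nothing executes until flush sends every queued [sql, params] pair together. normalize_binds repeats the bind-mapping step from sql_and_params.

```ruby
# Bind objects that know their database value are unwrapped; raw values pass through.
Bind = Struct.new(:value_for_database)

def normalize_binds(binds)
  binds.map { |b| b.respond_to?(:value_for_database) ? b.value_for_database : b }
end

# Hypothetical queue: request defers, flush executes everything queued in one go.
class ToyQueue
  Handle = Struct.new(:value)
  attr_reader :flushes

  def initialize(&executor)
    @executor = executor # called as executor.call(sql, params) for each queued pair
    @pending = []
    @flushes = 0
  end

  def request(sql, params)
    handle = Handle.new(nil)
    @pending << [sql, params, handle]
    handle
  end

  def flush
    @flushes += 1
    @pending.each { |sql, params, handle| handle.value = @executor.call(sql, params) }
    @pending.clear
  end
end
```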

.unique_fk_index?(assoc) ⇒ Boolean

A unique index on exactly the FK column proves at most one child per parent. Memoized per reflection.

Returns:

  • (Boolean)


# File 'lib/pipeloader/field_exact.rb', line 260

def self.unique_fk_index?(assoc)
  @unique_fk_indexes ||= {}
  return @unique_fk_indexes[assoc.object_id] if @unique_fk_indexes.key?(assoc.object_id)

  fk = assoc.foreign_key
  @unique_fk_indexes[assoc.object_id] =
    assoc.klass.connection.indexes(assoc.klass.table_name).any? { |ix| ix.unique && Array(ix.columns) == [fk] }
end
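
The index test is strict on purpose: only a unique index whose column list is exactly the FK counts. A composite unique index such as (author_id, slug) still allows many rows per author, and a non-unique index on author_id proves nothing. The sketch isolates that check as a hypothetical pure function over Struct stand-ins for index definitions (Array() normalizes a columns value that arrives as a single String), and adds a hypothetical memo class showing why the method tests key? instead of using ||=: a cached false would otherwise be recomputed on every call.

```ruby
IndexDef = Struct.new(:columns, :unique)

# Same test as unique_fk_index?, minus the connection lookup and memoization.
def unique_single_column_index?(indexes, foreign_key)
  indexes.any? { |ix| ix.unique && Array(ix.columns) == [foreign_key] }
end

# Memoize per reflection id, caching false as well as true.
class UniqueIndexMemo
  attr_reader :lookups

  def initialize(&lookup)
    @lookup = lookup
    @memo = {}
    @lookups = 0
  end

  def call(id)
    return @memo[id] if @memo.key?(id)

    @lookups += 1
    @memo[id] = @lookup.call(id)
  end
end
```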

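.use(schema) ⇒ Object

Plugin hook invoked by `use Pipeloader`: switches the schema to GraphQL::Dataloader so resolution runs in fibers, installs the AR patch, and adds the Pipeloader trace.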



# File 'lib/pipeloader.rb', line 33

def self.use(schema)
  schema.use(GraphQL::Dataloader) # run resolution in fibers so SELECTs can gather
  ARPatch.install!
  schema.trace_with(Trace)
end