Module: Pipeloader
- Defined in:
- lib/pipeloader.rb,
lib/pipeloader/batch.rb,
lib/pipeloader/source.rb,
lib/pipeloader/version.rb,
lib/pipeloader/ar_patch.rb,
lib/pipeloader/pipeliner.rb,
lib/pipeloader/batch/model.rb,
lib/pipeloader/field_exact.rb,
lib/pipeloader/batch/context.rb,
lib/pipeloader/batch/fetcher.rb,
lib/pipeloader/batch/batch_proxy.rb,
lib/pipeloader/batch/batch_loader.rb,
lib/pipeloader/batch/relationship.rb,
lib/pipeloader/batch/fetcher_state.rb,
lib/pipeloader/batch/load_grouping.rb,
lib/pipeloader/batch/load_interceptor.rb
Overview
Pipeloader makes a graphql-ruby query resolve its ActiveRecord SELECTs through a libpq pipeline: one round trip per tree level, transparently. Resolvers stay plain AR — no Futures, no dataloader.load, no field changes — because AR’s own query path is intercepted during response building.
class AppSchema < GraphQL::Schema
  use Pipeloader # adds GraphQL::Dataloader (fibers) + the AR patch
end
Defined Under Namespace
Modules: ARPatch, Batch, FieldSelects, Pipeliner, Trace, TypeOptIn
Classes: FusionExtension, FusionSource, ProjectionExtension, Source
Constant Summary
- VERSION = "0.0.3"
- ARRAY_ENCODER = PG::TextEncoder::Array.new
Class Attribute Summary
- .auto_fuse ⇒ Object
  Returns the value of attribute auto_fuse.
- .field_exact ⇒ Object
  Returns the value of attribute field_exact.
- .queries ⇒ Object
  Per-query stats (single-threaded; reset at the start of each query).
- .round_trips ⇒ Object
  Per-query stats (single-threaded; reset at the start of each query).
Class Method Summary
- .any_relation(model, key, values, columns) ⇒ Object
  Build `<key> = ANY($1)` with a single array bind, so a fused query is one stable prepared statement regardless of batch size (an IN-list is a distinct statement per length and re-plans with a custom plan each execution; ANY(array) plans once as a generic array scan).
- .bare_association?(reflection, relation) ⇒ Boolean
  The relation must be the bare association, with nothing chained onto it.
- .begin_request!(pg) ⇒ Object
  Prepared statements are scoped to one request.
- .end_request!(pg) ⇒ Object
  Hand the request’s statements to the next one to DEALLOCATE (piggybacked onto its first burst, so there is no extra round trip here).
- .fusable_belongs_to?(assoc) ⇒ Boolean
  Only fuse when each returned row maps back to exactly one parent with zero ambiguity: a non-polymorphic, unscoped belongs_to keyed by a single-column primary key (unique), on PostgreSQL (the only adapter that pipelines/fuses).
- .fusable_has_many?(reflection, relation) ⇒ Boolean
  Fuse only a plain has_many: no scope, no `through`, and no per-parent limit/offset (a per-parent limit would need a lateral join rather than a flat IN; order is preserved by group_by, so it’s fine).
- .fusable_has_one?(assoc) ⇒ Boolean
  has_one fuses like has_many but with a single-row demux, so it’s exact only when the FK is genuinely 1:1, as proven by a unique index on the FK.
- .fuse(dataloader, model, kind, key, columns, value) ⇒ Object
  Issue one fused association lookup.
- .pipelining_supported?(conn) ⇒ Boolean
  Pipelining is libpq-specific.
- .project_columns(model, lookahead) ⇒ Object
  Returns the exact column list for a model + selection, or nil, meaning “can’t prove it’s safe; fetch whole rows.”
- .reset_stats! ⇒ Object
- .sql_and_params(relation) ⇒ Object
  The [sql, params] an `any_relation` would send through the AR patch, pulled out so FusionSource can enqueue it on Pipeloader::Source directly (via `request`, without forcing) instead of letting `.to_a` force one query at a time.
- .unique_fk_index?(assoc) ⇒ Boolean
  A unique index on exactly the FK column proves at most one child per parent.
- .use(schema) ⇒ Object
Class Attribute Details
.auto_fuse ⇒ Object
Returns the value of attribute auto_fuse.
# File 'lib/pipeloader/field_exact.rb', line 8
def auto_fuse
  @auto_fuse
end
.field_exact ⇒ Object
Returns the value of attribute field_exact.
# File 'lib/pipeloader/field_exact.rb', line 8
def field_exact
  @field_exact
end
.queries ⇒ Object
Per-query stats (single-threaded; reset at the start of each query).
# File 'lib/pipeloader.rb', line 23
def queries
  @queries
end
.round_trips ⇒ Object
Per-query stats (single-threaded; reset at the start of each query).
# File 'lib/pipeloader.rb', line 23
def round_trips
  @round_trips
end
Class Method Details
.any_relation(model, key, values, columns) ⇒ Object
Build `<key> = ANY($1)` with a single array bind, so a fused query is one stable prepared statement regardless of batch size (an IN-list is a distinct statement per length and re-plans with a custom plan each execution; ANY(array) plans once as a generic array scan). This is PostgreSQL-specific, which is fine: fusion is the gathering side of pipelining, only PostgreSQL pipelines, and the fusable_* predicates gate fusion accordingly. The fused query flows through the AR patch, so it is pipelined.
# File 'lib/pipeloader/field_exact.rb', line 165
def self.any_relation(model, key, values, columns)
  qualified = "#{model.quoted_table_name}.#{model.connection.quote_column_name(key)}"
  model.where("#{qualified} = ANY(?)", ARRAY_ENCODER.encode(values)).select(*columns)
end
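A minimal pure-Ruby sketch of the statement-stability argument above. It only builds SQL text and never touches a database. `in_list_sql`, `any_sql` and `encode_int_array` are hypothetical helpers, and `encode_int_array` is a simplified, integers-only stand-in for `PG::TextEncoder::Array` (the real encoder also handles quoting, escaping and NULLs).

```ruby
# Hypothetical, integers-only stand-in for PG::TextEncoder::Array:
# renders a PostgreSQL array literal such as {1,2,3}.
def encode_int_array(values)
  "{#{values.map { |v| Integer(v) }.join(',')}}"
end

# IN-list: one placeholder per value, so every batch size yields
# a different statement text to prepare and plan.
def in_list_sql(table, key, count)
  placeholders = (1..count).map { |i| "$#{i}" }.join(", ")
  %(SELECT * FROM "#{table}" WHERE "#{table}"."#{key}" IN (#{placeholders}))
end

# ANY: a single array parameter, whatever the batch size.
def any_sql(table, key)
  %(SELECT * FROM "#{table}" WHERE "#{table}"."#{key}" = ANY($1))
end

batch_sizes = [2, 3, 50]
in_texts  = batch_sizes.map { |n| in_list_sql("users", "id", n) }.uniq
any_texts = batch_sizes.map { any_sql("users", "id") }.uniq

puts in_texts.size               # 3 distinct statements
puts any_texts.size              # 1 statement
puts encode_int_array([1, 2, 3]) # {1,2,3}
```

With ANY, the values travel as the one `$1` parameter (the encoded array literal), so the statement text, and therefore the prepared-statement name, stays fixed as batches grow or shrink.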
.bare_association?(reflection, relation) ⇒ Boolean
The relation must be the bare association — nothing chained onto it. The fused query is rebuilt from the reflection, so any chained order / where / limit would be silently dropped; comparing the relation’s SQL to the untouched association scope catches all of them at once (order, where, limit, joins, group, …). Ordered or filtered collections fall back to the per-parent query, which keeps them.
# File 'lib/pipeloader/field_exact.rb', line 289
def self.bare_association?(reflection, relation)
  owner_key = relation.where_values_hash[reflection.foreign_key]
  !owner_key.nil? &&
    relation.to_sql == reflection.klass.where(reflection.foreign_key => owner_key).to_sql
end
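The SQL-comparison trick can be illustrated without ActiveRecord. In this sketch, `MiniRelation` is a hypothetical, tiny immutable relation that renders just enough SQL, and `bare?` is a hypothetical helper following the same idea as bare_association?: rebuild the untouched scope from the FK value and compare rendered SQL, so any chained clause shows up as a difference.

```ruby
# Hypothetical, minimal immutable relation: just enough to render SQL text.
class MiniRelation
  attr_reader :table, :wheres, :orders, :limit_value

  def initialize(table, wheres = {}, orders = [], limit_value = nil)
    @table, @wheres, @orders, @limit_value = table, wheres, orders, limit_value
  end

  def where(conds)
    MiniRelation.new(table, wheres.merge(conds), orders, limit_value)
  end

  def order(col)
    MiniRelation.new(table, wheres, orders + [col], limit_value)
  end

  def limit(n)
    MiniRelation.new(table, wheres, orders, n)
  end

  def where_values_hash
    wheres
  end

  def to_sql
    sql = +%(SELECT * FROM "#{table}")
    unless wheres.empty?
      conds = wheres.map { |k, v| %("#{table}"."#{k}" = #{v.inspect}) }
      sql << " WHERE " << conds.join(" AND ")
    end
    sql << " ORDER BY #{orders.join(', ')}" unless orders.empty?
    sql << " LIMIT #{limit_value}" if limit_value
    sql
  end
end

# Rebuild the bare scope from the FK value and compare the rendered SQL.
def bare?(table, foreign_key, relation)
  owner_key = relation.where_values_hash[foreign_key]
  !owner_key.nil? &&
    relation.to_sql == MiniRelation.new(table).where(foreign_key => owner_key).to_sql
end

posts = MiniRelation.new("posts").where("author_id" => 7)
puts bare?("posts", "author_id", posts)                     # true
puts bare?("posts", "author_id", posts.order("created_at")) # false
puts bare?("posts", "author_id", posts.limit(5))            # false
```

One string comparison covers every clause kind at once, which is why the documented method does not need a separate check for order, where, limit and so on.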
.begin_request!(pg) ⇒ Object
Prepared statements are scoped to one request. Each multiplex gets a fresh, uniquely-prefixed name space; pipeline_batch fills it in and reuses it across bursts, so a shape is planned once per request rather than once per burst.
# File 'lib/pipeloader.rb', line 60
def self.begin_request!(pg)
  pg.instance_variable_set(:@pipeloader_seq, (pg.instance_variable_get(:@pipeloader_seq) || 0) + 1)
  pg.instance_variable_set(:@pipeloader_prepared, {})
end
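A sketch of how a per-request name space lets one SQL shape be prepared once and then reused across bursts. `FakeConn` stands in for the PG connection, and `statement_name_for`, together with its "pl<seq>_<n>" naming scheme, is a hypothetical helper: the document does not say how pipeline_batch actually names statements.

```ruby
# Hypothetical stand-in for the PG connection object the state is stored on.
class FakeConn; end

# begin_request! as documented: bump the per-connection sequence and
# start an empty statement cache for this request.
def begin_request!(pg)
  pg.instance_variable_set(:@pipeloader_seq, (pg.instance_variable_get(:@pipeloader_seq) || 0) + 1)
  pg.instance_variable_set(:@pipeloader_prepared, {})
end

# Hypothetical lookup: one statement name per distinct SQL text per request.
# The "pl<seq>_<n>" naming scheme is an assumption made for illustration.
def statement_name_for(pg, sql)
  cache = pg.instance_variable_get(:@pipeloader_prepared)
  seq = pg.instance_variable_get(:@pipeloader_seq)
  cache[sql] ||= "pl#{seq}_#{cache.size + 1}"
end

USERS_SQL = 'SELECT * FROM "users" WHERE "users"."id" = ANY($1)'
POSTS_SQL = 'SELECT * FROM "posts" WHERE "posts"."user_id" = ANY($1)'

pg = FakeConn.new
begin_request!(pg)
a1 = statement_name_for(pg, USERS_SQL) # burst 1
b1 = statement_name_for(pg, POSTS_SQL) # burst 2
a2 = statement_name_for(pg, USERS_SQL) # burst 3 reuses burst 1's statement

begin_request!(pg)
a3 = statement_name_for(pg, USERS_SQL) # new request, new name space

p [a1, b1, a2, a3] # ["pl1_1", "pl1_2", "pl1_1", "pl2_1"]
```

Because the sequence number is part of every name, statements from an earlier request can never collide with the current one's, even while they wait to be deallocated.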
.end_request!(pg) ⇒ Object
Hand the request’s statements to the next one to DEALLOCATE (piggybacked onto its first burst — no extra round trip here). A plan therefore never outlives the request that made it.
# File 'lib/pipeloader.rb', line 68
def self.end_request!(pg)
  cache = pg.instance_variable_get(:@pipeloader_prepared)
  pg.remove_instance_variable(:@pipeloader_prepared) if pg.instance_variable_defined?(:@pipeloader_prepared)
  return if cache.nil? || cache.empty?

  garbage = pg.instance_variable_get(:@pipeloader_garbage)
  unless garbage
    garbage = []
    pg.instance_variable_set(:@pipeloader_garbage, garbage)
  end
  garbage.concat(cache.values)
end
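The deferred clean-up can be sketched as follows: end_request! only records names, and the next request's first burst carries the DEALLOCATEs in front of the work it was sending anyway. `FakeConn` and `first_burst_commands` are hypothetical, and plain command strings stand in for whatever the real pipeline sends.

```ruby
# Hypothetical stand-in for the PG connection object.
class FakeConn; end

# end_request! as documented: park this request's statement names on a
# garbage list instead of deallocating them now. `send` is used so the
# sketch does not depend on remove_instance_variable's visibility.
def end_request!(pg)
  cache = pg.instance_variable_get(:@pipeloader_prepared)
  pg.send(:remove_instance_variable, :@pipeloader_prepared) if pg.instance_variable_defined?(:@pipeloader_prepared)
  return if cache.nil? || cache.empty?

  garbage = pg.instance_variable_get(:@pipeloader_garbage)
  unless garbage
    garbage = []
    pg.instance_variable_set(:@pipeloader_garbage, garbage)
  end
  garbage.concat(cache.values)
end

# Hypothetical: drain the garbage list by prepending DEALLOCATE commands
# to the next burst, so the clean-up costs no round trip of its own.
def first_burst_commands(pg, commands)
  garbage = pg.instance_variable_get(:@pipeloader_garbage) || []
  deallocs = garbage.map { |name| "DEALLOCATE #{name}" }
  garbage.clear
  deallocs + commands
end

pg = FakeConn.new
pg.instance_variable_set(:@pipeloader_prepared, { "SELECT 1" => "pl1_1", "SELECT 2" => "pl1_2" })
end_request!(pg)

cmds = first_burst_commands(pg, ["EXECUTE pl2_1"])
p cmds # ["DEALLOCATE pl1_1", "DEALLOCATE pl1_2", "EXECUTE pl2_1"]
```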
.fusable_belongs_to?(assoc) ⇒ Boolean
Only fuse when each returned row maps back to exactly one parent with zero ambiguity: a non-polymorphic, unscoped belongs_to keyed by a single-column primary key (unique), on PostgreSQL (the only adapter that pipelines/fuses). Anything else keeps the per-parent projected query.
# File 'lib/pipeloader/field_exact.rb', line 238
def self.fusable_belongs_to?(assoc)
  assoc.belongs_to? && !assoc.polymorphic? && assoc.scope.nil? &&
    assoc.klass.primary_key.is_a?(String) &&
    assoc.klass.connection.adapter_name == "PostgreSQL"
rescue StandardError
  false
end
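Why a unique, single-column primary key makes the belongs_to demux exact: each FK value maps to at most one returned row. This is a plain-Ruby sketch of a `:by_pk` demux under that assumption; the `User` struct and the sample rows are hypothetical.

```ruby
User = Struct.new(:id, :name)

# Each parent's FK value: two posts share author 1, one points at a missing user.
author_ids = [1, 2, 1, 99]

# The fused query sends each distinct id once, as one array bind.
ids_to_fetch = author_ids.compact.uniq

# Hypothetical rows returned by that single "id = ANY($1)" query (no row for 99).
rows = [User.new(1, "ada"), User.new(2, "grace")]

# Demux by primary key: the PK is unique, so each FK maps to at most one row.
by_pk = rows.to_h { |u| [u.id, u] }
authors = author_ids.map { |fk| by_pk[fk] }

p ids_to_fetch                # [1, 2, 99]
p authors.map { |u| u&.name } # ["ada", "grace", "ada", nil]
```

A polymorphic or composite key would break the single-hash lookup, which is what the predicate's guards rule out.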
.fusable_has_many?(reflection, relation) ⇒ Boolean
Fuse only a plain has_many: no scope, no `through`, and no per-parent limit/offset (a per-parent limit would need a lateral join rather than a flat IN; order is preserved by group_by, so it’s fine). Single-column keys only. Under those conditions, group_by(fk) is exact.
# File 'lib/pipeloader/field_exact.rb', line 272
def self.fusable_has_many?(reflection, relation)
  reflection.macro == :has_many &&
    reflection.through_reflection.nil? &&
    reflection.scope.nil? &&
    reflection.foreign_key.is_a?(String) &&
    reflection.active_record_primary_key.is_a?(String) &&
    reflection.klass.connection.adapter_name == "PostgreSQL" &&
    bare_association?(reflection, relation)
rescue StandardError
  false
end
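A plain-Ruby sketch of the `group_by(fk)` demux for a fused has_many, plus a quick illustration of why a per-parent LIMIT cannot simply be added to the flat query. The `Comment` struct and rows are hypothetical.

```ruby
Comment = Struct.new(:id, :post_id)

post_ids = [1, 2, 3]

# Hypothetical rows from one fused "post_id = ANY($1)" query, in returned order.
rows = [Comment.new(10, 1), Comment.new(11, 2), Comment.new(12, 1)]

# Demux by FK: group_by keeps the returned order within each group, and a
# parent with no rows gets an empty array rather than nil.
grouped = rows.group_by(&:post_id)
comments_per_post = post_ids.map { |id| grouped.fetch(id, []) }

p comments_per_post.map { |cs| cs.map(&:id) } # [[10, 12], [11], []]

# A LIMIT 2 on the flat query caps the whole result (comments 10 and 11),
# whereas a per-parent LIMIT 2 would have kept both of post 1's comments.
p rows.first(2).map(&:id) # [10, 11]
```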
.fusable_has_one?(assoc) ⇒ Boolean
has_one fuses like has_many but with a single-row demux, so it’s exact only when the FK is genuinely 1:1 — proven by a unique index on the FK. Without it, “first” is ambiguous and we keep the per-parent query.
# File 'lib/pipeloader/field_exact.rb', line 249
def self.fusable_has_one?(assoc)
  assoc.macro == :has_one && assoc.through_reflection.nil? && assoc.scope.nil? &&
    assoc.foreign_key.is_a?(String) && assoc.active_record_primary_key.is_a?(String) &&
    assoc.klass.connection.adapter_name == "PostgreSQL" &&
    unique_fk_index?(assoc)
rescue StandardError
  false
end
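A sketch of the single-row has_one demux, and of the ambiguity a unique FK index rules out. The `Profile` struct and rows are hypothetical. Without an ORDER BY, the row that comes "first" for a parent with several children is not something to rely on.

```ruby
Profile = Struct.new(:id, :user_id)

user_ids = [1, 2, 3]

# Hypothetical rows from one fused "user_id = ANY($1)" query.
rows = [Profile.new(5, 1), Profile.new(6, 2)]

# Single-row demux: group by FK and take the first row (nil when there is none).
grouped = rows.group_by(&:user_id)
profiles = user_ids.map { |id| grouped[id]&.first }
p profiles.map { |pr| pr&.id } # [5, 6, nil]

# Without a unique index on user_id a parent may own several rows, and
# "first" then depends on row order. Detecting such parents:
dup_rows = rows + [Profile.new(7, 1)]
ambiguous = dup_rows.group_by(&:user_id).select { |_, rs| rs.size > 1 }.keys
p ambiguous # [1]
```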
.fuse(dataloader, model, kind, key, columns, value) ⇒ Object
Issue one fused association lookup. `kind` is :by_pk (belongs_to: demux by primary key, single row), :by_fk_one (has_one: demux by FK, first row) or :by_fk_many (has_many: demux by FK, array). All fused lookups on a connection share ONE FusionSource.
# File 'lib/pipeloader/field_exact.rb', line 182
def self.fuse(dataloader, model, kind, key, columns, value)
  dataloader.with(FusionSource, model.connection)
            .load([kind, model, key, columns, value].freeze)
end
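Because every lookup key carries its whole query shape, a shared source can batch keys by everything except the value. This is a hypothetical sketch of that grouping step, not FusionSource's actual fetch: model names are plain strings instead of ActiveRecord classes, and the output hashes are an illustrative shape.

```ruby
# Keys shaped like the ones fuse passes along: [kind, model, key, columns, value].
keys = [
  [:by_pk, "User", "id", %w[id name], 1],
  [:by_pk, "User", "id", %w[id name], 2],
  [:by_fk_many, "Comment", "post_id", %w[id post_id], 1],
  [:by_pk, "User", "id", %w[id name], 1]
]

# Hypothetical batching step: the first four elements identify a query shape,
# so each shape becomes one "key = ANY($1)" query over its distinct values.
queries = keys.group_by { |k| k[0, 4] }.map do |(kind, model, key, columns), group|
  { kind: kind, model: model, key: key, columns: columns, values: group.map(&:last).uniq }
end

# Two queries: User ids [1, 2] and Comment post_ids [1].
queries.each { |q| p q }
```

Freezing the key in fuse, as the documented method does, keeps it safe to use as a hash key during this kind of grouping.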
.pipelining_supported?(conn) ⇒ Boolean
Pipelining is libpq-specific. PostgreSQL pipelines; SQLite can’t, but the opt-in column projection is plain ActiveRecord and still applies, so SQLite is allowed with pipelining disabled. Any other adapter is unsupported.
Running SQLite un-pipelined is safe because SQLite is embedded: its queries are in-process calls with no network round trip, so there’s nothing for a dataloader or pipeline to collapse. N+1 there is just cheap local calls, not the latency amplification pipelining exists to remove.
# File 'lib/pipeloader.rb', line 47
def self.pipelining_supported?(conn)
  case conn.adapter_name
  when "PostgreSQL" then true
  when "SQLite" then false
  else
    raise "Pipeloader supports PostgreSQL (pipelined) and SQLite " \
          "(field narrowing only); #{conn.adapter_name} is not supported."
  end
end
.project_columns(model, lookahead) ⇒ Object
Returns the exact column list for a model + selection, or nil meaning “can’t prove it’s safe — fetch whole rows.”
# File 'lib/pipeloader/field_exact.rb', line 297
def self.project_columns(model, lookahead)
  columns = model.column_names
  needed = [model.primary_key]
  lookahead.selections.each do |sel|
    field = sel.field
    if field.respond_to?(:pipeloader_selects) && field.pipeloader_selects
      needed.concat(field.pipeloader_selects)          # explicit escape hatch
    elsif field.owner.instance_methods(false).include?(field.resolver_method)
      return nil                                       # custom resolver: opaque
    elsif columns.include?(field.method_str)
      needed << field.method_str                       # plain column
    elsif (assoc = model.reflect_on_association(field.method_str.to_sym))
      needed << assoc.foreign_key if assoc.belongs_to? # FK for a belongs_to
      # has_many keys off the model's PK, already included
    else
      return nil                                       # unknown accessor: opaque
    end
  end
  needed.compact.uniq.map(&:to_s)
end
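The decision order above can be exercised without GraphQL or ActiveRecord. In this sketch, `Field`, `Assoc` and `ModelInfo` are hypothetical plain-data stand-ins for a lookahead selection, a reflection and a model, and `project_columns_sketch` follows the same branch order: explicit selects, then custom resolvers (opaque), plain columns, associations, and otherwise nil.

```ruby
# Hypothetical plain-data stand-ins for a selected field, a reflection and a model.
Field = Struct.new(:name, :custom_resolver, :selects, keyword_init: true)
Assoc = Struct.new(:belongs_to, :foreign_key, keyword_init: true)
ModelInfo = Struct.new(:primary_key, :column_names, :associations, keyword_init: true)

def project_columns_sketch(model, fields)
  needed = [model.primary_key]
  fields.each do |f|
    if f.selects
      needed.concat(f.selects)                        # explicit escape hatch
    elsif f.custom_resolver
      return nil                                      # opaque: fetch whole rows
    elsif model.column_names.include?(f.name)
      needed << f.name                                # plain column
    elsif (assoc = model.associations[f.name])
      needed << assoc.foreign_key if assoc.belongs_to # has_many uses the PK
    else
      return nil                                      # unknown accessor: opaque
    end
  end
  needed.compact.uniq.map(&:to_s)
end

post = ModelInfo.new(
  primary_key: "id",
  column_names: %w[id title body author_id],
  associations: {
    "author" => Assoc.new(belongs_to: true, foreign_key: "author_id"),
    "comments" => Assoc.new(belongs_to: false, foreign_key: "post_id")
  }
)

p project_columns_sketch(post, [Field.new(name: "title"), Field.new(name: "author")])
# ["id", "title", "author_id"]
p project_columns_sketch(post, [Field.new(name: "title"), Field.new(name: "comments")])
# ["id", "title"]
p project_columns_sketch(post, [Field.new(name: "excerpt", custom_resolver: true)])
# nil
```

Returning nil on the first opaque field is the conservative choice: one unknown accessor is enough to make any narrowed column list unsafe.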
.reset_stats! ⇒ Object
# File 'lib/pipeloader.rb', line 28
def self.reset_stats!
  self.round_trips = 0
  self.queries = 0
end
.sql_and_params(relation) ⇒ Object
The [sql, params] an `any_relation` would send through the AR patch, pulled out so FusionSource can enqueue it on Pipeloader::Source directly (via `request`, without forcing) instead of letting `.to_a` force one query at a time.
# File 'lib/pipeloader/field_exact.rb', line 173
def self.sql_and_params(relation)
  conn = relation.klass.connection
  sql, binds = conn.send(:to_sql_and_binds, relation.arel)
  [sql, binds.map { |b| b.respond_to?(:value_for_database) ? b.value_for_database : b }]
end
.unique_fk_index?(assoc) ⇒ Boolean
A unique index on exactly the FK column proves at most one child per parent. Memoized per reflection.
# File 'lib/pipeloader/field_exact.rb', line 260
def self.unique_fk_index?(assoc)
  @unique_fk_indexes ||= {}
  return @unique_fk_indexes[assoc.object_id] if @unique_fk_indexes.key?(assoc.object_id)

  fk = assoc.foreign_key
  @unique_fk_indexes[assoc.object_id] =
    assoc.klass.connection.indexes(assoc.klass.table_name).any? { |ix| ix.unique && Array(ix.columns) == [fk] }
end
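A self-contained sketch of the same check and memoization, with index metadata supplied as plain structs instead of read from `connection.indexes`. `Index` and `UniqueFkChecker` are hypothetical, and the memo is keyed by (table, fk) rather than by the reflection's object_id as the documented method does. Using `Hash#fetch` with a block memoizes `false` results too, as the `key?` check in the original does.

```ruby
Index = Struct.new(:columns, :unique)

# Hypothetical checker: a unique index whose column list is exactly [fk]
# proves at most one child row per parent. Results, including false, are
# memoized so the index metadata is read once per (table, fk) pair.
class UniqueFkChecker
  attr_reader :lookups

  def initialize(indexes_by_table)
    @indexes_by_table = indexes_by_table
    @memo = {}
    @lookups = 0
  end

  def unique_fk_index?(table, fk)
    @memo.fetch([table, fk]) do
      @lookups += 1 # one simulated metadata read
      found = @indexes_by_table.fetch(table, []).any? { |ix| ix.unique && Array(ix.columns) == [fk] }
      @memo[[table, fk]] = found
    end
  end
end

checker = UniqueFkChecker.new(
  "profiles" => [Index.new(["user_id"], true)],
  # composite unique index plus a non-unique one: neither proves 1:1
  "memberships" => [Index.new(%w[user_id team_id], true), Index.new(["user_id"], false)]
)

p checker.unique_fk_index?("profiles", "user_id")    # true
p checker.unique_fk_index?("memberships", "user_id") # false
p checker.unique_fk_index?("profiles", "user_id")    # true, served from the memo
p checker.lookups                                    # 2
```

The exact-match comparison matters: a composite unique index on (user_id, team_id) allows many rows per user_id, so it must not count as proof.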