Class: Parse::Aggregation

Inherits:
Object
  • Object
show all
Defined in:
lib/parse/query.rb

Overview

Helper class for executing arbitrary MongoDB aggregation pipelines. Provides a consistent interface with results, raw, and result_pointers methods.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(query, pipeline, verbose: nil, mongo_direct: false, max_time_ms: nil) ⇒ Aggregation

Returns a new instance of Aggregation.

Parameters:

  • query (Parse::Query)

    the base query object

  • pipeline (Array<Hash>)

    the MongoDB aggregation pipeline stages

  • verbose (Boolean, nil) (defaults to: nil)

    whether to print verbose output (nil means use query's setting)

  • mongo_direct (Boolean) (defaults to: false)

    if true, uses MongoDB directly bypassing Parse Server (required for $literal)

  • max_time_ms (Integer, nil) (defaults to: nil)

    optional server-side time limit in milliseconds passed to MongoDB.aggregate when mongo_direct is true. Pass +nil+ (the default) for no cap.



5459
5460
5461
5462
5463
5464
5465
5466
5467
# File 'lib/parse/query.rb', line 5459

def initialize(query, pipeline, verbose: nil, mongo_direct: false, max_time_ms: nil)
  @query = query
  @pipeline = pipeline
  @cached_response = nil
  @mongo_direct = mongo_direct
  @max_time_ms = max_time_ms
  # Use provided verbose setting, or fall back to query's verbose_aggregate setting
  @verbose = verbose.nil? ? @query.instance_variable_get(:@verbose_aggregate) : verbose
end

Instance Attribute Details

#mongo_directBoolean (readonly)

Returns whether #execute! will route through MongoDB.aggregate instead of Parse Server's REST /aggregate endpoint.

Returns:



5451
5452
5453
# File 'lib/parse/query.rb', line 5451

def mongo_direct
  @mongo_direct
end

#pipelineArray<Hash> (readonly)

Returns the MongoDB aggregation pipeline stages this Aggregation will execute. Useful for previewing the routed pipeline before #execute!, for snapshot-based regression tests, and for debugging the REST-vs-mongo-direct translation.

Returns:

  • (Array<Hash>)

    the MongoDB aggregation pipeline stages this Aggregation will execute. Useful for previewing the routed pipeline before #execute!, for snapshot-based regression tests, and for debugging the REST-vs-mongo-direct translation.



5446
5447
5448
# File 'lib/parse/query.rb', line 5446

def pipeline
  @pipeline
end

Instance Method Details

#add_stages(*stages) ⇒ Aggregation

Add additional pipeline stages

Parameters:

  • stages (Array<Hash>)

    additional pipeline stages to append

Returns:



5664
5665
5666
5667
5668
# File 'lib/parse/query.rb', line 5664

def add_stages(*stages)
  @pipeline.concat(stages.flatten)
  @cached_response = nil # Clear cache when pipeline changes
  self
end

#any?Boolean

Check if there are any results

Returns:

  • (Boolean)

    true if there are results



5651
5652
5653
# File 'lib/parse/query.rb', line 5651

def any?
  count > 0
end

#countInteger

Returns the count of results

Returns:

  • (Integer)

    the number of results



5640
5641
5642
5643
5644
5645
5646
5647
# File 'lib/parse/query.rb', line 5640

def count
  response = execute!
  if @mongo_direct && defined?(Parse::MongoDB) && Parse::MongoDB.enabled?
    response.nil? ? 0 : response.count
  else
    response.error? ? 0 : response.result.count
  end
end

#empty?Boolean

Check if there are no results

Returns:

  • (Boolean)

    true if there are no results



5657
5658
5659
# File 'lib/parse/query.rb', line 5657

def empty?
  count == 0
end

#execute!Parse::Response, Array

Execute the aggregation pipeline and cache the response

Returns:



5471
5472
5473
5474
5475
5476
5477
5478
5479
5480
5481
5482
5483
5484
5485
5486
5487
5488
5489
5490
5491
5492
5493
5494
5495
5496
5497
5498
5499
5500
5501
5502
# File 'lib/parse/query.rb', line 5471

def execute!
  return @cached_response if @cached_response

  if @verbose
    puts "[VERBOSE AGGREGATE] Custom aggregation pipeline:"
    puts JSON.pretty_generate(@pipeline)
    puts "[VERBOSE AGGREGATE] Sending to: #{@query.instance_variable_get(:@table)}"
    puts "[VERBOSE AGGREGATE] Using MongoDB direct: #{@mongo_direct}"
  end

  if @mongo_direct && defined?(Parse::MongoDB) && Parse::MongoDB.enabled?
    @cached_response = execute_direct!
  else
    @cached_response = @query.client.aggregate_pipeline(
      @query.instance_variable_get(:@table),
      @pipeline,
      headers: {},
      **@query.send(:_opts),
    )
  end

  if @verbose
    if @mongo_direct && defined?(Parse::MongoDB) && Parse::MongoDB.enabled?
      puts "[VERBOSE AGGREGATE] Response result count: #{@cached_response&.count}"
    else
      puts "[VERBOSE AGGREGATE] Response success?: #{@cached_response.success?}"
      puts "[VERBOSE AGGREGATE] Response result count: #{@cached_response.result&.count}"
    end
  end

  @cached_response
end

#execute_direct!(max_time_ms: @max_time_ms) ⇒ Array<Hash>

Execute aggregation directly on MongoDB

Parameters:

  • max_time_ms (Integer, nil) (defaults to: @max_time_ms)

    optional server-side time limit (milliseconds). Defaults to the value passed to #initialize via the +max_time_ms:+ keyword.

Returns:



5508
5509
5510
5511
5512
# File 'lib/parse/query.rb', line 5508

def execute_direct!(max_time_ms: @max_time_ms)
  table = @query.instance_variable_get(:@table)
  auth_kwargs = @query.send(:mongo_direct_auth_kwargs)
  Parse::MongoDB.aggregate(table, @pipeline, max_time_ms: max_time_ms, **auth_kwargs)
end

#first(limit = 1) ⇒ Parse::Object+

Returns the first result from the aggregation

Parameters:

  • limit (Integer) (defaults to: 1)

    number of results to return

Returns:



5633
5634
5635
5636
# File 'lib/parse/query.rb', line 5633

def first(limit = 1)
  items = results.first(limit)
  limit == 1 ? items.first : items
end

#raw { ... } ⇒ Array<Hash>

Returns raw unprocessed results from the aggregation

Yields:

  • a block to iterate for each raw object in the result

Returns:

  • (Array<Hash>)

    raw Parse JSON hash results



5597
5598
5599
5600
5601
5602
5603
5604
5605
# File 'lib/parse/query.rb', line 5597

def raw(&block)
  response = execute!
  return [] if response.respond_to?(:error?) && response.error?

  items = response.respond_to?(:result) ? response.result : response
  items = [] unless items.is_a?(Array)
  return items.each(&block) if block_given?
  items
end

#result_pointers { ... } ⇒ Array<Parse::Pointer> Also known as: results_pointers

Returns only pointer objects for all matching results

Yields:

  • a block to iterate for each pointer object in the result

Returns:



5610
5611
5612
5613
5614
5615
5616
5617
5618
5619
5620
5621
5622
5623
5624
5625
# File 'lib/parse/query.rb', line 5610

def result_pointers(&block)
  response = execute!

  if @mongo_direct && defined?(Parse::MongoDB) && Parse::MongoDB.enabled?
    return [] if response.nil? || response.empty?
    # Convert MongoDB results to Parse format first
    converted = Parse::MongoDB.convert_documents_to_parse(response, @query.instance_variable_get(:@table))
    items = @query.send(:to_pointers, converted)
  else
    return [] if response.error?
    items = @query.send(:to_pointers, response.result)
  end

  return items.each(&block) if block_given?
  items
end

#results { ... } ⇒ Array<Parse::Object, AggregationResult> Also known as: all

Returns processed results from the aggregation.

  • Standard Parse documents (with objectId) are returned as Parse::Object instances
  • Custom aggregation results (from $group, $project, etc.) are returned as AggregationResult objects that support both hash access and method access

Yields:

  • a block to iterate for each object in the result

Returns:



5521
5522
5523
5524
5525
5526
5527
5528
5529
5530
5531
5532
5533
5534
5535
5536
5537
5538
5539
5540
# File 'lib/parse/query.rb', line 5521

def results(&block)
  response = execute!

  if @mongo_direct && defined?(Parse::MongoDB) && Parse::MongoDB.enabled?
    # For MongoDB direct, branch per-row on the *raw* document: real Parse
    # docs always carry _created_at / _updated_at, while $group rows reuse
    # _id as the group key. We must not feed group rows through
    # convert_document_to_parse, which would rename _id → objectId and
    # fool the Parse-document heuristic.
    return [] if response.nil? || response.empty?
    table = @query.instance_variable_get(:@table)
    items = response.map { |raw| convert_direct_aggregation_item(raw, table) }
  else
    return [] if response.error?
    items = response.result.map { |item| convert_aggregation_item(item) }
  end

  return items.each(&block) if block_given?
  items
end

#with_stages(*stages) ⇒ Aggregation

Create a new Aggregation with additional stages (non-mutating)

Parameters:

  • stages (Array<Hash>)

    additional pipeline stages to append

Returns:

  • (Aggregation)

    new aggregation object with combined pipeline



5673
5674
5675
# File 'lib/parse/query.rb', line 5673

def with_stages(*stages)
  Aggregation.new(@query, @pipeline + stages.flatten, verbose: @verbose)
end