Class: Parse::Aggregation

Inherits:
Object
Defined in:
lib/parse/query.rb

Overview

Helper class for executing arbitrary MongoDB aggregation pipelines, either through Parse Server's REST aggregate endpoint or directly against MongoDB. Provides a consistent interface through the results, raw, and result_pointers methods.

Constructor Details

#initialize(query, pipeline, verbose: nil, mongo_direct: false, max_time_ms: nil, raw_values: false, raw_field_names: false) ⇒ Aggregation

Returns a new instance of Aggregation.

Parameters:

  • query (Parse::Query)

    the base query object

  • pipeline (Array<Hash>)

    the MongoDB aggregation pipeline stages

  • verbose (Boolean, nil) (defaults to: nil)

    whether to print verbose output (nil means use query's setting)

  • mongo_direct (Boolean) (defaults to: false)

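    if true, runs the pipeline directly against MongoDB, bypassing Parse Server (required for pipelines that use $literal). #execute! still falls back to the REST endpoint when Parse::MongoDB is not defined or not enabled.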

  • max_time_ms (Integer, nil) (defaults to: nil)

    optional server-side time limit in milliseconds, passed to Parse::MongoDB.aggregate when mongo_direct is true. Pass nil (the default) for no cap.

  • raw_values (Boolean) (defaults to: false)

    when true, passes rawValues: true to the Parse Server REST aggregate endpoint (PS 9.9.0+). Has no effect on the mongo-direct path.

  • raw_field_names (Boolean) (defaults to: false)

    when true, passes rawFieldNames: true to the Parse Server REST aggregate endpoint (PS 9.9.0+). Has no effect on the mongo-direct path.



# File 'lib/parse/query.rb', line 5731

def initialize(query, pipeline, verbose: nil, mongo_direct: false, max_time_ms: nil,
               raw_values: false, raw_field_names: false)
  @query = query
  @pipeline = pipeline
  @cached_response = nil
  @mongo_direct = mongo_direct
  @max_time_ms = max_time_ms
  @raw_values = raw_values
  @raw_field_names = raw_field_names
  # Use provided verbose setting, or fall back to query's verbose_aggregate setting
  @verbose = verbose.nil? ? @query.instance_variable_get(:@verbose_aggregate) : verbose
end
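
The verbose fallback in the constructor distinguishes nil from false, which is easy to miss. The following is a minimal sketch of that one expression, assuming a hypothetical FakeQuery stand-in for Parse::Query and a hypothetical resolve_verbose helper; neither name exists in the library.

```ruby
# Hypothetical stand-in for Parse::Query; only the instance variable matters here.
class FakeQuery
  def initialize(verbose_aggregate)
    @verbose_aggregate = verbose_aggregate
  end
end

# Hypothetical helper mirroring the fallback expression used in #initialize.
def resolve_verbose(query, verbose)
  verbose.nil? ? query.instance_variable_get(:@verbose_aggregate) : verbose
end

noisy_query = FakeQuery.new(true)

inherited = resolve_verbose(noisy_query, nil)     # nil defers to the query's setting
overridden = resolve_verbose(noisy_query, false)  # an explicit false wins over the query
```

Passing verbose: false therefore silences output even when the query itself has verbose aggregation switched on, while omitting the keyword inherits the query's setting.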

Instance Attribute Details

#mongo_direct ⇒ Boolean (readonly)

Returns whether #execute! will route through MongoDB.aggregate instead of Parse Server's REST /aggregate endpoint.

Returns:

  • (Boolean)

    the mongo_direct flag passed to #initialize


# File 'lib/parse/query.rb', line 5719

def mongo_direct
  @mongo_direct
end

#pipeline ⇒ Array<Hash> (readonly)

Returns the MongoDB aggregation pipeline stages this Aggregation will execute. Useful for previewing the routed pipeline before #execute!, for snapshot-based regression tests, and for debugging the REST-vs-mongo-direct translation.

Returns:

  • (Array<Hash>)

    the pipeline stages, in the order they will be executed



# File 'lib/parse/query.rb', line 5714

def pipeline
  @pipeline
end

Instance Method Details

#add_stages(*stages) ⇒ Aggregation

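Appends pipeline stages to this aggregation in place and clears any cached response, so the next call to #execute! runs the extended pipeline.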

Parameters:

  • stages (Array<Hash>)

    additional pipeline stages to append

Returns:

  • (Aggregation)

    self, to allow chaining


# File 'lib/parse/query.rb', line 5949

def add_stages(*stages)
  @pipeline.concat(stages.flatten)
  @cached_response = nil # Clear cache when pipeline changes
  self
end
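
To illustrate how the splat and flatten interact, here is a minimal sketch using a hypothetical MiniAggregation class that copies only the add_stages logic from the listing above. It is not the library class, and the stage contents are made up for illustration.

```ruby
# Hypothetical stand-in that copies only the add_stages logic from the listing.
class MiniAggregation
  attr_reader :pipeline

  def initialize(pipeline)
    @pipeline = pipeline
    @cached_response = nil
  end

  def add_stages(*stages)
    @pipeline.concat(stages.flatten)
    @cached_response = nil # a changed pipeline invalidates any earlier response
    self
  end
end

agg = MiniAggregation.new([{ "$match" => { "status" => "active" } }])

# A single stage and an array of stages are both accepted because of flatten,
# and returning self lets the calls be chained.
agg.add_stages({ "$sort" => { "createdAt" => -1 } })
   .add_stages([{ "$skip" => 10 }, { "$limit" => 5 }])

stage_names = agg.pipeline.map { |stage| stage.keys.first }
```

Because concat mutates the array in place, an array handed to the constructor and still referenced elsewhere will also see the appended stages.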

#any? ⇒ Boolean

Check if there are any results

Returns:

  • (Boolean)

    true if there are results



# File 'lib/parse/query.rb', line 5936

def any?
  count > 0
end

#count ⇒ Integer

Returns the count of results

Returns:

  • (Integer)

    the number of results; 0 when the REST response is an error or the mongo-direct response is nil



# File 'lib/parse/query.rb', line 5925

def count
  response = execute!
  if @mongo_direct && defined?(Parse::MongoDB) && Parse::MongoDB.enabled?
    response.nil? ? 0 : response.count
  else
    response.error? ? 0 : response.result.count
  end
end

#empty? ⇒ Boolean

Check if there are no results

Returns:

  • (Boolean)

    true if there are no results



# File 'lib/parse/query.rb', line 5942

def empty?
  count == 0
end

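#execute! ⇒ Parse::Response, Array

Executes the aggregation pipeline and caches the response. Later calls return the cached response until the pipeline is changed with #add_stages.

Returns:

  • (Parse::Response, Array)

    a Parse::Response from the REST aggregate endpoint, or the Array returned by #execute_direct! on the mongo-direct path
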

# File 'lib/parse/query.rb', line 5746

def execute!
  return @cached_response if @cached_response

  if @verbose
    puts "[VERBOSE AGGREGATE] Custom aggregation pipeline:"
    puts JSON.pretty_generate(@pipeline)
    puts "[VERBOSE AGGREGATE] Sending to: #{@query.instance_variable_get(:@table)}"
    puts "[VERBOSE AGGREGATE] Using MongoDB direct: #{@mongo_direct}"
  end

  if @mongo_direct && defined?(Parse::MongoDB) && Parse::MongoDB.enabled?
    @cached_response = execute_direct!
  else
    @cached_response = @query.client.aggregate_pipeline(
      @query.instance_variable_get(:@table),
      @pipeline,
      headers: {},
      raw_values: @raw_values,
      raw_field_names: @raw_field_names,
      **@query.send(:_opts),
    )
  end

  if @verbose
    if @mongo_direct && defined?(Parse::MongoDB) && Parse::MongoDB.enabled?
      puts "[VERBOSE AGGREGATE] Response result count: #{@cached_response&.count}"
    else
      puts "[VERBOSE AGGREGATE] Response success?: #{@cached_response.success?}"
      puts "[VERBOSE AGGREGATE] Response result count: #{@cached_response.result&.count}"
    end
  end

  @cached_response
end
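
The memoization guard at the top of execute! can be sketched in isolation. The example below assumes a hypothetical CountingBackend in place of the Parse client and a hypothetical MemoizedAggregation that reproduces only the caching lines; the real method also handles verbose output and routing.

```ruby
# Hypothetical backend that counts how often it is asked to run a pipeline.
class CountingBackend
  attr_reader :calls

  def initialize
    @calls = 0
  end

  def aggregate(pipeline)
    @calls += 1
    pipeline.map { |stage| stage.keys.first }
  end
end

# Hypothetical stand-in reproducing only the memoization guard from execute!.
class MemoizedAggregation
  def initialize(backend, pipeline)
    @backend = backend
    @pipeline = pipeline
    @cached_response = nil
  end

  def execute!
    return @cached_response if @cached_response
    @cached_response = @backend.aggregate(@pipeline)
  end
end

backend = CountingBackend.new
agg = MemoizedAggregation.new(backend, [{ "$match" => {} }, { "$count" => "total" }])

first_response = agg.execute!
second_response = agg.execute! # served from the cache, backend is not called again
```

Since count, raw, results, and result_pointers all start by calling execute!, using several of them on one Aggregation should reuse the same response rather than issue a new request each time.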

#execute_direct!(max_time_ms: @max_time_ms) ⇒ Array<Hash>

Execute aggregation directly on MongoDB

Parameters:

  • max_time_ms (Integer, nil) (defaults to: @max_time_ms)

    optional server-side time limit (milliseconds). Defaults to the value passed to #initialize via the max_time_ms: keyword.

Returns:

  • (Array<Hash>)

    the documents returned by Parse::MongoDB.aggregate


# File 'lib/parse/query.rb', line 5785

def execute_direct!(max_time_ms: @max_time_ms)
  table = @query.instance_variable_get(:@table)
  auth_kwargs = @query.send(:mongo_direct_auth_kwargs)
  # Forward the parent query's index hint so `query.hint(...).aggregate(...)`
  # honors it on the mongo-direct path too (parity with results_direct /
  # count_direct / distinct_direct).
  hint = @query.instance_variable_get(:@hint)
  Parse::MongoDB.aggregate(table, @pipeline, max_time_ms: max_time_ms, hint: hint, **auth_kwargs)
end
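
The signature above relies on Ruby evaluating a keyword default at call time, in the receiver's scope, which is why @max_time_ms can appear there. A minimal sketch of that behaviour, using a hypothetical TimedRunner class that is unrelated to the library:

```ruby
# Hypothetical stand-in showing a keyword default that reads instance state.
class TimedRunner
  def initialize(max_time_ms: nil)
    @max_time_ms = max_time_ms
  end

  # The default expression is evaluated on each call, in the receiver's scope.
  def run(max_time_ms: @max_time_ms)
    max_time_ms
  end
end

runner = TimedRunner.new(max_time_ms: 5_000)

inherited_limit = runner.run                     # falls back to the constructor value
overridden_limit = runner.run(max_time_ms: 250)  # per-call override
explicit_nil = runner.run(max_time_ms: nil)      # an explicit nil is passed through as nil
```

Applied to execute_direct!, this suggests a caller can tighten or lift the time limit for a single call without rebuilding the Aggregation.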

#first(limit = 1) ⇒ Parse::Object+

Returns the first result from the aggregation, or the first limit results when limit is greater than 1.

Parameters:

  • limit (Integer) (defaults to: 1)

    number of results to return

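Returns:

  • (Parse::Object, AggregationResult, Array, nil)

    a single result when limit is 1 (nil if there are no results), otherwise an array of at most limit results
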

# File 'lib/parse/query.rb', line 5918

def first(limit = 1)
  items = results.first(limit)
  limit == 1 ? items.first : items
end
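
The return shape of first depends on limit. The sketch below applies the same two lines to a plain array of made-up rows through a hypothetical first_of helper, so it runs without a Parse backend.

```ruby
# Hypothetical helper mirroring the return-shape logic of #first.
def first_of(results, limit = 1)
  items = results.first(limit)
  limit == 1 ? items.first : items
end

rows = [{ "total" => 3 }, { "total" => 7 }, { "total" => 9 }]

single = first_of(rows)    # one row, unwrapped from the array
pair = first_of(rows, 2)   # an array of two rows
missing = first_of([])     # nil when there are no results
```

Callers that pass a variable limit may want to normalize the result with Array(...) if they always expect a collection.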

#raw { ... } ⇒ Array<Hash>

Returns raw unprocessed results from the aggregation

Yields:

  • a block to iterate for each raw object in the result

Returns:

  • (Array<Hash>)

    raw Parse JSON hash results



# File 'lib/parse/query.rb', line 5882

def raw(&block)
  response = execute!
  return [] if response.respond_to?(:error?) && response.error?

  items = response.respond_to?(:result) ? response.result : response
  items = [] unless items.is_a?(Array)
  return items.each(&block) if block_given?
  items
end
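
raw uses respond_to? checks so that one code path handles both a response object and a bare array. A minimal sketch of that normalization, assuming a hypothetical FakeResponse struct in place of Parse::Response and a hypothetical normalize helper:

```ruby
# Hypothetical minimal response type with the two methods raw probes for.
FakeResponse = Struct.new(:result, :error) do
  def error?
    !error.nil?
  end
end

# Hypothetical helper applying the same normalization steps as #raw.
def normalize(response)
  return [] if response.respond_to?(:error?) && response.error?

  items = response.respond_to?(:result) ? response.result : response
  items.is_a?(Array) ? items : []
end

rest_rows = normalize(FakeResponse.new([{ "_id" => "a", "n" => 2 }], nil))
rest_error = normalize(FakeResponse.new(nil, "timeout"))
direct_rows = normalize([{ "_id" => "b", "n" => 5 }])
odd_payload = normalize(FakeResponse.new({ "unexpected" => true }, nil))
```

The final is_a?(Array) guard means a non-array payload yields an empty array instead of raising when callers iterate.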

#result_pointers { ... } ⇒ Array<Parse::Pointer> Also known as: results_pointers

Returns only pointer objects for all matching results

Yields:

  • a block to iterate for each pointer object in the result

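Returns:

  • (Array<Parse::Pointer>)

    pointers for the matching results; an empty array on error or when there are no results
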

# File 'lib/parse/query.rb', line 5895

def result_pointers(&block)
  response = execute!

  if @mongo_direct && defined?(Parse::MongoDB) && Parse::MongoDB.enabled?
    return [] if response.nil? || response.empty?
    # Convert MongoDB results to Parse format first
    converted = Parse::MongoDB.convert_documents_to_parse(response, @query.instance_variable_get(:@table))
    items = @query.send(:to_pointers, converted)
  else
    return [] if response.error?
    items = @query.send(:to_pointers, response.result)
  end

  return items.each(&block) if block_given?
  items
end

#results { ... } ⇒ Array<Parse::Object, AggregationResult> Also known as: all

Returns processed results from the aggregation.

  • Standard Parse documents (with objectId) are returned as Parse::Object instances
  • Custom aggregation results (from $group, $project, etc.) are returned as AggregationResult objects that support both hash access and method access

Yields:

  • a block to iterate for each object in the result

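Returns:

  • (Array<Parse::Object, AggregationResult>)

    the processed results; an empty array on error or when there are no results
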

# File 'lib/parse/query.rb', line 5802

def results(&block)
  response = execute!

  if @mongo_direct && defined?(Parse::MongoDB) && Parse::MongoDB.enabled?
    # For MongoDB direct, branch per-row on the *raw* document: real Parse
    # docs always carry _created_at / _updated_at, while $group rows reuse
    # _id as the group key. We must not feed group rows through
    # convert_document_to_parse, which would rename _id → objectId and
    # fool the Parse-document heuristic.
    return [] if response.nil? || response.empty?
    table = @query.instance_variable_get(:@table)
    items = response.map { |raw| convert_direct_aggregation_item(raw, table) }
  else
    return [] if response.error?
    items = response.result.map { |item| convert_aggregation_item(item) }
  end

  return items.each(&block) if block_given?
  items
end

#with_stages(*stages) ⇒ Aggregation

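Creates a new Aggregation with additional stages (non-mutating). In the listing below only the verbose setting is passed to the new instance, so mongo_direct, max_time_ms, raw_values, and raw_field_names take their defaults on the returned object.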

Parameters:

  • stages (Array<Hash>)

    additional pipeline stages to append

Returns:

  • (Aggregation)

    new aggregation object with combined pipeline



# File 'lib/parse/query.rb', line 5958

def with_stages(*stages)
  Aggregation.new(@query, @pipeline + stages.flatten, verbose: @verbose)
end
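
The difference between the mutating add_stages and the non-mutating with_stages comes down to Array#concat versus Array#+. A minimal sketch with a hypothetical StageList class that keeps only the pipeline handling (no query, caching, or options):

```ruby
# Hypothetical stand-in with both the mutating and the non-mutating variant.
class StageList
  attr_reader :pipeline

  def initialize(pipeline)
    @pipeline = pipeline
  end

  # Mutates the receiver, like add_stages.
  def add_stages(*stages)
    @pipeline.concat(stages.flatten)
    self
  end

  # Builds a new object, like with_stages; Array#+ leaves @pipeline untouched.
  def with_stages(*stages)
    StageList.new(@pipeline + stages.flatten)
  end
end

base = StageList.new([{ "$match" => { "archived" => false } }])
limited = base.with_stages({ "$limit" => 10 })

base_size_after_with = base.pipeline.size          # base is unchanged by with_stages
base.add_stages({ "$sort" => { "score" => -1 } })  # base grows in place
```

with_stages suits deriving several variants from one shared base pipeline, while add_stages suits building up a single pipeline step by step.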