Class: Parse::Aggregation

Inherits:
Object
  • Object
Defined in:
lib/parse/query.rb

Overview

Helper class for executing arbitrary MongoDB aggregation pipelines. Provides a consistent interface with results, raw, and result_pointers methods.

Constructor Details

#initialize(query, pipeline, verbose: nil, mongo_direct: false, max_time_ms: nil, raw_values: false, raw_field_names: false, allow_internal_fields: false) ⇒ Aggregation

Returns a new instance of Aggregation.

Parameters:

  • query (Parse::Query)

    the base query object

  • pipeline (Array<Hash>)

    the MongoDB aggregation pipeline stages

  • verbose (Boolean, nil) (defaults to: nil)

    whether to print verbose output (nil means use query's setting)

  • mongo_direct (Boolean) (defaults to: false)

    if true, uses MongoDB directly bypassing Parse Server (required for $literal)

  • max_time_ms (Integer, nil) (defaults to: nil)

    optional server-side time limit in milliseconds passed to MongoDB.aggregate when mongo_direct is true. Pass +nil+ (the default) for no cap.

  • raw_values (Boolean) (defaults to: false)

    when true, passes +rawValues: true+ to the Parse Server REST aggregate endpoint (PS 9.9.0+). Has no effect on the mongo-direct path.

  • raw_field_names (Boolean) (defaults to: false)

    when true, passes +rawFieldNames: true+ to the Parse Server REST aggregate endpoint (PS 9.9.0+). Has no effect on the mongo-direct path.

  • allow_internal_fields (Boolean) (defaults to: false)

    when true, the mongo-direct path forwards +allow_internal_fields: true+ to MongoDB.aggregate. This lets SDK-built ACL $match stages that legitimately reference +_rperm+ / +_wperm+ (emitted by Query#readable_by, +#publicly_readable+, and friends) pass the pipeline-security internal-fields denylist, matching the behavior of +results_direct+ / +count_direct+ / +distinct_direct+. Set +true+ ONLY when this Aggregation's pipeline was built entirely from SDK constraint translation (no caller-supplied stages). The flag relaxes the credential-field guard (_hashed_password, session tokens, auth data), so it must never be set on a pipeline that interpolates user input. Defaults to +false+.



# File 'lib/parse/query.rb', line 5920

def initialize(query, pipeline, verbose: nil, mongo_direct: false, max_time_ms: nil,
               raw_values: false, raw_field_names: false, allow_internal_fields: false)
  @query = query
  @pipeline = pipeline
  @cached_response = nil
  @mongo_direct = mongo_direct
  @max_time_ms = max_time_ms
  @raw_values = raw_values
  @raw_field_names = raw_field_names
  @allow_internal_fields = allow_internal_fields
  # Use provided verbose setting, or fall back to query's verbose_aggregate setting
  @verbose = verbose.nil? ? @query.instance_variable_get(:@verbose_aggregate) : verbose
end
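
As a rough illustration of what the pipeline argument looks like, the sketch below builds an Array<Hash> of stages in plain Ruby and prints it with JSON.pretty_generate, the same rendering the verbose path uses. The field names (status, category) and the commented-out construction line are assumptions for illustration only; the block runs without a Parse connection.

```ruby
require "json"

# Hypothetical pipeline: the field names "status" and "category" are
# illustrative and not taken from any real schema.
pipeline = [
  { "$match" => { "status" => "active" } },
  { "$group" => { "_id" => "$category", "total" => { "$sum" => 1 } } },
  { "$sort"  => { "total" => -1 } },
]

# Each stage here is a Hash keyed by a single stage operator.
stage_names = pipeline.map { |stage| stage.keys.first }

# Same rendering the verbose path applies before sending the pipeline.
puts JSON.pretty_generate(pipeline)

# With a configured client, construction would look roughly like this
# (`query` is assumed to be an existing Parse::Query):
#   aggregation = Parse::Aggregation.new(query, pipeline, verbose: true)
```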

Instance Attribute Details

#mongo_direct ⇒ Boolean (readonly)

Returns whether #execute! will route through MongoDB.aggregate instead of Parse Server's REST /aggregate endpoint.

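Returns:

  • (Boolean)

    whether #execute! will route through MongoDB.aggregate instead of Parse Server's REST /aggregate endpoint
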
# File 'lib/parse/query.rb', line 5897

def mongo_direct
  @mongo_direct
end

#pipeline ⇒ Array<Hash> (readonly)

Returns the MongoDB aggregation pipeline stages this Aggregation will execute. Useful for previewing the routed pipeline before #execute!, for snapshot-based regression tests, and for debugging the REST-vs-mongo-direct translation.

Returns:

  • (Array<Hash>)

    the MongoDB aggregation pipeline stages this Aggregation will execute. Useful for previewing the routed pipeline before #execute!, for snapshot-based regression tests, and for debugging the REST-vs-mongo-direct translation.



# File 'lib/parse/query.rb', line 5892

def pipeline
  @pipeline
end

Instance Method Details

#add_stages(*stages) ⇒ Aggregation

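Appends additional stages to this Aggregation's pipeline in place and clears the cached response, so the next #execute! runs the updated pipeline.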

Parameters:

  • stages (Array<Hash>)

    additional pipeline stages to append

Returns:

  • (Aggregation)

    self, so calls can be chained

# File 'lib/parse/query.rb', line 6140

def add_stages(*stages)
  @pipeline.concat(stages.flatten)
  @cached_response = nil # Clear cache when pipeline changes
  self
end

#any? ⇒ Boolean

Check if there are any results

Returns:

  • (Boolean)

    true if there are results



# File 'lib/parse/query.rb', line 6127

def any?
  count > 0
end

#count ⇒ Integer

Returns the count of results

Returns:

  • (Integer)

    the number of results



# File 'lib/parse/query.rb', line 6116

def count
  response = execute!
  if @mongo_direct && defined?(Parse::MongoDB) && Parse::MongoDB.enabled?
    response.nil? ? 0 : response.count
  else
    response.error? ? 0 : response.result.count
  end
end

#empty? ⇒ Boolean

Check if there are no results

Returns:

  • (Boolean)

    true if there are no results



# File 'lib/parse/query.rb', line 6133

def empty?
  count == 0
end

#execute! ⇒ Parse::Response, Array

Execute the aggregation pipeline and cache the response

Returns:

  • (Parse::Response, Array)

    the Parse::Response from the REST aggregate endpoint, or the Array returned by #execute_direct! when mongo_direct is set and Parse::MongoDB is enabled

# File 'lib/parse/query.rb', line 5936

def execute!
  return @cached_response if @cached_response

  if @verbose
    puts "[VERBOSE AGGREGATE] Custom aggregation pipeline:"
    puts JSON.pretty_generate(@pipeline)
    puts "[VERBOSE AGGREGATE] Sending to: #{@query.instance_variable_get(:@table)}"
    puts "[VERBOSE AGGREGATE] Using MongoDB direct: #{@mongo_direct}"
  end

  if @mongo_direct && defined?(Parse::MongoDB) && Parse::MongoDB.enabled?
    @cached_response = execute_direct!
  else
    @cached_response = @query.client.aggregate_pipeline(
      @query.instance_variable_get(:@table),
      @pipeline,
      headers: {},
      raw_values: @raw_values,
      raw_field_names: @raw_field_names,
      **@query.send(:_opts),
    )
  end

  if @verbose
    if @mongo_direct && defined?(Parse::MongoDB) && Parse::MongoDB.enabled?
      puts "[VERBOSE AGGREGATE] Response result count: #{@cached_response&.count}"
    else
      puts "[VERBOSE AGGREGATE] Response success?: #{@cached_response.success?}"
      puts "[VERBOSE AGGREGATE] Response result count: #{@cached_response.result&.count}"
    end
  end

  @cached_response
end
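
The caching behavior of #execute! can be shown without a Parse Server. The sketch below uses a hypothetical stand-in class (MiniAggregation) and a counting lambda in place of the real client; only the @cached_response pattern mirrors the source above, everything else is an assumption made for the demonstration.

```ruby
# Hypothetical stand-in that mirrors only the memoization pattern of
# Parse::Aggregation#execute! and #add_stages; it is not the real class.
class MiniAggregation
  def initialize(pipeline, backend)
    @pipeline = pipeline
    @backend = backend        # callable standing in for the Parse client
    @cached_response = nil
  end

  def execute!
    return @cached_response if @cached_response
    @cached_response = @backend.call(@pipeline)
  end

  def add_stages(*stages)
    @pipeline.concat(stages.flatten)
    @cached_response = nil    # pipeline changed, so the old response is stale
    self
  end
end

calls = 0
backend = ->(pipeline) { calls += 1; "response for #{pipeline.length} stage(s)" }

agg = MiniAggregation.new([{ "$match" => { "a" => 1 } }], backend)
first_response  = agg.execute!
second_response = agg.execute!   # served from the cache
calls_after_two = calls

agg.add_stages({ "$limit" => 5 })
third_response = agg.execute!    # cache was cleared, so the backend runs again
```

Because the guard is a plain truthiness check, a nil response would not be cached and the backend would be called again on the next #execute!.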

#execute_direct!(max_time_ms: @max_time_ms) ⇒ Array<Hash>

Execute aggregation directly on MongoDB

Parameters:

  • max_time_ms (Integer, nil) (defaults to: @max_time_ms)

    optional server-side time limit (milliseconds). Defaults to the value passed to #initialize via the +max_time_ms:+ keyword.

Returns:

  • (Array<Hash>)

    the documents returned by Parse::MongoDB.aggregate

# File 'lib/parse/query.rb', line 5975

def execute_direct!(max_time_ms: @max_time_ms)
  table = @query.instance_variable_get(:@table)
  auth_kwargs = @query.send(:mongo_direct_auth_kwargs)
  # Forward the parent query's index hint so `query.hint(...).aggregate(...)`
  # honors it on the mongo-direct path too (parity with results_direct /
  # count_direct / distinct_direct).
  hint = @query.instance_variable_get(:@hint)
  Parse::MongoDB.aggregate(table, @pipeline, max_time_ms: max_time_ms, hint: hint,
                           allow_internal_fields: @allow_internal_fields, **auth_kwargs)
end

#first(limit = 1) ⇒ Parse::Object+

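Returns the first result from the aggregation. With the default limit of 1, a single item is returned (or nil when there are no results); with any other limit, an Array of up to limit items is returned.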

Parameters:

  • limit (Integer) (defaults to: 1)

    number of results to return

Returns:



# File 'lib/parse/query.rb', line 6109

def first(limit = 1)
  items = results.first(limit)
  limit == 1 ? items.first : items
end
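
The return shape of #first depends on limit. A minimal sketch, with a plain array standing in for #results and the method body copied into a lambda so that it runs on its own:

```ruby
# Plain array standing in for Aggregation#results.
results = [:a, :b, :c]

# Same logic as #first above, written as a lambda so it runs standalone.
first = lambda do |limit = 1|
  items = results.first(limit)
  limit == 1 ? items.first : items
end

single = first.call      # one element, not wrapped in an array
pair   = first.call(2)   # an array of two elements
none   = first.call(0)   # an empty array, since limit is not 1
```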

#raw { ... } ⇒ Array<Hash>

Returns raw unprocessed results from the aggregation

Yields:

  • a block to iterate for each raw object in the result

Returns:

  • (Array<Hash>)

    raw Parse JSON hash results



# File 'lib/parse/query.rb', line 6073

def raw(&block)
  response = execute!
  return [] if response.respond_to?(:error?) && response.error?

  items = response.respond_to?(:result) ? response.result : response
  items = [] unless items.is_a?(Array)
  return items.each(&block) if block_given?
  items
end
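
#raw accepts either response shape by probing with respond_to?. The sketch below reproduces that normalization against a hypothetical FakeResponse struct (an assumption standing in for Parse::Response) and a bare Array (the mongo-direct shape); the method name normalize is likewise made up for the example.

```ruby
# Hypothetical stand-in for Parse::Response, exposing only the two
# methods that #raw probes for.
FakeResponse = Struct.new(:result, :error) do
  def error?
    !error.nil?
  end
end

# Same normalization as #raw above, taking the response as an argument.
def normalize(response)
  return [] if response.respond_to?(:error?) && response.error?
  items = response.respond_to?(:result) ? response.result : response
  items.is_a?(Array) ? items : []
end

rest_ok    = normalize(FakeResponse.new([{ "_id" => "x", "total" => 3 }], nil))
rest_error = normalize(FakeResponse.new(nil, "timeout"))
direct     = normalize([{ "_id" => "y", "total" => 7 }])  # bare Array, as on the mongo-direct path
odd_shape  = normalize(FakeResponse.new({ "not" => "an array" }, nil))
```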

#result_pointers { ... } ⇒ Array<Parse::Pointer> Also known as: results_pointers

Returns only pointer objects for all matching results

Yields:

  • a block to iterate for each pointer object in the result

Returns:

  • (Array<Parse::Pointer>)

    pointers for the matching results; an empty Array when the response is an error or contains no rows

# File 'lib/parse/query.rb', line 6086

def result_pointers(&block)
  response = execute!

  if @mongo_direct && defined?(Parse::MongoDB) && Parse::MongoDB.enabled?
    return [] if response.nil? || response.empty?
    # Convert MongoDB results to Parse format first
    converted = Parse::MongoDB.convert_documents_to_parse(response, @query.instance_variable_get(:@table))
    items = @query.send(:to_pointers, converted)
  else
    return [] if response.error?
    items = @query.send(:to_pointers, response.result)
  end

  return items.each(&block) if block_given?
  items
end

#results { ... } ⇒ Array<Parse::Object, AggregationResult> Also known as: all

Returns processed results from the aggregation.

  • Standard Parse documents (with objectId) are returned as Parse::Object instances
  • Custom aggregation results (from $group, $project, etc.) are returned as AggregationResult objects that support both hash access and method access

Yields:

  • a block to iterate for each object in the result

Returns:

  • (Array<Parse::Object, AggregationResult>)

    the converted results; an empty Array when the response is an error or contains no rows

# File 'lib/parse/query.rb', line 5993

def results(&block)
  response = execute!

  if @mongo_direct && defined?(Parse::MongoDB) && Parse::MongoDB.enabled?
    # For MongoDB direct, branch per-row on the *raw* document: real Parse
    # docs always carry _created_at / _updated_at, while $group rows reuse
    # _id as the group key. We must not feed group rows through
    # convert_document_to_parse, which would rename _id → objectId and
    # fool the Parse-document heuristic.
    return [] if response.nil? || response.empty?
    table = @query.instance_variable_get(:@table)
    items = response.map { |raw| convert_direct_aggregation_item(raw, table) }
  else
    return [] if response.error?
    items = response.result.map { |item| convert_aggregation_item(item) }
  end

  return items.each(&block) if block_given?
  items
end
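
The comment in the mongo-direct branch describes a per-row heuristic. The helper convert_direct_aggregation_item is not shown on this page, so the following is only a sketch of the classification step that comment describes. The method name parse_document? is hypothetical, and treating the presence of both _created_at and _updated_at as the test is an assumption.

```ruby
# Hypothetical classifier: treats a raw MongoDB row as a Parse document
# when it carries both timestamp fields, per the source comment above.
def parse_document?(raw)
  raw.key?("_created_at") && raw.key?("_updated_at")
end

rows = [
  # A stored Parse document as MongoDB holds it (values are illustrative).
  { "_id" => "abc123", "_created_at" => Time.at(0), "_updated_at" => Time.at(0), "title" => "Hi" },
  # A $group row: _id is the group key and there are no timestamps.
  { "_id" => "books", "total" => 12 },
]

kinds = rows.map { |raw| parse_document?(raw) ? :parse_object : :aggregation_result }
```

Classifying on the raw row, before any _id to objectId renaming, is what keeps a $group key from being mistaken for a Parse objectId.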

#with_stages(*stages) ⇒ Aggregation

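Creates a new Aggregation with the additional stages appended, leaving the receiver's pipeline unchanged. As the source below shows, only verbose is carried over; mongo_direct, max_time_ms, raw_values, raw_field_names, and allow_internal_fields revert to their defaults on the new object.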

Parameters:

  • stages (Array<Hash>)

    additional pipeline stages to append

Returns:

  • (Aggregation)

    new aggregation object with combined pipeline



# File 'lib/parse/query.rb', line 6149

def with_stages(*stages)
  Aggregation.new(@query, @pipeline + stages.flatten, verbose: @verbose)
end
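
The difference between #with_stages and #add_stages comes down to Array#+ versus Array#concat. A small sketch with plain arrays (the stage contents are illustrative):

```ruby
base  = [{ "$match" => { "a" => 1 } }]
extra = [{ "$sort" => { "a" => 1 } }, [{ "$limit" => 5 }]]  # nested on purpose

# with_stages style: Array#+ builds a new array and leaves `base` untouched.
combined = base + extra.flatten
base_len_after_plus = base.length

# add_stages style: Array#concat mutates its receiver in place.
mutated = base.dup
mutated.concat(extra.flatten)
```

Array#flatten unwraps nested arrays but leaves the stage hashes intact, which is why both methods accept either a list of hashes or a single array of them. Note that Array#+ copies only the outer array: the new pipeline still refers to the same stage Hash objects as the original.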