Class: Parse::Aggregation

Inherits:
Object
  • Object
show all
Defined in:
lib/parse/query.rb

Overview

Helper class for executing arbitrary MongoDB aggregation pipelines. Provides a consistent interface with results, raw, and result_pointers methods.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(query, pipeline, verbose: nil, mongo_direct: false, max_time_ms: nil, raw_values: false, raw_field_names: false, allow_internal_fields: false) ⇒ Aggregation

Returns a new instance of Aggregation.

Parameters:

  • query (Parse::Query)

    the base query object

  • pipeline (Array<Hash>)

    the MongoDB aggregation pipeline stages

  • verbose (Boolean, nil) (defaults to: nil)

    whether to print verbose output (nil means use query's setting)

  • mongo_direct (Boolean) (defaults to: false)

    if true, uses MongoDB directly bypassing Parse Server (required for $literal)

  • max_time_ms (Integer, nil) (defaults to: nil)

    optional server-side time limit in milliseconds passed to MongoDB.aggregate when mongo_direct is true. Pass +nil+ (the default) for no cap.

  • raw_values (Boolean) (defaults to: false)

    when true, passes +rawValues: true+ to the Parse Server REST aggregate endpoint (PS 9.9.0+). Has no effect on the mongo-direct path.

  • raw_field_names (Boolean) (defaults to: false)

    when true, passes +rawFieldNames: true+ to the Parse Server REST aggregate endpoint (PS 9.9.0+). Has no effect on the mongo-direct path.

  • allow_internal_fields (Boolean) (defaults to: false)

    when true, the mongo-direct path forwards +allow_internal_fields: true+ to MongoDB.aggregate so SDK-built ACL $match stages that legitimately reference +_rperm+ / +_wperm+ (emitted by Query#readable_by, +#publicly_readable+, and friends) pass the pipeline-security internal-fields denylist — matching the parity already held by +results_direct+ / +count_direct+ / +distinct_direct+. Set +true+ ONLY when this Aggregation's pipeline was built entirely from SDK constraint translation (no caller-supplied stages); the credential-field guard (_hashed_password, session tokens, auth data) is what +allow_internal_fields+ relaxes, so it must never be set on a pipeline that interpolates user input. Defaults to +false+.



5970
5971
5972
5973
5974
5975
5976
5977
5978
5979
5980
5981
5982
# File 'lib/parse/query.rb', line 5970

def initialize(query, pipeline, verbose: nil, mongo_direct: false, max_time_ms: nil,
               raw_values: false, raw_field_names: false, allow_internal_fields: false)
  @query = query
  @pipeline = pipeline
  @cached_response = nil
  @mongo_direct = mongo_direct
  @max_time_ms = max_time_ms
  @raw_values = raw_values
  @raw_field_names = raw_field_names
  @allow_internal_fields = allow_internal_fields
  # Use provided verbose setting, or fall back to query's verbose_aggregate setting
  @verbose = verbose.nil? ? @query.instance_variable_get(:@verbose_aggregate) : verbose
end

Instance Attribute Details

#mongo_directBoolean (readonly)

Returns whether #execute! will route through MongoDB.aggregate instead of Parse Server's REST /aggregate endpoint.

Returns:



5947
5948
5949
# File 'lib/parse/query.rb', line 5947

def mongo_direct
  @mongo_direct
end

#pipelineArray<Hash> (readonly)

Returns the MongoDB aggregation pipeline stages this Aggregation will execute. Useful for previewing the routed pipeline before #execute!, for snapshot-based regression tests, and for debugging the REST-vs-mongo-direct translation.

Returns:

  • (Array<Hash>)

    the MongoDB aggregation pipeline stages this Aggregation will execute. Useful for previewing the routed pipeline before #execute!, for snapshot-based regression tests, and for debugging the REST-vs-mongo-direct translation.



5942
5943
5944
# File 'lib/parse/query.rb', line 5942

def pipeline
  @pipeline
end

Instance Method Details

#add_stages(*stages) ⇒ Aggregation

Add additional pipeline stages

Parameters:

  • stages (Array<Hash>)

    additional pipeline stages to append

Returns:



6190
6191
6192
6193
6194
# File 'lib/parse/query.rb', line 6190

def add_stages(*stages)
  @pipeline.concat(stages.flatten)
  @cached_response = nil # Clear cache when pipeline changes
  self
end

#any?Boolean

Check if there are any results

Returns:

  • (Boolean)

    true if there are results



6177
6178
6179
# File 'lib/parse/query.rb', line 6177

def any?
  count > 0
end

#countInteger

Returns the count of results

Returns:

  • (Integer)

    the number of results



6166
6167
6168
6169
6170
6171
6172
6173
# File 'lib/parse/query.rb', line 6166

def count
  response = execute!
  if @mongo_direct && defined?(Parse::MongoDB) && Parse::MongoDB.enabled?
    response.nil? ? 0 : response.count
  else
    response.error? ? 0 : response.result.count
  end
end

#empty?Boolean

Check if there are no results

Returns:

  • (Boolean)

    true if there are no results



6183
6184
6185
# File 'lib/parse/query.rb', line 6183

def empty?
  count == 0
end

#execute!Parse::Response, Array

Execute the aggregation pipeline and cache the response

Returns:



5986
5987
5988
5989
5990
5991
5992
5993
5994
5995
5996
5997
5998
5999
6000
6001
6002
6003
6004
6005
6006
6007
6008
6009
6010
6011
6012
6013
6014
6015
6016
6017
6018
6019
# File 'lib/parse/query.rb', line 5986

def execute!
  return @cached_response if @cached_response

  if @verbose
    puts "[VERBOSE AGGREGATE] Custom aggregation pipeline:"
    puts JSON.pretty_generate(@pipeline)
    puts "[VERBOSE AGGREGATE] Sending to: #{@query.instance_variable_get(:@table)}"
    puts "[VERBOSE AGGREGATE] Using MongoDB direct: #{@mongo_direct}"
  end

  if @mongo_direct && defined?(Parse::MongoDB) && Parse::MongoDB.enabled?
    @cached_response = execute_direct!
  else
    @cached_response = @query.client.aggregate_pipeline(
      @query.instance_variable_get(:@table),
      @pipeline,
      headers: {},
      raw_values: @raw_values,
      raw_field_names: @raw_field_names,
      **@query.send(:_opts),
    )
  end

  if @verbose
    if @mongo_direct && defined?(Parse::MongoDB) && Parse::MongoDB.enabled?
      puts "[VERBOSE AGGREGATE] Response result count: #{@cached_response&.count}"
    else
      puts "[VERBOSE AGGREGATE] Response success?: #{@cached_response.success?}"
      puts "[VERBOSE AGGREGATE] Response result count: #{@cached_response.result&.count}"
    end
  end

  @cached_response
end

#execute_direct!(max_time_ms: @max_time_ms) ⇒ Array<Hash>

Execute aggregation directly on MongoDB

Parameters:

  • max_time_ms (Integer, nil) (defaults to: @max_time_ms)

    optional server-side time limit (milliseconds). Defaults to the value passed to #initialize via the +max_time_ms:+ keyword.

Returns:



6025
6026
6027
6028
6029
6030
6031
6032
6033
6034
# File 'lib/parse/query.rb', line 6025

def execute_direct!(max_time_ms: @max_time_ms)
  table = @query.instance_variable_get(:@table)
  auth_kwargs = @query.send(:mongo_direct_auth_kwargs)
  # Forward the parent query's index hint so `query.hint(...).aggregate(...)`
  # honors it on the mongo-direct path too (parity with results_direct /
  # count_direct / distinct_direct).
  hint = @query.instance_variable_get(:@hint)
  Parse::MongoDB.aggregate(table, @pipeline, max_time_ms: max_time_ms, hint: hint,
                           allow_internal_fields: @allow_internal_fields, **auth_kwargs)
end

#first(limit = 1) ⇒ Parse::Object+

Returns the first result from the aggregation

Parameters:

  • limit (Integer) (defaults to: 1)

    number of results to return

Returns:



6159
6160
6161
6162
# File 'lib/parse/query.rb', line 6159

def first(limit = 1)
  items = results.first(limit)
  limit == 1 ? items.first : items
end

#raw { ... } ⇒ Array<Hash>

Returns raw unprocessed results from the aggregation

Yields:

  • a block to iterate for each raw object in the result

Returns:

  • (Array<Hash>)

    raw Parse JSON hash results



6123
6124
6125
6126
6127
6128
6129
6130
6131
# File 'lib/parse/query.rb', line 6123

def raw(&block)
  response = execute!
  return [] if response.respond_to?(:error?) && response.error?

  items = response.respond_to?(:result) ? response.result : response
  items = [] unless items.is_a?(Array)
  return items.each(&block) if block_given?
  items
end

#result_pointers { ... } ⇒ Array<Parse::Pointer> Also known as: results_pointers

Returns only pointer objects for all matching results

Yields:

  • a block to iterate for each pointer object in the result

Returns:



6136
6137
6138
6139
6140
6141
6142
6143
6144
6145
6146
6147
6148
6149
6150
6151
# File 'lib/parse/query.rb', line 6136

def result_pointers(&block)
  response = execute!

  if @mongo_direct && defined?(Parse::MongoDB) && Parse::MongoDB.enabled?
    return [] if response.nil? || response.empty?
    # Convert MongoDB results to Parse format first
    converted = Parse::MongoDB.convert_documents_to_parse(response, @query.instance_variable_get(:@table))
    items = @query.send(:to_pointers, converted)
  else
    return [] if response.error?
    items = @query.send(:to_pointers, response.result)
  end

  return items.each(&block) if block_given?
  items
end

#results { ... } ⇒ Array<Parse::Object, AggregationResult> Also known as: all

Returns processed results from the aggregation.

  • Standard Parse documents (with objectId) are returned as Parse::Object instances
  • Custom aggregation results (from $group, $project, etc.) are returned as AggregationResult objects that support both hash access and method access

Yields:

  • a block to iterate for each object in the result

Returns:



6043
6044
6045
6046
6047
6048
6049
6050
6051
6052
6053
6054
6055
6056
6057
6058
6059
6060
6061
6062
# File 'lib/parse/query.rb', line 6043

def results(&block)
  response = execute!

  if @mongo_direct && defined?(Parse::MongoDB) && Parse::MongoDB.enabled?
    # For MongoDB direct, branch per-row on the *raw* document: real Parse
    # docs always carry _created_at / _updated_at, while $group rows reuse
    # _id as the group key. We must not feed group rows through
    # convert_document_to_parse, which would rename _id → objectId and
    # fool the Parse-document heuristic.
    return [] if response.nil? || response.empty?
    table = @query.instance_variable_get(:@table)
    items = response.map { |raw| convert_direct_aggregation_item(raw, table) }
  else
    return [] if response.error?
    items = response.result.map { |item| convert_aggregation_item(item) }
  end

  return items.each(&block) if block_given?
  items
end

#with_stages(*stages) ⇒ Aggregation

Create a new Aggregation with additional stages (non-mutating)

Parameters:

  • stages (Array<Hash>)

    additional pipeline stages to append

Returns:

  • (Aggregation)

    new aggregation object with combined pipeline



6199
6200
6201
# File 'lib/parse/query.rb', line 6199

def with_stages(*stages)
  Aggregation.new(@query, @pipeline + stages.flatten, verbose: @verbose)
end