Class: Parse::Aggregation

Inherits:
Object
  • Object
show all
Defined in:
lib/parse/query.rb

Overview

Helper class for executing arbitrary MongoDB aggregation pipelines. Provides a consistent interface with results, raw, and result_pointers methods.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(query, pipeline, verbose: nil, mongo_direct: false, max_time_ms: nil, raw_values: false, raw_field_names: false, allow_internal_fields: false) ⇒ Aggregation

Returns a new instance of Aggregation.

Parameters:

  • query (Parse::Query)

    the base query object

  • pipeline (Array<Hash>)

    the MongoDB aggregation pipeline stages

  • verbose (Boolean, nil) (defaults to: nil)

    whether to print verbose output (nil means use query's setting)

  • mongo_direct (Boolean) (defaults to: false)

    if true, uses MongoDB directly bypassing Parse Server (required for $literal)

  • max_time_ms (Integer, nil) (defaults to: nil)

    optional server-side time limit in milliseconds passed to MongoDB.aggregate when mongo_direct is true. Pass +nil+ (the default) for no cap.

  • raw_values (Boolean) (defaults to: false)

    when true, passes +rawValues: true+ to the Parse Server REST aggregate endpoint (PS 9.9.0+). Has no effect on the mongo-direct path.

  • raw_field_names (Boolean) (defaults to: false)

    when true, passes +rawFieldNames: true+ to the Parse Server REST aggregate endpoint (PS 9.9.0+). Has no effect on the mongo-direct path.

  • allow_internal_fields (Boolean) (defaults to: false)

    when true, the mongo-direct path forwards +allow_internal_fields: true+ to MongoDB.aggregate so SDK-built ACL $match stages that legitimately reference +_rperm+ / +_wperm+ (emitted by Query#readable_by, +#publicly_readable+, and friends) pass the pipeline-security internal-fields denylist — matching the parity already held by +results_direct+ / +count_direct+ / +distinct_direct+. Set +true+ ONLY when this Aggregation's pipeline was built entirely from SDK constraint translation (no caller-supplied stages); the credential-field guard (_hashed_password, session tokens, auth data) is what +allow_internal_fields+ relaxes, so it must never be set on a pipeline that interpolates user input. Defaults to +false+.



5983
5984
5985
5986
5987
5988
5989
5990
5991
5992
5993
5994
5995
# File 'lib/parse/query.rb', line 5983

def initialize(query, pipeline, verbose: nil, mongo_direct: false, max_time_ms: nil,
               raw_values: false, raw_field_names: false, allow_internal_fields: false)
  @query = query
  @pipeline = pipeline
  @cached_response = nil
  @mongo_direct = mongo_direct
  @max_time_ms = max_time_ms
  @raw_values = raw_values
  @raw_field_names = raw_field_names
  @allow_internal_fields = allow_internal_fields
  # Use provided verbose setting, or fall back to query's verbose_aggregate setting
  @verbose = verbose.nil? ? @query.instance_variable_get(:@verbose_aggregate) : verbose
end

Instance Attribute Details

#mongo_directBoolean (readonly)

Returns whether #execute! will route through MongoDB.aggregate instead of Parse Server's REST /aggregate endpoint.

Returns:



5960
5961
5962
# File 'lib/parse/query.rb', line 5960

def mongo_direct
  @mongo_direct
end

#pipelineArray<Hash> (readonly)

Returns the MongoDB aggregation pipeline stages this Aggregation will execute. Useful for previewing the routed pipeline before #execute!, for snapshot-based regression tests, and for debugging the REST-vs-mongo-direct translation.

Returns:

  • (Array<Hash>)

    the MongoDB aggregation pipeline stages this Aggregation will execute. Useful for previewing the routed pipeline before #execute!, for snapshot-based regression tests, and for debugging the REST-vs-mongo-direct translation.



5955
5956
5957
# File 'lib/parse/query.rb', line 5955

def pipeline
  @pipeline
end

Instance Method Details

#add_stages(*stages) ⇒ Aggregation

Add additional pipeline stages

Parameters:

  • stages (Array<Hash>)

    additional pipeline stages to append

Returns:



6211
6212
6213
6214
6215
# File 'lib/parse/query.rb', line 6211

def add_stages(*stages)
  @pipeline.concat(stages.flatten)
  @cached_response = nil # Clear cache when pipeline changes
  self
end

#any?Boolean

Check if there are any results

Returns:

  • (Boolean)

    true if there are results



6198
6199
6200
# File 'lib/parse/query.rb', line 6198

def any?
  count > 0
end

#countInteger

Returns the count of results

Returns:

  • (Integer)

    the number of results



6187
6188
6189
6190
6191
6192
6193
6194
# File 'lib/parse/query.rb', line 6187

def count
  response = execute!
  if @mongo_direct && defined?(Parse::MongoDB) && Parse::MongoDB.enabled?
    response.nil? ? 0 : response.count
  else
    response.error? ? 0 : response.result.count
  end
end

#empty?Boolean

Check if there are no results

Returns:

  • (Boolean)

    true if there are no results



6204
6205
6206
# File 'lib/parse/query.rb', line 6204

def empty?
  count == 0
end

#execute!Parse::Response, Array

Execute the aggregation pipeline and cache the response

Returns:



5999
6000
6001
6002
6003
6004
6005
6006
6007
6008
6009
6010
6011
6012
6013
6014
6015
6016
6017
6018
6019
6020
6021
6022
6023
6024
6025
6026
6027
6028
6029
6030
6031
6032
6033
6034
6035
6036
6037
6038
6039
6040
# File 'lib/parse/query.rb', line 5999

def execute!
  return @cached_response if @cached_response

  if @verbose
    puts "[VERBOSE AGGREGATE] Custom aggregation pipeline:"
    puts JSON.pretty_generate(@pipeline)
    puts "[VERBOSE AGGREGATE] Sending to: #{@query.instance_variable_get(:@table)}"
    puts "[VERBOSE AGGREGATE] Using MongoDB direct: #{@mongo_direct}"
  end

  if @mongo_direct && defined?(Parse::MongoDB) && Parse::MongoDB.enabled?
    @cached_response = execute_direct!
  else
    # REST /aggregate is master-key-only. An ambient Parse.with_session
    # block would suppress the master key via session_token, causing a
    # 401/403. Force use_master_key unless the caller explicitly disabled
    # it (use_master_key: false is a deliberate client-mode decision).
    # `.dup` keeps the master-key flip local to this call even if `_opts`
    # ever returns a shared/memoized hash.
    rest_opts = @query.send(:_opts).dup
    rest_opts[:use_master_key] = true unless rest_opts[:use_master_key] == false
    @cached_response = @query.client.aggregate_pipeline(
      @query.instance_variable_get(:@table),
      @pipeline,
      headers: {},
      raw_values: @raw_values,
      raw_field_names: @raw_field_names,
      **rest_opts,
    )
  end

  if @verbose
    if @mongo_direct && defined?(Parse::MongoDB) && Parse::MongoDB.enabled?
      puts "[VERBOSE AGGREGATE] Response result count: #{@cached_response&.count}"
    else
      puts "[VERBOSE AGGREGATE] Response success?: #{@cached_response.success?}"
      puts "[VERBOSE AGGREGATE] Response result count: #{@cached_response.result&.count}"
    end
  end

  @cached_response
end

#execute_direct!(max_time_ms: @max_time_ms) ⇒ Array<Hash>

Execute aggregation directly on MongoDB

Parameters:

  • max_time_ms (Integer, nil) (defaults to: @max_time_ms)

    optional server-side time limit (milliseconds). Defaults to the value passed to #initialize via the +max_time_ms:+ keyword.

Returns:



6046
6047
6048
6049
6050
6051
6052
6053
6054
6055
# File 'lib/parse/query.rb', line 6046

def execute_direct!(max_time_ms: @max_time_ms)
  table = @query.instance_variable_get(:@table)
  auth_kwargs = @query.send(:mongo_direct_auth_kwargs)
  # Forward the parent query's index hint so `query.hint(...).aggregate(...)`
  # honors it on the mongo-direct path too (parity with results_direct /
  # count_direct / distinct_direct).
  hint = @query.instance_variable_get(:@hint)
  Parse::MongoDB.aggregate(table, @pipeline, max_time_ms: max_time_ms, hint: hint,
                           allow_internal_fields: @allow_internal_fields, **auth_kwargs)
end

#first(limit = 1) ⇒ Parse::Object+

Returns the first result from the aggregation

Parameters:

  • limit (Integer) (defaults to: 1)

    number of results to return

Returns:



6180
6181
6182
6183
# File 'lib/parse/query.rb', line 6180

def first(limit = 1)
  items = results.first(limit)
  limit == 1 ? items.first : items
end

#raw { ... } ⇒ Array<Hash>

Returns raw unprocessed results from the aggregation

Yields:

  • a block to iterate for each raw object in the result

Returns:

  • (Array<Hash>)

    raw Parse JSON hash results



6144
6145
6146
6147
6148
6149
6150
6151
6152
# File 'lib/parse/query.rb', line 6144

def raw(&block)
  response = execute!
  return [] if response.respond_to?(:error?) && response.error?

  items = response.respond_to?(:result) ? response.result : response
  items = [] unless items.is_a?(Array)
  return items.each(&block) if block_given?
  items
end

#result_pointers { ... } ⇒ Array<Parse::Pointer> Also known as: results_pointers

Returns only pointer objects for all matching results

Yields:

  • a block to iterate for each pointer object in the result

Returns:



6157
6158
6159
6160
6161
6162
6163
6164
6165
6166
6167
6168
6169
6170
6171
6172
# File 'lib/parse/query.rb', line 6157

def result_pointers(&block)
  response = execute!

  if @mongo_direct && defined?(Parse::MongoDB) && Parse::MongoDB.enabled?
    return [] if response.nil? || response.empty?
    # Convert MongoDB results to Parse format first
    converted = Parse::MongoDB.convert_documents_to_parse(response, @query.instance_variable_get(:@table))
    items = @query.send(:to_pointers, converted)
  else
    return [] if response.error?
    items = @query.send(:to_pointers, response.result)
  end

  return items.each(&block) if block_given?
  items
end

#results { ... } ⇒ Array<Parse::Object, AggregationResult> Also known as: all

Returns processed results from the aggregation.

  • Standard Parse documents (with objectId) are returned as Parse::Object instances
  • Custom aggregation results (from $group, $project, etc.) are returned as AggregationResult objects that support both hash access and method access

Yields:

  • a block to iterate for each object in the result

Returns:



6064
6065
6066
6067
6068
6069
6070
6071
6072
6073
6074
6075
6076
6077
6078
6079
6080
6081
6082
6083
# File 'lib/parse/query.rb', line 6064

def results(&block)
  response = execute!

  if @mongo_direct && defined?(Parse::MongoDB) && Parse::MongoDB.enabled?
    # For MongoDB direct, branch per-row on the *raw* document: real Parse
    # docs always carry _created_at / _updated_at, while $group rows reuse
    # _id as the group key. We must not feed group rows through
    # convert_document_to_parse, which would rename _id → objectId and
    # fool the Parse-document heuristic.
    return [] if response.nil? || response.empty?
    table = @query.instance_variable_get(:@table)
    items = response.map { |raw| convert_direct_aggregation_item(raw, table) }
  else
    return [] if response.error?
    items = response.result.map { |item| convert_aggregation_item(item) }
  end

  return items.each(&block) if block_given?
  items
end

#with_stages(*stages) ⇒ Aggregation

Create a new Aggregation with additional stages (non-mutating)

Parameters:

  • stages (Array<Hash>)

    additional pipeline stages to append

Returns:

  • (Aggregation)

    new aggregation object with combined pipeline



6220
6221
6222
# File 'lib/parse/query.rb', line 6220

def with_stages(*stages)
  Aggregation.new(@query, @pipeline + stages.flatten, verbose: @verbose)
end