Module: Parse::VectorSearch::Hybrid

Defined in:
lib/parse/vector_search/hybrid.rb

Overview

Hybrid (lexical + vector) search with reciprocal-rank fusion.

Lexical search (Parse::AtlasSearch, BM25/$search) nails exact-token matches — proper nouns, SKU codes, "OAuth 2.0". Vector search (Parse::VectorSearch, $vectorSearch) nails paraphrase — "login token spec". Fusing the two beats either alone on most real workloads.

== Why two aggregations (and not one $facet)

$vectorSearch is explicitly prohibited inside $facet, $lookup, $unionWith, or any compound stage on every Atlas version, and it must be the FIRST stage of its pipeline. So on pre-Atlas-8.0 clusters the only correct shape is two independent aggregations followed by client-side reciprocal-rank fusion (RRF). On Atlas 8.0+ the native $rankFusion stage performs the same fusion server-side in a single round-trip; Hybrid.rank_fusion_supported? detects it (probe-and-cache, not version-string parsing).
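
As a rough illustration of the two-aggregation shape, the sketch below builds the two independent pipelines as plain Ruby hashes. The helper names (vector_pipeline, lexical_pipeline), the index names, the "embedding" field, and the numCandidates multiplier are assumptions made for this example, not part of the SDK; the pipelines this module actually builds may differ.

```ruby
# Hypothetical helpers: they only build pipeline hashes and never
# contact a cluster.

def vector_pipeline(query_vector, limit:)
  [
    # $vectorSearch has to be the first stage of its own pipeline.
    { "$vectorSearch" => {
        "index" => "article_vectors",   # assumed index name
        "path" => "embedding",          # assumed vector field
        "queryVector" => query_vector,
        "numCandidates" => limit * 10,  # assumed search width
        "limit" => limit,
      } },
    { "$addFields" => { "_vscore" => { "$meta" => "vectorSearchScore" } } },
  ]
end

def lexical_pipeline(query, limit:)
  [
    { "$search" => {
        "index" => "default",           # assumed index name
        "text" => { "query" => query, "path" => { "wildcard" => "*" } },
      } },
    { "$addFields" => { "_score" => { "$meta" => "searchScore" } } },
    { "$limit" => limit },
  ]
end

vec = vector_pipeline([0.1, 0.2, 0.3], limit: 100)
lex = lexical_pipeline("OAuth 2.0", limit: 100)
puts vec.first.keys.first
puts lex.first.keys.first
```

Each pipeline would be sent as its own aggregation, and the two ranked result lists are then fused on the client.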

== ACL / CLP enforcement

The client-side path delegates each branch to an entry point that already enforces the full SDK-side chain: AtlasSearch.search (lexical) and VectorSearch.search (vector). Both apply the CLP find boundary, the post-stage _rperm $match, pointerFields filtering, protectedFields redaction, and the internal-fields denylist BEFORE returning rows. Fusion therefore operates only on rows the caller is already allowed to read; there is no separate hydration fetch to re-secure. The native $rankFusion path reproduces the same enforcement inline (CLP find, post-stage ACL $match, post-fetch redaction), mirroring VectorSearch.search.

== Scores

The vector branch projects _vscore (Atlas vectorSearchScore), the lexical branch _score (Atlas searchScore). The fused row carries _hybrid_score (the summed RRF weight) and _hybrid_ranks ({ lexical: <rank>, vector: <rank> }, 1-based, absent for a branch the row did not appear in). The raw branch scores are preserved on the row for callers that want them.

Defined Under Namespace

Classes: FusionError

Constant Summary

DEFAULT_K_CONSTANT =

Standard RRF rank constant. Larger values flatten the contribution curve (later ranks matter more); 60 is the value from the original Cormack et al. RRF paper and the Atlas $rankFusion default.

60
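
To make the flattening effect concrete, the following sketch compares how much a rank-1 hit outweighs a rank-10 hit under two rank constants. The helper rrf_contribution is hypothetical and exists only for this illustration; it evaluates a single term of the RRF sum, weight / (k_constant + rank).

```ruby
# Contribution of one branch hit at a given 1-based rank
# (hypothetical helper, not part of the module).
def rrf_contribution(rank, k_constant: 60, weight: 1.0)
  weight / (k_constant + rank)
end

[1, 60].each do |kc|
  top   = rrf_contribution(1, k_constant: kc)
  tenth = rrf_contribution(10, k_constant: kc)
  puts format("k_constant=%-2d rank1=%.4f rank10=%.4f ratio=%.2f",
              kc, top, tenth, top / tenth)
end
```

With k_constant = 1 the top hit counts 11/2 = 5.5 times as much as the tenth; with k_constant = 60 the ratio drops to 70/61, roughly 1.15, so lower-ranked hits keep most of their influence.
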
DEFAULT_K =

Default number of fused hits returned.

20
DEFAULT_OVERSAMPLE_MULTIPLIER =

Per-branch oversample multiplier. Each branch fetches k * this candidates so a row ranked low in one branch but high in the other still has a rank to fuse. Atlas's own $rankFusion uses a comparable internal oversample.

5
MAX_K =

Hard ceiling on the fused result count, matching Parse::VectorSearch::MAX_K.

Parse::VectorSearch::MAX_K
PROBE_CACHE_TTL =

TTL (seconds) for the rank_fusion_supported? probe cache. A cluster gaining or losing $rankFusion support is a rare, operator-driven event (an Atlas major-version upgrade), so a 1-hour cache keeps the extra probe round-trip off the hot path.

3600

Class Method Details

.clear_probe_cache(collection = nil) ⇒ Object

Clear the rank_fusion_supported? probe cache (all collections, or one). Mainly for tests that toggle cluster behaviour between cases.

Parameters:

  • collection (String, nil) (defaults to: nil)


# File 'lib/parse/vector_search/hybrid.rb', line 160

def clear_probe_cache(collection = nil)
  probe_mutex.synchronize do
    if collection
      probe_cache.delete(collection.to_s)
    else
      @probe_cache = {}
    end
  end
end

.rank_fusion_supported?(collection) ⇒ Boolean

Detect whether the cluster backing collection supports the native $rankFusion aggregation stage (Atlas 8.0+).

Probe-and-cache, NOT version-string parsing: Atlas upgrades cluster versions silently and the exact version where $rankFusion reached general availability has moved. We send a zero-cost behavioural probe ([{$rankFusion: {input: {}}}, {$limit: 0}]) and classify the response: success or any error OTHER than "unknown stage" means supported; an "Unknown aggregation stage" failure means unsupported. The result is cached per collection for PROBE_CACHE_TTL.

Parameters:

  • collection (String)

    Parse class / Mongo collection name.

Returns:

  • (Boolean)


# File 'lib/parse/vector_search/hybrid.rb', line 144

def rank_fusion_supported?(collection)
  key = collection.to_s
  now = monotonic
  cached = probe_cache_get(key, now)
  return cached unless cached.nil?

  supported = run_probe(key)
  probe_cache_put(key, supported, now)
  supported
end
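
The probe-and-cache pattern can be sketched in isolation as below. ProbeCache and its fetch method are hypothetical names for this example; the module's own helpers (probe_cache_get, probe_cache_put, run_probe, monotonic) are not shown in this page, so this is only an approximation of the idea, with the probe itself supplied as a block.

```ruby
# Minimal TTL cache keyed by collection name (hypothetical class).
# It uses a monotonic clock so wall-clock adjustments cannot extend
# or shorten an entry's lifetime.
class ProbeCache
  def initialize(ttl:)
    @ttl = ttl
    @entries = {}
    @mutex = Mutex.new
  end

  # Returns the cached result, or runs the block (the probe) and caches it.
  # The lookup tests for the entry rather than its value, so a cached
  # `false` ("unsupported") is still a hit.
  def fetch(key, now: monotonic)
    @mutex.synchronize do
      entry = @entries[key]
      return entry[:value] if entry && (now - entry[:at]) < @ttl
    end
    value = yield
    @mutex.synchronize { @entries[key] = { value: value, at: now } }
    value
  end

  private

  def monotonic
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

cache = ProbeCache.new(ttl: 3600)
probes = 0
2.times { cache.fetch("Article") { probes += 1; false } }
puts probes  # prints 1: the second call is served from the cache
```

The probe runs outside the lock in this sketch, so two threads may occasionally probe the same collection at once; that trades a duplicate round-trip for not holding the mutex during network I/O.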

.rrf(branches, k_constant: DEFAULT_K_CONSTANT, weights: nil) ⇒ Array<Hash>

Pure reciprocal-rank fusion. Operates on already-fetched, already-ranked branch result lists — no I/O, no ACL concerns (the rows were enforced upstream).

fused_score(d) = Σ_b weight_b / (k_constant + rank_b(d))

Parameters:

  • branches (Hash{Symbol=>Array<Hash>})

    each value is a branch's result rows in descending relevance order (best first). Keys name the branch (:lexical, :vector).

  • k_constant (Integer) (defaults to: DEFAULT_K_CONSTANT)

    RRF rank constant (> 0).

  • weights (Hash{Symbol=>Numeric}, nil) (defaults to: nil)

    per-branch weight. Missing branches default to weight 1.0; nil weights the whole set at 1.0.

Returns:

  • (Array<Hash>)

    fused rows, descending by _hybrid_score, each carrying _hybrid_score and _hybrid_ranks. Ties are broken deterministically by objectId (stable for snapshots).

Raises:

  • (FusionError)

    if branches is not a non-empty Hash, or k_constant is not a positive integer.

# File 'lib/parse/vector_search/hybrid.rb', line 96

def rrf(branches, k_constant: DEFAULT_K_CONSTANT, weights: nil)
  unless branches.is_a?(Hash) && !branches.empty?
    raise FusionError, "rrf: branches must be a non-empty Hash of ranked result lists."
  end
  kc = Integer(k_constant)
  raise FusionError, "rrf: k_constant must be a positive integer (got #{kc})." if kc <= 0
  validate_weights!(weights)

  acc = {}
  order = 0
  branches.each do |branch_name, rows|
    weight = weight_for(weights, branch_name)
    next if weight.zero?
    Array(rows).each_with_index do |row, i|
      id = row_id(row)
      next if id.nil?
      rank = i + 1
      entry = (acc[id] ||= { doc: row, score: 0.0, ranks: {}, seq: (order += 1) })
      entry[:doc] = merge_rows(entry[:doc], row)
      entry[:score] += weight.to_f / (kc + rank)
      entry[:ranks][branch_name] = rank
    end
  end

  acc.values
     .sort_by { |e| [-e[:score], row_id(e[:doc]).to_s, e[:seq]] }
     .map do |e|
       row = e[:doc].dup
       row["_hybrid_score"] = e[:score]
       row["_hybrid_ranks"] = e[:ranks]
       row
     end
end
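
For a worked example of the formula, here is a simplified, standalone stand-in for the fusion step. simple_rrf is a hypothetical helper written for this illustration: it assumes every row is a Hash with an "objectId" key, uses a weight of 1.0 for every branch, and does not merge row fields the way the method above does.

```ruby
# Simplified reciprocal-rank fusion (illustrative only).
def simple_rrf(branches, k_constant: 60)
  scores = Hash.new(0.0)
  ranks = Hash.new { |h, id| h[id] = {} }

  branches.each do |name, rows|
    rows.each_with_index do |row, i|
      id = row["objectId"]
      rank = i + 1                       # ranks are 1-based
      scores[id] += 1.0 / (k_constant + rank)
      ranks[id][name] = rank
    end
  end

  # Highest fused score first; ties fall back to objectId order.
  scores.sort_by { |id, score| [-score, id] }.map do |id, score|
    { "objectId" => id, "_hybrid_score" => score, "_hybrid_ranks" => ranks[id] }
  end
end

fused = simple_rrf({
  lexical: [{ "objectId" => "a" }, { "objectId" => "b" }],
  vector:  [{ "objectId" => "b" }, { "objectId" => "c" }],
})
fused.each { |row| puts row.inspect }
```

Row "b" appears in both branches and scores 1/62 + 1/61, so it comes first; "a" (1/61) follows, then "c" (1/62). Rows "a" and "c" each carry a rank for only the branch they appeared in.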

.search(collection_name, lexical:, vector:, k: DEFAULT_K, fusion: nil, **scope_opts) ⇒ Array<Hash>

Run a hybrid search and return the fused raw rows.

Parameters:

  • collection_name (String)

    Parse class / collection.

  • lexical (Hash)

    lexical branch config:

    • :query [String] the $search query text.
    • :index [String, nil] Atlas Search (lexical) index name.
    • :fields [Array, String, nil] fields to search; defaults to a wildcard path.
    • :filter [Hash, nil] post-$search $match.
    • :fuzzy [Hash, nil] forwarded to the text operator.
  • vector (Hash)

    vector branch config:

    • :query_vector [Array] the query embedding.
    • :field [String, Symbol] vector field path.
    • :index [String, nil] vectorSearch index name.
    • :num_candidates [Integer, nil] Atlas HNSW search width.
    • :filter [Hash, nil] post-$vectorSearch $match.
    • :vector_filter [Hash, nil] Atlas-native pre-search filter.
  • k (Integer) (defaults to: DEFAULT_K)

    number of fused hits to return (≤ MAX_K).

  • fusion (Hash, nil) (defaults to: nil)

    fusion config:

    • :method [Symbol] :rrf (default) and :rrf_client both fuse CLIENT-SIDE (deterministic across Atlas versions). :rrf_native opts into the single-roundtrip server-side $rankFusion stage (Atlas 8.0+ only) and falls back to the client path when unsupported or on any execution error.
    • :k_constant [Integer] RRF rank constant.
    • :weights [Hash] { lexical:, vector: } branch weights.
  • scope_opts (Hash)

    ACL/CLP scope kwargs forwarded to BOTH branch entry points: session_token: / master: / acl_user: / acl_role:.

Returns:

  • (Array<Hash>)

    the fused rows, best first, at most k of them.

Raises:

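  • (ArgumentError)

    if k is outside 1..MAX_K, lexical[:query] is not a non-empty String, vector[:query_vector] or vector[:field] is missing, or fusion[:method] is not one of :rrf, :rrf_client, :rrf_native.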


# File 'lib/parse/vector_search/hybrid.rb', line 200

def search(collection_name, lexical:, vector:, k: DEFAULT_K, fusion: nil, **scope_opts)
  require_available!
  fusion = symbolize(fusion || {})
  lex = symbolize(lexical || {})
  vec = symbolize(vector || {})

  k_int = Integer(k)
  raise ArgumentError, "k must be in 1..#{MAX_K} (got #{k_int})." if k_int <= 0 || k_int > MAX_K

  unless lex[:query].is_a?(String) && !lex[:query].strip.empty?
    raise ArgumentError, "hybrid search: lexical[:query] must be a non-empty String."
  end
  if vec[:query_vector].nil? || vec[:field].nil?
    raise ArgumentError, "hybrid search: vector[:query_vector] and vector[:field] are required."
  end

  method = (fusion[:method] || :rrf).to_sym
  unless %i[rrf rrf_client rrf_native].include?(method)
    raise ArgumentError,
          "hybrid search: fusion[:method] must be :rrf, :rrf_client, or :rrf_native (got #{method.inspect})."
  end
  k_constant = fusion[:k_constant] || DEFAULT_K_CONSTANT
  weights    = fusion[:weights]
  oversample = [k_int * DEFAULT_OVERSAMPLE_MULTIPLIER, k_int].max

  # NOTE (deviation from plan §8.3): the default fuses CLIENT-SIDE.
  # The native single-roundtrip `$rankFusion` path is OPT-IN
  # (`fusion: { method: :rrf_native }`) rather than the default,
  # because its server-side execution (and its ACL `$match`
  # placement) cannot be validated without an Atlas 8.0+ cluster
  # in CI. `rank_fusion_supported?` detection ships and is unit-
  # tested; the native pipeline shape is snapshot-tested; but live
  # results route through the always-correct, fully-enforced
  # two-aggregate client path unless a caller explicitly opts into
  # native AND the cluster supports it. Native still falls back to
  # the client path on any execution error.
  if method == :rrf_native && rank_fusion_supported?(collection_name)
    fused = run_native(collection_name, lex, vec, oversample,
                       k_constant: k_constant, weights: weights, scope_opts: scope_opts)
    return fused.first(k_int) if fused
  end

  lexical_rows = run_lexical(collection_name, lex, oversample, scope_opts)
  vector_rows  = run_vector(collection_name, vec, oversample, scope_opts)
  rrf({ lexical: lexical_rows, vector: vector_rows },
      k_constant: k_constant, weights: weights).first(k_int)
end
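
A call might look like the sketch below, which only uses the keyword arguments documented above. The wrapper method find_login_docs, the "Article" class, the index names, the field names, and the weights are placeholder assumptions for illustration; running it requires the SDK to be loaded and an Atlas-backed deployment with matching indexes.

```ruby
# Usage sketch (hypothetical wrapper). The collection, indexes and
# fields are placeholders and must exist in your own deployment.
def find_login_docs(query_embedding, session_token:)
  Parse::VectorSearch::Hybrid.search(
    "Article",
    lexical: { query: "OAuth 2.0 login", index: "default", fields: ["title", "body"] },
    vector: { query_vector: query_embedding, field: :embedding, index: "article_vectors" },
    k: 10,
    # Client-side fusion (the default), weighting the vector branch double.
    fusion: { method: :rrf, k_constant: 60, weights: { lexical: 1.0, vector: 2.0 } },
    # Scope kwarg forwarded to both branches for ACL/CLP enforcement.
    session_token: session_token,
  )
end
```

Each returned row would carry _hybrid_score and _hybrid_ranks alongside whatever raw branch scores (_score, _vscore) it picked up.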