Module: Parse::VectorSearch::Hybrid
- Defined in:
- lib/parse/vector_search/hybrid.rb
Overview
Hybrid (lexical + vector) search with reciprocal-rank fusion.
Lexical search (Parse::AtlasSearch, BM25/$search) nails
exact-token matches — proper nouns, SKU codes, "OAuth 2.0". Vector
search (Parse::VectorSearch, $vectorSearch) nails paraphrase —
"login token spec". Fusing the two beats either alone on most real
workloads.
== Why two aggregations (and not one $facet)
$vectorSearch is explicitly prohibited inside $facet,
$lookup, $unionWith, or any compound stage on every Atlas
version, and it must be the FIRST stage of its pipeline. So on
pre-Atlas-8.0 clusters the only correct shape is two independent
aggregations followed by client-side reciprocal-rank fusion (RRF).
On Atlas 8.0+ the native $rankFusion stage performs the same
fusion server-side in a single round-trip; Hybrid.rank_fusion_supported?
detects it (probe-and-cache, not version-string parsing).
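As a rough illustration of the constraint above, the two branches can be written as two separate pipelines, each starting with its own search stage. The index names, field paths, query vector, and candidate counts below are hypothetical placeholders. The stage options follow the Atlas `$search` and `$vectorSearch` syntax, but this is only a sketch and not necessarily the pipeline this module builds internally.

```ruby
# Hypothetical inputs: index names, paths and the vector are placeholders.
query_text   = "OAuth 2.0"
query_vector = [0.12, -0.03, 0.88]
oversample   = 100

# Lexical branch: $search is the first stage of its own pipeline.
lexical_pipeline = [
  { "$search" => { "index" => "default",
                   "text"  => { "query" => query_text, "path" => "body" } } },
  { "$limit" => oversample },
  { "$addFields" => { "_score" => { "$meta" => "searchScore" } } },
]

# Vector branch: $vectorSearch is the first stage of a second, independent pipeline.
vector_pipeline = [
  { "$vectorSearch" => { "index"         => "vector_index",
                         "path"          => "embedding",
                         "queryVector"   => query_vector,
                         "numCandidates" => oversample * 10,
                         "limit"         => oversample } },
  { "$addFields" => { "_vscore" => { "$meta" => "vectorSearchScore" } } },
]

# Each pipeline would be sent as its own aggregation; the two result
# lists are then fused client-side.
pipelines = { lexical: lexical_pipeline, vector: vector_pipeline }
```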
== ACL / CLP enforcement
The client-side path delegates each branch to an entry point that
already enforces the full SDK-side chain — AtlasSearch.search
(lexical) and Parse::VectorSearch.search (vector). Both apply the
CLP find boundary, the post-stage _rperm $match, pointerFields
filtering, protectedFields redaction, and the internal-fields
denylist BEFORE returning rows. Fusion therefore operates only on
rows the caller is already allowed to read; there is no separate
hydration fetch to re-secure. The native $rankFusion path
reproduces the same enforcement inline (CLP find, post-stage ACL
$match, post-fetch redaction), mirroring Parse::VectorSearch.search.
== Scores
The vector branch projects _vscore (Atlas vectorSearchScore),
the lexical branch _score (Atlas searchScore). The fused row
carries _hybrid_score (the summed RRF weight) and _hybrid_ranks
({ lexical: <rank>, vector: <rank> }, 1-based, absent for a branch
the row did not appear in). The raw branch scores are preserved on
the row for callers that want them.
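To make the row shape concrete, here is a hypothetical fused row for a document that matched only the lexical branch. The `objectId` key and all score values are invented for illustration, and the symbol keys in `_hybrid_ranks` assume the client-side path, where branches are named `:lexical` and `:vector`.

```ruby
# Hypothetical fused row: "objectId" and the score values are made up.
row = {
  "objectId"      => "abc123",
  "_score"        => 7.41,            # raw lexical score, preserved on the row
  "_hybrid_score" => 1.0 / (60 + 1),  # one branch, rank 1, weight 1, k_constant 60
  "_hybrid_ranks" => { lexical: 1 },  # no :vector key: absent from that branch
}

ranks   = row["_hybrid_ranks"]
in_both = ranks.key?(:lexical) && ranks.key?(:vector)
only_in = ranks.keys
```

A caller could use a check like `in_both` to tell rows confirmed by both branches apart from rows that surfaced in only one.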
Defined Under Namespace
Classes: FusionError
Constant Summary
- DEFAULT_K_CONSTANT = 60
  Standard RRF rank constant. Larger values flatten the contribution curve (later ranks matter more); 60 is the value from the original Cormack et al. RRF paper and the Atlas $rankFusion default.
- DEFAULT_K = 20
  Default number of fused hits returned.
- DEFAULT_OVERSAMPLE_MULTIPLIER = 5
  Per-branch oversample multiplier. Each branch fetches k * DEFAULT_OVERSAMPLE_MULTIPLIER candidates so a row ranked low in one branch but high in the other still has a rank to fuse. Atlas's own $rankFusion uses a comparable internal oversample.
- MAX_K = Parse::VectorSearch::MAX_K
  Hard ceiling on the fused result count, matching Parse::VectorSearch::MAX_K.
- PROBE_CACHE_TTL = 3600
  TTL (seconds) for the rank_fusion_supported? probe cache. A cluster gaining or losing $rankFusion support is a rare, operator-driven event (an Atlas major-version upgrade), so a 1-hour cache keeps the extra probe round-trip off the hot path.
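The effect of these defaults can be checked with a little arithmetic. The sketch below uses local variables that mirror the documented values (60, 20, 5) rather than the module's constants. It computes the per-branch fetch size, then compares how much more a rank-1 hit contributes than a rank-10 hit under two rank constants.

```ruby
k_constant = 60   # mirrors DEFAULT_K_CONSTANT
k          = 20   # mirrors DEFAULT_K
multiplier = 5    # mirrors DEFAULT_OVERSAMPLE_MULTIPLIER

# Each branch fetches k * multiplier candidates.
per_branch = k * multiplier                                  # 100

# Unweighted RRF contribution of a single branch at a given rank.
contribution = ->(rank, kc) { 1.0 / (kc + rank) }

# Rank 1 versus rank 10 with the default constant: 70/61, about 1.15x.
ratio_default = contribution.(1, k_constant) / contribution.(10, k_constant)

# The same comparison with a tiny constant of 1: 11/2 = 5.5x.
ratio_small = contribution.(1, 1) / contribution.(10, 1)
```

With a constant of 60 the top hit is worth only about 15% more than the tenth, which is the "flattened" curve the constant's description refers to. A small constant makes the top ranks dominate.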
Class Method Summary
- .clear_probe_cache(collection = nil) ⇒ Object
  Clear the Hybrid.rank_fusion_supported? probe cache (all collections, or one).
- .rank_fusion_supported?(collection) ⇒ Boolean
  Detect whether the cluster backing collection supports the native $rankFusion aggregation stage (Atlas 8.0+).
- .rrf(branches, k_constant: DEFAULT_K_CONSTANT, weights: nil) ⇒ Array<Hash>
  Pure reciprocal-rank fusion.
- .search(collection_name, lexical:, vector:, k: DEFAULT_K, fusion: nil, **scope_opts) ⇒ Array<Hash>
  Run a hybrid search and return the fused raw rows.
Class Method Details
.clear_probe_cache(collection = nil) ⇒ Object
Clear the rank_fusion_supported? probe cache (all collections, or one). Mainly for tests that toggle cluster behaviour between cases.
    # File 'lib/parse/vector_search/hybrid.rb', line 160
    def clear_probe_cache(collection = nil)
      probe_mutex.synchronize do
        if collection
          probe_cache.delete(collection.to_s)
        else
          @probe_cache = {}
        end
      end
    end
.rank_fusion_supported?(collection) ⇒ Boolean
Detect whether the cluster backing collection supports the
native $rankFusion aggregation stage (Atlas 8.0+).
Probe-and-cache, NOT version-string parsing: Atlas upgrades
cluster versions silently and the exact version where
$rankFusion reached general availability has moved. We send a
zero-cost behavioural probe ([{$rankFusion: {input: {}}},
{$limit: 0}]) and classify the response: success or any error
OTHER than "unknown stage" means supported; an "Unknown
aggregation stage" failure means unsupported. The result is
cached per collection for PROBE_CACHE_TTL.
    # File 'lib/parse/vector_search/hybrid.rb', line 144
    def rank_fusion_supported?(collection)
      key = collection.to_s
      now = monotonic
      cached = probe_cache_get(key, now)
      return cached unless cached.nil?

      supported = run_probe(key)
      probe_cache_put(key, supported, now)
      supported
    end
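The helpers called above (`probe_cache_get`, `probe_cache_put`, `run_probe`) are not shown on this page. The following is a minimal, self-contained sketch of the same probe-and-cache idea. The `ProbeCache` class and `supported_from_probe` are hypothetical names, and matching on the error message text is an assumption: the real implementation may classify errors differently, for example by error code.

```ruby
# Hypothetical TTL cache keyed by collection name, using a monotonic clock.
class ProbeCache
  def initialize(ttl:, clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @ttl     = ttl
    @clock   = clock
    @mutex   = Mutex.new
    @entries = {}
  end

  # Returns the cached boolean if still fresh, else runs the block and caches it.
  def fetch(key)
    now = @clock.call
    @mutex.synchronize do
      hit = @entries[key]
      return hit[:value] if hit && now - hit[:at] < @ttl
    end
    value = yield
    @mutex.synchronize { @entries[key] = { value: value, at: now } }
    value
  end

  # Clears one key, or everything when no key is given.
  def clear(key = nil)
    @mutex.synchronize { key ? @entries.delete(key) : @entries.clear }
  end
end

# Classify a probe outcome: success, or any error other than an
# "unknown stage" failure, counts as supported.
# Assumption: the failure is recognised by its message text.
def supported_from_probe
  yield
  true
rescue StandardError => e
  !e.message.match?(/unknown aggregation stage/i)
end
```

Storing the value inside an entry hash lets a cached `false` be told apart from a cache miss, which is the same reason the method above checks `cached.nil?` rather than truthiness.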
.rrf(branches, k_constant: DEFAULT_K_CONSTANT, weights: nil) ⇒ Array<Hash>
Pure reciprocal-rank fusion. Operates on already-fetched, already-ranked branch result lists — no I/O, no ACL concerns (the rows were enforced upstream).
fused_score(d) = Σ_b weight_b / (k_constant + rank_b(d))
    # File 'lib/parse/vector_search/hybrid.rb', line 96
    def rrf(branches, k_constant: DEFAULT_K_CONSTANT, weights: nil)
      unless branches.is_a?(Hash) && !branches.empty?
        raise FusionError, "rrf: branches must be a non-empty Hash of ranked result lists."
      end
      kc = Integer(k_constant)
      raise FusionError, "rrf: k_constant must be a positive integer (got #{kc})." if kc <= 0
      validate_weights!(weights)

      acc = {}
      order = 0
      branches.each do |branch_name, rows|
        weight = weight_for(weights, branch_name)
        next if weight.zero?
        Array(rows).each_with_index do |row, i|
          id = row_id(row)
          next if id.nil?
          rank = i + 1
          entry = (acc[id] ||= { doc: row, score: 0.0, ranks: {}, seq: (order += 1) })
          entry[:doc] = merge_rows(entry[:doc], row)
          entry[:score] += weight.to_f / (kc + rank)
          entry[:ranks][branch_name] = rank
        end
      end

      acc.values
         .sort_by { |e| [-e[:score], row_id(e[:doc]).to_s, e[:seq]] }
         .map do |e|
           row = e[:doc].dup
           row["_hybrid_score"] = e[:score]
           row["_hybrid_ranks"] = e[:ranks]
           row
         end
    end
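The fusion formula can be tried in isolation. The sketch below is a simplified stand-in rather than the module's code: `rrf_sketch` is a hypothetical name, it takes plain ids instead of rows, and it omits the row merging and weight validation that `rrf` performs.

```ruby
# Minimal RRF over { branch_name => [ids in rank order] }.
# fused_score(d) = sum over branches of weight / (k_constant + rank)
def rrf_sketch(branches, k_constant: 60, weights: {})
  scores = Hash.new(0.0)
  ranks  = Hash.new { |h, id| h[id] = {} }

  branches.each do |name, ids|
    weight = weights.fetch(name, 1.0).to_f
    ids.each_with_index do |id, i|
      rank = i + 1                                  # ranks are 1-based
      scores[id] += weight / (k_constant + rank)
      ranks[id][name] = rank
    end
  end

  # Highest fused score first; ties broken by id for a stable order.
  scores.sort_by { |id, score| [-score, id.to_s] }
        .map { |id, score| { "id" => id, "_hybrid_score" => score, "_hybrid_ranks" => ranks[id] } }
end

fused = rrf_sketch({ lexical: %w[a b c], vector: %w[b d a] })
order = fused.map { |r| r["id"] }   # => ["b", "a", "d", "c"]
```

Here `b` (ranks 2 and 1) scores 1/62 + 1/61 and edges out `a` (ranks 1 and 3) at 1/61 + 1/63. The single-branch hits `d` and `c` trail behind both.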
.search(collection_name, lexical:, vector:, k: DEFAULT_K, fusion: nil, **scope_opts) ⇒ Array<Hash>
Run a hybrid search and return the fused raw rows.
    # File 'lib/parse/vector_search/hybrid.rb', line 200
    def search(collection_name, lexical:, vector:, k: DEFAULT_K, fusion: nil, **scope_opts)
      require_available!
      fusion = symbolize(fusion || {})
      lex = symbolize(lexical || {})
      vec = symbolize(vector || {})
      k_int = Integer(k)
      raise ArgumentError, "k must be in 1..#{MAX_K} (got #{k_int})." if k_int <= 0 || k_int > MAX_K
      unless lex[:query].is_a?(String) && !lex[:query].strip.empty?
        raise ArgumentError, "hybrid search: lexical[:query] must be a non-empty String."
      end
      if vec[:query_vector].nil? || vec[:field].nil?
        raise ArgumentError, "hybrid search: vector[:query_vector] and vector[:field] are required."
      end

      method = (fusion[:method] || :rrf).to_sym
      unless %i[rrf rrf_client rrf_native].include?(method)
        raise ArgumentError, "hybrid search: fusion[:method] must be :rrf, :rrf_client, or :rrf_native (got #{method.inspect})."
      end
      k_constant = fusion[:k_constant] || DEFAULT_K_CONSTANT
      weights = fusion[:weights]
      oversample = [k_int * DEFAULT_OVERSAMPLE_MULTIPLIER, k_int].max

      # NOTE (deviation from plan §8.3): the default fuses CLIENT-SIDE.
      # The native single-roundtrip `$rankFusion` path is OPT-IN
      # (`fusion: { method: :rrf_native }`) rather than the default,
      # because its server-side execution (and its ACL `$match`
      # placement) cannot be validated without an Atlas 8.0+ cluster
      # in CI. `rank_fusion_supported?` detection ships and is
      # unit-tested; the native pipeline shape is snapshot-tested; but
      # live results route through the always-correct, fully-enforced
      # two-aggregate client path unless a caller explicitly opts into
      # native AND the cluster supports it. Native still falls back to
      # the client path on any execution error.
      if method == :rrf_native && rank_fusion_supported?(collection_name)
        fused = run_native(collection_name, lex, vec, oversample,
                           k_constant: k_constant, weights: weights, scope_opts: scope_opts)
        return fused.first(k_int) if fused
      end

      lexical_rows = run_lexical(collection_name, lex, oversample, scope_opts)
      vector_rows = run_vector(collection_name, vec, oversample, scope_opts)
      rrf({ lexical: lexical_rows, vector: vector_rows },
          k_constant: k_constant, weights: weights).first(k_int)
    end
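The control flow at the end of `search` (opt-in native path, otherwise the client path, with fallback when native yields nothing) can be sketched without a cluster. Everything below is a stand-in: `fuse_with_fallback` and the lambdas are hypothetical, and rescuing the error inside the helper is an assumption about how `run_native` signals failure, since its body is not shown here.

```ruby
# Stand-in for the opt-in native path with client-side fallback.
# `native` and `client` are hypothetical callables returning ranked rows.
def fuse_with_fallback(fusion_method:, native_supported:, native:, client:, k:)
  if fusion_method == :rrf_native && native_supported
    fused = begin
      native.call
    rescue StandardError
      nil # assumption: an execution error is treated as "no native result"
    end
    return fused.first(k) if fused
  end
  client.call.first(k)
end

client         = -> { %w[a b c d] }
working_native = -> { %w[x y z] }
broken_native  = -> { raise "native pipeline failed" }

# Default method: native is never attempted, even if supported.
default_result = fuse_with_fallback(fusion_method: :rrf, native_supported: true,
                                    native: working_native, client: client, k: 2)
# Explicit opt-in on a supporting cluster: native result is used.
native_result = fuse_with_fallback(fusion_method: :rrf_native, native_supported: true,
                                   native: working_native, client: client, k: 2)
# Opt-in but native fails: falls back to the client path.
fallback_result = fuse_with_fallback(fusion_method: :rrf_native, native_supported: true,
                                     native: broken_native, client: client, k: 2)

# A real call would take a form like this (collection and field names are placeholders):
# Parse::VectorSearch::Hybrid.search("Article",
#   lexical: { query: "OAuth 2.0" },
#   vector:  { query_vector: embedding, field: "embedding" },
#   k: 10, fusion: { method: :rrf_native })
```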