Class: Parse::Embeddings::BatchEmbedder

Inherits:
Object
Defined in:
lib/parse/embeddings/batch_embedder.rb

Overview

Batch-level orchestration for bulk embedding jobs.

Provider#embed_text_batched only slices input into provider-sized chunks; any retry/backoff lives inside each provider's single HTTP call. That is the wrong layer for bulk work: a 50k-document backfill needs batch-level pacing (stay under the provider's requests-per-minute budget across calls) and batch-level backoff (a 429 after the provider's internal retries are exhausted should pause the whole job, not kill it). BatchEmbedder wraps any registered provider with both.

Retry classification

By default a batch is retried when the provider raises an Error subclass whose class name ends in RateLimitError or TransientError. Every bundled provider follows this convention (OpenAI::RateLimitError, Voyage::TransientError, …). To override it, pass retry_on: with an explicit list of exception classes. Non-retryable errors (authentication failures, bad requests, response-contract violations) propagate immediately.
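The classification rule above can be sketched in plain Ruby. This is a minimal illustration, not the gem's code: the Demo module, its error classes and the retryable? helper are hypothetical, and Demo::Error stands in for the library's own Error base class.

```ruby
# Hypothetical error hierarchy following the bundled providers' naming convention.
module Demo
  class Error < StandardError; end

  module OpenAI
    class RateLimitError < Demo::Error; end
    class AuthError < Demo::Error; end
  end

  module Voyage
    class TransientError < Demo::Error; end
  end
end

RETRYABLE_NAME_SUFFIXES = %w[RateLimitError TransientError].freeze

# Hypothetical helper: true when +error+ should trigger a batch-level retry.
# An explicit retry_on list wins; otherwise the error must be a Demo::Error
# whose fully qualified class name ends in one of the suffixes.
def retryable?(error, retry_on: nil)
  return retry_on.any? { |klass| error.is_a?(klass) } if retry_on
  return false unless error.is_a?(Demo::Error)

  name = error.class.name.to_s
  RETRYABLE_NAME_SUFFIXES.any? { |suffix| name.end_with?(suffix) }
end

retryable?(Demo::OpenAI::RateLimitError.new) # => true
retryable?(Demo::OpenAI::AuthError.new)      # => false
```

Matching on the class-name suffix lets each provider keep its own namespaced error classes without registering them anywhere; the retry_on: escape hatch covers providers that do not follow the convention.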

Vectors are returned aligned 1:1 with the input, identical to embed_text on the wrapped provider.

Examples:

Backfill with pacing and backoff

embedder = Parse::Embeddings::BatchEmbedder.new(
  Parse::Embeddings.provider(:openai),
  requests_per_minute: 60,
  max_attempts: 5,
)
vectors = embedder.embed_text(texts, input_type: :search_document)

Progress reporting

embedder = Parse::Embeddings::BatchEmbedder.new(provider,
  on_progress: ->(done:, total:, batch_index:, batch_count:) {
    puts "#{done}/#{total}"
  })

Defined Under Namespace

Classes: BatchFailed

Constant Summary

RETRYABLE_NAME_SUFFIXES =
%w[RateLimitError TransientError].freeze

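Instance Attribute Summary

  • #provider ⇒ Provider (readonly)

    Returns the wrapped provider.

Instance Method Summary

  • #embed_text(strings, input_type: :search_document) ⇒ Array<Array<Float>>

    Embed strings through the wrapped provider with pacing and batch-level backoff.

  • #initialize(provider, batch_size: nil, requests_per_minute: nil, max_attempts: 5, base_delay: 2.0, max_delay: 60.0, jitter: 0.25, retry_on: nil, on_progress: nil) ⇒ BatchEmbedder (constructor)

    Returns a new instance of BatchEmbedder.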

Constructor Details

#initialize(provider, batch_size: nil, requests_per_minute: nil, max_attempts: 5, base_delay: 2.0, max_delay: 60.0, jitter: 0.25, retry_on: nil, on_progress: nil) ⇒ BatchEmbedder

Returns a new instance of BatchEmbedder.

Parameters:

  • provider (Provider)

    any registered embedding provider.

  • batch_size (Integer, nil) (defaults to: nil)

    inputs per provider call. Defaults to the provider's own Provider#embed_batch_size hint, falling back to 64 when the provider has none.

  • requests_per_minute (Numeric, nil) (defaults to: nil)

    batch-level pacing budget. When set, consecutive provider calls are spaced at least 60.0 / requests_per_minute seconds apart. nil disables pacing.

  • max_attempts (Integer) (defaults to: 5)

    attempts per batch (1 = no retry).

  • base_delay (Numeric) (defaults to: 2.0)

    delay in seconds before the first retry; doubles on each subsequent retry, up to max_delay.

  • max_delay (Numeric) (defaults to: 60.0)

    backoff ceiling in seconds.

  • jitter (Numeric) (defaults to: 0.25)

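    fractional jitter applied to each delay: the actual wait is delay * (1 + rand * jitter), with rand uniform in [0, 1), so the default adds up to 25%. This spreads out retries when several workers back off at the same time (the thundering-herd problem).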

  • retry_on (Array<Class>, nil) (defaults to: nil)

    explicit retryable exception classes; nil uses the name-suffix convention described above.

  • on_progress (#call, nil) (defaults to: nil)

    callable invoked after each successful batch with done:, total:, batch_index:, batch_count:.
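To see how base_delay, max_delay and jitter combine, here is a small sketch of the backoff schedule. It assumes the exponential term is capped at max_delay before jitter is applied; the order in the real class may differ, and backoff_delay is a hypothetical helper, not part of the gem.

```ruby
# Hypothetical helper: delay (in seconds) before retry number +retry_number+ (1-based).
# Assumes the exponential term is capped at max_delay *before* jitter is applied.
def backoff_delay(retry_number, base_delay: 2.0, max_delay: 60.0, jitter: 0.25, rng: Random.new)
  exponential = base_delay * (2**(retry_number - 1)) # 2.0, 4.0, 8.0, ...
  capped = [exponential, max_delay].min
  capped * (1 + rng.rand * jitter)                    # rng.rand is in [0, 1)
end

(1..4).map { |n| backoff_delay(n, jitter: 0.0) } # => [2.0, 4.0, 8.0, 16.0]
backoff_delay(6, jitter: 0.0)                    # => 60.0 (64.0 capped)
```

With the defaults and max_attempts: 5, a batch waits at most four times (about 30 seconds before jitter) before giving up. Under the cap-then-jitter assumption a single wait can exceed max_delay by up to the jitter fraction; an implementation that caps after jittering would not.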

Raises:

  • (ArgumentError)

    if provider is not a Provider, batch_size is not positive, or max_attempts is less than 1.


# File 'lib/parse/embeddings/batch_embedder.rb', line 85

def initialize(provider, batch_size: nil, requests_per_minute: nil,
               max_attempts: 5, base_delay: 2.0, max_delay: 60.0,
               jitter: 0.25, retry_on: nil, on_progress: nil)
  unless provider.is_a?(Provider)
    raise ArgumentError,
          "Parse::Embeddings::BatchEmbedder expects a Parse::Embeddings::Provider " \
          "(got #{provider.class})."
  end
  @provider = provider
  @batch_size = batch_size ? Integer(batch_size) : nil
  raise ArgumentError, "batch_size must be positive" if @batch_size && @batch_size <= 0
  @min_interval = requests_per_minute ? (60.0 / Float(requests_per_minute)) : nil
  @max_attempts = Integer(max_attempts)
  raise ArgumentError, "max_attempts must be >= 1" if @max_attempts < 1
  @base_delay = Float(base_delay)
  @max_delay = Float(max_delay)
  @jitter = Float(jitter)
  @retry_on = retry_on && Array(retry_on)
  @on_progress = on_progress
  @last_call_at = nil
end
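The constructor turns requests_per_minute into @min_interval and starts with @last_call_at = nil, which points to a pacing step before each provider call. A minimal sketch of such a step follows. The Pacer class and its injectable clock and sleeper are hypothetical, and it assumes spacing is measured from the start of one call to the start of the next.

```ruby
# Hypothetical pacer, not part of the gem: enforces a minimum gap between calls.
class Pacer
  def initialize(requests_per_minute,
                 clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) },
                 sleeper: ->(seconds) { sleep(seconds) })
    # Same conversion as the constructor above; nil disables pacing.
    @min_interval = requests_per_minute ? 60.0 / Float(requests_per_minute) : nil
    @clock = clock
    @sleeper = sleeper
    @last_call_at = nil
  end

  # Blocks until at least @min_interval seconds have passed since the previous call started.
  def wait!
    return unless @min_interval

    if @last_call_at
      remaining = @min_interval - (@clock.call - @last_call_at)
      @sleeper.call(remaining) if remaining > 0
    end
    @last_call_at = @clock.call
  end
end

# Usage with a fake clock so the effect is visible without real sleeping.
now = 0.0
pacer = Pacer.new(120, clock: -> { now }, sleeper: ->(s) { puts "sleep #{s.round(2)}s"; now += s })
pacer.wait! # first call: no wait
now += 0.2  # pretend the provider call took 0.2 s
pacer.wait! # 120 rpm => 0.5 s interval, so it sleeps the remaining 0.3 s
```

Using a monotonic clock rather than Time.now keeps the spacing correct even if the wall clock is adjusted mid-job.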

Instance Attribute Details

#provider ⇒ Provider (readonly)

Returns the wrapped provider.

Returns:

  • (Provider)

    the wrapped provider.

# File 'lib/parse/embeddings/batch_embedder.rb', line 64

def provider
  @provider
end

Instance Method Details

#embed_text(strings, input_type: :search_document) ⇒ Array<Array<Float>>

Embed strings through the wrapped provider with pacing and batch-level backoff.

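Parameters:

  • strings (Array<String>)

    the texts to embed.

  • input_type (Symbol) (defaults to: :search_document)

    input-type hint passed along with each batch to the wrapped provider.
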
Returns:

  • (Array<Array<Float>>)

    aligned 1:1 with strings.

Raises:

  • (ArgumentError)

    if strings is not an Array.

  • (BatchFailed)

    when a batch exhausts its attempts.
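The per-batch retry loop that raises BatchFailed is not shown on this page (embed_text delegates to a run_batch method documented elsewhere). The sketch below assumes a simple begin/rescue/retry loop. BatchFailed, RateLimitError and with_batch_retry here are hypothetical stand-ins, not the gem's classes.

```ruby
# Hypothetical stand-ins for illustration only.
class RateLimitError < StandardError; end

class BatchFailed < StandardError
  attr_reader :batch_index, :attempts

  def initialize(batch_index, attempts, last_error)
    @batch_index = batch_index
    @attempts = attempts
    super("batch #{batch_index} failed after #{attempts} attempt(s): #{last_error.message}")
  end
end

# Runs the block, retrying on RateLimitError until max_attempts is reached.
# Any other exception propagates immediately. Raising inside `rescue` makes
# Ruby set BatchFailed#cause to the last RateLimitError automatically.
def with_batch_retry(batch_index, max_attempts: 5, delay_for: ->(_n) { 0.0 },
                     sleeper: ->(s) { sleep(s) })
  attempt = 0
  begin
    attempt += 1
    yield
  rescue RateLimitError => e
    raise BatchFailed.new(batch_index, attempt, e) if attempt >= max_attempts

    sleeper.call(delay_for.call(attempt))
    retry
  end
end
```

The delay_for lambda is where an exponential schedule with jitter would plug in; keeping it injectable also lets tests run without real sleeps.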



# File 'lib/parse/embeddings/batch_embedder.rb', line 114

def embed_text(strings, input_type: :search_document)
  unless strings.is_a?(Array)
    raise ArgumentError,
          "Parse::Embeddings::BatchEmbedder#embed_text expects Array<String> " \
          "(got #{strings.class})."
  end
  return [] if strings.empty?

  size = @batch_size || @provider.embed_batch_size || 64
  batches = strings.each_slice(size).to_a
  out = []
  batches.each_with_index do |batch, idx|
    out.concat(run_batch(batch, input_type, idx, out.length))
    if @on_progress
      @on_progress.call(done: out.length, total: strings.length,
                        batch_index: idx, batch_count: batches.length)
    end
  end
  out
end
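The slicing and progress bookkeeping in BatchEmbedder#embed_text can be exercised in isolation. This sketch keeps the same each_slice / concat / on_progress structure but calls a hypothetical FakeProvider directly in place of run_batch, so there is no pacing or retry. It shows that the output stays aligned with the input and what the progress callback receives.

```ruby
# Hypothetical fake provider: each "vector" is just [string length], so
# alignment with the input is easy to check. It records every batch size.
class FakeProvider
  attr_reader :batch_sizes

  def initialize
    @batch_sizes = []
  end

  def embed_text(batch, input_type: :search_document)
    @batch_sizes << batch.size
    batch.map { |s| [s.length.to_f] }
  end
end

# Same slicing/progress structure as BatchEmbedder#embed_text, minus pacing and retry.
def embed_in_batches(provider, strings, batch_size:, input_type: :search_document, on_progress: nil)
  batches = strings.each_slice(batch_size).to_a
  out = []
  batches.each_with_index do |batch, idx|
    out.concat(provider.embed_text(batch, input_type: input_type))
    on_progress&.call(done: out.length, total: strings.length,
                      batch_index: idx, batch_count: batches.length)
  end
  out
end

provider = FakeProvider.new
vectors = embed_in_batches(provider, %w[a bb ccc dddd e], batch_size: 2,
                           on_progress: ->(done:, total:, batch_index:, batch_count:) {
                             puts "batch #{batch_index + 1}/#{batch_count}: #{done}/#{total}"
                           })
vectors              # => [[1.0], [2.0], [3.0], [4.0], [1.0]]
provider.batch_sizes # => [2, 2, 1]
```

Because done is computed from out.length after each concat, the last callback always reports done == total, and the final short batch (here of size 1) is handled without special casing.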