Module: Chewy::Search::Scrolling

Included in:
Request
Defined in:
lib/chewy/search/scrolling.rb

Overview

This module contains batch requests DSL via ES scroll API. All the methods are optimized on memory consumption, they are not caching anythig, so use them when you need to do some single-run stuff on a huge amount of documents. Don’t forget to tune the ‘scroll` parameter for long-lasting actions. All the scroll methods respect the limit value if provided.

Instance Method Summary collapse

Instance Method Details

#scroll_batches(batch_size: 1000, scroll: '1m') {|batch| ... } ⇒ Object #scroll_batches(batch_size: 1000, scroll: '1m') ⇒ Enumerator

Iterates through the documents of the scope in batches. Limit if overrided by the ‘batch_size`. There are 2 possible use-cases: with a block or without.

Overloads:

  • #scroll_batches(batch_size: 1000, scroll: '1m') {|batch| ... } ⇒ Object

    Examples:

    PlaceIndex.scroll_batches { |batch| batch.each { |hit| p hit['_id'] } }

    Yield Parameters:

    • batch (Array<Hash>)

      block is executed for each batch of hits

  • #scroll_batches(batch_size: 1000, scroll: '1m') ⇒ Enumerator

    Returns a standard ruby Enumerator.

    Examples:

    PlaceIndex.scroll_batches.flat_map { |batch| batch.map { |hit| hit['_id'] } }

    Returns:

    • (Enumerator)

      a standard ruby Enumerator

Parameters:

  • batch_size (Integer) (defaults to: Request::DEFAULT_BATCH_SIZE)

    batch size obviously, replaces ‘size` query parameter

  • scroll (String) (defaults to: Request::DEFAULT_SCROLL)

    cursor expiration time



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/chewy/search/scrolling.rb', line 27

def scroll_batches(batch_size: Request::DEFAULT_BATCH_SIZE, scroll: Request::DEFAULT_SCROLL)
  return enum_for(:scroll_batches, batch_size: batch_size, scroll: scroll) unless block_given?

  result = perform(size: batch_size, scroll: scroll)
  total = [raw_limit_value, result.fetch('hits', {}).fetch('total', {}).fetch('value', 0)].compact.min

  total_batches = total / batch_size
  last_batch_size = total % batch_size

  total_batches += 1 if last_batch_size != 0

  scroll_id = nil

  total_batches.times do |batch_counter|
    last_run = total_batches - 1 == batch_counter

    hits = result.fetch('hits', {}).fetch('hits', [])
    hits = hits.first(last_batch_size) if last_run && last_batch_size != 0

    raise Chewy::MissingHitsInScrollError if hits.empty?

    yield(hits) if hits.present?
    scroll_id = result['_scroll_id']

    break if result['terminated_early']

    result = perform_scroll(scroll: scroll, scroll_id: scroll_id) unless last_run
  end
ensure
  Chewy.client.clear_scroll(body: {scroll_id: scroll_id}) if scroll_id
end

#scroll_hits(batch_size: 1000, scroll: '1m') {|hit| ... } ⇒ Object #scroll_hits(batch_size: 1000, scroll: '1m') ⇒ Enumerator

Iterates through the documents of the scope in batches. Yields each hit separately.

Overloads:

  • #scroll_hits(batch_size: 1000, scroll: '1m') {|hit| ... } ⇒ Object

    Examples:

    PlaceIndex.scroll_hits { |hit| p hit['_id'] }

    Yield Parameters:

    • hit (Hash)

      block is executed for each hit

  • #scroll_hits(batch_size: 1000, scroll: '1m') ⇒ Enumerator

    Returns a standard ruby Enumerator.

    Examples:

    PlaceIndex.scroll_hits.map { |hit| hit['_id'] }

    Returns:

    • (Enumerator)

      a standard ruby Enumerator

Parameters:

  • batch_size (Integer) (defaults to: 1000)

    batch size obviously, replaces ‘size` query parameter

  • scroll (String) (defaults to: '1m')

    cursor expiration time



74
75
76
77
78
79
80
# File 'lib/chewy/search/scrolling.rb', line 74

def scroll_hits(**options, &block)
  return enum_for(:scroll_hits, **options) unless block_given?

  scroll_batches(**options).each do |batch|
    batch.each(&block)
  end
end

#scroll_objects(batch_size: 1000, scroll: '1m') {|record| ... } ⇒ Object #scroll_objects(batch_size: 1000, scroll: '1m') ⇒ Enumerator Also known as: scroll_records, scroll_documents

Note:

If the record is not found it yields nil instead.

Iterates through the documents of the scope in batches. Performs load operation for each batch and then yields each loaded ORM/ODM object. Uses Request#load passed options for loading.

Overloads:

  • #scroll_objects(batch_size: 1000, scroll: '1m') {|record| ... } ⇒ Object

    Examples:

    PlaceIndex.scroll_objects { |record| p record.id }

    Yield Parameters:

    • record (Object)

      block is executed for each record loaded

  • #scroll_objects(batch_size: 1000, scroll: '1m') ⇒ Enumerator

    Returns a standard ruby Enumerator.

    Examples:

    PlaceIndex.scroll_objects.map { |record| record.id }

    Returns:

    • (Enumerator)

      a standard ruby Enumerator

Parameters:

  • batch_size (Integer) (defaults to: 1000)

    batch size obviously, replaces ‘size` query parameter

  • scroll (String) (defaults to: '1m')

    cursor expiration time

See Also:



126
127
128
129
130
131
132
133
# File 'lib/chewy/search/scrolling.rb', line 126

def scroll_objects(**options, &block)
  return enum_for(:scroll_objects, **options) unless block_given?

  except(:source, :stored_fields, :script_fields, :docvalue_fields)
    .source(false).scroll_batches(**options).each do |batch|
      loader.load(batch).each(&block)
    end
end

#scroll_wrappers(batch_size: 1000, scroll: '1m') {|object| ... } ⇒ Object #scroll_wrappers(batch_size: 1000, scroll: '1m') ⇒ Enumerator

Iterates through the documents of the scope in batches. Yields each hit wrapped with Index.

Overloads:

  • #scroll_wrappers(batch_size: 1000, scroll: '1m') {|object| ... } ⇒ Object

    Examples:

    PlaceIndex.scroll_wrappers { |object| p object.id }

    Yield Parameters:

    • object (Chewy::Index)

      block is executed for each hit object

  • #scroll_wrappers(batch_size: 1000, scroll: '1m') ⇒ Enumerator

    Returns a standard ruby Enumerator.

    Examples:

    PlaceIndex.scroll_wrappers.map { |object| object.id }

    Returns:

    • (Enumerator)

      a standard ruby Enumerator

Parameters:

  • batch_size (Integer) (defaults to: 1000)

    batch size obviously, replaces ‘size` query parameter

  • scroll (String) (defaults to: '1m')

    cursor expiration time



98
99
100
101
102
103
104
# File 'lib/chewy/search/scrolling.rb', line 98

def scroll_wrappers(**options)
  return enum_for(:scroll_wrappers, **options) unless block_given?

  scroll_hits(**options).each do |hit|
    yield loader.derive_index(hit['_index']).build(hit)
  end
end