Class: JobIteration::ActiveRecordCursor

Inherits:
Object
  • Object
show all
Includes:
Comparable
Defined in:
lib/job-iteration/active_record_cursor.rb

Overview

Curious about how this works from the SQL perspective? Check “Pagination Done the Right way”: bit.ly/2Rq7iPF

Defined Under Namespace

Classes: ConditionNotSupportedError

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(relation, columns, position, instance, instances) ⇒ ActiveRecordCursor

Returns a new instance of ActiveRecordCursor.

Raises:

  • (ArgumentError)


21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/job-iteration/active_record_cursor.rb', line 21

def initialize(relation, columns, position, instance, instances)
  @columns = if columns
    Array(columns)
  else
    Array(relation.primary_key).map { |pk| "#{relation.table_name}.#{pk}" }
  end
  self.position = Array.wrap(position)
  raise ArgumentError, "Must specify at least one column" if columns.empty?
  if relation.joins_values.present? && !@columns.all? { |column| column.to_s.include?(".") }
    raise ArgumentError, "You need to specify fully-qualified columns if you join a table"
  end

  if relation.arel.orders.present? || relation.arel.taken.present?
    raise ConditionNotSupportedError
  end

  @base_relation = relation.reorder(@columns.join(","))

  if instances.present?
    pk = relation.primary_key
    unless pk.is_a?(String) && relation.klass.column_for_attribute(pk).type == :integer
      raise ArgumentError, "Parallel iteration requires a single integer primary key. " \
        "For more complex cases, use the enumerator_builder.parallel primitive directly."
    end

    @base_relation = @base_relation.where("#{relation.table_name}.#{pk} % ? = ?", instances, instance)
  end

  @reached_end = false
end

Instance Attribute Details

#positionObject

Returns the value of attribute position.



9
10
11
# File 'lib/job-iteration/active_record_cursor.rb', line 9

def position
  @position
end

#reached_endObject

Returns the value of attribute reached_end.



10
11
12
# File 'lib/job-iteration/active_record_cursor.rb', line 10

def reached_end
  @reached_end
end

Instance Method Details

#<=>(other) ⇒ Object



52
53
54
55
56
57
58
# File 'lib/job-iteration/active_record_cursor.rb', line 52

def <=>(other)
  if reached_end != other.reached_end
    reached_end ? 1 : -1
  else
    position <=> other.position
  end
end

#next_batch(batch_size) ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/job-iteration/active_record_cursor.rb', line 78

def next_batch(batch_size)
  return if @reached_end

  relation = @base_relation.limit(batch_size)

  if (conditions = self.conditions).any?
    relation = relation.where(*conditions)
  end

  records = relation.uncached do
    relation.to_a
  end

  update_from_record(records.last) unless records.empty?
  @reached_end = records.size < batch_size

  records.empty? ? nil : records
end

#update_from_record(record) ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
# File 'lib/job-iteration/active_record_cursor.rb', line 66

def update_from_record(record)
  self.position = @columns.map do |column|
    method = column.to_s.split(".").last

    if method == "id"
      record.id_value
    else
      record.send(method.to_sym)
    end
  end
end