Module: Sidekiq::Job::Iterable

Includes:
Enumerators
Defined in:
lib/sidekiq/job/iterable.rb,
lib/sidekiq/job/iterable/enumerators.rb,
lib/sidekiq/job/iterable/csv_enumerator.rb,
lib/sidekiq/job/iterable/active_record_enumerator.rb

Defined Under Namespace

Modules: ClassMethods, Enumerators Classes: ActiveRecordEnumerator, CsvEnumerator

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Enumerators

#active_record_batches_enumerator, #active_record_records_enumerator, #active_record_relations_enumerator, #array_enumerator, #csv_batches_enumerator, #csv_enumerator

Class Method Details

.included(base) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



13
14
15
# File 'lib/sidekiq/job/iterable.rb', line 13

def self.included(base)
  base.extend(ClassMethods)
end

Instance Method Details

#around_iterationObject

A hook to override that will be called around each iteration.

Can be useful for some metrics collection, performance tracking etc.



46
47
48
# File 'lib/sidekiq/job/iterable.rb', line 46

def around_iteration
  yield
end

#build_enumeratorEnumerator

The enumerator to be iterated over.

Returns:

  • (Enumerator)

Raises:

  • (NotImplementedError)

    with a message advising subclasses to implement an override for this method.



74
75
76
# File 'lib/sidekiq/job/iterable.rb', line 74

def build_enumerator(*)
  raise NotImplementedError, "#{self.class.name} must implement a '#build_enumerator' method"
end

#each_iterationvoid

This method returns an undefined value.

The action to be performed on each item from the enumerator.

Raises:

  • (NotImplementedError)

    with a message advising subclasses to implement an override for this method.



85
86
87
# File 'lib/sidekiq/job/iterable.rb', line 85

def each_iteration(*)
  raise NotImplementedError, "#{self.class.name} must implement an '#each_iteration' method"
end

#initializeObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



26
27
28
29
30
31
32
33
# File 'lib/sidekiq/job/iterable.rb', line 26

def initialize
  super

  @_executions = 0
  @_cursor = nil
  @_start_time = nil
  @_runtime = 0
end

#iteration_keyObject



89
90
91
# File 'lib/sidekiq/job/iterable.rb', line 89

def iteration_key
  "it-#{jid}"
end

#on_completeObject

A hook to override that will be called when the job finished iterating.



64
65
# File 'lib/sidekiq/job/iterable.rb', line 64

def on_complete
end

#on_resumeObject

A hook to override that will be called when the job resumes iterating.



52
53
# File 'lib/sidekiq/job/iterable.rb', line 52

def on_resume
end

#on_startObject

A hook to override that will be called when the job starts iterating.

It is called only once, for the first time.



39
40
# File 'lib/sidekiq/job/iterable.rb', line 39

def on_start
end

#on_stopObject

A hook to override that will be called each time the job is interrupted.

This can be due to interruption or sidekiq stopping.



59
60
# File 'lib/sidekiq/job/iterable.rb', line 59

def on_stop
end

#perform(*arguments) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/sidekiq/job/iterable.rb', line 94

def perform(*arguments)
  fetch_previous_iteration_state

  @_executions += 1
  @_start_time = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)

  enumerator = build_enumerator(*arguments, cursor: @_cursor)
  unless enumerator
    logger.info("'#build_enumerator' returned nil, skipping the job.")
    return
  end

  assert_enumerator!(enumerator)

  if @_executions == 1
    on_start
  else
    on_resume
  end

  completed = catch(:abort) do
    iterate_with_enumerator(enumerator, arguments)
  end

  on_stop
  completed = handle_completed(completed)

  if completed
    on_complete
    cleanup
  else
    reenqueue_iteration_job
  end
end