Module: Wurk::IterableJob

Defined in:
lib/wurk/iterable_job.rb,
lib/wurk/iterable_job/csv_enumerator.rb,
lib/wurk/iterable_job/active_record_enumerator.rb

Overview

Iterable jobs split long-running work into small, idempotent chunks. Override `#build_enumerator` (yielding `[item, new_cursor]` pairs) and `#each_iteration(item, *args)`; the framework drives the loop, persists the cursor, and resumes from the persisted cursor after an interruption.

Defining `#perform` on the including class is rejected by a `method_added` hook, because IterableJob owns the run loop. User code overrides `#each_iteration` instead.

State lives in the `it-<jid>` HASH (sidekiq-free.md §1.5):

ex        : execution count (int)
c         : cursor (JSON string)
rt        : runtime accumulated (float seconds)
cancelled : timestamp (int) if cancelled
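Redis returns every HASH field as a String, so reading this state back means decoding each field to the type listed above. A minimal sketch, using a plain Ruby `Hash` in place of an `HGETALL it-<jid>` reply; `decode_iteration_state` is a hypothetical helper, not part of Wurk's API:

```ruby
require "json"

# Hypothetical decoder for the fields of an `it-<jid>` HASH.
# Redis returns every HASH value as a String, so each field is
# converted back to the type listed above.
def decode_iteration_state(raw)
  {
    executions: raw.fetch("ex", "0").to_i,
    cursor: raw["c"] && JSON.parse(raw["c"]),
    runtime: raw.fetch("rt", "0").to_f,
    cancelled_at: raw["cancelled"]&.to_i
  }
end

raw = { "ex" => "2", "c" => JSON.generate(42), "rt" => "7.5" }
state = decode_iteration_state(raw)
# state: executions 2, cursor 42, runtime 7.5, cancelled_at nil
```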

Spec: docs/target/sidekiq-free.md §6.4.

Defined Under Namespace

Modules: MethodAddedGuard
Classes: ActiveRecordEnumerator, CsvEnumerator

Constant Summary

Interrupted =

Alias to the canonical `Wurk::Job::Interrupted`. The exception lives on `Wurk::Job` so that non-iterable code paths (manual `interrupted?` checks) can raise the same class; the interrupt-handler middleware rescues it by the `Wurk::Job::Interrupted` name.

Wurk::Job::Interrupted
STATE_TTL =

Default expiry (30 days, in seconds) for an iteration state HASH while the job is running or awaiting resume. Refreshed on every checkpoint.

30 * 86_400
STATE_FLUSH_INTERVAL =

Interval, in seconds, for both cursor flushes and the cancellation poll. The two share one timer, so a long-running iteration that reaches the 5-second mark checkpoints and checks for cross-process cancellation in the same tick.

5
CANCELLATION_PERIOD =

Shorter TTL (3 days) applied once the state is marked cancelled. The HASH outlives `cancel!` long enough for live workers to observe the flag, but is reaped well before the 30-day default would expire it.

3 * 86_400
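In seconds, `STATE_TTL` is 2,592,000 and `CANCELLATION_PERIOD` is 259,200. The sketch below shows one way the two could be selected when the HASH's expiry is refreshed, assuming the presence of the `cancelled` field is what triggers the shorter period; `state_ttl_for` is a hypothetical helper.

```ruby
STATE_TTL = 30 * 86_400          # 2_592_000 seconds
CANCELLATION_PERIOD = 3 * 86_400 # 259_200 seconds

# Hypothetical helper: choose the expiry for an `it-<jid>` HASH.
# Once cancelled, the state only has to outlive the running workers'
# next poll, so it gets the shorter period.
def state_ttl_for(fields)
  fields.key?("cancelled") ? CANCELLATION_PERIOD : STATE_TTL
end

running_ttl   = state_ttl_for({ "ex" => "1", "c" => "3" })             # => 2592000
cancelled_ttl = state_ttl_for({ "ex" => "1", "cancelled" => "17000" }) # => 259200
```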

Instance Attribute Details

#current_object ⇒ Object (readonly)

Iteration state accessors.

# File 'lib/wurk/iterable_job.rb', line 122

def current_object
  @current_object
end

Class Method Details

.included(base) ⇒ Object

# File 'lib/wurk/iterable_job.rb', line 59

def self.included(base)
  base.include(Wurk::Job)
  base.singleton_class.prepend(MethodAddedGuard)
end
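The guard works because `method_added` is a hook Ruby calls on a class each time an instance method is defined in it; prepending a module to the class's singleton class lets that module intercept the hook. A self-contained sketch of the pattern; `IterableSketch`, the guard body, and the choice of `ArgumentError` are assumptions, since the real `MethodAddedGuard` body is not shown here:

```ruby
# Hypothetical stand-in for Wurk::IterableJob and its MethodAddedGuard.
module IterableSketch
  # Prepended to the including class's singleton class, so this
  # `method_added` runs (and can veto) before Module#method_added.
  module MethodAddedGuard
    def method_added(name)
      if name == :perform
        raise ArgumentError, "#{self} must not define #perform; override #each_iteration instead"
      end
      super
    end
  end

  def self.included(base)
    base.singleton_class.prepend(MethodAddedGuard)
  end

  # The module owns the run loop. Defining it here does not trigger the
  # guard, because it is added to the module, not the including class.
  def perform(*args)
    each_iteration(*args)
  end
end

class DoublingJob
  include IterableSketch

  def each_iteration(item)
    item * 2
  end
end

rejected =
  begin
    Class.new do
      include IterableSketch
      def perform; end # rejected by the guard
    end
    false
  rescue ArgumentError
    true
  end
```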

Instance Method Details

#active_record_batches_enumerator(relation, cursor:) ⇒ Object

# File 'lib/wurk/iterable_job.rb', line 100

def active_record_batches_enumerator(relation, cursor:, **)
  ActiveRecordEnumerator.new(relation, cursor: cursor, **).batches
end

#active_record_records_enumerator(relation, cursor:) ⇒ Object

# File 'lib/wurk/iterable_job.rb', line 96

def active_record_records_enumerator(relation, cursor:, **)
  ActiveRecordEnumerator.new(relation, cursor: cursor, **).records
end

#active_record_relations_enumerator(relation, cursor:) ⇒ Object

# File 'lib/wurk/iterable_job.rb', line 104

def active_record_relations_enumerator(relation, cursor:, **)
  ActiveRecordEnumerator.new(relation, cursor: cursor, **).relations
end
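Judging by their names, the three ActiveRecord helpers differ in what they yield (single records, arrays of records, or relations), while the cursor is always a primary key. The sketch below imitates the records and batches variants on an in-memory, id-sorted array so it runs without a database. It assumes keyset semantics like ActiveRecord's `find_each(start:)`, where `start` is inclusive, and assumes a batch's cursor is its first record's id; `records_from` and `batches_from` are hypothetical names.

```ruby
Record = Struct.new(:id, :name)

# Keep records whose primary key is >= cursor; like `start:` in
# ActiveRecord batching, the lower bound is inclusive.
def remaining_after(rows, cursor)
  rows.sort_by(&:id).select { |r| cursor.nil? || r.id >= cursor }
end

# Hypothetical records variant: yields [record, record.id].
def records_from(rows, cursor:)
  remaining = remaining_after(rows, cursor)
  remaining.map { |r| [r, r.id] }.to_enum { remaining.size }
end

# Hypothetical batches variant: yields [batch, id of the batch's first record].
def batches_from(rows, cursor:, batch_size: 2)
  batches = remaining_after(rows, cursor).each_slice(batch_size).to_a
  batches.map { |b| [b, b.first.id] }.to_enum { batches.size }
end

rows = [Record.new(30, "c"), Record.new(10, "a"), Record.new(20, "b")]
resumed = records_from(rows, cursor: 20).map { |r, c| [r.name, c] }
# => [["b", 20], ["c", 30]]
batched = batches_from(rows, cursor: nil).map { |b, c| [b.map(&:name), c] }
# => [[["a", "b"], 10], [["c"], 30]]
```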

#arguments ⇒ Object

# File 'lib/wurk/iterable_job.rb', line 124

def arguments
  @arguments ||= []
end

#around_iteration ⇒ Object

# File 'lib/wurk/iterable_job.rb', line 116

def around_iteration
  yield
end
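By default `#around_iteration` simply yields. Overriding it wraps every step, for example to time each iteration. The sketch assumes the run loop calls `each_iteration` inside the `around_iteration` block; `BaseSketch`, `TimedSketch`, and their `run` driver are hypothetical.

```ruby
class BaseSketch
  def around_iteration
    yield
  end

  # Hypothetical driver: one each_iteration call per item, wrapped by the hook.
  def run(items)
    items.each { |item| around_iteration { each_iteration(item) } }
  end
end

class TimedSketch < BaseSketch
  attr_reader :timings, :seen

  def initialize
    @timings = []
    @seen = []
  end

  # Records the monotonic duration of each iteration, even if it raises.
  def around_iteration
    started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    yield
  ensure
    @timings << Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
  end

  def each_iteration(item)
    @seen << item
  end
end

job = TimedSketch.new
job.run([1, 2, 3])
```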

#array_enumerator(array, cursor:) ⇒ Object

Enumerator builders (§6.4). Helpers that user code calls from `#build_enumerator` to get a resumable enumerator of `[item, cursor]` pairs. Cursor parity with Sidekiq: the array and CSV enumerators use the integer index as the cursor; the ActiveRecord enumerators use the record’s primary key.

Raises:

  • (ArgumentError) if `array` is not an `Array`.

# File 'lib/wurk/iterable_job.rb', line 81

def array_enumerator(array, cursor:)
  raise ArgumentError, 'array must be an Array' unless array.is_a?(::Array)

  x = array.each_with_index.drop(cursor || 0)
  x.to_enum { x.size }
end
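Each element is paired with its own index, and the persisted cursor is passed to `drop`, so a cursor of `1` starts the enumeration at the element at index 1. The size is supplied lazily through the `to_enum` block. Running a copy of the method above in isolation:

```ruby
# Copy of the helper above, so the example runs without Wurk.
def array_enumerator(array, cursor:)
  raise ArgumentError, 'array must be an Array' unless array.is_a?(::Array)

  x = array.each_with_index.drop(cursor || 0)
  x.to_enum { x.size }
end

fresh   = array_enumerator(%w[a b c], cursor: nil)
resumed = array_enumerator(%w[a b c], cursor: 1)
fresh.to_a   # => [["a", 0], ["b", 1], ["c", 2]]
resumed.to_a # => [["b", 1], ["c", 2]]
resumed.size # => 2
```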

#build_enumerator(cursor:) ⇒ Object

User override: must return an `Enumerator` yielding `[item, new_cursor]` pairs. The cursor must round-trip through JSON.

Raises:

  • (NotImplementedError) if the including class does not override it.

# File 'lib/wurk/iterable_job.rb', line 66

def build_enumerator(*, cursor:)
  _ = cursor
  raise NotImplementedError, "#{self.class} must override #build_enumerator"
end
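When none of the helpers fit, `#build_enumerator` can return any `Enumerator` of `[item, new_cursor]` pairs. The sketch below pages through a hypothetical paginated source (`fetch_page` and `PAGES` are in-memory stand-ins) using a Hash cursor. Because the cursor is persisted as JSON, it uses String keys: a Symbol key comes back as a String after a `JSON.generate`/`JSON.parse` round trip.

```ruby
require "json"

PAGES = [%w[a b], %w[c d], %w[e]].freeze

# Hypothetical page fetcher standing in for a remote, paginated API.
def fetch_page(number)
  PAGES[number]
end

# Yields [item, cursor] pairs. The cursor records the page an item came
# from, so a resume re-reads that whole page: each_iteration must be
# idempotent.
def build_enumerator(cursor:)
  start = cursor ? cursor.fetch("page") : 0
  Enumerator.new do |y|
    (start...PAGES.size).each do |page|
      fetch_page(page).each { |item| y.yield(item, { "page" => page }) }
    end
  end
end

saved = JSON.parse(JSON.generate({ "page" => 1 })) # String keys survive intact
resumed = build_enumerator(cursor: saved).to_a.map(&:first) # => ["c", "d", "e"]
symbol_keyed = JSON.parse(JSON.generate({ page: 1 }))       # key is now "page"
```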

#cancel! ⇒ Integer

Mark this iteration cancelled. Sets the in-process flag immediately (so the next `cancelled?` check inside the run loop trips) and, when a jid is bound, writes the timestamp to the `it-<jid>` HASH so that other processes observe it on their next 5-second poll.

Returns the integer epoch-seconds timestamp written.

# File 'lib/wurk/iterable_job.rb', line 138

def cancel!
  ts_ms = ::Process.clock_gettime(::Process::CLOCK_REALTIME, :millisecond)
  @cancelled_at ||= ts_ms
  ts = ts_ms / 1000
  persist_cancellation(ts)
  ts
end

#cancelled? ⇒ Boolean

True once `cancel!` has been called locally or, for cross-process cancellation, once the `cancelled` field appears in the `it-<jid>` HASH. The remote check is rate-limited to once per `STATE_FLUSH_INTERVAL` to keep the hot loop cheap.

Returns:

  • (Boolean)

# File 'lib/wurk/iterable_job.rb', line 150

def cancelled?
  return true if @cancelled_at

  ts = poll_remote_cancellation
  return false unless ts

  @cancelled_at = ts * 1000
  true
end
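The interplay between a local `cancel!` in one process and a throttled remote check in another can be sketched with a shared `Hash` standing in for Redis and an injectable clock. This is an illustration, not Wurk's implementation; `CancellationSketch`, `store`, and `clock` are hypothetical, and timestamps are kept in whole seconds for simplicity.

```ruby
# Minimal sketch of local + cross-process cancellation.
class CancellationSketch
  POLL_INTERVAL = 5 # seconds, mirroring STATE_FLUSH_INTERVAL

  def initialize(store:, jid:, clock:)
    @store = store
    @key = "it-#{jid}"
    @clock = clock
    @last_poll = nil
    @cancelled_at = nil
  end

  # Set the local flag and publish the timestamp for other processes.
  def cancel!
    ts = @clock.call.to_i
    @cancelled_at ||= ts
    (@store[@key] ||= {})["cancelled"] = ts.to_s
    ts
  end

  def cancelled?
    return true if @cancelled_at

    now = @clock.call
    # Rate-limit the remote read so the hot loop stays cheap.
    return false if @last_poll && now - @last_poll < POLL_INTERVAL

    @last_poll = now
    raw = @store.dig(@key, "cancelled")
    return false unless raw

    @cancelled_at = raw.to_i
    true
  end
end

store = {}
now = 100.0
clock = -> { now }
worker = CancellationSketch.new(store: store, jid: "abc123", clock: clock)
other  = CancellationSketch.new(store: store, jid: "abc123", clock: clock)

before = worker.cancelled?    # false: nothing written yet; this read opens the 5 s window
other.cancel!                 # another process flags the job
throttled = worker.cancelled? # false: still inside the 5 s window
now += 5
after = worker.cancelled?     # true: the next poll sees the flag
```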

#csv_batches_enumerator(csv, cursor:) ⇒ Object

# File 'lib/wurk/iterable_job.rb', line 92

def csv_batches_enumerator(csv, cursor:, **)
  CsvEnumerator.new(csv).batches(cursor: cursor, **)
end

#csv_enumerator(csv, cursor:) ⇒ Object

# File 'lib/wurk/iterable_job.rb', line 88

def csv_enumerator(csv, cursor:)
  CsvEnumerator.new(csv).rows(cursor: cursor)
end
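Both CSV helpers use an integer cursor. The sketch below assumes, as with `#array_enumerator`, that the row cursor is the zero-based row index, and further assumes that the batch cursor counts batches rather than rows. An in-memory array of parsed rows stands in for a `CSV` object, since both are `Enumerable`; `csv_rows_sketch` and `csv_batches_sketch` are hypothetical names.

```ruby
# Parsed rows, as CSV#each would yield them (header row omitted).
rows = [%w[1 ada], %w[2 bob], %w[3 cy], %w[4 dee], %w[5 eve]]

# Row cursor: zero-based row index; resume by dropping that many rows.
def csv_rows_sketch(rows, cursor:)
  rows.each_with_index.drop(cursor || 0)
end

# Batch cursor (assumed): zero-based batch index, not a row index.
def csv_batches_sketch(rows, cursor:, batch_size: 2)
  rows.each_slice(batch_size).with_index.drop(cursor || 0)
end

row_view = csv_rows_sketch(rows, cursor: 3).map { |row, i| [row.first, i] }
# => [["4", 3], ["5", 4]]
batch_view = csv_batches_sketch(rows, cursor: 1).map { |batch, i| [batch.size, i] }
# => [[2, 1], [1, 2]]
```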

#cursor ⇒ Object

# File 'lib/wurk/iterable_job.rb', line 128

def cursor
  @cursor
end

#each_iteration ⇒ Object

Raises:

  • (NotImplementedError)

# File 'lib/wurk/iterable_job.rb', line 71

def each_iteration(*)
  raise NotImplementedError, "#{self.class} must override #each_iteration"
end

#iteration_key ⇒ Object

Redis HASH key holding iteration state for this job. Wire-compatible with Sidekiq’s `it-<jid>` schema (sidekiq-free.md §1.5).

# File 'lib/wurk/iterable_job.rb', line 162

def iteration_key
  "it-#{jid}"
end

#on_cancel ⇒ Object

# File 'lib/wurk/iterable_job.rb', line 113

def on_cancel; end

#on_complete ⇒ Object

# File 'lib/wurk/iterable_job.rb', line 114

def on_complete; end

#on_resume ⇒ Object

# File 'lib/wurk/iterable_job.rb', line 111

def on_resume; end

#on_start ⇒ Object

Lifecycle hooks (no-op defaults; users override as needed).

# File 'lib/wurk/iterable_job.rb', line 110

def on_start; end

#on_stop ⇒ Object

# File 'lib/wurk/iterable_job.rb', line 112

def on_stop; end

#perform(*args) ⇒ Object

Foundation run loop. Loads any persisted state, drives the enumerator, checkpoints every `STATE_FLUSH_INTERVAL` seconds, and on interruption persists the final cursor before re-raising so the interrupt-handler middleware can re-push the job at the head of the queue.

# File 'lib/wurk/iterable_job.rb', line 170

def perform(*args)
  reset_run_state(args)
  load_state
  fire_lifecycle_start
  @executions += 1

  run_iterations(args)

  finalize_complete
rescue Interrupted
  finalize_interrupted
  raise
end
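The interrupt-and-resume path of this loop can be traced in a self-contained sketch. It is an illustration under stated assumptions, not Wurk's implementation: a `Hash` stands in for the `it-<jid>` HASH, an `interrupt_after` counter simulates a shutdown signal, the periodic 5-second checkpoint is omitted so only the final flush on interruption is shown, and the `:start`/`:resume`/`:complete`/`:interrupted` events only mark roughly where lifecycle hooks might fire (first execution versus later ones is an assumption). `RunLoopSketch` and its methods are hypothetical.

```ruby
require "json"

class RunLoopSketch
  class Interrupted < StandardError; end

  attr_reader :processed, :events

  def initialize(store:, jid:, interrupt_after: nil)
    @store = store
    @key = "it-#{jid}"
    @interrupt_after = interrupt_after
    @processed = []
    @events = []
  end

  def build_enumerator(items, cursor:)
    x = items.each_with_index.drop(cursor || 0)
    x.to_enum { x.size }
  end

  def each_iteration(item)
    @processed << item
  end

  def perform(items)
    state = @store[@key] ||= {}
    @executions = state.fetch("ex", "0").to_i
    @cursor = state["c"] && JSON.parse(state["c"])
    @events << (@executions.zero? ? :start : :resume)
    @executions += 1

    steps = 0
    build_enumerator(items, cursor: @cursor).each do |item, cursor|
      @cursor = cursor # index of the item about to run
      raise Interrupted if @interrupt_after && steps == @interrupt_after

      each_iteration(item)
      steps += 1
    end

    @events << :complete
    @store.delete(@key) # finished: no state left to resume from
  rescue Interrupted
    # Persist the final cursor before re-raising so a re-pushed job resumes.
    state["ex"] = @executions.to_s
    state["c"] = JSON.generate(@cursor)
    @events << :interrupted
    raise
  end
end

store = {}
first = RunLoopSketch.new(store: store, jid: "j1", interrupt_after: 2)
begin
  first.perform(%w[a b c d])
rescue RunLoopSketch::Interrupted
  # In Wurk, the interrupt-handler middleware would re-push the job here.
end
saved_state = store["it-j1"].dup # {"ex" => "1", "c" => "2"}

second = RunLoopSketch.new(store: store, jid: "j1")
second.perform(%w[a b c d]) # resumes at index 2
```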