Module: Wurk::IterableJob
Defined in:
- lib/wurk/iterable_job.rb
- lib/wurk/iterable_job/csv_enumerator.rb
- lib/wurk/iterable_job/active_record_enumerator.rb
Overview
Iterable jobs split long-running work into small, idempotent chunks. Override `#build_enumerator` (yielding `[item, new_cursor]` pairs) and `#each_iteration(item, *args)`; the framework drives the loop, persists the cursor, and resumes from the saved cursor after an interruption.
Defining `#perform` on the including class is refused at `method_added`, because IterableJob owns the run loop. User code overrides `#each_iteration` instead.
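The contract above can be pictured with a plain-Ruby sketch that does not load Wurk. `ToyIterable` and its `drive` method are hypothetical stand-ins for the framework's run loop, and the interruption is simulated with a hypothetical `stop_after:` limit. The job class itself only overrides `#build_enumerator` and `#each_iteration`, as the overview describes.

```ruby
# Hypothetical stand-in for the framework side; not Wurk's implementation.
module ToyIterable
  # Drives the user's enumerator from `cursor`, processing at most
  # `stop_after` items, and returns the cursor to resume from
  # (nil once the enumerator is exhausted).
  def drive(*args, cursor: nil, stop_after: nil)
    processed = 0
    build_enumerator(*args, cursor: cursor).each do |item, item_cursor|
      # Simulated interruption: hand back the cursor of the item not yet
      # processed, so the next run re-yields it.
      return item_cursor if stop_after && processed >= stop_after

      each_iteration(item, *args)
      processed += 1
    end
    nil
  end
end

class SquareJob
  include ToyIterable
  attr_reader :results

  def initialize
    @results = []
  end

  # Yields [item, cursor] pairs; the cursor is the array index.
  def build_enumerator(numbers, cursor:)
    numbers.each_with_index.drop(cursor || 0).to_enum
  end

  def each_iteration(n, _numbers)
    @results << n * n
  end
end

job = SquareJob.new
resume_at = job.drive([1, 2, 3, 4], stop_after: 2) # → 2
job.drive([1, 2, 3, 4], cursor: resume_at)          # → nil
job.results                                         # → [1, 4, 9, 16]
```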
State lives in the `it-<jid>` HASH (sidekiq-free.md §1.5):
- ex: execution count (int)
- c: cursor (JSON string)
- rt: accumulated runtime (float seconds)
- cancelled: epoch-seconds timestamp (int), present only once the job is cancelled
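As a rough illustration of that layout, the sketch below encodes and decodes the fields with a plain Ruby Hash standing in for the Redis HASH. Redis stores hash field values as strings, so every field is stringified. The helpers `encode_state` and `decode_state` are hypothetical and not part of Wurk.

```ruby
require "json"

# Hypothetical helper: encode the it-<jid> fields the way a Redis HASH
# holds them (every field value is a string).
def encode_state(executions:, cursor:, runtime:, cancelled_at: nil)
  state = {
    "ex" => executions.to_s,
    "c" => JSON.generate(cursor), # cursor serialized as JSON
    "rt" => runtime.to_s
  }
  state["cancelled"] = cancelled_at.to_s if cancelled_at # epoch seconds
  state
end

# Hypothetical helper: decode the string fields back into Ruby values.
def decode_state(fields)
  {
    executions: fields.fetch("ex", "0").to_i,
    cursor: fields["c"] && JSON.parse(fields["c"]),
    runtime: fields.fetch("rt", "0").to_f,
    cancelled_at: fields["cancelled"]&.to_i
  }
end

jid = "abc123"
key = "it-#{jid}"                 # → "it-abc123"
fields = encode_state(executions: 3, cursor: { "id" => 42 }, runtime: 12.5)
fields["c"]                       # → "{\"id\":42}"
decode_state(fields)[:executions] # → 3
decode_state(fields)[:runtime]    # → 12.5
```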
Spec: docs/target/sidekiq-free.md §6.4.
Defined Under Namespace
Modules: MethodAddedGuard
Classes: ActiveRecordEnumerator, CsvEnumerator
Constant Summary
- Interrupted = Wurk::Job::Interrupted
Alias to the canonical `Wurk::Job::Interrupted`. The exception lives on `Wurk::Job` so that non-iterable code paths (manual `interrupted?` checks) can raise the same class; the interrupt-handler middleware rescues it by the `Wurk::Job::Interrupted` name.
- STATE_TTL = 30 * 86_400
Default expiry, in seconds (30 days), for an iteration state HASH while the job is running or awaiting resume. Refreshed on every checkpoint.
- STATE_FLUSH_INTERVAL = 5
Cadence, in seconds, of cursor flushes and cancellation polls. Both share one timer, so a long-running iteration that reaches the 5-second mark checkpoints and checks for cross-process cancellation in the same tick.
- CANCELLATION_PERIOD = 3 * 86_400
Shorter TTL (3 days) applied once the state is marked cancelled. The HASH outlives `cancel!` long enough for live workers to observe the flag, but is reaped well before the 30-day default would expire.
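The shared timer described for `STATE_FLUSH_INTERVAL` can be sketched as follows. `FlushTimer` is a hypothetical class, and its clock is injected so the example is deterministic; by default it reads the monotonic clock via `Process.clock_gettime(Process::CLOCK_MONOTONIC)`. The constants are local copies of the values listed above.

```ruby
STATE_TTL = 30 * 86_400          # 2_592_000 s (30 days)
STATE_FLUSH_INTERVAL = 5         # seconds
CANCELLATION_PERIOD = 3 * 86_400 # 259_200 s (3 days)

# Hypothetical helper: one timer gates both the cursor flush and the
# cross-process cancellation poll, so both happen on the same tick.
class FlushTimer
  def initialize(interval: STATE_FLUSH_INTERVAL,
                 clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @interval = interval
    @clock = clock
    @last = @clock.call
  end

  # True at most once per interval; restarts the interval when it fires.
  def due?
    now = @clock.call
    return false if now - @last < @interval

    @last = now
    true
  end
end

now = 0.0
timer = FlushTimer.new(clock: -> { now })
ticks = [1.0, 4.9, 5.0, 7.0, 10.0].map { |t| now = t; timer.due? }
# → [false, false, true, false, true]
```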
Instance Attribute Summary
- #current_object ⇒ Object (readonly)
Iteration state accessor.
Class Method Summary
- .included(base) ⇒ Object
Instance Method Summary
- #active_record_batches_enumerator(relation, cursor:) ⇒ Object
- #active_record_records_enumerator(relation, cursor:) ⇒ Object
- #active_record_relations_enumerator(relation, cursor:) ⇒ Object
- #arguments ⇒ Object
- #around_iteration ⇒ Object
- #array_enumerator(array, cursor:) ⇒ Object
Enumerator builder (§6.4): a helper that user code calls from `#build_enumerator` to get a resumable enumerator of `[item, cursor]` pairs.
- #build_enumerator(cursor:) ⇒ Object
User override: must return an Enumerator yielding `[item, new_cursor]` pairs.
- #cancel! ⇒ Object
Mark this iteration cancelled.
- #cancelled? ⇒ Boolean
True once `cancel!` has been called locally or, for cross-process cancellation, once the `cancelled` field appears in the `it-<jid>` HASH.
- #csv_batches_enumerator(csv, cursor:) ⇒ Object
- #csv_enumerator(csv, cursor:) ⇒ Object
- #cursor ⇒ Object
- #each_iteration ⇒ Object
- #iteration_key ⇒ Object
Redis HASH key holding iteration state for this job.
- #on_cancel ⇒ Object
- #on_complete ⇒ Object
- #on_resume ⇒ Object
- #on_start ⇒ Object
Lifecycle hook (no-op default; users override as needed).
- #on_stop ⇒ Object
- #perform(*args) ⇒ Object
Foundation run loop.
Instance Attribute Details
#current_object ⇒ Object (readonly)
Iteration state accessor.
    # File 'lib/wurk/iterable_job.rb', line 122
    def current_object
      @current_object
    end
Class Method Details
.included(base) ⇒ Object
    # File 'lib/wurk/iterable_job.rb', line 59
    def self.included(base)
      base.include(Wurk::Job)
      base.singleton_class.prepend(MethodAddedGuard)
    end
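The body of `MethodAddedGuard` is not shown on this page. A minimal sketch of how a guard prepended onto the including class's singleton class could refuse `#perform` is below; the module names `GuardedJob` and `ReportJob` and the choice of `ArgumentError` are assumptions, not Wurk's code.

```ruby
module GuardedJob
  # Hypothetical guard: prepended to the including class's singleton class,
  # so this method_added runs before Module#method_added.
  module MethodAddedGuard
    def method_added(name)
      if name == :perform
        raise ArgumentError, "#{self} must not define #perform; override #each_iteration instead"
      end

      super
    end
  end

  def self.included(base)
    base.singleton_class.prepend(MethodAddedGuard)
  end

  # The module owns the run loop; including classes inherit this method.
  def perform
    :run_loop
  end
end

class ReportJob
  include GuardedJob

  def each_iteration(_item); end # allowed
end

ReportJob.new.perform # → :run_loop

begin
  Class.new do
    include GuardedJob
    def perform; end # refused as soon as the method is added
  end
rescue ArgumentError => e
  puts e.message
end
```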
Instance Method Details
#active_record_batches_enumerator(relation, cursor:) ⇒ Object
    # File 'lib/wurk/iterable_job.rb', line 100
    def active_record_batches_enumerator(relation, cursor:, **)
      ActiveRecordEnumerator.new(relation, cursor: cursor, **).batches
    end
#active_record_records_enumerator(relation, cursor:) ⇒ Object
    # File 'lib/wurk/iterable_job.rb', line 96
    def active_record_records_enumerator(relation, cursor:, **)
      ActiveRecordEnumerator.new(relation, cursor: cursor, **).records
    end
#active_record_relations_enumerator(relation, cursor:) ⇒ Object
    # File 'lib/wurk/iterable_job.rb', line 104
    def active_record_relations_enumerator(relation, cursor:, **)
      ActiveRecordEnumerator.new(relation, cursor: cursor, **).relations
    end
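Without a database, the primary-key cursor used by the ActiveRecord helpers can be approximated over an in-memory list of rows. This is a stand-in, not `ActiveRecordEnumerator`: the `Row` struct, the `batch_size:` default, using the first id of a batch as its cursor, and the inclusive resume (mirroring ActiveRecord's `find_each(start:)`, which starts at the given key) are all assumptions. The relations variant is not modeled.

```ruby
# In-memory stand-in for a relation ordered by primary key.
Row = Struct.new(:id, :name)

# Yields [record, record.id]; resumes at the first id >= cursor.
def records_enumerator(rows, cursor:)
  remaining = rows.sort_by(&:id).select { |r| cursor.nil? || r.id >= cursor }
  remaining.map { |r| [r, r.id] }.to_enum
end

# Yields [batch, first id in batch]; resuming re-runs that whole batch.
def batches_enumerator(rows, cursor:, batch_size: 2)
  remaining = rows.sort_by(&:id).select { |r| cursor.nil? || r.id >= cursor }
  remaining.each_slice(batch_size).map { |batch| [batch, batch.first.id] }.to_enum
end

rows = [Row.new(10, "a"), Row.new(3, "b"), Row.new(7, "c")]
records_enumerator(rows, cursor: 7).map { |r, c| [r.name, c] } # → [["c", 7], ["a", 10]]
batches_enumerator(rows, cursor: nil).map { |_, c| c }           # → [3, 10]
```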
#arguments ⇒ Object
    # File 'lib/wurk/iterable_job.rb', line 124
    def arguments
      @arguments ||= []
    end
#around_iteration ⇒ Object
    # File 'lib/wurk/iterable_job.rb', line 116
    def around_iteration
      yield
    end
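`#around_iteration` is a yield-only wrapper, so an override must still `yield`. The sketch below overrides it to time every iteration. It assumes the framework loop calls `around_iteration { each_iteration(item, *args) }`; the `run_all` driver and the `timings` array are hypothetical stand-ins for that loop.

```ruby
class TimedJob
  attr_reader :timings, :seen

  def initialize
    @timings = []
    @seen = []
  end

  def each_iteration(item)
    @seen << item
  end

  # Override: work before and after the iteration; always yield.
  def around_iteration
    started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    yield
  ensure
    @timings << Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
  end

  # Hypothetical driver standing in for the framework's loop.
  def run_all(items)
    items.each { |item| around_iteration { each_iteration(item) } }
  end
end

job = TimedJob.new
job.run_all(%w[x y z])
job.seen         # → ["x", "y", "z"]
job.timings.size # → 3
```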
#array_enumerator(array, cursor:) ⇒ Object
Enumerator builder (§6.4): a helper that user code calls from `#build_enumerator` to get a resumable enumerator of `[item, cursor]` pairs. Cursor parity with Sidekiq: the array and CSV helpers use the integer index; the ActiveRecord helpers use the record's primary key.
    # File 'lib/wurk/iterable_job.rb', line 81
    def array_enumerator(array, cursor:)
      raise ArgumentError, 'array must be an Array' unless array.is_a?(::Array)

      x = array.each_with_index.drop(cursor || 0)
      x.to_enum { x.size }
    end
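The array helper can be exercised on its own. The method below is copied from the source above with the module context dropped, and the calls show how the integer-index cursor resumes: a cursor of `2` skips the first two elements, and the enumerator's `size` reports only the remaining items.

```ruby
def array_enumerator(array, cursor:)
  raise ArgumentError, 'array must be an Array' unless array.is_a?(::Array)

  # Materialize [item, index] pairs from the cursor onward.
  x = array.each_with_index.drop(cursor || 0)
  # Enumerator over those pairs; the block supplies a cheap #size.
  x.to_enum { x.size }
end

array_enumerator(%w[a b c d], cursor: nil).size # → 4
resumed = array_enumerator(%w[a b c d], cursor: 2)
resumed.to_a                                    # → [["c", 2], ["d", 3]]
resumed.size                                    # → 2
```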
#build_enumerator(cursor:) ⇒ Object
User override: must return an Enumerator yielding `[item, new_cursor]` pairs. The cursor must round-trip through JSON.
    # File 'lib/wurk/iterable_job.rb', line 66
    def build_enumerator(*, cursor:)
      _ = cursor
      raise NotImplementedError, "#{self.class} must override #build_enumerator"
    end
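A user override has to yield `[item, new_cursor]` pairs whose cursor survives JSON. The sketch below pages through a hypothetical `PAGES` source with a Hash cursor. Symbol keys would not survive the round trip, because `JSON.parse` returns string keys, so the cursor uses string keys. `PAGES`, `PagedJob`, and `json_round_trip?` are illustrative names only.

```ruby
require "json"

PAGES = [%w[a b], %w[c], %w[d e]].freeze

class PagedJob
  # Yields [page_items, cursor] pairs; the cursor is a JSON-safe Hash
  # naming the page about to be processed.
  def build_enumerator(*, cursor:)
    start = cursor ? cursor.fetch("page") : 0
    Enumerator.new do |y|
      (start...PAGES.size).each { |page| y << [PAGES[page], { "page" => page }] }
    end
  end
end

def json_round_trip?(value)
  JSON.parse(JSON.generate(value)) == value
end

PagedJob.new.build_enumerator(cursor: { "page" => 1 }).map { |items, _| items }
# → [["c"], ["d", "e"]]
json_round_trip?({ "page" => 1 }) # → true
json_round_trip?({ page: 1 })     # → false (keys come back as strings)
```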
#cancel! ⇒ Object
Mark this iteration cancelled. Sets the in-process flag immediately (so the next `cancelled?` check inside the run loop trips) and, when a jid is bound, writes the timestamp to the `it-<jid>` HASH so that other processes observe it on their next 5-second poll.
Returns the integer epoch-seconds timestamp written.
    # File 'lib/wurk/iterable_job.rb', line 138
    def cancel!
      ts_ms = ::Process.clock_gettime(::Process::CLOCK_REALTIME, :millisecond)
      @cancelled_at ||= ts_ms
      ts = ts_ms / 1000
      persist_cancellation(ts)
      ts
    end
#cancelled? ⇒ Boolean
True once `cancel!` has been called locally or, for cross-process cancellation, once the `cancelled` field appears in the `it-<jid>` HASH. The remote check is rate-limited to once per `STATE_FLUSH_INTERVAL` to keep the hot loop cheap.
    # File 'lib/wurk/iterable_job.rb', line 150
    def cancelled?
      return true if @cancelled_at

      ts = poll_remote_cancellation
      return false unless ts

      @cancelled_at = ts * 1000
      true
    end
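The local flag plus rate-limited remote poll can be sketched with a plain Hash standing in for Redis. `CancellationState`, its `store` Hash, and the injected clock are hypothetical; the sketch mirrors the two methods above (millisecond local timestamp, epoch seconds in the `cancelled` field, remote reads at most once per 5-second interval) but is not Wurk's code.

```ruby
class CancellationState
  FLUSH_INTERVAL = 5 # seconds, same cadence as STATE_FLUSH_INTERVAL

  def initialize(jid, store, clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @key = "it-#{jid}"
    @store = store # Hash of key => field Hash, standing in for Redis
    @clock = clock
    @cancelled_at = nil
    @last_poll = nil
  end

  def cancel!
    ts_ms = Process.clock_gettime(Process::CLOCK_REALTIME, :millisecond)
    @cancelled_at ||= ts_ms
    ts = ts_ms / 1000
    (@store[@key] ||= {})["cancelled"] = ts.to_s # epoch seconds, as a string
    ts
  end

  def cancelled?
    return true if @cancelled_at

    ts = poll_remote_cancellation
    return false unless ts

    @cancelled_at = ts * 1000
    true
  end

  private

  # Reads the shared field at most once per FLUSH_INTERVAL.
  def poll_remote_cancellation
    now = @clock.call
    return nil if @last_poll && now - @last_poll < FLUSH_INTERVAL

    @last_poll = now
    @store.dig(@key, "cancelled")&.to_i
  end
end

store = {}
now = 0.0
worker = CancellationState.new("j1", store, clock: -> { now })
admin = CancellationState.new("j1", store)

worker.cancelled? # → false (polls at t=0, nothing written yet)
admin.cancel!     # writes the "cancelled" field
now = 3.0
worker.cancelled? # → false (poll skipped: under 5 s since the last one)
now = 5.0
worker.cancelled? # → true
```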
#csv_batches_enumerator(csv, cursor:) ⇒ Object
    # File 'lib/wurk/iterable_job.rb', line 92
    def csv_batches_enumerator(csv, cursor:, **)
      CsvEnumerator.new(csv).batches(cursor: cursor, **)
    end
#csv_enumerator(csv, cursor:) ⇒ Object
    # File 'lib/wurk/iterable_job.rb', line 88
    def csv_enumerator(csv, cursor:)
      CsvEnumerator.new(csv).rows(cursor: cursor)
    end
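Both CSV helpers delegate to `CsvEnumerator`, whose source is not on this page. The sketch below approximates them with Ruby's `csv` library, assuming row-index cursors for rows (per the cursor-parity note for `#array_enumerator`) and, as a further assumption, batch-index cursors for batches. `csv_rows`, `csv_batches`, and `DATA_CSV` are hypothetical names; the sketch materializes the remaining rows in memory, which a production enumerator would likely avoid.

```ruby
require "csv"

# Yields [row, row_index], skipping rows before the cursor.
def csv_rows(csv, cursor:)
  rows = csv.each_with_index.drop(cursor || 0)
  rows.to_enum { rows.size }
end

# Yields [batch_of_rows, batch_index], skipping batches before the cursor.
def csv_batches(csv, cursor:, batch_size: 2)
  batches = csv.each_slice(batch_size).with_index.drop(cursor || 0)
  batches.to_enum { batches.size }
end

DATA_CSV = "id,name\n1,ann\n2,bob\n3,cy\n"

csv_rows(CSV.new(DATA_CSV, headers: true), cursor: 1).map { |row, i| [row["name"], i] }
# → [["bob", 1], ["cy", 2]]
csv_batches(CSV.new(DATA_CSV, headers: true), cursor: 1).map { |batch, i| [batch.size, i] }
# → [[1, 1]]
```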
#cursor ⇒ Object
    # File 'lib/wurk/iterable_job.rb', line 128
    def cursor
      @cursor
    end
#each_iteration ⇒ Object
    # File 'lib/wurk/iterable_job.rb', line 71
    def each_iteration(*)
      raise NotImplementedError, "#{self.class} must override #each_iteration"
    end
#iteration_key ⇒ Object
Redis HASH key holding iteration state for this job. Wire-compatible with Sidekiq's `it-<jid>` schema (sidekiq-free.md §1.5).
    # File 'lib/wurk/iterable_job.rb', line 162
    def iteration_key
      "it-#{jid}"
    end
#on_cancel ⇒ Object
    # File 'lib/wurk/iterable_job.rb', line 113
    def on_cancel; end
#on_complete ⇒ Object
    # File 'lib/wurk/iterable_job.rb', line 114
    def on_complete; end
#on_resume ⇒ Object
    # File 'lib/wurk/iterable_job.rb', line 111
    def on_resume; end
#on_start ⇒ Object
Lifecycle hook (no-op default; users override as needed).
    # File 'lib/wurk/iterable_job.rb', line 110
    def on_start; end
#on_stop ⇒ Object
    # File 'lib/wurk/iterable_job.rb', line 112
    def on_stop; end
#perform(*args) ⇒ Object
Foundation run loop. Loads any persisted state, drives the enumerator, checkpoints every `STATE_FLUSH_INTERVAL` seconds and, on interruption, persists the final cursor before re-raising so that the interrupt-handler middleware can re-push the job at the head of the queue.
    # File 'lib/wurk/iterable_job.rb', line 170
    def perform(*args)
      reset_run_state(args)
      load_state
      fire_lifecycle_start
      @executions += 1
      run_iterations(args)
      finalize_complete
    rescue Interrupted
      finalize_interrupted
      raise
    end
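The control flow of `#perform` can be traced with a toy class that keeps the same method names but gives them hypothetical in-memory bodies: `state` plays the role of the `it-<jid>` HASH, a local `Interrupted` class stands in for `Wurk::Job::Interrupted`, and an interruption is simulated by raising after a fixed number of items. Only the sequence (load state, fire the start hook, bump the execution count, iterate, then either finalize or persist the cursor and re-raise) is taken from the source above.

```ruby
class Interrupted < StandardError; end

class ToyRunLoop
  attr_reader :processed

  def initialize(items, state, interrupt_after: nil)
    @items = items
    @state = state # stand-in for the it-<jid> HASH
    @interrupt_after = interrupt_after
    @processed = []
  end

  # Same shape as the #perform shown above.
  def perform(*args)
    reset_run_state(args)
    load_state
    fire_lifecycle_start
    @executions += 1
    run_iterations(args)
    finalize_complete
  rescue Interrupted
    finalize_interrupted
    raise
  end

  private

  def reset_run_state(_args)
    @cursor = nil
    @executions = 0
  end

  def load_state
    @executions = @state.fetch(:ex, 0)
    @cursor = @state[:c]
  end

  def fire_lifecycle_start; end # hypothetical: a start or resume hook would fire here

  def run_iterations(_args)
    done_this_run = 0
    @items.each_with_index.drop(@cursor || 0).each do |item, index|
      @cursor = index # cursor of the item about to be processed
      raise Interrupted if @interrupt_after && done_this_run >= @interrupt_after

      @processed << item
      done_this_run += 1
    end
  end

  def finalize_complete
    @state[:done] = true
  end

  def finalize_interrupted
    @state[:ex] = @executions
    @state[:c] = @cursor # persisted before the re-raise
  end
end

state = {}
begin
  ToyRunLoop.new(%w[a b c], state, interrupt_after: 1).perform
rescue Interrupted
  # the interrupt-handler middleware re-pushes the job at this point
end
state[:c]         # → 1
resumed = ToyRunLoop.new(%w[a b c], state)
resumed.perform
resumed.processed # → ["b", "c"]
```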