Module: Wurk::IterableJob
- Defined in:
- lib/wurk/iterable_job.rb
Overview
Iterable jobs split long-running work into small, idempotent chunks. Override `#build_enumerator` (yielding `[item, new_cursor]` pairs) and `#each_iteration(item, *args)`; the framework drives the loop, persists the cursor, and resumes after interruption.
Defining `#perform` on the including class is refused at `method_added`, because IterableJob owns the run loop. User code overrides `#each_iteration` instead.
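A minimal sketch of the two overrides described above. The class name `ImportRowsSketch`, the `ROWS` data, and the `drive_sketch` helper are hypothetical: the class is a plain Ruby object so the example runs without Wurk loaded (a real job would `include Wurk::IterableJob`), and `drive_sketch` only stands in for the framework's run loop, which also persists the cursor and handles interruption.

```ruby
# Plain-Ruby stand-in; a real job would `include Wurk::IterableJob`.
class ImportRowsSketch
  ROWS = %w[alpha beta gamma delta].freeze # hypothetical data source

  attr_reader :seen

  def initialize
    @seen = []
  end

  # Returns an Enumerator yielding [item, new_cursor] pairs.
  # The cursor is the index of the last processed row, so a resumed
  # run starts at the row after it.
  def build_enumerator(*, cursor:)
    first = cursor.nil? ? 0 : cursor + 1
    Enumerator.new do |yielder|
      (first...ROWS.size).each { |i| yielder << [ROWS[i], i] }
    end
  end

  # One small, idempotent unit of work.
  def each_iteration(item, *)
    @seen << item.upcase
  end
end

# Hypothetical driver standing in for the framework's loop.
def drive_sketch(job, cursor: nil, args: [])
  job.build_enumerator(*args, cursor: cursor).each do |item, new_cursor|
    job.each_iteration(item, *args)
    cursor = new_cursor
  end
  cursor
end

job = ImportRowsSketch.new
drive_sketch(job, cursor: 1) # resume after the row at index 1
```

Because the cursor names the last completed item rather than the next one, a run that is interrupted and resumed does not repeat work that already finished.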
State lives in the `it-<jid>` HASH (sidekiq-free.md §1.5):
ex : execution count (int)
c : cursor (JSON string)
rt : runtime accumulated (float seconds)
cancelled : timestamp (int) if cancelled
Spec: docs/target/sidekiq-free.md §6.4.
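As an illustration of the state fields listed above, the following sketch decodes a raw HASH into typed Ruby values. It assumes the fields arrive as strings (as Redis hash values do) and that a missing field means "not set"; `decode_state_sketch` and the sample values are hypothetical and not part of Wurk.

```ruby
require "json"

# Hypothetical helper: turn the raw string fields of an it-<jid> HASH
# into typed values. Field names follow the list above.
def decode_state_sketch(raw)
  {
    executions: Integer(raw.fetch("ex", "0")),                    # ex
    cursor: raw["c"] && JSON.parse(raw["c"]),                     # c (JSON string)
    runtime: Float(raw.fetch("rt", "0")),                         # rt (seconds)
    cancelled_at: raw["cancelled"] && Integer(raw["cancelled"])   # only if cancelled
  }
end

raw = { "ex" => "2", "c" => JSON.generate({ "id" => 42 }), "rt" => "12.5" }
decode_state_sketch(raw)
```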
Defined Under Namespace
Modules: MethodAddedGuard
Constant Summary
- Interrupted =
Alias to the canonical `Wurk::Job::Interrupted`. The exception lives on `Wurk::Job` so non-iterable code paths (manual `interrupted?` checks) can raise the same class; the interrupt-handler middleware rescues by the `Wurk::Job::Interrupted` name.
Wurk::Job::Interrupted
- STATE_TTL =
Default expiry for an iteration state HASH while the job is running or awaiting resume. Refreshed on every checkpoint.
30 * 86_400
- STATE_FLUSH_INTERVAL =
Cursor flush + cancellation poll cadence. Both share the timer so a long-running iteration that hits the 5-second mark checkpoints and checks for cross-process cancellation in the same tick.
5
- CANCELLATION_PERIOD =
Shorter TTL applied once the state is marked cancelled. The HASH outlives `cancel!` long enough for live workers to observe the flag but is reaped well before the 30-day default would expire.
3 * 86_400
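The relationship between the two TTL constants can be sketched as follows. The `_SKETCH` constants mirror the documented values, while `state_ttl_sketch` is a hypothetical helper; how Wurk actually applies the expiry is not shown on this page.

```ruby
STATE_TTL_SKETCH = 30 * 86_400           # 30 days, in seconds
CANCELLATION_PERIOD_SKETCH = 3 * 86_400  # 3 days, in seconds

# Hypothetical: pick the expiry for a state HASH based on whether the
# `cancelled` field is present.
def state_ttl_sketch(state)
  state.key?("cancelled") ? CANCELLATION_PERIOD_SKETCH : STATE_TTL_SKETCH
end

state_ttl_sketch({ "ex" => "1" })
state_ttl_sketch({ "ex" => "1", "cancelled" => "1700000000" })
```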
Instance Attribute Summary
- #current_object ⇒ Object (readonly)
Iteration state accessors.
Class Method Summary
- .included(base) ⇒ Object
Instance Method Summary
- #arguments ⇒ Object
- #around_iteration ⇒ Object
- #build_enumerator(cursor:) ⇒ Object
User overrides: must return an Enumerator yielding `[item, new_cursor]` pairs.
- #cancel! ⇒ Object
Mark this iteration cancelled.
- #cancelled? ⇒ Boolean
True once `cancel!` has been called locally or, for cross-process cancellation, once the `cancelled` field appears in the `it-<jid>` HASH.
- #cursor ⇒ Object
- #each_iteration ⇒ Object
- #iteration_key ⇒ Object
Redis HASH key holding iteration state for this job.
- #on_cancel ⇒ Object
- #on_complete ⇒ Object
- #on_resume ⇒ Object
- #on_start ⇒ Object
Lifecycle hooks (no-op defaults; users override as needed).
- #on_stop ⇒ Object
- #perform(*args) ⇒ Object
Foundation run loop.
Instance Attribute Details
#current_object ⇒ Object (readonly)
Iteration state accessors.

    # File 'lib/wurk/iterable_job.rb', line 87
    def current_object
      @current_object
    end
Class Method Details
.included(base) ⇒ Object
    # File 'lib/wurk/iterable_job.rb', line 57
    def self.included(base)
      base.include(Wurk::Job)
      base.singleton_class.prepend(MethodAddedGuard)
    end
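The body of `MethodAddedGuard` is not shown on this page. The sketch below shows one way a guard prepended to a class's singleton class could refuse `#perform` at `method_added`; the module name `PerformGuardSketch`, the choice of `ArgumentError`, and the message are assumptions for illustration only.

```ruby
# Hypothetical guard; Wurk's MethodAddedGuard may differ.
module PerformGuardSketch
  # Ruby calls this hook on the class right after an instance method is defined.
  def method_added(name)
    if name == :perform
      raise ArgumentError, "#{self} must not define #perform; override #each_iteration"
    end
    super
  end
end

class GuardedSketch
  singleton_class.prepend(PerformGuardSketch)

  def each_iteration(item) # allowed: not :perform
    item
  end
end

begin
  GuardedSketch.class_eval { def perform; end }
rescue ArgumentError => e
  warn e.message
end
```

Prepending onto the singleton class means the hook fires only for methods defined on the including class, so a `#perform` supplied by the module itself does not trip it.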
Instance Method Details
#arguments ⇒ Object
    # File 'lib/wurk/iterable_job.rb', line 89
    def arguments
      @arguments ||= []
    end
#around_iteration ⇒ Object
    # File 'lib/wurk/iterable_job.rb', line 81
    def around_iteration
      yield
    end
#build_enumerator(cursor:) ⇒ Object
User overrides: must return an Enumerator yielding `[item, new_cursor]` pairs. The cursor must round-trip through JSON.

    # File 'lib/wurk/iterable_job.rb', line 64
    def build_enumerator(*, cursor:)
      _ = cursor
      raise NotImplementedError, "#{self.class} must override #build_enumerator"
    end
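The JSON round-trip requirement can be checked with a small helper. This is a sketch using only Ruby's standard `json` library; `json_round_trips_sketch?` is a hypothetical name and not part of Wurk.

```ruby
require "json"

# Hypothetical check: does the cursor come back unchanged after being
# serialized to JSON and parsed again?
def json_round_trips_sketch?(cursor)
  JSON.parse(JSON.generate(cursor)) == cursor
end

json_round_trips_sketch?(42)              # integers survive
json_round_trips_sketch?({ "id" => 7 })   # string-keyed hashes survive
json_round_trips_sketch?({ id: 7 })       # symbol keys come back as strings
json_round_trips_sketch?(:last_row)       # symbols come back as strings
```

Integers, strings, arrays, and string-keyed hashes are safe cursor types; symbols and symbol-keyed hashes change shape on the way back and would not match on resume.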
#cancel! ⇒ Object
Mark this iteration cancelled. Sets the in-process flag immediately (so the next `cancelled?` check inside the run loop trips) and, when a jid is bound, writes the timestamp to the `it-<jid>` HASH so other processes observe it on their next 5-second poll.
Returns the integer epoch-seconds timestamp written.

    # File 'lib/wurk/iterable_job.rb', line 103
    def cancel!
      ts_ms = ::Process.clock_gettime(::Process::CLOCK_REALTIME, :millisecond)
      @cancelled_at ||= ts_ms
      ts = ts_ms / 1000
      persist_cancellation(ts)
      ts
    end
#cancelled? ⇒ Boolean
True once `cancel!` has been called locally or, for cross-process cancellation, once the `cancelled` field appears in the `it-<jid>` HASH. The remote check is rate-limited to once per `STATE_FLUSH_INTERVAL` to keep the hot loop cheap.

    # File 'lib/wurk/iterable_job.rb', line 115
    def cancelled?
      return true if @cancelled_at

      ts = poll_remote_cancellation
      return false unless ts

      @cancelled_at = ts * 1000
      true
    end
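`poll_remote_cancellation` is not shown on this page. The sketch below illustrates one way a remote check could be rate-limited to a fixed interval. Everything in it is an assumption: the class `ThrottledPollSketch`, the in-memory Hash standing in for Redis, and the injectable clock (used so the behaviour can be exercised without waiting five seconds).

```ruby
# Hypothetical rate-limited cancellation check; not Wurk's implementation.
class ThrottledPollSketch
  INTERVAL = 5 # seconds, mirroring STATE_FLUSH_INTERVAL

  attr_reader :remote_reads

  # store: Hash of key => field Hash, standing in for Redis.
  # clock: callable returning monotonic seconds.
  def initialize(store, key, clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @store = store
    @key = key
    @clock = clock
    @remote_reads = 0
  end

  def cancelled?
    return true if @cancelled_at

    now = @clock.call
    # Skip the remote read unless a full interval has elapsed.
    return false if @last_poll && now - @last_poll < INTERVAL

    @last_poll = now
    @remote_reads += 1
    ts = @store.dig(@key, "cancelled")
    return false unless ts

    @cancelled_at = Integer(ts) * 1000 # seconds to milliseconds
    true
  end
end

store = { "it-abc" => {} }
ticks = [0.0, 1.0, 6.0]
poller = ThrottledPollSketch.new(store, "it-abc", clock: -> { ticks.shift })
poller.cancelled?                               # first call reads the store
store["it-abc"]["cancelled"] = "1700000000"
poller.cancelled?                               # within the interval: no read
poller.cancelled?                               # interval elapsed: flag observed
```

The trade-off is latency: a worker may run for up to one interval after another process sets the flag before it notices.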
#cursor ⇒ Object
    # File 'lib/wurk/iterable_job.rb', line 93
    def cursor
      @cursor
    end
#each_iteration ⇒ Object
    # File 'lib/wurk/iterable_job.rb', line 69
    def each_iteration(*)
      raise NotImplementedError, "#{self.class} must override #each_iteration"
    end
#iteration_key ⇒ Object
Redis HASH key holding iteration state for this job. Wire-compatible with Sidekiq’s `it-<jid>` schema (sidekiq-free.md §1.5).

    # File 'lib/wurk/iterable_job.rb', line 127
    def iteration_key
      "it-#{jid}"
    end
#on_cancel ⇒ Object
    # File 'lib/wurk/iterable_job.rb', line 78
    def on_cancel; end
#on_complete ⇒ Object
    # File 'lib/wurk/iterable_job.rb', line 79
    def on_complete; end
#on_resume ⇒ Object
    # File 'lib/wurk/iterable_job.rb', line 76
    def on_resume; end
#on_start ⇒ Object
Lifecycle hooks (no-op defaults; users override as needed).

    # File 'lib/wurk/iterable_job.rb', line 75
    def on_start; end
#on_stop ⇒ Object
    # File 'lib/wurk/iterable_job.rb', line 77
    def on_stop; end
#perform(*args) ⇒ Object
Foundation run loop. Loads any persisted state, drives the enumerator, checkpoints every `STATE_FLUSH_INTERVAL`, and on interruption persists the final cursor before re-raising so the interrupt-handler middleware can re-push the job at the head of the queue.

    # File 'lib/wurk/iterable_job.rb', line 135
    def perform(*args)
      reset_run_state(args)
      load_state
      fire_lifecycle_start
      @executions += 1
      run_iterations(args)
      finalize_complete
    rescue Interrupted
      finalize_interrupted
      raise
    end
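The helpers called by `#perform` (`load_state`, `run_iterations`, `finalize_interrupted`, and so on) are not shown on this page. As a rough illustration of the persist-then-re-raise pattern only, the sketch below uses a plain Hash as the state store and a hypothetical `stop_after:` argument to simulate an interruption; `RunLoopSketch` and `InterruptedSketch` are invented names, and periodic checkpointing is omitted.

```ruby
class InterruptedSketch < StandardError; end

# Hypothetical, simplified run loop; not Wurk's implementation.
class RunLoopSketch
  attr_reader :processed

  def initialize(items, state)
    @items = items
    @state = state # Hash standing in for the it-<jid> HASH
    @processed = []
  end

  def perform(stop_after: nil)
    @state[:ex] = @state.fetch(:ex, 0) + 1 # execution count
    @cursor = @state[:c]                   # load persisted cursor, if any
    first = @cursor.nil? ? 0 : @cursor + 1
    (first...@items.size).each_with_index do |i, n|
      # Simulated shutdown signal after `stop_after` items.
      raise InterruptedSketch if stop_after && n >= stop_after

      @processed << @items[i]
      @cursor = i
    end
    @processed
  rescue InterruptedSketch
    @state[:c] = @cursor # persist the final cursor before re-raising
    raise
  end
end

state = {}
first_run = RunLoopSketch.new(%w[a b c d], state)
begin
  first_run.perform(stop_after: 2)
rescue InterruptedSketch
  # A middleware would re-push the job here.
end
RunLoopSketch.new(%w[a b c d], state).perform # resumes after the saved cursor
```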