Module: Wurk::IterableJob

Defined in:
lib/wurk/iterable_job.rb

Overview

Iterable jobs split long-running work into small, idempotent chunks. Override `#build_enumerator` (yielding `[item, new_cursor]` pairs) and `#each_iteration(item, *args)`; the framework drives the loop, persists the cursor, and resumes after interruption.

Defining `#perform` on the including class is refused at `method_added`, because IterableJob owns the run loop. User code overrides `#each_iteration` instead.
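
As a minimal sketch of the two overrides described above, the following class defines `#build_enumerator` and `#each_iteration` on a plain Ruby object. The class name `ExportRows`, the `ROWS` data, and the `drive` helper are hypothetical; `drive` is only a stand-in for the framework's run loop so that the snippet runs without Wurk loaded. The sketch also assumes the cursor means "index of the next unprocessed row". A real job would `include Wurk::IterableJob` instead of being driven by hand.

```ruby
# Hypothetical job body; in a real application this class would
# `include Wurk::IterableJob` rather than being driven by hand.
class ExportRows
  ROWS = %w[a b c d].freeze

  attr_reader :seen

  def initialize
    @seen = []
  end

  # Yields [item, new_cursor] pairs. The cursor is assumed to be the index
  # of the next unprocessed row, a plain Integer that survives JSON.
  def build_enumerator(cursor:)
    start = cursor || 0
    Enumerator.new do |y|
      ROWS.each_with_index do |row, i|
        next if i < start

        y << [row, i + 1]
      end
    end
  end

  # Called once per item.
  def each_iteration(item)
    @seen << item.upcase
  end
end

# Stand-in for the framework loop (not Wurk's implementation).
def drive(job, cursor: nil)
  job.build_enumerator(cursor: cursor).each do |item, new_cursor|
    job.each_iteration(item)
    cursor = new_cursor
  end
  cursor
end

job = ExportRows.new
final_cursor = drive(job, cursor: 2) # resume as if rows 0 and 1 were done
```

Starting from cursor `2`, only the last two rows are visited, which is the behaviour a resumed job relies on.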

State lives in the `it-<jid>` HASH (sidekiq-free.md §1.5):

ex        : execution count (int)
c         : cursor (JSON string)
rt        : runtime accumulated (float seconds)
cancelled : timestamp (int) if cancelled

Spec: docs/target/sidekiq-free.md §6.4.
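
To make the field layout concrete, here is a sketch of what one `it-<jid>` HASH might hold, modelled as a Ruby Hash. The jid and all values are made up for illustration, and the sketch assumes every field is read back as a string, as Redis hash values are. It is not taken from the implementation.

```ruby
require "json"

jid = "abc123" # hypothetical job id
key = "it-#{jid}"

# Redis hash values are strings, so numeric fields are stringified here.
state = {
  "ex" => "2",                           # execution count
  "c"  => JSON.generate({ "id" => 42 }), # cursor as a JSON string
  "rt" => "12.5"                         # accumulated runtime in seconds
}

cursor    = JSON.parse(state["c"])
runs      = Integer(state["ex"])
runtime   = Float(state["rt"])
cancelled = state.key?("cancelled") # assumed absent until the job is cancelled
```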

Defined Under Namespace

Modules: MethodAddedGuard

Constant Summary

Interrupted =

Alias to the canonical `Wurk::Job::Interrupted`. The exception lives on `Wurk::Job` so non-iterable code paths (manual `interrupted?` checks) can raise the same class; the interrupt-handler middleware rescues by the `Wurk::Job::Interrupted` name.

Wurk::Job::Interrupted
STATE_TTL =

Default expiry (30 days, expressed in seconds) for an iteration state HASH while the job is running or awaiting resume. Refreshed on every checkpoint.

30 * 86_400
STATE_FLUSH_INTERVAL =

Cadence, in seconds, for cursor flushes and cancellation polls. Both share the timer, so a long-running iteration that hits the 5-second mark checkpoints and checks for cross-process cancellation in the same tick.

5
CANCELLATION_PERIOD =

Shorter TTL applied once the state is marked cancelled. The HASH outlives `cancel!` long enough for live workers to observe the flag but is reaped well before the 30-day default would expire.

3 * 86_400
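
For reference, the two TTL constants work out as follows. This is plain arithmetic on the values listed above, with 86,400 being the number of seconds in a day; the local names are illustrative only.

```ruby
SECONDS_PER_DAY = 86_400

state_ttl           = 30 * SECONDS_PER_DAY # 2_592_000 seconds (30 days)
cancellation_period = 3 * SECONDS_PER_DAY  # 259_200 seconds (3 days)
```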

Instance Attribute Details

#current_object ⇒ Object (readonly)

Iteration state accessors.



# File 'lib/wurk/iterable_job.rb', line 87

def current_object
  @current_object
end

Class Method Details

.included(base) ⇒ Object



# File 'lib/wurk/iterable_job.rb', line 57

def self.included(base)
  base.include(Wurk::Job)
  base.singleton_class.prepend(MethodAddedGuard)
end
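
The `included` hook above prepends `MethodAddedGuard` onto the including class's singleton class, which is what lets the module intercept `method_added`. The guard's body is not shown on this page, so the following is only a sketch of how such a guard could work. The module names, the error class, and the message are all assumptions.

```ruby
# Sketch only: DemoIterable and DemoGuard are hypothetical stand-ins.
module DemoIterable
  module DemoGuard
    # Ruby calls method_added on a class whenever an instance method
    # is defined on it.
    def method_added(name)
      if name == :perform
        raise ArgumentError, "#{self} must not define #perform; override #each_iteration"
      end

      super
    end
  end

  def self.included(base)
    base.singleton_class.prepend(DemoGuard)
  end

  # The module's own #perform is unaffected: the guard only fires for
  # methods defined directly on the including class.
  def perform(*_args)
    :framework_loop
  end
end

class GoodJob
  include DemoIterable

  def each_iteration(item); end
end

# Defining #perform on an including class trips the guard.
error = begin
  Class.new do
    include DemoIterable

    def perform; end
  end
  nil
rescue ArgumentError => e
  e
end
```

Prepending to the singleton class, rather than defining `self.method_added` on the base, means a class that defines its own `method_added` hook still passes through the guard first.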

Instance Method Details

#arguments ⇒ Object

# File 'lib/wurk/iterable_job.rb', line 89

def arguments
  @arguments ||= []
end

#around_iteration ⇒ Object

# File 'lib/wurk/iterable_job.rb', line 81

def around_iteration
  yield
end

#build_enumerator(cursor:) ⇒ Object

Overridden by user code. Must return an Enumerator yielding `[item, new_cursor]` pairs. The cursor must round-trip through JSON.

Raises:

  • (NotImplementedError)


# File 'lib/wurk/iterable_job.rb', line 64

def build_enumerator(*, cursor:)
  _ = cursor
  raise NotImplementedError, "#{self.class} must override #build_enumerator"
end
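
The JSON round-trip requirement on cursors can be checked directly. The helper below is a hypothetical illustration, not part of Wurk: it serializes a candidate cursor and compares what comes back.

```ruby
require "json"

# Hypothetical helper: true when a cursor comes back unchanged after a
# JSON round trip.
def json_safe_cursor?(cursor)
  JSON.parse(JSON.generate(cursor)) == cursor
end

integer_ok = json_safe_cursor?(42)                         # Integer
hash_ok    = json_safe_cursor?({ "id" => 7, "page" => 2 }) # String-keyed Hash
symbol_ok  = json_safe_cursor?({ id: 7 })                  # Symbol keys return as Strings
time_ok    = json_safe_cursor?(Time.at(0))                 # Time returns as a String
```

Integers and string-keyed hashes survive; symbol keys and `Time` objects do not, so a cursor built from them would look different after a resume.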

#cancel! ⇒ Object

Mark this iteration cancelled. Sets the in-process flag immediately (so the next `cancelled?` check inside the run loop trips) and, when a jid is bound, writes the timestamp to the `it-<jid>` HASH so other processes observe it on their next 5-second poll.

Returns the integer epoch-seconds timestamp written.



# File 'lib/wurk/iterable_job.rb', line 103

def cancel!
  ts_ms = ::Process.clock_gettime(::Process::CLOCK_REALTIME, :millisecond)
  @cancelled_at ||= ts_ms
  ts = ts_ms / 1000
  persist_cancellation(ts)
  ts
end
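
The unit handling in `cancel!` is easy to misread: the in-process flag is kept in milliseconds, while the persisted and returned value is in whole seconds. The standalone sketch below reproduces only that arithmetic, with illustrative variable names and a fixed sample value.

```ruby
# Fixed sample value so the arithmetic is easy to follow.
sample_ms = 1_700_000_123_456
sample_s  = sample_ms / 1000 # Integer division: 1_700_000_123

# The same clock call cancel! uses; with :millisecond it returns an Integer.
now_ms = Process.clock_gettime(Process::CLOCK_REALTIME, :millisecond)
now_s  = now_ms / 1000

# `||=` keeps the first value, so a repeated cancel! would not move the
# in-process timestamp, although the returned seconds value is computed
# from the current call.
cancelled_at = nil
cancelled_at ||= 100
cancelled_at ||= 200
```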

#cancelled? ⇒ Boolean

True once `cancel!` has been called locally or, for cross-process cancellation, once the `cancelled` field appears in the `it-<jid>` HASH. The remote check is rate-limited to once per `STATE_FLUSH_INTERVAL` to keep the hot loop cheap.

Returns:

  • (Boolean)


# File 'lib/wurk/iterable_job.rb', line 115

def cancelled?
  return true if @cancelled_at

  ts = poll_remote_cancellation
  return false unless ts

  @cancelled_at = ts * 1000
  true
end
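
`poll_remote_cancellation` is not shown on this page. As a rough sketch of the rate limiting described above, the hypothetical class below wraps a lookup so that it runs at most once per interval. The class name, the injected `clock` and `fetch` callables, and the fake clock are all assumptions made so the example runs without Redis.

```ruby
# Hypothetical helper: runs `fetch` at most once per `interval` seconds.
# `fetch` stands in for reading the `cancelled` field remotely.
class RateLimitedPoll
  def initialize(interval:, clock:, fetch:)
    @interval = interval
    @clock = clock
    @fetch = fetch
    @last_polled_at = nil
  end

  # Returns the fetched value, or nil when called again too soon.
  def call
    now = @clock.call
    return nil if @last_polled_at && (now - @last_polled_at) < @interval

    @last_polled_at = now
    @fetch.call
  end
end

fake_now = 0.0
lookups = 0
poll = RateLimitedPoll.new(
  interval: 5,
  clock: -> { fake_now },
  fetch: -> { lookups += 1; nil } # nil means "not cancelled"
)

poll.call      # first call performs a lookup
fake_now = 3.0
poll.call      # 3 seconds later: skipped
fake_now = 5.0
poll.call      # 5 seconds later: performs a second lookup
```

Outside a test, the clock would more likely be `-> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }`, which is unaffected by wall-clock adjustments.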

#cursor ⇒ Object

# File 'lib/wurk/iterable_job.rb', line 93

def cursor
  @cursor
end

#each_iteration ⇒ Object

Raises:

  • (NotImplementedError)


# File 'lib/wurk/iterable_job.rb', line 69

def each_iteration(*)
  raise NotImplementedError, "#{self.class} must override #each_iteration"
end

#iteration_key ⇒ Object

Redis HASH key holding iteration state for this job. Wire-compatible with Sidekiq's `it-<jid>` schema (sidekiq-free.md §1.5).



# File 'lib/wurk/iterable_job.rb', line 127

def iteration_key
  "it-#{jid}"
end

#on_cancel ⇒ Object

# File 'lib/wurk/iterable_job.rb', line 78

def on_cancel; end

#on_complete ⇒ Object

# File 'lib/wurk/iterable_job.rb', line 79

def on_complete; end

#on_resume ⇒ Object

# File 'lib/wurk/iterable_job.rb', line 76

def on_resume; end

#on_start ⇒ Object

Lifecycle hooks (no-op defaults; users override as needed).

# File 'lib/wurk/iterable_job.rb', line 75

def on_start; end

#on_stop ⇒ Object

# File 'lib/wurk/iterable_job.rb', line 77

def on_stop; end

#perform(*args) ⇒ Object

Foundation run loop. Loads any persisted state, drives the enumerator, checkpoints every `STATE_FLUSH_INTERVAL`, and on interruption persists the final cursor before re-raising so the interrupt-handler middleware can re-push the job at the head of the queue.



# File 'lib/wurk/iterable_job.rb', line 135

def perform(*args)
  reset_run_state(args)
  load_state
  fire_lifecycle_start
  @executions += 1

  run_iterations(args)

  finalize_complete
rescue Interrupted
  finalize_interrupted
  raise
end
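
The helpers called by `perform` are not shown on this page. To illustrate the persist-then-re-raise pattern in isolation, here is a toy version under stated assumptions: `ToyInterrupted`, `ToyJob`, and the in-memory `STORE` are hypothetical stand-ins for `Wurk::Job::Interrupted`, a real job, and the Redis HASH, and periodic checkpointing is left out.

```ruby
# Sketch only; none of these names exist in Wurk.
class ToyInterrupted < StandardError; end

STORE = {} # plays the role of the it-<jid> HASH

class ToyJob
  attr_reader :processed

  def initialize(items, interrupt_after: nil)
    @items = items
    @interrupt_after = interrupt_after
    @processed = []
  end

  def perform
    cursor = STORE["c"] || 0 # load persisted state
    STORE["ex"] = (STORE["ex"] || 0) + 1
    @items.each_with_index do |item, i|
      next if i < cursor
      raise ToyInterrupted if @interrupt_after && @processed.size >= @interrupt_after

      @processed << item
      cursor = i + 1
    end
    :complete
  rescue ToyInterrupted
    STORE["c"] = cursor # persist the final cursor
    raise               # let the caller decide how to re-enqueue
  end
end

first = ToyJob.new(%w[a b c d], interrupt_after: 2)
begin
  first.perform
rescue ToyInterrupted
  # a middleware would re-push the job here
end
saved_cursor = STORE["c"]

second = ToyJob.new(%w[a b c d])
result = second.perform
```

The second run picks up at the saved cursor, so no item is processed twice and the execution count reflects both attempts.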