Class: Wurk::JobRecord

Inherits:
Object
  • Object
show all
Defined in:
lib/wurk/job_record.rb

Overview

One job payload viewed from the data API (Queue#each / JobSet#each). Wraps the raw JSON string from Redis; parses lazily so an O(n) scan over a large queue doesn’t pay JSON cost for jobs that go unused.

The ‘value` (raw JSON string) is what Redis stores; `LREM` matches exact bytes, so `delete` must use it rather than re-serialize.

Spec: docs/target/sidekiq-free.md §19.3.

Direct Known Subclasses

SortedEntry

Constant Summary collapse

ACTIVE_JOB_WRAPPER =

Pre-compiled. ActiveJob’s wrapper class varies per Rails minor (‘ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper`, `ActiveJob::QueueAdapters::WurkAdapter::JobWrapper`, etc.).

/\AActiveJob::QueueAdapters::.+::JobWrapper\z/
ACTION_MAILER_JOBS =
%w[
  ActionMailer::DeliveryJob
  ActionMailer::Parameterized::DeliveryJob
  ActionMailer::MailDeliveryJob
].freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(item, queue_name = nil) ⇒ JobRecord

Returns a new instance of JobRecord.

Parameters:

  • item (String, Hash)

    raw JSON payload or pre-parsed hash.

  • queue_name (String, nil) (defaults to: nil)

    queue this record came from.



30
31
32
33
34
35
36
37
38
39
# File 'lib/wurk/job_record.rb', line 30

def initialize(item, queue_name = nil)
  if item.is_a?(String)
    @value = item
    @item = nil
  else
    @item = item
    @value = nil
  end
  @queue = queue_name || (@item && @item['queue'])
end

Instance Attribute Details

#queueObject (readonly)

Returns the value of attribute queue.



26
27
28
# File 'lib/wurk/job_record.rb', line 26

def queue
  @queue
end

Class Method Details

.latency_from(enqueued_at, now_ms = nil) ⇒ Object

Shared latency math: ms ints (>= 10^10) and float secs (< 10^10) both stored in ‘enqueued_at` historically. See spec §31.5.



123
124
125
126
127
128
129
130
# File 'lib/wurk/job_record.rb', line 123

def self.latency_from(enqueued_at, now_ms = nil)
  return 0.0 if enqueued_at.nil?

  now_ms ||= ::Process.clock_gettime(::Process::CLOCK_REALTIME, :millisecond)
  enq_ms = enqueued_at < 10_000_000_000 ? enqueued_at * 1_000 : enqueued_at
  diff = (now_ms - enq_ms) / 1_000.0
  diff.negative? ? 0.0 : diff
end

Instance Method Details

#[](name) ⇒ Object

Hash-like reader for arbitrary payload fields. Spec §19.3.



72
# File 'lib/wurk/job_record.rb', line 72

def [](name) = item[name]

#argsObject



53
# File 'lib/wurk/job_record.rb', line 53

def args          = item['args']

#bidObject



55
# File 'lib/wurk/job_record.rb', line 55

def bid           = item['bid']

#created_atObject



67
# File 'lib/wurk/job_record.rb', line 67

def created_at    = parse_time(item['created_at'])

#deleteObject

Removes exactly this payload’s bytes from the queue list. Returns true when LREM removed ≥1 entry. Idempotent. Method name is Sidekiq wire-compat — renaming would break ‘JobRecord#delete`.



95
96
97
98
# File 'lib/wurk/job_record.rb', line 95

def delete # rubocop:disable Naming/PredicateMethod
  removed = Wurk.redis { |c| c.call('LREM', Keys.queue(@queue), 1, value) }
  removed.to_i.positive?
end

#display_argsObject

UI-facing args. Encrypted jobs (§4.7) get their envelope last arg masked as “<encrypted>” so ciphertext never reaches the dashboard; redaction keys off the envelope shape, so it fires whether or not the stored hash carried the ‘encrypt` flag. Cleartext preceding args stay visible for triage. Display-only — the stored payload is untouched.



113
114
115
116
117
118
# File 'lib/wurk/job_record.rb', line 113

def display_args
  return @display_args if defined?(@display_args)

  base = active_job_wrapper? ? unwrap_args : args
  @display_args = Wurk::Encryption.redact_args('args' => base, 'encrypt' => item['encrypt'])
end

#display_classObject

ActiveJob/ActionMailer unwrappers — UI-facing only. For plain Wurk workers, returns the raw class name.



102
103
104
105
106
# File 'lib/wurk/job_record.rb', line 102

def display_class
  return @display_class if defined?(@display_class)

  @display_class = active_job_wrapper? ? unwrap_class : klass
end

#enqueued_atObject



66
# File 'lib/wurk/job_record.rb', line 66

def enqueued_at   = parse_time(item['enqueued_at'])

#error_backtraceObject

Sidekiq compresses the bt as base64(zlib.deflate(JSON.dump(bt))). Returns nil when no error has been recorded.



76
77
78
79
80
81
82
83
# File 'lib/wurk/job_record.rb', line 76

def error_backtrace
  compressed = item['error_backtrace']
  return nil unless compressed

  Wurk.load_json(Zlib.inflate(Base64.decode64(compressed)))
rescue Zlib::DataError, ArgumentError, ::JSON::ParserError
  nil
end

#failed_atObject



68
# File 'lib/wurk/job_record.rb', line 68

def failed_at     = parse_time(item['failed_at'])

#itemObject

Lazily parsed payload. Memoized; never re-parses.



42
43
44
# File 'lib/wurk/job_record.rb', line 42

def item
  @item ||= Wurk.load_json(@value)
end

#iterable_stateObject

IterableJob progress for this job, or nil for a non-iterable job (no ‘it-<jid>` HASH). Spec §19.3. Reads via the IterableJobQuery data API.



59
60
61
62
63
# File 'lib/wurk/job_record.rb', line 59

def iterable_state
  return nil if jid.nil? || jid.to_s.empty?

  Wurk::IterableJobQuery.new([jid])[jid]
end

#jidObject



54
# File 'lib/wurk/job_record.rb', line 54

def jid           = item['jid']

#klassObject



52
# File 'lib/wurk/job_record.rb', line 52

def klass         = item['class']

#latencyObject

Seconds since enqueued_at. Handles legacy float-seconds and current integer-ms ‘enqueued_at` shapes. Returns 0.0 when missing or somehow in the future (clock skew).



88
89
90
# File 'lib/wurk/job_record.rb', line 88

def latency
  JobRecord.latency_from(item['enqueued_at'])
end

#retried_atObject



69
# File 'lib/wurk/job_record.rb', line 69

def retried_at    = parse_time(item['retried_at'])

#tagsObject



65
# File 'lib/wurk/job_record.rb', line 65

def tags          = item['tags'] || []

#valueObject

Lazily serialized payload. When constructed from a Hash, we generate JSON on first call so ‘delete` (LREM) has exact bytes.



48
49
50
# File 'lib/wurk/job_record.rb', line 48

def value
  @value ||= Wurk.dump_json(@item)
end