Class: Wurk::JobRecord

Inherits:
Object
  • Object
show all
Defined in:
lib/wurk/job_record.rb

Overview

One job payload viewed from the data API (Queue#each / JobSet#each). Wraps the raw JSON string from Redis; parses lazily so an O(n) scan over a large queue doesn’t pay JSON cost for jobs that go unused.

The ‘value` (raw JSON string) is what Redis stores; `LREM` matches exact bytes, so `delete` must use it rather than re-serialize.

Spec: docs/target/sidekiq-free.md §19.3.

Direct Known Subclasses

SortedEntry

Constant Summary collapse

ACTIVE_JOB_WRAPPER =

Pre-compiled. ActiveJob’s wrapper class varies per Rails minor (‘ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper`, `ActiveJob::QueueAdapters::WurkAdapter::JobWrapper`, etc.).

/\AActiveJob::QueueAdapters::.+::JobWrapper\z/
ACTION_MAILER_JOBS =
%w[
  ActionMailer::DeliveryJob
  ActionMailer::Parameterized::DeliveryJob
  ActionMailer::MailDeliveryJob
].freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(item, queue_name = nil) ⇒ JobRecord

Returns a new instance of JobRecord.

Parameters:

  • item (String, Hash)

    raw JSON payload or pre-parsed hash.

  • queue_name (String, nil) (defaults to: nil)

    queue this record came from.



30
31
32
33
34
35
36
37
38
39
# File 'lib/wurk/job_record.rb', line 30

def initialize(item, queue_name = nil)
  if item.is_a?(String)
    @value = item
    @item = nil
  else
    @item = item
    @value = nil
  end
  @queue = queue_name || (@item && @item['queue'])
end

Instance Attribute Details

#queueObject (readonly)

Returns the value of attribute queue.



26
27
28
# File 'lib/wurk/job_record.rb', line 26

def queue
  @queue
end

Class Method Details

.latency_from(enqueued_at, now_ms = nil) ⇒ Object

Shared latency math: ms ints (>= 10^10) and float secs (< 10^10) both stored in ‘enqueued_at` historically. See spec §31.5.



108
109
110
111
112
113
114
115
# File 'lib/wurk/job_record.rb', line 108

def self.latency_from(enqueued_at, now_ms = nil)
  return 0.0 if enqueued_at.nil?

  now_ms ||= ::Process.clock_gettime(::Process::CLOCK_REALTIME, :millisecond)
  enq_ms = enqueued_at < 10_000_000_000 ? enqueued_at * 1_000 : enqueued_at
  diff = (now_ms - enq_ms) / 1_000.0
  diff.negative? ? 0.0 : diff
end

Instance Method Details

#[](name) ⇒ Object

Hash-like reader for arbitrary payload fields. Spec §19.3.



63
# File 'lib/wurk/job_record.rb', line 63

def [](name) = item[name]

#argsObject



53
# File 'lib/wurk/job_record.rb', line 53

def args          = item['args']

#bidObject



55
# File 'lib/wurk/job_record.rb', line 55

def bid           = item['bid']

#created_atObject



58
# File 'lib/wurk/job_record.rb', line 58

def created_at    = parse_time(item['created_at'])

#deleteObject

Removes exactly this payload’s bytes from the queue list. Returns true when LREM removed ≥1 entry. Idempotent. Method name is Sidekiq wire-compat — renaming would break ‘JobRecord#delete`.



86
87
88
89
# File 'lib/wurk/job_record.rb', line 86

def delete # rubocop:disable Naming/PredicateMethod
  removed = Wurk.redis { |c| c.call('LREM', Keys.queue(@queue), 1, value) }
  removed.to_i.positive?
end

#display_argsObject



99
100
101
102
103
# File 'lib/wurk/job_record.rb', line 99

def display_args
  return @display_args if defined?(@display_args)

  @display_args = active_job_wrapper? ? unwrap_args : args
end

#display_classObject

ActiveJob/ActionMailer unwrappers — UI-facing only. For plain Wurk workers, returns the raw class name.



93
94
95
96
97
# File 'lib/wurk/job_record.rb', line 93

def display_class
  return @display_class if defined?(@display_class)

  @display_class = active_job_wrapper? ? unwrap_class : klass
end

#enqueued_atObject



57
# File 'lib/wurk/job_record.rb', line 57

def enqueued_at   = parse_time(item['enqueued_at'])

#error_backtraceObject

Sidekiq compresses the bt as base64(zlib.deflate(JSON.dump(bt))). Returns nil when no error has been recorded.



67
68
69
70
71
72
73
74
# File 'lib/wurk/job_record.rb', line 67

def error_backtrace
  compressed = item['error_backtrace']
  return nil unless compressed

  Wurk.load_json(Zlib.inflate(Base64.decode64(compressed)))
rescue Zlib::DataError, ArgumentError, ::JSON::ParserError
  nil
end

#failed_atObject



59
# File 'lib/wurk/job_record.rb', line 59

def failed_at     = parse_time(item['failed_at'])

#itemObject

Lazily parsed payload. Memoized; never re-parses.



42
43
44
# File 'lib/wurk/job_record.rb', line 42

def item
  @item ||= Wurk.load_json(@value)
end

#jidObject



54
# File 'lib/wurk/job_record.rb', line 54

def jid           = item['jid']

#klassObject



52
# File 'lib/wurk/job_record.rb', line 52

def klass         = item['class']

#latencyObject

Seconds since enqueued_at. Handles legacy float-seconds and current integer-ms ‘enqueued_at` shapes. Returns 0.0 when missing or somehow in the future (clock skew).



79
80
81
# File 'lib/wurk/job_record.rb', line 79

def latency
  JobRecord.latency_from(item['enqueued_at'])
end

#retried_atObject



60
# File 'lib/wurk/job_record.rb', line 60

def retried_at    = parse_time(item['retried_at'])

#tagsObject



56
# File 'lib/wurk/job_record.rb', line 56

def tags          = item['tags'] || []

#valueObject

Lazily serialized payload. When constructed from a Hash, we generate JSON on first call so ‘delete` (LREM) has exact bytes.



48
49
50
# File 'lib/wurk/job_record.rb', line 48

def value
  @value ||= Wurk.dump_json(@item)
end