Class: Wurk::JobRecord
- Inherits: Object
  - Object
  - Wurk::JobRecord
- Defined in: lib/wurk/job_record.rb
Overview
One job payload viewed from the data API (Queue#each / JobSet#each). Wraps the raw JSON string from Redis; parses lazily so an O(n) scan over a large queue doesn’t pay JSON cost for jobs that go unused.
The `value` (raw JSON string) is what Redis stores; `LREM` matches exact bytes, so `delete` must pass that string as-is rather than re-serialize the parsed payload.
Spec: docs/target/sidekiq-free.md §19.3.
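The lazy-parse idea can be sketched in isolation. This is a minimal illustration, not the Wurk implementation: `LazyPayload` and its `parse_count` counter are hypothetical names, and the standard `JSON.parse` stands in for `Wurk.load_json`.

```ruby
require 'json'

# Hypothetical stand-in for illustration only; not Wurk::JobRecord itself.
class LazyPayload
  attr_reader :parse_count

  def initialize(raw)
    @raw = raw          # exact bytes as stored
    @item = nil
    @parse_count = 0    # illustration-only counter
  end

  # Raw string, returned untouched.
  def value
    @raw
  end

  # Parses on first access only, then reuses the cached Hash.
  def item
    @item ||= begin
      @parse_count += 1
      JSON.parse(@raw)
    end
  end

  def jid
    item['jid']
  end
end

raws = [
  '{"class":"MailJob","jid":"a1","args":[1]}',
  '{"class":"PingJob","jid":"b2","args":[]}',
  '{"class":"MailJob","jid":"c3","args":[2]}'
]
records = raws.map { |raw| LazyPayload.new(raw) }

# Only the record that is actually inspected pays the JSON cost.
records[1].jid
records[1].jid
p records.map(&:parse_count) # => [0, 1, 0]
```

Because `value` hands back the original string, a caller that needs byte-exact matching never has to touch the parsed Hash.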
Direct Known Subclasses
Constant Summary
- ACTIVE_JOB_WRAPPER =
  Pre-compiled. ActiveJob’s wrapper class varies per Rails minor (`ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper`, `ActiveJob::QueueAdapters::WurkAdapter::JobWrapper`, etc.).
  /\AActiveJob::QueueAdapters::.+::JobWrapper\z/
- ACTION_MAILER_JOBS =
  %w[ActionMailer::DeliveryJob ActionMailer::Parameterized::DeliveryJob ActionMailer::MailDeliveryJob].freeze
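As a quick check of what these two constants match, the sketch below applies the same pattern and list to a few class names. The top-level helper methods are hypothetical and exist only for this example; they are not claimed to be how `JobRecord` uses the constants internally.

```ruby
# Hypothetical helper: same pattern as ACTIVE_JOB_WRAPPER.
def active_job_wrapper?(class_name)
  /\AActiveJob::QueueAdapters::.+::JobWrapper\z/.match?(class_name)
end

# Hypothetical helper: same names as ACTION_MAILER_JOBS.
def action_mailer_job?(class_name)
  %w[
    ActionMailer::DeliveryJob
    ActionMailer::Parameterized::DeliveryJob
    ActionMailer::MailDeliveryJob
  ].include?(class_name)
end

p active_job_wrapper?('ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper') # => true
p active_job_wrapper?('ActiveJob::QueueAdapters::WurkAdapter::JobWrapper')    # => true
p active_job_wrapper?('MyApp::HardWorker')                                    # => false
p action_mailer_job?('ActionMailer::MailDeliveryJob')                         # => true
```

The `.+` segment is what lets one pattern cover whichever adapter name a given Rails version uses.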
Instance Attribute Summary
- #queue ⇒ Object (readonly)
  Returns the value of attribute queue.
Class Method Summary
- .latency_from(enqueued_at, now_ms = nil) ⇒ Object
  Shared latency math: ms ints (>= 10^10) and float secs (< 10^10) both stored in `enqueued_at` historically.
Instance Method Summary
- #[](name) ⇒ Object
  Hash-like reader for arbitrary payload fields.
- #args ⇒ Object
- #bid ⇒ Object
- #created_at ⇒ Object
- #delete ⇒ Object
  Removes exactly this payload’s bytes from the queue list.
- #display_args ⇒ Object
  UI-facing args.
- #display_class ⇒ Object
  ActiveJob/ActionMailer unwrappers (UI-facing only).
- #enqueued_at ⇒ Object
- #error_backtrace ⇒ Object
  Sidekiq compresses the backtrace as base64(zlib.deflate(JSON.dump(bt))).
- #failed_at ⇒ Object
- #initialize(item, queue_name = nil) ⇒ JobRecord (constructor)
  A new instance of JobRecord.
- #item ⇒ Object
  Lazily parsed payload.
- #iterable_state ⇒ Object
  IterableJob progress for this job, or nil for a non-iterable job (no `it-<jid>` HASH).
- #jid ⇒ Object
- #klass ⇒ Object
- #latency ⇒ Object
  Seconds since enqueued_at.
- #retried_at ⇒ Object
- #tags ⇒ Object
- #value ⇒ Object
  Lazily serialized payload.
Constructor Details
#initialize(item, queue_name = nil) ⇒ JobRecord
Returns a new instance of JobRecord.
    # File 'lib/wurk/job_record.rb', line 30
    def initialize(item, queue_name = nil)
      if item.is_a?(String)
        @value = item
        @item = nil
      else
        @item = item
        @value = nil
      end
      @queue = queue_name || (@item && @item['queue'])
    end
Instance Attribute Details
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
    # File 'lib/wurk/job_record.rb', line 26
    def queue
      @queue
    end
Class Method Details
.latency_from(enqueued_at, now_ms = nil) ⇒ Object
Shared latency math: ms ints (>= 10^10) and float secs (< 10^10) both stored in `enqueued_at` historically. See spec §31.5.
    # File 'lib/wurk/job_record.rb', line 123
    def self.latency_from(enqueued_at, now_ms = nil)
      return 0.0 if enqueued_at.nil?

      now_ms ||= ::Process.clock_gettime(::Process::CLOCK_REALTIME, :millisecond)
      enq_ms = enqueued_at < 10_000_000_000 ? enqueued_at * 1_000 : enqueued_at
      diff = (now_ms - enq_ms) / 1_000.0
      diff.negative? ? 0.0 : diff
    end
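To make the unit detection concrete, here is a standalone copy of the same arithmetic as a top-level method, run against a fixed `now_ms`. The timestamps are made-up values chosen so the results are easy to verify by hand.

```ruby
# Standalone copy of the latency math shown above, for experimentation only.
def latency_from(enqueued_at, now_ms = nil)
  return 0.0 if enqueued_at.nil?

  now_ms ||= Process.clock_gettime(Process::CLOCK_REALTIME, :millisecond)
  # Values below 10^10 are treated as seconds and scaled to milliseconds.
  enq_ms = enqueued_at < 10_000_000_000 ? enqueued_at * 1_000 : enqueued_at
  diff = (now_ms - enq_ms) / 1_000.0
  diff.negative? ? 0.0 : diff
end

now_ms = 1_700_000_005_000 # fixed "current time" in milliseconds

p latency_from(1_700_000_000.5, now_ms)   # legacy float seconds  => 4.5
p latency_from(1_700_000_002_000, now_ms) # integer milliseconds  => 3.0
p latency_from(1_700_000_010_000, now_ms) # enqueued "in future"  => 0.0
p latency_from(nil, now_ms)               # missing timestamp     => 0.0
```

The 10^10 threshold works because a seconds-based epoch value will not reach 10^10 for several centuries, while any millisecond epoch value from recent decades is far above it.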
Instance Method Details
#[](name) ⇒ Object
Hash-like reader for arbitrary payload fields. Spec §19.3.
    # File 'lib/wurk/job_record.rb', line 72
    def [](name) = item[name]
#args ⇒ Object
    # File 'lib/wurk/job_record.rb', line 53
    def args = item['args']
#bid ⇒ Object
    # File 'lib/wurk/job_record.rb', line 55
    def bid = item['bid']
#created_at ⇒ Object
    # File 'lib/wurk/job_record.rb', line 67
    def created_at = parse_time(item['created_at'])
#delete ⇒ Object
Removes exactly this payload’s bytes from the queue list. Returns true when LREM removed at least one entry. Idempotent. The method name is kept for Sidekiq wire compatibility; renaming it would break callers of `JobRecord#delete`.
    # File 'lib/wurk/job_record.rb', line 95
    def delete # rubocop:disable Naming/PredicateMethod
      removed = Wurk.redis { |c| c.call('LREM', Keys.queue(@queue), 1, value) }
      removed.to_i.positive?
    end
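The reason `delete` passes the stored string rather than a re-serialized one can be shown without Redis. In this sketch, `lrem_one` is a hypothetical in-memory stand-in for `LREM key 1 value` (remove the first element equal to the value, return the count removed), and the payload is an invented example.

```ruby
require 'json'

# Hypothetical in-memory stand-in for Redis `LREM key 1 value`.
def lrem_one(list, value)
  index = list.index(value)
  return 0 if index.nil?

  list.delete_at(index)
  1
end

# Payload exactly as some producer wrote it (note the spaces after commas).
stored = '{"jid":"a1", "class":"MailJob", "args":[1]}'
queue = [stored.dup]

# Round-tripping through the parser yields equivalent JSON but different bytes.
reserialized = JSON.generate(JSON.parse(stored))

p reserialized == stored        # => false
p lrem_one(queue, reserialized) # => 0, nothing matched
p lrem_one(queue, stored)       # => 1, exact bytes matched
p lrem_one(queue, stored)       # => 0, already gone
```

Whitespace is only one source of drift; key order, number formatting, and string escaping can also differ between serializers.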
#display_args ⇒ Object
UI-facing args. For encrypted jobs (§4.7), the last argument (the envelope) is masked as “<encrypted>” so ciphertext never reaches the dashboard. Redaction keys off the envelope shape, so it fires whether or not the stored hash carried the `encrypt` flag. Preceding cleartext args stay visible for triage. Display-only: the stored payload is untouched.
    # File 'lib/wurk/job_record.rb', line 113
    def display_args
      return @display_args if defined?(@display_args)

      base = active_job_wrapper? ? unwrap_args : args
      @display_args = Wurk::Encryption.redact_args('args' => base, 'encrypt' => item['encrypt'])
    end
#display_class ⇒ Object
ActiveJob/ActionMailer unwrappers (UI-facing only). For plain Wurk workers, returns the raw class name.
    # File 'lib/wurk/job_record.rb', line 102
    def display_class
      return @display_class if defined?(@display_class)

      @display_class = active_job_wrapper? ? unwrap_class : klass
    end
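Both display methods memoize with a `defined?` guard rather than `||=`. One plausible reason, offered here as an assumption rather than a documented design decision, is that `||=` recomputes whenever the cached result is `nil` or `false`. The `Memo` class below is a hypothetical illustration of that difference.

```ruby
# Hypothetical class comparing two memoization styles when the result is nil.
class Memo
  attr_reader :calls

  def initialize
    @calls = 0
  end

  # Recomputes on every call, because nil is falsy.
  def with_or_equals
    @a ||= compute
  end

  # Computes once; the guard checks whether the ivar was ever assigned.
  def with_defined
    return @b if defined?(@b)

    @b = compute
  end

  private

  def compute
    @calls += 1
    nil
  end
end

a = Memo.new
2.times { a.with_or_equals }
p a.calls # => 2

b = Memo.new
2.times { b.with_defined }
p b.calls # => 1
```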
#enqueued_at ⇒ Object
    # File 'lib/wurk/job_record.rb', line 66
    def enqueued_at = parse_time(item['enqueued_at'])
#error_backtrace ⇒ Object
Sidekiq compresses the backtrace as base64(zlib.deflate(JSON.dump(bt))). Returns nil when no error has been recorded or when the stored value cannot be decoded.
    # File 'lib/wurk/job_record.rb', line 76
    def error_backtrace
      compressed = item['error_backtrace']
      return nil unless compressed

      Wurk.load_json(Zlib.inflate(Base64.decode64(compressed)))
    rescue Zlib::DataError, ArgumentError, ::JSON::ParserError
      nil
    end
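The encoding can be exercised end to end with the standard library. This is a sketch under assumptions: `compress_backtrace` and `decode_backtrace` are hypothetical helper names, `JSON.parse` stands in for `Wurk.load_json`, and `pack('m')` / `unpack1('m')` are used because they produce the same encoding as `Base64.encode64` / `Base64.decode64` without an extra require.

```ruby
require 'json'
require 'zlib'

# Encode as described above: base64(zlib.deflate(JSON.dump(bt))).
def compress_backtrace(lines)
  [Zlib.deflate(JSON.dump(lines))].pack('m')
end

# Reverse the encoding; nil for a missing or undecodable value.
def decode_backtrace(compressed)
  return nil unless compressed

  JSON.parse(Zlib.inflate(compressed.unpack1('m')))
rescue Zlib::DataError, ArgumentError, JSON::ParserError
  nil
end

bt = ["app/jobs/mail_job.rb:12:in `perform'", "lib/runner.rb:40:in `call'"]
encoded = compress_backtrace(bt)

p decode_backtrace(encoded) == bt               # => true
p decode_backtrace(nil)                         # => nil
p decode_backtrace(['not zlib'].pack('m'))      # => nil (valid base64, invalid zlib data)
```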
#failed_at ⇒ Object
    # File 'lib/wurk/job_record.rb', line 68
    def failed_at = parse_time(item['failed_at'])
#item ⇒ Object
Lazily parsed payload. Memoized; never re-parses.
    # File 'lib/wurk/job_record.rb', line 42
    def item
      @item ||= Wurk.load_json(@value)
    end
#iterable_state ⇒ Object
IterableJob progress for this job, or nil for a non-iterable job (no `it-<jid>` HASH). Spec §19.3. Reads via the IterableJobQuery data API.
    # File 'lib/wurk/job_record.rb', line 59
    def iterable_state
      return nil if jid.nil? || jid.to_s.empty?

      Wurk::IterableJobQuery.new([jid])[jid]
    end
#jid ⇒ Object
    # File 'lib/wurk/job_record.rb', line 54
    def jid = item['jid']
#klass ⇒ Object
    # File 'lib/wurk/job_record.rb', line 52
    def klass = item['class']
#latency ⇒ Object
Seconds since enqueued_at. Handles legacy float-seconds and current integer-ms `enqueued_at` shapes. Returns 0.0 when the timestamp is missing or lies in the future (clock skew).
    # File 'lib/wurk/job_record.rb', line 88
    def latency
      JobRecord.latency_from(item['enqueued_at'])
    end
#retried_at ⇒ Object
    # File 'lib/wurk/job_record.rb', line 69
    def retried_at = parse_time(item['retried_at'])
#tags ⇒ Object
    # File 'lib/wurk/job_record.rb', line 65
    def tags = item['tags'] || []