Class: Wurk::JobRecord
- Inherits:
-
Object
- Object
- Wurk::JobRecord
- Defined in:
- lib/wurk/job_record.rb
Overview
One job payload viewed from the data API (Queue#each / JobSet#each). Wraps the raw JSON string from Redis; parses lazily so an O(n) scan over a large queue doesn’t pay JSON cost for jobs that go unused.
The ‘value` (raw JSON string) is what Redis stores; `LREM` matches exact bytes, so `delete` must use it rather than re-serialize.
Spec: docs/target/sidekiq-free.md §19.3.
Direct Known Subclasses
Constant Summary collapse
- ACTIVE_JOB_WRAPPER =
Pre-compiled. ActiveJob’s wrapper class varies per Rails minor (‘ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper`, `ActiveJob::QueueAdapters::WurkAdapter::JobWrapper`, etc.).
/\AActiveJob::QueueAdapters::.+::JobWrapper\z/- ACTION_MAILER_JOBS =
%w[ ActionMailer::DeliveryJob ActionMailer::Parameterized::DeliveryJob ActionMailer::MailDeliveryJob ].freeze
Instance Attribute Summary collapse
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
Class Method Summary collapse
-
.latency_from(enqueued_at, now_ms = nil) ⇒ Object
Shared latency math: ms ints (>= 10^10) and float secs (< 10^10) both stored in ‘enqueued_at` historically.
Instance Method Summary collapse
-
#[](name) ⇒ Object
Hash-like reader for arbitrary payload fields.
- #args ⇒ Object
- #bid ⇒ Object
- #created_at ⇒ Object
-
#delete ⇒ Object
Removes exactly this payload’s bytes from the queue list.
- #display_args ⇒ Object
-
#display_class ⇒ Object
ActiveJob/ActionMailer unwrappers — UI-facing only.
- #enqueued_at ⇒ Object
-
#error_backtrace ⇒ Object
Sidekiq compresses the bt as base64(zlib.deflate(JSON.dump(bt))).
- #failed_at ⇒ Object
-
#initialize(item, queue_name = nil) ⇒ JobRecord
constructor
A new instance of JobRecord.
-
#item ⇒ Object
Lazily parsed payload.
- #jid ⇒ Object
- #klass ⇒ Object
-
#latency ⇒ Object
Seconds since enqueued_at.
- #retried_at ⇒ Object
- #tags ⇒ Object
-
#value ⇒ Object
Lazily serialized payload.
Constructor Details
#initialize(item, queue_name = nil) ⇒ JobRecord
Returns a new instance of JobRecord.
30 31 32 33 34 35 36 37 38 39 |
# File 'lib/wurk/job_record.rb', line 30 def initialize(item, queue_name = nil) if item.is_a?(String) @value = item @item = nil else @item = item @value = nil end @queue = queue_name || (@item && @item['queue']) end |
Instance Attribute Details
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
26 27 28 |
# File 'lib/wurk/job_record.rb', line 26 def queue @queue end |
Class Method Details
.latency_from(enqueued_at, now_ms = nil) ⇒ Object
Shared latency math: ms ints (>= 10^10) and float secs (< 10^10) both stored in ‘enqueued_at` historically. See spec §31.5.
108 109 110 111 112 113 114 115 |
# File 'lib/wurk/job_record.rb', line 108 def self.latency_from(enqueued_at, now_ms = nil) return 0.0 if enqueued_at.nil? now_ms ||= ::Process.clock_gettime(::Process::CLOCK_REALTIME, :millisecond) enq_ms = enqueued_at < 10_000_000_000 ? enqueued_at * 1_000 : enqueued_at diff = (now_ms - enq_ms) / 1_000.0 diff.negative? ? 0.0 : diff end |
Instance Method Details
#[](name) ⇒ Object
Hash-like reader for arbitrary payload fields. Spec §19.3.
63 |
# File 'lib/wurk/job_record.rb', line 63 def [](name) = item[name] |
#args ⇒ Object
53 |
# File 'lib/wurk/job_record.rb', line 53 def args = item['args'] |
#bid ⇒ Object
55 |
# File 'lib/wurk/job_record.rb', line 55 def bid = item['bid'] |
#created_at ⇒ Object
58 |
# File 'lib/wurk/job_record.rb', line 58 def created_at = parse_time(item['created_at']) |
#delete ⇒ Object
Removes exactly this payload’s bytes from the queue list. Returns true when LREM removed ≥1 entry. Idempotent. Method name is Sidekiq wire-compat — renaming would break ‘JobRecord#delete`.
86 87 88 89 |
# File 'lib/wurk/job_record.rb', line 86 def delete # rubocop:disable Naming/PredicateMethod removed = Wurk.redis { |c| c.call('LREM', Keys.queue(@queue), 1, value) } removed.to_i.positive? end |
#display_args ⇒ Object
99 100 101 102 103 |
# File 'lib/wurk/job_record.rb', line 99 def display_args return @display_args if defined?(@display_args) @display_args = active_job_wrapper? ? unwrap_args : args end |
#display_class ⇒ Object
ActiveJob/ActionMailer unwrappers — UI-facing only. For plain Wurk workers, returns the raw class name.
93 94 95 96 97 |
# File 'lib/wurk/job_record.rb', line 93 def display_class return @display_class if defined?(@display_class) @display_class = active_job_wrapper? ? unwrap_class : klass end |
#enqueued_at ⇒ Object
57 |
# File 'lib/wurk/job_record.rb', line 57 def enqueued_at = parse_time(item['enqueued_at']) |
#error_backtrace ⇒ Object
Sidekiq compresses the bt as base64(zlib.deflate(JSON.dump(bt))). Returns nil when no error has been recorded.
67 68 69 70 71 72 73 74 |
# File 'lib/wurk/job_record.rb', line 67 def error_backtrace compressed = item['error_backtrace'] return nil unless compressed Wurk.load_json(Zlib.inflate(Base64.decode64(compressed))) rescue Zlib::DataError, ArgumentError, ::JSON::ParserError nil end |
#failed_at ⇒ Object
59 |
# File 'lib/wurk/job_record.rb', line 59 def failed_at = parse_time(item['failed_at']) |
#item ⇒ Object
Lazily parsed payload. Memoized; never re-parses.
42 43 44 |
# File 'lib/wurk/job_record.rb', line 42 def item @item ||= Wurk.load_json(@value) end |
#jid ⇒ Object
54 |
# File 'lib/wurk/job_record.rb', line 54 def jid = item['jid'] |
#klass ⇒ Object
52 |
# File 'lib/wurk/job_record.rb', line 52 def klass = item['class'] |
#latency ⇒ Object
Seconds since enqueued_at. Handles legacy float-seconds and current integer-ms ‘enqueued_at` shapes. Returns 0.0 when missing or somehow in the future (clock skew).
79 80 81 |
# File 'lib/wurk/job_record.rb', line 79 def latency JobRecord.latency_from(item['enqueued_at']) end |
#retried_at ⇒ Object
60 |
# File 'lib/wurk/job_record.rb', line 60 def retried_at = parse_time(item['retried_at']) |
#tags ⇒ Object
56 |
# File 'lib/wurk/job_record.rb', line 56 def = item['tags'] || [] |