Class: AllStak::Integrations::Sidekiq::Middleware

Inherits:
Object
  • Object
show all
Defined in:
lib/allstak/integrations/sidekiq.rb

Overview

Sidekiq server middleware. Sidekiq calls `#call(worker, job, queue)` and expects the middleware to `yield` to run the job.

Instance Method Summary collapse

Instance Method Details

#call(worker, job, queue) ⇒ Object



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/allstak/integrations/sidekiq.rb', line 96

# Wraps the execution of one Sidekiq job with AllStak instrumentation:
# sets or resets the trace id, records a breadcrumb, opens a
# "queue.process" span around the job, and reports any exception the job
# raises before re-raising it.
#
# @param worker passed to +worker_class_name+ (together with +job+) to
#   derive the name used in the breadcrumb and span description
# @param job the job payload; only inspected when it is a Hash, in which
#   case the "allstak_trace_id", "trace_id", "queue" and "jid" keys are read
# @param queue queue name used when the payload carries no "queue" entry
# @yield runs the job
# @return [Object] the value returned by the block
# @raise re-raises, unchanged, whatever the block raises
def call(worker, job, queue)
  # SDK not initialized: run the job with no instrumentation at all.
  unless AllStak.initialized?
    return yield
  end

  client = AllStak.client
  config = client.config

  # Each job is its own trace root unless an upstream producer
  # propagated a trace id into the job payload.
  # A trace id whose string form is empty is treated the same as a missing one.
  incoming_trace = job.is_a?(Hash) ? (job["allstak_trace_id"] || job["trace_id"]) : nil
  if incoming_trace && !incoming_trace.to_s.empty?
    client.tracing.set_trace_id(incoming_trace.to_s)
  else
    client.tracing.reset_trace
  end

  worker_class = worker_class_name(worker, job)
  # Prefer the queue recorded in the payload; fall back to the argument.
  job_queue = (job.is_a?(Hash) ? job["queue"] : nil) || queue
  jid = job.is_a?(Hash) ? job["jid"] : nil

  # Breadcrumb data omits entries whose value is nil.
  client.errors.add_breadcrumb(
    type: "sidekiq",
    message: "process #{worker_class}",
    data: { "queue" => job_queue, "jid" => jid }.reject { |_, v| v.nil? }
  )

  # Span tags omit entries whose string form is empty (this covers nil,
  # because nil.to_s is "").
  span = client.tracing.start_span(
    "queue.process",
    description: worker_class,
    tags: {
      "messaging.system"      => "sidekiq",
      "messaging.destination" => job_queue.to_s,
      "messaging.message_id"  => jid.to_s
    }.reject { |_, v| v.to_s.empty? }
  )

  # Holds the exception raised by the job, if any, so the ensure block can
  # tell success from failure.
  captured = nil
  begin
    yield
  # Deliberately broad: the exception is only recorded here and is always
  # re-raised, so nothing is swallowed.
  rescue Exception => e # rubocop:disable Lint/RescueException
    captured = e
    raise
  ensure
    # Close the span with a status reflecting the outcome, unless it has
    # already been finished.
    span.finish(captured ? "error" : "ok") unless span.finished?

    # Report the failure only when the configuration enables it.
    if captured && config.capture_unhandled_exceptions
      begin
        # NOTE(review): `Sidekiq.(...)` is Ruby's shorthand for `.call(...)`, so
        # as written this invokes AllStak::Integrations::Sidekiq.call. A method
        # name may have been dropped from this rendered listing -- confirm
        # against lib/allstak/integrations/sidekiq.rb.
        meta = AllStak::Integrations::Sidekiq.(job_hash(job, worker_class, job_queue, jid)).merge(
          "mechanism" => "sidekiq",
          "handled"   => false,
          "traceId"   => client.tracing.current_trace_id
        )
        client.errors.capture_exception(
          captured,
          trace_id: client.tracing.current_trace_id,
          metadata: meta
        )
      # A StandardError raised while reporting is rescued here rather than
      # propagated; it is written via `warn` only when config.debug is truthy.
      rescue => err
        config.debug && warn("[AllStak] sidekiq exception capture failed: #{err.message}")
      end
    end
  end
end