Class: RailsSemanticLogger::SolidQueue::LogSubscriber

Inherits:
ActiveSupport::LogSubscriber
  • Object
show all
Defined in:
lib/rails_semantic_logger/solid_queue/log_subscriber.rb

Instance Method Summary collapse

Instance Method Details

#claim(event) ⇒ Object



8
9
10
# File 'lib/rails_semantic_logger/solid_queue/log_subscriber.rb', line 8

def claim(event)
  log_event(event, :debug, "Claim jobs", **event.payload.slice(:process_id, :job_ids, :claimed_job_ids, :size))
end

#deregister_process(event) ⇒ Object



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/rails_semantic_logger/solid_queue/log_subscriber.rb', line 87

def deregister_process(event)
  process    = event.payload[:process]
  attributes = {
    process_id:        process.id,
    pid:               process.pid,
    hostname:          process.hostname,
    name:              process.name,
    last_heartbeat_at: process.last_heartbeat_at.iso8601,
    claimed_size:      event.payload[:claimed_size],
    pruned:            event.payload[:pruned]
  }

  if (error = event.payload[:error])
    log_event(event, :warn, "Error deregistering #{process.kind}", exception: error, **attributes)
  else
    log_event(event, :debug, "Deregister #{process.kind}", **attributes)
  end
end

#discard(event) ⇒ Object



36
37
38
# File 'lib/rails_semantic_logger/solid_queue/log_subscriber.rb', line 36

def discard(event)
  log_event(event, :debug, "Discard job", **event.payload.slice(:job_id, :status))
end

#discard_all(event) ⇒ Object



32
33
34
# File 'lib/rails_semantic_logger/solid_queue/log_subscriber.rb', line 32

def discard_all(event)
  log_event(event, :debug, "Discard jobs", **event.payload.slice(:jobs_size, :size, :status))
end

#dispatch_scheduled(event) ⇒ Object



4
5
6
# File 'lib/rails_semantic_logger/solid_queue/log_subscriber.rb', line 4

def dispatch_scheduled(event)
  log_event(event, :debug, "Dispatch scheduled jobs", **event.payload.slice(:batch_size, :size))
end

#enqueue_recurring_task(event) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/rails_semantic_logger/solid_queue/log_subscriber.rb', line 48

def enqueue_recurring_task(event)
  attributes      = event.payload.slice(:task, :active_job_id, :enqueue_error)
  attributes[:at] = event.payload[:at]&.iso8601

  if attributes[:active_job_id].nil? && event.payload[:skipped].nil?
    log_event(event, :error, "Error enqueuing recurring task", **attributes)
  elsif event.payload[:other_adapter]
    log_event(event, :debug, "Enqueued recurring task outside Solid Queue", **attributes)
  else
    action = event.payload[:skipped].present? ? "Skipped recurring task – already dispatched" : "Enqueued recurring task"
    log_event(event, :debug, action, **attributes)
  end
end

#fail_many_claimed(event) ⇒ Object



16
17
18
# File 'lib/rails_semantic_logger/solid_queue/log_subscriber.rb', line 16

def fail_many_claimed(event)
  log_event(event, :warn, "Fail claimed jobs", **event.payload.slice(:job_ids, :process_ids))
end

#graceful_termination(event) ⇒ Object



114
115
116
117
118
119
120
121
122
# File 'lib/rails_semantic_logger/solid_queue/log_subscriber.rb', line 114

def graceful_termination(event)
  attributes = event.payload.slice(:process_id, :supervisor_pid, :supervised_processes)

  if event.payload[:shutdown_timeout_exceeded]
    log_event(event, :warn, "Supervisor wasn't terminated gracefully - shutdown timeout exceeded", **attributes)
  else
    log_event(event, :info, "Supervisor terminated gracefully", **attributes)
  end
end

#immediate_termination(event) ⇒ Object



124
125
126
127
# File 'lib/rails_semantic_logger/solid_queue/log_subscriber.rb', line 124

def immediate_termination(event)
  log_event(event, :info, "Supervisor terminated immediately",
            **event.payload.slice(:process_id, :supervisor_pid, :supervised_processes))
end

#prune_processes(event) ⇒ Object



106
107
108
# File 'lib/rails_semantic_logger/solid_queue/log_subscriber.rb', line 106

def prune_processes(event)
  log_event(event, :debug, "Prune dead processes", **event.payload.slice(:size))
end

#register_process(event) ⇒ Object



76
77
78
79
80
81
82
83
84
85
# File 'lib/rails_semantic_logger/solid_queue/log_subscriber.rb', line 76

def register_process(event)
  process_kind = event.payload[:kind]
  attributes   = event.payload.slice(:pid, :hostname, :process_id, :name)

  if (error = event.payload[:error])
    log_event(event, :warn, "Error registering #{process_kind}", exception: error, **attributes)
  else
    log_event(event, :debug, "Register #{process_kind}", **attributes)
  end
end

#release_blocked(event) ⇒ Object



44
45
46
# File 'lib/rails_semantic_logger/solid_queue/log_subscriber.rb', line 44

def release_blocked(event)
  log_event(event, :debug, "Release blocked job", **event.payload.slice(:job_id, :concurrency_key, :released))
end

#release_claimed(event) ⇒ Object



20
21
22
# File 'lib/rails_semantic_logger/solid_queue/log_subscriber.rb', line 20

def release_claimed(event)
  log_event(event, :info, "Release claimed job", **event.payload.slice(:job_id, :process_id))
end

#release_many_blocked(event) ⇒ Object



40
41
42
# File 'lib/rails_semantic_logger/solid_queue/log_subscriber.rb', line 40

def release_many_blocked(event)
  log_event(event, :debug, "Unblock jobs", **event.payload.slice(:limit, :size))
end

#release_many_claimed(event) ⇒ Object



12
13
14
# File 'lib/rails_semantic_logger/solid_queue/log_subscriber.rb', line 12

def release_many_claimed(event)
  log_event(event, :info, "Release claimed jobs", **event.payload.slice(:size))
end

#replace_fork(event) ⇒ Object



133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/rails_semantic_logger/solid_queue/log_subscriber.rb', line 133

def replace_fork(event)
  supervisor_pid = event.payload[:supervisor_pid]
  status         = event.payload[:status]
  attributes     = event.payload.slice(:pid).merge(
    status:          status.exitstatus || "no exit status set",
    pid_from_status: status.pid,
    signaled:        status.signaled?,
    stopsig:         status.stopsig,
    termsig:         status.termsig
  )

  if (replaced_fork = event.payload[:fork])
    log_event(event, :info, "Replaced terminated #{replaced_fork.kind}",
              **attributes, hostname: replaced_fork.hostname, name: replaced_fork.name)
  elsif supervisor_pid != 1 # Running Docker, possibly having some processes that have been reparented
    log_event(event, :warn, "Tried to replace forked process but it had already died", **attributes)
  end
end

#retry(event) ⇒ Object



28
29
30
# File 'lib/rails_semantic_logger/solid_queue/log_subscriber.rb', line 28

def retry(event)
  log_event(event, :debug, "Retry failed job", **event.payload.slice(:job_id))
end

#retry_all(event) ⇒ Object



24
25
26
# File 'lib/rails_semantic_logger/solid_queue/log_subscriber.rb', line 24

def retry_all(event)
  log_event(event, :debug, "Retry failed jobs", **event.payload.slice(:jobs_size, :size))
end

#shutdown_process(event) ⇒ Object



69
70
71
72
73
74
# File 'lib/rails_semantic_logger/solid_queue/log_subscriber.rb', line 69

def shutdown_process(event)
  process    = event.payload[:process]
  attributes = process_attributes(process).merge(process.)

  log_event(event, :info, "Shutdown #{process.kind}", **attributes)
end

#start_process(event) ⇒ Object



62
63
64
65
66
67
# File 'lib/rails_semantic_logger/solid_queue/log_subscriber.rb', line 62

def start_process(event)
  process    = event.payload[:process]
  attributes = process_attributes(process).merge(process.)

  log_event(event, :info, "Started #{process.kind}", **attributes)
end

#thread_error(event) ⇒ Object



110
111
112
# File 'lib/rails_semantic_logger/solid_queue/log_subscriber.rb', line 110

def thread_error(event)
  log_event(event, :error, "Error in thread", exception: event.payload[:error])
end

#unhandled_signal_error(event) ⇒ Object



129
130
131
# File 'lib/rails_semantic_logger/solid_queue/log_subscriber.rb', line 129

def unhandled_signal_error(event)
  log_event(event, :error, "Received unhandled signal", **event.payload.slice(:signal))
end