Class: Pgbus::FailedEventRecorder
- Inherits: Object
- Defined in: lib/pgbus/failed_event_recorder.rb
Overview
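Records job failures in the pgbus_failed_events table for dashboard visibility. Writes use an upsert (INSERT ... ON CONFLICT ... DO UPDATE) keyed on (queue_name, msg_id), so each message has at most one failed-event row, which tracks its latest error. None of the three methods raises on a database error: record! forwards the exception to ErrorReporter, clear! logs it at ERROR level, and exists? logs it at DEBUG level and returns false.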
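A minimal in-memory model of the upsert rule, for illustration only. The class `InMemoryFailedEvents` and its `upsert` method are hypothetical stand-ins for the SQL in `.record!`. They assume, as the `ON CONFLICT ... DO UPDATE SET` list in `record!` specifies, that a conflict overwrites only `error_class`, `error_message`, `backtrace`, `retry_count`, and `failed_at`. As a result, `payload` and `headers` keep the values from the first failure.

```ruby
# Hypothetical in-memory model of the pgbus_failed_events upsert (not part of Pgbus).
class InMemoryFailedEvents
  # Columns the ON CONFLICT ... DO UPDATE SET clause overwrites.
  UPDATED_ON_CONFLICT = %i[error_class error_message backtrace retry_count failed_at].freeze

  attr_reader :rows

  def initialize
    @rows = {} # [queue_name, msg_id] => row hash
  end

  def upsert(queue_name:, msg_id:, **attrs)
    key = [queue_name, msg_id.to_i]
    if (existing = @rows[key])
      # Conflict: refresh only the error-tracking columns; payload/headers stay.
      existing.merge!(attrs.slice(*UPDATED_ON_CONFLICT))
    else
      # First failure for this message: insert a full row.
      @rows[key] = { queue_name: queue_name, msg_id: key.last, **attrs }
    end
  end
end

store = InMemoryFailedEvents.new
store.upsert(queue_name: "default", msg_id: "42", payload: "{\"a\":1}",
             error_class: "Timeout::Error", retry_count: 1, failed_at: Time.at(0))
store.upsert(queue_name: "default", msg_id: 42, payload: "{\"a\":2}",
             error_class: "KeyError", retry_count: 2, failed_at: Time.at(10))

store.rows.size                           # => 1
store.rows[["default", 42]][:error_class] # => "KeyError"
store.rows[["default", 42]][:payload]     # => "{\"a\":1}"
```

Because msg_id is coerced with `to_i` before keying, `"42"` and `42` address the same row, just as the bind value `msg_id.to_i` does in the real SQL.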
Class Method Summary
- .clear!(queue_name:, msg_id:) ⇒ Object
- .exists?(queue_name:, msg_id:) ⇒ Boolean
- .record!(queue_name:, msg_id:, payload:, headers:, error:, retry_count:) ⇒ Object
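The following sketches one plausible way a worker could combine these methods, under stated assumptions. `FakeRecorder` is a hypothetical in-memory stand-in with the same keyword signatures, and `run_tracked` is a hypothetical wrapper. Neither is part of Pgbus. The pattern is to record the failure and re-raise, then clear the row after a later attempt succeeds. It is inferred from the comment in `.clear!`, which describes a clear that runs after the job succeeded. Pgbus's actual call sites are not shown on this page.

```ruby
# Hypothetical in-memory stand-in with the same keyword signatures as
# Pgbus::FailedEventRecorder's class methods (not the real implementation).
module FakeRecorder
  @rows = {}

  class << self
    def record!(queue_name:, msg_id:, payload:, headers:, error:, retry_count:)
      @rows[[queue_name, msg_id.to_i]] = { error_class: error.class.name, retry_count: retry_count }
    end

    def exists?(queue_name:, msg_id:)
      @rows.key?([queue_name, msg_id.to_i])
    end

    def clear!(queue_name:, msg_id:)
      @rows.delete([queue_name, msg_id.to_i])
    end
  end
end

# Hypothetical wrapper: record the failure and re-raise so the caller's retry
# handling still sees the error; clear the row once an attempt succeeds.
def run_tracked(recorder, queue_name:, msg_id:, payload:, headers: nil, retry_count: 0)
  begin
    result = yield
  rescue StandardError => e
    recorder.record!(queue_name: queue_name, msg_id: msg_id, payload: payload,
                     headers: headers, error: e, retry_count: retry_count)
    raise
  end
  recorder.clear!(queue_name: queue_name, msg_id: msg_id)
  result
end

begin
  run_tracked(FakeRecorder, queue_name: "default", msg_id: 7, payload: {}, retry_count: 1) do
    raise KeyError, "missing key"
  end
rescue KeyError
  # The original error still reaches the caller.
end
FakeRecorder.exists?(queue_name: "default", msg_id: 7) # => true

run_tracked(FakeRecorder, queue_name: "default", msg_id: 7, payload: {}, retry_count: 2) { :ok } # => :ok
FakeRecorder.exists?(queue_name: "default", msg_id: 7) # => false
```

Re-raising keeps the failure visible to whatever retry or dead-letter handling surrounds the wrapper. In this sketch, the recorder only mirrors state for the dashboard.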
Class Method Details
.clear!(queue_name:, msg_id:) ⇒ Object
# File 'lib/pgbus/failed_event_recorder.rb', line 50
def clear!(queue_name:, msg_id:)
  connection.exec_delete(
    "DELETE FROM pgbus_failed_events WHERE queue_name = $1 AND msg_id = $2",
    "FailedEvent Clear",
    [queue_name, msg_id.to_i]
  )
rescue StandardError => e
  # ERROR-level: a failed clear leaves a stale row in the dashboard
  # AFTER the job actually succeeded; confusing and load-bearing
  # for users debugging recurring duplicates.
  Pgbus.logger.error do
    "[Pgbus] Failed to clear failed event for queue=#{queue_name} msg_id=#{msg_id}: " \
      "#{e.class}: #{e.message}"
  end
end
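This sketch shows the failure path of `clear!`, with the connection and logger passed in so it runs without a database. `FailingConnection` and `clear_failed_event` are hypothetical names, and the helper's body mirrors `clear!` above. A database error is written to the log at ERROR level and is not re-raised to the caller.

```ruby
require "logger"
require "stringio"

# Hypothetical connection stub whose DELETE always fails, standing in for a
# database outage.
class FailingConnection
  def exec_delete(_sql, _name, _binds)
    raise IOError, "connection lost"
  end
end

# Mirrors the body of clear! above, with the connection and logger injected
# (hypothetical helper, not part of Pgbus).
def clear_failed_event(connection, logger, queue_name:, msg_id:)
  connection.exec_delete(
    "DELETE FROM pgbus_failed_events WHERE queue_name = $1 AND msg_id = $2",
    "FailedEvent Clear",
    [queue_name, msg_id.to_i]
  )
rescue StandardError => e
  # Log at ERROR and swallow the exception: the job itself already succeeded.
  logger.error do
    "[Pgbus] Failed to clear failed event for queue=#{queue_name} msg_id=#{msg_id}: " \
      "#{e.class}: #{e.message}"
  end
end

log = StringIO.new
clear_failed_event(FailingConnection.new, Logger.new(log), queue_name: "default", msg_id: "42")
puts log.string # one ERROR entry containing "IOError: connection lost"
```

Logging rather than raising means that a bookkeeping problem never turns a successful job into a failure. The cost is a possibly stale dashboard row, which is the trade-off the source comment describes.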
.exists?(queue_name:, msg_id:) ⇒ Boolean
# File 'lib/pgbus/failed_event_recorder.rb', line 38
def exists?(queue_name:, msg_id:)
  result = connection.select_value(
    "SELECT 1 FROM pgbus_failed_events WHERE queue_name = $1 AND msg_id = $2 LIMIT 1",
    "FailedEvent Exists",
    [queue_name, msg_id.to_i]
  )
  !result.nil?
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus] FailedEvent exists? check failed: #{e.class}: #{e.message}" }
  false
end
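This sketch covers the three outcomes of `exists?` (row found, no row, query error), again with injected collaborators. `StubConnection` and `failed_event_exists?` are hypothetical, and the stub's `select_value` simply calls a lambda. An error comes back as `false`, which the caller cannot distinguish from "no row", and it is logged only at DEBUG level.

```ruby
require "logger"
require "stringio"

# Hypothetical stub: select_value delegates to a lambda so each outcome can be
# simulated (1 = row found, nil = no row, raise = query error).
StubConnection = Struct.new(:behavior) do
  def select_value(_sql, _name, binds)
    behavior.call(binds)
  end
end

# Mirrors the body of exists? above with injected collaborators
# (hypothetical helper, not part of Pgbus).
def failed_event_exists?(connection, logger, queue_name:, msg_id:)
  result = connection.select_value(
    "SELECT 1 FROM pgbus_failed_events WHERE queue_name = $1 AND msg_id = $2 LIMIT 1",
    "FailedEvent Exists",
    [queue_name, msg_id.to_i]
  )
  !result.nil?
rescue StandardError => e
  # Treat any query error as "no row": log quietly and answer false.
  logger.debug { "[Pgbus] FailedEvent exists? check failed: #{e.class}: #{e.message}" }
  false
end

logger  = Logger.new($stdout)
found   = StubConnection.new(->(_binds) { 1 })
missing = StubConnection.new(->(_binds) { nil })
broken  = StubConnection.new(->(_binds) { raise IOError, "connection lost" })

failed_event_exists?(found,   logger, queue_name: "default", msg_id: "42") # => true
failed_event_exists?(missing, logger, queue_name: "default", msg_id: "42") # => false
failed_event_exists?(broken,  logger, queue_name: "default", msg_id: "42") # => false (a DEBUG line is logged)
```

Callers that must tell an outage apart from a missing row would need a different check. As written, `exists?` favors never raising.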
.record!(queue_name:, msg_id:, payload:, headers:, error:, retry_count:) ⇒ Object
# File 'lib/pgbus/failed_event_recorder.rb', line 9
def record!(queue_name:, msg_id:, payload:, headers:, error:, retry_count:)
  connection.exec_query(
    <<~SQL.squish,
      INSERT INTO pgbus_failed_events
        (queue_name, msg_id, payload, headers, error_class, error_message, backtrace, retry_count, failed_at)
      VALUES ($1, $2, $3, $4, $5, $6, $7, $8, CURRENT_TIMESTAMP)
      ON CONFLICT (queue_name, msg_id) DO UPDATE SET
        error_class = EXCLUDED.error_class,
        error_message = EXCLUDED.error_message,
        backtrace = EXCLUDED.backtrace,
        retry_count = EXCLUDED.retry_count,
        failed_at = EXCLUDED.failed_at
    SQL
    "FailedEvent Record",
    [
      queue_name,
      msg_id.to_i,
      payload.is_a?(String) ? payload : JSON.generate(payload),
      headers.is_a?(String) ? headers : headers&.then { |h| JSON.generate(h) },
      error.class.name,
      error.message.to_s.truncate(10_000),
      error.backtrace&.first(30)&.join("\n"),
      retry_count
    ]
  )
rescue StandardError => e
  ErrorReporter.report(e, { action: "record_failed_event", queue: queue_name, msg_id: msg_id })
end
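This sketch shows how `record!` normalizes its arguments into the eight bind values `$1` to `$8`, and it runs with only the standard library. `failed_event_binds` is a hypothetical helper. `truncate_like_active_support` is an assumed stand-in for ActiveSupport's `String#truncate` with its default `"..."` omission, so the result is at most `limit` characters. Everything else follows the array passed to `exec_query` above.

```ruby
require "json"

# Stand-in for ActiveSupport's String#truncate default behavior: strings longer
# than `limit` are cut so that the result, including the omission, is `limit` long.
def truncate_like_active_support(str, limit, omission: "...")
  return str if str.length <= limit

  str[0, limit - omission.length] + omission
end

# Hypothetical helper reproducing the bind array that record! builds.
def failed_event_binds(queue_name:, msg_id:, payload:, headers:, error:, retry_count:)
  [
    queue_name,                                                               # $1
    msg_id.to_i,                                                              # $2
    payload.is_a?(String) ? payload : JSON.generate(payload),                 # $3
    headers.is_a?(String) ? headers : headers&.then { |h| JSON.generate(h) }, # $4 (nil stays nil)
    error.class.name,                                                         # $5
    truncate_like_active_support(error.message.to_s, 10_000),                 # $6
    error.backtrace&.first(30)&.join("\n"),                                   # $7 (nil if never raised)
    retry_count                                                               # $8
  ]
end

error = RuntimeError.new("x" * 20_000) # never raised, so error.backtrace is nil
binds = failed_event_binds(queue_name: "default", msg_id: "42", payload: { "id" => 1 },
                           headers: nil, error: error, retry_count: 3)
binds[1]        # => 42
binds[2]        # => "{\"id\":1}"
binds[3]        # => nil
binds[5].length # => 10000
binds[6]        # => nil
```

Pre-serialized strings pass through unchanged, so a caller that already holds the raw JSON from the queue avoids a second encode.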