Class: Pgbus::Web::DataSource

Inherits:
Object
  • Object
show all
Defined in:
lib/pgbus/web/data_source.rb

Instance Method Summary collapse

Constructor Details

#initialize(client: Pgbus.client) ⇒ DataSource

Returns a new instance of DataSource.



8
9
10
11
12
# File 'lib/pgbus/web/data_source.rb', line 8

# Returns a new instance of DataSource.
#
# @param client [Object] PGMQ-backed client used for all queue operations;
#   defaults to the process-wide Pgbus.client.
def initialize(client: Pgbus.client)
  @client = client
  # No throughput snapshot exists until the first sample is taken.
  @last_throughput_snapshot = @last_throughput_at = nil
end

Instance Method Details

#discard_all_dlqObject



340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
# File 'lib/pgbus/web/data_source.rb', line 340

# Delete every message currently visible in the dead-letter queues
# (first 1000), releasing uniqueness locks first. Returns the number of
# messages actually deleted.
def discard_all_dlq
  messages = dlq_messages(page: 1, per_page: 1000)
  return 0 if messages.empty?

  release_locks_for_messages(messages)

  total = 0
  # Batch the deletes per DLQ: one delete_batch call per queue instead of
  # one delete per message.
  messages.group_by { |m| m[:queue_name] }.each do |queue_name, batch|
    msg_ids = batch.map { |m| m[:msg_id].to_i }
    begin
      total += @client.delete_batch(queue_name, msg_ids, prefixed: false).size
    rescue StandardError => error
      Pgbus.logger.debug { "[Pgbus::Web] Error batch-discarding DLQ messages from #{queue_name}: #{error.message}" }
    end
  end
  total
end

#discard_all_enqueuedObject



129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/pgbus/web/data_source.rb', line 129

# Archive every visible message (up to 10k per queue) in all non-DLQ
# queues, releasing uniqueness locks first. Returns the count archived;
# failures for one queue are logged and do not abort the others.
def discard_all_enqueued
  dlq_suffix = Pgbus::DEAD_LETTER_SUFFIX
  live_queues = queues_with_metrics.reject { |q| q[:name].end_with?(dlq_suffix) }

  live_queues.sum do |q|
    batch = query_queue_messages_raw(q[:name], 10_000, 0)
    next 0 if batch.empty?

    release_locks_for_messages(batch)

    msg_ids = batch.map { |m| m[:msg_id].to_i }
    @client.archive_batch(q[:name], msg_ids, prefixed: false)
    msg_ids.size
  rescue StandardError => e
    Pgbus.logger.debug { "[Pgbus::Web] Error discarding enqueued messages from #{q[:name]}: #{e.message}" }
    0
  end
end

#discard_all_failedObject



254
255
256
257
258
259
260
261
262
263
# File 'lib/pgbus/web/data_source.rb', line 254

# Archive then delete every failed event. Returns the number of rows
# removed from pgbus_failed_events, or 0 on error.
def discard_all_failed
  release_locks_for_failed_events
  archive_all_failed_messages

  connection.execute("DELETE FROM pgbus_failed_events").cmd_tuples
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error discarding all failed events: #{error.message}" }
  0
end

#discard_all_locksObject



570
571
572
573
574
575
# File 'lib/pgbus/web/data_source.rb', line 570

# Delete every job uniqueness key. Returns the delete count, or 0 on error.
def discard_all_locks
  UniquenessKey.delete_all
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error discarding all locks: #{error.message}" }
  0
end

#discard_dlq_message(queue_name, msg_id) ⇒ Object



318
319
320
321
322
323
324
325
326
# File 'lib/pgbus/web/data_source.rb', line 318

# Permanently delete one DLQ message after releasing its uniqueness lock.
# queue_name is the full DLQ name (already prefixed). Returns true/false.
def discard_dlq_message(queue_name, msg_id)
  id = msg_id.to_i
  release_lock_for_message(queue_name, msg_id)
  @client.delete_message(queue_name, id, prefixed: false)
  true
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error discarding DLQ message #{msg_id}: #{error.message}" }
  false
end

#discard_event(queue_name, msg_id) ⇒ Object

Discard (archive) an event message from a handler queue.



748
749
750
751
752
753
754
755
# File 'lib/pgbus/web/data_source.rb', line 748

# Discard (archive) an event message from a handler queue, releasing its
# uniqueness lock first. Returns true on success, false on error.
def discard_event(queue_name, msg_id)
  id = msg_id.to_i
  release_lock_for_message(queue_name, msg_id)
  @client.archive_message(queue_name, id, prefixed: false)
  true
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error discarding event #{msg_id}: #{error.message}" }
  false
end

#discard_failed_event(id) ⇒ Object



211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
# File 'lib/pgbus/web/data_source.rb', line 211

# Archive one failed event (if still present) and delete its row.
# Returns true on success, false on error.
def discard_failed_event(id)
  if (event = failed_event(id))
    release_lock_for_payload(event["payload"])
    archive_failed_message(event)
  end

  # Delete unconditionally: the row may exist even when the event lookup
  # above returned nothing to archive.
  connection.exec_delete(
    "DELETE FROM pgbus_failed_events WHERE id = $1", "Pgbus Delete Failed Event", [id.to_i]
  )
  true
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error discarding failed event #{id}: #{error.message}" }
  false
end

#discard_job(queue_name, msg_id) ⇒ Object



124
125
126
127
# File 'lib/pgbus/web/data_source.rb', line 124

# Release the job's uniqueness lock, then archive its message. Unlike the
# event/DLQ variants this does not rescue — callers see any error raised
# by the client.
def discard_job(queue_name, msg_id)
  release_lock_for_message(queue_name, msg_id)
  @client.archive_message(queue_name, msg_id.to_i, prefixed: false)
end

#discard_lock(lock_key) ⇒ Object

Lock management



554
555
556
557
558
559
# File 'lib/pgbus/web/data_source.rb', line 554

# Delete a single uniqueness lock by key. Returns the delete count, 0 on error.
def discard_lock(lock_key)
  UniquenessKey.where(lock_key: lock_key).delete_all
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error discarding lock #{lock_key}: #{error.message}" }
  0
end

#discard_locks(lock_keys) ⇒ Object



561
562
563
564
565
566
567
568
# File 'lib/pgbus/web/data_source.rb', line 561

# Delete a batch of uniqueness locks. Returns how many were removed, 0 on error.
def discard_locks(lock_keys)
  return 0 if lock_keys.empty?

  UniquenessKey.where(lock_key: lock_keys).delete_all
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error discarding locks: #{error.message}" }
  0
end

#discard_selected_events(selections) ⇒ Object

Bulk discard selected events from handler queues.



832
833
834
835
836
837
838
839
840
841
842
843
# File 'lib/pgbus/web/data_source.rb', line 832

# Bulk discard selected events from handler queues.
# Each selection is { queue_name:, msg_id: }; returns how many succeeded.
def discard_selected_events(selections)
  return 0 if selections.empty?

  # count tallies selections whose discard returned true; per-item errors
  # are logged and counted as failures without aborting the batch.
  selections.count do |sel|
    discard_event(sel[:queue_name], sel[:msg_id])
  rescue StandardError => error
    Pgbus.logger.debug { "[Pgbus::Web] Error in bulk discard for #{sel[:msg_id]}: #{error.message}" }
    false
  end
end

#dlq_message_detail(msg_id) ⇒ Object



279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
# File 'lib/pgbus/web/data_source.rb', line 279

# Look up one message by id across all dead-letter queues.
# Returns the formatted message, or nil when absent or on error.
def dlq_message_detail(msg_id)
  dlq_suffix = Pgbus::DEAD_LETTER_SUFFIX
  dlq_queues = queues_with_metrics.select { |q| q[:name].end_with?(dlq_suffix) }

  dlq_queues.each do |queue|
    row = connection.select_one(
      "SELECT * FROM pgmq.q_#{sanitize_name(queue[:name])} WHERE msg_id = $1",
      "Pgbus DLQ Detail",
      [msg_id.to_i]
    )
    # First DLQ containing the id wins.
    return format_message(row, queue[:name]) if row
  end
  nil
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching DLQ message #{msg_id}: #{error.message}" }
  nil
end

#dlq_messages(page: 1, per_page: 25) ⇒ Object

Dead-letter queue messages. Note: DLQ queue names from queues_with_metrics are already fully qualified (e.g., “pgbus_default_dlq”), so we use them directly without re-prefixing.



268
269
270
271
272
273
274
275
276
277
# File 'lib/pgbus/web/data_source.rb', line 268

# Paginated listing across all dead-letter queues. DLQ names from
# queues_with_metrics are already fully qualified, so they are used as-is.
def dlq_messages(page: 1, per_page: 25)
  dlq_names = queues_with_metrics
              .select { |q| q[:name].end_with?(Pgbus::DEAD_LETTER_SUFFIX) }
              .map { |q| q[:name] }

  paginated_queue_messages(dlq_names, per_page, (page - 1) * per_page)
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching DLQ messages: #{error.message}" }
  []
end

#drop_queue(name) ⇒ Object



71
72
73
74
# File 'lib/pgbus/web/data_source.rb', line 71

# Drop the queue entirely after releasing its uniqueness keys.
# name is the full PGMQ queue name (no prefix is applied). Errors propagate.
def drop_queue(name)
  release_uniqueness_keys_for_queue(name)
  @client.drop_queue(name, prefixed: false)
end

#edit_event_payload(queue_name, msg_id, new_payload_json) ⇒ Object

Edit the payload of a stuck event: delete old message and re-enqueue with the corrected payload in the same queue. The produce + delete are wrapped in a PGMQ transaction so the message can’t be lost if either half fails (same pattern as retry_dlq_message).



794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
# File 'lib/pgbus/web/data_source.rb', line 794

# Edit the payload of a stuck event: re-enqueue the corrected payload and
# delete the old message in the same queue. Returns false on invalid
# JSON, missing message, or any other error.
def edit_event_payload(queue_name, msg_id, new_payload_json)
  begin
    parsed = JSON.parse(new_payload_json)
  rescue JSON::ParserError
    # Reject malformed JSON before touching the queue.
    return false
  end

  detail = job_detail(queue_name, msg_id)
  return false unless detail

  # produce + delete run in one PGMQ transaction so the message can't be
  # lost if either half fails (same pattern as retry_dlq_message).
  @client.transaction do |txn|
    txn.produce(queue_name, parsed.to_json, headers: detail[:headers])
    txn.delete(queue_name, msg_id.to_i)
  end
  true
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error editing event #{msg_id}: #{e.message}" }
  false
end

#enqueue_recurring_task_now(id) ⇒ Object



506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
# File 'lib/pgbus/web/data_source.rb', line 506

# Enqueue a recurring task immediately (run_at = now), bypassing its
# schedule. Returns true on success, false when the record is missing or
# on error.
def enqueue_recurring_task_now(id)
  record = RecurringTask.find_by(id: id)
  return false unless record

  # Rebuild the Task value object from the stored configuration so the
  # scheduler enqueues exactly what the stored definition describes.
  task = Recurring::Task.from_configuration(record.key,
                                            class: record.class_name,
                                            command: record.command,
                                            schedule: record.schedule,
                                            queue: record.queue_name,
                                            args: parse_arguments(record.arguments),
                                            priority: record.priority)

  schedule = Recurring::Schedule.new(config: Pgbus.configuration)
  schedule.enqueue_task(task, run_at: Time.now.utc)
  true
rescue StandardError => e
  Pgbus.logger.error { "[Pgbus::Web] Error enqueuing recurring task #{id}: #{e.message}" }
  false
end

#failed_event(id) ⇒ Object



172
173
174
175
176
177
178
179
180
181
# File 'lib/pgbus/web/data_source.rb', line 172

# Fetch a single failed-event row by id; nil when absent or on error.
def failed_event(id)
  connection.select_one(
    "SELECT * FROM pgbus_failed_events WHERE id = $1",
    "Pgbus Failed Event",
    [id.to_i]
  )
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching failed event #{id}: #{error.message}" }
  nil
end

#failed_events(page: 1, per_page: 25) ⇒ Object

Failed events



151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/pgbus/web/data_source.rb', line 151

# Paginated failed events, most recent first. Returns an array of row
# hashes; [] on error.
def failed_events(page: 1, per_page: 25)
  connection.select_all(
    "SELECT * FROM pgbus_failed_events ORDER BY failed_at DESC LIMIT $1 OFFSET $2",
    "Pgbus Failed Events",
    [per_page, (page - 1) * per_page]
  ).to_a
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching failed events: #{error.message}" }
  []
end

#failed_events_countObject



164
165
166
167
168
169
170
# File 'lib/pgbus/web/data_source.rb', line 164

# Total number of failed events; 0 on error.
def failed_events_count
  connection.select_value("SELECT COUNT(*) FROM pgbus_failed_events").to_i
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error counting failed events: #{error.message}" }
  0
end

#handler_class_for_queue(physical_queue_name) ⇒ Object

Find the handler class registered for a given physical queue name. Returns nil if no subscriber matches — used to reject forged handler values in mark_event_handled / reroute_event.



742
743
744
745
# File 'lib/pgbus/web/data_source.rb', line 742

# Find the handler class registered for a given physical queue name.
# Returns nil when no subscriber matches — used to reject forged handler
# values in mark_event_handled / reroute_event.
def handler_class_for_queue(physical_queue_name)
  match = registered_subscribers.find { |sub| sub[:physical_queue_name] == physical_queue_name }
  match ? match[:handler_class] : nil
end

#handler_queue_physical_namesObject

Physical queue names for all registered subscribers. Used for both pending_events lookup and server-side validation of target queues in reroute_event.



735
736
737
# File 'lib/pgbus/web/data_source.rb', line 735

# Physical queue names for all registered subscribers (deduplicated).
# Used for pending_events lookup and to validate reroute targets.
def handler_queue_physical_names
  registered_subscribers.map { |sub| sub[:physical_queue_name] }.uniq
end

#job_detail(queue_name, msg_id) ⇒ Object



108
109
110
111
112
113
114
115
116
117
118
# File 'lib/pgbus/web/data_source.rb', line 108

# Fetch one message from a queue table and format it; nil when the
# message is absent or on error.
def job_detail(queue_name, msg_id)
  row = connection.select_one(
    "SELECT * FROM pgmq.q_#{sanitize_name(queue_name)} WHERE msg_id = $1",
    "Pgbus Job Detail",
    [msg_id.to_i]
  )
  return nil unless row

  format_message(row, queue_name)
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching job detail: #{error.message}" }
  nil
end

#job_locksObject

Job uniqueness keys



578
579
580
581
582
583
584
585
586
587
588
589
590
591
# File 'lib/pgbus/web/data_source.rb', line 578

# The 100 most recent job uniqueness keys as plain hashes, newest first.
# age_seconds is nil when the row has no created_at.
def job_locks
  UniquenessKey.order(created_at: :desc).limit(100).map do |lock|
    {
      lock_key: lock.lock_key,
      queue_name: lock.queue_name,
      msg_id: lock.msg_id,
      created_at: lock.created_at,
      age_seconds: lock.created_at ? (Time.current - lock.created_at).to_i : nil
    }
  end
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching uniqueness keys: #{error.message}" }
  []
end

#job_stats_summary(minutes: 60) ⇒ Object

Job stats



594
595
596
597
598
599
# File 'lib/pgbus/web/data_source.rb', line 594

# Aggregate job stats for the trailing window; zeroed hash on error.
def job_stats_summary(minutes: 60)
  JobStat.summary(minutes: minutes)
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching job stats summary: #{error.message}" }
  { total: 0, success: 0, failed: 0, dead_lettered: 0, avg_duration_ms: 0, max_duration_ms: 0 }
end

#job_status_counts(minutes: 60) ⇒ Object



608
609
610
611
612
613
# File 'lib/pgbus/web/data_source.rb', line 608

# Per-status job counts for the trailing window; {} on error.
def job_status_counts(minutes: 60)
  JobStat.status_counts(minutes: minutes)
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching status counts: #{error.message}" }
  {}
end

#job_throughput(minutes: 60) ⇒ Object



601
602
603
604
605
606
# File 'lib/pgbus/web/data_source.rb', line 601

# Throughput series as [{ time:, count: }, ...]; [] on error.
def job_throughput(minutes: 60)
  JobStat.throughput(minutes: minutes).map do |bucket_time, bucket_count|
    { time: bucket_time, count: bucket_count }
  end
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching throughput: #{error.message}" }
  []
end

#jobs(queue_name: nil, page: 1, per_page: 25) ⇒ Object

Jobs (messages in queue tables)



95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/pgbus/web/data_source.rb', line 95

# Paginated messages, scoped to one queue when queue_name is given,
# otherwise across all queues. [] on error.
def jobs(queue_name: nil, page: 1, per_page: 25)
  offset = (page - 1) * per_page

  if queue_name.nil?
    all_queue_messages(per_page, offset)
  else
    query_queue_messages(queue_name, per_page, offset)
  end
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error reading jobs: #{error.message}" }
  []
end

#latency_by_queue(minutes: 60) ⇒ Object



629
630
631
632
633
634
# File 'lib/pgbus/web/data_source.rb', line 629

# Average latency grouped by queue for the trailing window; [] on error.
def latency_by_queue(minutes: 60)
  JobStat.avg_latency_by_queue(minutes: minutes)
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching latency by queue: #{error.message}" }
  []
end

#latency_trend(minutes: 60) ⇒ Object



622
623
624
625
626
627
# File 'lib/pgbus/web/data_source.rb', line 622

# Latency time series for the trailing window; [] on error.
def latency_trend(minutes: 60)
  JobStat.latency_trend(minutes: minutes)
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching latency trend: #{error.message}" }
  []
end

#mark_event_handled(queue_name, msg_id, handler_class) ⇒ Object

Mark an event as handled: archive the queue message and insert a ProcessedEvent record so it won’t be reprocessed on replay.

The insert is performed BEFORE archive. If the archive step fails afterwards the operator can retry — replay protection is already in place and the idempotency dedup will cause the handler to skip the event even if it is eventually re-read from the queue. Doing it the other way around would risk losing the message without recording the marker.



766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
# File 'lib/pgbus/web/data_source.rb', line 766

# Mark an event as handled: insert a ProcessedEvent marker, then archive
# the queue message. Returns false when the message or its event_id is
# missing, or on any error.
#
# The marker insert happens BEFORE the archive: if archiving fails the
# operator can retry safely — replay protection is already recorded and
# the idempotency dedup makes the handler skip the event even if it is
# re-read from the queue. The reverse order would risk losing the message
# without recording the marker.
def mark_event_handled(queue_name, msg_id, handler_class)
  detail = job_detail(queue_name, msg_id)
  return false unless detail

  raw = JSON.parse(detail[:message])
  event_id = raw["event_id"]
  return false unless event_id

  # unique_by makes the insert a no-op when a marker already exists.
  ProcessedEvent.insert(
    { event_id: event_id, handler_class: handler_class, processed_at: Time.now.utc },
    unique_by: %i[event_id handler_class]
  )
  # Release the uniqueness lock while we still hold the payload in
  # memory — otherwise the message is archived but the lock row stays
  # behind, blocking later publishes with the same key. Mirrors
  # discard_event.
  release_lock_for_payload(detail[:message])
  @client.archive_message(queue_name, msg_id.to_i, prefixed: false)
  true
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error marking event #{msg_id} handled: #{e.message}" }
  false
end

#outbox_entries(page: 1, per_page: 25) ⇒ Object



545
546
547
548
549
550
551
# File 'lib/pgbus/web/data_source.rb', line 545

# Paginated outbox entries, newest id first; [] on error.
def outbox_entries(page: 1, per_page: 25)
  skip = (page - 1) * per_page
  OutboxEntry.order(id: :desc).limit(per_page).offset(skip).to_a
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching outbox entries: #{error.message}" }
  []
end

#outbox_statsObject

Outbox



534
535
536
537
538
539
540
541
542
543
# File 'lib/pgbus/web/data_source.rb', line 534

# Outbox summary: unpublished count, total count, and the age of the
# oldest unpublished entry (nil when none). Zeroed hash on error.
def outbox_stats
  {
    unpublished: OutboxEntry.unpublished.count,
    total: OutboxEntry.count,
    oldest_unpublished_age: oldest_unpublished_age
  }
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching outbox stats: #{error.message}" }
  { unpublished: 0, total: 0, oldest_unpublished_age: nil }
end

#pause_queue(name, reason: nil) ⇒ Object



76
77
78
79
80
# File 'lib/pgbus/web/data_source.rb', line 76

# Pause a queue (by its logical name), recording an optional reason.
# Errors are logged, not raised.
def pause_queue(name, reason: nil)
  QueueState.pause!(logical_queue_name(name), reason: reason)
rescue StandardError => error
  Pgbus.logger.error { "[Pgbus::Web] Error pausing queue #{name}: #{error.message}" }
end

#pending_events(page: 1, per_page: 25) ⇒ Object

Pending events — messages sitting in handler queues that haven’t been processed. Identifies handler queues via the subscriber registry and queries them for unprocessed messages. Subscriber queue names are logical (e.g. “task_completion_handler”), while ‘pgmq.meta.queue_name` stores physical names (e.g. “pgbus_task_completion_handler”), so we normalize through `config.queue_name` before intersecting.



715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
# File 'lib/pgbus/web/data_source.rb', line 715

# Pending events: messages sitting in handler queues that haven't been
# processed. Intersects the subscribers' physical queue names with the
# queues that actually exist in pgmq.meta before querying.
def pending_events(page: 1, per_page: 25)
  handler_queues = handler_queue_physical_names
  return [] if handler_queues.empty?

  known_queues = connection.select_values(
    "SELECT queue_name FROM pgmq.meta ORDER BY queue_name", "Pgbus Queue Names"
  )
  # Only query queues that exist — a registered subscriber may not have a
  # queue yet.
  targets = handler_queues & known_queues
  return [] if targets.empty?

  paginated_queue_messages(targets, per_page, (page - 1) * per_page)
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching pending events: #{error.message}" }
  []
end

#processed_event(id) ⇒ Object



381
382
383
384
385
386
387
388
389
390
# File 'lib/pgbus/web/data_source.rb', line 381

# Fetch one processed-event row by id; nil when absent or on error.
def processed_event(id)
  connection.select_one(
    "SELECT * FROM pgbus_processed_events WHERE id = $1",
    "Pgbus Processed Event",
    [id.to_i]
  )
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching processed event #{id}: #{error.message}" }
  nil
end

#processed_events(page: 1, per_page: 25) ⇒ Object

Processed events (audit trail)



368
369
370
371
372
373
374
375
376
377
378
379
# File 'lib/pgbus/web/data_source.rb', line 368

# Paginated processed events (audit trail), most recent first; [] on error.
def processed_events(page: 1, per_page: 25)
  connection.select_all(
    "SELECT * FROM pgbus_processed_events ORDER BY processed_at DESC LIMIT $1 OFFSET $2",
    "Pgbus Processed Events",
    [per_page, (page - 1) * per_page]
  ).to_a
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching processed events: #{error.message}" }
  []
end

#processed_events_countObject



392
393
394
395
396
397
398
# File 'lib/pgbus/web/data_source.rb', line 392

# Total number of processed events; 0 on error.
def processed_events_count
  connection.select_value("SELECT COUNT(*) FROM pgbus_processed_events").to_i
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error counting processed events: #{error.message}" }
  0
end

#processesObject

Processes



357
358
359
360
361
362
363
364
365
# File 'lib/pgbus/web/data_source.rb', line 357

# All registered worker/scheduler processes, ordered by kind then start
# time, formatted for the dashboard; [] on error.
def processes
  connection.select_all(
    "SELECT * FROM pgbus_processes ORDER BY kind, created_at"
  ).to_a.map { |row| format_process(row) }
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching processes: #{error.message}" }
  []
end

#purge_queue(name) ⇒ Object



66
67
68
69
# File 'lib/pgbus/web/data_source.rb', line 66

# Purge all messages from a queue after releasing its uniqueness keys.
# name is the full PGMQ queue name (no prefix is applied). Errors propagate.
def purge_queue(name)
  release_uniqueness_keys_for_queue(name)
  @client.purge_queue(name, prefixed: false)
end

#queue_detail(name) ⇒ Object

name is the full PGMQ queue name (e.g. “pgbus_default”) as returned by queues_with_metrics. No prefix is added.



59
60
61
62
63
64
# File 'lib/pgbus/web/data_source.rb', line 59

# Metrics for one queue. name is the full PGMQ queue name (e.g.
# "pgbus_default") as returned by queues_with_metrics; nil on error.
def queue_detail(name)
  queue_metrics_via_sql(name)
rescue StandardError => error
  Pgbus.logger.error { "[Pgbus::Web] Error fetching queue detail for #{name}: #{error.class}: #{error.message}" }
  nil
end

#queue_health_detail(queue_name) ⇒ Object

Per-queue health stats for the queue detail view.



667
668
669
670
671
672
673
674
675
676
677
678
# File 'lib/pgbus/web/data_source.rb', line 667

# Per-queue health stats for the queue detail view: vacuum/bloat stats of
# the queue (q_*) and archive (a_*) tables plus the oldest open
# transaction age. Empty structure on error.
def queue_health_detail(queue_name)
  safe_name = sanitize_name(queue_name)
  table_stats = %W[q_#{safe_name} a_#{safe_name}].zip(%w[queue archive]).filter_map do |table, role|
    fetch_table_stats("pgmq", table, role)
  end

  { tables: table_stats, oldest_transaction_age_sec: oldest_transaction_age }
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching health detail for #{queue_name}: #{error.message}" }
  { tables: [], oldest_transaction_age_sec: nil }
end

#queue_health_statsObject

Queue health — vacuum stats, dead tuples, bloat, MVCC horizon. Returns aggregate health across all queue and archive tables, plus the oldest open transaction age (MVCC horizon pinning risk).



639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
# File 'lib/pgbus/web/data_source.rb', line 639

# Aggregate health across all queue and archive tables — dead/live
# tuples, worst bloat ratio, tables needing vacuum (>10% bloat), oldest
# vacuum, and the oldest open transaction age (MVCC horizon pinning
# risk). Zeroed structure on error.
def queue_health_stats
  tables = fetch_all_table_stats
  bloat_ratios = tables.map { |t| t[:bloat_ratio] }

  {
    total_dead_tuples: tables.sum { |t| t[:dead_tuples] },
    total_live_tuples: tables.sum { |t| t[:live_tuples] },
    worst_bloat_ratio: (bloat_ratios.max || 0.0).round(4),
    tables_needing_vacuum: bloat_ratios.count { |ratio| ratio > 0.1 },
    oldest_vacuum_ago_sec: tables.filter_map { |t| t[:last_vacuum_ago_sec] }.max,
    oldest_transaction_age_sec: oldest_transaction_age,
    tables: tables
  }
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching queue health stats: #{error.class}: #{error.message}" }
  {
    total_dead_tuples: 0, total_live_tuples: 0, worst_bloat_ratio: 0.0,
    tables_needing_vacuum: 0, oldest_vacuum_ago_sec: nil,
    oldest_transaction_age_sec: nil, tables: []
  }
end

#queue_paused?(name) ⇒ Boolean

Returns:

  • (Boolean)


88
89
90
91
92
# File 'lib/pgbus/web/data_source.rb', line 88

# Whether the queue is currently paused. Any lookup failure is treated as
# "not paused" so the dashboard stays usable.
#
# @return [Boolean]
def queue_paused?(name)
  QueueState.paused?(logical_queue_name(name))
rescue StandardError
  false
end

#queues_with_metricsObject

Queues — query via ActiveRecord for reliability in web processes (avoids PGMQ client connection issues when the web server uses a different connection lifecycle than the worker processes).



44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/pgbus/web/data_source.rb', line 44

# All queues with metrics, each merged with its paused flag. Queue names
# come from pgmq.meta via ActiveRecord for reliability in web processes
# (avoids PGMQ client connection-lifecycle issues). [] on error.
def queues_with_metrics
  names = connection.select_values("SELECT queue_name FROM pgmq.meta ORDER BY queue_name")
  return [] if names.empty?

  # Set membership keeps the paused lookup O(1) per queue.
  paused = paused_queue_names.to_set
  batched_queue_metrics(names).map do |queue|
    queue.merge(paused: paused.include?(logical_queue_name(queue[:name])))
  end
rescue StandardError => error
  Pgbus.logger.error { "[Pgbus::Web] Error fetching queue metrics: #{error.class}: #{error.message}" }
  []
end

#recurring_task(id) ⇒ Object



455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
# File 'lib/pgbus/web/data_source.rb', line 455

# Full detail for one recurring task, including its 25 most recent
# executions. Returns nil when the record does not exist or on error.
def recurring_task(id)
  record = RecurringTask.find_by(id: id)
  return nil unless record

  # Rebuild the Task value object to derive human_schedule and next_time
  # from the stored configuration.
  task = Recurring::Task.from_configuration(record.key,
                                            class: record.class_name,
                                            command: record.command,
                                            schedule: record.schedule,
                                            queue: record.queue_name,
                                            args: parse_arguments(record.arguments),
                                            priority: record.priority,
                                            description: record.description)

  executions = RecurringExecution.for_task(record.key).recent(25).map do |exec|
    { run_at: exec.run_at, created_at: exec.created_at }
  end

  {
    id: record.id,
    key: record.key,
    class_name: record.class_name,
    command: record.command,
    schedule: record.schedule,
    human_schedule: task.human_schedule,
    queue_name: record.queue_name,
    arguments: parse_arguments(record.arguments),
    priority: record.priority,
    description: record.description,
    enabled: record.enabled,
    static: record.static,
    next_run_at: task.next_time,
    executions: executions,
    created_at: record.created_at,
    updated_at: record.updated_at
  }
rescue StandardError => e
  Pgbus.logger.error { "[Pgbus::Web] Error fetching recurring task #{id}: #{e.class}: #{e.message}" }
  nil
end

#recurring_tasksObject

Recurring tasks



413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
# File 'lib/pgbus/web/data_source.rb', line 413

# All recurring tasks as dashboard hashes, ordered by key. Last-run times
# are fetched in one grouped query rather than per task. [] on error.
def recurring_tasks
  records = RecurringTask.order(:key).to_a
  # One MAX(run_at) GROUP BY query for all tasks, keyed by task_key.
  last_runs = RecurringExecution
              .where(task_key: records.map(&:key))
              .select("task_key, MAX(run_at) AS run_at")
              .group(:task_key)
              .index_by(&:task_key)

  records.map do |record|
    last_exec = last_runs[record.key]
    # Rebuild the Task value object to derive human_schedule / next_time.
    task = Recurring::Task.from_configuration(record.key,
                                              class: record.class_name,
                                              command: record.command,
                                              schedule: record.schedule,
                                              queue: record.queue_name,
                                              args: parse_arguments(record.arguments),
                                              priority: record.priority,
                                              description: record.description)

    {
      id: record.id,
      key: record.key,
      class_name: record.class_name,
      command: record.command,
      schedule: record.schedule,
      human_schedule: task.human_schedule,
      queue_name: record.queue_name,
      priority: record.priority,
      description: record.description,
      enabled: record.enabled,
      static: record.static,
      next_run_at: task.next_time,
      last_run_at: last_exec&.run_at,
      created_at: record.created_at,
      updated_at: record.updated_at
    }
  end
rescue StandardError => e
  Pgbus.logger.error { "[Pgbus::Web] Error fetching recurring tasks: #{e.class}: #{e.message}" }
  []
end

#recurring_tasks_countObject



526
527
528
529
530
531
# File 'lib/pgbus/web/data_source.rb', line 526

# Number of recurring task records; 0 on error.
def recurring_tasks_count
  RecurringTask.count
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error counting recurring tasks: #{error.message}" }
  0
end

#registered_subscribersObject

Subscriber registry. ‘queue_name` is the logical name the subscriber registered with; `physical_queue_name` is what the queue is actually called in `pgmq.meta` (e.g. logical “task_completion_handler” -> physical “pgbus_task_completion_handler”). The dashboard needs the physical name to match against pending messages / target queues.



850
851
852
853
854
855
856
857
858
859
860
861
862
# File 'lib/pgbus/web/data_source.rb', line 850

# Subscriber registry snapshot. queue_name is the logical name the
# subscriber registered with; physical_queue_name is the actual name in
# pgmq.meta (config.queue_name applies the prefix). [] on error.
def registered_subscribers
  EventBus::Registry.instance.subscribers.map do |subscriber|
    {
      pattern: subscriber.pattern,
      handler_class: subscriber.handler_class.name,
      queue_name: subscriber.queue_name,
      physical_queue_name: @client.config.queue_name(subscriber.queue_name)
    }
  end
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching subscribers: #{error.message}" }
  []
end

#replay_event(event) ⇒ Object



400
401
402
403
404
405
406
407
408
409
410
# File 'lib/pgbus/web/data_source.rb', line 400

# Re-publish an event's payload to all matching subscribers. Uses the
# stored routing_key, falling back to handler_class; false when neither
# is present or on error.
def replay_event(event)
  routing_key = event["routing_key"] || event["handler_class"]
  return false unless routing_key

  @client.publish_to_topic(routing_key, event["payload"] || "{}")
  true
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error replaying event: #{error.message}" }
  false
end

#reroute_event(source_queue, msg_id, target_queue) ⇒ Object

Reroute an event from one handler queue to another. Wrapped in a PGMQ transaction so produce on the target and delete on the source are atomic.



817
818
819
820
821
822
823
824
825
826
827
828
829
# File 'lib/pgbus/web/data_source.rb', line 817

# Reroute an event from one handler queue to another. Returns false when
# the source message is missing or on error.
def reroute_event(source_queue, msg_id, target_queue)
  detail = job_detail(source_queue, msg_id)
  return false unless detail

  # produce on the target and delete on the source share one PGMQ
  # transaction, so the message is never duplicated or lost mid-move.
  @client.transaction do |txn|
    txn.produce(target_queue, detail[:message], headers: detail[:headers])
    txn.delete(source_queue, msg_id.to_i)
  end
  true
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error rerouting event #{msg_id}: #{e.message}" }
  false
end

#resume_queue(name) ⇒ Object



82
83
84
85
86
# File 'lib/pgbus/web/data_source.rb', line 82

# Resume a paused queue (by its logical name). Errors are logged, not raised.
def resume_queue(name)
  QueueState.resume!(logical_queue_name(name))
rescue StandardError => error
  Pgbus.logger.error { "[Pgbus::Web] Error resuming queue #{name}: #{error.message}" }
end

#retry_all_dlqObject



328
329
330
331
332
333
334
335
336
337
338
# File 'lib/pgbus/web/data_source.rb', line 328

# Retry every DLQ message (first 1000); returns how many succeeded.
# Per-message errors are logged and counted as failures.
def retry_all_dlq
  dlq_messages(page: 1, per_page: 1000).count do |msg|
    retry_dlq_message(msg[:queue_name], msg[:msg_id])
  rescue StandardError => error
    Pgbus.logger.debug { "[Pgbus::Web] Error retrying DLQ message #{msg[:msg_id]}: #{error.message}" }
    false
  end
end

#retry_all_failedObject



227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
# File 'lib/pgbus/web/data_source.rb', line 227

# Re-enqueue every failed event and delete its row, in batches of 100.
# Returns the number successfully retried.
#
# Uses keyset pagination (WHERE id > last seen id) so the loop is
# guaranteed to terminate: the previous LIMIT-only query re-fetched the
# same rows forever when an event's retry kept failing (the rescue below
# logs the error but leaves the row in pgbus_failed_events).
def retry_all_failed
  count = 0
  last_id = 0
  loop do
    batch = connection.select_all(
      "SELECT * FROM pgbus_failed_events WHERE id > $1 ORDER BY id LIMIT 100",
      "Pgbus Retry Batch",
      [last_id]
    ).to_a
    break if batch.empty?

    batch.each do |event|
      # Advance the cursor first so a failing event is skipped on the
      # next pass instead of being retried endlessly.
      last_id = event["id"].to_i

      payload = JSON.parse(event["payload"])
      headers = event["headers"]
      headers = JSON.parse(headers) if headers.is_a?(String)

      # Send + delete in one DB transaction so the event is neither lost
      # nor duplicated if either half fails.
      connection.transaction do
        @client.send_message(event["queue_name"], payload, headers: headers)
        connection.exec_delete(
          "DELETE FROM pgbus_failed_events WHERE id = $1", "Pgbus Delete Failed Event", [event["id"].to_i]
        )
      end
      count += 1
    rescue StandardError => e
      Pgbus.logger.error { "[Pgbus::Web] Failed to retry event #{event["id"]}: #{e.message}" }
    end
  end
  count
end

#retry_dlq_message(queue_name, msg_id) ⇒ Object



296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
# File 'lib/pgbus/web/data_source.rb', line 296

# Move a dead-lettered message back onto its original queue.
# queue_name is the full DLQ name (already prefixed); the original queue
# is derived by stripping the DLQ suffix. Returns false when the message
# is missing or on error.
def retry_dlq_message(queue_name, msg_id)
  dlq_suffix = Pgbus::DEAD_LETTER_SUFFIX
  original_queue = queue_name.delete_suffix(dlq_suffix)

  row = connection.select_one(
    "SELECT * FROM pgmq.q_#{sanitize_name(queue_name)} WHERE msg_id = $1",
    "Pgbus DLQ Read",
    [msg_id.to_i]
  )
  return false unless row

  # produce + delete share one PGMQ transaction so the message can't be
  # lost (or duplicated) if either half fails.
  @client.transaction do |txn|
    txn.produce(original_queue, row["message"], headers: row["headers"])
    txn.delete(queue_name, msg_id.to_i)
  end
  true
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error retrying DLQ message #{msg_id}: #{e.message}" }
  false
end

#retry_failed_event(id) ⇒ Object



183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/pgbus/web/data_source.rb', line 183

# Retries one failed event by id. Returns true when the retry was
# triggered (and the failed-event row deleted), false if the event is
# unknown or anything raised along the way.
def retry_failed_event(id)
  event = failed_event(id)
  return false unless event

  # The original message may still be sitting in its queue awaiting retry.
  # If so, zero out its visibility timeout so a worker grabs it immediately
  # instead of us enqueueing a duplicate. Only when the original is gone
  # (e.g. already moved to the DLQ) do we send a fresh copy.
  queue = event["queue_name"]
  original_id = event["msg_id"]&.to_i

  if original_id && @client.message_exists?(queue, msg_id: original_id)
    @client.set_visibility_timeout(queue, original_id, vt: 0)
  else
    raw_headers = event["headers"]
    @client.send_message(
      queue,
      JSON.parse(event["payload"]),
      headers: raw_headers.is_a?(String) ? JSON.parse(raw_headers) : raw_headers
    )
  end

  connection.exec_delete(
    "DELETE FROM pgbus_failed_events WHERE id = $1", "Pgbus Delete Failed Event", [id.to_i]
  )
  true
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error retrying failed event #{id}: #{e.message}" }
  false
end

#retry_job(queue_name, msg_id) ⇒ Object



120
121
122
# File 'lib/pgbus/web/data_source.rb', line 120

# Makes a delayed/in-flight message immediately claimable again by
# clearing its visibility timeout. queue_name is used as-is
# (prefixed: false — no automatic prefixing by the client).
def retry_job(queue_name, msg_id)
  id = msg_id.to_i
  @client.set_visibility_timeout(queue_name, id, vt: 0, prefixed: false)
end

#slowest_job_classes(limit: 10, minutes: 60) ⇒ Object



615
616
617
618
619
620
# File 'lib/pgbus/web/data_source.rb', line 615

# Thin delegate to JobStat.slowest_classes for the requested window.
# Any failure degrades to an empty list so the dashboard still renders.
def slowest_job_classes(limit: 10, minutes: 60)
  begin
    JobStat.slowest_classes(limit: limit, minutes: minutes)
  rescue StandardError => e
    Pgbus.logger.debug { "[Pgbus::Web] Error fetching slowest classes: #{e.message}" }
    []
  end
end

#stream_stats_available? ⇒ Boolean

Stream stats — only populated when streams_stats_enabled is true AND the migration has been run. Controllers should gate rendering on `stream_stats_available?` to avoid showing empty sections.

Returns:

  • (Boolean)


684
685
686
687
688
689
# File 'lib/pgbus/web/data_source.rb', line 684

# Stream stats are usable only when the feature flag is on AND the backing
# table exists (i.e. the migration has been run). Any error while checking
# is treated as "not available" so views simply hide the section.
def stream_stats_available?
  enabled = Pgbus.configuration.streams_stats_enabled
  enabled && StreamStat.table_exists?
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error checking stream stats availability: #{e.message}" }
  false
end

#stream_stats_summary(minutes: 60) ⇒ Object



691
692
693
694
695
696
697
698
699
700
# File 'lib/pgbus/web/data_source.rb', line 691

# Summary counters for the stream stats panel over the recent window.
# On any error, returns a zeroed-out hash with the same keys so the view
# can render without nil checks.
def stream_stats_summary(minutes: 60)
  StreamStat.summary(minutes: minutes)
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching stream stats summary: #{e.message}" }
  %i[
    broadcasts connects disconnects active_estimate
    avg_fanout avg_broadcast_ms avg_connect_ms
  ].to_h { |key| [key, 0] }
end

#summary_stats ⇒ Object

Dashboard summary



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/pgbus/web/data_source.rb', line 15

# Aggregate counters for the dashboard landing page.
#
# Combines queue depth/visibility totals, DLQ depth, process / failed /
# recurring counts, a throughput estimate, and table-health numbers into
# a single hash keyed for the summary view.
def summary_stats
  all_queues = queues_with_metrics
  dead_letter = Pgbus::DEAD_LETTER_SUFFIX

  depth_total = 0
  visible_total = 0
  dlq_total = 0
  all_queues.each do |queue|
    depth_total += queue[:queue_length]
    visible_total += queue[:queue_visible_length]
    # Queues whose name carries the DLQ suffix contribute to dead-letter depth.
    dlq_total += queue[:queue_length] if queue[:name].end_with?(dead_letter)
  end

  throughput = compute_throughput(all_queues)
  health = queue_health_stats

  {
    total_queues: all_queues.size,
    total_depth: depth_total,
    total_visible: visible_total,
    active_processes: processes.count,
    failed_count: failed_events_count,
    dlq_depth: dlq_total,
    recurring_count: recurring_tasks_count,
    throughput_rate: throughput,
    total_dead_tuples: health[:total_dead_tuples],
    tables_needing_vacuum: health[:tables_needing_vacuum],
    oldest_transaction_age_sec: health[:oldest_transaction_age_sec]
  }
end

#toggle_recurring_task(id) ⇒ Object



495
496
497
498
499
500
501
502
503
504
# File 'lib/pgbus/web/data_source.rb', line 495

# Flips the enabled flag on one recurring task. Returns :enabled or
# :disabled reflecting the task's NEW state, or nil when the task is
# missing or the update raised.
def toggle_recurring_task(id)
  record = RecurringTask.find_by(id: id)
  return nil unless record

  new_state = !record.enabled
  record.update!(enabled: new_state)
  new_state ? :enabled : :disabled
rescue StandardError => e
  Pgbus.logger.error { "[Pgbus::Web] Error toggling recurring task #{id}: #{e.message}" }
  nil
end

#top_streams(limit: 10, minutes: 60) ⇒ Object



702
703
704
705
706
707
# File 'lib/pgbus/web/data_source.rb', line 702

# Thin delegate to StreamStat.top_streams for the requested window.
# Any failure degrades to an empty list so the dashboard still renders.
def top_streams(limit: 10, minutes: 60)
  begin
    StreamStat.top_streams(limit: limit, minutes: minutes)
  rescue StandardError => e
    Pgbus.logger.debug { "[Pgbus::Web] Error fetching top streams: #{e.message}" }
    []
  end
end