Class: Pgbus::Web::DataSource

Inherits:
Object
  • Object
show all
Defined in:
lib/pgbus/web/data_source.rb

Instance Method Summary collapse

Constructor Details

#initialize(client: Pgbus.client) ⇒ DataSource

Returns a new instance of DataSource.



8
9
10
11
12
# File 'lib/pgbus/web/data_source.rb', line 8

# Builds a data source backed by the given PGMQ client (defaults to the
# process-wide Pgbus.client). Throughput tracking state starts empty and
# is filled in lazily by later snapshot calls.
def initialize(client: Pgbus.client)
  @client = client
  @last_throughput_snapshot, @last_throughput_at = nil, nil
end

Instance Method Details

#discard_all_dlqObject



341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
# File 'lib/pgbus/web/data_source.rb', line 341

# Discard (hard-delete) every message currently visible in the dead
# letter queues, up to 1000 at a time. Returns the number deleted.
def discard_all_dlq
  doomed = dlq_messages(page: 1, per_page: 1000)
  return 0 if doomed.empty?

  release_locks_for_messages(doomed)

  # One batch delete per DLQ instead of one call per message.
  total = 0
  doomed.group_by { |msg| msg[:queue_name] }.each do |queue_name, batch|
    begin
      ids = batch.map { |msg| msg[:msg_id].to_i }
      total += @client.delete_batch(queue_name, ids, prefixed: false).size
    rescue StandardError => e
      Pgbus.logger.debug { "[Pgbus::Web] Error batch-discarding DLQ messages from #{queue_name}: #{e.message}" }
    end
  end
  total
end

#discard_all_enqueuedObject



130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/pgbus/web/data_source.rb', line 130

# Archive every waiting message in all non-DLQ queues (up to 10k per
# queue). Returns how many messages were archived in total; failures on
# one queue are logged and do not stop the others.
def discard_all_enqueued
  suffix = Pgbus::DEAD_LETTER_SUFFIX
  live_queues = queues_with_metrics.reject { |queue| queue[:name].end_with?(suffix) }

  live_queues.sum do |queue|
    begin
      pending = query_queue_messages_raw(queue[:name], 10_000, 0)
      if pending.empty?
        0
      else
        # Drop uniqueness keys before archiving so re-enqueues aren't blocked.
        release_locks_for_messages(pending)
        ids = pending.map { |msg| msg[:msg_id].to_i }
        @client.archive_batch(queue[:name], ids, prefixed: false)
        ids.size
      end
    rescue StandardError => e
      Pgbus.logger.debug { "[Pgbus::Web] Error discarding enqueued messages from #{queue[:name]}: #{e.message}" }
      0
    end
  end
end

#discard_all_failedObject



255
256
257
258
259
260
261
262
263
264
# File 'lib/pgbus/web/data_source.rb', line 255

# Remove every failed-event record. Locks are released and messages
# archived first so the final DELETE is the last destructive step.
# Returns the number of rows deleted (0 on error).
def discard_all_failed
  release_locks_for_failed_events
  archive_all_failed_messages

  connection.execute("DELETE FROM pgbus_failed_events").cmd_tuples
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error discarding all failed events: #{e.message}" }
  0
end

#discard_all_locksObject



571
572
573
574
575
576
# File 'lib/pgbus/web/data_source.rb', line 571

# Delete every uniqueness key. Returns the number of rows removed,
# or 0 if the delete raised.
def discard_all_locks
  UniquenessKey.delete_all
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error discarding all locks: #{e.message}" }
  0
end

#discard_dlq_message(queue_name, msg_id) ⇒ Object



319
320
321
322
323
324
325
326
327
# File 'lib/pgbus/web/data_source.rb', line 319

# Permanently delete a single message from a dead letter queue.
# `queue_name` is the fully-qualified DLQ name (already prefixed).
# Returns true on success, false if anything raised.
def discard_dlq_message(queue_name, msg_id)
  release_lock_for_message(queue_name, msg_id)
  @client.delete_message(queue_name, msg_id.to_i, prefixed: false)
  true
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error discarding DLQ message #{msg_id}: #{e.message}" }
  false
end

#discard_event(queue_name, msg_id) ⇒ Object

Discard (archive) an event message from a handler queue.



749
750
751
752
753
754
755
756
# File 'lib/pgbus/web/data_source.rb', line 749

# Discard (archive) an event message from a handler queue. The
# uniqueness lock is released first so the key doesn't outlive the
# message. Returns true on success, false if anything raised.
def discard_event(queue_name, msg_id)
  release_lock_for_message(queue_name, msg_id)
  @client.archive_message(queue_name, msg_id.to_i, prefixed: false)
  true
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error discarding event #{msg_id}: #{e.message}" }
  false
end

#discard_failed_event(id) ⇒ Object



212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
# File 'lib/pgbus/web/data_source.rb', line 212

# Discard one failed event by id: release its lock and archive its
# message (when the record still exists), then delete the row. The
# DELETE runs unconditionally so stale ids are cleaned up too.
def discard_failed_event(id)
  if (event = failed_event(id))
    release_lock_for_payload(event["payload"])
    archive_failed_message(event)
  end

  connection.exec_delete(
    "DELETE FROM pgbus_failed_events WHERE id = $1", "Pgbus Delete Failed Event", [id.to_i]
  )
  true
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error discarding failed event #{id}: #{e.message}" }
  false
end

#discard_job(queue_name, msg_id) ⇒ Object



125
126
127
128
# File 'lib/pgbus/web/data_source.rb', line 125

# Discard a job: release its uniqueness key first (so the same key can
# be enqueued again), then archive the message out of the live queue.
def discard_job(queue_name, msg_id)
  release_lock_for_message(queue_name, msg_id)
  @client.archive_message(queue_name, msg_id.to_i, prefixed: false)
end

#discard_lock(lock_key) ⇒ Object

Lock management



555
556
557
558
559
560
# File 'lib/pgbus/web/data_source.rb', line 555

# Delete a single uniqueness key by its lock_key. Returns the number of
# rows removed (0 or 1); 0 on error as well.
def discard_lock(lock_key)
  UniquenessKey.where(lock_key: lock_key).delete_all
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error discarding lock #{lock_key}: #{e.message}" }
  0
end

#discard_locks(lock_keys) ⇒ Object



562
563
564
565
566
567
568
569
# File 'lib/pgbus/web/data_source.rb', line 562

# Delete a batch of uniqueness keys in one statement. Returns the number
# of rows removed; 0 for an empty input or on error.
def discard_locks(lock_keys)
  return 0 if lock_keys.empty?

  UniquenessKey.where(lock_key: lock_keys).delete_all
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error discarding locks: #{e.message}" }
  0
end

#discard_selected_events(selections) ⇒ Object

Bulk discard selected events from handler queues.



833
834
835
836
837
838
839
840
841
842
843
844
# File 'lib/pgbus/web/data_source.rb', line 833

# Bulk discard selected events from handler queues. Each selection is a
# hash with :queue_name and :msg_id. Returns how many were successfully
# discarded; individual failures are logged and skipped.
def discard_selected_events(selections)
  return 0 if selections.empty?

  selections.count do |sel|
    begin
      discard_event(sel[:queue_name], sel[:msg_id])
    rescue StandardError => e
      Pgbus.logger.debug { "[Pgbus::Web] Error in bulk discard for #{sel[:msg_id]}: #{e.message}" }
      false
    end
  end
end

#dlq_message_detail(msg_id) ⇒ Object



280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
# File 'lib/pgbus/web/data_source.rb', line 280

# Look up a single DLQ message by msg_id, searching every dead letter
# queue in turn. Returns the formatted message hash, or nil when no DLQ
# holds that id (or on error).
def dlq_message_detail(msg_id)
  suffix = Pgbus::DEAD_LETTER_SUFFIX
  dead_letter_queues = queues_with_metrics.select { |queue| queue[:name].end_with?(suffix) }

  dead_letter_queues.each do |queue|
    name = queue[:name]
    row = connection.select_one(
      "SELECT * FROM pgmq.q_#{sanitize_name(name)} WHERE msg_id = $1",
      "Pgbus DLQ Detail",
      [msg_id.to_i]
    )
    return format_message(row, name) if row
  end
  nil
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching DLQ message #{msg_id}: #{e.message}" }
  nil
end

#dlq_messages(page: 1, per_page: 25) ⇒ Object

Dead letter queue Note: DLQ queue names from queues_with_metrics are already fully qualified (e.g., “pgbus_default_dlq”), so we use them directly without re-prefixing.



269
270
271
272
273
274
275
276
277
278
# File 'lib/pgbus/web/data_source.rb', line 269

# Page through messages across all dead letter queues. DLQ names from
# queues_with_metrics are already fully qualified (e.g. "pgbus_default_dlq"),
# so they are used directly without re-prefixing.
def dlq_messages(page: 1, per_page: 25)
  suffix = Pgbus::DEAD_LETTER_SUFFIX
  dlq_names = queues_with_metrics
              .select { |queue| queue[:name].end_with?(suffix) }
              .map { |queue| queue[:name] }

  paginated_queue_messages(dlq_names, per_page, (page - 1) * per_page)
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching DLQ messages: #{e.message}" }
  []
end

#drop_queue(name) ⇒ Object



72
73
74
75
# File 'lib/pgbus/web/data_source.rb', line 72

# Drop a queue entirely. Uniqueness keys tied to the queue are released
# first so they don't linger after the queue is gone. `name` is the full
# PGMQ queue name — no prefix is added.
def drop_queue(name)
  release_uniqueness_keys_for_queue(name)
  @client.drop_queue(name, prefixed: false)
end

#edit_event_payload(queue_name, msg_id, new_payload_json) ⇒ Object

Edit the payload of a stuck event: delete old message and re-enqueue with the corrected payload in the same queue. The produce + delete are wrapped in a PGMQ transaction so the message can’t be lost if either half fails (same pattern as retry_dlq_message).



795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
# File 'lib/pgbus/web/data_source.rb', line 795

# Replace the payload of a stuck event: re-enqueue a corrected copy in
# the same queue and delete the original. Produce + delete run inside a
# single PGMQ transaction so the message cannot be lost if either half
# fails (same pattern as retry_dlq_message). Returns false for invalid
# JSON, a missing message, or any error.
def edit_event_payload(queue_name, msg_id, new_payload_json)
  parsed =
    begin
      JSON.parse(new_payload_json)
    rescue JSON::ParserError
      return false
    end

  detail = job_detail(queue_name, msg_id)
  return false if detail.nil?

  @client.transaction do |txn|
    txn.produce(queue_name, parsed.to_json, headers: detail[:headers])
    txn.delete(queue_name, msg_id.to_i)
  end
  true
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error editing event #{msg_id}: #{e.message}" }
  false
end

#enqueue_recurring_task_now(id) ⇒ Object



507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
# File 'lib/pgbus/web/data_source.rb', line 507

# Trigger an immediate one-off run of a recurring task, bypassing its
# cron schedule. Returns true when enqueued, false when the record is
# missing or enqueueing raised.
def enqueue_recurring_task_now(id)
  record = RecurringTask.find_by(id: id)
  return false if record.nil?

  task = Recurring::Task.from_configuration(
    record.key,
    class: record.class_name,
    command: record.command,
    schedule: record.schedule,
    queue: record.queue_name,
    args: parse_arguments(record.arguments),
    priority: record.priority
  )

  Recurring::Schedule.new(config: Pgbus.configuration).enqueue_task(task, run_at: Time.now.utc)
  true
rescue StandardError => e
  Pgbus.logger.error { "[Pgbus::Web] Error enqueuing recurring task #{id}: #{e.message}" }
  false
end

#failed_event(id) ⇒ Object



173
174
175
176
177
178
179
180
181
182
# File 'lib/pgbus/web/data_source.rb', line 173

# Fetch a single failed-event row by id. Returns the raw row hash, or
# nil when not found or on error.
def failed_event(id)
  connection.select_one(
    "SELECT * FROM pgbus_failed_events WHERE id = $1",
    "Pgbus Failed Event",
    [id.to_i]
  )
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching failed event #{id}: #{e.message}" }
  nil
end

#failed_events(page: 1, per_page: 25) ⇒ Object

Failed events



152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/pgbus/web/data_source.rb', line 152

# Page through failed events, newest failure first. Returns an array of
# raw row hashes; [] on error.
def failed_events(page: 1, per_page: 25)
  connection.select_all(
    "SELECT * FROM pgbus_failed_events ORDER BY failed_at DESC LIMIT $1 OFFSET $2",
    "Pgbus Failed Events",
    [per_page, (page - 1) * per_page]
  ).to_a
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching failed events: #{e.message}" }
  []
end

#failed_events_countObject



165
166
167
168
169
170
171
# File 'lib/pgbus/web/data_source.rb', line 165

# Total number of failed-event rows; 0 on error.
def failed_events_count
  connection.select_value("SELECT COUNT(*) FROM pgbus_failed_events").to_i
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error counting failed events: #{e.message}" }
  0
end

#handler_class_for_queue(physical_queue_name) ⇒ Object

Find the handler class registered for a given physical queue name. Returns nil if no subscriber matches — used to reject forged handler values in mark_event_handled / reroute_event.



743
744
745
746
# File 'lib/pgbus/web/data_source.rb', line 743

# Find the handler class registered for a given physical queue name.
# Returns nil when no subscriber matches — used to reject forged handler
# values in mark_event_handled / reroute_event.
def handler_class_for_queue(physical_queue_name)
  match = registered_subscribers.detect { |entry| entry[:physical_queue_name] == physical_queue_name }
  match ? match[:handler_class] : nil
end

#handler_queue_physical_namesObject

Physical queue names for all registered subscribers. Used for both pending_events lookup and server-side validation of target queues in reroute_event.



736
737
738
# File 'lib/pgbus/web/data_source.rb', line 736

# Physical queue names for all registered subscribers, deduplicated.
# Used both for the pending_events lookup and for server-side validation
# of target queues in reroute_event.
def handler_queue_physical_names
  registered_subscribers.map { |s| s[:physical_queue_name] }.uniq
end

#job_detail(queue_name, msg_id) ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
# File 'lib/pgbus/web/data_source.rb', line 109

# Fetch one message from a queue table by msg_id and format it for the
# UI. Returns nil when the message doesn't exist or on error.
def job_detail(queue_name, msg_id)
  row = connection.select_one(
    "SELECT * FROM pgmq.q_#{sanitize_name(queue_name)} WHERE msg_id = $1",
    "Pgbus Job Detail",
    [msg_id.to_i]
  )
  return nil unless row

  format_message(row, queue_name)
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching job detail: #{e.message}" }
  nil
end

#job_locksObject

Job uniqueness keys



579
580
581
582
583
584
585
586
587
588
589
590
591
592
# File 'lib/pgbus/web/data_source.rb', line 579

# The 100 most recent job uniqueness keys, newest first, as plain hashes
# for the dashboard. age_seconds is nil when created_at is missing.
def job_locks
  UniquenessKey.order(created_at: :desc).limit(100).map do |lock|
    age = lock.created_at && (Time.current - lock.created_at).to_i
    {
      lock_key: lock.lock_key,
      queue_name: lock.queue_name,
      msg_id: lock.msg_id,
      created_at: lock.created_at,
      age_seconds: age
    }
  end
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching uniqueness keys: #{e.message}" }
  []
end

#job_stats_summary(minutes: 60) ⇒ Object

Job stats



595
596
597
598
599
600
# File 'lib/pgbus/web/data_source.rb', line 595

# Aggregate job stats over the trailing window. Delegates to
# JobStat.summary; returns an all-zero summary hash on error.
def job_stats_summary(minutes: 60)
  JobStat.summary(minutes: minutes)
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching job stats summary: #{e.message}" }
  { total: 0, success: 0, failed: 0, dead_lettered: 0, avg_duration_ms: 0, max_duration_ms: 0 }
end

#job_status_counts(minutes: 60) ⇒ Object



609
610
611
612
613
614
# File 'lib/pgbus/web/data_source.rb', line 609

# Job counts grouped by status over the trailing window. Delegates to
# JobStat.status_counts; {} on error.
def job_status_counts(minutes: 60)
  JobStat.status_counts(minutes: minutes)
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching status counts: #{e.message}" }
  {}
end

#job_throughput(minutes: 60) ⇒ Object



602
603
604
605
606
607
# File 'lib/pgbus/web/data_source.rb', line 602

# Throughput series over the trailing window, shaped as
# [{ time:, count: }, ...] for charting; [] on error.
def job_throughput(minutes: 60)
  JobStat.throughput(minutes: minutes).map do |time, count|
    { time: time, count: count }
  end
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching throughput: #{e.message}" }
  []
end

#jobs(queue_name: nil, page: 1, per_page: 25) ⇒ Object

Jobs (messages in queue tables)



96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/pgbus/web/data_source.rb', line 96

# List messages in queue tables, either for one queue or across all of
# them. Returns formatted message hashes; [] on error.
def jobs(queue_name: nil, page: 1, per_page: 25)
  offset = (page - 1) * per_page
  queue_name ? query_queue_messages(queue_name, per_page, offset) : all_queue_messages(per_page, offset)
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error reading jobs: #{e.message}" }
  []
end

#latency_by_queue(minutes: 60) ⇒ Object



630
631
632
633
634
635
# File 'lib/pgbus/web/data_source.rb', line 630

# Average latency per queue over the trailing window. Delegates to
# JobStat.avg_latency_by_queue; [] on error.
def latency_by_queue(minutes: 60)
  JobStat.avg_latency_by_queue(minutes: minutes)
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching latency by queue: #{e.message}" }
  []
end

#latency_trend(minutes: 60) ⇒ Object



623
624
625
626
627
628
# File 'lib/pgbus/web/data_source.rb', line 623

# Latency trend series over the trailing window. Delegates to
# JobStat.latency_trend; [] on error.
def latency_trend(minutes: 60)
  JobStat.latency_trend(minutes: minutes)
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching latency trend: #{e.message}" }
  []
end

#mark_event_handled(queue_name, msg_id, handler_class) ⇒ Object

Mark an event as handled: archive the queue message and insert a ProcessedEvent record so it won’t be reprocessed on replay.

The insert is performed BEFORE archive. If the archive step fails afterwards the operator can retry — replay protection is already in place and the idempotency dedup will cause the handler to skip the event even if it is eventually re-read from the queue. Doing it the other way around would risk losing the message without recording the marker.



767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
# File 'lib/pgbus/web/data_source.rb', line 767

# Mark an event as handled: record a ProcessedEvent marker, then archive
# the queue message so it won't be reprocessed on replay.
#
# Ordering is deliberate — the insert happens BEFORE the archive. If the
# archive fails afterwards the operator can retry safely: replay
# protection is already in place and the idempotency dedup will make the
# handler skip the event even if it is re-read from the queue. Reversed,
# a failure between the steps could lose the message without the marker.
def mark_event_handled(queue_name, msg_id, handler_class)
  detail = job_detail(queue_name, msg_id)
  return false unless detail

  # The payload must carry an event_id for dedup to work; bail out if not.
  raw = JSON.parse(detail[:message])
  event_id = raw["event_id"]
  return false unless event_id

  ProcessedEvent.insert(
    { event_id: event_id, handler_class: handler_class, processed_at: Time.now.utc },
    unique_by: %i[event_id handler_class]
  )
  # Release the uniqueness lock while we still hold the payload in
  # memory — otherwise the message is archived but the lock row stays
  # behind, blocking later publishes with the same key. Mirrors
  # discard_event.
  release_lock_for_payload(detail[:message])
  @client.archive_message(queue_name, msg_id.to_i, prefixed: false)
  true
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error marking event #{msg_id} handled: #{e.message}" }
  false
end

#outbox_entries(page: 1, per_page: 25) ⇒ Object



546
547
548
549
550
551
552
# File 'lib/pgbus/web/data_source.rb', line 546

# Page through outbox entries, newest id first; [] on error.
def outbox_entries(page: 1, per_page: 25)
  OutboxEntry.order(id: :desc).limit(per_page).offset((page - 1) * per_page).to_a
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching outbox entries: #{e.message}" }
  []
end

#outbox_statsObject

Outbox



535
536
537
538
539
540
541
542
543
544
# File 'lib/pgbus/web/data_source.rb', line 535

# Outbox health summary: unpublished count, total count, and the age of
# the oldest unpublished entry (nil when none). Zeroed hash on error.
def outbox_stats
  pending = OutboxEntry.unpublished.count
  all_entries = OutboxEntry.count
  {
    unpublished: pending,
    total: all_entries,
    oldest_unpublished_age: oldest_unpublished_age
  }
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching outbox stats: #{e.message}" }
  { unpublished: 0, total: 0, oldest_unpublished_age: nil }
end

#pause_queue(name, reason: nil) ⇒ Object



77
78
79
80
81
# File 'lib/pgbus/web/data_source.rb', line 77

# Pause a queue by its logical name (derived from the physical `name`),
# optionally recording a reason. Errors are logged, not raised.
def pause_queue(name, reason: nil)
  QueueState.pause!(logical_queue_name(name), reason: reason)
rescue StandardError => e
  Pgbus.logger.error { "[Pgbus::Web] Error pausing queue #{name}: #{e.message}" }
end

#pending_events(page: 1, per_page: 25) ⇒ Object

Pending events — messages sitting in handler queues that haven't been processed. Identifies handler queues via the subscriber registry and queries them for unprocessed messages. Subscriber queue names are logical (e.g. "task_completion_handler"), while `pgmq.meta.queue_name` stores physical names (e.g. "pgbus_task_completion_handler"), so we normalize through `config.queue_name` before intersecting.



716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
# File 'lib/pgbus/web/data_source.rb', line 716

# Pending events — messages sitting unprocessed in handler queues.
# Handler queues come from the subscriber registry (physical names);
# they are intersected with what actually exists in pgmq.meta before
# querying, since a subscriber's queue may not have been created yet.
def pending_events(page: 1, per_page: 25)
  physical_names = handler_queue_physical_names
  return [] if physical_names.empty?

  known_queues = connection.select_values(
    "SELECT queue_name FROM pgmq.meta ORDER BY queue_name", "Pgbus Queue Names"
  )
  targets = physical_names & known_queues
  return [] if targets.empty?

  paginated_queue_messages(targets, per_page, (page - 1) * per_page)
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching pending events: #{e.message}" }
  []
end

#processed_event(id) ⇒ Object



382
383
384
385
386
387
388
389
390
391
# File 'lib/pgbus/web/data_source.rb', line 382

# Fetch a single processed-event audit row by id; nil when missing or on
# error.
def processed_event(id)
  connection.select_one(
    "SELECT * FROM pgbus_processed_events WHERE id = $1",
    "Pgbus Processed Event",
    [id.to_i]
  )
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching processed event #{id}: #{e.message}" }
  nil
end

#processed_events(page: 1, per_page: 25) ⇒ Object

Processed events (audit trail)



369
370
371
372
373
374
375
376
377
378
379
380
# File 'lib/pgbus/web/data_source.rb', line 369

# Page through the processed-events audit trail, most recent first; []
# on error.
def processed_events(page: 1, per_page: 25)
  connection.select_all(
    "SELECT * FROM pgbus_processed_events ORDER BY processed_at DESC LIMIT $1 OFFSET $2",
    "Pgbus Processed Events",
    [per_page, (page - 1) * per_page]
  ).to_a
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching processed events: #{e.message}" }
  []
end

#processed_events_countObject



393
394
395
396
397
398
399
# File 'lib/pgbus/web/data_source.rb', line 393

# Total number of processed-event audit rows; 0 on error.
def processed_events_count
  connection.select_value("SELECT COUNT(*) FROM pgbus_processed_events").to_i
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error counting processed events: #{e.message}" }
  0
end

#processesObject

Processes



358
359
360
361
362
363
364
365
366
# File 'lib/pgbus/web/data_source.rb', line 358

# All registered worker/dispatcher processes, ordered by kind then
# registration time, formatted for the UI; [] on error.
def processes
  connection.select_all(
    "SELECT * FROM pgbus_processes ORDER BY kind, created_at"
  ).to_a.map { |row| format_process(row) }
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching processes: #{e.message}" }
  []
end

#purge_queue(name) ⇒ Object



67
68
69
70
# File 'lib/pgbus/web/data_source.rb', line 67

# Purge all messages from a queue. Uniqueness keys are released first so
# purged jobs can be re-enqueued. `name` is the full PGMQ queue name —
# no prefix is added.
def purge_queue(name)
  release_uniqueness_keys_for_queue(name)
  @client.purge_queue(name, prefixed: false)
end

#queue_detail(name) ⇒ Object

name is the full PGMQ queue name (e.g. “pgbus_default”) as returned by queues_with_metrics. No prefix is added.



60
61
62
63
64
65
# File 'lib/pgbus/web/data_source.rb', line 60

# Metrics for one queue. `name` is the full PGMQ queue name (e.g.
# "pgbus_default") as returned by queues_with_metrics — no prefix is
# added. Returns nil on error.
def queue_detail(name)
  queue_metrics_via_sql(name)
rescue StandardError => e
  Pgbus.logger.error { "[Pgbus::Web] Error fetching queue detail for #{name}: #{e.class}: #{e.message}" }
  nil
end

#queue_health_detail(queue_name) ⇒ Object

Per-queue health stats for the queue detail view.



668
669
670
671
672
673
674
675
676
677
678
679
# File 'lib/pgbus/web/data_source.rb', line 668

# Per-queue health stats for the queue detail view: table stats for the
# live (q_) and archive (a_) tables plus the oldest open transaction
# age. Missing tables are skipped; an empty result is returned on error.
def queue_health_detail(queue_name)
  safe_name = sanitize_name(queue_name)
  table_stats = [
    fetch_table_stats("pgmq", "q_#{safe_name}", "queue"),
    fetch_table_stats("pgmq", "a_#{safe_name}", "archive")
  ].compact

  { tables: table_stats, oldest_transaction_age_sec: oldest_transaction_age }
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching health detail for #{queue_name}: #{e.message}" }
  { tables: [], oldest_transaction_age_sec: nil }
end

#queue_health_statsObject

Queue health — vacuum stats, dead tuples, bloat, MVCC horizon. Returns aggregate health across all queue and archive tables, plus the oldest open transaction age (MVCC horizon pinning risk).



640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
# File 'lib/pgbus/web/data_source.rb', line 640

# Aggregate health across all queue and archive tables: dead/live tuple
# totals, worst bloat ratio, how many tables exceed 10% bloat, the
# stalest vacuum, and the oldest open transaction age (MVCC horizon
# pinning risk). Returns a zeroed structure on error.
def queue_health_stats
  stats = fetch_all_table_stats

  dead = stats.sum { |table| table[:dead_tuples] }
  live = stats.sum { |table| table[:live_tuples] }
  worst = stats.map { |table| table[:bloat_ratio] }.max || 0.0
  vacuum_candidates = stats.count { |table| table[:bloat_ratio] > 0.1 }
  stalest_vacuum = stats.filter_map { |table| table[:last_vacuum_ago_sec] }.max

  {
    total_dead_tuples: dead,
    total_live_tuples: live,
    worst_bloat_ratio: worst.round(4),
    tables_needing_vacuum: vacuum_candidates,
    oldest_vacuum_ago_sec: stalest_vacuum,
    oldest_transaction_age_sec: oldest_transaction_age,
    tables: stats
  }
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching queue health stats: #{e.class}: #{e.message}" }
  {
    total_dead_tuples: 0, total_live_tuples: 0, worst_bloat_ratio: 0.0,
    tables_needing_vacuum: 0, oldest_vacuum_ago_sec: nil,
    oldest_transaction_age_sec: nil, tables: []
  }
end

#queue_paused?(name) ⇒ Boolean

Returns:

  • (Boolean)


89
90
91
92
93
# File 'lib/pgbus/web/data_source.rb', line 89

# Whether the queue is paused. Fails open: if the state can't be read,
# the queue is reported as running.
def queue_paused?(name)
  QueueState.paused?(logical_queue_name(name))
rescue StandardError
  false
end

#queues_with_metricsObject

Queues — query via ActiveRecord for reliability in web processes (avoids PGMQ client connection issues when the web server uses a different connection lifecycle than the worker processes).



44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/pgbus/web/data_source.rb', line 44

# All queues with their metrics and paused flag. Queried via
# ActiveRecord for reliability in web processes (avoids PGMQ client
# connection-lifecycle issues vs worker processes).
def queues_with_metrics
  names = connection.select_values("SELECT queue_name FROM pgmq.meta ORDER BY queue_name")
  # Set membership keeps the per-queue paused check O(1); with 100+
  # queues, Array#include? in the loop was O(n²) per dashboard load.
  paused = paused_queue_names.to_set

  names.map { |name| queue_metrics_via_sql(name) }.compact.map do |metrics|
    metrics.merge(paused: paused.include?(logical_queue_name(metrics[:name])))
  end
rescue StandardError => e
  Pgbus.logger.error { "[Pgbus::Web] Error fetching queue metrics: #{e.class}: #{e.message}" }
  []
end

#recurring_task(id) ⇒ Object



456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
# File 'lib/pgbus/web/data_source.rb', line 456

# Full detail for one recurring task: its configuration, computed
# schedule info, and the 25 most recent executions. Returns nil when the
# record is missing or on error.
def recurring_task(id)
  record = RecurringTask.find_by(id: id)
  return nil if record.nil?

  task = Recurring::Task.from_configuration(
    record.key,
    class: record.class_name,
    command: record.command,
    schedule: record.schedule,
    queue: record.queue_name,
    args: parse_arguments(record.arguments),
    priority: record.priority,
    description: record.description
  )

  history = RecurringExecution.for_task(record.key).recent(25).map do |execution|
    { run_at: execution.run_at, created_at: execution.created_at }
  end

  {
    id: record.id,
    key: record.key,
    class_name: record.class_name,
    command: record.command,
    schedule: record.schedule,
    human_schedule: task.human_schedule,
    queue_name: record.queue_name,
    arguments: parse_arguments(record.arguments),
    priority: record.priority,
    description: record.description,
    enabled: record.enabled,
    static: record.static,
    next_run_at: task.next_time,
    executions: history,
    created_at: record.created_at,
    updated_at: record.updated_at
  }
rescue StandardError => e
  Pgbus.logger.error { "[Pgbus::Web] Error fetching recurring task #{id}: #{e.class}: #{e.message}" }
  nil
end

#recurring_tasksObject

Recurring tasks



414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
# File 'lib/pgbus/web/data_source.rb', line 414

# All recurring tasks ordered by key, each with its computed schedule
# info and last run time. Last-run times are fetched in a single grouped
# query rather than one per task. Returns [] on error.
def recurring_tasks
  records = RecurringTask.order(:key).to_a
  latest_runs = RecurringExecution
                .where(task_key: records.map(&:key))
                .select("task_key, MAX(run_at) AS run_at")
                .group(:task_key)
                .index_by(&:task_key)

  records.map do |record|
    task = Recurring::Task.from_configuration(
      record.key,
      class: record.class_name,
      command: record.command,
      schedule: record.schedule,
      queue: record.queue_name,
      args: parse_arguments(record.arguments),
      priority: record.priority,
      description: record.description
    )

    {
      id: record.id,
      key: record.key,
      class_name: record.class_name,
      command: record.command,
      schedule: record.schedule,
      human_schedule: task.human_schedule,
      queue_name: record.queue_name,
      priority: record.priority,
      description: record.description,
      enabled: record.enabled,
      static: record.static,
      next_run_at: task.next_time,
      last_run_at: latest_runs[record.key]&.run_at,
      created_at: record.created_at,
      updated_at: record.updated_at
    }
  end
rescue StandardError => e
  Pgbus.logger.error { "[Pgbus::Web] Error fetching recurring tasks: #{e.class}: #{e.message}" }
  []
end

#recurring_tasks_countObject



527
528
529
530
531
532
# File 'lib/pgbus/web/data_source.rb', line 527

# Total number of recurring-task records; 0 on error.
def recurring_tasks_count
  RecurringTask.count
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error counting recurring tasks: #{e.message}" }
  0
end

#registered_subscribersObject

Subscriber registry. `queue_name` is the logical name the subscriber registered with; `physical_queue_name` is what the queue is actually called in `pgmq.meta` (e.g. logical "task_completion_handler" -> physical "pgbus_task_completion_handler"). The dashboard needs the physical name to match against pending messages / target queues.



851
852
853
854
855
856
857
858
859
860
861
862
863
# File 'lib/pgbus/web/data_source.rb', line 851

# Subscriber registry snapshot. `queue_name` is the logical name the
# subscriber registered with; `physical_queue_name` is the actual name
# in pgmq.meta (logical "task_completion_handler" -> physical
# "pgbus_task_completion_handler"). The dashboard matches pending
# messages / target queues against the physical name. [] on error.
def registered_subscribers
  EventBus::Registry.instance.subscribers.map do |subscriber|
    {
      pattern: subscriber.pattern,
      handler_class: subscriber.handler_class.name,
      queue_name: subscriber.queue_name,
      physical_queue_name: @client.config.queue_name(subscriber.queue_name)
    }
  end
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching subscribers: #{e.message}" }
  []
end

#replay_event(event) ⇒ Object



401
402
403
404
405
406
407
408
409
410
411
# File 'lib/pgbus/web/data_source.rb', line 401

# Re-publish an event's payload to every subscriber matching its routing
# key (falling back to handler_class when no routing key is recorded).
# Returns false when neither key is present or publishing raised.
def replay_event(event)
  routing_key = event["routing_key"] || event["handler_class"]
  return false unless routing_key

  @client.publish_to_topic(routing_key, event["payload"] || "{}")
  true
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error replaying event: #{e.message}" }
  false
end

#reroute_event(source_queue, msg_id, target_queue) ⇒ Object

Reroute an event from one handler queue to another. Wrapped in a PGMQ transaction so produce on the target and delete on the source are atomic.



818
819
820
821
822
823
824
825
826
827
828
829
830
# File 'lib/pgbus/web/data_source.rb', line 818

# Move an event from one handler queue to another. Produce on the target
# and delete on the source run in a single PGMQ transaction so the event
# can neither duplicate nor vanish mid-move. Returns false when the
# source message is missing or the move raised.
def reroute_event(source_queue, msg_id, target_queue)
  detail = job_detail(source_queue, msg_id)
  return false if detail.nil?

  @client.transaction do |txn|
    txn.produce(target_queue, detail[:message], headers: detail[:headers])
    txn.delete(source_queue, msg_id.to_i)
  end
  true
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error rerouting event #{msg_id}: #{e.message}" }
  false
end

#resume_queue(name) ⇒ Object



83
84
85
86
87
# File 'lib/pgbus/web/data_source.rb', line 83

# Resume a paused queue by its logical name (derived from the physical
# `name`). Errors are logged, not raised.
def resume_queue(name)
  QueueState.resume!(logical_queue_name(name))
rescue StandardError => e
  Pgbus.logger.error { "[Pgbus::Web] Error resuming queue #{name}: #{e.message}" }
end

#retry_all_dlqObject



329
330
331
332
333
334
335
336
337
338
339
# File 'lib/pgbus/web/data_source.rb', line 329

# Retry up to 1000 dead-lettered messages, moving each back to its
# original queue. Returns how many retries succeeded; individual
# failures are logged and skipped.
def retry_all_dlq
  dlq_messages(page: 1, per_page: 1000).count do |msg|
    begin
      retry_dlq_message(msg[:queue_name], msg[:msg_id])
    rescue StandardError => e
      Pgbus.logger.debug { "[Pgbus::Web] Error retrying DLQ message #{msg[:msg_id]}: #{e.message}" }
      false
    end
  end
end

#retry_all_failedObject



228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
# File 'lib/pgbus/web/data_source.rb', line 228

# Retry every failed event in id order, re-enqueueing each and deleting
# its row in the same DB transaction. Returns how many were retried.
#
# Batches are paged with a keyset cursor (WHERE id > last seen id)
# rather than re-selecting from the top. The previous version looped on
# "SELECT ... ORDER BY id LIMIT 100" unconditionally: if every event in
# a batch failed to retry (the rescue logs but leaves the row in place),
# the exact same batch was fetched again forever — an infinite loop. The
# cursor guarantees forward progress: rows that fail are skipped this
# pass and remain in the table for a later attempt.
def retry_all_failed
  count = 0
  last_id = 0
  loop do
    batch = connection.select_all(
      "SELECT * FROM pgbus_failed_events WHERE id > $1 ORDER BY id LIMIT 100",
      "Pgbus Retry Batch",
      [last_id]
    ).to_a
    break if batch.empty?

    batch.each do |event|
      # Advance the cursor before attempting the retry so a failure
      # still moves us past this row.
      last_id = event["id"].to_i
      payload = JSON.parse(event["payload"])
      headers = event["headers"]
      headers = JSON.parse(headers) if headers.is_a?(String)

      # Enqueue + delete atomically: the row disappears only if the
      # message made it back onto the queue.
      connection.transaction do
        @client.send_message(event["queue_name"], payload, headers: headers)
        connection.exec_delete(
          "DELETE FROM pgbus_failed_events WHERE id = $1", "Pgbus Delete Failed Event", [event["id"].to_i]
        )
      end
      count += 1
    rescue StandardError => e
      Pgbus.logger.error { "[Pgbus::Web] Failed to retry event #{event["id"]}: #{e.message}" }
    end
  end
  count
end

#retry_dlq_message(queue_name, msg_id) ⇒ Object



297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
# File 'lib/pgbus/web/data_source.rb', line 297

# Move a dead-lettered message back onto its original queue for another
# attempt. `queue_name` is the full DLQ name (already prefixed); the
# original queue is recovered by stripping the DLQ suffix. Returns false
# when the message is gone or the move raised.
def retry_dlq_message(queue_name, msg_id)
  dlq_suffix = Pgbus::DEAD_LETTER_SUFFIX
  original_queue = queue_name.delete_suffix(dlq_suffix)

  row = connection.select_one(
    "SELECT * FROM pgmq.q_#{sanitize_name(queue_name)} WHERE msg_id = $1",
    "Pgbus DLQ Read",
    [msg_id.to_i]
  )
  return false unless row

  # Produce + delete run in one PGMQ transaction so the message cannot
  # be lost (or duplicated) if either half fails.
  @client.transaction do |txn|
    txn.produce(original_queue, row["message"], headers: row["headers"])
    txn.delete(queue_name, msg_id.to_i)
  end
  true
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error retrying DLQ message #{msg_id}: #{e.message}" }
  false
end

#retry_failed_event(id) ⇒ Object



184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
# File 'lib/pgbus/web/data_source.rb', line 184

# Retries one recorded failed event, then removes its failure record.
#
# If the original message still sits in its queue (waiting out a retry
# backoff), its visibility timeout is zeroed so a worker picks it up
# immediately — avoiding a duplicate enqueue. Only when the original is
# gone (e.g. already dead-lettered) is a fresh copy sent.
#
# Returns true on success, false when the event is unknown or any step
# raises.
def retry_failed_event(id)
  event = failed_event(id)
  return false if event.nil?

  queue = event["queue_name"]
  msg_id = event["msg_id"]

  if msg_id && @client.message_exists?(queue, msg_id: msg_id.to_i)
    # Original message still queued: surface it now rather than copy it.
    @client.set_visibility_timeout(queue, msg_id.to_i, vt: 0)
  else
    # Original is gone; re-enqueue from the stored payload/headers.
    headers = event["headers"]
    headers = JSON.parse(headers) if headers.is_a?(String)
    @client.send_message(queue, JSON.parse(event["payload"]), headers: headers)
  end

  connection.exec_delete(
    "DELETE FROM pgbus_failed_events WHERE id = $1", "Pgbus Delete Failed Event", [id.to_i]
  )
  true
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error retrying failed event #{id}: #{e.message}" }
  false
end

#retry_job(queue_name, msg_id) ⇒ Object



121
122
123
# File 'lib/pgbus/web/data_source.rb', line 121

# Makes an in-flight/delayed message immediately visible again by
# zeroing its visibility timeout. queue_name is passed through
# unprefixed (prefixed: false), matching how the web UI names queues.
def retry_job(queue_name, msg_id)
  id = msg_id.to_i
  @client.set_visibility_timeout(queue_name, id, vt: 0, prefixed: false)
end

#slowest_job_classes(limit: 10, minutes: 60) ⇒ Object



616
617
618
619
620
621
# File 'lib/pgbus/web/data_source.rb', line 616

# The slowest job classes over the trailing window, as reported by
# JobStat. Degrades to an empty list when the stats backend raises
# (e.g. its table is absent).
def slowest_job_classes(limit: 10, minutes: 60)
  rows = JobStat.slowest_classes(limit: limit, minutes: minutes)
  rows
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching slowest classes: #{e.message}" }
  []
end

#stream_stats_available?Boolean

Stream stats — only populated when streams_stats_enabled is true AND the migration has been run. Controllers should gate rendering on `stream_stats_available?` to avoid showing empty sections.

Returns:

  • (Boolean)


685
686
687
688
689
690
# File 'lib/pgbus/web/data_source.rb', line 685

# Whether stream stats can be rendered: the feature flag must be on AND
# the backing table must exist (its migration may not have been run).
# Any error while checking is treated as "not available".
def stream_stats_available?
  enabled = Pgbus.configuration.streams_stats_enabled
  enabled && StreamStat.table_exists?
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error checking stream stats availability: #{e.message}" }
  false
end

#stream_stats_summary(minutes: 60) ⇒ Object



692
693
694
695
696
697
698
699
700
701
# File 'lib/pgbus/web/data_source.rb', line 692

# Aggregated stream metrics for the trailing window. On any backend
# error, returns the same summary shape with every metric zeroed so
# views can render without nil checks.
def stream_stats_summary(minutes: 60)
  StreamStat.summary(minutes: minutes)
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching stream stats summary: #{e.message}" }
  # All-zero fallback with the full key set the dashboard expects.
  %i[
    broadcasts connects disconnects
    active_estimate avg_fanout
    avg_broadcast_ms avg_connect_ms
  ].to_h { |key| [key, 0] }
end

#summary_stats ⇒ Object

Dashboard summary



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/pgbus/web/data_source.rb', line 15

# Aggregate counters for the dashboard landing page.
#
# Takes one snapshot of per-queue metrics and derives all depth totals
# from it, then folds in process/failure/recurring counts and database
# health numbers into a single hash for the view layer.
def summary_stats
  queues = queues_with_metrics
  dlq_suffix = Pgbus::DEAD_LETTER_SUFFIX

  # DLQ depth counts only queues carrying the dead-letter suffix.
  dlq_depth = queues.sum { |q| q[:name].end_with?(dlq_suffix) ? q[:queue_length] : 0 }

  # compute_throughput keeps an internal snapshot, so call it before
  # assembling the result (same order as before the hash is built).
  throughput = compute_throughput(queues)
  health = queue_health_stats

  {
    total_queues: queues.size,
    total_depth: queues.sum { |q| q[:queue_length] },
    total_visible: queues.sum { |q| q[:queue_visible_length] },
    active_processes: processes.count,
    failed_count: failed_events_count,
    dlq_depth: dlq_depth,
    recurring_count: recurring_tasks_count,
    throughput_rate: throughput,
    total_dead_tuples: health[:total_dead_tuples],
    tables_needing_vacuum: health[:tables_needing_vacuum],
    oldest_transaction_age_sec: health[:oldest_transaction_age_sec]
  }
end

#toggle_recurring_task(id) ⇒ Object



496
497
498
499
500
501
502
503
504
505
# File 'lib/pgbus/web/data_source.rb', line 496

# Flips a recurring task's enabled flag. Returns :enabled or :disabled
# reflecting the task's state after the toggle, or nil when the task is
# unknown or the update raises.
def toggle_recurring_task(id)
  task = RecurringTask.find_by(id: id)
  return nil if task.nil?

  task.update!(enabled: !task.enabled)
  # Re-read after save so the result reflects the persisted state.
  task.enabled ? :enabled : :disabled
rescue StandardError => e
  Pgbus.logger.error { "[Pgbus::Web] Error toggling recurring task #{id}: #{e.message}" }
  nil
end

#top_streams(limit: 10, minutes: 60) ⇒ Object



703
704
705
706
707
708
# File 'lib/pgbus/web/data_source.rb', line 703

# The busiest streams over the trailing window, as reported by
# StreamStat. Degrades to an empty list when the stats backend raises.
def top_streams(limit: 10, minutes: 60)
  rows = StreamStat.top_streams(limit: limit, minutes: minutes)
  rows
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching top streams: #{e.message}" }
  []
end