Class: Pgbus::Web::DataSource

Inherits:
Object
  • Object
show all
Defined in:
lib/pgbus/web/data_source.rb

Instance Method Summary collapse

Constructor Details

#initialize(client: Pgbus.client) ⇒ DataSource

Returns a new instance of DataSource.



8
9
10
11
12
# File 'lib/pgbus/web/data_source.rb', line 8

# Builds a data source bound to a Pgbus client.
#
# @param client the client used for queue operations; defaults to
#   Pgbus.client
def initialize(client: Pgbus.client)
  @client = client
  # No throughput sample has been taken yet, so both the snapshot and its
  # timestamp start out unset.
  @last_throughput_snapshot = @last_throughput_at = nil
end

Instance Method Details

#active_batches_count ⇒ Object



644
645
646
647
648
649
# File 'lib/pgbus/web/data_source.rb', line 644

# Counts batch entries whose status is anything other than "finished".
#
# @return [Integer] the count, or 0 when the lookup raises (the error is
#   logged at debug level)
def active_batches_count
  unfinished = BatchEntry.where.not(status: "finished")
  unfinished.count
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error counting active batches: #{error.message}" }
  0
end

#batch_detail(batch_id) ⇒ Object



622
623
624
625
626
627
628
629
630
631
632
633
634
635
# File 'lib/pgbus/web/data_source.rb', line 622

# Looks up a single batch and returns its formatted summary extended with
# the batch properties and callback class names.
#
# @param batch_id the batch identifier passed to BatchEntry.find_by
# @return [Hash, nil] format_batch output merged with :properties,
#   :on_finish_class, :on_success_class and :on_discard_class; nil when no
#   record matches or when the lookup raises (logged at debug level)
def batch_detail(batch_id)
  entry = BatchEntry.find_by(batch_id: batch_id)
  return unless entry

  summary = format_batch(entry)
  summary.merge(
    properties: entry.properties,
    on_finish_class: entry.on_finish_class,
    on_success_class: entry.on_success_class,
    on_discard_class: entry.on_discard_class
  )
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching batch #{batch_id}: #{error.message}" }
  nil
end

#batches(limit: 100) ⇒ Object

Batches



615
616
617
618
619
620
# File 'lib/pgbus/web/data_source.rb', line 615

# Lists the most recently created batches.
#
# @param limit [Integer] maximum number of batches to load
# @return [Array] format_batch output for each entry, newest first by
#   created_at; [] when the query raises (logged at debug level)
def batches(limit: 100)
  newest_first = BatchEntry.order(created_at: :desc).limit(limit)
  newest_first.map { |entry| format_batch(entry) }
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching batches: #{error.message}" }
  []
end

#batches_count ⇒ Object



637
638
639
640
641
642
# File 'lib/pgbus/web/data_source.rb', line 637

# Counts all batch entries.
#
# @return [Integer] BatchEntry.count, or 0 when the query raises (logged at
#   debug level)
def batches_count
  total = BatchEntry.count
  total
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error counting batches: #{error.message}" }
  0
end

#discard_all_dlq ⇒ Object



361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
# File 'lib/pgbus/web/data_source.rb', line 361

# Deletes dead-letter messages in bulk.
#
# Only the first page of up to 1000 DLQ messages is loaded per call, so a
# larger backlog needs repeated calls.
#
# @return [Integer] total size of the results returned by delete_batch
#   across all queues; a queue whose delete raises contributes 0 and is
#   logged at debug level
def discard_all_dlq
  messages = dlq_messages(page: 1, per_page: 1000)
  return 0 if messages.empty?

  release_locks_for_messages(messages)

  # One delete_batch call per DLQ rather than one call per message.
  by_queue = messages.group_by { |message| message[:queue_name] }
  by_queue.reduce(0) do |deleted, (queue_name, queued)|
    ids = queued.map { |message| message[:msg_id].to_i }
    deleted + @client.delete_batch(queue_name, ids, prefixed: false).size
  rescue StandardError => error
    Pgbus.logger.debug { "[Pgbus::Web] Error batch-discarding DLQ messages from #{queue_name}: #{error.message}" }
    deleted
  end
end

#discard_all_enqueued ⇒ Object



140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/pgbus/web/data_source.rb', line 140

# Archives enqueued messages from every queue whose name does not end with
# the dead-letter suffix.
#
# At most 10_000 messages are read per queue per call. For each queue the
# locks are released first, then the messages are archived in one batch.
#
# @return [Integer] number of message ids submitted for archiving; a queue
#   that raises contributes nothing and is logged at debug level
def discard_all_enqueued
  dlq_suffix = Pgbus::DEAD_LETTER_SUFFIX
  live_queues = queues_with_metrics.reject { |queue| queue[:name].end_with?(dlq_suffix) }

  live_queues.sum do |queue|
    messages = query_queue_messages_raw(queue[:name], 10_000, 0)
    next 0 if messages.empty?

    release_locks_for_messages(messages)

    ids = messages.map { |message| message[:msg_id].to_i }
    @client.archive_batch(queue[:name], ids, prefixed: false)
    ids.size
  rescue StandardError => error
    Pgbus.logger.debug { "[Pgbus::Web] Error discarding enqueued messages from #{queue[:name]}: #{error.message}" }
    0
  end
end

#discard_all_failed ⇒ Object



265
266
267
268
269
270
271
272
273
274
# File 'lib/pgbus/web/data_source.rb', line 265

# Discards every failed event.
#
# Calls release_locks_for_failed_events and archive_all_failed_messages
# before deleting the rows, so both helpers run while the failed-event rows
# still exist.
#
# @return [Integer] cmd_tuples of the DELETE result, or 0 when any step
#   raises (logged at debug level)
def discard_all_failed
  release_locks_for_failed_events
  archive_all_failed_messages

  result = connection.execute("DELETE FROM pgbus_failed_events")
  # NOTE(review): cmd_tuples looks like the PG result API for the affected
  # row count — confirm the connection always returns such a result here.
  result.cmd_tuples
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error discarding all failed events: #{e.message}" }
  0
end

#discard_all_locks ⇒ Object



591
592
593
594
595
596
# File 'lib/pgbus/web/data_source.rb', line 591

# Deletes every uniqueness key row.
#
# @return the result of UniquenessKey.delete_all, or 0 when it raises
#   (logged at debug level)
def discard_all_locks
  removed = UniquenessKey.delete_all
  removed
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error discarding all locks: #{error.message}" }
  0
end

#discard_dlq_message(queue_name, msg_id) ⇒ Object



339
340
341
342
343
344
345
346
347
# File 'lib/pgbus/web/data_source.rb', line 339

# Deletes one message from a dead-letter queue after releasing its lock.
#
# @param queue_name [String] full DLQ name, used as-is (prefixed: false)
# @param msg_id message id; converted with to_i before deletion
# @return [Boolean] true on success, false when any step raises (logged at
#   debug level)
def discard_dlq_message(queue_name, msg_id)
  begin
    release_lock_for_message(queue_name, msg_id)
    @client.delete_message(queue_name, msg_id.to_i, prefixed: false)
  rescue StandardError => error
    Pgbus.logger.debug { "[Pgbus::Web] Error discarding DLQ message #{msg_id}: #{error.message}" }
    return false
  end

  true
end

#discard_event(queue_name, msg_id) ⇒ Object

Discard (archive) an event message from a handler queue.



819
820
821
822
823
824
825
826
# File 'lib/pgbus/web/data_source.rb', line 819

# Discards an event message from a handler queue by archiving it, after
# releasing the lock associated with the message.
#
# @param queue_name [String] queue name, used as-is (prefixed: false)
# @param msg_id message id; converted with to_i before archiving
# @return [Boolean] true on success, false when any step raises (logged at
#   debug level)
def discard_event(queue_name, msg_id)
  begin
    release_lock_for_message(queue_name, msg_id)
    @client.archive_message(queue_name, msg_id.to_i, prefixed: false)
  rescue StandardError => error
    Pgbus.logger.debug { "[Pgbus::Web] Error discarding event #{msg_id}: #{error.message}" }
    return false
  end

  true
end

#discard_failed_event(id) ⇒ Object



222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
# File 'lib/pgbus/web/data_source.rb', line 222

# Discards one failed event.
#
# When the row exists, its payload lock is released and the underlying
# message archived before the row is deleted. The DELETE is issued even
# when no row was found.
#
# @param id failed-event id; converted with to_i for the DELETE bind
# @return [Boolean] true on success, false when any step raises (logged at
#   debug level)
def discard_failed_event(id)
  record = failed_event(id)
  release_lock_for_payload(record["payload"]) if record
  archive_failed_message(record) if record

  connection.exec_delete(
    "DELETE FROM pgbus_failed_events WHERE id = $1", "Pgbus Delete Failed Event", [id.to_i]
  )
  true
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error discarding failed event #{id}: #{error.message}" }
  false
end

#discard_job(queue_name, msg_id) ⇒ Object



135
136
137
138
# File 'lib/pgbus/web/data_source.rb', line 135

# Discards a job: calls release_lock_for_message for it, then archives the
# message through the client.
#
# Unlike discard_event, this method has no rescue, so errors from either
# step propagate to the caller.
#
# @param queue_name [String] queue name, used as-is (prefixed: false)
# @param msg_id message id; converted with to_i before archiving
# @return whatever @client.archive_message returns
def discard_job(queue_name, msg_id)
  release_lock_for_message(queue_name, msg_id)
  @client.archive_message(queue_name, msg_id.to_i, prefixed: false)
end

#discard_lock(lock_key) ⇒ Object

Lock management



575
576
577
578
579
580
# File 'lib/pgbus/web/data_source.rb', line 575

# Deletes the uniqueness key rows matching a single lock key.
#
# @param lock_key the lock key to match
# @return the result of delete_all on the matching rows, or 0 when the
#   delete raises (logged at debug level)
def discard_lock(lock_key)
  matching = UniquenessKey.where(lock_key: lock_key)
  matching.delete_all
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error discarding lock #{lock_key}: #{error.message}" }
  0
end

#discard_locks(lock_keys) ⇒ Object



582
583
584
585
586
587
588
589
# File 'lib/pgbus/web/data_source.rb', line 582

# Deletes the uniqueness key rows matching any of the given lock keys.
#
# @param lock_keys collection of lock keys; an empty collection short-
#   circuits to 0 without touching the database
# @return the result of delete_all on the matching rows, or 0 when the
#   collection is empty or anything raises (logged at debug level)
def discard_locks(lock_keys)
  if lock_keys.empty?
    0
  else
    UniquenessKey.where(lock_key: lock_keys).delete_all
  end
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error discarding locks: #{error.message}" }
  0
end

#discard_selected_events(selections) ⇒ Object

Bulk discard selected events from handler queues.



903
904
905
906
907
908
909
910
911
912
913
914
# File 'lib/pgbus/web/data_source.rb', line 903

# Discards each selected event through discard_event and reports how many
# succeeded.
#
# @param selections collection of hashes with :queue_name and :msg_id
# @return [Integer] number of selections for which discard_event returned a
#   truthy value; a selection that raises is logged at debug level and not
#   counted
def discard_selected_events(selections)
  return 0 if selections.empty?

  selections.count do |selection|
    discard_event(selection[:queue_name], selection[:msg_id])
  rescue StandardError => error
    Pgbus.logger.debug { "[Pgbus::Web] Error in bulk discard for #{selection[:msg_id]}: #{error.message}" }
    false
  end
end

#dlq_message_detail(msg_id) ⇒ Object



300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
# File 'lib/pgbus/web/data_source.rb', line 300

# Finds a dead-letter message by id, probing each DLQ table in the order
# queues_with_metrics lists them and returning the first hit.
#
# @param msg_id message id; converted with to_i for the query bind
# @return the format_message output for the first matching row, or nil when
#   no DLQ contains the id or a query raises (logged at debug level)
def dlq_message_detail(msg_id)
  dlq_suffix = Pgbus::DEAD_LETTER_SUFFIX
  dlq_queues = queues_with_metrics.select { |queue| queue[:name].end_with?(dlq_suffix) }

  dlq_queues.each do |queue|
    name = queue[:name]
    row = connection.select_one(
      "SELECT * FROM pgmq.q_#{sanitize_name(name)} WHERE msg_id = $1",
      "Pgbus DLQ Detail",
      [msg_id.to_i]
    )
    next unless row

    return format_message(row, name)
  end

  nil
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching DLQ message #{msg_id}: #{error.message}" }
  nil
end

#dlq_messages(page: 1, per_page: 25) ⇒ Object

Dead letter queue. Note: DLQ queue names from queues_with_metrics are already fully qualified (e.g., "pgbus_default_dlq"), so we use them directly without re-prefixing.



279
280
281
282
283
284
285
286
287
288
# File 'lib/pgbus/web/data_source.rb', line 279

# Lists one page of messages across all dead-letter queues.
#
# DLQ names come from queues_with_metrics and are passed on unchanged.
#
# @param page [Integer] 1-based page number; values below 1 are treated as
#   page 1
# @param per_page [Integer] page size
# @return [Array] the paginated_queue_messages result, or [] when anything
#   raises (logged at debug level)
def dlq_messages(page: 1, per_page: 25)
  dlq_suffix = Pgbus::DEAD_LETTER_SUFFIX
  queues = queues_with_metrics.select { |q| q[:name].end_with?(dlq_suffix) }
  # Clamp so a page of 0 or below cannot produce a negative offset.
  # NOTE(review): presumably the offset ends up in a SQL OFFSET clause,
  # which PostgreSQL rejects when negative — confirm in
  # paginated_queue_messages.
  offset = ([page.to_i, 1].max - 1) * per_page

  paginated_queue_messages(queues.map { |q| q[:name] }, per_page, offset)
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching DLQ messages: #{e.message}" }
  []
end

#dlq_total_count ⇒ Object



290
291
292
293
294
295
296
297
298
# File 'lib/pgbus/web/data_source.rb', line 290

# Sums the :queue_length metric over every queue whose name ends with the
# dead-letter suffix.
#
# @return [Integer] total DLQ message count, or 0 when anything raises
#   (logged at debug level)
def dlq_total_count
  dlq_suffix = Pgbus::DEAD_LETTER_SUFFIX
  dlq_queues = queues_with_metrics.select { |queue| queue[:name].end_with?(dlq_suffix) }
  dlq_queues.map { |queue| queue[:queue_length] }.sum
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching DLQ count: #{error.message}" }
  0
end

#drop_queue(name) ⇒ Object



82
83
84
85
# File 'lib/pgbus/web/data_source.rb', line 82

# Drops a queue after calling release_uniqueness_keys_for_queue for it.
#
# No rescue is present, so errors from either step propagate to the caller.
#
# @param name [String] queue name, used as-is (prefixed: false)
# @return whatever @client.drop_queue returns
def drop_queue(name)
  release_uniqueness_keys_for_queue(name)
  @client.drop_queue(name, prefixed: false)
end

#edit_event_payload(queue_name, msg_id, new_payload_json) ⇒ Object

Edit the payload of a stuck event: delete old message and re-enqueue with the corrected payload in the same queue. The produce + delete are wrapped in a PGMQ transaction so the message can't be lost if either half fails (same pattern as retry_dlq_message).



865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
# File 'lib/pgbus/web/data_source.rb', line 865

# Replaces the payload of a queued event: produces a new message carrying
# the corrected payload and the original headers, then deletes the old
# message, both inside one client transaction.
#
# @param queue_name [String] queue holding the message
# @param msg_id id of the message to replace; converted with to_i for the
#   delete
# @param new_payload_json [String] replacement payload; must be valid JSON
# @return [Boolean] true on success; false when the JSON is invalid, the
#   message cannot be found, or the transaction raises (the last case is
#   logged at debug level)
def edit_event_payload(queue_name, msg_id, new_payload_json)
  payload =
    begin
      JSON.parse(new_payload_json)
    rescue JSON::ParserError
      return false
    end

  original = job_detail(queue_name, msg_id)
  if original
    @client.transaction do |txn|
      txn.produce(queue_name, payload.to_json, headers: original[:headers])
      txn.delete(queue_name, msg_id.to_i)
    end
    true
  else
    false
  end
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error editing event #{msg_id}: #{error.message}" }
  false
end

#enqueue_recurring_task_now(id) ⇒ Object



527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
# File 'lib/pgbus/web/data_source.rb', line 527

# Enqueues a recurring task immediately, outside its regular schedule.
#
# @param id id of the RecurringTask record
# @return [Boolean] true once enqueue_task has been called; false when no
#   record matches or anything raises (logged at error level)
def enqueue_recurring_task_now(id)
  record = RecurringTask.find_by(id: id)
  return false unless record

  # Rebuild the task object from the stored record fields.
  task = Recurring::Task.from_configuration(record.key,
                                            class: record.class_name,
                                            command: record.command,
                                            schedule: record.schedule,
                                            queue: record.queue_name,
                                            args: parse_arguments(record.arguments),
                                            priority: record.priority)

  # run_at is the current UTC time rather than the task's next scheduled time.
  schedule = Recurring::Schedule.new(config: Pgbus.configuration)
  schedule.enqueue_task(task, run_at: Time.now.utc)
  true
rescue StandardError => e
  Pgbus.logger.error { "[Pgbus::Web] Error enqueuing recurring task #{id}: #{e.message}" }
  false
end

#failed_event(id) ⇒ Object



183
184
185
186
187
188
189
190
191
192
# File 'lib/pgbus/web/data_source.rb', line 183

# Fetches a single row from pgbus_failed_events.
#
# @param id failed-event id; converted with to_i for the query bind
# @return the select_one result (nil when no row matches), or nil when the
#   query raises (logged at debug level)
def failed_event(id)
  sql = "SELECT * FROM pgbus_failed_events WHERE id = $1"
  connection.select_one(sql, "Pgbus Failed Event", [id.to_i])
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching failed event #{id}: #{error.message}" }
  nil
end

#failed_events(page: 1, per_page: 25) ⇒ Object

Failed events



162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/pgbus/web/data_source.rb', line 162

# Lists failed events, newest first by failed_at, one page at a time.
#
# @param page [Integer] 1-based page number; values below 1 are treated as
#   page 1
# @param per_page [Integer] maximum number of rows to return
# @return [Array] rows from pgbus_failed_events, or [] when the query
#   raises (logged at debug level)
def failed_events(page: 1, per_page: 25)
  # A page below 1 would bind a negative OFFSET, which PostgreSQL rejects;
  # the rescue below would then hide that as an empty list.
  offset = ([page.to_i, 1].max - 1) * per_page
  rows = connection.select_all(
    "SELECT * FROM pgbus_failed_events ORDER BY failed_at DESC LIMIT $1 OFFSET $2",
    "Pgbus Failed Events",
    [per_page, offset]
  )
  rows.to_a
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching failed events: #{e.message}" }
  []
end

#failed_events_count ⇒ Object



175
176
177
178
179
180
181
# File 'lib/pgbus/web/data_source.rb', line 175

# Counts the rows in pgbus_failed_events.
#
# @return [Integer] the COUNT(*) value converted with to_i, or 0 when the
#   query raises (logged at debug level)
def failed_events_count
  connection.select_value("SELECT COUNT(*) FROM pgbus_failed_events").to_i
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error counting failed events: #{error.message}" }
  0
end

#handler_class_for_queue(physical_queue_name) ⇒ Object

Find the handler class registered for a given physical queue name. Returns nil if no subscriber matches — used to reject forged handler values in mark_event_handled / reroute_event.



813
814
815
816
# File 'lib/pgbus/web/data_source.rb', line 813

# Returns the handler class name of the first registered subscriber whose
# physical queue name equals the given one.
#
# @param physical_queue_name [String] physical queue name to look up
# @return the subscriber's :handler_class value, or nil when no subscriber
#   matches
def handler_class_for_queue(physical_queue_name)
  registered_subscribers.each do |subscriber|
    return subscriber[:handler_class] if subscriber[:physical_queue_name] == physical_queue_name
  end

  nil
end

#handler_queue_physical_names ⇒ Object

Physical queue names for all registered subscribers. Used for both pending_events lookup and server-side validation of target queues in reroute_event.



806
807
808
# File 'lib/pgbus/web/data_source.rb', line 806

# Collects the distinct physical queue names of all registered subscribers,
# in first-seen order.
#
# @return [Array] unique :physical_queue_name values
def handler_queue_physical_names
  distinct = registered_subscribers.uniq { |subscriber| subscriber[:physical_queue_name] }
  distinct.map { |subscriber| subscriber[:physical_queue_name] }
end

#job_detail(queue_name, msg_id) ⇒ Object



119
120
121
122
123
124
125
126
127
128
129
# File 'lib/pgbus/web/data_source.rb', line 119

# Fetches a single message row from a queue table and formats it.
#
# The table name is built from sanitize_name(queue_name); the message id is
# passed as a bind parameter.
#
# @param queue_name [String] queue whose table is queried
# @param msg_id message id; converted with to_i for the query bind
# @return the format_message output, or nil when no row matches or the
#   query raises (logged at debug level)
def job_detail(queue_name, msg_id)
  row = connection.select_one(
    "SELECT * FROM pgmq.q_#{sanitize_name(queue_name)} WHERE msg_id = $1",
    "Pgbus Job Detail",
    [msg_id.to_i]
  )
  return nil unless row

  format_message(row, queue_name)
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching job detail: #{error.message}" }
  nil
end

#job_locks ⇒ Object

Job uniqueness keys



599
600
601
602
603
604
605
606
607
608
609
610
611
612
# File 'lib/pgbus/web/data_source.rb', line 599

# Lists the 100 most recently created uniqueness keys.
#
# @return [Array<Hash>] one hash per key with :lock_key, :queue_name,
#   :msg_id, :created_at and :age_seconds (whole seconds since created_at,
#   nil when created_at is missing); [] when the query raises (logged at
#   debug level)
def job_locks
  recent_keys = UniquenessKey.order(created_at: :desc).limit(100)
  recent_keys.map do |key|
    created = key.created_at
    age = created ? (Time.current - created).to_i : nil

    {
      lock_key: key.lock_key,
      queue_name: key.queue_name,
      msg_id: key.msg_id,
      created_at: created,
      age_seconds: age
    }
  end
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching uniqueness keys: #{error.message}" }
  []
end

#job_stats_summary(minutes: 60) ⇒ Object

Job stats



652
653
654
655
656
657
# File 'lib/pgbus/web/data_source.rb', line 652

# Returns the JobStat summary for a recent time window.
#
# @param minutes [Integer] window size forwarded to JobStat.summary
# @return [Hash] the JobStat.summary result, or an all-zero summary with
#   :total, :success, :failed, :dead_lettered, :avg_duration_ms and
#   :max_duration_ms when the query raises (logged at debug level)
def job_stats_summary(minutes: 60)
  summary = JobStat.summary(minutes: minutes)
  summary
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching job stats summary: #{error.message}" }
  {
    total: 0,
    success: 0,
    failed: 0,
    dead_lettered: 0,
    avg_duration_ms: 0,
    max_duration_ms: 0
  }
end

#job_status_counts(minutes: 60) ⇒ Object



666
667
668
669
670
671
# File 'lib/pgbus/web/data_source.rb', line 666

# Returns JobStat status counts for a recent time window.
#
# @param minutes [Integer] window size forwarded to JobStat.status_counts
# @return the JobStat.status_counts result, or {} when the query raises
#   (logged at debug level)
def job_status_counts(minutes: 60)
  counts = JobStat.status_counts(minutes: minutes)
  counts
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching status counts: #{error.message}" }
  {}
end

#job_throughput(minutes: 60) ⇒ Object



659
660
661
662
663
664
# File 'lib/pgbus/web/data_source.rb', line 659

# Returns throughput data points for a recent time window.
#
# @param minutes [Integer] window size forwarded to JobStat.throughput
# @return [Array<Hash>] one { time:, count: } hash per pair yielded by
#   JobStat.throughput, or [] when the query raises (logged at debug level)
def job_throughput(minutes: 60)
  buckets = JobStat.throughput(minutes: minutes)
  buckets.map do |bucket_time, job_count|
    { time: bucket_time, count: job_count }
  end
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching throughput: #{error.message}" }
  []
end

#jobs(queue_name: nil, page: 1, per_page: 25) ⇒ Object

Jobs (messages in queue tables)



106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/pgbus/web/data_source.rb', line 106

# Lists one page of messages, either from a single queue or across all
# queues.
#
# @param queue_name [String, nil] queue to read; nil reads across all
#   queues via all_queue_messages
# @param page [Integer] 1-based page number; values below 1 are treated as
#   page 1
# @param per_page [Integer] page size
# @return [Array] the messages, or [] when anything raises (logged at debug
#   level)
def jobs(queue_name: nil, page: 1, per_page: 25)
  # Clamp so a page of 0 or below cannot produce a negative offset.
  # NOTE(review): presumably the offset ends up in a SQL OFFSET clause,
  # which PostgreSQL rejects when negative — confirm in the query helpers.
  offset = ([page.to_i, 1].max - 1) * per_page

  if queue_name
    query_queue_messages(queue_name, per_page, offset)
  else
    all_queue_messages(per_page, offset)
  end
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error reading jobs: #{e.message}" }
  []
end

#latency_by_queue(minutes: 60) ⇒ Object



687
688
689
690
691
692
# File 'lib/pgbus/web/data_source.rb', line 687

# Returns the average latency per queue for a recent time window.
#
# @param minutes [Integer] window size forwarded to
#   JobStat.avg_latency_by_queue
# @return the JobStat.avg_latency_by_queue result, or [] when the query
#   raises (logged at debug level)
def latency_by_queue(minutes: 60)
  latencies = JobStat.avg_latency_by_queue(minutes: minutes)
  latencies
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching latency by queue: #{error.message}" }
  []
end

#latency_trend(minutes: 60) ⇒ Object



680
681
682
683
684
685
# File 'lib/pgbus/web/data_source.rb', line 680

# Returns the latency trend for a recent time window.
#
# @param minutes [Integer] window size forwarded to JobStat.latency_trend
# @return the JobStat.latency_trend result, or [] when the query raises
#   (logged at debug level)
def latency_trend(minutes: 60)
  trend = JobStat.latency_trend(minutes: minutes)
  trend
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching latency trend: #{error.message}" }
  []
end

#live_stream_metrics ⇒ Object

Stream stats — only populated when streams_stats_enabled is true AND the migration has been run. Controllers should gate rendering on stream_stats_available? to avoid showing empty sections.



742
743
744
745
746
747
748
749
750
751
752
753
# File 'lib/pgbus/web/data_source.rb', line 742

# Returns live stream metrics from the streamer's counter.
#
# @return [Hash] { streams:, totals: } built from the counter's snapshot
#   and totals; an empty structure with zeroed totals when no counter is
#   available or when reading it raises (the latter logged at debug level)
def live_stream_metrics
  counter = Pgbus::Web::Streamer.stream_counter
  return { streams: counter.snapshot, totals: counter.totals } if counter

  {
    streams: {},
    totals: { broadcasts: 0, active_connections: 0, total_connections: 0, streams: 0 }
  }
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching live stream metrics: #{error.message}" }
  { streams: {}, totals: { broadcasts: 0, active_connections: 0, total_connections: 0, streams: 0 } }
end

#mark_event_handled(queue_name, msg_id, handler_class) ⇒ Object

Mark an event as handled: archive the queue message and insert a ProcessedEvent record so it won't be reprocessed on replay.

The insert is performed BEFORE archive. If the archive step fails afterwards the operator can retry — replay protection is already in place and the idempotency dedup will cause the handler to skip the event even if it is eventually re-read from the queue. Doing it the other way around would risk losing the message without recording the marker.



837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
# File 'lib/pgbus/web/data_source.rb', line 837

# Marks a queued event as handled for one handler.
#
# Steps run in this order: insert the ProcessedEvent marker, release the
# payload's uniqueness lock, archive the queue message.
#
# @param queue_name [String] queue holding the message, used as-is
#   (prefixed: false)
# @param msg_id message id; converted with to_i for archiving
# @param handler_class handler identifier stored on the marker row
# @return [Boolean] true on success; false when the message is not found,
#   its payload has no "event_id", or any step raises (logged at debug
#   level)
def mark_event_handled(queue_name, msg_id, handler_class)
  detail = job_detail(queue_name, msg_id)
  return false unless detail

  # detail[:message] is parsed as JSON here, so it is expected to be a
  # JSON string at this point.
  raw = JSON.parse(detail[:message])
  event_id = raw["event_id"]
  return false unless event_id

  # Insert the marker before archiving: if archiving fails afterwards the
  # marker is already recorded and the operation can simply be retried.
  ProcessedEvent.insert(
    { event_id: event_id, handler_class: handler_class, processed_at: Time.now.utc },
    unique_by: %i[event_id handler_class]
  )
  # Release the uniqueness lock while we still hold the payload in
  # memory — otherwise the message is archived but the lock row stays
  # behind, blocking later publishes with the same key. Mirrors
  # discard_event.
  release_lock_for_payload(detail[:message])
  @client.archive_message(queue_name, msg_id.to_i, prefixed: false)
  true
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error marking event #{msg_id} handled: #{e.message}" }
  false
end

#notify_throttles ⇒ Object

NOTIFY throttle status for all queues with notifications enabled. Returns an array of hashes: { queue_name:, throttle_interval_ms:, last_notified_at: }



937
938
939
940
941
942
943
944
945
946
947
948
# File 'lib/pgbus/web/data_source.rb', line 937

# Lists the NOTIFY insert throttles reported by the client.
#
# @return [Array<Hash>] one hash per throttle with :queue_name,
#   :throttle_interval_ms and :last_notified_at; [] when the client call
#   raises (logged at debug level)
def notify_throttles
  fields = %i[queue_name throttle_interval_ms last_notified_at]

  @client.list_notify_insert_throttles.map do |throttle|
    fields.to_h { |field| [field, throttle.public_send(field)] }
  end
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching notify throttles: #{error.message}" }
  []
end

#outbox_entries(page: 1, per_page: 25) ⇒ Object



566
567
568
569
570
571
572
# File 'lib/pgbus/web/data_source.rb', line 566

# Lists outbox entries, highest id first, one page at a time.
#
# @param page [Integer] 1-based page number; values below 1 are treated as
#   page 1
# @param per_page [Integer] page size
# @return [Array] the loaded entries, or [] when the query raises (logged
#   at debug level)
def outbox_entries(page: 1, per_page: 25)
  # A page below 1 would give a negative offset, which PostgreSQL rejects;
  # the rescue below would then hide that as an empty list.
  offset = ([page.to_i, 1].max - 1) * per_page
  OutboxEntry.order(id: :desc).limit(per_page).offset(offset).to_a
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching outbox entries: #{e.message}" }
  []
end

#outbox_stats ⇒ Object

Outbox



555
556
557
558
559
560
561
562
563
564
# File 'lib/pgbus/web/data_source.rb', line 555

# Summarizes the outbox table.
#
# @return [Hash] :unpublished (count of the unpublished scope), :total
#   (count of all entries) and :oldest_unpublished_age; zeros and nil when
#   anything raises (logged at debug level)
def outbox_stats
  unpublished_count = OutboxEntry.unpublished.count
  total_count = OutboxEntry.count
  oldest_age = oldest_unpublished_age

  { unpublished: unpublished_count, total: total_count, oldest_unpublished_age: oldest_age }
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching outbox stats: #{error.message}" }
  { unpublished: 0, total: 0, oldest_unpublished_age: nil }
end

#pause_queue(name, reason: nil) ⇒ Object



87
88
89
90
91
# File 'lib/pgbus/web/data_source.rb', line 87

# Pauses a queue.
#
# The given name is first mapped through logical_queue_name.
#
# @param name [String] queue name as shown in the dashboard
# @param reason [String, nil] optional reason forwarded to QueueState.pause!
# @return the QueueState.pause! result; when it raises, the error is logged
#   at error level and the logger call's result is returned instead
def pause_queue(name, reason: nil)
  logical = logical_queue_name(name)
  QueueState.pause!(logical, reason: reason)
rescue StandardError => error
  Pgbus.logger.error { "[Pgbus::Web] Error pausing queue #{name}: #{error.message}" }
end

#pending_events(page: 1, per_page: 25) ⇒ Object

Pending events — messages sitting in handler queues that haven't been processed. Identifies handler queues via the subscriber registry and queries them for unprocessed messages. Subscriber queue names are logical (e.g. "task_completion_handler"), while pgmq.meta.queue_name stores physical names (e.g. "pgbus_task_completion_handler"), so we normalize through config.queue_name before intersecting.



786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
# File 'lib/pgbus/web/data_source.rb', line 786

# Lists one page of messages waiting in handler queues.
#
# Only handler queues that actually exist in pgmq.meta are queried.
#
# @param page [Integer] 1-based page number; values below 1 are treated as
#   page 1
# @param per_page [Integer] page size
# @return [Array] the paginated_queue_messages result; [] when there are no
#   handler queues, none of them exist, or anything raises (the last case
#   is logged at debug level)
def pending_events(page: 1, per_page: 25)
  handler_queues = handler_queue_physical_names
  return [] if handler_queues.empty?

  existing = connection.select_values(
    "SELECT queue_name FROM pgmq.meta ORDER BY queue_name", "Pgbus Queue Names"
  )
  target_queues = handler_queues & existing
  return [] if target_queues.empty?

  # Clamp so a page of 0 or below cannot produce a negative offset.
  # NOTE(review): presumably the offset ends up in a SQL OFFSET clause,
  # which PostgreSQL rejects when negative — confirm in
  # paginated_queue_messages.
  offset = ([page.to_i, 1].max - 1) * per_page
  paginated_queue_messages(target_queues, per_page, offset)
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching pending events: #{e.message}" }
  []
end

#processed_event(id) ⇒ Object



402
403
404
405
406
407
408
409
410
411
# File 'lib/pgbus/web/data_source.rb', line 402

# Fetches a single row from pgbus_processed_events.
#
# @param id processed-event id; converted with to_i for the query bind
# @return the select_one result (nil when no row matches), or nil when the
#   query raises (logged at debug level)
def processed_event(id)
  sql = "SELECT * FROM pgbus_processed_events WHERE id = $1"
  connection.select_one(sql, "Pgbus Processed Event", [id.to_i])
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching processed event #{id}: #{error.message}" }
  nil
end

#processed_events(page: 1, per_page: 25) ⇒ Object

Processed events (audit trail)



389
390
391
392
393
394
395
396
397
398
399
400
# File 'lib/pgbus/web/data_source.rb', line 389

# Lists processed events, newest first by processed_at, one page at a time.
#
# @param page [Integer] 1-based page number; values below 1 are treated as
#   page 1
# @param per_page [Integer] maximum number of rows to return
# @return [Array] rows from pgbus_processed_events, or [] when the query
#   raises (logged at debug level)
def processed_events(page: 1, per_page: 25)
  # A page below 1 would bind a negative OFFSET, which PostgreSQL rejects;
  # the rescue below would then hide that as an empty list.
  offset = ([page.to_i, 1].max - 1) * per_page
  rows = connection.select_all(
    "SELECT * FROM pgbus_processed_events ORDER BY processed_at DESC LIMIT $1 OFFSET $2",
    "Pgbus Processed Events",
    [per_page, offset]
  )
  rows.to_a
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching processed events: #{e.message}" }
  []
end

#processed_events_count ⇒ Object



413
414
415
416
417
418
419
# File 'lib/pgbus/web/data_source.rb', line 413

# Counts the rows in pgbus_processed_events.
#
# @return [Integer] the COUNT(*) value converted with to_i, or 0 when the
#   query raises (logged at debug level)
def processed_events_count
  connection.select_value("SELECT COUNT(*) FROM pgbus_processed_events").to_i
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error counting processed events: #{error.message}" }
  0
end

#processes ⇒ Object

Processes



378
379
380
381
382
383
384
385
386
# File 'lib/pgbus/web/data_source.rb', line 378

# Lists all rows of pgbus_processes ordered by kind, then created_at.
#
# @return [Array] format_process output for each row, or [] when the query
#   raises (logged at debug level)
def processes
  sql = "SELECT * FROM pgbus_processes ORDER BY kind, created_at"
  connection.select_all(sql).to_a.map { |row| format_process(row) }
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching processes: #{error.message}" }
  []
end

#purge_queue(name) ⇒ Object



77
78
79
80
# File 'lib/pgbus/web/data_source.rb', line 77

# Purges a queue after calling release_uniqueness_keys_for_queue for it.
#
# No rescue is present, so errors from either step propagate to the caller.
#
# @param name [String] queue name, used as-is (prefixed: false)
# @return whatever @client.purge_queue returns
def purge_queue(name)
  release_uniqueness_keys_for_queue(name)
  @client.purge_queue(name, prefixed: false)
end

#queue_detail(name) ⇒ Object

name is the full PGMQ queue name (e.g. "pgbus_default") as returned by queues_with_metrics. No prefix is added.



70
71
72
73
74
75
# File 'lib/pgbus/web/data_source.rb', line 70

# Fetches metrics for a single queue through queue_metrics_via_sql.
#
# @param name [String] full queue name, passed on unchanged
# @return the queue_metrics_via_sql result, or nil when it raises (logged
#   at error level with the exception class)
def queue_detail(name)
  metrics = queue_metrics_via_sql(name)
  metrics
rescue StandardError => error
  Pgbus.logger.error { "[Pgbus::Web] Error fetching queue detail for #{name}: #{error.class}: #{error.message}" }
  nil
end

#queue_group_heads(queue_name, qty: 20) ⇒ Object

FIFO group head sampling for a specific queue. Returns the oldest visible message from each distinct group (up to qty). Useful for detecting head-of-line stalls in multi-tenant queues.



953
954
955
956
957
958
959
960
# File 'lib/pgbus/web/data_source.rb', line 953

# Samples the head message of each FIFO group in a queue.
#
# The client is called with the logical queue name; the formatted messages
# carry the name that was passed in.
#
# @param queue_name [String] queue name as shown in the dashboard
# @param qty [Integer] quantity forwarded to read_grouped_head
# @return [Array] format_pgmq_message output per message; [] when the
#   client returns nil or anything raises (the latter logged at debug
#   level)
def queue_group_heads(queue_name, qty: 20)
  heads = @client.read_grouped_head(logical_queue_name(queue_name), qty: qty)
  (heads || []).map { |message| format_pgmq_message(message, queue_name) }
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching group heads for #{queue_name}: #{error.message}" }
  []
end

#queue_health_detail(queue_name) ⇒ Object

Per-queue health stats for the queue detail view.



725
726
727
728
729
730
731
732
733
734
735
736
# File 'lib/pgbus/web/data_source.rb', line 725

# Collects table statistics for one queue's "q_" and "a_" tables in the
# pgmq schema, plus the oldest open transaction age.
#
# @param queue_name [String] queue name; passed through sanitize_name
#   before being used in table names
# @return [Hash] :tables (stats for the queue and archive tables, skipping
#   any for which fetch_table_stats returns nil) and
#   :oldest_transaction_age_sec; empty tables and nil age when anything
#   raises (logged at debug level)
def queue_health_detail(queue_name)
  safe_name = sanitize_name(queue_name)
  table_kinds = [["q_#{safe_name}", "queue"], ["a_#{safe_name}", "archive"]]
  stats = table_kinds.map { |table, kind| fetch_table_stats("pgmq", table, kind) }.compact

  { tables: stats, oldest_transaction_age_sec: oldest_transaction_age }
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching health detail for #{queue_name}: #{error.message}" }
  { tables: [], oldest_transaction_age_sec: nil }
end

#queue_health_stats ⇒ Object

Queue health — vacuum stats, dead tuples, bloat, MVCC horizon. Returns aggregate health across all queue and archive tables, plus the oldest open transaction age (MVCC horizon pinning risk).



697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
# File 'lib/pgbus/web/data_source.rb', line 697

# Aggregates table health across everything fetch_all_table_stats returns.
#
# @return [Hash] :total_dead_tuples, :total_live_tuples, :worst_bloat_ratio
#   (rounded to 4 places, 0.0 with no tables), :tables_needing_vacuum
#   (tables whose bloat ratio exceeds 0.1), :oldest_vacuum_ago_sec (largest
#   non-nil :last_vacuum_ago_sec), :oldest_transaction_age_sec and :tables;
#   zeroed values with an empty table list when anything raises (logged at
#   debug level)
def queue_health_stats
  tables = fetch_all_table_stats

  dead_total = tables.map { |table| table[:dead_tuples] }.sum
  live_total = tables.map { |table| table[:live_tuples] }.sum
  bloat_ratios = tables.map { |table| table[:bloat_ratio] }
  worst_bloat = bloat_ratios.max || 0.0
  vacuum_candidates = bloat_ratios.count { |ratio| ratio > 0.1 }
  vacuum_ages = tables.filter_map { |table| table[:last_vacuum_ago_sec] }

  {
    total_dead_tuples: dead_total,
    total_live_tuples: live_total,
    worst_bloat_ratio: worst_bloat.round(4),
    tables_needing_vacuum: vacuum_candidates,
    oldest_vacuum_ago_sec: vacuum_ages.max,
    oldest_transaction_age_sec: oldest_transaction_age,
    tables: tables
  }
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching queue health stats: #{error.class}: #{error.message}" }
  {
    total_dead_tuples: 0, total_live_tuples: 0, worst_bloat_ratio: 0.0,
    tables_needing_vacuum: 0, oldest_vacuum_ago_sec: nil,
    oldest_transaction_age_sec: nil, tables: []
  }
end

#queue_paused?(name) ⇒ Boolean

Returns:

  • (Boolean)


99
100
101
102
103
# File 'lib/pgbus/web/data_source.rb', line 99

# Reports whether a queue is currently paused.
#
# The given name is first mapped through logical_queue_name.
#
# @param name [String] queue name as shown in the dashboard
# @return [Boolean] the QueueState.paused? result, or false when the
#   lookup raises
def queue_paused?(name)
  QueueState.paused?(logical_queue_name(name))
rescue StandardError => e
  # Still fall back to "not paused", but leave a trace like the other
  # read-only lookups in this class instead of swallowing the error.
  Pgbus.logger.debug { "[Pgbus::Web] Error checking paused state for #{name}: #{e.message}" }
  false
end

#queues_with_metrics ⇒ Object

Queues — query via ActiveRecord for reliability in web processes (avoids PGMQ client connection issues when the web server uses a different connection lifecycle than the worker processes). Memoized for the lifetime of this data-source instance (one per web request — see ApplicationController#data_source). A single page can ask for queue metrics more than once (e.g. the DLQ page reads both the message rows and the total count); without the memo each call re-runs the meta query + batched metrics. Mutations redirect to a fresh request with a new instance, so a per-request memo never serves stale data.



50
51
52
# File 'lib/pgbus/web/data_source.rb', line 50

# Returns queue metrics, memoized in @queues_with_metrics for the lifetime
# of this instance.
#
# fetch_queues_with_metrics runs on the first call; it runs again on later
# calls only if it returned nil or false, because ||= re-evaluates for
# falsy values.
#
# @return the (cached) fetch_queues_with_metrics result
def queues_with_metrics
  @queues_with_metrics ||= fetch_queues_with_metrics
end

#recurring_task(id) ⇒ Object



476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
# File 'lib/pgbus/web/data_source.rb', line 476

# Loads one recurring task with its schedule details and recent executions.
#
# @param id id of the RecurringTask record
# @return [Hash, nil] the record's attributes plus :human_schedule and
#   :next_run_at from the rebuilt task object and :executions (up to 25
#   recent { run_at:, created_at: } hashes); nil when no record matches or
#   anything raises (logged at error level with the exception class)
def recurring_task(id)
  record = RecurringTask.find_by(id: id)
  return nil unless record

  # Rebuild the task object from the stored record; it supplies
  # human_schedule and next_time below.
  task = Recurring::Task.from_configuration(record.key,
                                            class: record.class_name,
                                            command: record.command,
                                            schedule: record.schedule,
                                            queue: record.queue_name,
                                            args: parse_arguments(record.arguments),
                                            priority: record.priority,
                                            description: record.description)

  executions = RecurringExecution.for_task(record.key).recent(25).map do |exec|
    { run_at: exec.run_at, created_at: exec.created_at }
  end

  {
    id: record.id,
    key: record.key,
    class_name: record.class_name,
    command: record.command,
    schedule: record.schedule,
    human_schedule: task.human_schedule,
    queue_name: record.queue_name,
    arguments: parse_arguments(record.arguments),
    priority: record.priority,
    description: record.description,
    enabled: record.enabled,
    static: record.static,
    next_run_at: task.next_time,
    executions: executions,
    created_at: record.created_at,
    updated_at: record.updated_at
  }
rescue StandardError => e
  Pgbus.logger.error { "[Pgbus::Web] Error fetching recurring task #{id}: #{e.class}: #{e.message}" }
  nil
end

#recurring_tasks ⇒ Object

Recurring tasks



434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
# File 'lib/pgbus/web/data_source.rb', line 434

# Lists all recurring tasks ordered by key, each with its schedule details
# and most recent run time.
#
# @return [Array<Hash>] one hash per task with the record's attributes plus
#   :human_schedule, :next_run_at and :last_run_at; [] when anything raises
#   (logged at error level with the exception class)
def recurring_tasks
  records = RecurringTask.order(:key).to_a
  # One grouped query fetches the latest run_at for every task key, rather
  # than one query per task.
  last_runs = RecurringExecution
              .where(task_key: records.map(&:key))
              .select("task_key, MAX(run_at) AS run_at")
              .group(:task_key)
              .index_by(&:task_key)

  records.map do |record|
    # nil when the task has no recorded execution.
    last_exec = last_runs[record.key]
    task = Recurring::Task.from_configuration(record.key,
                                              class: record.class_name,
                                              command: record.command,
                                              schedule: record.schedule,
                                              queue: record.queue_name,
                                              args: parse_arguments(record.arguments),
                                              priority: record.priority,
                                              description: record.description)

    {
      id: record.id,
      key: record.key,
      class_name: record.class_name,
      command: record.command,
      schedule: record.schedule,
      human_schedule: task.human_schedule,
      queue_name: record.queue_name,
      priority: record.priority,
      description: record.description,
      enabled: record.enabled,
      static: record.static,
      next_run_at: task.next_time,
      last_run_at: last_exec&.run_at,
      created_at: record.created_at,
      updated_at: record.updated_at
    }
  end
rescue StandardError => e
  Pgbus.logger.error { "[Pgbus::Web] Error fetching recurring tasks: #{e.class}: #{e.message}" }
  []
end

#recurring_tasks_count ⇒ Object



547
548
549
550
551
552
# File 'lib/pgbus/web/data_source.rb', line 547

# Counts all recurring task records.
#
# @return [Integer] RecurringTask.count, or 0 when the query raises (logged
#   at debug level)
def recurring_tasks_count
  total = RecurringTask.count
  total
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error counting recurring tasks: #{error.message}" }
  0
end

#registered_subscribers ⇒ Object

Subscriber registry. queue_name is the logical name the subscriber registered with; physical_queue_name is what the queue is actually called in pgmq.meta (e.g. logical "task_completion_handler" -> physical "pgbus_task_completion_handler"). The dashboard needs the physical name to match against pending messages / target queues.



921
922
923
924
925
926
927
928
929
930
931
932
933
# File 'lib/pgbus/web/data_source.rb', line 921

# Lists every subscriber in the EventBus registry as a hash carrying its
# pattern, handler class name, logical queue name, and the physical queue
# name obtained from the client configuration. Any StandardError is logged
# at debug level and an empty array is returned.
def registered_subscribers
  registry = EventBus::Registry.instance
  registry.subscribers.map do |subscriber|
    entry = {}
    entry[:pattern] = subscriber.pattern
    entry[:handler_class] = subscriber.handler_class.name
    entry[:queue_name] = subscriber.queue_name
    entry[:physical_queue_name] = @client.config.queue_name(subscriber.queue_name)
    entry
  end
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching subscribers: #{error.message}" }
  []
end

#replay_event(event) ⇒ Object



421
422
423
424
425
426
427
428
429
430
431
# File 'lib/pgbus/web/data_source.rb', line 421

# Re-publishes an event's payload to its topic via the client.
#
# The topic is event["routing_key"], falling back to event["handler_class"];
# when neither is present nothing is published and false is returned. A
# missing payload is published as "{}". Returns true after publishing, or
# false (with a debug log entry) when a StandardError is raised.
def replay_event(event)
  topic = event["routing_key"] || event["handler_class"]
  return false unless topic

  body = event["payload"] || "{}"
  @client.publish_to_topic(topic, body)
  true
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error replaying event: #{error.message}" }
  false
end

#reroute_event(source_queue, msg_id, target_queue) ⇒ Object

Reroute an event from one handler queue to another. Wrapped in a PGMQ transaction so produce on the target and delete on the source are atomic.



888
889
890
891
892
893
894
895
896
897
898
899
900
# File 'lib/pgbus/web/data_source.rb', line 888

# Moves a message from source_queue to target_queue.
#
# The message is looked up with job_detail; when it is not found false is
# returned. Otherwise the message and its headers are produced on the target
# queue and the original is deleted from the source queue, both inside a
# single @client.transaction block. Returns true on success, or false (with
# a debug log entry) when a StandardError is raised.
def reroute_event(source_queue, msg_id, target_queue)
  job = job_detail(source_queue, msg_id)
  return false unless job

  @client.transaction do |txn|
    body = job[:message]
    meta = job[:headers]
    txn.produce(target_queue, body, headers: meta)
    txn.delete(source_queue, msg_id.to_i)
  end
  true
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error rerouting event #{msg_id}: #{error.message}" }
  false
end

#resume_queue(name) ⇒ Object



93
94
95
96
97
# File 'lib/pgbus/web/data_source.rb', line 93

# Resumes the queue identified by name: the name is converted with
# logical_queue_name and passed to QueueState.resume!. A StandardError is
# logged at error level rather than propagated.
def resume_queue(name)
  logical_name = logical_queue_name(name)
  QueueState.resume!(logical_name)
rescue StandardError => error
  Pgbus.logger.error { "[Pgbus::Web] Error resuming queue #{name}: #{error.message}" }
end

#retry_all_dlq ⇒ Object



349
350
351
352
353
354
355
356
357
358
359
# File 'lib/pgbus/web/data_source.rb', line 349

# Retries the dead-letter messages returned by the first page (up to 1000
# entries) of dlq_messages and returns how many retries reported success.
# A message whose retry raises a StandardError is logged at debug level and
# not counted.
def retry_all_dlq
  dlq_messages(page: 1, per_page: 1000).count do |message|
    retry_dlq_message(message[:queue_name], message[:msg_id])
  rescue StandardError => error
    Pgbus.logger.debug { "[Pgbus::Web] Error retrying DLQ message #{message[:msg_id]}: #{error.message}" }
    false
  end
end

#retry_all_failed ⇒ Object



238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
# File 'lib/pgbus/web/data_source.rb', line 238

# Re-sends every row of pgbus_failed_events to its queue and deletes the rows
# that were re-sent, returning the number of events retried successfully.
#
# Rows are read in batches of 100 ordered by id, using the highest id of the
# previous batch as a cursor. A row whose retry raises is logged at error
# level and left in the table. Because the cursor moves past it, such a row
# is visited only once per call; re-reading from the start of the table on
# every pass would select the same undeletable row forever and never
# terminate.
def retry_all_failed
  count = 0
  # NOTE(review): assumes ids are positive integers (they are already coerced
  # with to_i for the delete below) — confirm against the table definition.
  last_id = 0
  loop do
    batch = connection.select_all(
      "SELECT * FROM pgbus_failed_events WHERE id > $1 ORDER BY id LIMIT 100",
      "Pgbus Retry Batch",
      [last_id]
    ).to_a
    break if batch.empty?

    batch.each do |event|
      payload = JSON.parse(event["payload"])
      headers = event["headers"]
      # Headers may arrive as a JSON string or already decoded.
      headers = JSON.parse(headers) if headers.is_a?(String)

      connection.transaction do
        @client.send_message(event["queue_name"], payload, headers: headers)
        connection.exec_delete(
          "DELETE FROM pgbus_failed_events WHERE id = $1", "Pgbus Delete Failed Event", [event["id"].to_i]
        )
      end
      count += 1
    rescue StandardError => e
      Pgbus.logger.error { "[Pgbus::Web] Failed to retry event #{event["id"]}: #{e.message}" }
    end

    # Advance the cursor past this batch whether or not each row succeeded.
    last_id = batch.last["id"].to_i
  end
  count
end

#retry_dlq_message(queue_name, msg_id) ⇒ Object



317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
# File 'lib/pgbus/web/data_source.rb', line 317

# Moves one message from a dead-letter queue back to its original queue.
#
# queue_name is the full DLQ name (already prefixed); the original queue name
# is obtained by stripping Pgbus::DEAD_LETTER_SUFFIX from it. The row is read
# from the DLQ table by msg_id; when no row is found false is returned.
# Otherwise the message is produced on the original queue and deleted from
# the DLQ inside one @client.transaction block. Returns true on success, or
# false (with a debug log entry) when a StandardError is raised.
def retry_dlq_message(queue_name, msg_id)
  target_queue = queue_name.delete_suffix(Pgbus::DEAD_LETTER_SUFFIX)
  lookup_sql = "SELECT * FROM pgmq.q_#{sanitize_name(queue_name)} WHERE msg_id = $1"
  dead_letter = connection.select_one(lookup_sql, "Pgbus DLQ Read", [msg_id.to_i])
  return false unless dead_letter

  @client.transaction do |txn|
    txn.produce(target_queue, dead_letter["message"], headers: dead_letter["headers"])
    txn.delete(queue_name, msg_id.to_i)
  end
  true
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error retrying DLQ message #{msg_id}: #{error.message}" }
  false
end

#retry_failed_event(id) ⇒ Object



194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
# File 'lib/pgbus/web/data_source.rb', line 194

# Retries a single failed event and removes its pgbus_failed_events row.
#
# Returns false when failed_event(id) finds nothing. If the event has a
# msg_id and the client reports that message still exists in the queue, its
# visibility timeout is reset to 0 so it becomes available again without
# creating a duplicate. Otherwise (e.g. the original already moved to the
# DLQ) a fresh copy built from the stored payload and headers is sent.
# Returns true after the row is deleted, or false (with a debug log entry)
# when a StandardError is raised.
def retry_failed_event(id)
  event = failed_event(id)
  return false unless event

  queue = event["queue_name"]
  original_id = event["msg_id"]
  still_queued = original_id && @client.message_exists?(queue, msg_id: original_id.to_i)

  if still_queued
    @client.set_visibility_timeout(queue, original_id.to_i, vt: 0)
  else
    body = JSON.parse(event["payload"])
    meta = event["headers"]
    # Headers may be stored as a JSON string or already decoded.
    meta = JSON.parse(meta) if meta.is_a?(String)
    @client.send_message(queue, body, headers: meta)
  end

  connection.exec_delete(
    "DELETE FROM pgbus_failed_events WHERE id = $1",
    "Pgbus Delete Failed Event",
    [id.to_i]
  )
  true
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error retrying failed event #{id}: #{error.message}" }
  false
end

#retry_job(queue_name, msg_id) ⇒ Object



131
132
133
# File 'lib/pgbus/web/data_source.rb', line 131

# Resets the visibility timeout of a message to 0 through the client,
# passing the queue name with prefixed: false. msg_id is coerced with to_i.
def retry_job(queue_name, msg_id)
  message_id = msg_id.to_i
  @client.set_visibility_timeout(queue_name, message_id, vt: 0, prefixed: false)
end

#slowest_job_classes(limit: 10, minutes: 60) ⇒ Object



673
674
675
676
677
678
# File 'lib/pgbus/web/data_source.rb', line 673

# Delegates to JobStat.slowest_classes with the given limit and time window
# in minutes. A StandardError is logged at debug level and an empty array is
# returned.
def slowest_job_classes(limit: 10, minutes: 60)
  options = { limit: limit, minutes: minutes }
  JobStat.slowest_classes(**options)
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching slowest classes: #{error.message}" }
  []
end

#stream_stats_available? ⇒ Boolean

Returns:

  • (Boolean)


755
756
757
758
759
760
# File 'lib/pgbus/web/data_source.rb', line 755

# Reports whether stream statistics can be shown: the streams_stats_enabled
# configuration flag must be set and StreamStat.table_exists? must hold.
# A StandardError is logged at debug level and treated as unavailable.
def stream_stats_available?
  enabled = Pgbus.configuration.streams_stats_enabled
  enabled && StreamStat.table_exists?
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error checking stream stats availability: #{error.message}" }
  false
end

#stream_stats_summary(minutes: 60) ⇒ Object



762
763
764
765
766
767
768
769
770
771
# File 'lib/pgbus/web/data_source.rb', line 762

# Delegates to StreamStat.summary for the given window in minutes. When a
# StandardError is raised it is logged at debug level and a summary hash with
# every metric set to 0 is returned.
def stream_stats_summary(minutes: 60)
  window = minutes
  StreamStat.summary(minutes: window)
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching stream stats summary: #{error.message}" }
  {
    broadcasts: 0,
    connects: 0,
    disconnects: 0,
    active_estimate: 0,
    avg_fanout: 0,
    avg_broadcast_ms: 0,
    avg_connect_ms: 0
  }
end

#summary_stats ⇒ Object

Dashboard summary



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/pgbus/web/data_source.rb', line 15

# Builds the dashboard summary hash.
#
# Queue totals are derived from queues_with_metrics: the sum of queue_length
# and queue_visible_length over all queues, plus the queue_length of queues
# whose name ends with Pgbus::DEAD_LETTER_SUFFIX. The remaining figures come
# from compute_throughput, queue_health_stats, processes,
# failed_events_count and recurring_tasks_count.
def summary_stats
  queues = queues_with_metrics
  depth = queues.sum { |queue| queue[:queue_length] }
  visible = queues.sum { |queue| queue[:queue_visible_length] }
  suffix = Pgbus::DEAD_LETTER_SUFFIX
  dead_letter_queues = queues.select { |queue| queue[:name].end_with?(suffix) }
  dead_letter_depth = dead_letter_queues.sum { |queue| queue[:queue_length] }

  rate = compute_throughput(queues)
  health = queue_health_stats

  stats = { total_queues: queues.size, total_depth: depth, total_visible: visible }
  stats[:active_processes] = processes.count
  stats[:failed_count] = failed_events_count
  stats[:dlq_depth] = dead_letter_depth
  stats[:recurring_count] = recurring_tasks_count
  stats[:throughput_rate] = rate
  stats[:total_dead_tuples] = health[:total_dead_tuples]
  stats[:tables_needing_vacuum] = health[:tables_needing_vacuum]
  stats[:oldest_transaction_age_sec] = health[:oldest_transaction_age_sec]
  stats
end

#toggle_recurring_task(id) ⇒ Object



516
517
518
519
520
521
522
523
524
525
# File 'lib/pgbus/web/data_source.rb', line 516

# Flips the enabled flag of the recurring task with the given id.
#
# Returns :enabled or :disabled according to the record's flag after the
# update, or nil when no record is found. A StandardError is logged at error
# level and nil is returned.
def toggle_recurring_task(id)
  task = RecurringTask.find_by(id: id)
  return nil unless task

  flipped = !task.enabled
  task.update!(enabled: flipped)
  return :enabled if task.enabled

  :disabled
rescue StandardError => error
  Pgbus.logger.error { "[Pgbus::Web] Error toggling recurring task #{id}: #{error.message}" }
  nil
end

#top_streams(limit: 10, minutes: 60) ⇒ Object



773
774
775
776
777
778
# File 'lib/pgbus/web/data_source.rb', line 773

# Delegates to StreamStat.top_streams with the given limit and time window in
# minutes. A StandardError is logged at debug level and an empty array is
# returned.
def top_streams(limit: 10, minutes: 60)
  options = { limit: limit, minutes: minutes }
  StreamStat.top_streams(**options)
rescue StandardError => error
  Pgbus.logger.debug { "[Pgbus::Web] Error fetching top streams: #{error.message}" }
  []
end