Class: JobWorkflow::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/job_workflow/queue.rb

Overview

Queue provides a unified interface for queue operations across different queue adapters.

Examples:

Pausing and resuming a queue

```ruby
JobWorkflow::Queue.pause(:import_workflow)
JobWorkflow::Queue.paused?(:import_workflow)  # => true
JobWorkflow::Queue.resume(:import_workflow)
JobWorkflow::Queue.paused?(:import_workflow)  # => false
```

Getting queue metrics

```ruby
JobWorkflow::Queue.latency(:import_workflow)  # => 120 (seconds)
JobWorkflow::Queue.size(:import_workflow)     # => 42 (pending jobs)
```

Listing workflows associated with a queue

```ruby
JobWorkflow::Queue.workflows(:import_workflow)  # => [ImportJob, DataSyncJob]
```

Class Method Summary collapse

Class Method Details

.clear(queue_name) ⇒ Object

: (String | Symbol) -> bool



63
64
65
# File 'lib/job_workflow/queue.rb', line 63

def clear(queue_name)
  QueueAdapter.current.clear_queue(queue_name.to_s)
end

.latency(queue_name) ⇒ Object

: (String | Symbol) -> Integer?



53
54
55
# File 'lib/job_workflow/queue.rb', line 53

def latency(queue_name)
  QueueAdapter.current.queue_latency(queue_name.to_s)
end

.pause(queue_name) ⇒ Object

: (String | Symbol) -> bool



27
28
29
30
31
32
# File 'lib/job_workflow/queue.rb', line 27

def pause(queue_name)
  queue_name_str = queue_name.to_s
  result = QueueAdapter.current.pause_queue(queue_name_str)
  Instrumentation.notify_queue_pause(queue_name_str) if result
  result
end

.paused?(queue_name) ⇒ Boolean

: (String | Symbol) -> bool

Returns:

  • (Boolean)


43
44
45
# File 'lib/job_workflow/queue.rb', line 43

def paused?(queue_name)
  QueueAdapter.current.queue_paused?(queue_name.to_s)
end

.paused_queuesObject

: () -> Array



48
49
50
# File 'lib/job_workflow/queue.rb', line 48

def paused_queues
  QueueAdapter.current.paused_queues
end

.resume(queue_name) ⇒ Object

: (String | Symbol) -> bool



35
36
37
38
39
40
# File 'lib/job_workflow/queue.rb', line 35

def resume(queue_name)
  queue_name_str = queue_name.to_s
  result = QueueAdapter.current.resume_queue(queue_name_str)
  Instrumentation.notify_queue_resume(queue_name_str) if result
  result
end

.size(queue_name) ⇒ Object

: (String | Symbol) -> Integer



58
59
60
# File 'lib/job_workflow/queue.rb', line 58

def size(queue_name)
  QueueAdapter.current.queue_size(queue_name.to_s)
end

.workflows(queue_name) ⇒ Object

: (String | Symbol) -> Array



68
69
70
71
# File 'lib/job_workflow/queue.rb', line 68

def workflows(queue_name)
  queue_name_str = queue_name.to_s
  DSL._included_classes.filter { |job_class| job_class.queue_name == queue_name_str }.to_a
end