Module: RoundhouseUi::Pause

Defined in:
lib/roundhouse_ui/pause.rb

Overview

Roundhouse's own queue-pause registry — pure OSS, no Sidekiq Pro.

Paused queue names live in a Redis set. RoundhouseUi::Fetch consults this set and skips paused queues when pulling work, so a paused queue stops being consumed without stopping the worker process.

Constant Summary collapse

KEY =
"roundhouse:paused"
FETCH_FLAG =

liveness beacon set by the fetcher

"roundhouse:fetch_alive"

Class Method Summary collapse

Class Method Details

.fetch_installed?Boolean

True when a RoundhouseUi::Fetch has reported in recently — i.e. pausing will take effect. When false, the UI warns instead of pretending.

Returns:

  • (Boolean)


55
56
57
# File 'lib/roundhouse_ui/pause.rb', line 55

def fetch_installed?
  Sidekiq.redis { |conn| conn.call("EXISTS", FETCH_FLAG) } == 1
end

.mark_fetch_alive!(ttl = 30) ⇒ Object

The fetcher calls this periodically so the web UI can tell whether pausing is actually enforced (the worker and web run in separate processes). The short TTL means the flag disappears soon after all Roundhouse fetchers stop.



49
50
51
# File 'lib/roundhouse_ui/pause.rb', line 49

def mark_fetch_alive!(ttl = 30)
  Sidekiq.redis { |conn| conn.call("SET", FETCH_FLAG, "1", "EX", ttl) }
end

.pause!(queue) ⇒ Object



16
17
18
# File 'lib/roundhouse_ui/pause.rb', line 16

def pause!(queue)
  Sidekiq.redis { |conn| conn.call("SADD", KEY, queue.to_s) }
end

.paused?(queue) ⇒ Boolean

Returns:

  • (Boolean)


24
25
26
# File 'lib/roundhouse_ui/pause.rb', line 24

def paused?(queue)
  Sidekiq.redis { |conn| conn.call("SISMEMBER", KEY, queue.to_s) } == 1
end

.paused_queuesObject



28
29
30
# File 'lib/roundhouse_ui/pause.rb', line 28

def paused_queues
  Sidekiq.redis { |conn| conn.call("SMEMBERS", KEY) }.sort
end

.paused_setObject



32
33
34
# File 'lib/roundhouse_ui/pause.rb', line 32

def paused_set
  Set.new(Sidekiq.redis { |conn| conn.call("SMEMBERS", KEY) })
end

.reject_paused(queue_keys) ⇒ Object

Given the redis queue keys BasicFetch would poll (e.g. "queue:default"), drop any whose queue is paused. Pure given the paused set, so it's unit testable without a running Sidekiq.



39
40
41
42
43
44
# File 'lib/roundhouse_ui/pause.rb', line 39

def reject_paused(queue_keys)
  paused = paused_set
  return queue_keys if paused.empty?

  queue_keys.reject { |key| paused.include?(key.to_s.delete_prefix("queue:")) }
end

.unpause!(queue) ⇒ Object



20
21
22
# File 'lib/roundhouse_ui/pause.rb', line 20

def unpause!(queue)
  Sidekiq.redis { |conn| conn.call("SREM", KEY, queue.to_s) }
end