Class: Pgbus::CircuitBreaker

Inherits:
Object
  • Object
show all
Defined in:
lib/pgbus/circuit_breaker.rb

Constant Summary collapse

THRESHOLD =

Number of consecutive failures on a queue before the breaker trips and pauses the queue. Tuned via constants rather than configuration because the value rarely needs adjusting and exposing it as a setting never proved useful in practice.

5
BASE_BACKOFF =

Initial backoff (seconds) on the first trip. Doubles on each subsequent trip up to MAX_BACKOFF.

30
MAX_BACKOFF =

Cap on the exponential backoff (seconds). After ~5 trips the curve plateaus here so a perpetually-failing queue stops spamming retries.

600

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config: Pgbus.configuration) ⇒ CircuitBreaker

Returns a new instance of CircuitBreaker.



23
24
25
26
27
28
# File 'lib/pgbus/circuit_breaker.rb', line 23

def initialize(config: Pgbus.configuration)
  @config = config
  @failure_counts = Concurrent::Map.new
  @pause_cache = Concurrent::Map.new
  @pause_cache_ttl = 30 # seconds
end

Instance Attribute Details

#configObject (readonly)

Returns the value of attribute config.



21
22
23
# File 'lib/pgbus/circuit_breaker.rb', line 21

def config
  @config
end

Instance Method Details

#invalidate_cache(queue_name = nil) ⇒ Object



64
65
66
67
68
69
70
# File 'lib/pgbus/circuit_breaker.rb', line 64

def invalidate_cache(queue_name = nil)
  if queue_name
    @pause_cache.delete(queue_name)
  else
    @pause_cache.clear
  end
end

#pause!(queue_name, reason: nil) ⇒ Object



53
54
55
56
# File 'lib/pgbus/circuit_breaker.rb', line 53

def pause!(queue_name, reason: nil)
  QueueState.pause!(queue_name, reason: reason)
  invalidate_cache(queue_name)
end

#paused?(queue_name) ⇒ Boolean

Returns:

  • (Boolean)


44
45
46
47
48
49
50
51
# File 'lib/pgbus/circuit_breaker.rb', line 44

def paused?(queue_name)
  cached = @pause_cache[queue_name]
  return cached[:paused] if cached && (Time.current - cached[:checked_at]) < @pause_cache_ttl

  paused = check_paused(queue_name)
  @pause_cache[queue_name] = { paused: paused, checked_at: Time.current }
  paused
end

#record_failure(queue_name) ⇒ Object



34
35
36
37
38
39
40
41
42
# File 'lib/pgbus/circuit_breaker.rb', line 34

def record_failure(queue_name)
  return unless config.circuit_breaker_enabled

  count = @failure_counts.compute(queue_name) { |val| (val || 0) + 1 }

  return unless count >= THRESHOLD

  trip!(queue_name, count)
end

#record_success(queue_name) ⇒ Object



30
31
32
# File 'lib/pgbus/circuit_breaker.rb', line 30

def record_success(queue_name)
  @failure_counts.delete(queue_name)
end

#resume!(queue_name) ⇒ Object



58
59
60
61
62
# File 'lib/pgbus/circuit_breaker.rb', line 58

def resume!(queue_name)
  QueueState.resume!(queue_name)
  @failure_counts.delete(queue_name)
  invalidate_cache(queue_name)
end