Class: Pgbus::CircuitBreaker
- Inherits:
-
Object
- Object
- Pgbus::CircuitBreaker
- Defined in:
- lib/pgbus/circuit_breaker.rb
Constant Summary collapse
- THRESHOLD =
Number of consecutive failures on a queue before the breaker trips and pauses the queue. Tuned via constants rather than configuration because the value rarely needs adjusting and exposing it as a setting never proved useful in practice.
5
- BASE_BACKOFF =
Initial backoff (seconds) on the first trip. Doubles on each subsequent trip up to MAX_BACKOFF.
30
- MAX_BACKOFF =
Cap on the exponential backoff (seconds). After ~5 trips the curve plateaus here so a perpetually-failing queue stops spamming retries.
600
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
Returns the value of attribute config.
Instance Method Summary collapse
-
#initialize(config: Pgbus.configuration) ⇒ CircuitBreaker
constructor
A new instance of CircuitBreaker.
- #invalidate_cache(queue_name = nil) ⇒ Object
- #pause!(queue_name, reason: nil) ⇒ Object
- #paused?(queue_name) ⇒ Boolean
- #record_failure(queue_name) ⇒ Object
- #record_success(queue_name) ⇒ Object
- #resume!(queue_name) ⇒ Object
Constructor Details
#initialize(config: Pgbus.configuration) ⇒ CircuitBreaker
Returns a new instance of CircuitBreaker.
23 24 25 26 27 28 |
# File 'lib/pgbus/circuit_breaker.rb', line 23 def initialize(config: Pgbus.configuration) @config = config @failure_counts = Concurrent::Map.new @pause_cache = Concurrent::Map.new @pause_cache_ttl = 30 # seconds end |
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
21 22 23 |
# File 'lib/pgbus/circuit_breaker.rb', line 21 def config @config end |
Instance Method Details
#invalidate_cache(queue_name = nil) ⇒ Object
64 65 66 67 68 69 70 |
# File 'lib/pgbus/circuit_breaker.rb', line 64 def invalidate_cache(queue_name = nil) if queue_name @pause_cache.delete(queue_name) else @pause_cache.clear end end |
#pause!(queue_name, reason: nil) ⇒ Object
53 54 55 56 |
# File 'lib/pgbus/circuit_breaker.rb', line 53 def pause!(queue_name, reason: nil) QueueState.pause!(queue_name, reason: reason) invalidate_cache(queue_name) end |
#paused?(queue_name) ⇒ Boolean
44 45 46 47 48 49 50 51 |
# File 'lib/pgbus/circuit_breaker.rb', line 44 def paused?(queue_name) cached = @pause_cache[queue_name] return cached[:paused] if cached && (Time.current - cached[:checked_at]) < @pause_cache_ttl paused = check_paused(queue_name) @pause_cache[queue_name] = { paused: paused, checked_at: Time.current } paused end |
#record_failure(queue_name) ⇒ Object
34 35 36 37 38 39 40 41 42 |
# File 'lib/pgbus/circuit_breaker.rb', line 34 def record_failure(queue_name) return unless config.circuit_breaker_enabled count = @failure_counts.compute(queue_name) { |val| (val || 0) + 1 } return unless count >= THRESHOLD trip!(queue_name, count) end |
#record_success(queue_name) ⇒ Object
30 31 32 |
# File 'lib/pgbus/circuit_breaker.rb', line 30 def record_success(queue_name) @failure_counts.delete(queue_name) end |
#resume!(queue_name) ⇒ Object
58 59 60 61 62 |
# File 'lib/pgbus/circuit_breaker.rb', line 58 def resume!(queue_name) QueueState.resume!(queue_name) @failure_counts.delete(queue_name) invalidate_cache(queue_name) end |