Class: Pinot::CircuitBreaker

Inherits:
Object
  • Object
show all
Defined in:
lib/pinot/circuit_breaker.rb

Overview

Per-broker circuit breaker. States: CLOSED (normal), OPEN (rejecting), HALF_OPEN (probing).

failure_threshold - consecutive failures before opening (default 5) open_timeout - seconds to stay OPEN before moving to HALF_OPEN (default 30)

Constant Summary collapse

CLOSED =
:closed
OPEN =
:open
HALF_OPEN =
:half_open
BrokerCircuitOpenError =
Class.new(BrokerNotFoundError)

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(failure_threshold: 5, open_timeout: 30) ⇒ CircuitBreaker

Returns a new instance of CircuitBreaker.



15
16
17
18
19
20
21
22
# File 'lib/pinot/circuit_breaker.rb', line 15

def initialize(failure_threshold: 5, open_timeout: 30)
  @failure_threshold = failure_threshold
  @open_timeout      = open_timeout
  @mutex             = Mutex.new
  @state             = CLOSED
  @failure_count     = 0
  @opened_at         = nil
end

Instance Attribute Details

#failure_countObject (readonly)

Returns the value of attribute failure_count.



13
14
15
# File 'lib/pinot/circuit_breaker.rb', line 13

def failure_count
  @failure_count
end

#stateObject (readonly)

Returns the value of attribute state.



13
14
15
# File 'lib/pinot/circuit_breaker.rb', line 13

def state
  @state
end

Instance Method Details

#call(broker_address) ⇒ Object

Call the block; record success/failure and enforce open-circuit rejection.



25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/pinot/circuit_breaker.rb', line 25

def call(broker_address)
  @mutex.synchronize { check_state! }
  begin
    result = yield
    @mutex.synchronize { on_success }
    result
  rescue BrokerUnavailableError, Errno::ECONNRESET, Errno::ECONNREFUSED,
         Errno::ETIMEDOUT, Net::OpenTimeout, Net::ReadTimeout, Net::WriteTimeout => e
    @mutex.synchronize { on_failure }
    raise
  end
end

#open?Boolean

Returns:

  • (Boolean)


38
39
40
# File 'lib/pinot/circuit_breaker.rb', line 38

def open?
  @mutex.synchronize { @state == OPEN }
end

#resetObject



42
43
44
45
46
47
48
# File 'lib/pinot/circuit_breaker.rb', line 42

def reset
  @mutex.synchronize do
    @state         = CLOSED
    @failure_count = 0
    @opened_at     = nil
  end
end