Class: Pinot::CircuitBreaker
- Inherits:
-
Object
- Object
- Pinot::CircuitBreaker
- Defined in:
- lib/pinot/circuit_breaker.rb
Overview
Per-broker circuit breaker implementing the classic three-state machine:
CLOSED — normal operation; failures are counted
OPEN — all calls rejected immediately with BrokerCircuitOpenError
HALF_OPEN — one probe call allowed through; success → CLOSED, failure → OPEN
A breaker opens after failure_threshold consecutive transport-level failures (BrokerUnavailableError, connection resets, timeouts). It automatically transitions to HALF_OPEN after open_timeout seconds.
Use CircuitBreakerRegistry to share breakers across Connection instances.
Configuration
config = Pinot::ClientConfig.new(
broker_list: ["broker:8099"],
circuit_breaker_enabled: true,
circuit_breaker_threshold: 3, # open after 3 failures (default 5)
circuit_breaker_timeout: 10 # reopen probe after 10 s (default 30)
)
conn = Pinot.from_config(config)
Error class
Pinot::CircuitBreaker::BrokerCircuitOpenError
— raised when the circuit is OPEN; inherits from BrokerNotFoundError
so callers that already rescue BrokerNotFoundError get it for free.
Constant Summary collapse
- CLOSED =
:closed- OPEN =
:open- HALF_OPEN =
:half_open- BrokerCircuitOpenError =
Class.new(BrokerNotFoundError)
Instance Attribute Summary collapse
-
#failure_count ⇒ Object
readonly
Returns the value of attribute failure_count.
-
#state ⇒ Object
readonly
Returns the value of attribute state.
Instance Method Summary collapse
-
#call(broker_address) ⇒ Object
Call the block; record success/failure and enforce open-circuit rejection.
-
#initialize(failure_threshold: 5, open_timeout: 30) ⇒ CircuitBreaker
constructor
A new instance of CircuitBreaker.
- #open? ⇒ Boolean
- #reset ⇒ Object
Constructor Details
#initialize(failure_threshold: 5, open_timeout: 30) ⇒ CircuitBreaker
Returns a new instance of CircuitBreaker.
41 42 43 44 45 46 47 48 |
# File 'lib/pinot/circuit_breaker.rb', line 41 def initialize(failure_threshold: 5, open_timeout: 30) @failure_threshold = failure_threshold @open_timeout = open_timeout @mutex = Mutex.new @state = CLOSED @failure_count = 0 @opened_at = nil end |
Instance Attribute Details
#failure_count ⇒ Object (readonly)
Returns the value of attribute failure_count.
39 40 41 |
# File 'lib/pinot/circuit_breaker.rb', line 39 def failure_count @failure_count end |
#state ⇒ Object (readonly)
Returns the value of attribute state.
39 40 41 |
# File 'lib/pinot/circuit_breaker.rb', line 39 def state @state end |
Instance Method Details
#call(broker_address) ⇒ Object
Call the block; record success/failure and enforce open-circuit rejection.
51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/pinot/circuit_breaker.rb', line 51 def call(broker_address) @mutex.synchronize { check_state! } begin result = yield @mutex.synchronize { on_success } result rescue BrokerUnavailableError, Errno::ECONNRESET, Errno::ECONNREFUSED, Errno::ETIMEDOUT, Net::OpenTimeout, Net::ReadTimeout, Net::WriteTimeout => e @mutex.synchronize { on_failure } raise end end |
#open? ⇒ Boolean
64 65 66 |
# File 'lib/pinot/circuit_breaker.rb', line 64 def open? @mutex.synchronize { @state == OPEN } end |
#reset ⇒ Object
68 69 70 71 72 73 74 |
# File 'lib/pinot/circuit_breaker.rb', line 68 def reset @mutex.synchronize do @state = CLOSED @failure_count = 0 @opened_at = nil end end |