Class: Pinot::CircuitBreaker

Inherits:
Object
  • Object
show all
Defined in:
lib/pinot/circuit_breaker.rb

Overview

Per-broker circuit breaker implementing the classic three-state machine:

CLOSED    — normal operation; failures are counted
OPEN      — all calls rejected immediately with BrokerCircuitOpenError
HALF_OPEN — one probe call allowed through; success → CLOSED, failure → OPEN

A breaker opens after failure_threshold consecutive transport-level failures (BrokerUnavailableError, connection resets, timeouts). It automatically transitions to HALF_OPEN after open_timeout seconds.

Use CircuitBreakerRegistry to share breakers across Connection instances.

Configuration

config = Pinot::ClientConfig.new(
  broker_list:               ["broker:8099"],
  circuit_breaker_enabled:   true,
  circuit_breaker_threshold: 3,   # open after 3 failures (default 5)
  circuit_breaker_timeout:   10   # reopen probe after 10 s (default 30)
)
conn = Pinot.from_config(config)

Error class

Pinot::CircuitBreaker::BrokerCircuitOpenError
  — raised when the circuit is OPEN; inherits from BrokerNotFoundError
    so callers that already rescue BrokerNotFoundError get it for free.

Constant Summary collapse

CLOSED =
:closed
OPEN =
:open
HALF_OPEN =
:half_open
BrokerCircuitOpenError =
Class.new(BrokerNotFoundError)

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(failure_threshold: 5, open_timeout: 30) ⇒ CircuitBreaker

Returns a new instance of CircuitBreaker.



41
42
43
44
45
46
47
48
# File 'lib/pinot/circuit_breaker.rb', line 41

def initialize(failure_threshold: 5, open_timeout: 30)
  @failure_threshold = failure_threshold
  @open_timeout      = open_timeout
  @mutex             = Mutex.new
  @state             = CLOSED
  @failure_count     = 0
  @opened_at         = nil
end

Instance Attribute Details

#failure_countObject (readonly)

Returns the value of attribute failure_count.



39
40
41
# File 'lib/pinot/circuit_breaker.rb', line 39

def failure_count
  @failure_count
end

#stateObject (readonly)

Returns the value of attribute state.



39
40
41
# File 'lib/pinot/circuit_breaker.rb', line 39

def state
  @state
end

Instance Method Details

#call(broker_address) ⇒ Object

Call the block; record success/failure and enforce open-circuit rejection.



51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/pinot/circuit_breaker.rb', line 51

def call(broker_address)
  @mutex.synchronize { check_state! }
  begin
    result = yield
    @mutex.synchronize { on_success }
    result
  rescue BrokerUnavailableError, Errno::ECONNRESET, Errno::ECONNREFUSED,
         Errno::ETIMEDOUT, Net::OpenTimeout, Net::ReadTimeout, Net::WriteTimeout => e
    @mutex.synchronize { on_failure }
    raise
  end
end

#open?Boolean

Returns:

  • (Boolean)


64
65
66
# File 'lib/pinot/circuit_breaker.rb', line 64

def open?
  @mutex.synchronize { @state == OPEN }
end

#resetObject



68
69
70
71
72
73
74
# File 'lib/pinot/circuit_breaker.rb', line 68

def reset
  @mutex.synchronize do
    @state         = CLOSED
    @failure_count = 0
    @opened_at     = nil
  end
end