Class: Pinot::CircuitBreaker

Inherits:
Object
Defined in:
lib/pinot/circuit_breaker.rb

Overview

Per-broker circuit breaker implementing the classic three-state machine:

CLOSED    — normal operation; failures are counted
OPEN      — all calls rejected immediately with BrokerCircuitOpenError
HALF_OPEN — one probe call allowed through; success → CLOSED, failure → OPEN

A breaker opens after failure_threshold consecutive transport-level failures (BrokerUnavailableError, connection resets or refusals, timeouts). Once open_timeout seconds have passed in the OPEN state, it transitions to HALF_OPEN and lets a probe call through.
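
The transitions described above can be sketched with a small stand-in class. This is an illustration only, not the Pinot::CircuitBreaker source: the names SketchBreaker and CircuitOpenError, the clock: hook, and the bodies of check_state!, on_success and on_failure are all assumptions, and SystemCallError stands in for the full list of transport-level errors.

```ruby
# Illustrative stand-in; names and private method bodies are assumptions.
class SketchBreaker
  CircuitOpenError = Class.new(StandardError)

  attr_reader :state, :failure_count

  # clock: is a hypothetical hook so a test can simulate elapsed time.
  def initialize(failure_threshold: 5, open_timeout: 30, clock: nil)
    @failure_threshold = failure_threshold
    @open_timeout      = open_timeout
    @clock             = clock || -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
    @mutex             = Mutex.new
    @state             = :closed
    @failure_count     = 0
    @opened_at         = nil
  end

  def call
    @mutex.synchronize { check_state! }
    begin
      result = yield
      @mutex.synchronize { on_success }
      result
    rescue SystemCallError # stand-in for the transport-level error list
      @mutex.synchronize { on_failure }
      raise
    end
  end

  private

  # OPEN rejects until open_timeout has elapsed, then moves to HALF_OPEN.
  # This sketch does not cap the number of concurrent probes.
  def check_state!
    return unless @state == :open

    if @clock.call - @opened_at >= @open_timeout
      @state = :half_open
    else
      raise CircuitOpenError, "circuit open"
    end
  end

  # Any success closes the circuit and clears the counters.
  def on_success
    @state         = :closed
    @failure_count = 0
    @opened_at     = nil
  end

  # A failed probe, or reaching the threshold, opens the circuit.
  def on_failure
    @failure_count += 1
    return unless @state == :half_open || @failure_count >= @failure_threshold

    @state     = :open
    @opened_at = @clock.call
  end
end
```

Injecting the clock keeps the sketch testable without sleeping; whether the real class reads a monotonic clock or wall-clock time is not stated here.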

Use CircuitBreakerRegistry to share breakers across Connection instances.
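
Sharing matters because a breaker only helps if every caller talking to the same broker sees the same failure count. The sketch below shows one way a registry could hand out a single breaker per broker address. The class name SketchRegistry and the method breaker_for are hypothetical; the actual CircuitBreakerRegistry interface is not documented on this page and may differ.

```ruby
# Hypothetical registry sketch; not the CircuitBreakerRegistry API.
class SketchRegistry
  # The block builds a new breaker, e.g. { Pinot::CircuitBreaker.new }.
  def initialize(&factory)
    @factory  = factory
    @breakers = {}
    @mutex    = Mutex.new
  end

  # Returns the breaker for this address, creating it on first use.
  def breaker_for(broker_address)
    @mutex.synchronize { @breakers[broker_address] ||= @factory.call }
  end
end

# Object.new stands in for a real breaker so the sketch runs on its own.
registry = SketchRegistry.new { Object.new }
first    = registry.breaker_for("broker-1:8099")
again    = registry.breaker_for("broker-1:8099")
other    = registry.breaker_for("broker-2:8099")

puts first.equal?(again) # same address, same breaker object
puts first.equal?(other) # different address, different breaker
```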

Configuration

config = Pinot::ClientConfig.new(
  broker_list:               ["broker:8099"],
  circuit_breaker_enabled:   true,
  circuit_breaker_threshold: 3,   # open after 3 consecutive failures (default 5)
  circuit_breaker_timeout:   10   # allow a probe call after 10 s (default 30)
)
conn = Pinot.from_config(config)

Error class

Pinot::CircuitBreaker::BrokerCircuitOpenError
  — raised when the circuit is OPEN; inherits from BrokerNotFoundError
    so callers that already rescue BrokerNotFoundError get it for free.
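
A short sketch of what that inheritance means for rescue clauses. The Demo module below only mirrors the relationship described above so that the example runs without the gem; it is not the library's class definition, and the classify helper is hypothetical.

```ruby
# Stand-in hierarchy mirroring the documented relationship.
module Demo
  class BrokerNotFoundError < StandardError; end
  class BrokerCircuitOpenError < BrokerNotFoundError; end
end

# Hypothetical helper: the more specific rescue must come first,
# otherwise the parent clause would match the circuit-open error too.
def classify
  yield
  :ok
rescue Demo::BrokerCircuitOpenError
  :circuit_open
rescue Demo::BrokerNotFoundError
  :no_broker
end

puts classify { raise Demo::BrokerCircuitOpenError }
puts classify { raise Demo::BrokerNotFoundError }
puts classify { 1 + 1 }
```

Callers that do not care about the difference can keep a single rescue for the parent class; callers that want to back off or switch brokers when a circuit is open can rescue the subclass first.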

Defined Under Namespace

Classes: BrokerCircuitOpenError

Constant Summary

CLOSED = :closed
OPEN = :open
HALF_OPEN = :half_open

Constructor Details

#initialize(failure_threshold: 5, open_timeout: 30) ⇒ CircuitBreaker

Returns a new instance of CircuitBreaker.



# File 'lib/pinot/circuit_breaker.rb', line 42

def initialize(failure_threshold: 5, open_timeout: 30)
  @failure_threshold = failure_threshold
  @open_timeout      = open_timeout
  @mutex             = Mutex.new
  @state             = CLOSED
  @failure_count     = 0
  @opened_at         = nil
end

Instance Attribute Details

#failure_count ⇒ Object (readonly)

Returns the value of attribute failure_count.



# File 'lib/pinot/circuit_breaker.rb', line 40

def failure_count
  @failure_count
end

#state ⇒ Object (readonly)

Returns the value of attribute state.



# File 'lib/pinot/circuit_breaker.rb', line 40

def state
  @state
end

Instance Method Details

#call(_broker_address) ⇒ Object

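Runs the block under the breaker. If the circuit is OPEN, the call is rejected with BrokerCircuitOpenError before the block runs. Otherwise the block's result is returned and a success is recorded. The transport-level errors listed in the rescue clause are recorded as failures and re-raised; any other exception propagates without being counted.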



# File 'lib/pinot/circuit_breaker.rb', line 52

def call(_broker_address)
  @mutex.synchronize { check_state! }
  begin
    result = yield
    @mutex.synchronize { on_success }
    result
  rescue BrokerUnavailableError, Errno::ECONNRESET, Errno::ECONNREFUSED,
         Errno::ETIMEDOUT, Net::OpenTimeout, Net::ReadTimeout, Net::WriteTimeout
    @mutex.synchronize { on_failure }
    raise
  end
end
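
To make the failure classification concrete, the sketch below checks an exception against the standard-library classes named in the rescue clause. The constant TRANSPORT_ERRORS and the helper counts_as_failure? are hypothetical names for illustration, and the gem-specific BrokerUnavailableError is left out so the example runs without the gem.

```ruby
require "net/http" # loads Net::OpenTimeout, Net::ReadTimeout, Net::WriteTimeout

# Standard-library subset of the rescue list in #call.
TRANSPORT_ERRORS = [
  Errno::ECONNRESET, Errno::ECONNREFUSED, Errno::ETIMEDOUT,
  Net::OpenTimeout, Net::ReadTimeout, Net::WriteTimeout
].freeze

# Hypothetical helper: true if the breaker would count this error.
def counts_as_failure?(error)
  TRANSPORT_ERRORS.any? { |klass| error.is_a?(klass) }
end

puts counts_as_failure?(Errno::ECONNREFUSED.new) # counted as a failure
puts counts_as_failure?(Net::ReadTimeout.new)    # counted as a failure
puts counts_as_failure?(ArgumentError.new)       # propagates uncounted
```

An application-level error such as a malformed query therefore never trips the breaker; only errors that suggest the broker is unreachable do.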

#open? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/pinot/circuit_breaker.rb', line 65

def open?
  @mutex.synchronize { @state == OPEN }
end

#reset ⇒ Object

Forces the breaker back to CLOSED and clears the failure count and the opened-at timestamp.



# File 'lib/pinot/circuit_breaker.rb', line 69

def reset
  @mutex.synchronize do
    @state         = CLOSED
    @failure_count = 0
    @opened_at     = nil
  end
end