Class: ResilientReads::Replica
- Inherits:
-
Object
- Object
- ResilientReads::Replica
- Defined in:
- lib/resilient_reads/replica.rb
Instance Attribute Summary collapse
-
#connection_class ⇒ Object
readonly
Returns the value of attribute connection_class.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
Instance Method Summary collapse
-
#cached_lag ⇒ Object
Returns the cached lag value if still fresh, otherwise queries the replica for the current replication lag.
-
#check_health! ⇒ Object
Verify the replica is reachable.
- #connection ⇒ Object
- #connection_pool ⇒ Object
- #failure_count ⇒ Object
- #healthy? ⇒ Boolean
-
#initialize(name, connection_class) ⇒ Replica
constructor
A new instance of Replica.
-
#invalidate_lag_cache! ⇒ Object
Invalidate the cached lag so the next call to
cached_lag re-queries.
- #mark_healthy! ⇒ Object
- #mark_unhealthy! ⇒ Object
- #release_connection ⇒ Object
Constructor Details
#initialize(name, connection_class) ⇒ Replica
Returns a new instance of Replica.
5 6 7 8 9 10 11 12 13 14 |
# File 'lib/resilient_reads/replica.rb', line 5
#
# Builds a replica handle that starts out marked healthy, with a zero
# failure count and an empty lag cache.
#
# @param name [#to_s] identifier for the replica; stored as a String
# @param connection_class [Object] stored as-is in @connection_class
def initialize(name, connection_class)
  @name = name.to_s
  @connection_class = connection_class

  # Health bookkeeping: healthy until a check says otherwise.
  @healthy = true
  @failure_count = 0

  # No health check and no lag measurement have happened yet.
  @last_check_at = @last_lag = @last_lag_check_at = nil

  # Guards the health and lag state set above.
  @mutex = Mutex.new
end
Instance Attribute Details
#connection_class ⇒ Object (readonly)
Returns the value of attribute connection_class.
3 4 5 |
# File 'lib/resilient_reads/replica.rb', line 3
#
# Read-only accessor.
#
# @return [Object] the current value of @connection_class
def connection_class
  @connection_class
end
#name ⇒ Object (readonly)
Returns the value of attribute name.
3 4 5 |
# File 'lib/resilient_reads/replica.rb', line 3
#
# Read-only accessor.
#
# @return [Object] the current value of @name
def name
  @name
end
Instance Method Details
#cached_lag ⇒ Object
Returns the cached lag value if still fresh, otherwise queries the replica for the current replication lag. The TTL is controlled by ResilientReads.config.lag_check_interval (default 5 s).
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/resilient_reads/replica.rb', line 47
#
# Returns the cached lag while it is younger than
# ResilientReads.config.lag_check_interval; otherwise asks
# LagChecker.lag_for(self) and caches whatever it returns.
#
# The mutex is held only while reading or writing the cache, not while
# LagChecker runs, so callers that find the cache stale at the same time
# may each perform their own query.
#
# @return [Object] the cached or freshly obtained lag value
def cached_lag
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  max_age = ResilientReads.config.lag_check_interval

  @mutex.synchronize do
    still_fresh = @last_lag_check_at && (started_at - @last_lag_check_at) < max_age
    return @last_lag if still_fresh
  end

  LagChecker.lag_for(self).tap do |measured|
    @mutex.synchronize do
      @last_lag = measured
      # Stamp with the time the measurement finished, not when we started.
      @last_lag_check_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    end
  end
end
#check_health! ⇒ Object
Verify the replica is reachable. Returns true/false and updates health.
88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/resilient_reads/replica.rb', line 88
#
# Verify the replica is reachable by executing "SELECT 1" on its
# connection, and update the health state to match the outcome.
#
# The connection is always handed back through +release_connection+. An
# error raised while releasing is deliberately swallowed so that it cannot
# replace the result of the check.
#
# @return [Boolean] true when the query (and +mark_healthy!+) succeeded,
#   false when a StandardError was raised
def check_health!
  @connection_class.connection.execute("SELECT 1")
  mark_healthy!
  true
rescue => e
  mark_unhealthy!
  ResilientReads.log(:debug, "Health check failed for '#{@name}': #{e.message}")
  false
ensure
  # Best-effort cleanup, spelled out instead of a `rescue nil` modifier.
  begin
    release_connection
  rescue StandardError
    nil
  end
end
#connection ⇒ Object
75 76 77 |
# File 'lib/resilient_reads/replica.rb', line 75
#
# Delegates to @connection_class.
#
# @return [Object] whatever @connection_class.connection returns
def connection
  source = @connection_class
  source.connection
end
#connection_pool ⇒ Object
79 80 81 |
# File 'lib/resilient_reads/replica.rb', line 79
#
# Delegates to @connection_class.
#
# @return [Object] whatever @connection_class.connection_pool returns
def connection_pool
  source = @connection_class
  source.connection_pool
end
#failure_count ⇒ Object
40 41 42 |
# File 'lib/resilient_reads/replica.rb', line 40
#
# Reads the failure counter while holding the mutex.
#
# @return [Integer] the current value of @failure_count
def failure_count
  @mutex.synchronize do
    @failure_count
  end
end
#healthy? ⇒ Boolean
16 17 18 |
# File 'lib/resilient_reads/replica.rb', line 16
#
# Reads the health flag while holding the mutex.
#
# @return [Boolean] the current value of @healthy
def healthy?
  @mutex.synchronize do
    @healthy
  end
end
#invalidate_lag_cache! ⇒ Object
Invalidate the cached lag so the next call to cached_lag re-queries.
68 69 70 71 72 73 |
# File 'lib/resilient_reads/replica.rb', line 68
#
# Invalidate the cached lag so the next call to cached_lag re-queries.
# Both the cached value and its timestamp are cleared under the mutex.
#
# @return [nil]
def invalidate_lag_cache!
  @mutex.synchronize do
    @last_lag = @last_lag_check_at = nil
  end
end
#mark_healthy! ⇒ Object
20 21 22 23 24 25 26 27 28 |
# File 'lib/resilient_reads/replica.rb', line 20
#
# Marks the replica healthy under the mutex: sets the health flag, resets
# the failure counter to zero and records a monotonic-clock timestamp for
# the check. An info message is logged only when the replica was unhealthy
# before this call.
#
# @return [Object, nil] the result of ResilientReads.log when the state
#   changed, nil otherwise
def mark_healthy!
  @mutex.synchronize do
    previously_healthy = @healthy

    @healthy = true
    @failure_count = 0
    @last_check_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)

    ResilientReads.log(:info, "Replica '#{@name}' recovered") unless previously_healthy
  end
end
#mark_unhealthy! ⇒ Object
30 31 32 33 34 35 36 37 38 |
# File 'lib/resilient_reads/replica.rb', line 30
#
# Marks the replica unhealthy under the mutex: clears the health flag,
# increments the failure counter and records a monotonic-clock timestamp
# for the check. A warning is logged only on the healthy -> unhealthy
# transition, not on repeated failures.
#
# @return [Object, nil] the result of ResilientReads.log when the state
#   changed, nil otherwise
def mark_unhealthy!
  @mutex.synchronize do
    first_failure = @healthy

    @healthy = false
    @failure_count += 1
    @last_check_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)

    if first_failure
      ResilientReads.log(:warn, "Replica '#{@name}' marked unhealthy (failure ##{@failure_count})")
    end
  end
end
#release_connection ⇒ Object
83 84 85 |
# File 'lib/resilient_reads/replica.rb', line 83
#
# Asks the connection pool of @connection_class to release its connection.
#
# @return [Object] whatever the pool's release_connection returns
def release_connection
  pool = @connection_class.connection_pool
  pool.release_connection
end