Class: ResilientReads::Replica

Inherits:
Object
  • Object
show all
Defined in:
lib/resilient_reads/replica.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, connection_class) ⇒ Replica

Returns a new instance of Replica.



5
6
7
8
9
10
11
12
13
14
# File 'lib/resilient_reads/replica.rb', line 5

# Builds a replica handle that starts out healthy, with no recorded
# failures and an empty lag cache.
#
# @param name [#to_s] label for this replica; stored as a String
# @param connection_class [Object] object that answers +connection+ and
#   +connection_pool+ (presumably an ActiveRecord-style base class — confirm)
def initialize(name, connection_class)
  # Guards every read and write of the mutable state below.
  @mutex = Mutex.new

  # Identity.
  @connection_class = connection_class
  @name = name.to_s

  # Health tracking.
  @failure_count = 0
  @healthy = true
  @last_check_at = nil

  # Lag cache; both values stay nil until the first lag lookup.
  @last_lag_check_at = nil
  @last_lag = nil
end

Instance Attribute Details

#connection_class ⇒ Object (readonly)

Returns the value of attribute connection_class.



3
4
5
# File 'lib/resilient_reads/replica.rb', line 3

# Read-only accessor for the connection class handed to the constructor.
#
# @return [Object] the object stored in @connection_class
def connection_class
  @connection_class
end

#name ⇒ Object (readonly)

Returns the value of attribute name.



3
4
5
# File 'lib/resilient_reads/replica.rb', line 3

# Read-only accessor for the replica's name.
#
# @return [String] the name stored in @name (the constructor applies #to_s)
def name
  @name
end

Instance Method Details

#cached_lag ⇒ Object

Returns the cached lag value if still fresh, otherwise queries the replica for the current replication lag. The TTL is controlled by ResilientReads.config.lag_check_interval (default 5 s).



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/resilient_reads/replica.rb', line 47

# Returns the replication lag for this replica, reusing the last measured
# value while it is younger than ResilientReads.config.lag_check_interval.
#
# The lag lookup itself runs outside the mutex, so concurrent callers that
# all see an expired cache may each query LagChecker.
#
# @return [Object] whatever LagChecker.lag_for returned, now or on the
#   lookup that filled the cache
def cached_lag
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  max_age = ResilientReads.config.lag_check_interval

  # Read the cache under the lock and report a (hit?, value) pair.
  cache_hit, cached_value = @mutex.synchronize do
    if @last_lag_check_at && (started_at - @last_lag_check_at) < max_age
      [true, @last_lag]
    else
      [false, nil]
    end
  end
  return cached_value if cache_hit

  # Cache is empty or stale: measure without holding the lock.
  measured = LagChecker.lag_for(self)

  # Stamp with the time after the lookup, not the time we started.
  @mutex.synchronize do
    @last_lag = measured
    @last_lag_check_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  measured
end

#check_health! ⇒ Object

Verify the replica is reachable. Returns true/false and updates health.



88
89
90
91
92
93
94
95
96
97
98
# File 'lib/resilient_reads/replica.rb', line 88

# Probes the replica by running "SELECT 1" on a connection obtained from
# @connection_class and records the outcome.
#
# Any StandardError raised by the probe (or by mark_healthy!) marks the
# replica unhealthy and is logged at :debug level. The connection is then
# released on a best-effort basis; errors from the release are ignored.
#
# @return [Boolean] true when the probe succeeded, false otherwise
def check_health!
  probe = @connection_class.connection
  probe.execute("SELECT 1")
  mark_healthy!
  true
rescue StandardError => failure
  mark_unhealthy!
  ResilientReads.log(:debug, "Health check failed for '#{@name}': #{failure.message}")
  false
ensure
  # Best effort: a failed release must not mask the health-check result.
  begin
    release_connection
  rescue StandardError
    nil
  end
end

#connection ⇒ Object



75
76
77
# File 'lib/resilient_reads/replica.rb', line 75

# Delegates to the underlying connection class.
#
# @return [Object] whatever @connection_class.connection returns
def connection
  source = @connection_class
  source.connection
end

#connection_pool ⇒ Object



79
80
81
# File 'lib/resilient_reads/replica.rb', line 79

# Delegates to the underlying connection class.
#
# @return [Object] whatever @connection_class.connection_pool returns
def connection_pool
  source = @connection_class
  source.connection_pool
end

#failure_count ⇒ Object



40
41
42
# File 'lib/resilient_reads/replica.rb', line 40

# Reads the failure counter under the replica's mutex.
#
# @return [Integer] failures recorded by mark_unhealthy! since the last
#   mark_healthy! (which resets the counter to 0)
def failure_count
  @mutex.synchronize do
    @failure_count
  end
end

#healthy? ⇒ Boolean

Returns:

  • (Boolean)


16
17
18
# File 'lib/resilient_reads/replica.rb', line 16

# Reads the health flag under the replica's mutex.
#
# @return [Boolean] the current value of @healthy
def healthy?
  @mutex.synchronize do
    @healthy
  end
end

#invalidate_lag_cache! ⇒ Object

Invalidate the cached lag so the next call to cached_lag re-queries.



68
69
70
71
72
73
# File 'lib/resilient_reads/replica.rb', line 68

# Drops the cached lag and its timestamp so that the next cached_lag call
# sees no fresh entry and queries again.
#
# @return [nil]
def invalidate_lag_cache!
  # Chained assignment clears the timestamp first, then the value.
  @mutex.synchronize { @last_lag = @last_lag_check_at = nil }
end

#mark_healthy! ⇒ Object



20
21
22
23
24
25
26
27
28
# File 'lib/resilient_reads/replica.rb', line 20

# Records a successful check: sets the replica healthy, resets the failure
# counter and stamps the check time, all under the mutex.
#
# An :info "recovered" message is logged only when the replica was
# unhealthy before this call.
#
# @return [Object, nil] the result of ResilientReads.log on a recovery,
#   otherwise nil
def mark_healthy!
  @mutex.synchronize do
    recovering = !@healthy

    @last_check_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    @failure_count = 0
    @healthy = true

    if recovering
      ResilientReads.log(:info, "Replica '#{@name}' recovered")
    end
  end
end

#mark_unhealthy! ⇒ Object



30
31
32
33
34
35
36
37
38
# File 'lib/resilient_reads/replica.rb', line 30

# Records a failed check: sets the replica unhealthy, increments the
# failure counter and stamps the check time, all under the mutex.
#
# A :warn message is logged only on the healthy -> unhealthy transition;
# repeated failures keep counting without logging again.
#
# @return [Object, nil] the result of ResilientReads.log on a transition,
#   otherwise nil
def mark_unhealthy!
  @mutex.synchronize do
    previously_healthy = @healthy

    @healthy = false
    @failure_count += 1
    @last_check_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)

    if previously_healthy
      notice = "Replica '#{@name}' marked unhealthy (failure ##{@failure_count})"
      ResilientReads.log(:warn, notice)
    end
  end
end

#release_connection ⇒ Object



83
84
85
# File 'lib/resilient_reads/replica.rb', line 83

# Asks the connection class's pool to release its connection.
# NOTE(review): presumably returns the current thread's checked-out
# connection to the pool, as ActiveRecord pools do — confirm.
#
# @return [Object] whatever the pool's release_connection returns
def release_connection
  pool = @connection_class.connection_pool
  pool.release_connection
end