Module: ResilientReads

Defined in:
lib/resilient_reads.rb,
lib/resilient_reads/railtie.rb,
lib/resilient_reads/replica.rb,
lib/resilient_reads/version.rb,
lib/resilient_reads/middleware.rb,
lib/resilient_reads/lag_checker.rb,
lib/resilient_reads/query_cache.rb,
lib/resilient_reads/replica_pool.rb,
lib/resilient_reads/adapter_patch.rb,
lib/resilient_reads/configuration.rb,
lib/resilient_reads/health_checker.rb,
lib/resilient_reads/active_job_extension.rb

Defined Under Namespace

Modules: ActiveJobExtension, AdapterPatch, LagChecker Classes: Configuration, HealthChecker, Middleware, NoHealthyReplica, QueryCache, Railtie, Replica, ReplicaPool, TooMuchLag

Constant Summary collapse

WRITE_PATTERN =

SQL patterns that indicate a write operation (PostgreSQL + MySQL/MariaDB).

/\A\s*(INSERT|UPDATE|DELETE|CREATE|ALTER|DROP|TRUNCATE|GRANT|REVOKE|LOCK\s+TABLE|SET\s|BEGIN|COMMIT|ROLLBACK|SAVEPOINT|RELEASE|COPY\s|REPLACE\s|LOAD\s+DATA|CALL\s)/i
VERSION =
"0.1.1"

Class Method Summary collapse

Class Method Details

.bust_query_cache!Object

Clear cached SQL pattern results.



105
106
107
# File 'lib/resilient_reads.rb', line 105

def bust_query_cache!
  @query_cache&.clear!
end

.configObject



23
24
25
# File 'lib/resilient_reads.rb', line 23

def config
  @config ||= Configuration.new
end

.configure {|config| ... } ⇒ Object

Yields:



27
28
29
# File 'lib/resilient_reads.rb', line 27

def configure
  yield config
end

.distributing?Boolean

Are we currently inside a distribute_reads block?

Returns:

  • (Boolean)


80
81
82
83
# File 'lib/resilient_reads.rb', line 80

def distributing?
  ctx = Thread.current[:resilient_reads_context]
  ctx && ctx[:distributing]
end

.health_checkerObject



35
36
37
# File 'lib/resilient_reads.rb', line 35

def health_checker
  @health_checker
end

.log(level, message) ⇒ Object



109
110
111
112
113
114
115
116
# File 'lib/resilient_reads.rb', line 109

def log(level, message)
  logger = config.logger
  return unless logger

  logger.public_send(level, "[ResilientReads] #{message}")
rescue
  # Never let logging break query flow.
end

.log_query(connection_name, sql, query_name = nil, reason: nil) ⇒ Object

Per-query routing log. Only emits when config.log_query_routing is true. Uses config.log_query_level (default :info) so messages are visible in standard Rails development/production logs.

Output example:

[ResilientReads] → replica 'replica1' | User Load | SELECT "users".* …
[ResilientReads] → primary (write query) | User Create | INSERT INTO …


125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/resilient_reads.rb', line 125

def log_query(connection_name, sql, query_name = nil, reason: nil)
  return unless config.log_query_routing

  logger = config.logger
  return unless logger

  label = if connection_name == "primary"
            reason ? "primary (#{reason})" : "primary"
  else
            reason ? "replica '#{connection_name}' (#{reason})" : "replica '#{connection_name}'"
  end

  truncated_sql = sql.length > 120 ? "#{sql[0, 120]}" : sql
  parts = [ "[ResilientReads] → #{label}" ]
  parts << query_name if query_name && !query_name.empty?
  parts << truncated_sql.gsub(/\s+/, " ").strip
  logger.public_send(config.log_query_level, parts.join(" | "))
rescue
  # Never let logging break query flow.
end

.patch_adapter!Object



175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/resilient_reads.rb', line 175

def patch_adapter!
  patched = []
  if defined?(ActiveRecord::ConnectionAdapters::PostgreSQLAdapter)
    ActiveRecord::ConnectionAdapters::PostgreSQLAdapter.prepend(AdapterPatch)
    patched << "PostgreSQLAdapter"
  end
  if defined?(ActiveRecord::ConnectionAdapters::Mysql2Adapter)
    ActiveRecord::ConnectionAdapters::Mysql2Adapter.prepend(AdapterPatch)
    patched << "Mysql2Adapter"
  end
  if defined?(ActiveRecord::ConnectionAdapters::TrilogyAdapter)
    ActiveRecord::ConnectionAdapters::TrilogyAdapter.prepend(AdapterPatch)
    patched << "TrilogyAdapter"
  end

  log(:debug, "Patched adapters: #{patched.join(', ')}") if patched.any?
end

.query_cacheObject



90
91
92
93
94
# File 'lib/resilient_reads.rb', line 90

def query_cache
  @query_cache ||= QueryCache.new(
    max_size: config.query_cache_max_size
  )
end

.replica_poolObject



31
32
33
# File 'lib/resilient_reads.rb', line 31

def replica_pool
  @replica_pool ||= ReplicaPool.new
end

.replication_lagObject

Convenience: get current replication lag (seconds).



86
87
88
# File 'lib/resilient_reads.rb', line 86

def replication_lag
  LagChecker.replication_lag
end

.restart_health_checker!Object



207
208
209
210
# File 'lib/resilient_reads.rb', line 207

def restart_health_checker!
  stop_health_checker!
  start_health_checker!
end

.run(**options, &block) ⇒ Object


Core entry point — wraps a block so read queries go to a replica.




42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/resilient_reads.rb', line 42

def run(**options, &block)
  opts = config.default_options.merge(options)

  # Explicit primary override
  if opts[:primary]
    return block.call
  end

  # No replicas configured — just run on primary.
  if replica_pool.empty? || !replica_pool.any_healthy?
    if opts.fetch(:failover, config.failover)
      return block.call
    else
      raise NoHealthyReplica, "No healthy replicas available"
    end
  end

  prev_ctx = Thread.current[:resilient_reads_context]
  Thread.current[:resilient_reads_context] = {
    distributing: true,
    on_replica: false,
    routing: false,
    options: opts
  }

  begin
    result = block.call
    if config.eager_load && result.is_a?(ActiveRecord::Relation) && !result.loaded?
      result = result.load
    end
    result
  ensure
    Thread.current[:resilient_reads_context] = prev_ctx
    replica_pool.release_all_connections
  end
end

.setup_replicas!Object


Setup helpers (called from Railtie or manually in non-Rails apps)




150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/resilient_reads.rb', line 150

def setup_replicas!
  names = replica_config_names

  if names.empty?
    log(:info, "No replica configs detected (pattern: #{config.replica_pattern.inspect}). All reads will use primary.")
    return
  end

  names.each do |name|
    klass = build_replica_class(name)
    replica = Replica.new(name, klass)
    replica_pool.add(replica)
  end

  # Initial health probe — failures are non-fatal.
  replica_pool.each do |r|
    r.check_health!
  rescue => e
    log(:warn, "Initial health check for '#{r.name}' failed: #{e.message}")
  end

  log(:info, "Configured #{replica_pool.size} replica(s): #{names.join(', ')} " \
             "(#{replica_pool.healthy_count} healthy)")
end

.start_health_checker!Object



193
194
195
196
197
198
199
200
201
# File 'lib/resilient_reads.rb', line 193

def start_health_checker!
  return if replica_pool.empty?

  @health_checker = HealthChecker.new(
    replica_pool,
    interval: config.health_check_interval
  )
  @health_checker.start
end

.stop_health_checker!Object



203
204
205
# File 'lib/resilient_reads.rb', line 203

def stop_health_checker!
  @health_checker&.stop
end

.write_query?(sql) ⇒ Boolean

Returns:

  • (Boolean)


96
97
98
99
100
101
102
# File 'lib/resilient_reads.rb', line 96

def write_query?(sql)
  if config.query_cache_enabled
    query_cache.fetch(sql) { |s| WRITE_PATTERN.match?(s) }
  else
    WRITE_PATTERN.match?(sql)
  end
end