Module: Ec::Pg::RlsManager

Defined in:
lib/ec/pg/rls_manager.rb

Overview

Manages ActiveRecord database shard switching.

Leverages ActiveRecord’s native multi-database support (AR 6.1+) via connected_to(shard:). Falls back to manual connection swapping for connection configurations that are not registered as named shards.

Configuration in database.yml (Rails multi-db style)

production:
  primary:
    <<: *default
    database: app_primary
  db_sharded_shard_1:
    <<: *default
    database: app_shard_one
    migrations_paths: db/migrate_shards

Usage

ShardManager.with_shard(:shard_one) { User.all }
ShardManager.with_shard(:shard_one, role: :reading) { User.all }

Defined Under Namespace

Classes: UnregisteredVariable

Class Method Summary collapse

Class Method Details

.apply_rls(connection, rls_mode, selected_variables) ⇒ Object

Executes the SQL to set the RLS variable.



70
71
72
# File 'lib/ec/pg/rls_manager.rb', line 70

def apply_rls(connection, rls_mode, selected_variables)
  connection.execute(sanitized_query(rls_mode, selected_variables))
end

.reset!(connection:, variables:) ⇒ Object

Resets the RLS variable to its default (empty) value. For :session mode this uses RESET; for :local mode this is a no-op because the variable resets automatically at transaction end.



62
63
64
65
66
# File 'lib/ec/pg/rls_manager.rb', line 62

def reset!(connection:, variables:)
  variables.each do |variable|
    reset_variable!(connection, variable)
  end
end

.reset_variable!(connection, variable) ⇒ Object



105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/ec/pg/rls_manager.rb', line 105

def reset_variable!(connection, variable)
  connection.execute("RESET #{variable}")
rescue StandardError
  # Swallow reset errors: connection may have been released or variable
  # may not support RESET (application-level variables cannot be RESET in
  # some Postgres versions; in that case use SET to empty string instead).
  begin
    connection.execute("SET #{variable} TO DEFAULT")
  rescue StandardError
    nil
  end
end

.sanitized_query(mode, selected_variables) ⇒ Object



74
75
76
77
78
79
80
81
82
# File 'lib/ec/pg/rls_manager.rb', line 74

def sanitized_query(mode, selected_variables)
  local = if mode == :local 
    'LOCAL' 
  end

  selected_variables.map do |variable, value|
    ("SET %s #{variable} = #{value};" % local).squeeze(' ')
  end.join(' ')
end

.variable_value_for(registered_variables, variables) ⇒ Object



93
94
95
96
97
98
99
100
101
102
103
# File 'lib/ec/pg/rls_manager.rb', line 93

def variable_value_for(registered_variables, variables) 
  {}.tap do |hash|
    variables.each do |key, value| 
      if registered_variables.has_key?(key)
        hash[registered_variables[key]] = value
      else 
        raise UnregisteredVariable, "'#{key}' has not been registered through acts_as_rls."
      end
    end
  end
end

.with_rls(rls_mode: nil, registered_variables: {}, variables: {}, connection: nil) { ... } ⇒ Object

Executes block with the AR connection switched to shard_name.

Parameters:

  • shard_name (Symbol)

    the registered shard key

  • role (Symbol)

    :writing (default) or :reading

  • klass (Class)

    the AR base class whose connection pool to switch (default: ActiveRecord::Base)

Yields:

  • block to run in shard context

Returns:

  • the return value of block



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/ec/pg/rls_manager.rb', line 38

def with_rls(rls_mode: nil, registered_variables: {}, variables: {}, connection: nil, &block)
  connection ||= ActiveRecord::Base.connection 
  rls_mode ||= Ec::Pg.configuration.rls_mode || :local

  selected_variables = variable_value_for(registered_variables, variables)

  if rls_mode == :local
    wrap_in_transaction(connection) do
      apply_rls(connection, rls_mode, selected_variables)
      block.call
    end
  else
    begin
      apply_rls(connection, rls_mode, selected_variables)
      block.call
    ensure
      reset!(connection: connection, variables: selected_variables.keys)
    end
  end
end

.wrap_in_transaction(connection, &block) ⇒ Object

Wraps the block in a transaction, reusing an existing one if open.



85
86
87
88
89
90
91
# File 'lib/ec/pg/rls_manager.rb', line 85

def wrap_in_transaction(connection, &block)
  if connection.transaction_open?
    block.call
  else
    connection.transaction(&block)
  end
end