Class: Bricolage::PSQLDataSource
- Inherits:
-
DataSource
- Object
- DataSource
- Bricolage::PSQLDataSource
- Includes:
- CommandUtils, VacuumLock
- Defined in:
- lib/bricolage/psqldatasource.rb
Constant Summary collapse
- DEFAULT_RETRY_LIMIT =
3
Constants included from VacuumLock
VacuumLock::DEFAULT_VACUUM_LOCK_FILE, VacuumLock::DEFAULT_VACUUM_LOCK_TIMEOUT
Constants inherited from DataSource
Instance Attribute Summary collapse
-
#database ⇒ Object
readonly
Returns the value of attribute database.
-
#host ⇒ Object
readonly
Returns the value of attribute host.
-
#port ⇒ Object
readonly
Returns the value of attribute port.
-
#query_sql_log_level ⇒ Object
readonly
Returns the value of attribute query_sql_log_level.
-
#sql_log_level ⇒ Object
readonly
Returns the value of attribute sql_log_level.
-
#update_sql_log_level ⇒ Object
readonly
Returns the value of attribute update_sql_log_level.
-
#user ⇒ Object
readonly
Returns the value of attribute user.
Attributes inherited from DataSource
Instance Method Summary collapse
- #analyze(table) ⇒ Object
-
#clear_connection_pool ⇒ Object
Closes every pooled connection and empties the pool. Not MT-safe (not thread-safe).
- #drop_table(name) ⇒ Object
- #drop_table_force(name) ⇒ Object
- #execute(source, options = []) ⇒ Object
- #get_psql_env ⇒ Object
-
#initialize(host: 'localhost', port: 5439, database: 'dev', username: ENV['LOGNAME'], password: nil, pgpass: nil, encoding: nil, psql: 'psql', sql_log_level: nil, query_sql_log_level: nil, update_sql_log_level: nil, tmpdir: Dir.tmpdir) ⇒ PSQLDataSource
constructor
A new instance of PSQLDataSource.
- #new_task ⇒ Object
- #open ⇒ Object
- #open_for_batch ⇒ Object
- #open_shared_connection ⇒ Object
-
#password ⇒ Object
Ruby Library Interface.
- #query_batch(query, batch_size = 5000, &block) ⇒ Object
- #read_password_from_pgpass(path, user) ⇒ Object
- #select(table, &block) ⇒ Object
- #vacuum(table) ⇒ Object
- #vacuum_sort_only(table) ⇒ Object
Methods included from VacuumLock
cleanup_vacuum_lock, create_lockfile_cmd, create_vacuum_lock_file, enable_vacuum_lock?, locking?, psql_serialize_vacuum_begin, psql_serialize_vacuum_end, serialize_vacuum, using, #using_vacuum_lock, vacuum_lock_parameters
Methods included from CommandUtils
#command, #make_tmpfile, #new_tmpfile_path
Methods inherited from DataSource
get_class, new_for_type, #redshift_loader_source?
Constructor Details
#initialize(host: 'localhost', port: 5439, database: 'dev', username: ENV['LOGNAME'], password: nil, pgpass: nil, encoding: nil, psql: 'psql', sql_log_level: nil, query_sql_log_level: nil, update_sql_log_level: nil, tmpdir: Dir.tmpdir) ⇒ PSQLDataSource
Returns a new instance of PSQLDataSource.
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/bricolage/psqldatasource.rb', line 20

# Creates a data source that is driven through the psql command.
#
# All settings are stored on the instance first and validated afterwards,
# so the first missing required setting (host, port, database, username,
# then credentials) raises ParameterError.
#
# @param host [String] server host name
# @param port [Integer] server port
# @param database [String] database name
# @param username [String, nil] login user; defaults to ENV['LOGNAME']
# @param password [String, nil] password; optional when +pgpass+ is given
# @param pgpass [String, nil] path of a pgpass file; optional when +password+ is given
# @param encoding [String, nil] stored as given
# @param psql [String] psql executable to run
# @param sql_log_level [Object, nil] base SQL log level (DEBUG when nil)
# @param query_sql_log_level [Object, nil] falls back to +sql_log_level+, then DEBUG
# @param update_sql_log_level [Object, nil] falls back to +sql_log_level+, then INFO
# @param tmpdir [String] directory used for temporary files
# @raise [ParameterError] when a required setting is missing
def initialize(host: 'localhost', port: 5439, database: 'dev', username: ENV['LOGNAME'],
               password: nil, pgpass: nil, encoding: nil, psql: 'psql',
               sql_log_level: nil, query_sql_log_level: nil, update_sql_log_level: nil,
               tmpdir: Dir.tmpdir)
  # Connection target and credentials.
  @host, @port, @database, @user = host, port, database, username
  @password, @pgpass = password, pgpass
  @encoding, @psql = encoding, psql

  # Log levels: the specific levels inherit from the generic one when unset.
  @sql_log_level = Logger.intern_severity(sql_log_level || Logger::DEBUG)
  @query_sql_log_level = Logger.intern_severity(query_sql_log_level || sql_log_level || Logger::DEBUG)
  @update_sql_log_level = Logger.intern_severity(update_sql_log_level || sql_log_level || Logger::INFO)

  @tmpdir = tmpdir
  @connection_pool = []

  raise ParameterError, "missing psql host" unless @host
  raise ParameterError, "missing psql port" unless @port
  raise ParameterError, "missing psql database" unless @database
  raise ParameterError, "missing psql username" unless @user
  # Either a pgpass file or an explicit password must be supplied.
  raise ParameterError, "missing psql password" unless @pgpass || @password
end
Instance Attribute Details
#database ⇒ Object (readonly)
Returns the value of attribute database.
57 58 59 |
# File 'lib/bricolage/psqldatasource.rb', line 57

# Read-only accessor for the database name given to the constructor.
def database
  @database
end
#host ⇒ Object (readonly)
Returns the value of attribute host.
55 56 57 |
# File 'lib/bricolage/psqldatasource.rb', line 55

# Read-only accessor for the host given to the constructor.
def host
  @host
end
#port ⇒ Object (readonly)
Returns the value of attribute port.
56 57 58 |
# File 'lib/bricolage/psqldatasource.rb', line 56

# Read-only accessor for the port given to the constructor.
def port
  @port
end
#query_sql_log_level ⇒ Object (readonly)
Returns the value of attribute query_sql_log_level.
61 62 63 |
# File 'lib/bricolage/psqldatasource.rb', line 61

# Read-only accessor for the query SQL log level resolved by the constructor.
def query_sql_log_level
  @query_sql_log_level
end
#sql_log_level ⇒ Object (readonly)
Returns the value of attribute sql_log_level.
60 61 62 |
# File 'lib/bricolage/psqldatasource.rb', line 60

# Read-only accessor for the base SQL log level resolved by the constructor.
def sql_log_level
  @sql_log_level
end
#update_sql_log_level ⇒ Object (readonly)
Returns the value of attribute update_sql_log_level.
62 63 64 |
# File 'lib/bricolage/psqldatasource.rb', line 62

# Read-only accessor for the update SQL log level resolved by the constructor.
def update_sql_log_level
  @update_sql_log_level
end
#user ⇒ Object (readonly)
Returns the value of attribute user.
58 59 60 |
# File 'lib/bricolage/psqldatasource.rb', line 58

# Read-only accessor for the login user (the constructor's +username+).
def user
  @user
end
Instance Method Details
#analyze(table) ⇒ Object
214 215 216 |
# File 'lib/bricolage/psqldatasource.rb', line 214

# Opens a connection via #open and calls +analyze+ on it for +table+.
#
# @param table the table passed through to the connection's +analyze+
# @return whatever the #open block form returns
def analyze(table)
  open do |connection|
    connection.analyze(table)
  end
end
#clear_connection_pool ⇒ Object
Closes every pooled connection and empties the pool. Not MT-safe (not thread-safe).
179 180 181 182 |
# File 'lib/bricolage/psqldatasource.rb', line 179

# Closes every connection currently held in the pool, then replaces the
# pool with a new empty array. Marked "not MT-safe" by the original author.
#
# @return [Array] the new, empty pool
def clear_connection_pool
  @connection_pool.each do |pooled|
    pooled.close
  end
  @connection_pool = []
end
#drop_table(name) ⇒ Object
188 189 190 |
# File 'lib/bricolage/psqldatasource.rb', line 188

# Opens a connection via #open and calls +drop_table+ on it.
#
# @param name the table name passed through to the connection
# @return whatever the #open block form returns
def drop_table(name)
  open do |connection|
    connection.drop_table(name)
  end
end
#drop_table_force(name) ⇒ Object
192 193 194 |
# File 'lib/bricolage/psqldatasource.rb', line 192

# Opens a connection via #open and calls +drop_table_force+ on it.
#
# @param name the table name passed through to the connection
# @return whatever the #open block form returns
def drop_table_force(name)
  open do |connection|
    connection.drop_table_force(name)
  end
end
#execute(source, options = []) ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/bricolage/psqldatasource.rb', line 73

# Runs +source+ through the psql command.
#
# The SQL text is written to a temporary file under @tmpdir, and psql is
# invoked with ON_ERROR_STOP so that it stops at the first failing statement.
# The password is never prompted for (--no-password); credentials are passed
# through the environment built by #get_psql_env.
#
# @param source the SQL text handed to make_tmpfile
# @param options [Array<String>] extra command-line arguments appended to psql
# @return the value of JobResult.for_process_status for the psql exit status
#   (assuming make_tmpfile returns its block's value -- TODO confirm)
def execute(source, options = [])
  make_tmpfile(source, tmpdir: @tmpdir) {|path|
    st = command @psql, "--no-psqlrc", "--host=#{@host}", "--port=#{@port}",
        "--username=#{@user}", @database,
        '--echo-all', '-v', 'ON_ERROR_STOP=true', '-f', path, '--no-password', *options,
        env: get_psql_env
    # The error message stays nil on success or when it cannot be extracted.
    msg = nil
    unless st.success?
      begin
        # Pull the last "psql:<file>:<line>: ERROR: ..." text out of the captured stderr.
        msg = LogLocator.slice_last_stderr(/^psql:.*?:\d+: ERROR: (.*)/, 1)
      rescue IOError => ex
        # slice_last_stderr may fail if stderr is not a file
        logger.error ex.message
        msg = nil
      end
    end
    JobResult.for_process_status(st, msg)
  }
end
#get_psql_env ⇒ Object
96 97 98 99 100 101 102 103 104 |
# File 'lib/bricolage/psqldatasource.rb', line 96

# Builds the environment variables that carry credentials to psql.
#
# A pgpass file takes precedence over an explicit password; when neither is
# set the environment is empty.
#
# @return [Hash{String => String}] PGPASSFILE, PGPASSWORD, or nothing
def get_psql_env
  return { "PGPASSFILE" => @pgpass } if @pgpass
  return { "PGPASSWORD" => @password } if @password

  {}
end
#new_task ⇒ Object
64 65 66 |
# File 'lib/bricolage/psqldatasource.rb', line 64

# Creates a new PSQLTask bound to this data source.
#
# @return [PSQLTask]
def new_task
  PSQLTask.new(self)
end
#open ⇒ Object
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
# File 'lib/bricolage/psqldatasource.rb', line 120

# Opens a connection and verifies it with a 'select 1' probe.
#
# When connecting or probing fails with PG::ConnectionBad or
# PG::UnableToSend, the attempt is repeated after a one-second pause. The
# number of retries comes from ENV['BRICOLAGE_OPEN_RETRY_LIMIT'], falling
# back to DEFAULT_RETRY_LIMIT; once exhausted, the last error is re-raised.
#
# With a block, yields the connection and always closes it afterwards.
# Without a block, returns the open connection, which the caller must close.
#
# @yield [conn] the verified connection
# @return the block's value, or the connection when no block is given
# @raise [PG::ConnectionBad, PG::UnableToSend] when all retries fail
def open
  retries = (ENV['BRICOLAGE_OPEN_RETRY_LIMIT'] || DEFAULT_RETRY_LIMIT).to_i
  conn = nil
  begin
    conn = PostgresConnection.open_data_source(self)
    conn.execute_query('select 1') {}
  rescue PG::ConnectionBad, PG::UnableToSend => ex
    # Best-effort cleanup of the failed attempt; a close error must not mask +ex+.
    begin
      conn.close if conn
    rescue StandardError
      nil
    end
    # Forget the handle so a later failed attempt never closes it a second time.
    conn = nil
    retries -= 1
    if retries >= 0
      logger.warn "Could not open postgres connection; retry: #{ex.message}"
      sleep 1
      retry
    else
      raise
    end
  end
  if block_given?
    begin
      yield conn
    ensure
      conn.close
    end
  else
    return conn
  end
end
#open_for_batch ⇒ Object
68 69 70 71 |
# File 'lib/bricolage/psqldatasource.rb', line 68

# Runs the given block without opening a connection.
#
# @yield with no arguments
# @return the block's value
def open_for_batch
  # Deliberately does not call #open.
  yield
end
#open_shared_connection ⇒ Object
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
# File 'lib/bricolage/psqldatasource.rb', line 147

# Yields a connection taken from the pool, or a fresh one when the pool
# has no usable connection, and puts it back into the pool afterwards.
#
# Pooled connections are probed with 'select 1' before reuse. A connection
# that fails the probe is closed and the next pooled one is tried.
#
# @yield [conn] a verified connection
# @return the block's value
# @raise [ParameterError] when called without a block
def open_shared_connection
  raise ParameterError, 'open_shared_connection require block' unless block_given?

  conn = nil
  until conn
    candidate = @connection_pool.shift
    unless candidate
      # Get a fresh connection instead of pooled connections.
      conn = open
      next
    end

    alive = false
    begin
      candidate.query('select 1') {}
      alive = true
    rescue PG::ConnectionBad, PG::UnableToSend
      # Dead connection: it is discarded below and the loop tries again.
    ensure
      # Close the candidate unless the probe succeeded; this also covers
      # exceptions other than the two rescued above.
      candidate.close unless alive
    end
    conn = candidate if alive
  end

  begin
    yield conn
  ensure
    # The connection goes back to the pool even if the block raised.
    @connection_pool.push(conn)
  end
end
#password ⇒ Object
Ruby Library Interface
110 111 112 113 |
# File 'lib/bricolage/psqldatasource.rb', line 110

# Returns the password, reading it from the pgpass file on first use when
# no explicit password was configured. The value is cached in @password.
#
# @return [String]
def password
  return @password if @password

  # FIXME: same user must not exist
  @password = read_password_from_pgpass(@pgpass, @user)
end
#query_batch(query, batch_size = 5000, &block) ⇒ Object
184 185 186 |
# File 'lib/bricolage/psqldatasource.rb', line 184

# Opens a connection via #open and forwards to its +query_batch+.
#
# @param query the query passed through to the connection
# @param batch_size [Integer] batch size passed through (default 5000)
# @param block forwarded unchanged to the connection's +query_batch+
# @return whatever the #open block form returns
def query_batch(query, batch_size = 5000, &block)
  open do |connection|
    connection.query_batch(query, batch_size, &block)
  end
end
#read_password_from_pgpass(path, user) ⇒ Object
115 116 117 118 |
# File 'lib/bricolage/psqldatasource.rb', line 115

# Extracts a user's password from a pgpass file.
#
# The first line ending in ":<user>:<password>" is used. Host, port and
# database fields are not compared, so when the same user appears on several
# lines the first one wins.
#
# @param path [String] path of the pgpass file
# @param user [String] user name to look up; matched literally
# @return [String] the password field
# @raise [ParameterError] when no line matches +user+
def read_password_from_pgpass(path, user)
  # Escape the user name so regexp metacharacters in it (".", "+", ...) are
  # matched literally instead of selecting the wrong line or raising.
  pattern = /:#{Regexp.escape(user.to_s)}:([^:\r\n]+)$/
  File.read(path).slice(pattern, 1) or
      raise ParameterError, "could not read password: #{path}, #{user}"
end
#select(table, &block) ⇒ Object
196 197 198 |
# File 'lib/bricolage/psqldatasource.rb', line 196

# Opens a connection via #open and forwards to its +select+.
#
# @param table the table passed through to the connection
# @param block forwarded unchanged to the connection's +select+
# @return whatever the #open block form returns
def select(table, &block)
  open do |connection|
    connection.select(table, &block)
  end
end
#vacuum(table) ⇒ Object
202 203 204 205 206 |
# File 'lib/bricolage/psqldatasource.rb', line 202

# Calls +vacuum+ for +table+ on a connection from #open, wrapped in
# serialize_vacuum.
#
# @param table the table passed through to the connection's +vacuum+
# @return whatever serialize_vacuum returns
def vacuum(table)
  serialize_vacuum do
    open do |connection|
      connection.vacuum(table)
    end
  end
end
#vacuum_sort_only(table) ⇒ Object
208 209 210 211 212 |
# File 'lib/bricolage/psqldatasource.rb', line 208

# Calls +vacuum_sort_only+ for +table+ on a connection from #open, wrapped
# in serialize_vacuum.
#
# @param table the table passed through to the connection's +vacuum_sort_only+
# @return whatever serialize_vacuum returns
def vacuum_sort_only(table)
  serialize_vacuum do
    open do |connection|
      connection.vacuum_sort_only(table)
    end
  end
end