Class: Bricolage::PSQLDataSource

Inherits:
DataSource show all
Includes:
CommandUtils, VacuumLock
Defined in:
lib/bricolage/psqldatasource.rb

Constant Summary collapse

DEFAULT_RETRY_LIMIT =
3

Constants included from VacuumLock

VacuumLock::DEFAULT_VACUUM_LOCK_FILE, VacuumLock::DEFAULT_VACUUM_LOCK_TIMEOUT

Constants inherited from DataSource

DataSource::CLASSES

Instance Attribute Summary collapse

Attributes inherited from DataSource

#context, #logger, #name

Instance Method Summary collapse

Methods included from VacuumLock

cleanup_vacuum_lock, create_lockfile_cmd, create_vacuum_lock_file, enable_vacuum_lock?, locking?, psql_serialize_vacuum_begin, psql_serialize_vacuum_end, serialize_vacuum, using, #using_vacuum_lock, vacuum_lock_parameters

Methods included from CommandUtils

#command, #make_tmpfile, #new_tmpfile_path

Methods inherited from DataSource

get_class, new_for_type, #redshift_loader_source?

Constructor Details

#initialize(host: 'localhost', port: 5439, database: 'dev', username: ENV['LOGNAME'], password: nil, pgpass: nil, encoding: nil, psql: 'psql', sql_log_level: nil, query_sql_log_level: nil, update_sql_log_level: nil, tmpdir: Dir.tmpdir) ⇒ PSQLDataSource

Returns a new instance of PSQLDataSource.

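Raises:

(ParameterError) — when host, port, database or username is missing, or when neither pgpass nor password is given
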

20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/bricolage/psqldatasource.rb', line 20

# Stores the connection settings for a psql-backed data source.
#
# The three log-level arguments are each passed through
# Logger.intern_severity. query_sql_log_level and update_sql_log_level fall
# back to sql_log_level when omitted, and then to Logger::DEBUG and
# Logger::INFO respectively. An empty connection pool is created as well.
#
# Raises ParameterError when host, port, database or username is missing, or
# when neither pgpass nor password is given.
def initialize(
    host: 'localhost',
    port: 5439,
    database: 'dev',
    username: ENV['LOGNAME'],
    password: nil,
    pgpass: nil,
    encoding: nil,
    psql: 'psql',
    sql_log_level: nil,
    query_sql_log_level: nil,
    update_sql_log_level: nil,
    tmpdir: Dir.tmpdir)
  @host, @port, @database, @user = host, port, database, username
  @password, @pgpass = password, pgpass
  @encoding, @psql = encoding, psql

  @sql_log_level = Logger.intern_severity(sql_log_level || Logger::DEBUG)
  @query_sql_log_level = Logger.intern_severity(query_sql_log_level || sql_log_level || Logger::DEBUG)
  @update_sql_log_level = Logger.intern_severity(update_sql_log_level || sql_log_level || Logger::INFO)

  @tmpdir = tmpdir
  @connection_pool = []

  # Mandatory settings, checked in this order so the first missing one is reported.
  [
    [@host, "missing psql host"],
    [@port, "missing psql port"],
    [@database, "missing psql database"],
    [@user, "missing psql username"]
  ].each do |value, message|
    raise ParameterError, message unless value
  end

  # Either a pgpass file or an explicit password is enough.
  raise ParameterError, "missing psql password" unless @pgpass || @password
end

Instance Attribute Details

#database ⇒ Object (readonly)

Returns the value of attribute database.



57
58
59
# File 'lib/bricolage/psqldatasource.rb', line 57

# Returns @database, the database name stored by the constructor.
def database
  @database
end

#host ⇒ Object (readonly)

Returns the value of attribute host.



55
56
57
# File 'lib/bricolage/psqldatasource.rb', line 55

# Returns @host, the server host stored by the constructor.
def host
  @host
end

#port ⇒ Object (readonly)

Returns the value of attribute port.



56
57
58
# File 'lib/bricolage/psqldatasource.rb', line 56

# Returns @port, the server port stored by the constructor.
def port
  @port
end

#query_sql_log_level ⇒ Object (readonly)

Returns the value of attribute query_sql_log_level.



61
62
63
# File 'lib/bricolage/psqldatasource.rb', line 61

# Returns @query_sql_log_level. The constructor derives it with
# Logger.intern_severity from the query_sql_log_level argument, falling back
# to sql_log_level and then to Logger::DEBUG.
def query_sql_log_level
  @query_sql_log_level
end

#sql_log_level ⇒ Object (readonly)

Returns the value of attribute sql_log_level.



60
61
62
# File 'lib/bricolage/psqldatasource.rb', line 60

# Returns @sql_log_level. The constructor derives it with
# Logger.intern_severity from the sql_log_level argument, falling back to
# Logger::DEBUG.
def sql_log_level
  @sql_log_level
end

#update_sql_log_level ⇒ Object (readonly)

Returns the value of attribute update_sql_log_level.



62
63
64
# File 'lib/bricolage/psqldatasource.rb', line 62

# Returns @update_sql_log_level. The constructor derives it with
# Logger.intern_severity from the update_sql_log_level argument, falling back
# to sql_log_level and then to Logger::INFO.
def update_sql_log_level
  @update_sql_log_level
end

#user ⇒ Object (readonly)

Returns the value of attribute user.



58
59
60
# File 'lib/bricolage/psqldatasource.rb', line 58

# Returns @user, which the constructor sets from its username: argument.
def user
  @user
end

Instance Method Details

#analyze(table) ⇒ Object



214
215
216
# File 'lib/bricolage/psqldatasource.rb', line 214

# Runs +analyze(table)+ on a connection obtained from #open and returns that
# call's result.
def analyze(table)
  open do |connection|
    connection.analyze(table)
  end
end

#clear_connection_pool ⇒ Object

not MT-safe



179
180
181
182
# File 'lib/bricolage/psqldatasource.rb', line 179

# Closes every pooled connection in pool order, then replaces the pool with a
# fresh empty array (which is also the return value).
def clear_connection_pool
  @connection_pool.each { |pooled| pooled.close }
  @connection_pool = []
end

#drop_table(name) ⇒ Object



188
189
190
# File 'lib/bricolage/psqldatasource.rb', line 188

# Runs +drop_table(name)+ on a connection obtained from #open and returns
# that call's result.
def drop_table(name)
  open do |connection|
    connection.drop_table(name)
  end
end

#drop_table_force(name) ⇒ Object



192
193
194
# File 'lib/bricolage/psqldatasource.rb', line 192

# Runs +drop_table_force(name)+ on a connection obtained from #open and
# returns that call's result.
def drop_table_force(name)
  open do |connection|
    connection.drop_table_force(name)
  end
end

#execute(source, options = []) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/bricolage/psqldatasource.rb', line 73

# Runs +source+ through the psql command line client.
#
# The source is handed to make_tmpfile (under @tmpdir) and the resulting
# path is passed to psql with -f, together with the connection settings,
# ON_ERROR_STOP=true, --no-password and any extra +options+. Credentials are
# supplied through the environment returned by #get_psql_env.
#
# When psql fails, the error text is taken from
# LogLocator.slice_last_stderr; if that raises IOError the error is logged
# and no message is attached. Returns JobResult.for_process_status(status,
# message).
def execute(source, options = [])
  make_tmpfile(source, tmpdir: @tmpdir) do |script_path|
    psql_args = [
      "--no-psqlrc", "--host=#{@host}", "--port=#{@port}",
      "--username=#{@user}", @database,
      '--echo-all',
      '-v', 'ON_ERROR_STOP=true',
      '-f', script_path,
      '--no-password',
      *options
    ]
    status = command(@psql, *psql_args, env: get_psql_env)

    error_message =
      if status.success?
        nil
      else
        begin
          LogLocator.slice_last_stderr(/^psql:.*?:\d+: ERROR: (.*)/, 1)
        rescue IOError => io_error
          # slice_last_stderr may fail if stderr is not a file
          logger.error io_error.message
          nil
        end
      end

    JobResult.for_process_status(status, error_message)
  end
end

#get_psql_env ⇒ Object



96
97
98
99
100
101
102
103
104
# File 'lib/bricolage/psqldatasource.rb', line 96

# Builds the environment hash passed to psql: PGPASSFILE when a pgpass file
# is configured, otherwise PGPASSWORD when a password is set, otherwise an
# empty hash. The pgpass file takes precedence when both are present.
def get_psql_env
  return { "PGPASSFILE" => @pgpass } if @pgpass
  return { "PGPASSWORD" => @password } if @password

  {}
end

#new_task ⇒ Object



64
65
66
# File 'lib/bricolage/psqldatasource.rb', line 64

# Returns a new PSQLTask constructed with this data source as its only
# argument.
def new_task
  PSQLTask.new(self)
end

#open ⇒ Object



120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/bricolage/psqldatasource.rb', line 120

# Opens a connection with PostgresConnection.open_data_source and verifies it
# by running 'select 1'.
#
# When opening or verifying raises PG::ConnectionBad or PG::UnableToSend, the
# attempt is repeated after a one-second sleep. The number of retries comes
# from ENV['BRICOLAGE_OPEN_RETRY_LIMIT'], or DEFAULT_RETRY_LIMIT when that is
# unset. Once the retries are used up the last error is re-raised.
#
# With a block, yields the connection, closes it afterwards (even when the
# block raises) and returns the block's value. Without a block, returns the
# open connection and the caller is responsible for closing it.
def open
  retries = (ENV['BRICOLAGE_OPEN_RETRY_LIMIT'] || DEFAULT_RETRY_LIMIT).to_i
  conn = nil
  begin
    conn = PostgresConnection.open_data_source(self)
    conn.execute_query('select 1') {}
  rescue PG::ConnectionBad, PG::UnableToSend => ex
    # Only a connection created by THIS attempt may be closed. When
    # open_data_source itself failed there is nothing to close, and a
    # connection left over from an earlier attempt was already closed.
    if conn
      begin
        conn.close
      rescue StandardError
        # Best-effort cleanup: the connection is already known to be broken.
      end
      conn = nil
    end
    retries -= 1
    if retries >= 0
      logger.warn "Could not open postgres connection; retry: #{ex.message}"
      sleep 1
      retry
    else
      raise
    end
  end
  if block_given?
    begin
      yield conn
    ensure
      conn.close
    end
  else
    return conn
  end
end

#open_for_batch ⇒ Object



68
69
70
71
# File 'lib/bricolage/psqldatasource.rb', line 68

# Yields to the given block without opening any connection, and returns the
# block's value. No argument is passed to the block.
def open_for_batch
  # do not call #open
  yield
end

#open_shared_connection ⇒ Object

Raises:

(ParameterError) — when called without a block


147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/bricolage/psqldatasource.rb', line 147

# Yields a connection taken from the pool, or a fresh one from #open when the
# pool has no usable connection, and puts it back in the pool afterwards.
#
# Pooled connections are probed with 'select 1' before use. One that raises
# PG::ConnectionBad or PG::UnableToSend is closed and the next pooled
# connection is tried. Any other error from the probe also closes the
# connection and then propagates.
#
# The connection is returned to the pool even when the block raises.
# Raises ParameterError when no block is given.
def open_shared_connection
  raise ParameterError, 'open_shared_connection require block' unless block_given?

  shared = nil
  until shared
    candidate = @connection_pool.shift
    unless candidate
      # Pool exhausted: get a fresh connection instead.
      shared = open
      next
    end

    healthy = false
    begin
      candidate.query('select 1') {}
      healthy = true
    rescue PG::ConnectionBad, PG::UnableToSend
      # Dead connection; discard it and try the next pooled one.
    ensure
      candidate.close unless healthy
    end
    shared = candidate if healthy
  end

  begin
    yield shared
  ensure
    @connection_pool.push(shared)
  end
end

#password ⇒ Object

Ruby Library Interface



110
111
112
113
# File 'lib/bricolage/psqldatasource.rb', line 110

# Returns the password, reading it from the pgpass file on first use when no
# password was given explicitly. The value is cached in @password.
def password
  # FIXME: same user must not exist
  # (presumably: the pgpass lookup is keyed on user name only, so several
  # entries for the same user cannot be told apart -- confirm)
  return @password if @password

  @password = read_password_from_pgpass(@pgpass, @user)
end

#query_batch(query, batch_size = 5000, &block) ⇒ Object



184
185
186
# File 'lib/bricolage/psqldatasource.rb', line 184

# Runs +query_batch(query, batch_size, &block)+ on a connection obtained from
# #open and returns that call's result. +batch_size+ defaults to 5000.
def query_batch(query, batch_size = 5000, &block)
  open do |connection|
    connection.query_batch(query, batch_size, &block)
  end
end

#read_password_from_pgpass(path, user) ⇒ Object



115
116
117
118
# File 'lib/bricolage/psqldatasource.rb', line 115

# Looks up the password for +user+ in the pgpass file at +path+.
#
# The file is searched for the first line ending in ":<user>:<password>";
# host, port and database fields are not compared. The user name is matched
# literally (regexp metacharacters such as "." are escaped), and lines may
# end in either LF or CRLF.
#
# Returns the password string.
# Raises ParameterError when no line matches.
def read_password_from_pgpass(path, user)
  # \r? lets the end-of-line anchor match on CRLF files; the capture itself
  # never contains \r or \n.
  entry = /:#{Regexp.escape(user.to_s)}:([^:\r\n]+)\r?$/
  found = File.read(path).slice(entry, 1)
  raise ParameterError, "could not read password: #{path}, #{user}" unless found

  found
end

#select(table, &block) ⇒ Object



196
197
198
# File 'lib/bricolage/psqldatasource.rb', line 196

# Runs +select(table, &block)+ on a connection obtained from #open and
# returns that call's result.
def select(table, &block)
  open do |connection|
    connection.select(table, &block)
  end
end

#vacuum(table) ⇒ Object



202
203
204
205
206
# File 'lib/bricolage/psqldatasource.rb', line 202

# Inside serialize_vacuum, runs +vacuum(table)+ on a connection obtained
# from #open.
def vacuum(table)
  serialize_vacuum do
    open { |connection| connection.vacuum(table) }
  end
end

#vacuum_sort_only(table) ⇒ Object



208
209
210
211
212
# File 'lib/bricolage/psqldatasource.rb', line 208

# Inside serialize_vacuum, runs +vacuum_sort_only(table)+ on a connection
# obtained from #open.
def vacuum_sort_only(table)
  serialize_vacuum do
    open { |connection| connection.vacuum_sort_only(table) }
  end
end