Class: Bricolage::PostgresConnection

Inherits:
Object
  • Object
show all
Defined in:
lib/bricolage/postgresconnection.rb

Defined Under Namespace

Classes: Cursor, Transaction

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection, ds, logger) ⇒ PostgresConnection

Returns a new instance of PostgresConnection.



38
39
40
41
42
43
44
# File 'lib/bricolage/postgresconnection.rb', line 38

# Wraps an already-opened connection object.
#
# @param connection the underlying connection handle, stored as-is
# @param ds the data source this connection belongs to
# @param logger the logger used by this object
def initialize(connection, ds, logger)
  @connection, @ds, @logger = connection, ds, logger
  # A new wrapper starts open and with no recorded connection failure.
  @closed = @connection_failed = false
end

Class Method Details

.install_signal_handlers ⇒ Object



10
11
12
13
14
15
# File 'lib/bricolage/postgresconnection.rb', line 10

# Installs a SIGTERM handler that writes a notice to standard error and
# then raises Interrupt with the message 'SIGTERM'.
def PostgresConnection.install_signal_handlers
  Signal.trap(:TERM) do
    $stderr.puts('receive SIGTERM')
    raise Interrupt, 'SIGTERM'
  end
end

.open_data_source(ds) ⇒ Object



17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/bricolage/postgresconnection.rb', line 17

# Opens a connection for +ds+ through _open_ds.
#
# Without a block the connection is returned and the caller owns it.
# With a block the connection is yielded and close_force is called on it
# afterwards, whether or not the block raised.
#
# @param ds the data source to open
# @return the connection (no block) or the block's value (block given)
def PostgresConnection.open_data_source(ds)
  conn = _open_ds(ds)
  return conn unless block_given?

  begin
    yield conn
  ensure
    conn.close_force
  end
end

Instance Method Details

#analyze(table) ⇒ Object



259
260
261
# File 'lib/bricolage/postgresconnection.rb', line 259

# Issues an "analyze" statement for the given table through #execute.
#
# @param table table name, interpolated into the SQL text as-is
# @return whatever #execute returns
def analyze(table)
  statement = "analyze #{table};"
  execute(statement)
end

#cancel ⇒ Object



69
70
71
72
73
74
75
76
77
78
# File 'lib/bricolage/postgresconnection.rb', line 69

# Asks the underlying connection to cancel the running query.
#
# A truthy value returned by the connection's cancel call is treated as an
# error description: it is logged and raised. A falsy value means success.
#
# @raise [PostgreSQLException] when the connection reports a cancel error
def cancel
  @logger.info "cancelling PostgreSQL query..."
  failure = @connection.cancel
  return @logger.info("successfully cancelled") unless failure

  message = "could not cancel query: #{failure}"
  @logger.error message
  raise PostgreSQLException, message
end

#cancel_force ⇒ Object



64
65
66
67
# File 'lib/bricolage/postgresconnection.rb', line 64

# Best-effort variant of #cancel: a PostgreSQLException raised by #cancel
# is swallowed (the method then returns nil); other errors propagate.
def cancel_force
  begin
    cancel
  rescue PostgreSQLException
    nil
  end
end

#close ⇒ Object



50
51
52
53
# File 'lib/bricolage/postgresconnection.rb', line 50

# Closes the underlying connection and marks this wrapper as closed.
# The closed flag is only set when the connection's close call returns
# without raising.
def close
  @connection.close
  @closed = true
end

#close_force ⇒ Object



55
56
57
58
# File 'lib/bricolage/postgresconnection.rb', line 55

# Best-effort variant of #close: any StandardError raised while closing is
# swallowed, in which case the method returns nil.
def close_force
  begin
    close
  rescue StandardError
    nil
  end
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


60
61
62
# File 'lib/bricolage/postgresconnection.rb', line 60

# Reports the closed flag: false after construction, true once #close has
# completed.
#
# @return [Boolean]
def closed?
  @closed
end

#drop_table(name) ⇒ Object



243
244
245
# File 'lib/bricolage/postgresconnection.rb', line 243

# Issues "drop table ... cascade" for the given table through #execute.
#
# @param name table name, interpolated into the SQL text as-is
# @return whatever #execute returns
def drop_table(name)
  statement = "drop table #{name} cascade;"
  execute(statement)
end

#drop_table_force(name) ⇒ Object



247
248
249
# File 'lib/bricolage/postgresconnection.rb', line 247

# Issues "drop table if exists ... cascade" for the given table through
# #execute.
#
# @param name table name, interpolated into the SQL text as-is
# @return whatever #execute returns
def drop_table_force(name)
  statement = "drop table if exists #{name} cascade;"
  execute(statement)
end

#execute_query(query, &block) ⇒ Object Also known as: query



132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/bricolage/postgresconnection.rb', line 132

# Runs +query+ and yields the result object to the caller's block.
#
# The query text is logged at the data source's query_sql_log_level and the
# elapsed time is logged by log_elapsed_time. The result object is cleared
# once the block has finished, so the block must extract what it needs.
#
# @param query [String] SQL text
# @yield the result returned by the connection's async_exec
# @return the value of the block
# @raise [ConnectionError] for PG::ConnectionBad / PG::UnableToSend; the
#   connection is also flagged as failed
# @raise [PostgreSQLException] for any other PG::Error
def execute_query(query, &block)
  log_query(query, @ds.query_sql_log_level)
  result = log_elapsed_time do
    querying { @connection.async_exec(query) }
  end
  yield result
rescue PG::ConnectionBad, PG::UnableToSend => ex
  @connection_failed = true
  raise ConnectionError.wrap(ex)
rescue PG::Error => ex
  raise PostgreSQLException.wrap(ex)
ensure
  # result is still nil when the query itself failed.
  result.clear if result
end

#execute_update(query) ⇒ Object Also known as: execute, update



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/bricolage/postgresconnection.rb', line 89

# Runs a data-modifying statement and returns its rows as an array.
#
# The statement is logged at the data source's update_sql_log_level and the
# elapsed time is logged by log_elapsed_time. The rows are copied out with
# to_a before the result object is cleared.
#
# @param query [String] SQL text
# @return [Array] the result rows
# @raise [ConnectionError] for PG::ConnectionBad / PG::UnableToSend; the
#   connection is also flagged as failed
# @raise [UniqueViolationException] for PG::UniqueViolation
# @raise [PostgreSQLException] for any other PG::Error
def execute_update(query)
  log_query(query, @ds.update_sql_log_level)
  result = log_elapsed_time do
    querying { @connection.async_exec(query) }
  end
  result.to_a
rescue PG::ConnectionBad, PG::UnableToSend => ex
  @connection_failed = true
  raise ConnectionError.wrap(ex)
rescue PG::UniqueViolation => ex
  raise UniqueViolationException.wrap(ex)
rescue PG::Error => ex
  raise PostgreSQLException.wrap(ex)
ensure
  # result is still nil when the statement itself failed.
  result.clear if result
end

#in_transaction? ⇒ Boolean

Returns:

  • (Boolean)


157
158
159
# File 'lib/bricolage/postgresconnection.rb', line 157

# Tells whether the connection reports the PQTRANS_INTRANS transaction
# status.
#
# @return [Boolean]
def in_transaction?
  status = @connection.transaction_status
  status == PG::Constants::PQTRANS_INTRANS
end

#lock(table) ⇒ Object



263
264
265
# File 'lib/bricolage/postgresconnection.rb', line 263

# Issues a "lock" statement for the given table through #execute.
#
# @param table table name, interpolated into the SQL text as-is
# @return whatever #execute returns
def lock(table)
  statement = "lock #{table}"
  execute statement
end

#log_elapsed_time ⇒ Object



276
277
278
279
280
281
282
283
# File 'lib/bricolage/postgresconnection.rb', line 276

# Runs the given block and logs how long it took, in seconds with one
# decimal place, at the data source's sql_log_level.
#
# The duration is logged from the ensure clause, so it is reported even
# when the block raises.
#
# @yield the work to be timed
# @return the value of the block
def log_elapsed_time
  # Use the monotonic clock: wall-clock time can be adjusted while the block
  # runs, which would make the reported duration wrong or negative.
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
ensure
  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
  @logger.log(@ds.sql_log_level) { "#{'%.1f' % elapsed} secs" }
end

#log_query(query, log_level = @ds.sql_log_level) ⇒ Object



267
268
269
# File 'lib/bricolage/postgresconnection.rb', line 267

# Logs a query, prefixed with the data source name, after passing the text
# through mask_secrets. The message is built inside the logger block.
#
# @param query [String] SQL text to log
# @param log_level level passed to the logger; defaults to the data
#   source's sql_log_level
def log_query(query, log_level = @ds.sql_log_level)
  @logger.log(log_level) do
    "[#{@ds.name}] #{mask_secrets query}"
  end
end

#make_unique_cursor_name ⇒ Object



219
220
221
222
# File 'lib/bricolage/postgresconnection.rb', line 219

# Builds a cursor name from the process id, the current thread's object id
# (upper-case hex) and a counter kept under
# Thread.current['bricolage_cursor_seq'].
#
# @return [String] a name of the form "cur_bric_<pid>_<thread id>_<seq>"
def make_unique_cursor_name
  # Start from 0 when the counter has not been set up for this thread, so
  # the first call does not fail with NoMethodError on nil.
  seq = (Thread.current['bricolage_cursor_seq'] || 0) + 1
  Thread.current['bricolage_cursor_seq'] = seq
  "cur_bric_#{$$}_#{'%X' % Thread.current.object_id}_#{seq}"
end

#open_cursor(query, name = nil) {|Cursor.new(name, self, @logger)| ... } ⇒ Object

Yields:

  • (Cursor.new(name, self, @logger))


206
207
208
209
210
211
212
213
214
215
# File 'lib/bricolage/postgresconnection.rb', line 206

# Declares a cursor for +query+ and yields a Cursor wrapping it.
#
# When no transaction is open, one is started with #transaction and the
# cursor is opened inside it.
#
# @param query [String] SQL text placed after "cursor for"
# @param name [String, nil] cursor name; generated by
#   make_unique_cursor_name when nil
# @yield [Cursor] built from the cursor name, this connection and the logger
# @return the value of the block
def open_cursor(query, name = nil, &block)
  unless in_transaction?
    transaction {
      # Forward +name+ as well, so a caller-supplied cursor name is kept
      # when the transaction has to be opened here.
      return open_cursor(query, name, &block)
    }
  end
  name ||= make_unique_cursor_name
  execute "declare #{name} cursor for #{query}"
  yield Cursor.new(name, self, @logger)
end

#query_batch(query, batch_size = 5000, &block) ⇒ Object



151
152
153
154
155
# File 'lib/bricolage/postgresconnection.rb', line 151

# Runs +query+ through a cursor and hands the caller's block to the
# cursor's each_result_set together with +batch_size+.
#
# @param query [String] SQL text
# @param batch_size [Integer] passed to each_result_set (default 5000)
def query_batch(query, batch_size = 5000, &block)
  open_cursor(query) do |cursor|
    cursor.each_result_set(batch_size, &block)
  end
end

#query_row(query) ⇒ Object



124
125
126
# File 'lib/bricolage/postgresconnection.rb', line 124

# Runs +query+ and returns the first row of the result, or nil when the
# result has no rows.
#
# @param query [String] SQL text
def query_row(query)
  execute_query(query) do |result|
    result.to_a.first
  end
end

#query_rows(query) ⇒ Object



128
129
130
# File 'lib/bricolage/postgresconnection.rb', line 128

# Runs +query+ and returns all rows of the result as an array.
#
# @param query [String] SQL text
# @return [Array]
def query_rows(query)
  execute_query(query, &:to_a)
end

#query_value(query) ⇒ Object



115
116
117
118
# File 'lib/bricolage/postgresconnection.rb', line 115

# Runs +query+ and returns the first value of the first row, or nil when
# there is no row.
#
# @param query [String] SQL text
def query_value(query)
  row = query_row(query)
  return nil unless row

  row.values.first
end

#query_values(query) ⇒ Object



120
121
122
# File 'lib/bricolage/postgresconnection.rb', line 120

# Runs +query+ and returns every value of every row, flattened into a
# single array in row order.
#
# @param query [String] SQL text
# @return [Array]
def query_values(query)
  rows = execute_query(query, &:to_a)
  rows.flat_map(&:values)
end

#select(table, &block) ⇒ Object



111
112
113
# File 'lib/bricolage/postgresconnection.rb', line 111

# Runs "select * from <table>" through #execute_query, forwarding the
# caller's block.
#
# @param table table name, interpolated into the SQL text as-is
# @return whatever #execute_query returns
def select(table, &block)
  statement = "select * from #{table}"
  execute_query(statement, &block)
end

#source ⇒ Object



46
47
48
# File 'lib/bricolage/postgresconnection.rb', line 46

# Returns the underlying connection object that was passed to the
# constructor.
def source
  @connection
end

#transaction ⇒ Object



161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/bricolage/postgresconnection.rb', line 161

# Runs the block inside a database transaction.
#
# Issues "begin transaction", yields a Transaction object, and commits when
# the block finishes (including a non-local exit such as return or break)
# unless the block already finished the transaction itself.
#
# When the block raises a StandardError the transaction is aborted, unless
# it is already finished or the connection has been flagged as failed, and
# the original error is re-raised. An error raised by the abort is logged
# and ignored.
#
# @yield [Transaction] handle for the open transaction
# @return the value of the block
def transaction
  execute 'begin transaction'
  txn = Transaction.new(self)
  failed = false
  begin
    yield txn
  rescue
    failed = true
    begin
      txn.abort unless txn.committed? || @connection_failed
    rescue => ex
      @logger.error "SQL error on transaction abort: #{ex.message} (ignored)"
    end
    raise
  ensure
    # Never try to commit on the failure path: when the abort was skipped
    # (failed connection) or itself failed, a commit attempt could raise
    # and replace the original exception.
    # NOTE(review): exceptions outside StandardError (e.g. Interrupt) bypass
    # the rescue above, so a commit is still attempted for them -- confirm
    # this is intended.
    txn.commit unless failed || txn.committed?
  end
end

#vacuum(table) ⇒ Object



251
252
253
# File 'lib/bricolage/postgresconnection.rb', line 251

# Issues a "vacuum" statement for the given table through #execute.
#
# @param table table name, interpolated into the SQL text as-is
# @return whatever #execute returns
def vacuum(table)
  statement = "vacuum #{table};"
  execute(statement)
end

#vacuum_sort_only(table) ⇒ Object



255
256
257
# File 'lib/bricolage/postgresconnection.rb', line 255

# Issues a "vacuum sort only" statement for the given table through
# #execute.
#
# @param table table name, interpolated into the SQL text as-is
# @return whatever #execute returns
def vacuum_sort_only(table)
  statement = "vacuum sort only #{table};"
  execute(statement)
end