Class: Bricolage::PostgresConnection
- Inherits:
-
Object
- Object
- Bricolage::PostgresConnection
show all
- Defined in:
- lib/bricolage/postgresconnection.rb
Defined Under Namespace
Classes: Cursor, Transaction
Class Method Summary
collapse
-
.install_signal_handlers ⇒ Object
-
.open_data_source(ds) ⇒ Object
Instance Method Summary
collapse
-
#analyze(table) ⇒ Object
-
#cancel ⇒ Object
-
#cancel_force ⇒ Object
-
#close ⇒ Object
-
#close_force ⇒ Object
-
#closed? ⇒ Boolean
-
#drop_table(name) ⇒ Object
-
#drop_table_force(name) ⇒ Object
-
#execute_query(query, &block) ⇒ Object
(also: #query)
-
#execute_update(query) ⇒ Object
(also: #execute, #update)
-
#in_transaction? ⇒ Boolean
-
#initialize(connection, ds, logger) ⇒ PostgresConnection
constructor
A new instance of PostgresConnection.
-
#lock(table) ⇒ Object
-
#log_elapsed_time ⇒ Object
-
#log_query(query, log_level = @ds.sql_log_level) ⇒ Object
-
#make_unique_cursor_name ⇒ Object
-
#open_cursor(query, name = nil) {|Cursor.new(name, self, @logger)| ... } ⇒ Object
-
#query_batch(query, batch_size = 5000, &block) ⇒ Object
-
#query_row(query) ⇒ Object
-
#query_rows(query) ⇒ Object
-
#query_value(query) ⇒ Object
-
#query_values(query) ⇒ Object
-
#select(table, &block) ⇒ Object
-
#source ⇒ Object
-
#transaction ⇒ Object
-
#vacuum(table) ⇒ Object
-
#vacuum_sort_only(table) ⇒ Object
Constructor Details
#initialize(connection, ds, logger) ⇒ PostgresConnection
Returns a new instance of PostgresConnection.
38
39
40
41
42
43
44
|
# File 'lib/bricolage/postgresconnection.rb', line 38
# Builds a wrapper around an already-created connection object.
#
# @param connection the underlying connection object; stored as-is
# @param ds the data source object; other methods read its name and log levels
# @param logger the logger that receives query and status messages
def initialize(connection, ds, logger)
  @connection, @ds, @logger = connection, ds, logger
  # A fresh wrapper starts out open, with no connection failure recorded.
  @closed = @connection_failed = false
end
|
Class Method Details
.install_signal_handlers ⇒ Object
10
11
12
13
14
15
|
# File 'lib/bricolage/postgresconnection.rb', line 10
# Installs a TERM signal trap. When the signal arrives the handler writes a
# notice to standard error and raises Interrupt with the message 'SIGTERM'.
#
# @return whatever Signal.trap returns
def PostgresConnection.install_signal_handlers
  Signal.trap(:TERM) do
    $stderr.puts 'receive SIGTERM'
    raise Interrupt, 'SIGTERM'
  end
end
|
.open_data_source(ds) ⇒ Object
17
18
19
20
21
22
23
24
25
26
27
28
|
# File 'lib/bricolage/postgresconnection.rb', line 17
# Opens a connection for the given data source via _open_ds (defined elsewhere).
#
# Without a block the connection is returned and the caller owns it.
# With a block the connection is yielded and then closed with #close_force,
# whether the block finishes normally or raises.
#
# @param ds the data source handed to _open_ds
# @return the connection (no block) or the block's value (with block)
def PostgresConnection.open_data_source(ds)
  conn = _open_ds(ds)
  return conn unless block_given?

  begin
    yield conn
  ensure
    conn.close_force
  end
end
|
Instance Method Details
#analyze(table) ⇒ Object
259
260
261
|
# File 'lib/bricolage/postgresconnection.rb', line 259
# Issues an "analyze" statement for the given table through #execute.
#
# @param table table name; interpolated into the SQL text verbatim
# @return whatever #execute returns
def analyze(table)
  statement = "analyze #{table};"
  execute(statement)
end
|
#cancel ⇒ Object
69
70
71
72
73
74
75
76
77
78
|
# File 'lib/bricolage/postgresconnection.rb', line 69
# Asks the underlying connection to cancel the running query.
#
# The connection's #cancel result is treated as an error description:
# a falsy value means success, anything else is logged and raised.
#
# @return the value of the success log call
# @raise [PostgreSQLException] when the connection reports a cancel error
def cancel
  @logger.info "cancelling PostgreSQL query..."
  failure = @connection.cancel
  return @logger.info("successfully cancelled") unless failure

  @logger.error "could not cancel query: #{failure}"
  raise PostgreSQLException, "could not cancel query: #{failure}"
end
|
#cancel_force ⇒ Object
64
65
66
67
|
# File 'lib/bricolage/postgresconnection.rb', line 64
# Best-effort variant of #cancel: a PostgreSQLException raised by #cancel is
# swallowed. Any other exception still propagates.
#
# @return the value of #cancel, or nil when PostgreSQLException was swallowed
def cancel_force
  begin
    cancel
  rescue PostgreSQLException
    # Deliberately ignored: the caller asked for a best-effort cancel.
  end
end
|
#close ⇒ Object
50
51
52
53
|
# File 'lib/bricolage/postgresconnection.rb', line 50
# Closes the underlying connection and marks this wrapper as closed.
# The closed flag is only set when the connection's #close returns without raising.
def close
@connection.close
@closed = true
end
|
#close_force ⇒ Object
55
56
57
58
|
# File 'lib/bricolage/postgresconnection.rb', line 55
# Best-effort variant of #close: any StandardError raised while closing is
# swallowed, so this can be called from cleanup paths.
#
# @return the value of #close, or nil when an error was swallowed
def close_force
  begin
    close
  rescue
    # Deliberately ignored: closing is best-effort here.
  end
end
|
#closed? ⇒ Boolean
60
61
62
|
# File 'lib/bricolage/postgresconnection.rb', line 60
# Reports the value of the @closed flag, which #initialize sets to false
# and #close sets to true.
def closed?
@closed
end
|
#drop_table(name) ⇒ Object
243
244
245
|
# File 'lib/bricolage/postgresconnection.rb', line 243
# Issues "drop table ... cascade" for the given table through #execute.
#
# @param name table name; interpolated into the SQL text verbatim
# @return whatever #execute returns
def drop_table(name)
  statement = "drop table #{name} cascade;"
  execute(statement)
end
|
#drop_table_force(name) ⇒ Object
247
248
249
|
# File 'lib/bricolage/postgresconnection.rb', line 247
# Issues "drop table if exists ... cascade" for the given table through #execute.
#
# @param name table name; interpolated into the SQL text verbatim
# @return whatever #execute returns
def drop_table_force(name)
  statement = "drop table if exists #{name} cascade;"
  execute(statement)
end
|
#execute_query(query, &block) ⇒ Object
Also known as:
query
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
|
# File 'lib/bricolage/postgresconnection.rb', line 132
# Runs a query and yields its result set to the given block.
#
# The query is logged at the data source's query_sql_log_level, executed with
# async_exec inside the #querying and #log_elapsed_time wrappers, and the
# result set is cleared after the block finishes or an error is raised.
#
# @param query SQL text to run
# @yield [rs] the result set returned by async_exec
# @return the block's value
# @raise [ConnectionError] wrapping PG::ConnectionBad / PG::UnableToSend;
#   @connection_failed is set to true first
# @raise [PostgreSQLException] wrapping any other PG::Error
def execute_query(query, &block)
  log_query query, @ds.query_sql_log_level
  result = log_elapsed_time do
    querying { @connection.async_exec(query) }
  end
  yield result
rescue PG::ConnectionBad, PG::UnableToSend => conn_error
  # Remember the broken connection so #transaction does not try to roll back on it.
  @connection_failed = true
  raise ConnectionError.wrap(conn_error)
rescue PG::Error => pg_error
  raise PostgreSQLException.wrap(pg_error)
ensure
  # result stays nil when the query itself failed.
  result.clear if result
end
|
#execute_update(query) ⇒ Object
Also known as:
execute, update
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
|
# File 'lib/bricolage/postgresconnection.rb', line 89
# Runs a statement and returns its rows as an array.
#
# The statement is logged at the data source's update_sql_log_level, executed
# with async_exec inside the #querying and #log_elapsed_time wrappers, and the
# result set is cleared before the method returns.
#
# @param query SQL text to run
# @return [Array] the result set converted with to_a
# @raise [ConnectionError] wrapping PG::ConnectionBad / PG::UnableToSend;
#   @connection_failed is set to true first
# @raise [UniqueViolationException] wrapping PG::UniqueViolation
# @raise [PostgreSQLException] wrapping any other PG::Error
def execute_update(query)
  log_query query, @ds.update_sql_log_level
  result = log_elapsed_time do
    querying { @connection.async_exec(query) }
  end
  result.to_a
rescue PG::ConnectionBad, PG::UnableToSend => conn_error
  # Remember the broken connection so #transaction does not try to roll back on it.
  @connection_failed = true
  raise ConnectionError.wrap(conn_error)
rescue PG::UniqueViolation => duplicate
  raise UniqueViolationException.wrap(duplicate)
rescue PG::Error => pg_error
  raise PostgreSQLException.wrap(pg_error)
ensure
  # result stays nil when the statement itself failed.
  result.clear if result
end
|
#in_transaction? ⇒ Boolean
157
158
159
|
# File 'lib/bricolage/postgresconnection.rb', line 157
# Tells whether the connection's transaction status equals
# PG::Constants::PQTRANS_INTRANS. Any other status yields false.
#
# @return [Boolean]
def in_transaction?
  status = @connection.transaction_status
  status == PG::Constants::PQTRANS_INTRANS
end
|
#lock(table) ⇒ Object
263
264
265
|
# File 'lib/bricolage/postgresconnection.rb', line 263
# Issues a "lock" statement for the given table through #execute.
#
# @param table table name; interpolated into the SQL text verbatim
# @return whatever #execute returns
def lock(table)
  statement = "lock #{table}"
  execute(statement)
end
|
#log_elapsed_time ⇒ Object
276
277
278
279
280
281
282
283
|
# File 'lib/bricolage/postgresconnection.rb', line 276
# Runs the given block and logs how long it took, in seconds with one decimal
# place, at the data source's sql_log_level. The elapsed time is logged even
# when the block raises.
#
# The duration is taken from the monotonic clock rather than Time.now, so a
# wall-clock adjustment during the call cannot produce a negative or inflated
# reading.
#
# @yield the work to be timed
# @return the block's value
def log_elapsed_time
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
ensure
  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
  @logger.log(@ds.sql_log_level) { "#{'%.1f' % elapsed} secs" }
end
|
#log_query(query, log_level = @ds.sql_log_level) ⇒ Object
267
268
269
|
# File 'lib/bricolage/postgresconnection.rb', line 267
# Logs a query, prefixed with the data source name in square brackets.
# The query text is passed through #mask_secrets (defined elsewhere) before
# it is logged. The message is built inside the logger's block.
#
# @param query SQL text to log
# @param log_level level handed to the logger; defaults to the data source's sql_log_level
# @return whatever the logger's #log returns
def log_query(query, log_level = @ds.sql_log_level)
  @logger.log(log_level) do
    source_name = @ds.name
    "[#{source_name}] #{mask_secrets(query)}"
  end
end
|
#make_unique_cursor_name ⇒ Object
219
220
221
222
|
# File 'lib/bricolage/postgresconnection.rb', line 219
# Generates a cursor name of the form "cur_bric_<pid>_<thread id in hex>_<seq>".
#
# The sequence number is kept in Thread.current['bricolage_cursor_seq'] and is
# incremented on every call. A slot that was never initialized for the current
# thread is treated as 0, so the first name generated there ends in "_1"
# instead of failing on nil + 1.
#
# @return [String] the generated cursor name
def make_unique_cursor_name
  seq = (Thread.current['bricolage_cursor_seq'] || 0) + 1
  Thread.current['bricolage_cursor_seq'] = seq
  "cur_bric_#{$$}_#{'%X' % Thread.current.object_id}_#{seq}"
end
|
#open_cursor(query, name = nil) {|Cursor.new(name, self, @logger)| ... } ⇒ Object
206
207
208
209
210
211
212
213
214
215
|
# File 'lib/bricolage/postgresconnection.rb', line 206
# Declares a cursor for the query and yields a Cursor object for it.
#
# When the connection is not inside a transaction, one is started with
# #transaction and the cursor is opened inside it.
#
# @param query SQL text the cursor is declared for
# @param name cursor name; when nil a name from #make_unique_cursor_name is used
# @yield [Cursor] a cursor built from the name, this connection, and the logger
# @return the block's value
def open_cursor(query, name = nil, &block)
  unless in_transaction?
    transaction {
      # Pass the caller's name along; without it a named cursor opened outside
      # a transaction would silently get an auto-generated name.
      return open_cursor(query, name, &block)
    }
  end
  name ||= make_unique_cursor_name
  execute "declare #{name} cursor for #{query}"
  yield Cursor.new(name, self, @logger)
end
|
#query_batch(query, batch_size = 5000, &block) ⇒ Object
151
152
153
154
155
|
# File 'lib/bricolage/postgresconnection.rb', line 151
# Opens a cursor for the query and hands the caller's block to the cursor's
# each_result_set together with the batch size.
#
# @param query SQL text to run through a cursor
# @param batch_size value passed to each_result_set (default 5000)
# @return whatever #open_cursor returns
def query_batch(query, batch_size = 5000, &block)
  open_cursor(query) do |cursor|
    cursor.each_result_set(batch_size, &block)
  end
end
|
#query_row(query) ⇒ Object
124
125
126
|
# File 'lib/bricolage/postgresconnection.rb', line 124
# Runs the query and returns the first row of its result set.
#
# @param query SQL text to run
# @return the first element of the result set's to_a, or nil when it is empty
def query_row(query)
  execute_query(query) do |result|
    rows = result.to_a
    rows.first
  end
end
|
#query_rows(query) ⇒ Object
128
129
130
|
# File 'lib/bricolage/postgresconnection.rb', line 128
# Runs the query and returns all rows of its result set.
#
# @param query SQL text to run
# @return [Array] the result set converted with to_a
def query_rows(query)
  execute_query(query, &:to_a)
end
|
#query_value(query) ⇒ Object
115
116
117
118
|
# File 'lib/bricolage/postgresconnection.rb', line 115
# Runs the query and returns the first value of its first row.
#
# @param query SQL text to run
# @return the first value of the row returned by #query_row, or nil when there is no row
def query_value(query)
  record = query_row(query)
  return nil unless record

  record.values.first
end
|
#query_values(query) ⇒ Object
120
121
122
|
# File 'lib/bricolage/postgresconnection.rb', line 120
# Runs the query and returns every value of every row as one flat array,
# in row order.
#
# @param query SQL text to run
# @return [Array] the values of all rows, concatenated
def query_values(query)
  records = execute_query(query) { |result| result.to_a }
  records.flat_map(&:values)
end
|
#select(table, &block) ⇒ Object
111
112
113
|
# File 'lib/bricolage/postgresconnection.rb', line 111
# Runs "select * from <table>" through #execute_query, forwarding the block.
#
# @param table table name; interpolated into the SQL text verbatim
# @return whatever #execute_query returns
def select(table, &block)
  sql = "select * from #{table}"
  execute_query(sql, &block)
end
|
#source ⇒ Object
46
47
48
|
# File 'lib/bricolage/postgresconnection.rb', line 46
# Returns the underlying connection object that was passed to #initialize.
def source
@connection
end
|
#transaction ⇒ Object
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
|
# File 'lib/bricolage/postgresconnection.rb', line 161
# Runs the block inside a database transaction.
#
# Issues 'begin transaction', yields a Transaction object, and commits
# afterwards unless the transaction already reports committed?.
# When the block raises a StandardError, the transaction is aborted first
# (skipped when it is already committed or the connection has been marked
# as failed), and the original error is re-raised. An error raised by the
# abort itself is logged and ignored.
#
# NOTE(review): the bare rescue only covers StandardError, so other exits
# from the block (for example an Interrupt) skip the abort and go straight to
# the commit in the ensure clause — confirm this is intended.
#
# @yield [Transaction] the transaction object for this block
# @return the block's value
def transaction
  execute 'begin transaction'
  txn = Transaction.new(self)
  begin
    yield txn
  rescue
    begin
      # No rollback is attempted on a finished transaction or a broken connection.
      txn.abort unless txn.committed? || @connection_failed
    rescue => abort_error
      @logger.error "SQL error on transaction abort: #{abort_error.message} (ignored)"
    end
    raise
  ensure
    txn.commit unless txn.committed?
  end
end
|
#vacuum(table) ⇒ Object
251
252
253
|
# File 'lib/bricolage/postgresconnection.rb', line 251
# Issues a "vacuum" statement for the given table through #execute.
#
# @param table table name; interpolated into the SQL text verbatim
# @return whatever #execute returns
def vacuum(table)
  statement = "vacuum #{table};"
  execute(statement)
end
|
#vacuum_sort_only(table) ⇒ Object
255
256
257
|
# File 'lib/bricolage/postgresconnection.rb', line 255
# Issues a "vacuum sort only" statement for the given table through #execute.
#
# @param table table name; interpolated into the SQL text verbatim
# @return whatever #execute returns
def vacuum_sort_only(table)
  statement = "vacuum sort only #{table};"
  execute(statement)
end
|