Class: PG::Connection
- Inherits:
-
Object
- Object
- PG::Connection
- Defined in:
- lib/tep/pg.rb
Instance Attribute Summary collapse
-
#last_error_message ⇒ Object
Error context for the most recent exception raised by this connection.
-
#last_result_rh ⇒ Object
Error context for the most recent exception raised by this connection.
-
#last_sqlstate ⇒ Object
Error context for the most recent exception raised by this connection.
-
#pgh ⇒ Object
:pghrather than:handle-- same poly-dispatch widening concern as Tep::SQLite#dbh (sharing a method name with Tep::Handler#handle confuses spinel's same-named-imeth-across- classes unifier).
Class Method Summary collapse
-
.async_connect(conninfo) ⇒ Object
Drive PQconnectStart + PQconnectPoll, parking on io_wait between poll calls.
-
.drain_remaining_results(pgh) ⇒ Object
After the first PQgetResult returned a real Result, libpq requires the conn be drained via additional PQgetResult calls until NULL is returned.
-
.drain_send(pgh) ⇒ Object
Drain libpq's send buffer.
-
.escape_string(s) ⇒ Object
Class-method form -- ruby-pg allows escape_string and quote_ident without a live conn.
- .quote_ident(s) ⇒ Object
-
.raise_send_failure(conn) ⇒ Object
PQsendQuery returned 0 (immediate failure -- conn already closed, send buffer error, etc.).
-
.record_error_if_any(conn, r) ⇒ Object
If the Result is in an error state, mirror SQLSTATE + message + result-handle onto the conn so post-rescue (or post-
if !r.ok?) callers can read them viaconn.last_*. -
.wait_for_result_ready(pgh) ⇒ Object
Wait until PQisBusy returns 0 (PQgetResult won't block).
Instance Method Summary collapse
-
#async_exec(sql) ⇒ Object
Explicit async exec.
-
#async_exec_params(sql, params) ⇒ Object
Parameterised async exec.
-
#async_exec_params_after_clear(sql) ⇒ Object
Internal: param accumulator has already been populated by the caller (either exec_params routing here on context detect, or async_exec_params after its own push loop).
- #close ⇒ Object
- #connected? ⇒ Boolean
- #error_message ⇒ Object
- #escape_identifier(s) ⇒ Object
- #escape_literal(s) ⇒ Object
- #escape_string(s) ⇒ Object
-
#exec(sql) ⇒ Object
Run a no-params query.
-
#exec_params(sql, params) ⇒ Object
Parameterised query with positional binds ($1, $2, ...).
- #finish ⇒ Object
-
#initialize(opts) ⇒ Connection
constructor
A new instance of Connection.
- #last_notify_channel ⇒ Object
- #last_notify_payload ⇒ Object
-
#listen(channel) ⇒ Object
LISTEN / NOTIFY (Battery 2 chunk 2.2).
- #notify(channel, payload) ⇒ Object
-
#poll_notification(timeout_ms) ⇒ Object
Block up to
timeout_mswaiting for one notification on the connection. - #reset ⇒ Object
- #server_version ⇒ Object
- #status ⇒ Object
- #transaction_status ⇒ Object
- #unlisten(channel) ⇒ Object
Constructor Details
#initialize(opts) ⇒ Connection
Returns a new instance of Connection.
188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 |
# File 'lib/tep/pg.rb', line 188 def initialize(opts) @pgh = -1 @last_sqlstate = "" @last_error_message = "" @last_result_rh = -1 if opts.is_a?(String) if Tep::Scheduler.scheduled_context? h = Connection.async_connect(opts) else h = Pg.tep_pg_connect(opts) end else # Hash-conninfo form: pack the key/value pairs into NUL-delimited # buffers for the C shim. (`opts` narrows to Hash in this # is_a?(String) ELSE branch -- the narrowing gap that blocked the # re-pin, matz/spinel#1434, is fixed as of the SPINEL_PIN bump.) keys = "" vals = "" n = 0 opts.each do |k, v| keys = keys + k + "\0" vals = vals + v + "\0" n += 1 end h = Pg.tep_pg_connect_kv(keys, vals, n) end if h < 0 # Slot 0 holds the most recent connect-failure error message # (PQstatus on a failed PQconnectdb still gives a readable # error, but the conn itself is closed by the time we get # here -- the shim stashes the message before PQfinish). @last_error_message = Pg.(0) @last_sqlstate = "" # Connection-failure surfaces via `c.last_error_message` + # `c.connected?` after the constructor returns -- the # constructor stays non-raising on purpose (PG::Pool seeds its # free list with `PG::Connection.new("")` before a server is # reachable; a raising constructor would blow up at module # load). Callers must check `c.connected?` before exec. NB: # this is the lone non-raising path -- query methods raise # PG::Error subclasses now that spinel supports namespaced # raise + rescue (matz/spinel#627 + #1041). end @pgh = h end |
Instance Attribute Details
#last_error_message ⇒ Object
Error context for the most recent exception raised by this
connection. Spinel's raise X.new(msg, ...) lowering doesn't
handle custom initializers (#622), so the SQLSTATE / message /
owning-result-handle live here instead. Read after
rescue PG::Error => e:
begin; conn.exec_params(sql, params)
rescue PG::Error => e
sqlstate = conn.last_sqlstate
full_msg = conn.
end
AR's translate_exception_class(message, sql, binds) uses
e.is_a?(PG::UniqueViolation) etc., which still works -- the
class hierarchy is intact; only the per-exception accessors
move to the connection.
186 187 188 |
# File 'lib/tep/pg.rb', line 186 def @last_error_message end |
#last_result_rh ⇒ Object
Error context for the most recent exception raised by this
connection. Spinel's raise X.new(msg, ...) lowering doesn't
handle custom initializers (#622), so the SQLSTATE / message /
owning-result-handle live here instead. Read after
rescue PG::Error => e:
begin; conn.exec_params(sql, params)
rescue PG::Error => e
sqlstate = conn.last_sqlstate
full_msg = conn.
end
AR's translate_exception_class(message, sql, binds) uses
e.is_a?(PG::UniqueViolation) etc., which still works -- the
class hierarchy is intact; only the per-exception accessors
move to the connection.
186 187 188 |
# File 'lib/tep/pg.rb', line 186 def last_result_rh @last_result_rh end |
#last_sqlstate ⇒ Object
Error context for the most recent exception raised by this
connection. Spinel's raise X.new(msg, ...) lowering doesn't
handle custom initializers (#622), so the SQLSTATE / message /
owning-result-handle live here instead. Read after
rescue PG::Error => e:
begin; conn.exec_params(sql, params)
rescue PG::Error => e
sqlstate = conn.last_sqlstate
full_msg = conn.
end
AR's translate_exception_class(message, sql, binds) uses
e.is_a?(PG::UniqueViolation) etc., which still works -- the
class hierarchy is intact; only the per-exception accessors
move to the connection.
186 187 188 |
# File 'lib/tep/pg.rb', line 186 def last_sqlstate @last_sqlstate end |
#pgh ⇒ Object
:pgh rather than :handle -- same poly-dispatch widening
concern as Tep::SQLite#dbh (sharing a method name with
Tep::Handler#handle confuses spinel's same-named-imeth-across-
classes unifier).
169 170 171 |
# File 'lib/tep/pg.rb', line 169 def pgh @pgh end |
Class Method Details
.async_connect(conninfo) ⇒ Object
Drive PQconnectStart + PQconnectPoll, parking on io_wait
between poll calls. Returns the conn slot (>=1) on success
or -1 on failure. The C shim's tep_pg_connect_poll stashes
the libpq error message on a FAILED return so
Pg.tep_pg_error_message(0) still surfaces the diagnostic
for the Connection.new "connect failed" branch.
libpq's PostgresPollingStatusType:
0 = PGRES_POLLING_FAILED
1 = PGRES_POLLING_READING (wait for fd READ-ready)
2 = PGRES_POLLING_WRITING (wait for fd WRITE-ready)
3 = PGRES_POLLING_OK (connected; stop polling)
449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 |
# File 'lib/tep/pg.rb', line 449 def self.async_connect(conninfo) h = Pg.tep_pg_connect_start(conninfo) if h < 0 return -1 end fd = Pg.tep_pg_socket(h) while true state = Pg.tep_pg_connect_poll(h) if state == 3 # PGRES_POLLING_OK Pg.tep_pg_set_client_encoding(h, "UTF8") return h end if state == 0 # PGRES_POLLING_FAILED. The shim has already stashed the # error message; we PQfinish the slot. Pg.tep_pg_finish(h) return -1 end mode = state == 1 ? Tep::Scheduler::READ : Tep::Scheduler::WRITE Tep::Scheduler.io_wait(fd, mode, 10) end -1 end |
.drain_remaining_results(pgh) ⇒ Object
After the first PQgetResult returned a real Result, libpq requires the conn be drained via additional PQgetResult calls until NULL is returned. This is a fast in-memory drain (no network), but it has to happen between async_exec calls or the next send_query will fail. Each tep_pg_get_result call that produces a non-NULL result stashes it in the slot table; we PQclear those immediately since they're trailing status results we don't expose.
536 537 538 539 540 541 542 543 544 545 546 547 |
# File 'lib/tep/pg.rb', line 536 def self.drain_remaining_results(pgh) while true rh = Pg.tep_pg_get_result(pgh) if rh < 0 return 0 end # A trailing result -- shouldn't normally happen for # single-statement queries, but defensively free. Pg.tep_pg_clear(rh) end 0 end |
.drain_send(pgh) ⇒ Object
Drain libpq's send buffer. PQflush returns 0 when done; 1 when the kernel send-buffer is full and we should park on WRITE-ready; -1 on error. Timeout is generous (10s); a genuinely-stuck PG is the rare case worth bailing on.
493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 |
# File 'lib/tep/pg.rb', line 493 def self.drain_send(pgh) fd = Pg.tep_pg_socket(pgh) while true rc = Pg.tep_pg_flush(pgh) if rc == 0 return 0 end if rc < 0 return -1 end # rc == 1: send buffer full, park on writability. Tep::Scheduler.io_wait(fd, Tep::Scheduler::WRITE, 10) end 0 end |
.escape_string(s) ⇒ Object
Class-method form -- ruby-pg allows escape_string and quote_ident without a live conn. We route through slot 0 which the shim treats as "no conn, fall back to standalone PQescapeString". Use the instance method when a conn is available -- it goes through PQescapeStringConn which is the standards-compliant path.
567 568 569 |
# File 'lib/tep/pg.rb', line 567 def self.escape_string(s) Pg.tep_pg_escape_string(0, s) end |
.quote_ident(s) ⇒ Object
571 572 573 574 575 576 |
# File 'lib/tep/pg.rb', line 571 def self.quote_ident(s) # PQescapeIdentifier requires a conn; without one we fall # through to "" which is wrong but rare. Apps with a live # PG::Connection should use the instance method. Pg.tep_pg_escape_identifier(0, s) end |
.raise_send_failure(conn) ⇒ Object
PQsendQuery returned 0 (immediate failure -- conn already closed, send buffer error, etc.). Mirror the error onto the conn's last_* and raise, matching the exec error path (ruby-pg surfaces a send failure as PG::UnableToSend < PG::Error). No SQLSTATE is available pre-result, so this maps to the transport leaf rather than going through raise_for_sqlstate.
482 483 484 485 486 487 |
# File 'lib/tep/pg.rb', line 482 def self.raise_send_failure(conn) conn.last_sqlstate = "" conn. = conn. conn.last_result_rh = -1 raise PG::UnableToSend, conn. end |
.record_error_if_any(conn, r) ⇒ Object
If the Result is in an error state, mirror SQLSTATE +
message + result-handle onto the conn so post-rescue (or
post-if !r.ok?) callers can read them via conn.last_*.
No raise here -- see the docstring on exec for why.
582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 |
# File 'lib/tep/pg.rb', line 582 def self.record_error_if_any(conn, r) st = r.status if st == Pg::RES_TUPLES || st == Pg::RES_COMMAND || st == Pg::RES_EMPTY return 0 end sqlstate = r.error_field(PG::DIAG_SQLSTATE) msg = r. if msg.length == 0 msg = conn. end conn.last_sqlstate = sqlstate conn. = msg # Free the failed PGresult NOW: once we raise out of # exec/exec_params the caller's `r.clear` never runs, so this is # the only chance to release it. The SQLSTATE / message are # already copied onto conn.last_* (Strings) for post-rescue # inspection, so dropping the handle loses nothing callers need. conn.last_result_rh = -1 r.clear # ruby-pg / AR parity: raise the SQLSTATE-mapped PG::Error # subclass (live since matz/spinel#627 + #1041 -- namespaced # raise + hierarchy-walking rescue). Callers `rescue # PG::UniqueViolation` (leaf) or `rescue PG::Error` (base). PG.raise_for_sqlstate(sqlstate, msg) 0 end |
.wait_for_result_ready(pgh) ⇒ Object
Wait until PQisBusy returns 0 (PQgetResult won't block). Pumps PQconsumeInput in between io_wait calls so the libpq state machine advances. Timeout is generous (30s) since the query itself can take that long; the io_wait timeout is per-iteration, not cumulative.
514 515 516 517 518 519 520 521 522 523 524 525 526 |
# File 'lib/tep/pg.rb', line 514 def self.wait_for_result_ready(pgh) fd = Pg.tep_pg_socket(pgh) while true if Pg.tep_pg_consume_input(pgh) != 1 return -1 end if Pg.tep_pg_is_busy(pgh) == 0 return 0 end Tep::Scheduler.io_wait(fd, Tep::Scheduler::READ, 30) end 0 end |
Instance Method Details
#async_exec(sql) ⇒ Object
Explicit async exec. Same shape as exec but doesn't
context-detect -- always uses the libpq async surface. If
called outside Tep::Server::Scheduled, Tep::Scheduler.io_wait
falls back to a single-shot poll(2), so this still works
under prefork (just without the cross-fiber concurrency
win).
381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 |
# File 'lib/tep/pg.rb', line 381 def async_exec(sql) Pg.tep_pg_set_nonblocking(@pgh, 1) ok = Pg.tep_pg_send_query(@pgh, sql) if ok != 1 Connection.raise_send_failure(self) end Connection.drain_send(@pgh) Connection.wait_for_result_ready(@pgh) rh = Pg.tep_pg_get_result(@pgh) r = PG::Result.new(rh) # Drain trailing NULL terminator (libpq requires reading # until PQgetResult returns NULL to mark the conn ready for # the next send_query). Connection.drain_remaining_results(@pgh) Connection.record_error_if_any(self, r) r end |
#async_exec_params(sql, params) ⇒ Object
Parameterised async exec. params is an Array of
String / Integer / nil; same conversion as exec_params.
401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 |
# File 'lib/tep/pg.rb', line 401 def async_exec_params(sql, params) Pg.tep_pg_param_clear i = 0 n = params.length while i < n p = params[i] if p == nil Pg.tep_pg_param_push_null else Pg.tep_pg_param_push_str(p.to_s) end i += 1 end async_exec_params_after_clear(sql) end |
#async_exec_params_after_clear(sql) ⇒ Object
Internal: param accumulator has already been populated by the caller (either exec_params routing here on context detect, or async_exec_params after its own push loop).
420 421 422 423 424 425 426 427 428 429 430 431 432 433 |
# File 'lib/tep/pg.rb', line 420 def async_exec_params_after_clear(sql) Pg.tep_pg_set_nonblocking(@pgh, 1) ok = Pg.tep_pg_send_query_params(@pgh, sql) if ok != 1 Connection.raise_send_failure(self) end Connection.drain_send(@pgh) Connection.wait_for_result_ready(@pgh) rh = Pg.tep_pg_get_result(@pgh) r = PG::Result.new(rh) Connection.drain_remaining_results(@pgh) Connection.record_error_if_any(self, r) r end |
#close ⇒ Object
238 239 240 241 242 243 244 |
# File 'lib/tep/pg.rb', line 238 def close if @pgh >= 0 Pg.tep_pg_finish(@pgh) @pgh = -1 end 0 end |
#connected? ⇒ Boolean
234 235 236 |
# File 'lib/tep/pg.rb', line 234 def connected? @pgh >= 0 end |
#error_message ⇒ Object
269 270 271 |
# File 'lib/tep/pg.rb', line 269 def @pgh < 0 ? "" : Pg.(@pgh) end |
#escape_identifier(s) ⇒ Object
553 554 555 |
# File 'lib/tep/pg.rb', line 553 def escape_identifier(s) Pg.tep_pg_escape_identifier(@pgh, s) end |
#escape_literal(s) ⇒ Object
557 558 559 |
# File 'lib/tep/pg.rb', line 557 def escape_literal(s) Pg.tep_pg_escape_literal(@pgh, s) end |
#escape_string(s) ⇒ Object
549 550 551 |
# File 'lib/tep/pg.rb', line 549 def escape_string(s) Pg.tep_pg_escape_string(@pgh, s) end |
#exec(sql) ⇒ Object
Run a no-params query. Returns a PG::Result on success.
ON ERROR IT RAISES the SQLSTATE-mapped PG::Error subclass
(PG::UniqueViolation, PG::UndefinedTable, ... -> PG::ServerError
for unmapped states) -- the ruby-pg / AR shape. The failed
PGresult is freed before the raise; the SQLSTATE / message stay
readable on conn.last_sqlstate / #last_error_message for
post-rescue inspection:
begin
c.exec(sql)
rescue PG::UniqueViolation => e
... # e.message + c.last_sqlstate
rescue PG::Error => e # base catches any server error
...
end
Raising (instead of the old Result-on-error sentinel) became
viable once spinel learned namespaced raise + hierarchy-walking
rescue (matz/spinel#627 + #1041). NB: PG.connect is the one path
that still does NOT raise -- it returns a connection-failed
instance so PG::Pool can type-seed without a live server (check
conn.connected?).
Under Tep::Server::Scheduled this routes through the libpq
async surface (PQsendQuery + PQflush + PQconsumeInput parked
on Tep::Scheduler.io_wait), so other fibers in the same
worker can run while the query is in flight. Under prefork
it routes through the blocking PQexec. Both raise identically.
340 341 342 343 344 345 346 347 348 |
# File 'lib/tep/pg.rb', line 340 def exec(sql) if Tep::Scheduler.scheduled_context? return async_exec(sql) end rh = Pg.tep_pg_exec(@pgh, sql) r = PG::Result.new(rh) Connection.record_error_if_any(self, r) r end |
#exec_params(sql, params) ⇒ Object
Parameterised query with positional binds ($1, $2, ...).
params is an Array of String / Integer / nil. Same
raise-on-error contract + auto-routing as exec.
353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 |
# File 'lib/tep/pg.rb', line 353 def exec_params(sql, params) Pg.tep_pg_param_clear i = 0 n = params.length while i < n p = params[i] if p == nil Pg.tep_pg_param_push_null else Pg.tep_pg_param_push_str(p.to_s) end i += 1 end if Tep::Scheduler.scheduled_context? return async_exec_params_after_clear(sql) end rh = Pg.tep_pg_exec_params(@pgh, sql) r = PG::Result.new(rh) Connection.record_error_if_any(self, r) r end |
#finish ⇒ Object
246 247 248 |
# File 'lib/tep/pg.rb', line 246 def finish close end |
#last_notify_channel ⇒ Object
303 304 305 |
# File 'lib/tep/pg.rb', line 303 def last_notify_channel Pg.tep_pg_notify_channel end |
#last_notify_payload ⇒ Object
307 308 309 |
# File 'lib/tep/pg.rb', line 307 def last_notify_payload Pg.tep_pg_notify_payload end |
#listen(channel) ⇒ Object
LISTEN / NOTIFY (Battery 2 chunk 2.2). Used by Tep::Broadcast's PG backend for cross-worker pub/sub. Channel names must be safe SQL identifiers (no caller- controlled interpolation -- use a hard-coded constant). Payload max size is 8000 bytes per PG default.
278 279 280 281 |
# File 'lib/tep/pg.rb', line 278 def listen(channel) return -1 if @pgh < 0 Pg.tep_pg_listen(@pgh, channel) end |
#notify(channel, payload) ⇒ Object
288 289 290 291 |
# File 'lib/tep/pg.rb', line 288 def notify(channel, payload) return -1 if @pgh < 0 Pg.tep_pg_notify(@pgh, channel, payload) end |
#poll_notification(timeout_ms) ⇒ Object
Block up to timeout_ms waiting for one notification on the
connection. Returns 1 on receipt (caller then reads
#last_notify_channel + #last_notify_payload), 0 on timeout,
-1 on connection error. Connection must already be in LISTEN
mode for the channel of interest.
298 299 300 301 |
# File 'lib/tep/pg.rb', line 298 def poll_notification(timeout_ms) return -1 if @pgh < 0 Pg.tep_pg_poll_notification(@pgh, timeout_ms) end |
#reset ⇒ Object
250 251 252 253 254 255 |
# File 'lib/tep/pg.rb', line 250 def reset if @pgh >= 0 Pg.tep_pg_reset(@pgh) end self end |
#server_version ⇒ Object
265 266 267 |
# File 'lib/tep/pg.rb', line 265 def server_version @pgh < 0 ? 0 : Pg.tep_pg_server_version(@pgh) end |
#status ⇒ Object
257 258 259 |
# File 'lib/tep/pg.rb', line 257 def status @pgh < 0 ? PG::CONNECTION_BAD : Pg.tep_pg_status(@pgh) end |
#transaction_status ⇒ Object
261 262 263 |
# File 'lib/tep/pg.rb', line 261 def transaction_status @pgh < 0 ? PG::PQTRANS_UNKNOWN : Pg.tep_pg_transaction_status(@pgh) end |
#unlisten(channel) ⇒ Object
283 284 285 286 |
# File 'lib/tep/pg.rb', line 283 def unlisten(channel) return -1 if @pgh < 0 Pg.tep_pg_unlisten(@pgh, channel) end |