Class: PG::Connection
- Inherits:
-
Object
- Object
- PG::Connection
- Defined in:
- lib/tep/pg.rb
Instance Attribute Summary collapse
-
#last_error_message ⇒ Object
Error context for the most recent exception raised by this connection.
-
#last_result_rh ⇒ Object
Error context for the most recent exception raised by this connection.
-
#last_sqlstate ⇒ Object
Error context for the most recent exception raised by this connection.
-
#pgh ⇒ Object
‘:pgh` rather than `:handle` – same poly-dispatch widening concern as Tep::SQLite#dbh (sharing a method name with Tep::Handler#handle confuses spinel’s same-named-imeth-across- classes unifier).
Class Method Summary collapse
-
.async_connect(conninfo) ⇒ Object
Drive PQconnectStart + PQconnectPoll, parking on io_wait between poll calls.
-
.drain_remaining_results(pgh) ⇒ Object
After the first PQgetResult returned a real Result, libpq requires the conn be drained via additional PQgetResult calls until NULL is returned.
-
.drain_send(pgh) ⇒ Object
Drain libpq’s send buffer.
-
.escape_string(s) ⇒ Object
Class-method form – ruby-pg allows escape_string and quote_ident without a live conn.
- .quote_ident(s) ⇒ Object
-
.raise_send_failure(conn) ⇒ Object
PQsendQuery returned 0 (immediate failure – conn already closed, send buffer error, etc.).
-
.record_error_if_any(conn, r) ⇒ Object
If the Result is in an error state, mirror SQLSTATE + message + result-handle onto the conn so post-rescue (or post-‘if !r.ok?`) callers can read them via `conn.last_*`.
-
.wait_for_result_ready(pgh) ⇒ Object
Wait until PQisBusy returns 0 (PQgetResult won’t block).
Instance Method Summary collapse
-
#async_exec(sql) ⇒ Object
Explicit async exec.
-
#async_exec_params(sql, params) ⇒ Object
Parameterised async exec.
-
#async_exec_params_after_clear(sql) ⇒ Object
Internal: param accumulator has already been populated by the caller (either exec_params routing here on context detect, or async_exec_params after its own push loop).
- #close ⇒ Object
- #connected? ⇒ Boolean
- #error_message ⇒ Object
- #escape_identifier(s) ⇒ Object
- #escape_literal(s) ⇒ Object
- #escape_string(s) ⇒ Object
-
#exec(sql) ⇒ Object
Run a no-params query.
-
#exec_params(sql, params) ⇒ Object
Parameterised query with positional binds ($1, $2, …).
- #finish ⇒ Object
-
#initialize(opts) ⇒ Connection
constructor
A new instance of Connection.
- #last_notify_channel ⇒ Object
- #last_notify_payload ⇒ Object
-
#listen(channel) ⇒ Object
LISTEN / NOTIFY (Battery 2 chunk 2.2).
- #notify(channel, payload) ⇒ Object
-
#poll_notification(timeout_ms) ⇒ Object
Block up to ‘timeout_ms` waiting for one notification on the connection.
- #reset ⇒ Object
- #server_version ⇒ Object
- #status ⇒ Object
- #transaction_status ⇒ Object
- #unlisten(channel) ⇒ Object
Constructor Details
#initialize(opts) ⇒ Connection
Returns a new instance of Connection.
188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 |
# File 'lib/tep/pg.rb', line 188 def initialize(opts) @pgh = -1 @last_sqlstate = "" @last_error_message = "" @last_result_rh = -1 if opts.is_a?(String) if Tep::Scheduler.scheduled_context? h = Connection.async_connect(opts) else h = Pg.tep_pg_connect(opts) end else # Hash form. Pack keys and values into parallel \0-delimited # buffers; the shim splits them apart and calls # PQconnectdbParams. (No async-connect path for the Hash # form yet -- AR uses the String form for connect, so the # Scheduled-context shortcut points only at conninfo.) keys = "" vals = "" n = 0 opts.each do |k, v| keys = keys + k + "\0" vals = vals + v + "\0" n += 1 end h = Pg.tep_pg_connect_kv(keys, vals, n) end if h < 0 # Slot 0 holds the most recent connect-failure error message # (PQstatus on a failed PQconnectdb still gives a readable # error, but the conn itself is closed by the time we get # here -- the shim stashes the message before PQfinish). @last_error_message = Pg.(0) @last_sqlstate = "" # Connection-failure surfaces via `c.last_error_message` + # `c.connected?` after the constructor returns -- the # constructor stays non-raising on purpose (PG::Pool seeds its # free list with `PG::Connection.new("")` before a server is # reachable; a raising constructor would blow up at module # load). Callers must check `c.connected?` before exec. NB: # this is the lone non-raising path -- query methods raise # PG::Error subclasses now that spinel supports namespaced # raise + rescue (matz/spinel#627 + #1041). end @pgh = h end |
Instance Attribute Details
#last_error_message ⇒ Object
Error context for the most recent exception raised by this connection. Spinel’s ‘raise X.new(msg, …)` lowering doesn’t handle custom initializers (#622), so the SQLSTATE / message / owning-result-handle live here instead. Read after ‘rescue PG::Error => e`:
begin; conn.exec_params(sql, params)
rescue PG::Error => e
sqlstate = conn.last_sqlstate
full_msg = conn.
end
AR’s ‘translate_exception_class(message, sql, binds)` uses `e.is_a?(PG::UniqueViolation)` etc., which still works – the class hierarchy is intact; only the per-exception accessors move to the connection.
186 187 188 |
# File 'lib/tep/pg.rb', line 186 def @last_error_message end |
#last_result_rh ⇒ Object
Error context for the most recent exception raised by this connection. Spinel’s ‘raise X.new(msg, …)` lowering doesn’t handle custom initializers (#622), so the SQLSTATE / message / owning-result-handle live here instead. Read after ‘rescue PG::Error => e`:
begin; conn.exec_params(sql, params)
rescue PG::Error => e
sqlstate = conn.last_sqlstate
full_msg = conn.
end
AR’s ‘translate_exception_class(message, sql, binds)` uses `e.is_a?(PG::UniqueViolation)` etc., which still works – the class hierarchy is intact; only the per-exception accessors move to the connection.
186 187 188 |
# File 'lib/tep/pg.rb', line 186 def last_result_rh @last_result_rh end |
#last_sqlstate ⇒ Object
Error context for the most recent exception raised by this connection. Spinel’s ‘raise X.new(msg, …)` lowering doesn’t handle custom initializers (#622), so the SQLSTATE / message / owning-result-handle live here instead. Read after ‘rescue PG::Error => e`:
begin; conn.exec_params(sql, params)
rescue PG::Error => e
sqlstate = conn.last_sqlstate
full_msg = conn.
end
AR’s ‘translate_exception_class(message, sql, binds)` uses `e.is_a?(PG::UniqueViolation)` etc., which still works – the class hierarchy is intact; only the per-exception accessors move to the connection.
186 187 188 |
# File 'lib/tep/pg.rb', line 186 def last_sqlstate @last_sqlstate end |
#pgh ⇒ Object
‘:pgh` rather than `:handle` – same poly-dispatch widening concern as Tep::SQLite#dbh (sharing a method name with Tep::Handler#handle confuses spinel’s same-named-imeth-across- classes unifier).
169 170 171 |
# File 'lib/tep/pg.rb', line 169 def pgh @pgh end |
Class Method Details
.async_connect(conninfo) ⇒ Object
Drive PQconnectStart + PQconnectPoll, parking on io_wait between poll calls. Returns the conn slot (>=1) on success or -1 on failure. The C shim’s tep_pg_connect_poll stashes the libpq error message on a FAILED return so ‘Pg.tep_pg_error_message(0)` still surfaces the diagnostic for the Connection.new “connect failed” branch.
libpq’s PostgresPollingStatusType:
0 = PGRES_POLLING_FAILED
1 = PGRES_POLLING_READING (wait for fd READ-ready)
2 = PGRES_POLLING_WRITING (wait for fd WRITE-ready)
3 = PGRES_POLLING_OK (connected; stop polling)
450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 |
# File 'lib/tep/pg.rb', line 450 def self.async_connect(conninfo) h = Pg.tep_pg_connect_start(conninfo) if h < 0 return -1 end fd = Pg.tep_pg_socket(h) while true state = Pg.tep_pg_connect_poll(h) if state == 3 # PGRES_POLLING_OK Pg.tep_pg_set_client_encoding(h, "UTF8") return h end if state == 0 # PGRES_POLLING_FAILED. The shim has already stashed the # error message; we PQfinish the slot. Pg.tep_pg_finish(h) return -1 end mode = state == 1 ? Tep::Scheduler::READ : Tep::Scheduler::WRITE Tep::Scheduler.io_wait(fd, mode, 10) end -1 end |
.drain_remaining_results(pgh) ⇒ Object
After the first PQgetResult returned a real Result, libpq requires the conn be drained via additional PQgetResult calls until NULL is returned. This is a fast in-memory drain (no network), but it has to happen between async_exec calls or the next send_query will fail. Each tep_pg_get_result call that produces a non-NULL result stashes it in the slot table; we PQclear those immediately since they’re trailing status results we don’t expose.
537 538 539 540 541 542 543 544 545 546 547 548 |
# File 'lib/tep/pg.rb', line 537 def self.drain_remaining_results(pgh) while true rh = Pg.tep_pg_get_result(pgh) if rh < 0 return 0 end # A trailing result -- shouldn't normally happen for # single-statement queries, but defensively free. Pg.tep_pg_clear(rh) end 0 end |
.drain_send(pgh) ⇒ Object
Drain libpq’s send buffer. PQflush returns 0 when done; 1 when the kernel send-buffer is full and we should park on WRITE-ready; -1 on error. Timeout is generous (10s); a genuinely-stuck PG is the rare case worth bailing on.
494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 |
# File 'lib/tep/pg.rb', line 494 def self.drain_send(pgh) fd = Pg.tep_pg_socket(pgh) while true rc = Pg.tep_pg_flush(pgh) if rc == 0 return 0 end if rc < 0 return -1 end # rc == 1: send buffer full, park on writability. Tep::Scheduler.io_wait(fd, Tep::Scheduler::WRITE, 10) end 0 end |
.escape_string(s) ⇒ Object
Class-method form – ruby-pg allows escape_string and quote_ident without a live conn. We route through slot 0 which the shim treats as “no conn, fall back to standalone PQescapeString”. Use the instance method when a conn is available – it goes through PQescapeStringConn which is the standards-compliant path.
568 569 570 |
# File 'lib/tep/pg.rb', line 568 def self.escape_string(s) Pg.tep_pg_escape_string(0, s) end |
.quote_ident(s) ⇒ Object
572 573 574 575 576 577 |
# File 'lib/tep/pg.rb', line 572 def self.quote_ident(s) # PQescapeIdentifier requires a conn; without one we fall # through to "" which is wrong but rare. Apps with a live # PG::Connection should use the instance method. Pg.tep_pg_escape_identifier(0, s) end |
.raise_send_failure(conn) ⇒ Object
PQsendQuery returned 0 (immediate failure – conn already closed, send buffer error, etc.). Mirror the error onto the conn’s last_* and raise, matching the exec error path (ruby-pg surfaces a send failure as PG::UnableToSend < PG::Error). No SQLSTATE is available pre-result, so this maps to the transport leaf rather than going through raise_for_sqlstate.
483 484 485 486 487 488 |
# File 'lib/tep/pg.rb', line 483 def self.raise_send_failure(conn) conn.last_sqlstate = "" conn. = conn. conn.last_result_rh = -1 raise PG::UnableToSend, conn. end |
.record_error_if_any(conn, r) ⇒ Object
If the Result is in an error state, mirror SQLSTATE + message + result-handle onto the conn so post-rescue (or post-‘if !r.ok?`) callers can read them via `conn.last_*`. No raise here – see the docstring on `exec` for why.
583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 |
# File 'lib/tep/pg.rb', line 583 def self.record_error_if_any(conn, r) st = r.status if st == Pg::RES_TUPLES || st == Pg::RES_COMMAND || st == Pg::RES_EMPTY return 0 end sqlstate = r.error_field(PG::DIAG_SQLSTATE) msg = r. if msg.length == 0 msg = conn. end conn.last_sqlstate = sqlstate conn. = msg # Free the failed PGresult NOW: once we raise out of # exec/exec_params the caller's `r.clear` never runs, so this is # the only chance to release it. The SQLSTATE / message are # already copied onto conn.last_* (Strings) for post-rescue # inspection, so dropping the handle loses nothing callers need. conn.last_result_rh = -1 r.clear # ruby-pg / AR parity: raise the SQLSTATE-mapped PG::Error # subclass (live since matz/spinel#627 + #1041 -- namespaced # raise + hierarchy-walking rescue). Callers `rescue # PG::UniqueViolation` (leaf) or `rescue PG::Error` (base). PG.raise_for_sqlstate(sqlstate, msg) 0 end |
.wait_for_result_ready(pgh) ⇒ Object
Wait until PQisBusy returns 0 (PQgetResult won’t block). Pumps PQconsumeInput in between io_wait calls so the libpq state machine advances. Timeout is generous (30s) since the query itself can take that long; the io_wait timeout is per-iteration, not cumulative.
515 516 517 518 519 520 521 522 523 524 525 526 527 |
# File 'lib/tep/pg.rb', line 515 def self.wait_for_result_ready(pgh) fd = Pg.tep_pg_socket(pgh) while true if Pg.tep_pg_consume_input(pgh) != 1 return -1 end if Pg.tep_pg_is_busy(pgh) == 0 return 0 end Tep::Scheduler.io_wait(fd, Tep::Scheduler::READ, 30) end 0 end |
Instance Method Details
#async_exec(sql) ⇒ Object
Explicit async exec. Same shape as ‘exec` but doesn’t context-detect – always uses the libpq async surface. If called outside Tep::Server::Scheduled, Tep::Scheduler.io_wait falls back to a single-shot poll(2), so this still works under prefork (just without the cross-fiber concurrency win).
382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 |
# File 'lib/tep/pg.rb', line 382 def async_exec(sql) Pg.tep_pg_set_nonblocking(@pgh, 1) ok = Pg.tep_pg_send_query(@pgh, sql) if ok != 1 Connection.raise_send_failure(self) end Connection.drain_send(@pgh) Connection.wait_for_result_ready(@pgh) rh = Pg.tep_pg_get_result(@pgh) r = PG::Result.new(rh) # Drain trailing NULL terminator (libpq requires reading # until PQgetResult returns NULL to mark the conn ready for # the next send_query). Connection.drain_remaining_results(@pgh) Connection.record_error_if_any(self, r) r end |
#async_exec_params(sql, params) ⇒ Object
Parameterised async exec. ‘params` is an Array of String / Integer / nil; same conversion as exec_params.
402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 |
# File 'lib/tep/pg.rb', line 402 def async_exec_params(sql, params) Pg.tep_pg_param_clear i = 0 n = params.length while i < n p = params[i] if p == nil Pg.tep_pg_param_push_null else Pg.tep_pg_param_push_str(p.to_s) end i += 1 end async_exec_params_after_clear(sql) end |
#async_exec_params_after_clear(sql) ⇒ Object
Internal: param accumulator has already been populated by the caller (either exec_params routing here on context detect, or async_exec_params after its own push loop).
421 422 423 424 425 426 427 428 429 430 431 432 433 434 |
# File 'lib/tep/pg.rb', line 421 def async_exec_params_after_clear(sql) Pg.tep_pg_set_nonblocking(@pgh, 1) ok = Pg.tep_pg_send_query_params(@pgh, sql) if ok != 1 Connection.raise_send_failure(self) end Connection.drain_send(@pgh) Connection.wait_for_result_ready(@pgh) rh = Pg.tep_pg_get_result(@pgh) r = PG::Result.new(rh) Connection.drain_remaining_results(@pgh) Connection.record_error_if_any(self, r) r end |
#close ⇒ Object
239 240 241 242 243 244 245 |
# File 'lib/tep/pg.rb', line 239 def close if @pgh >= 0 Pg.tep_pg_finish(@pgh) @pgh = -1 end 0 end |
#connected? ⇒ Boolean
235 236 237 |
# File 'lib/tep/pg.rb', line 235 def connected? @pgh >= 0 end |
#error_message ⇒ Object
270 271 272 |
# File 'lib/tep/pg.rb', line 270 def @pgh < 0 ? "" : Pg.(@pgh) end |
#escape_identifier(s) ⇒ Object
554 555 556 |
# File 'lib/tep/pg.rb', line 554 def escape_identifier(s) Pg.tep_pg_escape_identifier(@pgh, s) end |
#escape_literal(s) ⇒ Object
558 559 560 |
# File 'lib/tep/pg.rb', line 558 def escape_literal(s) Pg.tep_pg_escape_literal(@pgh, s) end |
#escape_string(s) ⇒ Object
550 551 552 |
# File 'lib/tep/pg.rb', line 550 def escape_string(s) Pg.tep_pg_escape_string(@pgh, s) end |
#exec(sql) ⇒ Object
Run a no-params query. Returns a PG::Result on success.
ON ERROR IT RAISES the SQLSTATE-mapped PG::Error subclass (PG::UniqueViolation, PG::UndefinedTable, … -> PG::ServerError for unmapped states) – the ruby-pg / AR shape. The failed PGresult is freed before the raise; the SQLSTATE / message stay readable on ‘conn.last_sqlstate` / `#last_error_message` for post-rescue inspection:
begin
c.exec(sql)
rescue PG::UniqueViolation => e
... # e.message + c.last_sqlstate
rescue PG::Error => e # base catches any server error
...
end
Raising (instead of the old Result-on-error sentinel) became viable once spinel learned namespaced raise + hierarchy-walking rescue (matz/spinel#627 + #1041). NB: PG.connect is the one path that still does NOT raise – it returns a connection-failed instance so PG::Pool can type-seed without a live server (check ‘conn.connected?`).
Under ‘Tep::Server::Scheduled` this routes through the libpq async surface (PQsendQuery + PQflush + PQconsumeInput parked on Tep::Scheduler.io_wait), so other fibers in the same worker can run while the query is in flight. Under prefork it routes through the blocking PQexec. Both raise identically.
341 342 343 344 345 346 347 348 349 |
# File 'lib/tep/pg.rb', line 341 def exec(sql) if Tep::Scheduler.scheduled_context? return async_exec(sql) end rh = Pg.tep_pg_exec(@pgh, sql) r = PG::Result.new(rh) Connection.record_error_if_any(self, r) r end |
#exec_params(sql, params) ⇒ Object
Parameterised query with positional binds ($1, $2, …). ‘params` is an Array of String / Integer / nil. Same raise-on-error contract + auto-routing as `exec`.
354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 |
# File 'lib/tep/pg.rb', line 354 def exec_params(sql, params) Pg.tep_pg_param_clear i = 0 n = params.length while i < n p = params[i] if p == nil Pg.tep_pg_param_push_null else Pg.tep_pg_param_push_str(p.to_s) end i += 1 end if Tep::Scheduler.scheduled_context? return async_exec_params_after_clear(sql) end rh = Pg.tep_pg_exec_params(@pgh, sql) r = PG::Result.new(rh) Connection.record_error_if_any(self, r) r end |
#finish ⇒ Object
247 248 249 |
# File 'lib/tep/pg.rb', line 247 def finish close end |
#last_notify_channel ⇒ Object
304 305 306 |
# File 'lib/tep/pg.rb', line 304 def last_notify_channel Pg.tep_pg_notify_channel end |
#last_notify_payload ⇒ Object
308 309 310 |
# File 'lib/tep/pg.rb', line 308 def last_notify_payload Pg.tep_pg_notify_payload end |
#listen(channel) ⇒ Object
LISTEN / NOTIFY (Battery 2 chunk 2.2). Used by Tep::Broadcast’s PG backend for cross-worker pub/sub. Channel names must be safe SQL identifiers (no caller- controlled interpolation – use a hard-coded constant). Payload max size is 8000 bytes per PG default.
279 280 281 282 |
# File 'lib/tep/pg.rb', line 279 def listen(channel) return -1 if @pgh < 0 Pg.tep_pg_listen(@pgh, channel) end |
#notify(channel, payload) ⇒ Object
289 290 291 292 |
# File 'lib/tep/pg.rb', line 289 def notify(channel, payload) return -1 if @pgh < 0 Pg.tep_pg_notify(@pgh, channel, payload) end |
#poll_notification(timeout_ms) ⇒ Object
Block up to ‘timeout_ms` waiting for one notification on the connection. Returns 1 on receipt (caller then reads #last_notify_channel + #last_notify_payload), 0 on timeout, -1 on connection error. Connection must already be in LISTEN mode for the channel of interest.
299 300 301 302 |
# File 'lib/tep/pg.rb', line 299 def poll_notification(timeout_ms) return -1 if @pgh < 0 Pg.tep_pg_poll_notification(@pgh, timeout_ms) end |
#reset ⇒ Object
251 252 253 254 255 256 |
# File 'lib/tep/pg.rb', line 251 def reset if @pgh >= 0 Pg.tep_pg_reset(@pgh) end self end |
#server_version ⇒ Object
266 267 268 |
# File 'lib/tep/pg.rb', line 266 def server_version @pgh < 0 ? 0 : Pg.tep_pg_server_version(@pgh) end |
#status ⇒ Object
258 259 260 |
# File 'lib/tep/pg.rb', line 258 def status @pgh < 0 ? PG::CONNECTION_BAD : Pg.tep_pg_status(@pgh) end |
#transaction_status ⇒ Object
262 263 264 |
# File 'lib/tep/pg.rb', line 262 def transaction_status @pgh < 0 ? PG::PQTRANS_UNKNOWN : Pg.tep_pg_transaction_status(@pgh) end |
#unlisten(channel) ⇒ Object
284 285 286 287 |
# File 'lib/tep/pg.rb', line 284 def unlisten(channel) return -1 if @pgh < 0 Pg.tep_pg_unlisten(@pgh, channel) end |