Class: PG::Connection

Inherits:
Object
Defined in:
lib/tep/pg.rb

Constructor Details

#initialize(opts) ⇒ Connection

Returns a new instance of Connection.



# File 'lib/tep/pg.rb', line 188

def initialize(opts)
  @pgh = -1
  @last_sqlstate = ""
  @last_error_message = ""
  @last_result_rh = -1
  if opts.is_a?(String)
    if Tep::Scheduler.scheduled_context?
      h = Connection.async_connect(opts)
    else
      h = Pg.tep_pg_connect(opts)
    end
  else
    # Hash-conninfo form: pack the key/value pairs into NUL-delimited
    # buffers for the C shim. (`opts` narrows to Hash in this
    # is_a?(String) ELSE branch -- the narrowing gap that blocked the
    # re-pin, matz/spinel#1434, is fixed as of the SPINEL_PIN bump.)
    keys = ""
    vals = ""
    n = 0
    opts.each do |k, v|
      keys = keys + k + "\0"
      vals = vals + v + "\0"
      n += 1
    end
    h = Pg.tep_pg_connect_kv(keys, vals, n)
  end
  if h < 0
    # Slot 0 holds the most recent connect-failure error message
    # (PQstatus on a failed PQconnectdb still gives a readable
    # error, but the conn itself is closed by the time we get
    # here -- the shim stashes the message before PQfinish).
    @last_error_message = Pg.tep_pg_error_message(0)
    @last_sqlstate = ""
    # Connection-failure surfaces via `c.last_error_message` +
    # `c.connected?` after the constructor returns -- the
    # constructor stays non-raising on purpose (PG::Pool seeds its
    # free list with `PG::Connection.new("")` before a server is
    # reachable; a raising constructor would blow up at module
    # load). Callers must check `c.connected?` before exec. NB:
    # this is the lone non-raising path -- query methods raise
    # PG::Error subclasses now that spinel supports namespaced
    # raise + rescue (matz/spinel#627 + #1041).
  end
  @pgh = h
end
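
The Hash-conninfo branch can be tried in isolation. The following is a minimal sketch, assuming a hypothetical helper name `pack_conninfo` that does not exist in lib/tep/pg.rb; it mirrors the loop above and returns the three values the constructor passes to Pg.tep_pg_connect_kv.

```ruby
# Hypothetical helper (not part of lib/tep/pg.rb): packs a conninfo Hash
# into two NUL-delimited buffers plus a pair count, mirroring the loop in
# PG::Connection#initialize. The to_s calls are an addition made here so
# that Symbol keys and Integer values also work.
def pack_conninfo(opts)
  keys = +""
  vals = +""
  n = 0
  opts.each do |k, v|
    keys << k.to_s << "\0"
    vals << v.to_s << "\0"
    n += 1
  end
  [keys, vals, n]
end

keys, vals, n = pack_conninfo("host" => "localhost", "dbname" => "app")
```

Two parallel buffers keep the shim's parsing simple: entry i of `keys` pairs with entry i of `vals`, and `n` tells the C side how many NUL-terminated entries to read.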

Instance Attribute Details

#last_error_message ⇒ Object

Error context for the most recent exception raised by this connection. Spinel's raise X.new(msg, ...) lowering doesn't handle custom initializers (#622), so the SQLSTATE / message / owning-result-handle live here instead. Read after rescue PG::Error => e:

begin
  conn.exec_params(sql, params)
rescue PG::Error => e
  sqlstate = conn.last_sqlstate
  full_msg = conn.last_error_message
end

AR's translate_exception_class(message, sql, binds) uses e.is_a?(PG::UniqueViolation) etc., which still works -- the class hierarchy is intact; only the per-exception accessors move to the connection.



# File 'lib/tep/pg.rb', line 186

def last_error_message
  @last_error_message
end

#last_result_rh ⇒ Object

Result-handle part of the error context described under #last_error_message. Both error paths on this page (record_error_if_any and raise_send_failure) reset it to -1: either the failed result is freed before the raise, or no result exists yet.



# File 'lib/tep/pg.rb', line 186

def last_result_rh
  @last_result_rh
end

#last_sqlstate ⇒ Object

SQLSTATE part of the error context described under #last_error_message. record_error_if_any sets it to the failing result's SQLSTATE; connect and send failures set it to "" because no SQLSTATE is available on those paths.



# File 'lib/tep/pg.rb', line 186

def last_sqlstate
  @last_sqlstate
end

#pgh ⇒ Object

:pgh rather than :handle -- same poly-dispatch widening concern as Tep::SQLite#dbh (sharing a method name with Tep::Handler#handle confuses spinel's same-named-imeth-across-classes unifier).



# File 'lib/tep/pg.rb', line 169

def pgh
  @pgh
end

Class Method Details

.async_connect(conninfo) ⇒ Object

Drive PQconnectStart + PQconnectPoll, parking on io_wait between poll calls. Returns the conn slot (>=1) on success or -1 on failure. The C shim's tep_pg_connect_poll stashes the libpq error message on a FAILED return so Pg.tep_pg_error_message(0) still surfaces the diagnostic for the Connection.new "connect failed" branch.

libpq's PostgresPollingStatusType:

0 = PGRES_POLLING_FAILED
1 = PGRES_POLLING_READING    (wait for fd READ-ready)
2 = PGRES_POLLING_WRITING    (wait for fd WRITE-ready)
3 = PGRES_POLLING_OK         (connected; stop polling)


# File 'lib/tep/pg.rb', line 449

def self.async_connect(conninfo)
  h = Pg.tep_pg_connect_start(conninfo)
  if h < 0
    return -1
  end
  fd = Pg.tep_pg_socket(h)
  while true
    state = Pg.tep_pg_connect_poll(h)
    if state == 3
      # PGRES_POLLING_OK
      Pg.tep_pg_set_client_encoding(h, "UTF8")
      return h
    end
    if state == 0
      # PGRES_POLLING_FAILED. The shim has already stashed the
      # error message; we PQfinish the slot.
      Pg.tep_pg_finish(h)
      return -1
    end
    mode = state == 1 ? Tep::Scheduler::READ : Tep::Scheduler::WRITE
    Tep::Scheduler.io_wait(fd, mode, 10)
  end
  -1
end
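
The polling loop can be exercised without libpq. This is a minimal sketch, assuming two injected callables (`poll` and `wait`, both hypothetical) that stand in for Pg.tep_pg_connect_poll and Tep::Scheduler.io_wait; only the state numbering is taken from the table above.

```ruby
# Polling states, numbered as in libpq's PostgresPollingStatusType.
POLLING_FAILED  = 0
POLLING_READING = 1
POLLING_WRITING = 2
POLLING_OK      = 3

# Hypothetical driver: `poll` returns the next polling state and `wait`
# parks until the socket is ready in the given direction. Both are
# stand-ins for the C shim and the scheduler, not real Tep APIs.
def drive_connect(poll, wait)
  loop do
    state = poll.call
    return :ok if state == POLLING_OK
    return :failed if state == POLLING_FAILED
    wait.call(state == POLLING_READING ? :read : :write)
  end
end

states = [POLLING_WRITING, POLLING_READING, POLLING_OK]
waits = []
result = drive_connect(-> { states.shift }, ->(mode) { waits << mode })
```

The loop parks exactly once per non-terminal state, which is why the real method can hand control to other fibers between poll calls.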

.drain_remaining_results(pgh) ⇒ Object

After the first PQgetResult returned a real Result, libpq requires the conn be drained via additional PQgetResult calls until NULL is returned. This is a fast in-memory drain (no network), but it has to happen between async_exec calls or the next send_query will fail. Each tep_pg_get_result call that produces a non-NULL result stashes it in the slot table; we PQclear those immediately since they're trailing status results we don't expose.



# File 'lib/tep/pg.rb', line 536

def self.drain_remaining_results(pgh)
  while true
    rh = Pg.tep_pg_get_result(pgh)
    if rh < 0
      return 0
    end
    # A trailing result -- shouldn't normally happen for
    # single-statement queries, but defensively free.
    Pg.tep_pg_clear(rh)
  end
  0
end

.drain_send(pgh) ⇒ Object

Drain libpq's send buffer. PQflush returns 0 when done; 1 when the kernel send-buffer is full and we should park on WRITE-ready; -1 on error. Timeout is generous (10s); a genuinely-stuck PG is the rare case worth bailing on.



# File 'lib/tep/pg.rb', line 493

def self.drain_send(pgh)
  fd = Pg.tep_pg_socket(pgh)
  while true
    rc = Pg.tep_pg_flush(pgh)
    if rc == 0
      return 0
    end
    if rc < 0
      return -1
    end
    # rc == 1: send buffer full, park on writability.
    Tep::Scheduler.io_wait(fd, Tep::Scheduler::WRITE, 10)
  end
  0
end

.escape_string(s) ⇒ Object

Class-method form -- ruby-pg allows escape_string and quote_ident without a live conn. We route through slot 0 which the shim treats as "no conn, fall back to standalone PQescapeString". Use the instance method when a conn is available -- it goes through PQescapeStringConn which is the standards-compliant path.



# File 'lib/tep/pg.rb', line 567

def self.escape_string(s)
  Pg.tep_pg_escape_string(0, s)
end

.quote_ident(s) ⇒ Object

Class-method form of identifier quoting for callers without a live conn. PQescapeIdentifier requires a conn, so routing through slot 0 falls through to ""; apps with a live PG::Connection should use the instance method (#escape_identifier on this page).



# File 'lib/tep/pg.rb', line 571

def self.quote_ident(s)
  # PQescapeIdentifier requires a conn; without one we fall
  # through to "" which is wrong but rare. Apps with a live
  # PG::Connection should use the instance method.
  Pg.tep_pg_escape_identifier(0, s)
end

.raise_send_failure(conn) ⇒ Object

PQsendQuery returned 0 (immediate failure -- conn already closed, send buffer error, etc.). Mirror the error onto the conn's last_* and raise, matching the exec error path (ruby-pg surfaces a send failure as PG::UnableToSend < PG::Error). No SQLSTATE is available pre-result, so this maps to the transport leaf rather than going through raise_for_sqlstate.

Raises:

  • (PG::UnableToSend)


# File 'lib/tep/pg.rb', line 482

def self.raise_send_failure(conn)
  conn.last_sqlstate = ""
  conn.last_error_message = conn.error_message
  conn.last_result_rh = -1
  raise PG::UnableToSend, conn.error_message
end

.record_error_if_any(conn, r) ⇒ Object

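If the Result is in an error state, mirror SQLSTATE + message onto the conn so post-rescue callers can read them via conn.last_*, free the failed PGresult (resetting conn.last_result_rh to -1), and raise the SQLSTATE-mapped PG::Error subclass via PG.raise_for_sqlstate. Returns 0 without raising when the status is RES_TUPLES, RES_COMMAND, or RES_EMPTY.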



# File 'lib/tep/pg.rb', line 582

def self.record_error_if_any(conn, r)
  st = r.status
  if st == Pg::RES_TUPLES || st == Pg::RES_COMMAND || st == Pg::RES_EMPTY
    return 0
  end
  sqlstate = r.error_field(PG::DIAG_SQLSTATE)
  msg = r.error_message
  if msg.length == 0
    msg = conn.error_message
  end
  conn.last_sqlstate = sqlstate
  conn.last_error_message = msg
  # Free the failed PGresult NOW: once we raise out of
  # exec/exec_params the caller's `r.clear` never runs, so this is
  # the only chance to release it. The SQLSTATE / message are
  # already copied onto conn.last_* (Strings) for post-rescue
  # inspection, so dropping the handle loses nothing callers need.
  conn.last_result_rh = -1
  r.clear
  # ruby-pg / AR parity: raise the SQLSTATE-mapped PG::Error
  # subclass (live since matz/spinel#627 + #1041 -- namespaced
  # raise + hierarchy-walking rescue). Callers `rescue
  # PG::UniqueViolation` (leaf) or `rescue PG::Error` (base).
  PG.raise_for_sqlstate(sqlstate, msg)
  0
end
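
The SQLSTATE-to-class dispatch can be sketched as a lookup table with a base-class fallback. Everything below lives in a hypothetical `DemoPG` namespace; the class hierarchy and the two-entry table are assumptions for illustration and are not taken from the real PG.raise_for_sqlstate. The two codes themselves are standard PostgreSQL SQLSTATEs.

```ruby
# Hypothetical namespace so the sketch cannot collide with a real PG module.
module DemoPG
  class Error < StandardError; end
  class ServerError < Error; end
  class UniqueViolation < ServerError; end
  class UndefinedTable < ServerError; end

  # Deliberately tiny table; a real mapping would cover many more codes.
  SQLSTATE_CLASSES = {
    "23505" => UniqueViolation, # unique_violation
    "42P01" => UndefinedTable   # undefined_table
  }.freeze

  # Raise the mapped leaf class, or ServerError for unmapped states.
  def self.raise_for_sqlstate(sqlstate, msg)
    raise SQLSTATE_CLASSES.fetch(sqlstate, ServerError), msg
  end
end

caught = begin
  DemoPG.raise_for_sqlstate("23505", "duplicate key")
rescue DemoPG::Error => e
  e
end
```

Rescuing the base class still catches the leaf, which is the property callers rely on when they write `rescue PG::Error`.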

.wait_for_result_ready(pgh) ⇒ Object

Wait until PQisBusy returns 0 (PQgetResult won't block). Pumps PQconsumeInput in between io_wait calls so the libpq state machine advances. Timeout is generous (30s) since the query itself can take that long; the io_wait timeout is per-iteration, not cumulative.



# File 'lib/tep/pg.rb', line 514

def self.wait_for_result_ready(pgh)
  fd = Pg.tep_pg_socket(pgh)
  while true
    if Pg.tep_pg_consume_input(pgh) != 1
      return -1
    end
    if Pg.tep_pg_is_busy(pgh) == 0
      return 0
    end
    Tep::Scheduler.io_wait(fd, Tep::Scheduler::READ, 30)
  end
  0
end

Instance Method Details

#async_exec(sql) ⇒ Object

Explicit async exec. Same shape as exec but doesn't context-detect -- always uses the libpq async surface. If called outside Tep::Server::Scheduled, Tep::Scheduler.io_wait falls back to a single-shot poll(2), so this still works under prefork (just without the cross-fiber concurrency win).



# File 'lib/tep/pg.rb', line 381

def async_exec(sql)
  Pg.tep_pg_set_nonblocking(@pgh, 1)
  ok = Pg.tep_pg_send_query(@pgh, sql)
  if ok != 1
    Connection.raise_send_failure(self)
  end
  Connection.drain_send(@pgh)
  Connection.wait_for_result_ready(@pgh)
  rh = Pg.tep_pg_get_result(@pgh)
  r = PG::Result.new(rh)
  # Drain trailing NULL terminator (libpq requires reading
  # until PQgetResult returns NULL to mark the conn ready for
  # the next send_query).
  Connection.drain_remaining_results(@pgh)
  Connection.record_error_if_any(self, r)
  r
end
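
The order of shim calls in #async_exec can be made visible with a recording fake. This is a sketch under stated assumptions: `FakeShim` and `async_exec_sketch` are hypothetical names, the fake never blocks, and scheduler parking and error handling are left out.

```ruby
# Hypothetical in-memory fake of the shim calls used by #async_exec. It
# records call order and yields one result handle followed by -1, which
# matches the `rh < 0` terminator check in drain_remaining_results.
class FakeShim
  attr_reader :calls

  def initialize
    @calls = []
    @results = [7, -1] # one result handle, then "no more results"
  end

  def send_query(_sql)
    @calls << :send_query
    1
  end

  def flush
    @calls << :flush
    0
  end

  def consume_input
    @calls << :consume_input
    1
  end

  def busy?
    @calls << :is_busy
    false
  end

  def get_result
    @calls << :get_result
    @results.shift
  end
end

# Same step order as #async_exec: send, flush, wait, fetch, drain.
def async_exec_sketch(shim, sql)
  raise "send failed" unless shim.send_query(sql) == 1
  shim.flush                     # drain_send
  shim.consume_input             # wait_for_result_ready, first half
  shim.busy?                     # wait_for_result_ready, second half
  rh = shim.get_result           # the result the caller sees
  nil while shim.get_result >= 0 # drain until the terminator
  rh
end

shim = FakeShim.new
rh = async_exec_sketch(shim, "SELECT 1")
```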

#async_exec_params(sql, params) ⇒ Object

Parameterised async exec. params is an Array of String / Integer / nil; same conversion as exec_params.



# File 'lib/tep/pg.rb', line 401

def async_exec_params(sql, params)
  Pg.tep_pg_param_clear
  i = 0
  n = params.length
  while i < n
    p = params[i]
    if p == nil
      Pg.tep_pg_param_push_null
    else
      Pg.tep_pg_param_push_str(p.to_s)
    end
    i += 1
  end
  async_exec_params_after_clear(sql)
end
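
The bind conversion performed by the push loop can be stated in one line. A minimal sketch, assuming a hypothetical helper name `text_params`; it returns the values the loop would push instead of pushing them into the shim's accumulator.

```ruby
# Hypothetical helper: shows what the push loop sends for each bind.
# nil stays nil (pushed as SQL NULL); everything else goes through to_s.
def text_params(params)
  params.map { |p| p.nil? ? nil : p.to_s }
end

text_params(["alice", 42, nil]) # => ["alice", "42", nil]
```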

#async_exec_params_after_clear(sql) ⇒ Object

Internal: param accumulator has already been populated by the caller (either exec_params routing here on context detect, or async_exec_params after its own push loop).



# File 'lib/tep/pg.rb', line 420

def async_exec_params_after_clear(sql)
  Pg.tep_pg_set_nonblocking(@pgh, 1)
  ok = Pg.tep_pg_send_query_params(@pgh, sql)
  if ok != 1
    Connection.raise_send_failure(self)
  end
  Connection.drain_send(@pgh)
  Connection.wait_for_result_ready(@pgh)
  rh = Pg.tep_pg_get_result(@pgh)
  r = PG::Result.new(rh)
  Connection.drain_remaining_results(@pgh)
  Connection.record_error_if_any(self, r)
  r
end

#close ⇒ Object



# File 'lib/tep/pg.rb', line 238

def close
  if @pgh >= 0
    Pg.tep_pg_finish(@pgh)
    @pgh = -1
  end
  0
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/tep/pg.rb', line 234

def connected?
  @pgh >= 0
end
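
Because the constructor never raises, callers need an explicit check before the first query. The guard below is a sketch, assuming a hypothetical helper name `ensure_connected` and a Struct stand-in for a failed connection; it relies only on #connected? and #last_error_message as documented on this page.

```ruby
# Hypothetical guard: accepts any object that responds to connected? and
# last_error_message, as PG::Connection does.
def ensure_connected(conn)
  return conn if conn.connected?
  raise "PG connect failed: #{conn.last_error_message}"
end

# Stand-in for a failed PG::Connection, used only to exercise the guard.
FailedConn = Struct.new(:last_error_message) do
  def connected?
    false
  end
end

message = begin
  ensure_connected(FailedConn.new("no route to host"))
rescue RuntimeError => e
  e.message
end
```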

#error_message ⇒ Object



# File 'lib/tep/pg.rb', line 269

def error_message
  @pgh < 0 ? "" : Pg.tep_pg_error_message(@pgh)
end

#escape_identifier(s) ⇒ Object



# File 'lib/tep/pg.rb', line 553

def escape_identifier(s)
  Pg.tep_pg_escape_identifier(@pgh, s)
end

#escape_literal(s) ⇒ Object



# File 'lib/tep/pg.rb', line 557

def escape_literal(s)
  Pg.tep_pg_escape_literal(@pgh, s)
end

#escape_string(s) ⇒ Object



# File 'lib/tep/pg.rb', line 549

def escape_string(s)
  Pg.tep_pg_escape_string(@pgh, s)
end

#exec(sql) ⇒ Object

Run a no-params query. Returns a PG::Result on success.

On error it raises the SQLSTATE-mapped PG::Error subclass (PG::UniqueViolation, PG::UndefinedTable, ...; PG::ServerError for unmapped states) -- the ruby-pg / AR shape. The failed PGresult is freed before the raise; the SQLSTATE / message stay readable on #last_sqlstate / #last_error_message for post-rescue inspection:

begin
  c.exec(sql)
rescue PG::UniqueViolation => e
  ...                     # e.message + c.last_sqlstate
rescue PG::Error => e     # base catches any server error
  ...
end

Raising (instead of the old Result-on-error sentinel) became viable once spinel learned namespaced raise + hierarchy-walking rescue (matz/spinel#627 + #1041). NB: PG.connect is the one path that still does NOT raise -- it returns a connection-failed instance so PG::Pool can type-seed without a live server (check conn.connected?).

Under Tep::Server::Scheduled this routes through the libpq async surface (PQsendQuery + PQflush + PQconsumeInput parked on Tep::Scheduler.io_wait), so other fibers in the same worker can run while the query is in flight. Under prefork it routes through the blocking PQexec. Both raise identically.



# File 'lib/tep/pg.rb', line 340

def exec(sql)
  if Tep::Scheduler.scheduled_context?
    return async_exec(sql)
  end
  rh = Pg.tep_pg_exec(@pgh, sql)
  r = PG::Result.new(rh)
  Connection.record_error_if_any(self, r)
  r
end

#exec_params(sql, params) ⇒ Object

Parameterised query with positional binds ($1, $2, ...). params is an Array of String / Integer / nil. Same raise-on-error contract + auto-routing as exec.



# File 'lib/tep/pg.rb', line 353

def exec_params(sql, params)
  Pg.tep_pg_param_clear
  i = 0
  n = params.length
  while i < n
    p = params[i]
    if p == nil
      Pg.tep_pg_param_push_null
    else
      Pg.tep_pg_param_push_str(p.to_s)
    end
    i += 1
  end
  if Tep::Scheduler.scheduled_context?
    return async_exec_params_after_clear(sql)
  end
  rh = Pg.tep_pg_exec_params(@pgh, sql)
  r = PG::Result.new(rh)
  Connection.record_error_if_any(self, r)
  r
end

#finish ⇒ Object



# File 'lib/tep/pg.rb', line 246

def finish
  close
end

#last_notify_channel ⇒ Object



# File 'lib/tep/pg.rb', line 303

def last_notify_channel
  Pg.tep_pg_notify_channel
end

#last_notify_payload ⇒ Object



# File 'lib/tep/pg.rb', line 307

def last_notify_payload
  Pg.tep_pg_notify_payload
end

#listen(channel) ⇒ Object

LISTEN / NOTIFY (Battery 2 chunk 2.2). Used by Tep::Broadcast's PG backend for cross-worker pub/sub. Channel names must be safe SQL identifiers (no caller-controlled interpolation -- use a hard-coded constant). In the default PG configuration a NOTIFY payload must be shorter than 8000 bytes.



# File 'lib/tep/pg.rb', line 278

def listen(channel)
  return -1 if @pgh < 0
  Pg.tep_pg_listen(@pgh, channel)
end

#notify(channel, payload) ⇒ Object



# File 'lib/tep/pg.rb', line 288

def notify(channel, payload)
  return -1 if @pgh < 0
  Pg.tep_pg_notify(@pgh, channel, payload)
end

#poll_notification(timeout_ms) ⇒ Object

Block up to timeout_ms waiting for one notification on the connection. Returns 1 on receipt (caller then reads #last_notify_channel + #last_notify_payload), 0 on timeout, -1 on connection error. Connection must already be in LISTEN mode for the channel of interest.



# File 'lib/tep/pg.rb', line 298

def poll_notification(timeout_ms)
  return -1 if @pgh < 0
  Pg.tep_pg_poll_notification(@pgh, timeout_ms)
end
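
A consumer loop built on the return codes above might look like the following sketch. `drain_notifications` and `ScriptedConn` are hypothetical names; the scripted connection only imitates the documented contract (1 on receipt, 0 on timeout) and does not talk to a server.

```ruby
# Hypothetical consumer: polls until a timeout (0) or error (-1) and
# collects [channel, payload] pairs. `conn` is any object exposing
# poll_notification, last_notify_channel and last_notify_payload.
def drain_notifications(conn, timeout_ms)
  received = []
  while conn.poll_notification(timeout_ms) == 1
    received << [conn.last_notify_channel, conn.last_notify_payload]
  end
  received
end

# Scripted stand-in for a listening PG::Connection.
class ScriptedConn
  attr_reader :last_notify_channel, :last_notify_payload

  def initialize(events)
    @events = events
  end

  def poll_notification(_timeout_ms)
    event = @events.shift
    return 0 if event.nil? # nothing queued: behave like a timeout
    @last_notify_channel, @last_notify_payload = event
    1
  end
end

conn = ScriptedConn.new([["jobs", "42"], ["jobs", "43"]])
notifications = drain_notifications(conn, 100)
```

Reading channel and payload immediately after each successful poll matters in the real class too, since the next poll overwrites both values.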

#reset ⇒ Object



# File 'lib/tep/pg.rb', line 250

def reset
  if @pgh >= 0
    Pg.tep_pg_reset(@pgh)
  end
  self
end

#server_version ⇒ Object



# File 'lib/tep/pg.rb', line 265

def server_version
  @pgh < 0 ? 0 : Pg.tep_pg_server_version(@pgh)
end

#status ⇒ Object



# File 'lib/tep/pg.rb', line 257

def status
  @pgh < 0 ? PG::CONNECTION_BAD : Pg.tep_pg_status(@pgh)
end

#transaction_status ⇒ Object



# File 'lib/tep/pg.rb', line 261

def transaction_status
  @pgh < 0 ? PG::PQTRANS_UNKNOWN : Pg.tep_pg_transaction_status(@pgh)
end

#unlisten(channel) ⇒ Object



# File 'lib/tep/pg.rb', line 283

def unlisten(channel)
  return -1 if @pgh < 0
  Pg.tep_pg_unlisten(@pgh, channel)
end