Class: PG::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/tep/pg.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts) ⇒ Connection

Returns a new instance of Connection.



188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
# File 'lib/tep/pg.rb', line 188

def initialize(opts)
  @pgh = -1
  @last_sqlstate = ""
  @last_error_message = ""
  @last_result_rh = -1
  if opts.is_a?(String)
    if Tep::Scheduler.scheduled_context?
      h = Connection.async_connect(opts)
    else
      h = Pg.tep_pg_connect(opts)
    end
  else
    # Hash form. Pack keys and values into parallel \0-delimited
    # buffers; the shim splits them apart and calls
    # PQconnectdbParams. (No async-connect path for the Hash
    # form yet -- AR uses the String form for connect, so the
    # Scheduled-context shortcut points only at conninfo.)
    keys = ""
    vals = ""
    n = 0
    opts.each do |k, v|
      keys = keys + k + "\0"
      vals = vals + v + "\0"
      n += 1
    end
    h = Pg.tep_pg_connect_kv(keys, vals, n)
  end
  if h < 0
    # Slot 0 holds the most recent connect-failure error message
    # (PQstatus on a failed PQconnectdb still gives a readable
    # error, but the conn itself is closed by the time we get
    # here -- the shim stashes the message before PQfinish).
    @last_error_message = Pg.tep_pg_error_message(0)
    @last_sqlstate = ""
    # Connection-failure surfaces via `c.last_error_message` +
    # `c.connected?` after the constructor returns -- the
    # constructor stays non-raising on purpose (PG::Pool seeds its
    # free list with `PG::Connection.new("")` before a server is
    # reachable; a raising constructor would blow up at module
    # load). Callers must check `c.connected?` before exec. NB:
    # this is the lone non-raising path -- query methods raise
    # PG::Error subclasses now that spinel supports namespaced
    # raise + rescue (matz/spinel#627 + #1041).
  end
  @pgh = h
end

Instance Attribute Details

#last_error_messageObject

Error context for the most recent exception raised by this connection. Spinel’s ‘raise X.new(msg, …)` lowering doesn’t handle custom initializers (#622), so the SQLSTATE / message / owning-result-handle live here instead. Read after ‘rescue PG::Error => e`:

begin; conn.exec_params(sql, params)
rescue PG::Error => e
  sqlstate = conn.last_sqlstate
  full_msg = conn.last_error_message
end

AR’s ‘translate_exception_class(message, sql, binds)` uses `e.is_a?(PG::UniqueViolation)` etc., which still works – the class hierarchy is intact; only the per-exception accessors move to the connection.



186
187
188
# File 'lib/tep/pg.rb', line 186

def last_error_message
  @last_error_message
end

#last_result_rhObject

Error context for the most recent exception raised by this connection. Spinel’s ‘raise X.new(msg, …)` lowering doesn’t handle custom initializers (#622), so the SQLSTATE / message / owning-result-handle live here instead. Read after ‘rescue PG::Error => e`:

begin; conn.exec_params(sql, params)
rescue PG::Error => e
  sqlstate = conn.last_sqlstate
  full_msg = conn.last_error_message
end

AR’s ‘translate_exception_class(message, sql, binds)` uses `e.is_a?(PG::UniqueViolation)` etc., which still works – the class hierarchy is intact; only the per-exception accessors move to the connection.



186
187
188
# File 'lib/tep/pg.rb', line 186

def last_result_rh
  @last_result_rh
end

#last_sqlstateObject

Error context for the most recent exception raised by this connection. Spinel’s ‘raise X.new(msg, …)` lowering doesn’t handle custom initializers (#622), so the SQLSTATE / message / owning-result-handle live here instead. Read after ‘rescue PG::Error => e`:

begin; conn.exec_params(sql, params)
rescue PG::Error => e
  sqlstate = conn.last_sqlstate
  full_msg = conn.last_error_message
end

AR’s ‘translate_exception_class(message, sql, binds)` uses `e.is_a?(PG::UniqueViolation)` etc., which still works – the class hierarchy is intact; only the per-exception accessors move to the connection.



186
187
188
# File 'lib/tep/pg.rb', line 186

def last_sqlstate
  @last_sqlstate
end

#pghObject

‘:pgh` rather than `:handle` – same poly-dispatch widening concern as Tep::SQLite#dbh (sharing a method name with Tep::Handler#handle confuses spinel’s same-named-imeth-across- classes unifier).



169
170
171
# File 'lib/tep/pg.rb', line 169

def pgh
  @pgh
end

Class Method Details

.async_connect(conninfo) ⇒ Object

Drive PQconnectStart + PQconnectPoll, parking on io_wait between poll calls. Returns the conn slot (>=1) on success or -1 on failure. The C shim’s tep_pg_connect_poll stashes the libpq error message on a FAILED return so ‘Pg.tep_pg_error_message(0)` still surfaces the diagnostic for the Connection.new “connect failed” branch.

libpq’s PostgresPollingStatusType:

0 = PGRES_POLLING_FAILED
1 = PGRES_POLLING_READING    (wait for fd READ-ready)
2 = PGRES_POLLING_WRITING    (wait for fd WRITE-ready)
3 = PGRES_POLLING_OK         (connected; stop polling)


450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
# File 'lib/tep/pg.rb', line 450

def self.async_connect(conninfo)
  h = Pg.tep_pg_connect_start(conninfo)
  if h < 0
    return -1
  end
  fd = Pg.tep_pg_socket(h)
  while true
    state = Pg.tep_pg_connect_poll(h)
    if state == 3
      # PGRES_POLLING_OK
      Pg.tep_pg_set_client_encoding(h, "UTF8")
      return h
    end
    if state == 0
      # PGRES_POLLING_FAILED. The shim has already stashed the
      # error message; we PQfinish the slot.
      Pg.tep_pg_finish(h)
      return -1
    end
    mode = state == 1 ? Tep::Scheduler::READ : Tep::Scheduler::WRITE
    Tep::Scheduler.io_wait(fd, mode, 10)
  end
  -1
end

.drain_remaining_results(pgh) ⇒ Object

After the first PQgetResult returned a real Result, libpq requires the conn be drained via additional PQgetResult calls until NULL is returned. This is a fast in-memory drain (no network), but it has to happen between async_exec calls or the next send_query will fail. Each tep_pg_get_result call that produces a non-NULL result stashes it in the slot table; we PQclear those immediately since they’re trailing status results we don’t expose.



537
538
539
540
541
542
543
544
545
546
547
548
# File 'lib/tep/pg.rb', line 537

def self.drain_remaining_results(pgh)
  while true
    rh = Pg.tep_pg_get_result(pgh)
    if rh < 0
      return 0
    end
    # A trailing result -- shouldn't normally happen for
    # single-statement queries, but defensively free.
    Pg.tep_pg_clear(rh)
  end
  0
end

.drain_send(pgh) ⇒ Object

Drain libpq’s send buffer. PQflush returns 0 when done; 1 when the kernel send-buffer is full and we should park on WRITE-ready; -1 on error. Timeout is generous (10s); a genuinely-stuck PG is the rare case worth bailing on.



494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
# File 'lib/tep/pg.rb', line 494

def self.drain_send(pgh)
  fd = Pg.tep_pg_socket(pgh)
  while true
    rc = Pg.tep_pg_flush(pgh)
    if rc == 0
      return 0
    end
    if rc < 0
      return -1
    end
    # rc == 1: send buffer full, park on writability.
    Tep::Scheduler.io_wait(fd, Tep::Scheduler::WRITE, 10)
  end
  0
end

.escape_string(s) ⇒ Object

Class-method form – ruby-pg allows escape_string and quote_ident without a live conn. We route through slot 0 which the shim treats as “no conn, fall back to standalone PQescapeString”. Use the instance method when a conn is available – it goes through PQescapeStringConn which is the standards-compliant path.



568
569
570
# File 'lib/tep/pg.rb', line 568

def self.escape_string(s)
  Pg.tep_pg_escape_string(0, s)
end

.quote_ident(s) ⇒ Object



572
573
574
575
576
577
# File 'lib/tep/pg.rb', line 572

def self.quote_ident(s)
  # PQescapeIdentifier requires a conn; without one we fall
  # through to "" which is wrong but rare. Apps with a live
  # PG::Connection should use the instance method.
  Pg.tep_pg_escape_identifier(0, s)
end

.raise_send_failure(conn) ⇒ Object

PQsendQuery returned 0 (immediate failure – conn already closed, send buffer error, etc.). Mirror the error onto the conn’s last_* and raise, matching the exec error path (ruby-pg surfaces a send failure as PG::UnableToSend < PG::Error). No SQLSTATE is available pre-result, so this maps to the transport leaf rather than going through raise_for_sqlstate.

Raises:



483
484
485
486
487
488
# File 'lib/tep/pg.rb', line 483

def self.raise_send_failure(conn)
  conn.last_sqlstate = ""
  conn.last_error_message = conn.error_message
  conn.last_result_rh = -1
  raise PG::UnableToSend, conn.error_message
end

.record_error_if_any(conn, r) ⇒ Object

If the Result is in an error state, mirror SQLSTATE + message + result-handle onto the conn so post-rescue (or post-‘if !r.ok?`) callers can read them via `conn.last_*`. No raise here – see the docstring on `exec` for why.



583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
# File 'lib/tep/pg.rb', line 583

def self.record_error_if_any(conn, r)
  st = r.status
  if st == Pg::RES_TUPLES || st == Pg::RES_COMMAND || st == Pg::RES_EMPTY
    return 0
  end
  sqlstate = r.error_field(PG::DIAG_SQLSTATE)
  msg = r.error_message
  if msg.length == 0
    msg = conn.error_message
  end
  conn.last_sqlstate = sqlstate
  conn.last_error_message = msg
  # Free the failed PGresult NOW: once we raise out of
  # exec/exec_params the caller's `r.clear` never runs, so this is
  # the only chance to release it. The SQLSTATE / message are
  # already copied onto conn.last_* (Strings) for post-rescue
  # inspection, so dropping the handle loses nothing callers need.
  conn.last_result_rh = -1
  r.clear
  # ruby-pg / AR parity: raise the SQLSTATE-mapped PG::Error
  # subclass (live since matz/spinel#627 + #1041 -- namespaced
  # raise + hierarchy-walking rescue). Callers `rescue
  # PG::UniqueViolation` (leaf) or `rescue PG::Error` (base).
  PG.raise_for_sqlstate(sqlstate, msg)
  0
end

.wait_for_result_ready(pgh) ⇒ Object

Wait until PQisBusy returns 0 (PQgetResult won’t block). Pumps PQconsumeInput in between io_wait calls so the libpq state machine advances. Timeout is generous (30s) since the query itself can take that long; the io_wait timeout is per-iteration, not cumulative.



515
516
517
518
519
520
521
522
523
524
525
526
527
# File 'lib/tep/pg.rb', line 515

def self.wait_for_result_ready(pgh)
  fd = Pg.tep_pg_socket(pgh)
  while true
    if Pg.tep_pg_consume_input(pgh) != 1
      return -1
    end
    if Pg.tep_pg_is_busy(pgh) == 0
      return 0
    end
    Tep::Scheduler.io_wait(fd, Tep::Scheduler::READ, 30)
  end
  0
end

Instance Method Details

#async_exec(sql) ⇒ Object

Explicit async exec. Same shape as ‘exec` but doesn’t context-detect – always uses the libpq async surface. If called outside Tep::Server::Scheduled, Tep::Scheduler.io_wait falls back to a single-shot poll(2), so this still works under prefork (just without the cross-fiber concurrency win).



382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
# File 'lib/tep/pg.rb', line 382

def async_exec(sql)
  Pg.tep_pg_set_nonblocking(@pgh, 1)
  ok = Pg.tep_pg_send_query(@pgh, sql)
  if ok != 1
    Connection.raise_send_failure(self)
  end
  Connection.drain_send(@pgh)
  Connection.wait_for_result_ready(@pgh)
  rh = Pg.tep_pg_get_result(@pgh)
  r = PG::Result.new(rh)
  # Drain trailing NULL terminator (libpq requires reading
  # until PQgetResult returns NULL to mark the conn ready for
  # the next send_query).
  Connection.drain_remaining_results(@pgh)
  Connection.record_error_if_any(self, r)
  r
end

#async_exec_params(sql, params) ⇒ Object

Parameterised async exec. ‘params` is an Array of String / Integer / nil; same conversion as exec_params.



402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
# File 'lib/tep/pg.rb', line 402

def async_exec_params(sql, params)
  Pg.tep_pg_param_clear
  i = 0
  n = params.length
  while i < n
    p = params[i]
    if p == nil
      Pg.tep_pg_param_push_null
    else
      Pg.tep_pg_param_push_str(p.to_s)
    end
    i += 1
  end
  async_exec_params_after_clear(sql)
end

#async_exec_params_after_clear(sql) ⇒ Object

Internal: param accumulator has already been populated by the caller (either exec_params routing here on context detect, or async_exec_params after its own push loop).



421
422
423
424
425
426
427
428
429
430
431
432
433
434
# File 'lib/tep/pg.rb', line 421

def async_exec_params_after_clear(sql)
  Pg.tep_pg_set_nonblocking(@pgh, 1)
  ok = Pg.tep_pg_send_query_params(@pgh, sql)
  if ok != 1
    Connection.raise_send_failure(self)
  end
  Connection.drain_send(@pgh)
  Connection.wait_for_result_ready(@pgh)
  rh = Pg.tep_pg_get_result(@pgh)
  r = PG::Result.new(rh)
  Connection.drain_remaining_results(@pgh)
  Connection.record_error_if_any(self, r)
  r
end

#closeObject



239
240
241
242
243
244
245
# File 'lib/tep/pg.rb', line 239

def close
  if @pgh >= 0
    Pg.tep_pg_finish(@pgh)
    @pgh = -1
  end
  0
end

#connected?Boolean

Returns:

  • (Boolean)


235
236
237
# File 'lib/tep/pg.rb', line 235

def connected?
  @pgh >= 0
end

#error_messageObject



270
271
272
# File 'lib/tep/pg.rb', line 270

def error_message
  @pgh < 0 ? "" : Pg.tep_pg_error_message(@pgh)
end

#escape_identifier(s) ⇒ Object



554
555
556
# File 'lib/tep/pg.rb', line 554

def escape_identifier(s)
  Pg.tep_pg_escape_identifier(@pgh, s)
end

#escape_literal(s) ⇒ Object



558
559
560
# File 'lib/tep/pg.rb', line 558

def escape_literal(s)
  Pg.tep_pg_escape_literal(@pgh, s)
end

#escape_string(s) ⇒ Object



550
551
552
# File 'lib/tep/pg.rb', line 550

def escape_string(s)
  Pg.tep_pg_escape_string(@pgh, s)
end

#exec(sql) ⇒ Object

Run a no-params query. Returns a PG::Result on success.

ON ERROR IT RAISES the SQLSTATE-mapped PG::Error subclass (PG::UniqueViolation, PG::UndefinedTable, … -> PG::ServerError for unmapped states) – the ruby-pg / AR shape. The failed PGresult is freed before the raise; the SQLSTATE / message stay readable on ‘conn.last_sqlstate` / `#last_error_message` for post-rescue inspection:

begin
  c.exec(sql)
rescue PG::UniqueViolation => e
  ...                     # e.message + c.last_sqlstate
rescue PG::Error => e     # base catches any server error
  ...
end

Raising (instead of the old Result-on-error sentinel) became viable once spinel learned namespaced raise + hierarchy-walking rescue (matz/spinel#627 + #1041). NB: PG.connect is the one path that still does NOT raise – it returns a connection-failed instance so PG::Pool can type-seed without a live server (check ‘conn.connected?`).

Under ‘Tep::Server::Scheduled` this routes through the libpq async surface (PQsendQuery + PQflush + PQconsumeInput parked on Tep::Scheduler.io_wait), so other fibers in the same worker can run while the query is in flight. Under prefork it routes through the blocking PQexec. Both raise identically.



341
342
343
344
345
346
347
348
349
# File 'lib/tep/pg.rb', line 341

def exec(sql)
  if Tep::Scheduler.scheduled_context?
    return async_exec(sql)
  end
  rh = Pg.tep_pg_exec(@pgh, sql)
  r = PG::Result.new(rh)
  Connection.record_error_if_any(self, r)
  r
end

#exec_params(sql, params) ⇒ Object

Parameterised query with positional binds ($1, $2, …). ‘params` is an Array of String / Integer / nil. Same raise-on-error contract + auto-routing as `exec`.



354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
# File 'lib/tep/pg.rb', line 354

def exec_params(sql, params)
  Pg.tep_pg_param_clear
  i = 0
  n = params.length
  while i < n
    p = params[i]
    if p == nil
      Pg.tep_pg_param_push_null
    else
      Pg.tep_pg_param_push_str(p.to_s)
    end
    i += 1
  end
  if Tep::Scheduler.scheduled_context?
    return async_exec_params_after_clear(sql)
  end
  rh = Pg.tep_pg_exec_params(@pgh, sql)
  r = PG::Result.new(rh)
  Connection.record_error_if_any(self, r)
  r
end

#finishObject



247
248
249
# File 'lib/tep/pg.rb', line 247

def finish
  close
end

#last_notify_channelObject



304
305
306
# File 'lib/tep/pg.rb', line 304

def last_notify_channel
  Pg.tep_pg_notify_channel
end

#last_notify_payloadObject



308
309
310
# File 'lib/tep/pg.rb', line 308

def last_notify_payload
  Pg.tep_pg_notify_payload
end

#listen(channel) ⇒ Object

LISTEN / NOTIFY (Battery 2 chunk 2.2). Used by Tep::Broadcast’s PG backend for cross-worker pub/sub. Channel names must be safe SQL identifiers (no caller- controlled interpolation – use a hard-coded constant). Payload max size is 8000 bytes per PG default.



279
280
281
282
# File 'lib/tep/pg.rb', line 279

def listen(channel)
  return -1 if @pgh < 0
  Pg.tep_pg_listen(@pgh, channel)
end

#notify(channel, payload) ⇒ Object



289
290
291
292
# File 'lib/tep/pg.rb', line 289

def notify(channel, payload)
  return -1 if @pgh < 0
  Pg.tep_pg_notify(@pgh, channel, payload)
end

#poll_notification(timeout_ms) ⇒ Object

Block up to ‘timeout_ms` waiting for one notification on the connection. Returns 1 on receipt (caller then reads #last_notify_channel + #last_notify_payload), 0 on timeout, -1 on connection error. Connection must already be in LISTEN mode for the channel of interest.



299
300
301
302
# File 'lib/tep/pg.rb', line 299

def poll_notification(timeout_ms)
  return -1 if @pgh < 0
  Pg.tep_pg_poll_notification(@pgh, timeout_ms)
end

#resetObject



251
252
253
254
255
256
# File 'lib/tep/pg.rb', line 251

def reset
  if @pgh >= 0
    Pg.tep_pg_reset(@pgh)
  end
  self
end

#server_versionObject



266
267
268
# File 'lib/tep/pg.rb', line 266

def server_version
  @pgh < 0 ? 0 : Pg.tep_pg_server_version(@pgh)
end

#statusObject



258
259
260
# File 'lib/tep/pg.rb', line 258

def status
  @pgh < 0 ? PG::CONNECTION_BAD : Pg.tep_pg_status(@pgh)
end

#transaction_statusObject



262
263
264
# File 'lib/tep/pg.rb', line 262

def transaction_status
  @pgh < 0 ? PG::PQTRANS_UNKNOWN : Pg.tep_pg_transaction_status(@pgh)
end

#unlisten(channel) ⇒ Object



284
285
286
287
# File 'lib/tep/pg.rb', line 284

def unlisten(channel)
  return -1 if @pgh < 0
  Pg.tep_pg_unlisten(@pgh, channel)
end