Class: PG::Connection

Inherits:
Object
Defined in:
lib/tep/pg.rb

Constructor Details

#initialize(opts) ⇒ Connection

Returns a new instance of Connection.



# File 'lib/tep/pg.rb', line 188

def initialize(opts)
  @pgh = -1
  @last_sqlstate = ""
  @last_error_message = ""
  @last_result_rh = -1
  if opts.is_a?(String)
    if Tep::Scheduler.scheduled_context?
      h = Connection.async_connect(opts)
    else
      h = Pg.tep_pg_connect(opts)
    end
  else
    # ============ WORKAROUND -- REMOVE WHEN UPSTREAM LANDS ============
    # Hash-conninfo form. The `opts.each` below miscompiles at spinel
    # master: `opts` is a String|Hash param, but in this is_a?(String)
    # ELSE branch spinel types it String (the is_a?-else narrowing
    # gap, matz/spinel#1434) and rejects `opts.each` as String#each
    # -- the lone blocker to re-pinning tep onto master (tep#196).
    #
    # The Hash form is currently UNUSED + untested in tep and toy:
    # every PG::Connection.new / PG.connect caller passes a String
    # conninfo (AR connects with the String form too). So we stub
    # this dead branch to a failed connection to unblock the re-pin.
    #
    # RESTORE the original kv-pack loop (preserved below) once the
    # upstream narrowing fix lands, and re-add Hash-form test
    # coverage. Until then a Hash arg yields a failed Connection
    # (connected? == false) rather than a miscompile.
    #
    #   keys = ""
    #   vals = ""
    #   n = 0
    #   opts.each do |k, v|
    #     keys = keys + k + "\0"
    #     vals = vals + v + "\0"
    #     n += 1
    #   end
    #   h = Pg.tep_pg_connect_kv(keys, vals, n)
    h = -1
    # =================================================================
  end
  if h < 0
    # Slot 0 holds the most recent connect-failure error message
    # (PQstatus on a failed PQconnectdb still gives a readable
    # error, but the conn itself is closed by the time we get
    # here -- the shim stashes the message before PQfinish).
    @last_error_message = Pg.tep_pg_error_message(0)
    @last_sqlstate = ""
    # Connection-failure surfaces via `c.last_error_message` +
    # `c.connected?` after the constructor returns -- the
    # constructor stays non-raising on purpose (PG::Pool seeds its
    # free list with `PG::Connection.new("")` before a server is
    # reachable; a raising constructor would blow up at module
    # load). Callers must check `c.connected?` before exec. NB:
    # this is the lone non-raising path -- query methods raise
    # PG::Error subclasses now that spinel supports namespaced
    # raise + rescue (matz/spinel#627 + #1041).
  end
  @pgh = h
end
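
Because the constructor never raises, callers are expected to check `connected?` before issuing queries. The sketch below illustrates that check. `StubConnection` and `connect_or_report` are hypothetical stand-ins written for illustration (not part of tep); they mimic only the `connected?` / `last_error_message` surface shown above.

```ruby
# Hypothetical stand-in that mimics the non-raising constructor contract:
# a negative handle means the connect failed and a message was stashed.
class StubConnection
  attr_reader :pgh, :last_error_message

  def initialize(handle, message = "")
    @pgh = handle
    @last_error_message = handle < 0 ? message : ""
  end

  def connected?
    @pgh >= 0
  end
end

# Hypothetical helper: returns the connection when it is usable, otherwise
# a String describing the failure.
def connect_or_report(conn)
  return conn if conn.connected?
  "connect failed: #{conn.last_error_message}"
end

ok = connect_or_report(StubConnection.new(1))
bad = connect_or_report(StubConnection.new(-1, "could not connect to server"))
puts ok.connected?
puts bad
```

The same pattern applies to a pool that seeds its free list before a server is reachable: each checkout would test `connected?` rather than rely on a rescue.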

Instance Attribute Details

#last_error_message ⇒ Object

Error context for the most recent exception raised by this connection. Spinel’s `raise X.new(msg, …)` lowering doesn’t handle custom initializers (#622), so the SQLSTATE / message / owning-result-handle live here instead. Read after `rescue PG::Error => e`:

begin
  conn.exec_params(sql, params)
rescue PG::Error => e
  sqlstate = conn.last_sqlstate
  full_msg = conn.last_error_message
end

AR’s `translate_exception_class(message, sql, binds)` uses `e.is_a?(PG::UniqueViolation)` etc., which still works – the class hierarchy is intact; only the per-exception accessors move to the connection.



# File 'lib/tep/pg.rb', line 186

def last_error_message
  @last_error_message
end

#last_result_rh ⇒ Object

Error context for the most recent exception raised by this connection. Spinel’s `raise X.new(msg, …)` lowering doesn’t handle custom initializers (#622), so the SQLSTATE / message / owning-result-handle live here instead. Read after `rescue PG::Error => e`:

begin
  conn.exec_params(sql, params)
rescue PG::Error => e
  sqlstate = conn.last_sqlstate
  full_msg = conn.last_error_message
end

AR’s `translate_exception_class(message, sql, binds)` uses `e.is_a?(PG::UniqueViolation)` etc., which still works – the class hierarchy is intact; only the per-exception accessors move to the connection.



# File 'lib/tep/pg.rb', line 186

def last_result_rh
  @last_result_rh
end

#last_sqlstate ⇒ Object

Error context for the most recent exception raised by this connection. Spinel’s `raise X.new(msg, …)` lowering doesn’t handle custom initializers (#622), so the SQLSTATE / message / owning-result-handle live here instead. Read after `rescue PG::Error => e`:

begin
  conn.exec_params(sql, params)
rescue PG::Error => e
  sqlstate = conn.last_sqlstate
  full_msg = conn.last_error_message
end

AR’s `translate_exception_class(message, sql, binds)` uses `e.is_a?(PG::UniqueViolation)` etc., which still works – the class hierarchy is intact; only the per-exception accessors move to the connection.



# File 'lib/tep/pg.rb', line 186

def last_sqlstate
  @last_sqlstate
end

#pgh ⇒ Object

`:pgh` rather than `:handle` – same poly-dispatch widening concern as Tep::SQLite#dbh (sharing a method name with Tep::Handler#handle confuses spinel’s same-named-imeth-across-classes unifier).



# File 'lib/tep/pg.rb', line 169

def pgh
  @pgh
end

Class Method Details

.async_connect(conninfo) ⇒ Object

Drive PQconnectStart + PQconnectPoll, parking on io_wait between poll calls. Returns the conn slot (>=1) on success or -1 on failure. The C shim’s tep_pg_connect_poll stashes the libpq error message on a FAILED return so `Pg.tep_pg_error_message(0)` still surfaces the diagnostic for the Connection.new “connect failed” branch.

libpq’s PostgresPollingStatusType:

0 = PGRES_POLLING_FAILED
1 = PGRES_POLLING_READING    (wait for fd READ-ready)
2 = PGRES_POLLING_WRITING    (wait for fd WRITE-ready)
3 = PGRES_POLLING_OK         (connected; stop polling)


# File 'lib/tep/pg.rb', line 464

def self.async_connect(conninfo)
  h = Pg.tep_pg_connect_start(conninfo)
  if h < 0
    return -1
  end
  fd = Pg.tep_pg_socket(h)
  while true
    state = Pg.tep_pg_connect_poll(h)
    if state == 3
      # PGRES_POLLING_OK
      Pg.tep_pg_set_client_encoding(h, "UTF8")
      return h
    end
    if state == 0
      # PGRES_POLLING_FAILED. The shim has already stashed the
      # error message; we PQfinish the slot.
      Pg.tep_pg_finish(h)
      return -1
    end
    mode = state == 1 ? Tep::Scheduler::READ : Tep::Scheduler::WRITE
    Tep::Scheduler.io_wait(fd, mode, 10)
  end
  -1
end
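
The polling loop can be exercised without a server by scripting the status sequence. This is a sketch only: `drive_connect` and the two lambdas are hypothetical names introduced here, standing in for `tep_pg_connect_poll` and `Tep::Scheduler.io_wait`. The numeric constants follow the table above.

```ruby
# Status codes as listed above (libpq's PostgresPollingStatusType).
POLLING_FAILED  = 0
POLLING_READING = 1
POLLING_WRITING = 2
POLLING_OK      = 3

# Hypothetical driver: `poll` returns the next status, `wait` parks until
# the socket is ready in the given direction. Returns :ok or :failed.
def drive_connect(poll, wait)
  loop do
    state = poll.call
    return :ok if state == POLLING_OK
    return :failed if state == POLLING_FAILED
    wait.call(state == POLLING_READING ? :read : :write)
  end
end

# Scripted handshake: one write wait, two read waits, then success.
script = [POLLING_WRITING, POLLING_READING, POLLING_READING, POLLING_OK]
waits = []
result = drive_connect(-> { script.shift }, ->(mode) { waits << mode })
p result
p waits
```

Passing the poll and wait steps in as callables keeps the state machine testable in isolation; the real method calls the shim and the scheduler directly.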

.drain_remaining_results(pgh) ⇒ Object

After the first PQgetResult returned a real Result, libpq requires the conn be drained via additional PQgetResult calls until NULL is returned. This is a fast in-memory drain (no network), but it has to happen between async_exec calls or the next send_query will fail. Each tep_pg_get_result call that produces a non-NULL result stashes it in the slot table; we PQclear those immediately since they’re trailing status results we don’t expose.



# File 'lib/tep/pg.rb', line 551

def self.drain_remaining_results(pgh)
  while true
    rh = Pg.tep_pg_get_result(pgh)
    if rh < 0
      return 0
    end
    # A trailing result -- shouldn't normally happen for
    # single-statement queries, but defensively free.
    Pg.tep_pg_clear(rh)
  end
  0
end
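
A minimal sketch of the drain-until-NULL pattern, assuming a fake result source in place of the C shim. `FakeResultSource` and `drain` are hypothetical names; -1 plays the role of PQgetResult returning NULL, as in the method above.

```ruby
# Hypothetical stand-in for the result slot table: handles are non-negative
# Integers, and -1 means "no more results".
class FakeResultSource
  attr_reader :cleared

  def initialize(handles)
    @pending = handles.dup
    @cleared = []
  end

  def get_result
    @pending.empty? ? -1 : @pending.shift
  end

  def clear(rh)
    @cleared << rh
  end
end

# Read until the end marker, freeing every trailing result on the way.
def drain(source)
  loop do
    rh = source.get_result
    return 0 if rh < 0
    source.clear(rh)
  end
end

src = FakeResultSource.new([7, 8])
status = drain(src)
p status
p src.cleared
```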

.drain_send(pgh) ⇒ Object

Drain libpq’s send buffer. PQflush returns 0 when done; 1 when the kernel send-buffer is full and we should park on WRITE-ready; -1 on error. Timeout is generous (10s); a genuinely-stuck PG is the rare case worth bailing on.



# File 'lib/tep/pg.rb', line 508

def self.drain_send(pgh)
  fd = Pg.tep_pg_socket(pgh)
  while true
    rc = Pg.tep_pg_flush(pgh)
    if rc == 0
      return 0
    end
    if rc < 0
      return -1
    end
    # rc == 1: send buffer full, park on writability.
    Tep::Scheduler.io_wait(fd, Tep::Scheduler::WRITE, 10)
  end
  0
end
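
The three PQflush outcomes map onto a small loop. The sketch below uses hypothetical callables (`flush`, `wait_writable`) in place of `Pg.tep_pg_flush` and `Tep::Scheduler.io_wait`, so the return-code handling can be checked on its own.

```ruby
# Hypothetical flush driver. `flush` returns 0 (done), 1 (buffer full) or
# -1 (error), matching the PQflush contract described above;
# `wait_writable` stands in for parking on WRITE-readiness.
def drain_send_sketch(flush, wait_writable)
  loop do
    rc = flush.call
    return 0 if rc == 0
    return -1 if rc < 0
    wait_writable.call
  end
end

# Two "buffer full" rounds, then the buffer empties.
rcs = [1, 1, 0]
parks = 0
done = drain_send_sketch(-> { rcs.shift }, -> { parks += 1 })

# An immediate error must not wait at all.
failed = drain_send_sketch(-> { -1 }, -> { raise "should not wait" })
p [done, parks, failed]
```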

.escape_string(s) ⇒ Object

Class-method form – ruby-pg allows escape_string and quote_ident without a live conn. We route through slot 0 which the shim treats as “no conn, fall back to standalone PQescapeString”. Use the instance method when a conn is available – it goes through PQescapeStringConn which is the standards-compliant path.



# File 'lib/tep/pg.rb', line 582

def self.escape_string(s)
  Pg.tep_pg_escape_string(0, s)
end

.quote_ident(s) ⇒ Object



# File 'lib/tep/pg.rb', line 586

def self.quote_ident(s)
  # PQescapeIdentifier requires a conn; without one we fall
  # through to "" which is wrong but rare. Apps with a live
  # PG::Connection should use the instance method.
  Pg.tep_pg_escape_identifier(0, s)
end

.raise_send_failure(conn) ⇒ Object

PQsendQuery returned 0 (immediate failure – conn already closed, send buffer error, etc.). Mirror the error onto the conn’s last_* and raise, matching the exec error path (ruby-pg surfaces a send failure as PG::UnableToSend < PG::Error). No SQLSTATE is available pre-result, so this maps to the transport leaf rather than going through raise_for_sqlstate.

Raises:

  • (PG::UnableToSend)


# File 'lib/tep/pg.rb', line 497

def self.raise_send_failure(conn)
  conn.last_sqlstate = ""
  conn.last_error_message = conn.error_message
  conn.last_result_rh = -1
  raise PG::UnableToSend, conn.error_message
end

.record_error_if_any(conn, r) ⇒ Object

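If the Result is in an error state, mirror SQLSTATE + message onto the conn so post-rescue callers can read them via `conn.last_*`, free the failed PGresult (resetting `last_result_rh` to -1), then raise the SQLSTATE-mapped PG::Error subclass via `PG.raise_for_sqlstate`. Returns 0 when the Result is not in an error state. See the docstring on `exec` for the raise-on-error contract.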



# File 'lib/tep/pg.rb', line 597

def self.record_error_if_any(conn, r)
  st = r.status
  if st == Pg::RES_TUPLES || st == Pg::RES_COMMAND || st == Pg::RES_EMPTY
    return 0
  end
  sqlstate = r.error_field(PG::DIAG_SQLSTATE)
  msg = r.error_message
  if msg.length == 0
    msg = conn.error_message
  end
  conn.last_sqlstate = sqlstate
  conn.last_error_message = msg
  # Free the failed PGresult NOW: once we raise out of
  # exec/exec_params the caller's `r.clear` never runs, so this is
  # the only chance to release it. The SQLSTATE / message are
  # already copied onto conn.last_* (Strings) for post-rescue
  # inspection, so dropping the handle loses nothing callers need.
  conn.last_result_rh = -1
  r.clear
  # ruby-pg / AR parity: raise the SQLSTATE-mapped PG::Error
  # subclass (live since matz/spinel#627 + #1041 -- namespaced
  # raise + hierarchy-walking rescue). Callers `rescue
  # PG::UniqueViolation` (leaf) or `rescue PG::Error` (base).
  PG.raise_for_sqlstate(sqlstate, msg)
  0
end
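
The source of `PG.raise_for_sqlstate` is not shown on this page. As an illustration of the general technique only, a SQLSTATE-to-class lookup with a base-class fallback might look like the sketch below. The `Sketch` module, its class tree, and the table contents are hypothetical; the two SQLSTATE codes are the standard PostgreSQL values for unique_violation and undefined_table.

```ruby
# Hypothetical exception hierarchy, named Sketch::* so it is not mistaken
# for tep's real PG::Error tree.
module Sketch
  class Error < StandardError; end
  class ServerError < Error; end
  class UniqueViolation < ServerError; end
  class UndefinedTable < ServerError; end

  # SQLSTATE codes from the PostgreSQL error-code appendix.
  BY_SQLSTATE = {
    "23505" => UniqueViolation, # unique_violation
    "42P01" => UndefinedTable   # undefined_table
  }.freeze

  # Raise the mapped leaf class, or ServerError for unmapped states.
  def self.raise_for_sqlstate(sqlstate, msg)
    raise BY_SQLSTATE.fetch(sqlstate, ServerError), msg
  end
end

# Leaf rescue first, then the base class catches everything else.
def classify(sqlstate)
  Sketch.raise_for_sqlstate(sqlstate, "boom")
rescue Sketch::UniqueViolation
  :unique
rescue Sketch::Error => e
  e.class.name
end

p classify("23505")
p classify("42P01")
p classify("XX000")
```

Ordering the rescue clauses leaf-first matters: Ruby takes the first matching clause, so a leading `rescue Sketch::Error` would shadow the more specific one.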

.wait_for_result_ready(pgh) ⇒ Object

Wait until PQisBusy returns 0 (PQgetResult won’t block). Pumps PQconsumeInput in between io_wait calls so the libpq state machine advances. Timeout is generous (30s) since the query itself can take that long; the io_wait timeout is per-iteration, not cumulative.



# File 'lib/tep/pg.rb', line 529

def self.wait_for_result_ready(pgh)
  fd = Pg.tep_pg_socket(pgh)
  while true
    if Pg.tep_pg_consume_input(pgh) != 1
      return -1
    end
    if Pg.tep_pg_is_busy(pgh) == 0
      return 0
    end
    Tep::Scheduler.io_wait(fd, Tep::Scheduler::READ, 30)
  end
  0
end

Instance Method Details

#async_exec(sql) ⇒ Object

Explicit async exec. Same shape as `exec` but doesn’t context-detect – always uses the libpq async surface. If called outside Tep::Server::Scheduled, Tep::Scheduler.io_wait falls back to a single-shot poll(2), so this still works under prefork (just without the cross-fiber concurrency win).



# File 'lib/tep/pg.rb', line 396

def async_exec(sql)
  Pg.tep_pg_set_nonblocking(@pgh, 1)
  ok = Pg.tep_pg_send_query(@pgh, sql)
  if ok != 1
    Connection.raise_send_failure(self)
  end
  Connection.drain_send(@pgh)
  Connection.wait_for_result_ready(@pgh)
  rh = Pg.tep_pg_get_result(@pgh)
  r = PG::Result.new(rh)
  # Drain trailing NULL terminator (libpq requires reading
  # until PQgetResult returns NULL to mark the conn ready for
  # the next send_query).
  Connection.drain_remaining_results(@pgh)
  Connection.record_error_if_any(self, r)
  r
end

#async_exec_params(sql, params) ⇒ Object

Parameterised async exec. `params` is an Array of String / Integer / nil; same conversion as exec_params.



# File 'lib/tep/pg.rb', line 416

def async_exec_params(sql, params)
  Pg.tep_pg_param_clear
  i = 0
  n = params.length
  while i < n
    p = params[i]
    if p == nil
      Pg.tep_pg_param_push_null
    else
      Pg.tep_pg_param_push_str(p.to_s)
    end
    i += 1
  end
  async_exec_params_after_clear(sql)
end
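
The conversion rule in the push loop (nil becomes SQL NULL, everything else is sent as its `to_s` text) can be sketched as a pure function. `encode_params` and the `:null` marker are hypothetical, used here only to make the rule visible; the real method pushes into the shim's accumulator instead of returning an Array.

```ruby
# Hypothetical mirror of the push loop: nil maps to a NULL marker, any
# other value is converted with to_s.
def encode_params(params)
  params.map { |p| p.nil? ? :null : p.to_s }
end

encoded = encode_params(["alice", 42, nil])
p encoded
```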

#async_exec_params_after_clear(sql) ⇒ Object

Internal: param accumulator has already been populated by the caller (either exec_params routing here on context detect, or async_exec_params after its own push loop).



# File 'lib/tep/pg.rb', line 435

def async_exec_params_after_clear(sql)
  Pg.tep_pg_set_nonblocking(@pgh, 1)
  ok = Pg.tep_pg_send_query_params(@pgh, sql)
  if ok != 1
    Connection.raise_send_failure(self)
  end
  Connection.drain_send(@pgh)
  Connection.wait_for_result_ready(@pgh)
  rh = Pg.tep_pg_get_result(@pgh)
  r = PG::Result.new(rh)
  Connection.drain_remaining_results(@pgh)
  Connection.record_error_if_any(self, r)
  r
end

#close ⇒ Object



# File 'lib/tep/pg.rb', line 253

def close
  if @pgh >= 0
    Pg.tep_pg_finish(@pgh)
    @pgh = -1
  end
  0
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/tep/pg.rb', line 249

def connected?
  @pgh >= 0
end

#error_message ⇒ Object



# File 'lib/tep/pg.rb', line 284

def error_message
  @pgh < 0 ? "" : Pg.tep_pg_error_message(@pgh)
end

#escape_identifier(s) ⇒ Object



# File 'lib/tep/pg.rb', line 568

def escape_identifier(s)
  Pg.tep_pg_escape_identifier(@pgh, s)
end

#escape_literal(s) ⇒ Object



# File 'lib/tep/pg.rb', line 572

def escape_literal(s)
  Pg.tep_pg_escape_literal(@pgh, s)
end

#escape_string(s) ⇒ Object



# File 'lib/tep/pg.rb', line 564

def escape_string(s)
  Pg.tep_pg_escape_string(@pgh, s)
end

#exec(sql) ⇒ Object

Run a no-params query. Returns a PG::Result on success.

On error it raises the SQLSTATE-mapped PG::Error subclass (PG::UniqueViolation, PG::UndefinedTable, … -> PG::ServerError for unmapped states) – the ruby-pg / AR shape. The failed PGresult is freed before the raise; the SQLSTATE / message stay readable on `conn.last_sqlstate` / `conn.last_error_message` for post-rescue inspection:

begin
  c.exec(sql)
rescue PG::UniqueViolation => e
  ...                     # e.message + c.last_sqlstate
rescue PG::Error => e     # base catches any server error
  ...
end

Raising (instead of the old Result-on-error sentinel) became viable once spinel learned namespaced raise + hierarchy-walking rescue (matz/spinel#627 + #1041). NB: PG.connect is the one path that still does NOT raise – it returns a connection-failed instance so PG::Pool can type-seed without a live server (check `conn.connected?`).

Under `Tep::Server::Scheduled` this routes through the libpq async surface (PQsendQuery + PQflush + PQconsumeInput parked on Tep::Scheduler.io_wait), so other fibers in the same worker can run while the query is in flight. Under prefork it routes through the blocking PQexec. Both raise identically.



# File 'lib/tep/pg.rb', line 355

def exec(sql)
  if Tep::Scheduler.scheduled_context?
    return async_exec(sql)
  end
  rh = Pg.tep_pg_exec(@pgh, sql)
  r = PG::Result.new(rh)
  Connection.record_error_if_any(self, r)
  r
end

#exec_params(sql, params) ⇒ Object

Parameterised query with positional binds ($1, $2, …). `params` is an Array of String / Integer / nil. Same raise-on-error contract + auto-routing as `exec`.



# File 'lib/tep/pg.rb', line 368

def exec_params(sql, params)
  Pg.tep_pg_param_clear
  i = 0
  n = params.length
  while i < n
    p = params[i]
    if p == nil
      Pg.tep_pg_param_push_null
    else
      Pg.tep_pg_param_push_str(p.to_s)
    end
    i += 1
  end
  if Tep::Scheduler.scheduled_context?
    return async_exec_params_after_clear(sql)
  end
  rh = Pg.tep_pg_exec_params(@pgh, sql)
  r = PG::Result.new(rh)
  Connection.record_error_if_any(self, r)
  r
end

#finish ⇒ Object



# File 'lib/tep/pg.rb', line 261

def finish
  close
end

#last_notify_channel ⇒ Object



# File 'lib/tep/pg.rb', line 318

def last_notify_channel
  Pg.tep_pg_notify_channel
end

#last_notify_payload ⇒ Object



# File 'lib/tep/pg.rb', line 322

def last_notify_payload
  Pg.tep_pg_notify_payload
end

#listen(channel) ⇒ Object

LISTEN / NOTIFY (Battery 2 chunk 2.2). Used by Tep::Broadcast’s PG backend for cross-worker pub/sub. Channel names must be safe SQL identifiers (no caller-controlled interpolation – use a hard-coded constant). Payload max size is 8000 bytes per PG default.



# File 'lib/tep/pg.rb', line 293

def listen(channel)
  return -1 if @pgh < 0
  Pg.tep_pg_listen(@pgh, channel)
end

#notify(channel, payload) ⇒ Object



# File 'lib/tep/pg.rb', line 303

def notify(channel, payload)
  return -1 if @pgh < 0
  Pg.tep_pg_notify(@pgh, channel, payload)
end

#poll_notification(timeout_ms) ⇒ Object

Block up to `timeout_ms` waiting for one notification on the connection. Returns 1 on receipt (caller then reads #last_notify_channel + #last_notify_payload), 0 on timeout, -1 on connection error. Connection must already be in LISTEN mode for the channel of interest.



# File 'lib/tep/pg.rb', line 313

def poll_notification(timeout_ms)
  return -1 if @pgh < 0
  Pg.tep_pg_poll_notification(@pgh, timeout_ms)
end
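
A consumer loop built on the 1 / 0 / -1 return codes might look like the sketch below. `FakeListener` and `collect` are hypothetical, written so the loop can run without a server; only the return-code contract and the two `last_notify_*` readers are taken from the description above.

```ruby
# Hypothetical stand-in exposing the three return codes described above.
class FakeListener
  attr_reader :last_notify_channel, :last_notify_payload

  def initialize(events)
    @events = events.dup
  end

  # Returns 1 and records channel/payload on receipt, 0 on timeout, and
  # -1 once the scripted events run out (simulating a connection error).
  def poll_notification(_timeout_ms)
    event = @events.shift
    return -1 if event.nil?
    return 0 if event == :timeout
    @last_notify_channel, @last_notify_payload = event
    1
  end
end

# Collect notifications until the connection reports an error.
def collect(conn, timeout_ms)
  received = []
  loop do
    rc = conn.poll_notification(timeout_ms)
    break if rc < 0
    next if rc == 0 # timeout: nothing arrived, poll again
    received << [conn.last_notify_channel, conn.last_notify_payload]
  end
  received
end

conn = FakeListener.new([["jobs", "42"], :timeout, ["jobs", "43"]])
messages = collect(conn, 100)
p messages
```

Reading the channel and payload immediately after a 1 return matters in this shape, since the next successful poll overwrites both values.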

#reset ⇒ Object



# File 'lib/tep/pg.rb', line 265

def reset
  if @pgh >= 0
    Pg.tep_pg_reset(@pgh)
  end
  self
end

#server_version ⇒ Object



# File 'lib/tep/pg.rb', line 280

def server_version
  @pgh < 0 ? 0 : Pg.tep_pg_server_version(@pgh)
end

#status ⇒ Object



# File 'lib/tep/pg.rb', line 272

def status
  @pgh < 0 ? PG::CONNECTION_BAD : Pg.tep_pg_status(@pgh)
end

#transaction_status ⇒ Object



# File 'lib/tep/pg.rb', line 276

def transaction_status
  @pgh < 0 ? PG::PQTRANS_UNKNOWN : Pg.tep_pg_transaction_status(@pgh)
end

#unlisten(channel) ⇒ Object



# File 'lib/tep/pg.rb', line 298

def unlisten(channel)
  return -1 if @pgh < 0
  Pg.tep_pg_unlisten(@pgh, channel)
end