Class: PG::Pool

Inherits:
Object
Defined in:
lib/tep/pg.rb

Overview

Connection pool

PG::Pool is a fixed-size connection pool for PG::Connection instances. It mirrors ruby-pg's `PG::Pool` shape from the external pg_pool gem (and the same idea as ActiveRecord's ConnectionPool): hold N pre-opened connections, hand them out via `checkout`, take them back via `checkin`, and park cooperatively under `Tep::Server::Scheduled` when the free list is empty.

Typical use:

POOL = PG::Pool.new(ENV["DATABASE_URL"], 8)

get '/users/:id' do
  c = POOL.checkout
  r = c.exec_params("SELECT name FROM users WHERE id = $1",
                    [params[:id]])
  name = r.getvalue(0, 0)
  r.clear
  POOL.checkin(c)
  name
end
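
The handler above never reaches `checkin` if `exec_params` raises, so the connection is lost to the pool. A minimal sketch of guarding the checkin with `ensure` follows. `StubPool`, `StubConn`, and `fetch_name` are hypothetical stand-ins so the snippet runs without a database, and whether spinel compiles `ensure` in this position is an assumption to verify.

```ruby
# Hypothetical stand-ins for PG::Pool / PG::Connection (not part of Tep).
class StubConn
  def exec_params(sql, params)
    raise ArgumentError, "bad params" if params.include?(nil)
    "row for #{params[0]}"
  end
end

class StubPool
  def initialize(size)
    @free = Array.new(size) { StubConn.new }
  end

  def checkout
    @free.delete_at(0)
  end

  def checkin(c)
    @free.push(c)
    0
  end

  def available
    @free.length
  end
end

# Check out, run the query, and always hand the connection back.
# Assumption: `ensure` is available in the target runtime.
def fetch_name(pool, id)
  c = pool.checkout
  begin
    c.exec_params("SELECT name FROM users WHERE id = $1", [id])
  ensure
    pool.checkin(c)
  end
end

pool = StubPool.new(2)
fetch_name(pool, 7)
begin
  fetch_name(pool, nil)   # raises inside exec_params
rescue ArgumentError
  # the ensure clause already checked the connection back in
end
puts pool.available
```

The same shape is what a future block-form `with` would presumably wrap; until then the `begin`/`ensure` pair has to be written by hand in each handler.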

The block form `with { |c| … }` is deferred until spinel supports typed yields in instance methods (matz/spinel#628 covers the top-level def case but not instance methods); manual checkout/checkin is the v1 shape.

Concurrency model:

- Under prefork (Tep::Server, the default): one Pool per
  worker process; eagerly opens its N conns at boot. N tunes
  the per-worker in-flight query count.
- Under Tep::Server::Scheduled: one Pool for the whole
  worker; a checkout that finds the free list empty parks its
  fiber in the pool's waiter queue until a checkin wakes it.
  Other fibers run in the meantime; the checkin refills the
  free list and the woken fiber takes the connection.

On exhaustion (non-scheduled callers only), checkout raises PG::PoolExhausted once it has waited past @checkout_timeout_ms. This used to be a sentinel nil-equivalent return because spinel couldn't rescue module-namespaced exception classes; matz/spinel#1041 fixed that, so `rescue PG::PoolExhausted` / `rescue PG::Error` now work. The scheduled path parks indefinitely (waking on checkin) and so has no exhaustion timeout; only the spin fallback does.
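
As a rough illustration of the rescue path, the sketch below maps exhaustion to a 503-style result. The `PG` module here is a stand-in so the snippet runs without Tep; that `PoolExhausted` subclasses `PG::Error` is an assumption of the sketch, and `ExhaustedPool` and `handle` are hypothetical names.

```ruby
# Stand-in definitions so the sketch runs without Tep; the real classes
# live in lib/tep/pg.rb. That PoolExhausted subclasses PG::Error is an
# assumption of this sketch.
module PG
  class Error < StandardError; end
  class PoolExhausted < Error; end
end

# Hypothetical pool stub whose checkout always times out.
class ExhaustedPool
  def checkout
    raise PG::PoolExhausted,
          "PG::Pool#checkout timed out after 5000ms; all 8 connections in use"
  end
end

# Turn pool exhaustion into a 503-style pair instead of letting the
# exception escape the handler.
def handle(pool)
  pool.checkout
  [200, "ok"]
rescue PG::PoolExhausted => e
  [503, e.message]
end

status, body = handle(ExhaustedPool.new)
puts "#{status}: #{body}"
```

Only non-scheduled callers can see this exception, so a handler running under Tep::Server::Scheduled would not need the rescue clause.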

Constructor Details

#initialize(url, size) ⇒ Pool

Returns a new instance of Pool.



# File 'lib/tep/pg.rb', line 988

def initialize(url, size)
  @url = url
  @size = size
  @checkout_timeout_ms = 5000  # 5s default; bump for slow upstreams
  # Type-seed @free as PtrArray<PG::Connection>. PG::Connection.new
  # with an empty conninfo returns a connection-failed instance
  # (@pgh=-1, populated @last_error_message) rather than raising,
  # so this is safe to run at module load even when PG isn't
  # reachable.
  @free = [PG::Connection.new("")]
  @free.delete_at(0)
  # Waiter queue: IntArray of fiber indices into Tep::APP.sched_fibers.
  # `checkout` parks the current fiber here when @free is empty
  # (under Scheduled); `checkin` resumes the oldest waiter by
  # setting its wake_at = -1. Type-seed with an int + delete.
  @waiter_idxs = [0]
  @waiter_idxs.delete_at(0)
  # Eager open of N real conns. If the URL isn't reachable, each
  # Connection will have @pgh=-1; the caller can check
  # `pool.healthy?` after construction.
  i = 0
  while i < size
    c = PG::Connection.new(url)
    @free.push(c)
    i += 1
  end
end

Instance Attribute Details

#checkout_timeout_ms ⇒ Object

Returns the value of attribute checkout_timeout_ms.



# File 'lib/tep/pg.rb', line 986

def checkout_timeout_ms
  @checkout_timeout_ms
end

#free ⇒ Object

Returns the value of attribute free.



# File 'lib/tep/pg.rb', line 986

def free
  @free
end

#size ⇒ Object

Returns the value of attribute size.



# File 'lib/tep/pg.rb', line 986

def size
  @size
end

#url ⇒ Object

Returns the value of attribute url.



# File 'lib/tep/pg.rb', line 986

def url
  @url
end

#waiter_idxs ⇒ Object

Returns the value of attribute waiter_idxs.



# File 'lib/tep/pg.rb', line 986

def waiter_idxs
  @waiter_idxs
end

Instance Method Details

#available ⇒ Object

Diagnostic: how many connections are currently available.



# File 'lib/tep/pg.rb', line 1114

def available
  @free.length
end

#checkin(c) ⇒ Object

Return a connection to the pool by pushing it onto @free. If a waiter is parked, also wake the oldest one by setting wake_at = -1 at its fiber index. Always returns 0.



# File 'lib/tep/pg.rb', line 1079

def checkin(c)
  @free.push(c)
  if @waiter_idxs.length > 0
    widx = @waiter_idxs.delete_at(0)
    # wake_at = -1 makes the fiber the "earliest due" in the
    # next tick's pick (the tick comparator chooses the lowest
    # wake_at among the time-due set, so -1 always wins).
    Tep::APP.sched_wake_at[widx] = -1
  end
  0
end
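
The comment in `checkin` describes how the scheduler picks the next fiber. The sketch below is a hypothetical model of that comparator, not Tep's actual scheduler code: `pick_next` and the sample `wake_at` values are assumptions made for illustration.

```ruby
# Hypothetical model of the tick comparator described in the comment:
# among fibers whose wake_at is due (<= now), pick the lowest wake_at.
# Returns the fiber index, or nil when nothing is due.
def pick_next(wake_at, now)
  best = nil
  wake_at.each_with_index do |t, idx|
    next if t > now
    best = idx if best.nil? || t < wake_at[best]
  end
  best
end

now = 1_000
# Fiber 1 is parked in checkout with the far-future sentinel.
wake_at = [now - 5, now + 86_400, now - 1]
before = pick_next(wake_at, now)   # fiber 0 is the earliest due
wake_at[1] = -1                    # what checkin does for the oldest waiter
after = pick_next(wake_at, now)    # fiber 1 now sorts ahead of everyone
puts "before checkin: #{before}, after checkin: #{after}"
```

Under this model a value of -1 is below any real timestamp, which is why the woken waiter is chosen ahead of fibers that were already due.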

#checkout ⇒ Object

Acquire a connection. Returns a PG::Connection on success.

Two paths:

- Under Tep::Server::Scheduled: park the current fiber in
  the pool's waiter queue (via Fiber.yield with a far-future
  wake_at sentinel). `checkin` wakes the oldest waiter by
  setting its wake_at=-1, which marks it as due on the next
  scheduler tick. No busy-spin -- the scheduler runs other
  fibers (handlers, accept loop, async-exec parkers) until
  a checkin happens.

- Outside scheduled context (prefork-blocking or top-level
  code): fall back to a small-step pause-and-retry. Each
  worker is single-threaded in prefork, so checkout only
  finds the pool empty when user code holds several
  checkouts at once inside one handler. This rarely matters
  in practice.


# File 'lib/tep/pg.rb', line 1054

def checkout
  if @free.length > 0
    return @free.delete_at(0)
  end
  if !Tep::Scheduler.scheduled_context?
    return checkout_spin_fallback
  end
  # Cooperative wait. Stash our fiber index, park, wait for
  # checkin to set wake_at=-1.
  idx = Tep::APP.sched_current
  @waiter_idxs.push(idx)
  # Far-future sentinel: the scheduler won't pick us as
  # time-due until checkin lowers our wake_at. Tep::Scheduler's
  # int-second resolution means "not soon enough to matter";
  # 86400 s is 24 hours.
  Tep::APP.sched_wake_at[idx] = Time.now.to_i + 86400
  Fiber.yield
  # When we resume, checkin pushed a conn to @free + woke us.
  # Pop it.
  @free.delete_at(0)
end
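
The park/wake handshake between `checkout` and `checkin` can be modelled with plain Ruby fibers. This is a toy sketch under stated assumptions: `MiniPool` and `run_demo` are hypothetical, and the model resumes the waiter directly from `checkin`, whereas the real pool stores fiber indices and leaves the resuming to Tep's scheduler.

```ruby
require "fiber"  # Fiber.current on older Rubies

# Toy model of the waiter queue; not Tep's implementation.
class MiniPool
  def initialize(conns)
    @free = conns
    @waiters = []   # parked fibers, oldest first
  end

  def checkout
    return @free.delete_at(0) if @free.length > 0
    @waiters.push(Fiber.current)
    Fiber.yield            # park until a checkin wakes us
    @free.delete_at(0)     # the checkin pushed a conn before waking us
  end

  def checkin(c)
    @free.push(c)
    waiter = @waiters.delete_at(0)
    waiter.resume if waiter   # wake the oldest waiter, if any
    0
  end
end

# Drive two fibers through one connection and record who got it when.
def run_demo
  pool = MiniPool.new([:conn_a])
  log = []

  holder = Fiber.new do
    c = pool.checkout        # free list has a conn: returns at once
    log << [:holder, c]
    Fiber.yield              # pretend to run a query
    pool.checkin(c)          # wakes the parked waiter
  end

  waiter = Fiber.new do
    c = pool.checkout        # free list empty: parks here
    log << [:waiter, c]
    pool.checkin(c)
  end

  holder.resume   # holder takes the conn and pauses
  waiter.resume   # waiter parks in the queue
  holder.resume   # holder checks in; waiter wakes and takes the conn
  log
end

p run_demo
```

The model keeps the property the real code relies on: a woken waiter expects `@free` to be non-empty when it resumes, because the waking `checkin` pushed a connection first.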

#checkout_spin_fallback ⇒ Object

Pause-and-retry fallback for non-scheduled callers; `checkout` uses it when called outside a fiber. Because pause's seconds argument is stored as an mrb_int, sub-second values round to 0 and would busy-spin, so the loop pauses a full second per retry and counts the timeout in 1000 ms steps. The branch is only taken outside scheduled context, so there's no fiber-starvation concern: the worker is single-threaded and either has a free conn or doesn't.



# File 'lib/tep/pg.rb', line 1098

def checkout_spin_fallback
  waited_ms = 0
  while @free.length == 0
    Tep::Scheduler.pause(1)   # full-second pause; non-scheduled fallback
    waited_ms += 1000
    if waited_ms >= @checkout_timeout_ms
      raise PG::PoolExhausted,
            "PG::Pool#checkout timed out after " +
            @checkout_timeout_ms.to_s + "ms; all " +
            @size.to_s + " connections in use"
    end
  end
  @free.delete_at(0)
end
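
Because the loop adds 1000 to `waited_ms` per pause and only then compares against the timeout, the effective wait is the timeout rounded up to a whole second, with a minimum of one second. The sketch below replays just that arithmetic; `pauses_before_timeout` is a hypothetical helper, and it assumes the free list stays empty for the whole wait.

```ruby
# Number of one-second pauses the fallback loop takes before raising,
# assuming the free list stays empty the whole time.
def pauses_before_timeout(timeout_ms)
  waited_ms = 0
  pauses = 0
  loop do
    pauses += 1          # stands in for Tep::Scheduler.pause(1)
    waited_ms += 1000
    break if waited_ms >= timeout_ms
  end
  pauses
end

[5000, 1500, 1].each do |ms|
  puts "#{ms}ms timeout: raises after #{pauses_before_timeout(ms)} pause(s)"
end
```

So a timeout set through `set_checkout_timeout_ms(1500)` behaves like 2000 ms here, and values below 1000 still cost one full pause.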

#close_all ⇒ Object

Close every connection currently in the free list; connections that are still checked out are not touched. Call at app shutdown if needed; the OS recovers them on process exit anyway.



# File 'lib/tep/pg.rb', line 1120

def close_all
  while @free.length > 0
    c = @free.delete_at(0)
    c.close
  end
  0
end

#healthy? ⇒ Boolean

True iff every pooled connection opened successfully and all @size connections are in the free list. Use right after construction, before any checkout, to fail loud rather than handing out broken conns:

POOL = PG::Pool.new(url, 8)
raise "PG unreachable" unless POOL.healthy?

Returns:

  • (Boolean)


# File 'lib/tep/pg.rb', line 1022

def healthy?
  i = 0
  while i < @free.length
    if !@free[i].connected?
      return false
    end
    i += 1
  end
  @free.length == @size
end
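
Since the last line compares `@free.length` with `@size`, the result is also false while any connection is checked out. The sketch below shows that with hypothetical stand-ins (`FakeConn`, `FakePool`) that mirror only the parts `healthy?` relies on; the `Array#all?` form is a plain-Ruby restatement, not the source's loop.

```ruby
# Hypothetical stand-ins mirroring only what healthy? relies on.
class FakeConn
  def initialize(ok)
    @ok = ok
  end

  def connected?
    @ok
  end
end

class FakePool
  def initialize(conns)
    @free = conns
    @size = conns.length
  end

  def checkout
    @free.delete_at(0)
  end

  # Same logic as the listing above, written with Array#all?.
  def healthy?
    @free.all? { |c| c.connected? } && @free.length == @size
  end
end

pool = FakePool.new([FakeConn.new(true), FakeConn.new(true)])
fresh = pool.healthy?        # all conns connected and all in the free list
pool.checkout
in_use = pool.healthy?       # one conn is out, so the length check fails
puts "fresh: #{fresh}, with one checkout outstanding: #{in_use}"
```

This is why the check belongs at boot: once handlers start taking connections, a false result no longer implies a broken connection.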

#set_checkout_timeout_ms(ms) ⇒ Object

Sets @checkout_timeout_ms, the limit in milliseconds that the spin fallback waits before raising PG::PoolExhausted.



# File 'lib/tep/pg.rb', line 1033

def set_checkout_timeout_ms(ms)
  @checkout_timeout_ms = ms
end