Class: PG::Pool
- Inherits:
-
Object
- Object
- PG::Pool
- Defined in:
- lib/tep/pg.rb
Overview
——– Connection pool ——–
PG::Pool – a fixed-size connection pool for PG::Connection instances. Mirrors ruby-pg’s ‘PG::Pool` shape from the external pg_pool gem (and the same idea as AR’s ConnectionPool): hold N pre-opened connections, hand them out via ‘checkout` / take them back via `checkin`, park cooperatively under `Tep::Server::Scheduled` when the free list is empty.
Typical use:
POOL = PG::Pool.new(ENV["DATABASE_URL"], 8)
get '/users/:id' do
c = POOL.checkout
r = c.exec_params("SELECT name FROM users WHERE id = $1",
[params[:id]])
name = r.getvalue(0, 0)
r.clear
POOL.checkin(c)
name
end
The block-form ‘with { |c| … }` is deferred until spinel lights up instance-method typed yields (matz/spinel#628 covers the top-level def case but not instance methods); manual checkout/checkin is the v1 shape.
Concurrency model:
- Under prefork (Tep::Server, the default): one Pool per
worker process; eagerly opens its N conns at boot. N tunes
the per-worker in-flight query count.
- Under Tep::Server::Scheduled: one Pool for the whole
worker; checkouts that find the free list empty park via
`Tep::Scheduler.pause(0.001)` until a checkin happens.
Other fibers run in the meantime; eventually a checkin
refills the free list and the parked fiber retries.
On exhaustion (non-scheduled callers only), checkout raises PG::PoolExhausted once it has waited past @checkout_timeout_ms. This used to be a sentinel nil-equivalent return because spinel couldn’t rescue module-namespaced exception classes; matz/spinel#1041 fixed that, so ‘rescue PG::PoolExhausted` / `rescue PG::Error` now work. The scheduled path parks indefinitely (waking on checkin) and so has no exhaustion timeout – only the spin fallback does.
Instance Attribute Summary collapse
-
#checkout_timeout_ms ⇒ Object
Returns the value of attribute checkout_timeout_ms.
-
#free ⇒ Object
Returns the value of attribute free.
-
#size ⇒ Object
Returns the value of attribute size.
-
#url ⇒ Object
Returns the value of attribute url.
-
#waiter_idxs ⇒ Object
Returns the value of attribute waiter_idxs.
Instance Method Summary collapse
-
#available ⇒ Object
Diagnostic: how many connections are currently available.
-
#checkin(c) ⇒ Object
Return a connection to the pool.
-
#checkout ⇒ Object
Acquire a connection.
-
#checkout_spin_fallback ⇒ Object
Pause-and-retry fallback for non-scheduled callers.
-
#close_all ⇒ Object
Close every connection.
-
#healthy? ⇒ Boolean
True iff every pooled connection opened successfully.
-
#initialize(url, size) ⇒ Pool
constructor
A new instance of Pool.
- #set_checkout_timeout_ms(ms) ⇒ Object
Constructor Details
#initialize(url, size) ⇒ Pool
Returns a new instance of Pool.
988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 |
# File 'lib/tep/pg.rb', line 988 def initialize(url, size) @url = url @size = size @checkout_timeout_ms = 5000 # 5s default; bump for slow upstreams # Type-seed @free as PtrArray<PG::Connection>. PG::Connection.new # with an empty conninfo returns a connection-failed instance # (@pgh=-1, populated @last_error_message) rather than raising, # so this is safe to run at module load even when PG isn't # reachable. @free = [PG::Connection.new("")] @free.delete_at(0) # Waiter queue: IntArray of fiber indices into Tep::APP.sched_fibers. # `checkout` parks the current fiber here when @free is empty # (under Scheduled); `checkin` resumes the oldest waiter by # setting its wake_at = -1. Type-seed with an int + delete. @waiter_idxs = [0] @waiter_idxs.delete_at(0) # Eager open of N real conns. If the URL isn't reachable, each # Connection will have @pgh=-1; the caller can check # `pool.healthy?` after construction. i = 0 while i < size c = PG::Connection.new(url) @free.push(c) i += 1 end end |
Instance Attribute Details
#checkout_timeout_ms ⇒ Object
Returns the value of attribute checkout_timeout_ms.
986 987 988 |
# File 'lib/tep/pg.rb', line 986 def checkout_timeout_ms @checkout_timeout_ms end |
#free ⇒ Object
Returns the value of attribute free.
986 987 988 |
# File 'lib/tep/pg.rb', line 986 def free @free end |
#size ⇒ Object
Returns the value of attribute size.
986 987 988 |
# File 'lib/tep/pg.rb', line 986 def size @size end |
#url ⇒ Object
Returns the value of attribute url.
986 987 988 |
# File 'lib/tep/pg.rb', line 986 def url @url end |
#waiter_idxs ⇒ Object
Returns the value of attribute waiter_idxs.
986 987 988 |
# File 'lib/tep/pg.rb', line 986 def waiter_idxs @waiter_idxs end |
Instance Method Details
#available ⇒ Object
Diagnostic: how many connections are currently available.
1114 1115 1116 |
# File 'lib/tep/pg.rb', line 1114 def available @free.length end |
#checkin(c) ⇒ Object
Return a connection to the pool. If there’s a parked waiter, wake it (push to @free + set wake_at=-1 on the waiter’s fiber index). Otherwise just push to @free.
1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 |
# File 'lib/tep/pg.rb', line 1079 def checkin(c) @free.push(c) if @waiter_idxs.length > 0 widx = @waiter_idxs.delete_at(0) # wake_at = -1 makes the fiber the "earliest due" in the # next tick's pick (the tick comparator chooses the lowest # wake_at among the time-due set, so -1 always wins). Tep::APP.sched_wake_at[widx] = -1 end 0 end |
#checkout ⇒ Object
Acquire a connection. Returns a PG::Connection on success.
Two paths:
- Under Tep::Server::Scheduled: park the current fiber in
the pool's waiter queue (via Fiber.yield with a far-future
wake_at sentinel). `checkin` wakes the oldest waiter by
setting its wake_at=-1, which marks it as due on the next
scheduler tick. No busy-spin -- the scheduler runs other
fibers (handlers, accept loop, async-exec parkers) until
a checkin happens.
- Outside scheduled context (prefork-blocking or top-level
code): fall back to a small-step pause-and-retry. Each
worker is single-threaded in prefork, so a busy
checkout-on-empty only happens if user code holds two
checkouts inside one handler. Document; rarely matters.
1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 |
# File 'lib/tep/pg.rb', line 1054 def checkout if @free.length > 0 return @free.delete_at(0) end if !Tep::Scheduler.scheduled_context? return checkout_spin_fallback end # Cooperative wait. Stash our fiber index, park, wait for # checkin to set wake_at=-1. idx = Tep::APP.sched_current @waiter_idxs.push(idx) # Far-future sentinel: the scheduler won't pick us as # time-due until checkin lowers our wake_at. Tep::Scheduler's # int-second resolution means "not soon enough to matter" # = a few hours. Tep::APP.sched_wake_at[idx] = Time.now.to_i + 86400 Fiber.yield # When we resume, checkin pushed a conn to @free + woke us. # Pop it. @free.delete_at(0) end |
#checkout_spin_fallback ⇒ Object
Pause-and-retry fallback for non-scheduled callers. Used by checkout when called outside a fiber. Since pause’s seconds arg is stored as an mrb_int (rounds sub-second values to 0), this actually busy-spins under the scheduler – but the branch is only taken outside scheduled context, so there’s no fiber starvation concern: the worker is single-threaded and either has a free conn or doesn’t.
1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 |
# File 'lib/tep/pg.rb', line 1098 def checkout_spin_fallback waited_ms = 0 while @free.length == 0 Tep::Scheduler.pause(1) # full-second pause; non-scheduled fallback waited_ms += 1000 if waited_ms >= @checkout_timeout_ms raise PG::PoolExhausted, "PG::Pool#checkout timed out after " + @checkout_timeout_ms.to_s + "ms; all " + @size.to_s + " connections in use" end end @free.delete_at(0) end |
#close_all ⇒ Object
Close every connection. Call at app shutdown if needed; the OS recovers them on process exit anyway.
1120 1121 1122 1123 1124 1125 1126 |
# File 'lib/tep/pg.rb', line 1120 def close_all while @free.length > 0 c = @free.delete_at(0) c.close end 0 end |
#healthy? ⇒ Boolean
1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 |
# File 'lib/tep/pg.rb', line 1022 def healthy? i = 0 while i < @free.length if !@free[i].connected? return false end i += 1 end @free.length == @size end |
#set_checkout_timeout_ms(ms) ⇒ Object
1033 1034 1035 |
# File 'lib/tep/pg.rb', line 1033 def set_checkout_timeout_ms(ms) @checkout_timeout_ms = ms end |