Class: Mongo::Server::ConnectionPool

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
Loggable, Monitoring::Publishable
Defined in:
lib/mongo/server/connection_pool.rb,
lib/mongo/server/connection_pool/populator.rb,
lib/mongo/server/connection_pool/generation_manager.rb

Overview

Represents a connection pool for server connections.

Since:

  • 2.0.0, largely rewritten in 2.9.0

Defined Under Namespace

Classes: GenerationManager, Populator

Constant Summary collapse

DEFAULT_MAX_SIZE =

The default max size for the connection pool.

Since:

  • 2.9.0

20
DEFAULT_MIN_SIZE =

The default min size for the connection pool.

Since:

  • 2.9.0

0
DEFAULT_MAX_CONNECTING =

The default maximum number of connections that can be connecting at any given time.

Since:

  • 2.0.0, largely rewritten in 2.9.0

2
DEFAULT_WAIT_TIMEOUT =

The default timeout, in seconds, to wait for a connection.

This timeout applies while in flow threads are waiting for background threads to establish connections (and hence they must connect, handshake and auth in the allotted time).

It is currently set to 10 seconds. The default connect timeout is 10 seconds by itself, but setting large timeouts can get applications in trouble if their requests get timed out by the reverse proxy, thus anything over 15 seconds is potentially dangerous.

Since:

  • 2.9.0

10

Constants included from Loggable

Loggable::PREFIX

Instance Attribute Summary collapse

Attributes included from Monitoring::Publishable

#monitoring

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Monitoring::Publishable

#publish_cmap_event, #publish_event, #publish_sdam_event

Methods included from Loggable

#log_debug, #log_error, #log_fatal, #log_info, #log_warn, #logger

Constructor Details

#initialize(server, options = {}) ⇒ ConnectionPool

Create the new connection pool.

Note: Additionally, options for connections created by this pool should

be included in the options passed here, and they will be forwarded to
any connections created by the pool.

Parameters:

  • server (Server)

    The server which this connection pool is for.

  • options (Hash) (defaults to: {})

    The connection pool options.

Options Hash (options):

  • :max_size (Integer)

    The maximum pool size. Setting this option to zero creates an unlimited connection pool.

  • :max_connecting (Integer)

    The maximum number of connections that can be connecting simultaneously. The default is 2. This option should be increased if there are many threads that share same connection pool and the application is experiencing timeouts while waiting for connections to be established.

  • :max_pool_size (Integer)

    Deprecated. The maximum pool size. If max_size is also given, max_size and max_pool_size must be identical.

  • :min_size (Integer)

    The minimum pool size.

  • :min_pool_size (Integer)

    Deprecated. The minimum pool size. If min_size is also given, min_size and min_pool_size must be identical.

  • :wait_timeout (Float)

    The time to wait, in seconds, for a free connection.

  • :wait_queue_timeout (Float)

    Deprecated. Alias for :wait_timeout. If both wait_timeout and wait_queue_timeout are given, their values must be identical.

  • :max_idle_time (Float)

    The time, in seconds, after which idle connections should be closed by the pool.

  • :populator_io (true, false)

    For internal driver use only. Set to false to prevent the populator threads from being created and started in the server’s connection pool. It is intended for use in tests that also turn off monitoring_io, unless the populator is explicitly needed. If monitoring_io is off, but the populator_io is on, the populator needs to be manually closed at the end of the test, since a cluster without monitoring is considered not connected, and thus will not clean up the connection pool populator threads on close.

Raises:

  • (ArgumentError)

Since:

  • 2.0.0, API changed in 2.9.0



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# File 'lib/mongo/server/connection_pool.rb', line 100

def initialize(server, options = {})
  raise ArgumentError, 'First argument must be a Server instance' unless server.is_a?(Server)

  options = options.dup
  if options[:min_size] && options[:min_pool_size] && options[:min_size] != options[:min_pool_size]
    raise ArgumentError,
          "Min size #{options[:min_size]} is not identical to min pool size #{options[:min_pool_size]}"
  end
  if options[:max_size] && options[:max_pool_size] && options[:max_size] != options[:max_pool_size]
    raise ArgumentError,
          "Max size #{options[:max_size]} is not identical to max pool size #{options[:max_pool_size]}"
  end
  if options[:wait_timeout] && options[:wait_queue_timeout] && options[:wait_timeout] != options[:wait_queue_timeout]
    raise ArgumentError,
          "Wait timeout #{options[:wait_timeout]} is not identical to wait queue timeout #{options[:wait_queue_timeout]}"
  end

  options[:min_size] ||= options[:min_pool_size]
  options.delete(:min_pool_size)
  options[:max_size] ||= options[:max_pool_size]
  options.delete(:max_pool_size)
  if options[:min_size] && options[:max_size] &&
     options[:max_size] != 0 && options[:min_size] > options[:max_size]
    raise ArgumentError, "Cannot have min size #{options[:min_size]} exceed max size #{options[:max_size]}"
  end

  options[:wait_timeout] ||= options[:wait_queue_timeout] if options[:wait_queue_timeout]
  options.delete(:wait_queue_timeout)

  @server = server
  @options = options.freeze

  @generation_manager = GenerationManager.new(server: server)
  @ready = false
  @closed = false

  # A connection owned by this pool should be either in the
  # available connections array (which is used as a stack)
  # or in the checked out connections set.
  @available_connections = []
  @checked_out_connections = Set.new
  @pending_connections = Set.new
  @interrupt_connections = []

  # Mutex used for synchronizing access to @available_connections and
  # @checked_out_connections. The pool object is thread-safe, thus
  # all methods that retrieve or modify instance variables generally
  # must do so under this lock.
  @lock = Mutex.new

  # Background thread responsible for maintaining the size of
  # the pool to at least min_size
  @populator = Populator.new(self, options)
  @populate_semaphore = Semaphore.new

  # Condition variable to enforce the first check in check_out: max_pool_size.
  # This condition variable should be signaled when the number of
  # unavailable connections decreases (pending + pending_connections +
  # checked_out_connections).
  @size_cv = Mongo::ConditionVariable.new(@lock)
  # This represents the number of threads that have made it past the size_cv
  # gate but have not acquired a connection to add to the pending_connections
  # set.
  @connection_requests = 0

  # Condition variable to enforce the second check in check_out: max_connecting.
  # This condition variable should be signaled when the number of pending
  # connections decreases.
  @max_connecting_cv = Mongo::ConditionVariable.new(@lock)
  @max_connecting = options.fetch(:max_connecting, DEFAULT_MAX_CONNECTING)

  ObjectSpace.define_finalizer(self,
                               self.class.finalize(@available_connections, @pending_connections, @populator))

  publish_cmap_event(
    Monitoring::Event::Cmap::PoolCreated.new(@server.address, options, self)
  )
end

Instance Attribute Details

#generation_managerInteger (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns generation Generation of connections currently being used by the queue.

Returns:

  • (Integer)

    generation Generation of connections currently being used by the queue.

Since:

  • 2.0.0, largely rewritten in 2.9.0



233
234
235
# File 'lib/mongo/server/connection_pool.rb', line 233

def generation_manager
  @generation_manager
end

#max_connectingObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Since:

  • 2.0.0, largely rewritten in 2.9.0



342
343
344
# File 'lib/mongo/server/connection_pool.rb', line 342

def max_connecting
  @max_connecting
end

#optionsHash (readonly)

Returns options The pool options.

Returns:

  • (Hash)

    options The pool options.

Since:

  • 2.0.0, largely rewritten in 2.9.0



180
181
182
# File 'lib/mongo/server/connection_pool.rb', line 180

def options
  @options
end

#populate_semaphoreObject (readonly)

Condition variable broadcast when the size of the pool changes to wake up the populator

Since:

  • 2.0.0, largely rewritten in 2.9.0



57
58
59
# File 'lib/mongo/server/connection_pool.rb', line 57

def populate_semaphore
  @populate_semaphore
end

#populatorObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Since:

  • 2.0.0, largely rewritten in 2.9.0



339
340
341
# File 'lib/mongo/server/connection_pool.rb', line 339

def populator
  @populator
end

#serverObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Since:

  • 2.0.0, largely rewritten in 2.9.0



183
184
185
# File 'lib/mongo/server/connection_pool.rb', line 183

def server
  @server
end

Class Method Details

.finalize(available_connections, pending_connections, _populator) ⇒ Proc

Finalize the connection pool for garbage collection.

Parameters:

  • available_connections (List<Mongo::Connection>)

    The available connections.

  • pending_connections (List<Mongo::Connection>)

    The pending connections.

  • populator (Populator)

    The populator.

Returns:

  • (Proc)

    The Finalizer.

Since:

  • 2.0.0, largely rewritten in 2.9.0



847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
# File 'lib/mongo/server/connection_pool.rb', line 847

def self.finalize(available_connections, pending_connections, _populator)
  proc do
    available_connections.each do |connection|
      connection.disconnect!(reason: :pool_closed)
    end
    available_connections.clear

    pending_connections.each do |connection|
      connection.disconnect!(reason: :pool_closed)
    end
    pending_connections.clear

    # Finalizer does not close checked out connections.
    # Those would have to be garbage collected on their own
    # and that should close them.
  end
end

Instance Method Details

#available_countInteger

Number of available connections in the pool.

Returns:

  • (Integer)

    Number of available connections.

Since:

  • 2.9.0



291
292
293
294
295
296
297
# File 'lib/mongo/server/connection_pool.rb', line 291

def available_count
  raise_if_closed!

  @lock.synchronize do
    @available_connections.length
  end
end

#check_in(connection) ⇒ Object

Check a connection back into the pool.

The connection must have been previously created by this pool.

Parameters:

Since:

  • 2.9.0



418
419
420
421
422
423
424
425
426
# File 'lib/mongo/server/connection_pool.rb', line 418

def check_in(connection)
  check_invariants

  @lock.synchronize do
    do_check_in(connection)
  end
ensure
  check_invariants
end

#check_out(connection_global_id: nil, context: nil) ⇒ Mongo::Server::Connection

Checks a connection out of the pool.

If there are active connections in the pool, the most recently used connection is returned. Otherwise if the connection pool size is less than the max size, creates a new connection and returns it. Otherwise waits up to the wait timeout and raises Timeout::Error if there are still no active connections and the pool is at max size.

The returned connection counts toward the pool’s max size. When the caller is finished using the connection, the connection should be checked back in via the check_in method.

Parameters:

  • :connection_global_id (Integer | nil)

    The global id for the connection to check out.

  • :context (Mongo::Operation:Context | nil)

    Context of the operation the connection is requested for, if any.

Returns:

Raises:

  • (Error::PoolClosedError)

    If the pool has been closed.

  • (Timeout::Error)

    If the connection pool is at maximum size and remains so for longer than the wait timeout.

Since:

  • 2.9.0



366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
# File 'lib/mongo/server/connection_pool.rb', line 366

def check_out(connection_global_id: nil, context: nil)
  check_invariants

  publish_cmap_event(
    Monitoring::Event::Cmap::ConnectionCheckOutStarted.new(@server.address)
  )

  raise_if_pool_closed!
  raise_if_pool_paused_locked!

  connection = retrieve_and_connect_connection(
    connection_global_id, context
  )

  publish_cmap_event(
    Monitoring::Event::Cmap::ConnectionCheckedOut.new(@server.address, connection.id, self)
  )

  if Lint.enabled? && !connection.connected?
    raise Error::LintError,
          "Connection pool for #{address} checked out a disconnected connection #{connection.generation}:#{connection.id}"
  end

  connection
ensure
  check_invariants
end

#check_out_pinned_connection(connection_global_id) ⇒ Connection | nil

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a pinned connection that is already checked out, if one exists with the given global id. Returns nil otherwise.

Parameters:

  • connection_global_id (Integer)

    The global id of the pinned connection.

Returns:

  • (Connection | nil)

    The pinned connection, or nil.

Since:

  • 2.0.0, largely rewritten in 2.9.0



403
404
405
406
407
408
409
# File 'lib/mongo/server/connection_pool.rb', line 403

def check_out_pinned_connection(connection_global_id)
  @lock.synchronize do
    @checked_out_connections.detect do |conn|
      conn.global_id == connection_global_id && conn.pinned?
    end
  end
end

#clear(options = nil) ⇒ true

Closes all idle connections in the pool and schedules currently checked out connections to be closed when they are checked back into the pool. The pool is paused, it will not create new connections in background and it will fail checkout requests until marked ready.

Parameters:

  • options (Hash) (defaults to: nil)

    a customizable set of options

Options Hash (options):

  • :lazy (true | false)

    If true, do not close any of the idle connections and instead let them be closed during a subsequent check out operation. Defaults to false.

  • :interrupt_in_use_connections (true | false)

    If true, close all checked out connections immediately. If it is false, do not close any of the checked out connections. Defaults to true.

  • :service_id (Object)

    Clear connections with the specified service id only.

Returns:

  • (true)

    true.

Since:

  • 2.1.0



538
539
540
541
542
543
544
545
546
# File 'lib/mongo/server/connection_pool.rb', line 538

def clear(options = nil)
  raise_if_closed!

  if Lint.enabled? && !@server.unknown?
    raise Error::LintError, "Attempting to clear pool for server #{@server.summary} which is known"
  end

  do_clear(options)
end

#close(options = nil) ⇒ true

Marks the pool closed, closes all idle connections in the pool and schedules currently checked out connections to be closed when they are checked back into the pool. If force option is true, checked out connections are also closed. Attempts to use the pool after it is closed will raise Error::PoolClosedError.

Parameters:

  • options (Hash) (defaults to: nil)

    a customizable set of options

Options Hash (options):

  • :force (true | false)

    Also close all checked out connections.

  • :stay_ready (true | false)

    For internal driver use only. Whether or not to mark the pool as closed.

Returns:

  • (true)

    Always true.

Since:

  • 2.9.0



660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
# File 'lib/mongo/server/connection_pool.rb', line 660

def close(options = nil)
  return if closed?

  options ||= {}

  stop_populator

  @lock.synchronize do
    until @available_connections.empty?
      connection = @available_connections.pop
      connection.disconnect!(reason: :pool_closed)
    end

    if options[:force]
      until @checked_out_connections.empty?
        connection = @checked_out_connections.take(1).first
        connection.disconnect!(reason: :pool_closed)
        @checked_out_connections.delete(connection)
      end
    end

    unless options && options[:stay_ready]
      # mark pool as closed before releasing lock so
      # no connections can be created, checked in, or checked out
      @closed = true
      @ready = false
    end

    @max_connecting_cv.broadcast
    @size_cv.broadcast
    @generation_manager.close_all_pipes
  end

  publish_cmap_event(
    Monitoring::Event::Cmap::PoolClosed.new(@server.address, self)
  )

  true
end

#close_idle_socketsObject

Close sockets that have been open for longer than the max idle time,

if the option is set.

Since:

  • 2.5.0



770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
# File 'lib/mongo/server/connection_pool.rb', line 770

def close_idle_sockets
  return if closed?
  return unless max_idle_time

  @lock.synchronize do
    i = 0
    while i < @available_connections.length
      connection = @available_connections[i]
      if (last_checkin = connection.last_checkin) && ((Time.now - last_checkin) > max_idle_time)
        connection.disconnect!(reason: :idle)
        @available_connections.delete_at(i)
        @populate_semaphore.signal
        next
      end
      i += 1
    end
  end
end

#closed?true | false

Whether the pool has been closed.

Returns:

  • (true | false)

    Whether the pool is closed.

Since:

  • 2.9.0



304
305
306
# File 'lib/mongo/server/connection_pool.rb', line 304

def closed?
  !!@closed
end

#disconnect!(options = nil) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Disconnects the pool.

Does everything that clear does, except if the pool is closed this method does nothing but clear would raise PoolClosedError.

Since:

  • 2.1.0



555
556
557
558
559
560
561
# File 'lib/mongo/server/connection_pool.rb', line 555

def disconnect!(options = nil)
  do_clear(options)
rescue Error::PoolClosedError
  # The "disconnected" state is between closed and paused.
  # When we are trying to disconnect the pool, permit the pool to be
  # already closed.
end

#do_check_in(connection) ⇒ Object

Executes the check in after having already acquired the lock.

Parameters:

Since:

  • 2.0.0, largely rewritten in 2.9.0



431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
# File 'lib/mongo/server/connection_pool.rb', line 431

def do_check_in(connection)
  # When a connection is interrupted it is checked back into the pool
  # and closed. The operation that was using the connection before it was
  # interrupted will attempt to check it back into the pool, and we
  # should ignore it since its already been closed and removed from the pool.
  return if connection.closed? && connection.interrupted?

  unless connection.connection_pool == self
    raise ArgumentError,
          "Trying to check in a connection which was not checked out by this pool: #{connection} checked out from pool #{connection.connection_pool} (for #{self})"
  end

  unless @checked_out_connections.include?(connection)
    raise ArgumentError,
          "Trying to check in a connection which is not currently checked out by this pool: #{connection} (for #{self})"
  end

  # NOTE: if an event handler raises, resource will not be signaled.
  # This means threads waiting for a connection to free up when
  # the pool is at max size may time out.
  # Threads that begin waiting after this method completes (with
  # the exception) should be fine.

  @checked_out_connections.delete(connection)
  @size_cv.signal

  publish_cmap_event(
    Monitoring::Event::Cmap::ConnectionCheckedIn.new(@server.address, connection.id, self)
  )

  if connection.interrupted?
    connection.disconnect!(reason: :stale)
    return
  end

  if connection.error?
    connection.disconnect!(reason: :error)
    return
  end

  if closed?
    connection.disconnect!(reason: :pool_closed)
    return
  end

  if connection.closed?
    # Connection was closed - for example, because it experienced
    # a network error. Nothing else needs to be done here.
    @populate_semaphore.signal
  elsif connection.generation != generation(service_id: connection.service_id) && !connection.pinned?
    # If connection is marked as pinned, it is used by a transaction
    # or a series of cursor operations in a load balanced setup.
    # In this case connection should not be disconnected until
    # unpinned.
    connection.disconnect!(reason: :stale)
    @populate_semaphore.signal
  else
    connection.record_checkin!
    @available_connections << connection

    @max_connecting_cv.signal
  end
end

#do_clear(options = nil) ⇒ Object

Since:

  • 2.0.0, largely rewritten in 2.9.0



563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
# File 'lib/mongo/server/connection_pool.rb', line 563

def do_clear(options = nil)
  check_invariants

  service_id = options && options[:service_id]

  @lock.synchronize do
    # Generation must be bumped before emitting pool cleared event.
    @generation_manager.bump(service_id: service_id)

    close_available_connections(service_id) unless options && options[:lazy]

    if options && options[:interrupt_in_use_connections]
      schedule_for_interruption(@checked_out_connections, service_id)
      schedule_for_interruption(@pending_connections, service_id)
    end

    if @ready
      publish_cmap_event(
        Monitoring::Event::Cmap::PoolCleared.new(
          @server.address,
          service_id: service_id,
          interrupt_in_use_connections: options&.[](:interrupt_in_use_connections)
        )
      )
      # Only pause the connection pool if the server was marked unknown,
      # otherwise, allow the retry to be attempted with a ready pool.
      do_pause if !@server.load_balancer? && @server.unknown?
    end

    # Broadcast here to cause all of the threads waiting on the max
    # connecting to break out of the wait loop and error.
    @max_connecting_cv.broadcast
    # Broadcast here to cause all of the threads waiting on the pool size
    # to break out of the wait loop and error.
    @size_cv.broadcast
  end

  # "Schedule the background thread" after clearing. This is responsible
  # for cleaning up stale threads, and interrupting in use connections.
  @populate_semaphore.signal
  true
ensure
  check_invariants
end

#do_pauseObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Mark the connection pool as paused without acquiring the lock.

Since:

  • 2.0.0, largely rewritten in 2.9.0



511
512
513
514
515
516
517
518
519
# File 'lib/mongo/server/connection_pool.rb', line 511

def do_pause
  if Lint.enabled? && !@server.unknown?
    raise Error::LintError, "Attempting to pause pool for server #{@server.summary} which is known"
  end

  return unless @ready

  @ready = false
end

#inspectString

Get a pretty printed string inspection for the pool.

Examples:

Inspect the pool.

pool.inspect

Returns:

  • (String)

    The pool inspection.

Since:

  • 2.0.0



708
709
710
711
712
713
714
715
716
717
718
719
# File 'lib/mongo/server/connection_pool.rb', line 708

def inspect
  if closed?
    "#<Mongo::Server::ConnectionPool:0x#{object_id} min_size=#{min_size} max_size=#{max_size} " +
      "wait_timeout=#{wait_timeout} closed>"
  elsif !ready?
    "#<Mongo::Server::ConnectionPool:0x#{object_id} min_size=#{min_size} max_size=#{max_size} " +
      "wait_timeout=#{wait_timeout} paused>"
  else
    "#<Mongo::Server::ConnectionPool:0x#{object_id} min_size=#{min_size} max_size=#{max_size} " +
      "wait_timeout=#{wait_timeout} current_size=#{size} available=#{available_count}>"
  end
end

#max_idle_timeFloat | nil

The maximum seconds a socket can remain idle since it has been checked in to the pool, if set.

Returns:

  • (Float | nil)

    The max socket idle time in seconds.

Since:

  • 2.9.0



228
229
230
# File 'lib/mongo/server/connection_pool.rb', line 228

def max_idle_time
  @max_idle_time ||= options[:max_idle_time]
end

#max_sizeInteger

Get the maximum size of the connection pool.

Returns:

  • (Integer)

    The maximum size of the connection pool.

Since:

  • 2.9.0



193
194
195
# File 'lib/mongo/server/connection_pool.rb', line 193

def max_size
  @max_size ||= options[:max_size] || [ DEFAULT_MAX_SIZE, min_size ].max
end

#min_sizeInteger

Get the minimum size of the connection pool.

Returns:

  • (Integer)

    The minimum size of the connection pool.

Since:

  • 2.9.0



202
203
204
# File 'lib/mongo/server/connection_pool.rb', line 202

def min_size
  @min_size ||= options[:min_size] || DEFAULT_MIN_SIZE
end

#pauseObject

Mark the connection pool as paused.

Since:

  • 2.0.0, largely rewritten in 2.9.0



496
497
498
499
500
501
502
503
504
505
506
# File 'lib/mongo/server/connection_pool.rb', line 496

def pause
  raise_if_closed!

  check_invariants

  @lock.synchronize do
    do_pause
  end
ensure
  check_invariants
end

#paused?true | false

A connection pool is paused if it is not closed and it is not ready.

Returns:

  • (true | false)

    whether the connection pool is paused.

Raises:

Since:

  • 2.0.0, largely rewritten in 2.9.0



246
247
248
249
250
251
252
# File 'lib/mongo/server/connection_pool.rb', line 246

def paused?
  raise_if_closed!

  @lock.synchronize do
    !@ready
  end
end

#populatetrue | false

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method does three things:

  1. Creates and adds a connection to the pool, if the pool’s size is below min_size. Retries once if a socket-related error is encountered during this process and raises if a second error or a non socket-related error occurs.

  2. Removes stale connections from the connection pool.

  3. Interrupts connections marked for interruption.

Used by the pool populator background thread.

occurred, or the non socket-related error

Returns:

  • (true | false)

    Whether this method should be called again to create more connections.

Raises:

Since:

  • 2.0.0, largely rewritten in 2.9.0



826
827
828
829
830
831
832
833
834
835
836
837
838
# File 'lib/mongo/server/connection_pool.rb', line 826

def populate
  return false if closed?

  begin
    return create_and_add_connection
  rescue Error::SocketError, Error::SocketTimeoutError => e
    # an error was encountered while connecting the connection,
    # ignore this first error and try again.
    log_warn("Populator failed to connect a connection for #{address}: #{e.class}: #{e}. It will retry.")
  end

  create_and_add_connection
end

#readyObject

Instructs the pool to create and return connections.

Since:

  • 2.0.0, largely rewritten in 2.9.0



609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
# File 'lib/mongo/server/connection_pool.rb', line 609

def ready
  raise_if_closed!

  # TODO: Add this back in RUBY-3174.
  # if Lint.enabled?
  #   unless @server.connected?
  #     raise Error::LintError, "Attempting to ready a pool for server #{@server.summary} which is disconnected"
  #   end
  # end

  @lock.synchronize do
    return if @ready

    @ready = true
  end

  # Note that the CMAP spec demands serialization of CMAP events for a
  # pool. In order to implement this, event publication must be done into
  # a queue which is synchronized, instead of subscribers being invoked
  # from the trigger method like this one here inline. On MRI, assuming
  # the threads yield to others when they stop having work to do, it is
  # likely that the events would in practice always be published in the
  # required order. JRuby, being truly concurrent with OS threads,
  # would not offers such a guarantee.
  publish_cmap_event(
    Monitoring::Event::Cmap::PoolReady.new(@server.address, options, self)
  )

  return unless options.fetch(:populator_io, true)

  if @populator.running?
    @populate_semaphore.signal
  else
    @populator.run!
  end
end

#ready?true | false

Whether the pool is ready.

Returns:

  • (true | false)

    Whether the pool is ready.

Since:

  • 2.0.0, largely rewritten in 2.9.0



311
312
313
314
315
# File 'lib/mongo/server/connection_pool.rb', line 311

def ready?
  @lock.synchronize do
    @ready
  end
end

#sizeInteger

Size of the connection pool.

Includes available and checked out connections.

Returns:

  • (Integer)

    Size of the connection pool.

Since:

  • 2.9.0



261
262
263
264
265
266
267
# File 'lib/mongo/server/connection_pool.rb', line 261

def size
  raise_if_closed!

  @lock.synchronize do
    unsynchronized_size
  end
end

#stop_populatorObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Stop the background populator thread and clean up any connections created which have not been connected yet.

Used when closing the pool or when terminating the bg thread for testing purposes. In the latter case, this method must be called before the pool is used, to ensure no connections in pending_connections were created in-flow by the check_out method.

Since:

  • 2.0.0, largely rewritten in 2.9.0



798
799
800
801
802
803
804
805
806
807
808
# File 'lib/mongo/server/connection_pool.rb', line 798

def stop_populator
  @populator.stop!

  @lock.synchronize do
    # If stop_populator is called while populate is running, there may be
    # connections waiting to be connected, connections which have not yet
    # been moved to available_connections, or connections moved to available_connections
    # but not deleted from pending_connections. These should be cleaned up.
    clear_pending_connections
  end
end

#summaryObject

Note:

This method is experimental and subject to change.

Since:

  • 2.11.0



321
322
323
324
325
326
327
328
329
330
331
332
333
# File 'lib/mongo/server/connection_pool.rb', line 321

def summary
  @lock.synchronize do
    state = if closed?
              'closed'
            elsif !@ready
              'paused'
            else
              'ready'
            end
    "#<ConnectionPool size=#{unsynchronized_size} (#{min_size}-#{max_size}) " +
      "used=#{@checked_out_connections.length} avail=#{@available_connections.length} pending=#{@pending_connections.length} #{state}>"
  end
end

#unavailable_connectionsInteger

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns The number of unavailable connections in the pool. Used to calculate whether we have hit max_pool_size.

Returns:

  • (Integer)

    The number of unavailable connections in the pool. Used to calculate whether we have hit max_pool_size.

Since:

  • 2.0.0, largely rewritten in 2.9.0



282
283
284
# File 'lib/mongo/server/connection_pool.rb', line 282

def unavailable_connections
  @checked_out_connections.length + @pending_connections.length + @connection_requests
end

#wait_timeout(context = nil) ⇒ Float

The time to wait, in seconds, for a connection to become available.

Parameters:

  • context (Mongo::Operation:Context | nil) (defaults to: nil)

    Context of the operation the connection is requested for, if any.

Returns:

  • (Float)

    The queue wait timeout.

Since:

  • 2.9.0



214
215
216
217
218
219
220
# File 'lib/mongo/server/connection_pool.rb', line 214

def wait_timeout(context = nil)
  if context&.remaining_timeout_sec.nil?
    options[:wait_timeout] || DEFAULT_WAIT_TIMEOUT
  else
    context&.remaining_timeout_sec
  end
end

#with_connection(connection_global_id: nil, context: nil) ⇒ Object

Yield the block to a connection, while handling check in/check out logic.

Examples:

Execute with a connection.

pool.with_connection do |connection|
  connection.read
end

Returns:

  • (Object)

    The result of the block.

Since:

  • 2.0.0



731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
# File 'lib/mongo/server/connection_pool.rb', line 731

def with_connection(connection_global_id: nil, context: nil)
  raise_if_closed!

  # If a specific connection is requested and it is already checked out
  # and pinned (e.g. for a transaction or cursor in load-balanced mode),
  # reuse it directly without going through the check_out/check_in cycle.
  if connection_global_id
    connection = @lock.synchronize do
      @checked_out_connections.detect do |conn|
        conn.global_id == connection_global_id && conn.pinned?
      end
    end
  end

  connection ||= check_out(
    connection_global_id: connection_global_id,
    context: context
  )

  yield(connection)
rescue Error::SocketError, Error::SocketTimeoutError, Error::ConnectionPerished => e
  maybe_raise_pool_cleared!(connection, e)
ensure
  if connection && !connection.pinned?
    # Do not check in if the connection is pinned (the session or cursor
    # owns it and will check it in later when unpinning). Also skip
    # check-in if the connection was already checked in during the block
    # (e.g. by Session#unpin after an error on the first operation).
    checked_out = @lock.synchronize do
      @checked_out_connections.include?(connection)
    end
    check_in(connection) if checked_out
  end
end