Class: Mongo::Cluster::CursorReaper Private
- Inherits:
- Object
- Includes:
- Retryable
- Defined in:
- lib/mongo/cluster/reapers/cursor_reaper.rb
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
A manager that sends kill cursors operations at regular intervals to close cursors that have been garbage collected without being exhausted.
Constant Summary
- FREQUENCY = 1
This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
The default time interval for the cursor reaper to send pending kill cursors operations.
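This page gives the interval but does not show the loop that consumes it. The following is a minimal sketch, assuming the value is in seconds and that some background loop calls #execute once per interval. `run_periodically` and `CountingReaper` are hypothetical names introduced for illustration and are not part of the driver.

```ruby
# Hypothetical helper: calls target.execute, then sleeps for `interval`
# seconds, repeating `iterations` times. A real background executor would
# loop until stopped; a fixed count keeps the sketch deterministic.
def run_periodically(target, interval:, iterations:)
  iterations.times do
    target.execute
    sleep(interval)
  end
end

# Hypothetical stand-in for a reaper: it only counts how often it ran.
class CountingReaper
  attr_reader :runs

  def initialize
    @runs = 0
  end

  def execute
    @runs += 1
  end
end

reaper = CountingReaper.new
# An interval of 0 is used here only so the example finishes immediately.
run_periodically(reaper, interval: 0, iterations: 3)
```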
Instance Attribute Summary
- #cluster ⇒ Object readonly private
Instance Method Summary
- #initialize(cluster) ⇒ CursorReaper constructor private
  Create a cursor reaper.
- #kill_cursors ⇒ Object (also: #execute, #flush) private
  Execute all pending kill cursors operations.
- #read_scheduled_kill_specs ⇒ Object private
  Read and decode scheduled kill cursors operations.
- #register_cursor(id) ⇒ Object private
  Register a cursor id as active.
- #schedule_kill_cursor(kill_spec) ⇒ Object private
  Schedule a kill cursors operation to be eventually executed.
- #unregister_cursor(id) ⇒ Object private
  Unregister a cursor id, indicating that it’s no longer active.
Methods included from Retryable
#read_worker, #select_server, #with_overload_retry, #write_worker
Constructor Details
#initialize(cluster) ⇒ CursorReaper
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Create a cursor reaper.
# File 'lib/mongo/cluster/reapers/cursor_reaper.rb', line 39
def initialize(cluster)
  @cluster = cluster
  @to_kill = {}
  @active_cursor_ids = Set.new
  @mutex = Mutex.new
  @kill_spec_queue = Queue.new
end
Instance Attribute Details
#cluster ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/mongo/cluster/reapers/cursor_reaper.rb', line 47
def cluster
  @cluster
end
Instance Method Details
#kill_cursors ⇒ Object Also known as: execute, flush
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Execute all pending kill cursors operations.
# File 'lib/mongo/cluster/reapers/cursor_reaper.rb', line 126
def kill_cursors
  # TODO: optimize this to batch kill cursor operations for the same
  # server/database/collection instead of killing each cursor
  # individually.
  loop do
    server_address = nil

    kill_spec = @mutex.synchronize do
      read_scheduled_kill_specs
      # Find a server that has any cursors scheduled for destruction.
      server_address, specs =
        @to_kill.detect { |_, specs| specs.any? }

      if specs.nil?
        # All servers have empty specs, nothing to do.
        return
      end

      # Note that this mutates the spec in the queue.
      # If the kill cursor operation fails, we don't attempt to
      # kill that cursor again.
      spec = specs.take(1).tap do |arr|
        specs.subtract(arr)
      end.first

      unless @active_cursor_ids.include?(spec.cursor_id)
        # The cursor was already killed, typically because it has
        # been iterated to completion. Remove the kill spec from
        # our records without doing any more work.
        spec = nil
      end

      spec
    end

    # If there was a spec to kill but its cursor was already killed,
    # look for another spec.
    next unless kill_spec

    # We could also pass kill_spec directly into the KillCursors
    # operation, though this would make that operation have a
    # different API from all of the other ones which accept hashes.
    spec = {
      cursor_ids: [ kill_spec.cursor_id ],
      coll_name: kill_spec.coll_name,
      db_name: kill_spec.db_name,
    }
    op = Operation::KillCursors.new(spec)

    server = cluster.servers.detect do |server|
      server.address == server_address
    end

    unless server
      # The server for this cursor has gone away --- maybe temporarily,
      # maybe permanently, but we can't know. To prevent connections from
      # leaking in the case of a permanent failure, we'll just silently
      # drop this killspec and move on.
      next
    end

    options = {
      server_api: server.options[:server_api],
      connection_global_id: kill_spec.connection_global_id,
    }
    if connection = kill_spec.connection
      op.execute_with_connection(connection, context: Operation::Context.new(options: options))
      connection.connection_pool.check_in(connection)
    else
      op.execute(server, context: Operation::Context.new(options: options))
    end

    next unless session = kill_spec.session

    session.end_session if session.implicit?
  end
end
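The selection step inside the synchronized block can be hard to follow in isolation: it removes one spec from the first non-empty set and discards it if its cursor is no longer active. The sketch below reproduces only that step on plain data. `KillSpec` (a Struct) and `next_kill_spec` are hypothetical stand-ins introduced for illustration; they are not the driver's classes or methods.

```ruby
require 'set'

# Hypothetical stand-in for the driver's kill spec objects.
KillSpec = Struct.new(:cursor_id, :coll_name, :db_name)

# Removes and returns one spec whose cursor is still active, or nil when
# nothing is pending. Specs for inactive cursors are removed and skipped.
def next_kill_spec(to_kill, active_cursor_ids)
  loop do
    # Find a server address whose set still has pending specs.
    _address, specs = to_kill.detect { |_, s| s.any? }
    return nil if specs.nil?

    # Take one element and remove it from the set in the same expression.
    spec = specs.take(1).tap { |arr| specs.subtract(arr) }.first

    return spec if active_cursor_ids.include?(spec.cursor_id)
    # Otherwise the cursor is already closed; try the next spec.
  end
end

to_kill = {
  'a:27017' => Set[KillSpec.new(1, 'coll', 'db'), KillSpec.new(2, 'coll', 'db')],
}
active = Set[2]

picked = next_kill_spec(to_kill, active)
```

Because the spec is removed before the operation runs, a failed kill is not retried, which matches the comment in the listing above.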
#read_scheduled_kill_specs ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Read and decode scheduled kill cursors operations.
This method mutates instance variables without locking, so it is not thread safe. Generally, it should not be called directly; it is a helper for the #kill_cursors method.
# File 'lib/mongo/cluster/reapers/cursor_reaper.rb', line 103
def read_scheduled_kill_specs
  while kill_spec = @kill_spec_queue.pop(true)
    if @active_cursor_ids.include?(kill_spec.cursor_id)
      @to_kill[kill_spec.server_address] ||= Set.new
      @to_kill[kill_spec.server_address] << kill_spec
    elsif (session = kill_spec.session) && session.implicit?
      # Cursor was already closed; end the session immediately to release
      # references rather than waiting for the kill_spec to go out of scope.
      session.end_session
    end
  end
rescue ThreadError
  # Empty queue, nothing to do.
end
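The method relies on the non-blocking form of Queue#pop: with a true argument it raises ThreadError instead of waiting when the queue is empty, and that exception ends the loop. A minimal sketch of the same drain idiom on a plain queue follows; `drain` is a hypothetical helper name used only here.

```ruby
require 'thread'

# Returns everything currently in the queue without blocking.
def drain(queue)
  drained = []
  # pop(true) is the non-blocking form; it raises ThreadError when empty.
  while (item = queue.pop(true))
    drained << item
  end
  drained
rescue ThreadError
  # Empty queue: return whatever was collected so far.
  drained
end

queue = Queue.new
queue << :a
queue << :b
queue << :c

first_pass = drain(queue)
second_pass = drain(queue)
```

Note that the `while` condition also stops on a nil or false item, so this idiom only suits queues that never carry those values.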
#register_cursor(id) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Register a cursor id as active.
# File 'lib/mongo/cluster/reapers/cursor_reaper.rb', line 68
def register_cursor(id)
  raise ArgumentError, 'register_cursor called with nil cursor_id' if id.nil?
  raise ArgumentError, 'register_cursor called with cursor_id=0' if id == 0

  @mutex.synchronize do
    @active_cursor_ids << id
  end
end
#schedule_kill_cursor(kill_spec) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Schedule a kill cursors operation to be eventually executed.
# File 'lib/mongo/cluster/reapers/cursor_reaper.rb', line 54
def schedule_kill_cursor(kill_spec)
  @kill_spec_queue << kill_spec
end
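Unlike the register and unregister methods, this method does not take the mutex. One plausible reason, which is an assumption and not stated on this page, is that Ruby's Queue is itself thread-safe, so producers can push without extra locking. The sketch below shows that property on a plain queue; the thread and item counts are arbitrary.

```ruby
require 'thread'

queue = Queue.new

# Four producer threads push concurrently with no explicit lock.
producers = 4.times.map do |t|
  Thread.new do
    100.times { |i| queue << [t, i] }
  end
end
producers.each(&:join)

total = queue.size
```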
#unregister_cursor(id) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Unregister a cursor id, indicating that it’s no longer active.
# File 'lib/mongo/cluster/reapers/cursor_reaper.rb', line 87
def unregister_cursor(id)
  raise ArgumentError, 'unregister_cursor called with nil cursor_id' if id.nil?
  raise ArgumentError, 'unregister_cursor called with cursor_id=0' if id == 0

  @mutex.synchronize do
    @active_cursor_ids.delete(id)
  end
end
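Registration and unregistration follow the same pattern: validate the id, then mutate a shared Set under a mutex. The sketch below shows that pattern in a standalone class. `CursorRegistry` and its method names are hypothetical and exist only for this example. The rejection of 0 is consistent with MongoDB using a cursor id of 0 to mean the server-side cursor is already exhausted, though this page does not state the reason.

```ruby
require 'set'

# Hypothetical, simplified registry of active cursor ids.
class CursorRegistry
  def initialize
    @active = Set.new
    @mutex = Mutex.new
  end

  def register(id)
    validate!(id)
    @mutex.synchronize { @active << id }
    nil
  end

  def unregister(id)
    validate!(id)
    @mutex.synchronize { @active.delete(id) }
    nil
  end

  def active?(id)
    @mutex.synchronize { @active.include?(id) }
  end

  private

  # Rejects ids that cannot refer to a live cursor.
  def validate!(id)
    raise ArgumentError, 'cursor id must not be nil' if id.nil?
    raise ArgumentError, 'cursor id must not be 0' if id == 0
  end
end

registry = CursorRegistry.new
registry.register(42)
was_active = registry.active?(42)
registry.unregister(42)
still_active = registry.active?(42)

rejected_zero =
  begin
    registry.register(0)
    false
  rescue ArgumentError
    true
  end
```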