Class: Async::Container::Generic
- Inherits:
-
Object
- Object
- Async::Container::Generic
- Defined in:
- lib/async/container/generic.rb
Overview
A base class for implementing containers.
Constant Summary collapse
- UNNAMED =
"Unnamed"
Instance Attribute Summary collapse
-
#group ⇒ Object
readonly
Returns the value of attribute group.
-
#policy ⇒ Object
Returns the value of attribute policy.
-
#state ⇒ Object
readonly
Returns the value of attribute state.
-
#statistics ⇒ Object
Statistics relating to the behavior of children instances.
- #The group of running children instances.(groupofrunningchildreninstances.) ⇒ Object readonly
Class Method Summary collapse
-
.run ⇒ Object
Run a new container.
Instance Method Summary collapse
-
#[](key) ⇒ Object
Look up a child process by key.
-
#async(**options, &block) ⇒ Object
deprecated
Deprecated.
Please use #spawn or Generic.run instead.
-
#failed? ⇒ Boolean
Whether any failures have occurred within the container.
-
#initialize(policy: Policy::DEFAULT, **options) ⇒ Generic
constructor
Initialize the container.
-
#interrupt ⇒ Object
Gracefully interrupt all child instances.
-
#key?(key) ⇒ Boolean
Whether a child instance exists for the given key.
-
#mark?(key) ⇒ Boolean
Mark the container’s keyed instance which ensures that it won’t be discarded.
-
#reload ⇒ Object
Reload the container’s keyed instances.
-
#run(count: Container.processor_count, **options, &block) ⇒ Object
Run multiple instances of the same block in the container.
-
#running? ⇒ Boolean
Whether the container has running children instances.
- #size ⇒ Object
-
#sleep(duration = nil) ⇒ Object
Sleep until some state change occurs or the specified duration elapses.
-
#spawn(name: nil, restart: false, key: nil, health_check_timeout: nil, startup_timeout: nil, &block) ⇒ Object
Spawn a child instance into the container.
-
#status?(flag) ⇒ Boolean
Returns true if all children instances have the specified status flag set.
-
#stop(timeout = true) ⇒ Object
Stop the children instances.
-
#stopping? ⇒ Boolean
Whether the container is currently stopping.
- #The policy for managing child lifecycle events.=(policy) ⇒ Object
- #The state of each child instance.=(stateofeachchildinstance. = (value)) ⇒ Object
-
#to_s ⇒ Object
A human readable representation of the container.
-
#wait ⇒ Object
Wait until all spawned tasks are completed.
-
#wait_until_ready ⇒ Object
Wait until all the children instances have indicated that they are ready.
Constructor Details
#initialize(policy: Policy::DEFAULT, **options) ⇒ Generic
Initialize the container.
48 49 50 51 52 53 54 55 56 57 |
# File 'lib/async/container/generic.rb', line 48 def initialize(policy: Policy::DEFAULT, **) @group = Group.new(**) @stopping = false @state = {} @policy = policy @statistics = @policy.make_statistics @keyed = {} end |
Instance Attribute Details
#group ⇒ Object (readonly)
Returns the value of attribute group.
60 61 62 |
# File 'lib/async/container/generic.rb', line 60 def group @group end |
#policy ⇒ Object
Returns the value of attribute policy.
71 72 73 |
# File 'lib/async/container/generic.rb', line 71 def policy @policy end |
#state ⇒ Object (readonly)
Returns the value of attribute state.
68 69 70 |
# File 'lib/async/container/generic.rb', line 68 def state @state end |
#statistics ⇒ Object
Statistics relating to the behavior of children instances.
87 88 89 |
# File 'lib/async/container/generic.rb', line 87 def statistics @statistics end |
#The group of running children instances.(groupofrunningchildreninstances.) ⇒ Object (readonly)
60 |
# File 'lib/async/container/generic.rb', line 60 attr :group |
Class Method Details
.run ⇒ Object
Run a new container.
38 39 40 |
# File 'lib/async/container/generic.rb', line 38 def self.run(...) self.new.run(...) end |
Instance Method Details
#[](key) ⇒ Object
Look up a child process by key. A key could be a symbol, a file path, or something else which the child instance represents.
81 82 83 |
# File 'lib/async/container/generic.rb', line 81 def [] key @keyed[key]&.value end |
#async(**options, &block) ⇒ Object
319 320 321 322 323 324 325 326 327 |
# File 'lib/async/container/generic.rb', line 319 def async(**, &block) # warn "#{self.class}##{__method__} is deprecated, please use `spawn` or `run` instead.", uplevel: 1 require "async" spawn(**) do |instance| Async(instance, &block) end end |
#failed? ⇒ Boolean
Whether any failures have occurred within the container.
91 92 93 |
# File 'lib/async/container/generic.rb', line 91 def failed? @statistics.failed? end |
#interrupt ⇒ Object
Gracefully interrupt all child instances.
119 120 121 122 123 124 125 |
# File 'lib/async/container/generic.rb', line 119 def interrupt # We must enter the stopping state before signalling the children. Interrupting a child causes it to drain and exit, but the main run loop will respawn any child that exits while `restart: true` and the container is not stopping (see the `restart && !@stopping` gate in `#run`). Without setting this flag, an interrupted child immediately respawns, so the container never drains and `#wait` never returns. # # This matters most for `Hybrid` containers: a `SIGINT`/`SIGTERM` delivered to a fork is translated into a call to `#interrupt` on the inner threaded container, which typically runs with `restart: true` (the default for `async-service` managed services). If `#interrupt` did not set this flag, the inner threads would drain, exit, and respawn in a loop, so a single signal would never terminate the fork. Setting `@stopping = true` here makes `#interrupt` behave as the start of a graceful shutdown: children drain and exit, are not respawned, and the fork terminates - consistent with how `Forked` and `Threaded` containers handle a single interrupt. @stopping = true @group.interrupt end |
#key?(key) ⇒ Boolean
Whether a child instance exists for the given key.
358 359 360 361 362 |
# File 'lib/async/container/generic.rb', line 358 def key?(key) if key @keyed.key?(key) end end |
#mark?(key) ⇒ Boolean
Mark the container’s keyed instance which ensures that it won’t be discarded.
345 346 347 348 349 350 351 352 353 354 355 |
# File 'lib/async/container/generic.rb', line 345 def mark?(key) if key if value = @keyed[key] value.mark! return true end end return false end |
#reload ⇒ Object
Reload the container’s keyed instances.
330 331 332 333 334 335 336 337 338 339 340 341 342 |
# File 'lib/async/container/generic.rb', line 330 def reload @keyed.each_value(&:clear!) yield dirty = false @keyed.delete_if do |key, value| value.stop? && (dirty = true) end return dirty end |
#run(count: Container.processor_count, **options, &block) ⇒ Object
Run multiple instances of the same block in the container.
310 311 312 313 314 315 316 |
# File 'lib/async/container/generic.rb', line 310 def run(count: Container.processor_count, **, &block) count.times do spawn(**, &block) end return self end |
#running? ⇒ Boolean
Whether the container has running children instances.
96 97 98 |
# File 'lib/async/container/generic.rb', line 96 def running? @group.running? end |
#size ⇒ Object
63 64 65 |
# File 'lib/async/container/generic.rb', line 63 def size @group.size end |
#sleep(duration = nil) ⇒ Object
Sleep until some state change occurs or the specified duration elapses.
109 110 111 |
# File 'lib/async/container/generic.rb', line 109 def sleep(duration = nil) @group.sleep(duration) end |
#spawn(name: nil, restart: false, key: nil, health_check_timeout: nil, startup_timeout: nil, &block) ⇒ Object
Spawn a child instance into the container.
216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 |
# File 'lib/async/container/generic.rb', line 216 def spawn(name: nil, restart: false, key: nil, health_check_timeout: nil, startup_timeout: nil, &block) name ||= UNNAMED if mark?(key) Console.debug(self, "Reusing existing child.", child: {key: key, name: name}) return false end @statistics.spawn! fiber do until @stopping Console.debug(self, "Starting child...", child: {key: key, name: name, restart: restart, health_check_timeout: health_check_timeout}, statistics: @statistics) child = self.start(name, &block) state = insert(key, child) # Notify policy of spawn begin @policy.child_spawn(self, child, name: name, key: key) rescue => error Console.error(self, "Policy error in child_spawn!", exception: error) end Console.debug(self, "Started child.", child: child, spawn: {key: key, restart: restart, health_check_timeout: health_check_timeout}, statistics: @statistics) # If a health check or startup timeout is specified, we will monitor the child process and terminate it if it does not update its state within the specified time. if health_check_timeout || startup_timeout age_clock = state[:age] = Clock.start end status = nil begin status = @group.wait_for(child) do || case when :health_check! if state[:ready] # If a health check timeout is specified, we will monitor the child process and terminate it if it does not update its state within the specified time. if health_check_timeout if health_check_timeout < age_clock.total health_check_failed(child, age_clock, health_check_timeout) end end else # If a startup timeout is specified, we will monitor the child process and terminate it if it does not become ready within the specified time. if startup_timeout if startup_timeout < age_clock.total startup_failed(child, age_clock, startup_timeout) end end end else state.update() # Reset the age clock if the child has become ready: if state[:ready] age_clock&.reset! end end end rescue => error Console.error(self, "Error during child process management!", exception: error, stopping: @stopping) ensure delete(key, child) end if status&.success? Console.debug(self, "Child exited successfully.", status: status, stopping: @stopping) else @statistics.failure! Console.error(self, "Child exited with error!", status: status, stopping: @stopping) end # Notify policy of exit (after statistics are updated): begin @policy.child_exit(self, child, status, name: name, key: key) rescue => error Console.error(self, "Policy error in child_exit!", exception: error) end if restart && !@stopping @statistics.restart! else break end end end.resume return true end |
#status?(flag) ⇒ Boolean
Returns true if all children instances have the specified status flag set. e.g. ‘:ready`. This state is updated by the process readiness protocol mechanism. See Notify::Client for more details.
131 132 133 134 |
# File 'lib/async/container/generic.rb', line 131 def status?(flag) # This also returns true if all processes have exited/failed: @state.all?{|_, state| state[flag]} end |
#stop(timeout = true) ⇒ Object
Stop the children instances.
164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 |
# File 'lib/async/container/generic.rb', line 164 def stop(timeout = true) if @stopping Console.warn(self, "Container is already stopping!") return end Console.info(self, "Stopping container...", timeout: timeout) @stopping = true @group.stop(timeout) if @group.running? Console.warn(self, "Group is still running after stopping it!") else Console.info(self, "Group has stopped.") end rescue => error Console.error(self, "Error while stopping container!", exception: error) raise end |
#stopping? ⇒ Boolean
Whether the container is currently stopping.
102 103 104 |
# File 'lib/async/container/generic.rb', line 102 def stopping? @stopping end |
#The policy for managing child lifecycle events.=(policy) ⇒ Object
71 |
# File 'lib/async/container/generic.rb', line 71 attr_accessor :policy |
#The state of each child instance.=(stateofeachchildinstance. = (value)) ⇒ Object
68 |
# File 'lib/async/container/generic.rb', line 68 attr :state |
#to_s ⇒ Object
A human readable representation of the container.
75 76 77 |
# File 'lib/async/container/generic.rb', line 75 def to_s "#{self.class} with #{@statistics.spawns} spawns and #{@statistics.failures} failures." end |
#wait ⇒ Object
Wait until all spawned tasks are completed.
114 115 116 |
# File 'lib/async/container/generic.rb', line 114 def wait @group.wait end |
#wait_until_ready ⇒ Object
Wait until all the children instances have indicated that they are ready.
138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 |
# File 'lib/async/container/generic.rb', line 138 def wait_until_ready while true Console.debug(self) do |buffer| buffer.puts "Waiting for ready:" @state.each do |child, state| buffer.puts "\t#{child.inspect}: #{state}" end end self.sleep if self.status?(:ready) Console.debug(self) do |buffer| buffer.puts "All ready:" @state.each do |child, state| buffer.puts "\t#{child.inspect}: #{state}" end end return true end end end |