Class: Async::Container::Group

Inherits:
Object
  • Object
show all
Defined in:
lib/async/container/group.rb

Overview

Manages a group of running processes.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(health_check_interval: 1.0) ⇒ Group

Initialize an empty group.



21
22
23
24
25
26
# File 'lib/async/container/group.rb', line 21

# Initialize an empty group.
#
# @parameter health_check_interval [Numeric] Stored in `@health_check_interval`; defaults to `1.0`. Presumably the period (in seconds) between health checks — confirm against the code that reads it.
def initialize(health_check_interval: 1.0)
	@health_check_interval = health_check_interval
	
	# The running fibers, indexed by IO:
	@running = {}
end

Instance Attribute Details

#running ⇒ Object (readonly)

Returns the value of attribute running.



34
35
36
# File 'lib/async/container/group.rb', line 34

# Read-only access to the registry of running fibers.
#
# @returns [Hash] The same object held in `@running` (not a copy).
def running
	@running
end

#running ⇒ Object (readonly)

The running tasks, indexed by IO.



34
# File 'lib/async/container/group.rb', line 34

# The running fibers, indexed by IO. Read-only accessor for `@running`.
attr :running

Instance Method Details

#any? ⇒ Boolean

Whether the group contains any running processes.

Returns:

  • (Boolean)


49
50
51
# File 'lib/async/container/group.rb', line 49

# Whether at least one entry is registered in `@running`.
#
# @returns [Boolean]
def any?
	!@running.empty?
end

#empty? ⇒ Boolean

Whether the group is empty.

Returns:

  • (Boolean)


55
56
57
# File 'lib/async/container/group.rb', line 55

# Whether `@running` has no entries.
#
# @returns [Boolean]
def empty?
	@running.size.zero?
end

#health_check! ⇒ Object

Perform a health check on all running processes.



98
99
100
101
102
# File 'lib/async/container/group.rb', line 98

# Ask every running fiber to perform a health check.
#
# Each fiber yielded by `each_running` is resumed with the symbol `:health_check!`.
def health_check!
	each_running {|fiber| fiber.resume(:health_check!)}
end

#inspect ⇒ Object



29
30
31
# File 'lib/async/container/group.rb', line 29

# A short, human-readable summary: the class name and the number of entries in `@running`.
#
# @returns [String]
def inspect
	format("#<%s running=%d>", self.class, @running.size)
end

#interrupt ⇒ Object

Interrupt all running processes. This resumes each controlling fiber with the Interrupt constant itself (not an instance of it).



106
107
108
109
110
111
# File 'lib/async/container/group.rb', line 106

# Interrupt all running processes.
#
# Logs the number of entries in `@running`, then resumes every fiber yielded by `each_running` with the `Interrupt` constant itself (not an instance).
def interrupt
	Console.info(self, "Sending interrupt to #{@running.size} running processes...")
	
	each_running {|fiber| fiber.resume(Interrupt)}
end

#kill ⇒ Object

Kill all running processes. This resumes each controlling fiber with the Kill constant itself (not an instance of it).



124
125
126
127
128
129
# File 'lib/async/container/group.rb', line 124

# Kill all running processes.
#
# Logs the number of entries in `@running`, then resumes every fiber yielded by `each_running` with the `Kill` constant itself (not an instance).
def kill
	Console.info(self, "Sending kill to #{@running.size} running processes...")
	
	each_running {|fiber| fiber.resume(Kill)}
end

#running? ⇒ Boolean

Whether the group contains any running processes.

Returns:

  • (Boolean)


43
44
45
# File 'lib/async/container/group.rb', line 43

# Whether the group currently tracks at least one running fiber.
#
# @returns [Boolean]
def running?
	!@running.empty?
end

#size ⇒ Object



37
38
39
# File 'lib/async/container/group.rb', line 37

# The number of entries currently registered in `@running`.
#
# @returns [Integer]
def size
	@running.length
end

#sleep(duration) ⇒ Object

Sleep for at most the specified duration until some state change occurs.



60
61
62
# File 'lib/async/container/group.rb', line 60

# Sleep for at most `duration`, delegating to `wait_for_children`.
#
# @parameter duration [Object] Passed through unchanged to `wait_for_children`.
# @returns [Object] Whatever `wait_for_children` returns.
def sleep(duration)
	wait_for_children(duration)
end

#stop(graceful = true) ⇒ Object

Stop all child processes with a multi-phase shutdown sequence.

A graceful shutdown performs the following sequence:

  1. Send SIGINT and wait up to `graceful` seconds if specified.

  2. Send SIGKILL and wait indefinitely for process cleanup.

If `graceful` is true, default to `DEFAULT_GRACEFUL_TIMEOUT` (10 seconds). If `graceful` is false, skip the SIGINT phase and go directly to SIGKILL.



154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/async/container/group.rb', line 154

# Stop every child process, escalating from an interrupt to a kill.
#
# When `graceful` is truthy the children are interrupted first and given time to exit via `wait_for_exit`. Whatever happens in that phase (including an exception), the `ensure` section kills and waits for anything still registered.
#
# @parameter graceful [Boolean | Numeric] `false`/`nil` skips the interrupt phase; `true` selects `DEFAULT_GRACEFUL_TIMEOUT`; any other truthy value is handed to `wait_for_exit` as the timeout.
# @returns [Object | Nil] The result of `wait_for_exit`, or `nil` when the interrupt phase is skipped.
def stop(graceful = true)
	Console.debug(self, "Stopping all processes...", graceful: graceful)
	
	if graceful
		# Ask the children to stop before resorting to a kill:
		self.interrupt
		
		# A literal `true` means "use the default timeout":
		graceful = DEFAULT_GRACEFUL_TIMEOUT if graceful == true
		
		clock = Clock.start
		
		# Give the children up to `graceful` to exit on their own:
		self.wait_for_exit(clock, graceful)
	end
ensure
	# Anything still registered at this point is killed unconditionally:
	if any?
		# Only a failed graceful attempt is worth a warning; `clock` is nil when that phase was skipped.
		Console.warn(self, "Killing processes after graceful shutdown failed...", size: self.size, clock: clock) if graceful
		
		self.kill
		self.wait
	end
end

#terminate ⇒ Object

Terminate all running processes. This resumes each controlling fiber with the Terminate constant itself (not an instance of it).



115
116
117
118
119
120
# File 'lib/async/container/group.rb', line 115

# Terminate all running processes.
#
# Logs the number of entries in `@running`, then resumes every fiber yielded by `each_running` with the `Terminate` constant itself (not an instance).
def terminate
	Console.info(self, "Sending terminate to #{@running.size} running processes...")
	
	each_running {|fiber| fiber.resume(Terminate)}
end

#wait ⇒ Object

Begin any outstanding queued processes and wait for them indefinitely.



65
66
67
68
69
# File 'lib/async/container/group.rb', line 65

# Wait on the children under `with_health_checks`.
#
# `with_health_checks` supplies a duration to the block, which is forwarded to `wait_for_children`.
#
# @returns [Object] Whatever `with_health_checks` returns.
def wait
	with_health_checks {|duration| wait_for_children(duration)}
end

#wait_for(channel) ⇒ Object

Wait for a message in the specified Channel.



184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
# File 'lib/async/container/group.rb', line 184

# Register the current fiber as the controller for `channel` and service it until it is done.
#
# The fiber is stored in `@running` under `channel.in`, then repeatedly suspends with `Fiber.yield`. The value it is resumed with decides the next step (see the inline comments).
#
# @parameter channel [Object] Must respond to `in`, `receive`, `wait`, `interrupt!`, `terminate!` and `kill!`.
# @yields {|message| ...} Called with any other truthy resume value, or with a message read from the channel.
# @returns [Object | Nil] The result of `channel.wait` once `channel.receive` returns a falsy value; `nil` if the loop ends because the entry was removed from `@running`.
def wait_for(channel)
	io = channel.in
	
	# Make this fiber reachable through the registry so it can be resumed later:
	@running[io] = Fiber.current
	
	# Keep servicing the channel for as long as this fiber is still registered:
	while @running.key?(io)
		# Wait for some event on the channel:
		result = Fiber.yield
		
		# The control signals are compared by equality against the constants themselves, not by instance checks — do not convert this chain to `case`/`when`.
		if result == Interrupt
			channel.interrupt!
		elsif result == Terminate
			channel.terminate!
		elsif result == Kill
			channel.kill!
		elsif result
			# Any other truthy resume value is handed to the caller's block as-is:
			yield result
		elsif message = channel.receive
			# Resumed with a falsy value: try to read a message from the channel instead.
			yield message
		else
			# Wait for the channel to exit:
			return channel.wait
		end
	end
ensure
	# Always unregister, whether we returned normally or an exception propagated:
	@running.delete(io)
end