Class: Async::Container::Group

Inherits:
Object
  • Object
show all
Defined in:
lib/async/container/group.rb

Overview

Manages a group of running processes.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(health_check_interval: 1.0) ⇒ Group

Initialize an empty group.



33
34
35
36
37
38
# File 'lib/async/container/group.rb', line 33

def initialize(health_check_interval: 1.0)
	@health_check_interval = health_check_interval
	
	# The running fibers, indexed by IO:
	@running = {}
end

Instance Attribute Details

#runningObject (readonly)

Returns the value of attribute running.



46
47
48
# File 'lib/async/container/group.rb', line 46

def running
  @running
end

#the running tasks, indexed by IO.(runningtasks, indexedbyIO.) ⇒ Object (readonly)



46
# File 'lib/async/container/group.rb', line 46

attr :running

Instance Method Details

#any?Boolean

Whether the group contains any running processes.

Returns:

  • (Boolean)


61
62
63
# File 'lib/async/container/group.rb', line 61

def any?
	@running.any?
end

#empty?Boolean

Whether the group is empty.

Returns:

  • (Boolean)


67
68
69
# File 'lib/async/container/group.rb', line 67

def empty?
	@running.empty?
end

#health_check!Object

Perform a health check on all running processes.



110
111
112
113
114
# File 'lib/async/container/group.rb', line 110

def health_check!
	each_running do |fiber|
		fiber.resume(:health_check!)
	end
end

#inspectObject



41
42
43
# File 'lib/async/container/group.rb', line 41

def inspect
	"#<#{self.class} running=#{@running.size}>"
end

#interruptObject

Interrupt all running processes. This resumes the controlling fiber with an instance of Interrupt.



118
119
120
121
122
123
# File 'lib/async/container/group.rb', line 118

def interrupt
	Console.info(self, "Sending interrupt to #{@running.size} running processes...")
	each_running do |fiber|
		fiber.resume(Interrupt)
	end
end

#killObject

Kill all running processes. This resumes the controlling fiber with an instance of Kill.



136
137
138
139
140
141
# File 'lib/async/container/group.rb', line 136

def kill
	Console.info(self, "Sending kill to #{@running.size} running processes...")
	each_running do |fiber|
		fiber.resume(Kill)
	end
end

#running?Boolean

Whether the group contains any running processes.

Returns:

  • (Boolean)


55
56
57
# File 'lib/async/container/group.rb', line 55

def running?
	@running.any?
end

#sizeObject



49
50
51
# File 'lib/async/container/group.rb', line 49

def size
	@running.size
end

#sleep(duration) ⇒ Object

Sleep for at most the specified duration until some state change occurs.



72
73
74
# File 'lib/async/container/group.rb', line 72

def sleep(duration)
	self.wait_for_children(duration)
end

#stop(graceful = GRACEFUL_TIMEOUT) ⇒ Object

Stop all child processes with a multi-phase shutdown sequence.

A graceful shutdown performs the following sequence:

  1. Send SIGINT and wait up to ‘graceful` seconds if specified.

  2. Send SIGKILL and wait indefinitely for process cleanup.

If ‘graceful` is true, default to `DEFAULT_GRACEFUL_TIMEOUT` (10 seconds). If `graceful` is false, skip the SIGINT phase and go directly to SIGKILL.



166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# File 'lib/async/container/group.rb', line 166

def stop(graceful = GRACEFUL_TIMEOUT)
	Console.debug(self, "Stopping all processes...", graceful: graceful)
	
	# If a timeout is specified, interrupt the children first:
	if graceful
		# Send SIGINT to the children:
		self.interrupt
		
		if graceful == true
			graceful = DEFAULT_GRACEFUL_TIMEOUT
		end
		
		clock = Clock.start
		
		# Wait for the children to exit:
		self.wait_for_exit(clock, graceful)
	end
ensure
	# Do our best to clean up the children:
	if any?
		if graceful
			Console.warn(self, "Killing processes after graceful shutdown failed...", size: self.size, clock: clock)
		end
		
		self.kill
		self.wait
	end
end

#terminateObject

Terminate all running processes. This resumes the controlling fiber with an instance of Terminate.



127
128
129
130
131
132
# File 'lib/async/container/group.rb', line 127

def terminate
	Console.info(self, "Sending terminate to #{@running.size} running processes...")
	each_running do |fiber|
		fiber.resume(Terminate)
	end
end

#waitObject

Begin any outstanding queued processes and wait for them indefinitely.



77
78
79
80
81
# File 'lib/async/container/group.rb', line 77

def wait
	with_health_checks do |duration|
		self.wait_for_children(duration)
	end
end

#wait_for(channel) ⇒ Object

Wait for a message in the specified Channel.



196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
# File 'lib/async/container/group.rb', line 196

def wait_for(channel)
	io = channel.in
	
	@running[io] = Fiber.current
	
	while @running.key?(io)
		# Wait for some event on the channel:
		result = Fiber.yield
		
		if result == Interrupt
			channel.interrupt!
		elsif result == Terminate
			channel.terminate!
		elsif result == Kill
			channel.kill!
		elsif result
			yield result
		elsif message = channel.receive
			yield message
		else
			# Wait for the channel to exit:
			return channel.wait
		end
	end
ensure
	@running.delete(io)
end