Class: Puppeteer::ReactorRunner
- Inherits:
-
Object
- Object
- Puppeteer::ReactorRunner
show all
- Defined in:
- lib/puppeteer/reactor_runner.rb
Overview
Runs a dedicated Async reactor in a background thread and proxies calls into it.
Defined Under Namespace
Classes: Finalizer, Proxy
Instance Method Summary
collapse
Constructor Details
Returns a new instance of ReactorRunner.
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
|
# File 'lib/puppeteer/reactor_runner.rb', line 109
# Starts the background thread that hosts the reactor and waits until it
# signals readiness before returning.
#
# NOTE(review): if the thread body fails before `@ready << true` runs, nothing
# is pushed to @ready and the final `@ready.pop` would have no value to
# receive - confirm whether that start-up failure needs handling.
def initialize
# Jobs (callables) submitted to the reactor thread.
@queue = Thread::Queue.new
# One-shot handshake: the thread pushes a value once the barrier is set up.
@ready = Queue.new
@closed = false
@thread = Thread.new do
Sync do |task|
# Every job is started under this barrier so they can be stopped together.
barrier = Async::Barrier.new(parent: task)
@barrier = barrier
@ready << true
begin
# The loop ends when pop returns a falsy value; Thread::Queue#pop returns
# nil once the queue has been closed and drained.
while (job = @queue.pop)
barrier.async do
job.call
end
end
rescue ClosedQueueError
# A closed queue is treated as a normal shutdown signal.
ensure
# Stop whatever the barrier still tracks when the loop exits.
barrier.stop
end
end
ensure
# Runs however the thread ends: drop the barrier and mark the runner closed.
@barrier = nil
@closed = true
end
# The finalizer receives the queue and thread rather than self.
# Presumably it shuts them down when the runner is collected - verify
# against Finalizer, which is defined elsewhere.
ObjectSpace.define_finalizer(self, Finalizer.new(@queue, @thread))
# Block the constructing thread until the reactor thread has signalled ready.
@ready.pop
end
|
Instance Method Details
#close ⇒ Object
161
162
163
164
165
166
167
|
# File 'lib/puppeteer/reactor_runner.rb', line 161
# Shuts the runner down: marks it closed, closes the job queue, and joins the
# background thread unless the caller is that thread itself.
#
# Calling it again once the runner is closed does nothing.
#
# @return [Thread, nil] the result of joining the thread, or nil when nothing
#   was joined
def close
  return if closed?

  @closed = true
  @queue.close

  # Joining from the runner thread itself is skipped.
  return if runner_thread?

  @thread.join
end
|
#closed? ⇒ Boolean
187
188
189
|
# File 'lib/puppeteer/reactor_runner.rb', line 187
# Reports whether the runner has been marked closed.
#
# @return [Boolean] the current value of @closed (set to false in the
#   constructor, true by #close and when the background thread exits)
def closed?
@closed
end
|
#sync(&block) ⇒ Object
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
|
# File 'lib/puppeteer/reactor_runner.rb', line 139
# Runs the given block on the reactor thread and returns its result to the
# caller.
#
# When the caller is already the runner thread the block is invoked directly.
# Otherwise the block is wrapped in a job, pushed onto the job queue, and the
# caller waits on a promise that the job resolves or rejects.
#
# @yield the work to execute on the reactor
# @return [Object] the block's return value
# @raise [Puppeteer::Error] when the runner is closed or its queue rejects
#   the job
def sync(&block)
  return block.call if runner_thread?
  raise ::Puppeteer::Error, "ReactorRunner is closed" if closed?

  outcome = Async::Promise.new

  # Anything raised by the block, including non-StandardError exceptions,
  # is handed to the promise instead of escaping on the reactor side.
  task = lambda do
    begin
      outcome.resolve(block.call)
    rescue Exception => failure
      outcome.reject(failure)
    end
  end

  begin
    @queue.push(task)
  rescue ClosedQueueError
    # The queue was closed between the closed? check and the push.
    raise ::Puppeteer::Error, "ReactorRunner is closed"
  end

  outcome.wait
end
|
#unwrap(value, seen = nil) ⇒ Object
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
|
# File 'lib/puppeteer/reactor_runner.rb', line 203
# Replaces every Proxy found in +value+ with the object it wraps, descending
# into arrays and hashes (both keys and values).
#
# Arrays and hashes are copied; each source container is copied once per call
# tree, so shared and self-referential containers keep their shape.
# Anything else is returned untouched.
#
# @param value [Object] the value to unwrap
# @param seen [Hash, nil] memo of already-copied containers keyed by the
#   source container's object_id; callers normally omit it
# @return [Object] the unwrapped value
def unwrap(value, seen = nil)
  seen ||= {}

  case value
  when Proxy
    value.__getobj__
  when Array
    seen.fetch(value.object_id) do |identity|
      # Register the copy before descending so cycles resolve to it.
      copy = seen[identity] = []
      value.each { |element| copy << unwrap(element, seen) }
      copy
    end
  when Hash
    seen.fetch(value.object_id) do |identity|
      copy = seen[identity] = {}
      value.each_pair do |name, element|
        copy[unwrap(name, seen)] = unwrap(element, seen)
      end
      copy
    end
  else
    value
  end
end
|
#wait_until_idle(timeout: 1.0) ⇒ Object
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
|
# File 'lib/puppeteer/reactor_runner.rb', line 169
# Polls, on the reactor, until the barrier reports no tasks or +timeout+
# seconds have passed, whichever comes first. This is best-effort: a closed
# runner, or a Puppeteer::Error raised while dispatching, ends the wait
# silently.
#
# NOTE(review): when called from outside the runner thread, the polling block
# is itself started through the barrier (see the job loop in the constructor).
# Confirm that `@barrier.empty?` can become true in that situation; otherwise
# the wait always lasts for the full timeout.
#
# @param timeout [Numeric, nil] maximum number of seconds to poll; a falsy
#   value disables the deadline
# @return [nil] on the paths visible here (closed runner, missing barrier,
#   finished loop on the runner thread, rescued error)
def wait_until_idle(timeout: 1.0)
  return if closed?

  sync do
    # `next`, not `return`: when sync dispatches this block to the reactor
    # thread, a `return` would try to unwind a method frame that lives on the
    # calling thread and raise LocalJumpError, which the rescue below does not
    # cover. `next` simply ends the block with nil.
    next unless @barrier

    deadline = timeout ? Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout : nil
    loop do
      break if @barrier.empty?
      break if deadline && Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline

      Async::Task.current.sleep(0.01)
    end
  end
rescue ::Puppeteer::Error
  # Deliberately swallowed: the runner went away while waiting.
  nil
end
|
#wrap(value) ⇒ Object
191
192
193
194
195
196
197
198
199
200
201
|
# File 'lib/puppeteer/reactor_runner.rb', line 191
# Wraps +value+ in a Proxy bound to this runner when it qualifies.
#
# nil and existing proxies pass through unchanged, arrays are mapped element
# by element (recursively), and any other value is wrapped only if
# proxyable? accepts it.
#
# @param value [Object] the value to wrap
# @return [Object] a Proxy, an array of wrapped elements, or +value+ itself
def wrap(value)
  if value.nil? || value.is_a?(Proxy)
    value
  elsif value.is_a?(Array)
    value.map { |element| wrap(element) }
  elsif proxyable?(value)
    Proxy.new(self, value)
  else
    value
  end
end
|