Class: Puppeteer::ReactorRunner

Inherits:
Object
  • Object
show all
Defined in:
lib/puppeteer/reactor_runner.rb

Overview

Runs a dedicated Async reactor in a background thread and proxies calls into it.

Defined Under Namespace

Classes: Finalizer, Proxy

Instance Method Summary collapse

Constructor Details

#initialize ⇒ ReactorRunner

Returns a new instance of ReactorRunner.



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/puppeteer/reactor_runner.rb', line 109

# Starts the background reactor thread and blocks until it has signalled
# that it is ready to accept jobs.
#
# The thread runs a `Sync` block that pops callables from +@queue+ and runs
# each one as a task under an Async::Barrier.
def initialize
  # Jobs (callables) handed over to the reactor thread.
  @queue = Thread::Queue.new
  # Startup handshake: the thread pushes one value once the barrier exists.
  @ready = Queue.new
  @closed = false
  @thread = Thread.new do
    Sync do |task|
      barrier = Async::Barrier.new(parent: task)
      @barrier = barrier
      @ready << true
      begin
        # Thread::Queue#pop returns nil once the queue is closed and drained,
        # which ends this loop.
        while (job = @queue.pop)
          barrier.async do
            job.call
          end
        end
      rescue ClosedQueueError
        # Queue closed; exit the reactor loop.
      ensure
        # Stop whatever tasks are still registered with the barrier.
        barrier.stop
      end
    end
  ensure
    # Runs however the thread ends: drop the barrier and mark the runner closed.
    @barrier = nil
    @closed = true
  end

  # The finalizer receives the queue and thread rather than +self+.
  ObjectSpace.define_finalizer(self, Finalizer.new(@queue, @thread))
  # Block until the thread has pushed its ready signal.
  # NOTE(review): if the thread fails before `@ready << true`, nothing is ever
  # pushed here -- confirm startup cannot fail, or signal from the thread's ensure.
  @ready.pop
end

Instance Method Details

#close ⇒ Object



161
162
163
164
165
166
167
# File 'lib/puppeteer/reactor_runner.rb', line 161

# Shuts the runner down. Does nothing when the runner is already closed.
#
# Marks the runner closed and closes the job queue, then waits for the
# reactor thread to finish. The wait is skipped when the caller is the
# reactor thread itself (as reported by +runner_thread?+).
#
# @return [Thread, nil] the result of Thread#join when the thread was
#   joined, otherwise nil
def close
  return if closed?

  @closed = true
  @queue.close
  return if runner_thread?

  @thread.join
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


187
188
189
# File 'lib/puppeteer/reactor_runner.rb', line 187

# Reports whether this runner has been marked closed.
#
# @return [Boolean] the current value of +@closed+
def closed?
  @closed
end

#sync(&block) ⇒ Object

Raises:

  • (::Puppeteer::Error)



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/puppeteer/reactor_runner.rb', line 139

# Runs the given block on the reactor thread and returns its result.
#
# When the caller already is the reactor thread the block is invoked
# directly. Otherwise the block is wrapped in a job, pushed onto the job
# queue, and the caller waits on a promise for the outcome.
#
# @yield the work to execute
# @return [Object] whatever the block returns
# @raise [::Puppeteer::Error] when the runner is closed or its queue has
#   been closed
# @raise [Exception] any exception raised by the block, passed back through
#   the promise
def sync(&block)
  return block.call if runner_thread?
  raise ::Puppeteer::Error, "ReactorRunner is closed" if closed?

  outcome = Async::Promise.new
  job = lambda do
    outcome.resolve(block.call)
  rescue Exception => error # deliberately broad: every failure must reach the waiting caller
    outcome.reject(error)
  end

  begin
    @queue.push(job)
  rescue ClosedQueueError
    raise ::Puppeteer::Error, "ReactorRunner is closed"
  end

  outcome.wait
end

#unwrap(value, seen = nil) ⇒ Object



203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
# File 'lib/puppeteer/reactor_runner.rb', line 203

# Replaces Proxy instances with the objects they wrap, descending into
# arrays and hashes (hash keys as well as values).
#
# Arrays and hashes are copied rather than modified. Each container is
# recorded in +seen+ under its object_id before its contents are visited,
# so a container reached again (shared or self-referencing) maps to the
# copy already made for it.
#
# @param value [Object] the value to unwrap
# @param seen [Hash, nil] memo of already-copied containers keyed by
#   object_id; a fresh hash is used when nil
# @return [Object] the unwrapped value; anything that is not a Proxy,
#   Array, or Hash is returned unchanged
def unwrap(value, seen = nil)
  seen ||= {}

  case value
  when Proxy
    value.__getobj__
  when Array
    seen.fetch(value.object_id) do |identity|
      # Register the copy before recursing so cycles resolve to it.
      copy = seen[identity] = []
      value.each_with_object(copy) { |element, acc| acc << unwrap(element, seen) }
    end
  when Hash
    seen.fetch(value.object_id) do |identity|
      copy = seen[identity] = {}
      value.each_with_object(copy) do |(key, element), acc|
        acc[unwrap(key, seen)] = unwrap(element, seen)
      end
    end
  else
    value
  end
end

#wait_until_idle(timeout: 1.0) ⇒ Object



169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# File 'lib/puppeteer/reactor_runner.rb', line 169

# Waits, inside the reactor, until the barrier reports no tasks or the
# timeout elapses, polling every 0.01 seconds.
#
# Returns immediately when the runner is already closed. A
# ::Puppeteer::Error raised by +sync+ (runner closed in the meantime) is
# swallowed.
#
# @param timeout [Numeric, nil] maximum number of seconds to wait; nil
#   waits without a deadline
# @return [nil]
def wait_until_idle(timeout: 1.0)
  return if closed?

  sync do
    # Read the barrier once so the loop below cannot see it turn nil.
    barrier = @barrier
    # `next`, not `return`: this block normally executes on the reactor
    # thread, where `return` cannot unwind to this method and raises
    # LocalJumpError instead.
    next unless barrier

    deadline = timeout ? Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout : nil
    # NOTE(review): when called from outside the reactor thread this block is
    # itself run through the barrier -- confirm `empty?` can become true while
    # it is still running, otherwise every call waits for the full timeout.
    loop do
      break if barrier.empty?
      break if deadline && Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline

      Async::Task.current.sleep(0.01)
    end
  end
rescue ::Puppeteer::Error
  # Runner closed while waiting; ignore.
end

#wrap(value) ⇒ Object



191
192
193
194
195
196
197
198
199
200
201
# File 'lib/puppeteer/reactor_runner.rb', line 191

# Wraps a value in a Proxy bound to this runner when +proxyable?+ accepts it.
#
# nil and existing Proxy instances are returned as they are. Arrays are
# mapped element by element (recursively) into a new array. Any other value
# is returned unchanged unless +proxyable?+ says it should be proxied.
#
# @param value [Object] the value to wrap
# @return [Object] the Proxy, the mapped array, or the original value
def wrap(value)
  if value.nil? || value.is_a?(Proxy)
    value
  elsif value.is_a?(Array)
    value.map { |element| wrap(element) }
  elsif proxyable?(value)
    Proxy.new(self, value)
  else
    value
  end
end