Class: Whoosh::Streaming::StreamBody

Inherits:
Object
  • Object
show all
Defined in:
lib/whoosh/streaming/stream_body.rb

Instance Method Summary collapse

Constructor Details

#initialize(queue_size: 64, &producer) ⇒ StreamBody

Returns a new instance of StreamBody.



33
34
35
36
37
# File 'lib/whoosh/streaming/stream_body.rb', line 33

def initialize(queue_size: 64, &producer)
  @queue = SizedQueue.new(queue_size)
  @producer = producer
  @thread = nil
end

Instance Method Details

#closeObject



54
55
56
# File 'lib/whoosh/streaming/stream_body.rb', line 54

def close
  @thread&.kill
end

#eachObject



39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/whoosh/streaming/stream_body.rb', line 39

def each
  @thread = Thread.new do
    out = QueueWriter.new(@queue)
    @producer.call(out)
  rescue => e
    # Producer failed — just end the stream
  ensure
    @queue.push(:done)
  end

  while (chunk = @queue.pop) != :done
    yield chunk
  end
end