Class: Down::ChunkedIO

Inherits:
Object
Defined in:
lib/down/chunked_io.rb

Overview

Wraps an enumerator that yields chunks of content into an IO object. It implements some essential IO methods:

  • IO#read

  • IO#readpartial

  • IO#gets

  • IO#size

  • IO#pos

  • IO#eof?

  • IO#rewind

  • IO#close

By default, Down::ChunkedIO caches all read content in a tempfile, which makes it rewindable. If rewinding is not needed, caching can be disabled by setting `:rewindable` to false, which eliminates all disk I/O.

Any cleanup code (e.g. an ensure block) that the given enumerator carries is guaranteed to be executed, either when all content has been retrieved or when the Down::ChunkedIO is closed. An `:on_close` callback can also be specified; it is executed in the same situations.
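
The cleanup guarantee can be illustrated with a minimal sketch that uses only the standard library. It assumes chunk retrieval is driven through a Fiber, as the `chunks_fiber.resume(:terminate)` call in #close suggests. The names `chunks`, `on_close` and `log` are hypothetical, and the code is not the library's actual implementation.

```ruby
log = []

# Hypothetical chunk source with cleanup code in an ensure block.
chunks = Enumerator.new do |yielder|
  begin
    yielder << "first"
    yielder << "second"
  ensure
    log << :source_cleanup
  end
end

# Hypothetical callback, standing in for the :on_close option.
on_close = -> { log << :on_close }

# Drive the enumerator from a Fiber so that iteration can be paused between
# chunks and stopped early on request.
fiber = Fiber.new do
  begin
    chunks.each do |chunk|
      action = Fiber.yield(chunk)     # hand one chunk to the consumer
      break if action == :terminate   # unwinds through the source's ensure
    end
  ensure
    on_close.call
  end
  nil
end

first_chunk = fiber.resume   # retrieve only the first chunk
fiber.resume(:terminate)     # stop early; both cleanup steps still run
```

In this sketch, breaking out of the iteration unwinds the enumerator's own ensure block first and the callback second, so both run even though the second chunk was never requested.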

Constructor Details

#initialize(chunks:, size: nil, on_close: nil, data: {}, rewindable: true, encoding: nil) ⇒ ChunkedIO

Returns a new instance of ChunkedIO.



# File 'lib/down/chunked_io.rb', line 30

def initialize(chunks:, size: nil, on_close: nil, data: {}, rewindable: true, encoding: nil)
  @chunks     = chunks
  @size       = size
  @on_close   = on_close
  @data       = data
  @encoding   = find_encoding(encoding || "binary")
  @rewindable = rewindable
  @buffer     = nil
  @position   = 0
  @next_chunk = nil
  @closed     = false

  retrieve_chunk # fetch first chunk so that we know whether the file is empty
end

Instance Attribute Details

#data ⇒ Object

Returns the value of attribute data.



# File 'lib/down/chunked_io.rb', line 28

def data
  @data
end

#encoding ⇒ Object

Returns the value of attribute encoding.



# File 'lib/down/chunked_io.rb', line 28

def encoding
  @encoding
end

#size ⇒ Object

Returns the value of attribute size.



# File 'lib/down/chunked_io.rb', line 28

def size
  @size
end

Instance Method Details

#close ⇒ Object

Implements IO#close semantics. Closes the Down::ChunkedIO by terminating chunk retrieval and deleting the cached content.



# File 'lib/down/chunked_io.rb', line 263

def close
  return if @closed

  chunks_fiber.resume(:terminate) if chunks_fiber.alive?
  cache.close! if cache
  @buffer = nil
  @closed = true
end

#closed? ⇒ Boolean

Returns whether the Down::ChunkedIO has been closed.

Returns:

  • (Boolean)


# File 'lib/down/chunked_io.rb', line 273

def closed?
  !!@closed
end

#each_chunk ⇒ Object

Yields elements of the underlying enumerator.



# File 'lib/down/chunked_io.rb', line 51

def each_chunk
  fail IOError, "closed stream" if closed?

  return enum_for(__method__) unless block_given?

  yield retrieve_chunk until chunks_depleted?
end
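
The `enum_for(__method__)` guard above is a common Ruby idiom: called without a block, the method returns an Enumerator over itself. A minimal sketch of the idiom follows, using a hypothetical `ChunkSource` class rather than Down::ChunkedIO itself.

```ruby
# Hypothetical stand-in class; only the each_chunk idiom matters here.
class ChunkSource
  def initialize(chunks)
    @chunks = chunks
  end

  def each_chunk
    # Without a block, return an Enumerator that calls this method lazily.
    return enum_for(__method__) unless block_given?

    @chunks.each { |chunk| yield chunk }
  end
end

source = ChunkSource.new(["ab", "cd", "ef"])

# Block form: chunks are yielded one by one.
collected = []
source.each_chunk { |chunk| collected << chunk }

# Blockless form: an Enumerator that supports map, next, and so on.
enumerator = source.each_chunk
sizes      = enumerator.map(&:bytesize)
```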

#eof? ⇒ Boolean Also known as: eof

Implements IO#eof? semantics. Returns whether we’ve reached end of file. It returns true if cache is at the end and there is no more content to retrieve. Raises IOError if closed.

Returns:

  • (Boolean)


# File 'lib/down/chunked_io.rb', line 242

def eof?
  fail IOError, "closed stream" if closed?

  return false if cache && !cache.eof?
  @buffer.nil? && chunks_depleted?
end

#gets(separator_or_limit = $/, limit = nil) ⇒ Object

Implements IO#gets semantics. Without arguments it retrieves lines of content separated by newlines.

With `separator` argument it does the following:

  • if `separator` is a nonempty string, returns content up to and including that sequence of bytes

  • if `separator` is an empty string, returns paragraphs of content (content delimited by two newlines)

  • if `separator` is nil and `limit` is nil, returns all content

With `limit` argument returns at most that many bytes.

Returns nil if end of file is reached. Raises IOError if closed.



# File 'lib/down/chunked_io.rb', line 103

def gets(separator_or_limit = $/, limit = nil)
  fail IOError, "closed stream" if closed?

  if separator_or_limit.is_a?(Integer)
    separator = $/
    limit     = separator_or_limit
  else
    separator = separator_or_limit
  end

  return read(limit)&.force_encoding(@encoding) if separator.nil?

  separator = "\n\n" if separator.empty?

  data = String.new

  until data.include?(separator) || data.bytesize == limit || eof?
    remaining_length = limit - data.bytesize if limit
    data << readpartial(remaining_length, buffer ||= String.new)
  end

  buffer.clear if buffer # deallocate buffer

  line, extra = data.split(separator, 2)
  line << separator if data.include?(separator)

  data.clear # deallocate data

  if extra
    @position -= extra.bytesize

    if cache
      cache.pos -= extra.bytesize
    else
      if @buffer
        @buffer.prepend(extra)
      else
        @buffer = extra
      end
    end
  end

  line.force_encoding(@encoding) if line
end
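
Because #gets follows IO#gets semantics, the call forms can be tried against the standard library's StringIO, used here purely as a stand-in. Down::ChunkedIO is not loaded in this sketch, and edge cases (for example, runs of more than two newlines in paragraph mode) may differ between the two.

```ruby
require "stringio"

content = "alpha\nbeta\n\ngamma|delta"

# A fresh StringIO per call, so each form starts at the beginning.
default_line = StringIO.new(content).gets        # up to the first newline
custom_sep   = StringIO.new(content).gets("|")   # up to and including "|"
paragraph    = StringIO.new(content).gets("")    # up to the blank line
everything   = StringIO.new(content).gets(nil)   # all content
limited      = StringIO.new(content).gets(3)     # at most 3 bytes
at_eof       = StringIO.new("").gets             # nil at end of file
```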

#inspect ⇒ Object

Returns useful information about the Down::ChunkedIO object.



# File 'lib/down/chunked_io.rb', line 283

def inspect
  string  = String.new
  string << "#<#{self.class.name}"
  string << " chunks=#{@chunks.inspect}"
  string << " size=#{size.inspect}"
  string << " encoding=#{encoding.inspect}"
  string << " data=#{data.inspect}"
  string << " on_close=#{@on_close.inspect}"
  string << " rewindable=#{@rewindable.inspect}"
  string << " (closed)" if closed?
  string << ">"
end

#length ⇒ Object

For compatibility with multipart-post gem.



# File 'lib/down/chunked_io.rb', line 46

def length
  @size
end

#pos ⇒ Object Also known as: tell

Implements IO#pos semantics. Returns the current position of the Down::ChunkedIO.



# File 'lib/down/chunked_io.rb', line 234

def pos
  @position
end

#read(length = nil, outbuf = nil) ⇒ Object

Implements IO#read semantics. Without arguments it retrieves and returns all content.

With `length` argument returns exactly that number of bytes if they’re available.

With `outbuf` argument each call will return that same string object, where the value is replaced with retrieved content.

If end of file is reached, returns empty string if called without arguments, or nil if called with arguments. Raises IOError if closed.



# File 'lib/down/chunked_io.rb', line 70

def read(length = nil, outbuf = nil)
  fail IOError, "closed stream" if closed?

  data   = outbuf.clear.force_encoding(Encoding::BINARY) if outbuf
  data ||= "".b

  remaining_length = length

  until remaining_length == 0 || eof?
    data << readpartial(remaining_length, buffer ||= String.new)
    remaining_length = length - data.bytesize if length
  end

  buffer.clear if buffer # deallocate string

  data.force_encoding(@encoding) unless length
  data unless data.empty? && length && length > 0
end
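
The return values described for #read match those of IO#read, so they can be illustrated with the standard library's StringIO as a stand-in. Down::ChunkedIO is not loaded in this sketch; it only shows the IO#read contract that the method is documented to implement.

```ruby
require "stringio"

io = StringIO.new("hello world")

head   = io.read(5)              # exactly 5 bytes
buffer = String.new
chunk  = io.read(3, buffer)      # fills the given buffer and returns it
rest   = io.read                 # everything that is left

at_eof_without_args = io.read    # empty string at end of file
at_eof_with_length  = io.read(1) # nil at end of file
```

Reusing one `outbuf` across calls avoids allocating a new string per read, which matters when streaming large content in a loop.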

#readpartial(maxlen = nil, outbuf = nil) ⇒ Object

Implements IO#readpartial semantics. If there is any content readily available reads from it, otherwise fetches and reads from the next chunk. It writes to and reads from the cache when needed.

Without arguments it either returns all content that’s readily available, or the next chunk. This is useful when you don’t care about the size of chunks and you want to minimize string allocations.

With `maxlen` argument returns at most that many bytes (default is 16KB).

With `outbuf` argument each call will return that same string object, where the value is replaced with retrieved content.

Raises EOFError if end of file is reached. Raises IOError if closed.



# File 'lib/down/chunked_io.rb', line 163

def readpartial(maxlen = nil, outbuf = nil)
  fail IOError, "closed stream" if closed?

  maxlen ||= 16*1024

  data   = cache.read(maxlen, outbuf) if cache && !cache.eof?
  data ||= outbuf.clear.force_encoding(Encoding::BINARY) if outbuf
  data ||= "".b

  return data if maxlen == 0

  if @buffer.nil? && data.empty?
    fail EOFError, "end of file reached" if chunks_depleted?
    @buffer = retrieve_chunk
  end

  remaining_length = maxlen - data.bytesize

  unless @buffer.nil? || remaining_length == 0
    if remaining_length < @buffer.bytesize
      buffered_data = @buffer.byteslice(0, remaining_length)
      @buffer       = @buffer.byteslice(remaining_length..-1)
    else
      buffered_data = @buffer
      @buffer       = nil
    end

    data << buffered_data

    cache.write(buffered_data) if cache

    buffered_data.clear unless buffered_data.frozen?
  end

  @position += data.bytesize

  data.force_encoding(Encoding::BINARY)
end
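
The buffering part of this method can be reduced to a short sketch. The `TinyChunkReader` class below is hypothetical and deliberately omits the cache, `outbuf`, position tracking and closed-stream checks; it only shows how a chunk is sliced with `byteslice` when it is larger than `maxlen`.

```ruby
# Hypothetical, simplified reader: no cache, no outbuf, no position tracking.
class TinyChunkReader
  def initialize(chunks)
    @chunks = chunks.each   # external enumerator over the chunks
    @buffer = nil           # leftover bytes from the current chunk
  end

  def readpartial(maxlen = 16 * 1024)
    @buffer ||= next_chunk
    raise EOFError, "end of file reached" if @buffer.nil?

    if maxlen < @buffer.bytesize
      # Return the first maxlen bytes and keep the remainder for later.
      data    = @buffer.byteslice(0, maxlen)
      @buffer = @buffer.byteslice(maxlen..-1)
    else
      # The whole buffer fits, so hand it over and fetch a new chunk next time.
      data    = @buffer
      @buffer = nil
    end

    data
  end

  private

  # Returns the next chunk, or nil once the source is depleted.
  def next_chunk
    @chunks.next
  rescue StopIteration
    nil
  end
end

reader = TinyChunkReader.new(["abcdef", "gh"])

part_one = reader.readpartial(4)   # first 4 bytes of the first chunk
part_two = reader.readpartial(4)   # the leftover 2 bytes, not 4
part_three = reader.readpartial    # the whole second chunk
```

Note that a single call never spans two chunks in this sketch: it returns whatever is left of the current chunk, even if that is less than `maxlen`.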

#rewind ⇒ Object

Implements IO#rewind semantics. Rewinds the Down::ChunkedIO by rewinding the cache and setting the position to the beginning of the file. Raises IOError if closed or not rewindable.



# File 'lib/down/chunked_io.rb', line 253

def rewind
  fail IOError, "closed stream" if closed?
  fail IOError, "this Down::ChunkedIO is not rewindable" if cache.nil?

  cache.rewind
  @position = 0
end
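
Rewinding only works because content that was already read is kept in a tempfile. The following sketch shows that idea with the standard library alone; the variable names and the `"chunk-cache"` basename are hypothetical, and the real class interleaves caching with reading rather than running two separate passes.

```ruby
require "tempfile"

chunks = ["abc", "def"].each   # external enumerator over the chunks
cache  = Tempfile.new("chunk-cache", binmode: true)

# First pass: consume the chunks once, copying each one into the cache.
first_pass = String.new
loop do
  chunk = chunks.next          # raises StopIteration when depleted
  cache.write(chunk)
  first_pass << chunk
end

# Rewinding only touches the cache; the chunk source is never replayed.
cache.rewind
second_pass = cache.read

cache.close!                   # close and delete the tempfile
```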

#rewindable? ⇒ Boolean

Returns whether the Down::ChunkedIO was specified as rewindable.

Returns:

  • (Boolean)


# File 'lib/down/chunked_io.rb', line 278

def rewindable?
  @rewindable
end

#seek(amount, whence = IO::SEEK_SET) ⇒ Object

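Implements IO#seek semantics. Supports IO::SEEK_SET, IO::SEEK_CUR and IO::SEEK_END (or :SET, :CUR and :END) as `whence`. Seeking past the cached content retrieves the intermediate chunks into the cache, and seeking relative to the end first retrieves all remaining content. Raises Errno::ESPIPE if the Down::ChunkedIO is not rewindable, and ArgumentError if `whence` is invalid.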



# File 'lib/down/chunked_io.rb', line 203

def seek(amount, whence = IO::SEEK_SET)
  fail Errno::ESPIPE, "Illegal seek" if cache.nil?

  case whence
  when IO::SEEK_SET, :SET
    target_pos = amount
  when IO::SEEK_CUR, :CUR
    target_pos = @position + amount
  when IO::SEEK_END, :END
    unless chunks_depleted?
      cache.seek(0, IO::SEEK_END)
      IO.copy_stream(self, File::NULL)
    end

    target_pos = cache.size + amount
  else
    fail ArgumentError, "invalid whence: #{whence.inspect}"
  end

  if target_pos <= cache.size
    cache.seek(target_pos)
  else
    cache.seek(0, IO::SEEK_END)
    IO.copy_stream(self, File::NULL, target_pos - cache.size)
  end

  @position = cache.pos
end
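
The three `whence` modes behave as they do for IO#seek, so they can be tried on the standard library's StringIO as a stand-in. Down::ChunkedIO is not loaded in this sketch, and StringIO does not need to fetch or cache anything to reach the target position.

```ruby
require "stringio"

io = StringIO.new("abcdefgh")

io.seek(2)                  # IO::SEEK_SET is the default: absolute offset
after_set = io.read(1)      # reads the byte at offset 2; position becomes 3

io.seek(2, IO::SEEK_CUR)    # relative to the current position: 3 + 2
after_cur = io.read(1)      # reads the byte at offset 5

io.seek(-2, IO::SEEK_END)   # relative to the end: 8 - 2
after_end = io.read         # reads the last two bytes
```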