Class: VibeZstd::DecompressReader
- Inherits: Object (ancestry: Object > VibeZstd::DecompressReader)
- Includes:
- Enumerable
- Defined in:
- lib/vibe_zstd.rb,
ext/vibe_zstd/vibe_zstd.c
Class Method Summary
-
.open(io, **options) {|reader| ... } ⇒ Object
Creates a reader for io; when a block is given, yields the reader and returns the block's result. No explicit cleanup is performed after the block.
Instance Method Summary
-
#each(chunk_size = nil) ⇒ Object
Iterate over chunks (required for Enumerable).
-
#each_line(sep = $/) ⇒ Object
Iterate over lines.
-
#eof ⇒ Object
Alias for eof?.
- #eof? ⇒ Boolean
-
#gets(sep = $/) ⇒ Object
(also: #readline)
Read a single line (up to separator or EOF). Uses buffered reads (8192 bytes) instead of byte-at-a-time reads for performance.
-
#initialize(*args) ⇒ Object
constructor
Wraps ZSTD streaming decompression to read from a compressed IO object.
-
#read(*args) ⇒ Object
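Read up to size bytes of decompressed data (one default-sized chunk when size is omitted), allowing streaming of arbitrarily large files without loading everything into memory.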
-
#read_all ⇒ Object
Read all remaining data. Drains any buffered data from line_buffer first.
-
#readpartial(maxlen) ⇒ Object
Read up to maxlen bytes, or raise EOFError at end of stream.
Constructor Details
#initialize(*args) ⇒ Object
Wraps ZSTD streaming decompression to read from a compressed IO object
(source lines 265–332)
# File 'ext/vibe_zstd/streaming.c', line 265
/*
 * DecompressReader#initialize(io, options = nil)
 *
 * Wraps ZSTD streaming decompression around +io+, a duck-typed object that
 * must respond to #read. The IO is stored in the native struct and in @io.
 *
 * Recognized keys in the options Hash:
 *   :dict               - dictionary object (vibe_zstd_ddict_type) referenced
 *                         by the stream; also kept in @dict so the GC cannot
 *                         free it while the stream points at it
 *   :initial_chunk_size - positive Integer; number of bytes read() returns when
 *                         called without a size. When absent, read() uses
 *                         ZSTD_DStreamOutSize().
 *
 * Raises TypeError if io does not respond to #read, options is not a Hash, or
 * dict has the wrong type; ArgumentError if initial_chunk_size is not positive;
 * RuntimeError if the zstd context cannot be created, reset, or given the
 * dictionary.
 */
static VALUE
vibe_zstd_reader_initialize(int argc, VALUE *argv, VALUE self) {
    VALUE io, options;
    rb_scan_args(argc, argv, "11", &io, &options);

    vibe_zstd_dstream* dstream;
    TypedData_Get_Struct(self, vibe_zstd_dstream, &vibe_zstd_dstream_type, dstream);

    // Validate IO object responds to read (duck typing)
    if (!rb_respond_to(io, id_read)) {
        rb_raise(rb_eTypeError, "IO object must respond to read");
    }

    // Store IO object (write barrier for WB_PROTECTED)
    RB_OBJ_WRITE(self, &dstream->io, io);
    rb_ivar_set(self, rb_intern("@io"), io);

    // Parse options
    VALUE dict = Qnil;
    size_t initial_chunk_size = 0; // 0 = use default ZSTD_DStreamOutSize()
    if (!NIL_P(options)) {
        Check_Type(options, T_HASH);
        dict = rb_hash_aref(options, ID2SYM(rb_intern("dict")));
        VALUE v_chunk_size = rb_hash_aref(options, ID2SYM(rb_intern("initial_chunk_size")));
        if (!NIL_P(v_chunk_size)) {
            // Convert through a signed long: NUM2SIZET silently wraps a negative
            // Integer into a huge size_t, which would slip past a "== 0" check.
            long chunk_len = NUM2LONG(v_chunk_size);
            if (chunk_len <= 0) {
                rb_raise(rb_eArgError, "initial_chunk_size must be greater than 0");
            }
            initial_chunk_size = (size_t)chunk_len;
        }
    }

    // A repeated #initialize call on the same object would otherwise overwrite,
    // and leak, the context created by the earlier call.
    // NOTE(review): assumes the allocator leaves dstream->dstream NULL until the
    // first #initialize — confirm against the alloc function.
    if (dstream->dstream) {
        ZSTD_freeDStream(dstream->dstream);
        dstream->dstream = NULL;
    }

    // Create decompression context (DStream and DCtx are the same since v1.3.0)
    dstream->dstream = ZSTD_createDStream();
    if (!dstream->dstream) {
        rb_raise(rb_eRuntimeError, "Failed to create decompression stream");
    }

    // Reset context for streaming
    size_t result = ZSTD_DCtx_reset((ZSTD_DCtx*)dstream->dstream, ZSTD_reset_session_only);
    if (ZSTD_isError(result)) {
        rb_raise(rb_eRuntimeError, "Failed to reset decompression context: %s", ZSTD_getErrorName(result));
    }

    // Set dictionary if provided
    if (!NIL_P(dict)) {
        vibe_zstd_ddict* ddict_obj;
        TypedData_Get_Struct(dict, vibe_zstd_ddict, &vibe_zstd_ddict_type, ddict_obj);
        result = ZSTD_DCtx_refDDict((ZSTD_DCtx*)dstream->dstream, ddict_obj->ddict);
        if (ZSTD_isError(result)) {
            rb_raise(rb_eRuntimeError, "Failed to set dictionary: %s", ZSTD_getErrorName(result));
        }
        // Retain the DDict object so GC won't free it while the stream holds a raw
        // pointer to its internal ZSTD_DDict (ZSTD_DCtx_refDDict stores no Ruby ref)
        rb_ivar_set(self, rb_intern("@dict"), dict);
    }

    // Initialize input buffer management
    RB_OBJ_WRITE(self, &dstream->input_data, rb_str_new(NULL, 0));
    dstream->input.src = NULL;
    dstream->input.size = 0;
    dstream->input.pos = 0;
    dstream->eof = 0;
    dstream->initial_chunk_size = initial_chunk_size;
    return self;
}
|
Class Method Details
.open(io, **options) {|reader| ... } ⇒ Object
Creates a reader for io; when a block is given, yields the reader and returns the block's result. No explicit cleanup is performed after the block.
(source lines 207–214)
# File 'lib/vibe_zstd.rb', line 207
# Builds a reader for +io+, forwarding any keyword options to .new.
# Without a block the reader itself is returned. With a block the reader is
# yielded and the block's value becomes the return value. Nothing is closed or
# finished after the block returns.
def self.open(io, **)
  reader = new(io, **)
  block_given? ? yield(reader) : reader
end
Instance Method Details
#each(chunk_size = nil) ⇒ Object
Iterate over chunks (required for Enumerable)
(source lines 237–244)
# File 'lib/vibe_zstd.rb', line 237
# Yields successive chunks of decompressed data (required for Enumerable);
# returns an Enumerator when no block is given.
# Bytes already buffered in @line_buffer by #gets are yielded first, the same
# way #read_all drains them, so mixing line reads and chunk iteration loses no
# data. +chunk_size+ is passed to #read; nil means the reader's default chunk
# size.
def each(chunk_size = nil)
  # read(0) returns "" without advancing the stream, so a zero (or negative)
  # chunk size would otherwise loop forever.
  if chunk_size && chunk_size <= 0
    raise ArgumentError, "chunk_size must be positive, got #{chunk_size}"
  end
  return enum_for(:each, chunk_size) unless block_given?

  unless @line_buffer.nil? || @line_buffer.empty?
    pending = @line_buffer
    @line_buffer = +""
    step = chunk_size || pending.bytesize
    (0...pending.bytesize).step(step) { |offset| yield pending.byteslice(offset, step) }
  end

  until eof?
    chunk = read(chunk_size)
    yield chunk if chunk
  end
end
#each_line(sep = $/) ⇒ Object
Iterate over lines
(source lines 272–278)
# File 'lib/vibe_zstd.rb', line 272
# Yields every line produced by #gets(sep) until it returns nil.
# Returns an Enumerator when called without a block.
def each_line(sep = $/)
  return enum_for(:each_line, sep) unless block_given?

  until (line = gets(sep)).nil?
    yield line
  end
end
#eof ⇒ Object
Alias for eof?
(source lines 232–234)
# File 'lib/vibe_zstd.rb', line 232
# Same answer as #eof?; provided for parity with IO#eof.
def eof = eof?
#eof? ⇒ Boolean
(source lines 488–493)
# File 'ext/vibe_zstd/streaming.c', line 488
/*
 * DecompressReader#eof? -> true or false
 *
 * Reports the native stream's EOF flag, which read() sets once the IO runs
 * dry or zstd signals the end of the frame. Only that flag is consulted;
 * nothing buffered on the Ruby side is taken into account.
 */
static VALUE
vibe_zstd_reader_eof(VALUE self) {
    vibe_zstd_dstream *ds;
    TypedData_Get_Struct(self, vibe_zstd_dstream, &vibe_zstd_dstream_type, ds);

    if (ds->eof) {
        return Qtrue;
    }
    return Qfalse;
}
|
#gets(sep = $/) ⇒ Object Also known as: readline
Read a single line (up to separator or EOF). Uses buffered reads (8192 bytes) instead of byte-at-a-time reads, which is much faster for line-oriented reading.
(source lines 249–269)
# File 'lib/vibe_zstd.rb', line 249
# Returns the next line including its separator, or nil at end of stream.
# Decompressed data is pulled in 8192-byte reads and kept in @line_buffer, so
# lines that span chunk boundaries are reassembled. This is much cheaper than
# reading byte by byte.
#   sep = nil -> return everything that is left (like IO#gets(nil))
#   sep = ""  -> paragraph mode: a line ends at "\n\n", and blank lines
#                between paragraphs are skipped
def gets(sep = $/)
  @line_buffer ||= +""
  return nil if eof? && @line_buffer.empty?

  if sep.nil?
    # No separator: the "line" is the rest of the stream.
    while (chunk = read(8192))
      @line_buffer << chunk
    end
    return @line_buffer.empty? ? nil : @line_buffer.slice!(0, @line_buffer.length)
  end

  # An empty separator matches at index 0, which used to return "" forever.
  # Treat it as IO does: paragraph mode.
  paragraph_mode = sep.empty?
  sep = "\n\n" if paragraph_mode

  loop do
    # Paragraph mode drops the blank lines that precede a paragraph.
    @line_buffer.sub!(/\A\n+/, "") if paragraph_mode
    if (idx = @line_buffer.index(sep))
      return @line_buffer.slice!(0, idx + sep.length)
    end
    # Read more data in larger chunks
    chunk = read(8192)
    break unless chunk
    @line_buffer << chunk
  end

  # Return remaining buffer or nil
  @line_buffer.empty? ? nil : @line_buffer.slice!(0, @line_buffer.length)
end
#read(*args) ⇒ Object
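Read up to size bytes of decompressed data (one default-sized chunk when size is omitted), allowing streaming of arbitrarily large files without loading everything into memory.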
(source lines 359–486)
# File 'ext/vibe_zstd/streaming.c', line 359
/*
 * DecompressReader#read(size = nil) -> String or nil
 *
 * Decompresses and returns up to +size+ bytes pulled from the wrapped IO.
 * Without an argument, returns at most one chunk of initial_chunk_size bytes
 * (ZSTD_DStreamOutSize() when none was configured). Callers can therefore
 * stream arbitrarily large inputs chunk by chunk.
 *
 * read(0) returns "" without touching stream state. Once the reader has
 * reached EOF, read returns nil. EOF is set when the IO's #read returns nil or
 * "", or when zstd reports the current frame complete. Only a single zstd frame
 * is decoded.
 *
 * Raises ArgumentError for a negative size, TypeError if the IO's #read
 * returns something that is not String-convertible, and RuntimeError on a
 * decompression error.
 */
static VALUE
vibe_zstd_reader_read(int argc, VALUE *argv, VALUE self) {
    VALUE size_arg;
    rb_scan_args(argc, argv, "01", &size_arg);

    vibe_zstd_dstream* dstream;
    TypedData_Get_Struct(self, vibe_zstd_dstream, &vibe_zstd_dstream_type, dstream);

    size_t requested_size = 0;
    if (!NIL_P(size_arg)) {
        // Match IO#read and reject negative lengths. NUM2SIZET would silently
        // wrap -1 into SIZE_MAX and turn read(-1) into an unbounded read.
        long len = NUM2LONG(size_arg);
        if (len < 0) {
            rb_raise(rb_eArgError, "negative length %ld given", len);
        }
        // read(0): per IO semantics, always return "" without touching stream state
        if (len == 0) {
            return rb_str_new(NULL, 0);
        }
        requested_size = (size_t)len;
    }

    if (dstream->eof) {
        return Qnil;
    }

    const size_t default_out_size = ZSTD_DStreamOutSize();
    if (NIL_P(size_arg)) {
        // Unbounded reads return one chunk of the configured (or default) size
        requested_size = (dstream->initial_chunk_size > 0) ? dstream->initial_chunk_size : default_out_size;
    }
    const size_t in_chunk_size = ZSTD_DStreamInSize();

    // Cap the initial allocation to avoid multi-gigabyte pre-allocations when
    // the caller passes a huge size argument for a small stream. The buffer
    // grows geometrically below as output accumulates.
    size_t initial_alloc = (requested_size < default_out_size) ? requested_size : default_out_size;
    VALUE result = rb_str_buf_new((long)initial_alloc);
    size_t total_read = 0;

    while (total_read < requested_size) {
        // Refill input buffer when all compressed data consumed
        if (dstream->input.pos >= dstream->input.size) {
            VALUE chunk = rb_funcall(dstream->io, id_read, 1, SIZET2NUM(in_chunk_size));
            if (NIL_P(chunk)) {
                dstream->eof = 1;
                break;
            }
            // The IO is duck-typed: read may return anything. Convert via to_str
            // (raising TypeError otherwise) so RSTRING below never sees a non-String.
            StringValue(chunk);
            // Keep a private frozen copy so an IO that reuses/mutates its returned
            // buffer cannot invalidate dstream->input.src between read() calls.
            VALUE frozen_chunk = rb_str_new_frozen(chunk);
            // Reset input buffer with new data (write barrier for WB_PROTECTED)
            RB_OBJ_WRITE(self, &dstream->input_data, frozen_chunk);
            dstream->input.src = RSTRING_PTR(frozen_chunk);
            dstream->input.size = RSTRING_LEN(frozen_chunk);
            dstream->input.pos = 0;
        }
        if (dstream->input.size == 0) {
            dstream->eof = 1;
            break;
        }

        // Grow the output buffer geometrically when it is full, capped at
        // requested_size. Publishing the written length first lets
        // rb_str_modify_expand preserve those bytes across any reallocation.
        size_t capacity = (size_t)rb_str_capacity(result);
        if (total_read >= capacity) {
            size_t new_capacity = capacity * 2;
            if (new_capacity > requested_size) new_capacity = requested_size;
            rb_str_set_len(result, (long)total_read);
            rb_str_modify_expand(result, (long)(new_capacity - total_read));
        }

        // Cap at requested_size so read(n) never returns more than n bytes:
        // rb_str_capacity may exceed the request due to allocator rounding.
        size_t usable = (size_t)rb_str_capacity(result);
        if (usable > requested_size) usable = requested_size;

        // RSTRING_PTR is re-read every iteration because growth may move it.
        ZSTD_outBuffer output = {
            .dst = RSTRING_PTR(result) + total_read,
            .size = usable - total_read,
            .pos = 0
        };

        // ZSTD_decompressStream advances input.pos and output.pos.
        // Return value: 0 = frame complete and fully flushed, >0 = hint, error otherwise.
        size_t ret = ZSTD_decompressStream(dstream->dstream, &output, &dstream->input);
        if (ZSTD_isError(ret)) {
            rb_raise(rb_eRuntimeError, "Decompression failed: %s", ZSTD_getErrorName(ret));
        }
        total_read += output.pos;

        // Check frame completion before the "buffer full" loop exit. Otherwise
        // a frame ending exactly at the buffer boundary would leave EOF unset,
        // and the next call would block on io.read or decode a following frame.
        if (ret == 0) {
            dstream->eof = 1;
            break;
        }
    }

    if (total_read == 0) {
        dstream->eof = 1;
        return Qnil;
    }
    rb_str_set_len(result, (long)total_read);
    return result;
}
|
#read_all ⇒ Object
Read all remaining data. Drains any buffered data from line_buffer first.
(source lines 218–229)
# File 'lib/vibe_zstd.rb', line 218
# Returns all remaining decompressed data as one String ("" when nothing is
# left). Any data #gets has already buffered comes first, and the buffer is
# then reset to an empty string.
def read_all
  parts = []
  unless @line_buffer.nil? || @line_buffer.empty?
    parts.push(@line_buffer)
    @line_buffer = +""
  end
  until (piece = read).nil?
    parts.push(piece)
  end
  parts.join
end
#readpartial(maxlen) ⇒ Object
Read up to maxlen bytes, or raise EOFError at end of stream.
(source lines 284–291)
# File 'lib/vibe_zstd.rb', line 284
# Reads up to +maxlen+ bytes (not exactly +maxlen+).
# Bytes #gets has already buffered are returned first without touching the
# stream, so they are not skipped. Raises EOFError once nothing is left and
# ArgumentError for a negative Integer +maxlen+.
def readpartial(maxlen)
  if maxlen.is_a?(Integer) && maxlen.negative?
    raise ArgumentError, "negative length #{maxlen} given"
  end

  unless @line_buffer.nil? || @line_buffer.empty?
    data = @line_buffer.byteslice(0, maxlen)
    @line_buffer = @line_buffer.byteslice(data.bytesize..)
    return data
  end

  raise EOFError, "end of file reached" if eof?
  data = read(maxlen)
  raise EOFError, "end of file reached" if data.nil?
  data
end