Module: Jrf::InputReader
Defined in: lib/jrf/input_reader.rb
Overview
File and stream input reading for jrf pipelines.
Used by both the CLI runner and Pipeline#read to share gzip auto-detection, strict NDJSON parsing, and (lazily loaded) --lax multiline parsing.
Defined Under Namespace
Classes: RsNormalizer
Constant Summary
- RS_CHAR = "\x1e"
Class Method Summary
- .each_value(stream, lax: false, &block) ⇒ Object
- .each_value_lax(stream, &block) ⇒ Object
- .open_path(path, &block) ⇒ Object
- .streaming_handler_class ⇒ Object
Class Method Details
.each_value(stream, lax: false, &block) ⇒ Object
# File 'lib/jrf/input_reader.rb', line 24
def each_value(stream, lax: false, &block)
  if lax
    each_value_lax(stream, &block)
  else
    stream.each_line do |line|
      line.strip!
      next if line.empty?
      block.call(JSON.parse(line))
    end
  end
end
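The strict branch in the listing above can be tried in isolation. The following is a minimal sketch, assuming only the Ruby standard library; `each_ndjson_value` and the sample input are hypothetical names used for illustration, and the body simply mirrors the non-lax path of `each_value`.

```ruby
require "json"
require "stringio"

# Hypothetical standalone copy of the strict (non-lax) branch of each_value.
def each_ndjson_value(stream)
  stream.each_line do |line|
    line = line.strip
    next if line.empty?      # blank lines are skipped, not parsed
    yield JSON.parse(line)   # raises JSON::ParserError on malformed input
  end
end

# Two JSON documents separated by a blank line, one document per line.
input = StringIO.new(%({"a": 1}\n\n[1, 2]\n))
values = []
each_ndjson_value(input) { |value| values << value }
```

Because every non-empty line is handed to `JSON.parse` on its own, a JSON document that spans several lines fails in this mode; that case is what the `lax: true` path is for.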
.each_value_lax(stream, &block) ⇒ Object
# File 'lib/jrf/input_reader.rb', line 36
def each_value_lax(stream, &block)
  require "oj"
  Oj.sc_parse(streaming_handler_class.new(&block), RsNormalizer.new(stream))
rescue LoadError
  raise "oj is required for --lax mode (gem install oj)"
rescue Oj::ParseError => e
  raise JSON::ParserError, e.message
end
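The source of `RsNormalizer` is not shown on this page, so its behavior can only be guessed from its name and from `RS_CHAR`. The sketch below is an assumption, not the library's implementation: a hypothetical `RsToNewline` wrapper that replaces the ASCII record separator with a newline as data is read, which is one plausible way to make RS-delimited input acceptable to a JSON parser.

```ruby
require "stringio"

RS = "\x1e" # ASCII record separator, the same byte as RS_CHAR

# Hypothetical wrapper (not the real RsNormalizer): turns RS bytes into
# newlines so they read as ordinary whitespace between JSON values.
class RsToNewline
  def initialize(io)
    @io = io
  end

  # Delegates to the wrapped stream and rewrites RS bytes in the chunk.
  # Returns nil at end of stream when a length is given, like IO#read.
  def read(length = nil)
    chunk = @io.read(length)
    chunk&.tr(RS, "\n")
  end
end

src = StringIO.new("\x1e{\"a\":1}\n\x1e{\"b\":2}\n")
text = RsToNewline.new(src).read
```

The real class may handle buffering, encodings, or other IO methods differently; the sketch only illustrates the byte substitution.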
.open_path(path, &block) ⇒ Object
# File 'lib/jrf/input_reader.rb', line 16
def open_path(path, &block)
  if path.end_with?(".gz")
    Zlib::GzipReader.open(path, &block)
  else
    File.open(path, "rb", &block)
  end
end
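A short usage sketch of the same dispatch, assuming only the standard library. `open_input` is a hypothetical standalone copy of `open_path`, and the temporary files are illustrative sample data.

```ruby
require "json"
require "tempfile"
require "zlib"

# Hypothetical standalone copy of open_path, for illustration only.
def open_input(path, &block)
  if path.end_with?(".gz")
    Zlib::GzipReader.open(path, &block) # decompresses transparently
  else
    File.open(path, "rb", &block)       # binary mode, no newline translation
  end
end

# A plain NDJSON file.
plain = Tempfile.new(["sample", ".ndjson"])
plain.write(%({"n": 1}\n))
plain.close

# A gzip-compressed NDJSON file.
gz = Tempfile.new(["sample", ".ndjson.gz"])
gz.close
Zlib::GzipWriter.open(gz.path) { |writer| writer.write(%({"n": 2}\n)) }

# Both paths go through the same call; the block's value is returned.
rows = [plain.path, gz.path].map do |path|
  open_input(path) { |io| JSON.parse(io.read) }
end
```

Note that the listing above selects the gzip reader from the `.gz` file name suffix, so a compressed file without that suffix would be opened as plain bytes.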
.streaming_handler_class ⇒ Object
# File 'lib/jrf/input_reader.rb', line 45
def streaming_handler_class
  @streaming_handler_class ||= Class.new(Oj::ScHandler) do
    def initialize(&emit)
      @emit = emit
    end

    def hash_start = {}
    def hash_key(key) = key
    def hash_set(hash, key, value) = hash[key] = value
    def array_start = []
    def array_append(array, value) = array << value
    def add_value(value) = @emit.call(value)
  end
end