Module: Jrf::InputReader

Defined in:
lib/jrf/input_reader.rb

Overview

File and stream input reading for jrf pipelines.

Used by both the CLI runner and Pipeline#read so that they share gzip detection (by `.gz` file extension), strict NDJSON parsing, and the (lazily loaded) `--lax` multiline parser.

Defined Under Namespace

Classes: RsNormalizer

Constant Summary

RS_CHAR =
"\x1e" (the ASCII Record Separator control character, U+001E)

Class Method Summary

Class Method Details

.each_value(stream, lax: false, &block) ⇒ Object



Source lines 24-34:
# File 'lib/jrf/input_reader.rb', line 24

# Yields each JSON value read from +stream+.
#
# In strict mode (the default) the input is treated as NDJSON: each line
# is stripped of surrounding whitespace, blank lines are skipped, and every
# remaining line is parsed on its own with JSON.parse. With +lax: true+
# parsing is delegated to each_value_lax instead.
#
# @param stream [#each_line] input to read (e.g. a File or Zlib::GzipReader)
# @param lax [Boolean] use the lax (Oj-based) parser instead of strict NDJSON
# @yieldparam value [Object] each parsed JSON value
# @return [Enumerator] when called without a block; otherwise the result
#   of the underlying iteration
# @raise [JSON::ParserError] when the input is not valid JSON
def each_value(stream, lax: false, &block)
  # Without a block, hand back a lazy Enumerator instead of failing later
  # with NoMethodError on a nil block.
  return enum_for(__method__, stream, lax: lax) unless block

  return each_value_lax(stream, &block) if lax

  stream.each_line do |line|
    line.strip!
    next if line.empty?

    block.call(JSON.parse(line))
  end
end

.each_value_lax(stream, &block) ⇒ Object



Source lines 36-43:
# File 'lib/jrf/input_reader.rb', line 36

# Streams JSON values from +stream+ with Oj's callback parser
# (Oj.sc_parse), for input that is not strictly one document per line.
#
# oj is required lazily here, so only --lax mode depends on it. The stream
# is wrapped in RsNormalizer before parsing, and every top-level value is
# passed to +block+ via the handler built by streaming_handler_class.
#
# @param stream [IO] input to read
# @yieldparam value [Object] each parsed JSON value
# @raise [RuntimeError] when the oj gem cannot be loaded
# @raise [JSON::ParserError] when oj reports a parse error (translated so
#   callers can rescue strict and lax failures the same way)
def each_value_lax(stream, &block)
  # Guard only the require. A LoadError raised later (e.g. by the caller's
  # block while values are being emitted) must propagate unchanged rather
  # than be misreported as a missing oj gem.
  begin
    require "oj"
  rescue LoadError
    raise "oj is required for --lax mode (gem install oj)"
  end

  # Kept as a separate begin so Oj::ParseError is only referenced once oj
  # is known to be loaded.
  begin
    Oj.sc_parse(streaming_handler_class.new(&block), RsNormalizer.new(stream))
  rescue Oj::ParseError => e
    raise JSON::ParserError, e.message
  end
end

.open_path(path, &block) ⇒ Object



Source lines 16-22:
# File 'lib/jrf/input_reader.rb', line 16

# Opens +path+ for reading, decompressing it when it looks gzipped.
#
# The choice is made purely from the file name: a path ending in ".gz" is
# opened with Zlib::GzipReader, and anything else is opened with File.open
# in binary ("rb") mode. Any block is forwarded unchanged to whichever
# opener is used.
#
# @param path [String] filesystem path to open
# @return [Object] whatever the chosen opener returns (the block's value
#   when a block is given, otherwise the open reader)
def open_path(path, &block)
  return File.open(path, "rb", &block) unless path.end_with?(".gz")

  Zlib::GzipReader.open(path, &block)
end

.streaming_handler_class ⇒ Object



Source lines 45-58:
# File 'lib/jrf/input_reader.rb', line 45

# Returns a memoized Oj::ScHandler subclass that builds plain Ruby
# Hashes and Arrays and passes each completed top-level value to a callback.
#
# Instances are created as +streaming_handler_class.new { |value| ... }+.
# The class is built on first use rather than at load time because it
# subclasses Oj::ScHandler, which is only available once oj has been
# required (each_value_lax requires it before calling this).
#
# @return [Class] the handler class (the same object on every call)
def streaming_handler_class
  return @streaming_handler_class if @streaming_handler_class

  @streaming_handler_class = Class.new(Oj::ScHandler) do
    # @param emit [Proc] callback receiving each top-level value
    def initialize(&emit)
      @emit = emit
    end

    def hash_start
      {}
    end

    def hash_key(key)
      key
    end

    def hash_set(hash, key, value)
      hash[key] = value
    end

    def array_start
      []
    end

    def array_append(array, value)
      array << value
    end

    # Forwards a finished top-level value to the callback.
    def add_value(value)
      @emit.call(value)
    end
  end
end