Class: Jrf::Pipeline
- Inherits: Object
  - Object
    - Jrf::Pipeline
- Defined in: lib/jrf/pipeline.rb
Instance Method Summary
- #call(input) {|value| ... } ⇒ Array?
  Run the pipeline on an enumerable of input values.
- #initialize(*blocks) ⇒ Pipeline (constructor)
  A new instance of Pipeline.
- #read(*paths, lax: false) {|value| ... } ⇒ Array?
  Run the pipeline on one or more files, mirroring how the CLI reads its file arguments: each path is opened (with .gz auto-decompression) and parsed as NDJSON.
Constructor Details
Instance Method Details
#call(input) {|value| ... } ⇒ Array?
Run the pipeline on an enumerable of input values.
Without a block, returns an Array of output values. With a block, streams each output value to the block.
# File 'lib/jrf/pipeline.rb', line 52
def call(input, &on_output)
  if on_output.nil?
    results = []
    on_output = proc { |value| results << value }
  end

  begin
    input.each { |value| process_value(value, @stages, &on_output) }
  ensure
    flush_reducers(@stages, &on_output)
  end

  results unless results.nil?
end
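The collect-or-stream convention of #call can be tried in isolation with a small stand-in. The sketch below is not Jrf's implementation: `SketchPipeline` is a hypothetical class, and it assumes each stage is a plain one-argument callable, which may differ from how Jrf interprets the blocks passed to its constructor. It only illustrates the return-value behavior described above (an Array without a block, nil with one).

```ruby
# Hypothetical stand-in for illustration only; not Jrf's implementation.
class SketchPipeline
  def initialize(*stages)
    # Assumption: every stage is a one-argument callable (lambda or proc).
    @stages = stages
  end

  # Same calling convention as Pipeline#call: collect the outputs into an
  # Array when no block is given, otherwise hand each output to the block.
  def call(input, &on_output)
    if on_output.nil?
      results = []
      on_output = proc { |value| results << value }
    end

    input.each do |value|
      on_output.call(@stages.reduce(value) { |acc, stage| stage.call(acc) })
    end

    results # nil when the caller supplied a block
  end
end

pipeline = SketchPipeline.new(->(v) { v * 2 }, ->(v) { v + 1 })

# No block: the outputs come back as an Array.
collected = pipeline.call([1, 2, 3])

# Block: each output is streamed to the block and nothing is collected.
streamed = []
returned = pipeline.call([1, 2, 3]) { |value| streamed << value }
```

Streaming with a block avoids holding every output in memory, which matters when the input enumerable is large or lazy.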
#read(*paths, lax: false) {|value| ... } ⇒ Array?
Run the pipeline on one or more files, mirroring how the CLI reads its file arguments: each path is opened (with .gz auto-decompression) and parsed as NDJSON. Pass lax: true for multiline JSON / JSON-SEQ input.
Without a block, returns an Array of output values; with a block, streams each output value to the block.
# File 'lib/jrf/pipeline.rb', line 31
def read(*paths, lax: false, &on_output)
  raise ArgumentError, "at least one path is required" if paths.empty?

  input = Enumerator.new do |y|
    paths.each do |path|
      InputReader.open_path(path) do |stream|
        InputReader.each_value(stream, lax: lax) { |value| y << value }
      end
    end
  end

  call(input, &on_output)
end
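The way #read chains several files into one stream of values can be approximated with the standard library alone. The sketch below is an illustration under assumptions, not Jrf's InputReader: `each_ndjson_value` and `values_from` are hypothetical helpers, gzip detection by the `.gz` file extension is assumed, and only strict one-value-per-line NDJSON is handled (there is no equivalent of lax: true).

```ruby
require "json"
require "zlib"
require "tmpdir"

# Hypothetical helper, not part of Jrf: yield each NDJSON value in a file,
# decompressing transparently when the path ends in ".gz" (an assumption
# about how detection works).
def each_ndjson_value(path, &block)
  parse_lines = lambda do |stream|
    stream.each_line do |line|
      next if line.strip.empty?
      block.call(JSON.parse(line))
    end
  end

  if path.end_with?(".gz")
    Zlib::GzipReader.open(path, &parse_lines)
  else
    File.open(path, &parse_lines)
  end
end

# Hypothetical helper: chain several files into one Enumerator, so files are
# opened and parsed only as values are pulled from it.
def values_from(*paths)
  raise ArgumentError, "at least one path is required" if paths.empty?

  Enumerator.new do |y|
    paths.each { |path| each_ndjson_value(path) { |value| y << value } }
  end
end

# Usage: one plain file and one gzipped file, read as a single stream.
values = Dir.mktmpdir do |dir|
  plain = File.join(dir, "a.ndjson")
  gzipped = File.join(dir, "b.ndjson.gz")
  File.write(plain, %({"id":1}\n{"id":2}\n))
  Zlib::GzipWriter.open(gzipped) { |gz| gz.write(%({"id":3}\n)) }
  values_from(plain, gzipped).to_a
end
```

An Enumerator like this can be passed to anything that only needs `each`, which is how #read hands its input to #call in the source above.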