Class: Jrf::Pipeline

Inherits:
Object
Defined in:
lib/jrf/pipeline.rb


Constructor Details

#initialize(*blocks) ⇒ Pipeline

Returns a new instance of Pipeline. Each block becomes one stage, in the order the blocks are given.

Raises:

  • (ArgumentError)

    if no stage blocks are given


# File 'lib/jrf/pipeline.rb', line 10

def initialize(*blocks)
  raise ArgumentError, "at least one stage block is required" if blocks.empty?

  @stages = blocks.map { |block| Stage.new(block, src: nil) }
end
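The constructor's contract can be reproduced in isolation. The sketch below isolates the empty-argument guard and the block-to-stage mapping. ToyPipeline and ToyStage are hypothetical stand-ins. The real Stage is built with Stage.new(block, src: nil) and carries state that this sketch omits.

```ruby
# Hypothetical stand-in for Jrf's Stage; it only remembers the block.
ToyStage = Struct.new(:block)

class ToyPipeline
  attr_reader :stages

  def initialize(*blocks)
    # Same guard as Jrf::Pipeline#initialize: at least one stage is required.
    raise ArgumentError, "at least one stage block is required" if blocks.empty?

    # One stage per block, preserving the order the blocks were passed in.
    @stages = blocks.map { |block| ToyStage.new(block) }
  end
end

pipeline = ToyPipeline.new(proc { |v| v * 2 }, proc { |v| v + 1 })
pipeline.stages.size # => 2

begin
  ToyPipeline.new
rescue ArgumentError => e
  e.message # => "at least one stage block is required"
end
```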

Instance Method Details

#call(input) {|value| ... } ⇒ Array?

Run the pipeline on an enumerable of input values.

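Without a block, returns an Array of output values. With a block, streams each output value to the block and returns nil. Pending reducer output is flushed after the input is exhausted; because the flush runs in an ensure clause, it also runs if iterating the input raises.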

Parameters:

  • input (Enumerable)

    input values to process

Yield Parameters:

  • value

    output value

Returns:

  • (Array, nil)

    output values (without block), or nil (with block)



# File 'lib/jrf/pipeline.rb', line 52

def call(input, &on_output)
  if on_output.nil?
    results = []
    on_output = proc { |value| results << value }
  end

  begin
    input.each { |value| process_value(value, @stages, &on_output) }
  ensure
    flush_reducers(@stages, &on_output)
  end

  results unless results.nil?
end
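The two calling conventions and the ensure-based flush can be illustrated with a minimal sketch. A hypothetical toy_call method replaces the real stages. Each value is doubled and emitted immediately. A running total stands in for a reducer whose result is emitted only at the end. None of these names are Jrf internals.

```ruby
# Toy version of the collect-or-stream pattern used by Pipeline#call.
def toy_call(input, &on_output)
  results = nil
  if on_output.nil?
    # No block: collect everything into an Array instead of streaming.
    results = []
    on_output = proc { |value| results << value }
  end

  total = 0
  begin
    input.each do |value|
      total += value
      on_output.call(value * 2) # per-value output, emitted immediately
    end
  ensure
    # Stand-in for flush_reducers: runs on normal exit and on exceptions.
    on_output.call("sum=#{total}")
  end

  results # Array without a block, nil with one
end

toy_call([1, 2, 3]) # => [2, 4, 6, "sum=6"]

seen = []
toy_call([1, 2]) { |v| seen << v } # => nil
seen # => [2, 4, "sum=3"]
```

Because the flush sits in an ensure clause, a caller that passes a block still receives the partial reducer result when the input raises. Without a block, the collected Array is lost along with the exception.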

#read(*paths, lax: false) {|value| ... } ⇒ Array?

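Run the pipeline on one or more files, mirroring how the CLI reads its file arguments: each path is opened (with .gz auto-decompression) and parsed as NDJSON, and the values from all files are fed through the pipeline as a single stream, in the order the paths are given. Pass lax: true for multiline JSON or JSON-SEQ input.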

Without a block, returns an Array of output values; with a block, streams each output value to the block.
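The following minimal sketch shows how JSON-SEQ input, one of the formats that lax: true accepts, can be split. RFC 7464 precedes each JSON text with an ASCII record separator (0x1E). The sketch therefore splits records on that byte rather than on newlines, which lets a single value span several lines. parse_json_seq is a hypothetical helper, not Jrf's InputReader. It does not cover the other lax-mode inputs, such as plain multiline or whitespace-delimited JSON.

```ruby
require "json"

RECORD_SEPARATOR = "\x1E" # ASCII RS, the record prefix defined by RFC 7464

# Hypothetical helper: split on RS, skip empty records (such as anything
# before the first RS), and parse each remaining record as one JSON value.
def parse_json_seq(text)
  text.split(RECORD_SEPARATOR)
      .reject { |record| record.strip.empty? }
      .map { |record| JSON.parse(record) }
end

# The first record spans two lines; the trailing LF after each record is
# ordinary whitespace to JSON.parse.
values = parse_json_seq("\x1E{\"a\": 1,\n \"b\": [1, 2]}\n\x1E[3, 4]\n")
values.size # => 2
```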

Examples:

Build a lookup hash from one file and use it to filter another

lookup = Jrf.new(
  proc { reduce({}) { |a, v| a[[v["tid"], v["conn"]]] = v["late_acked"]; a } }
).read("conn_stats.ndjson").first
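The example stops after building the lookup. For comparison, the following plain-Ruby sketch (no jrf involved) builds a hash of the same shape with each_with_object and then uses it to filter a second data set. The sample records, the second data set, and the filter rule are hypothetical. The filter rule keeps events whose connection was late-acked. Only the field names come from the example. A reduce stage presumably emits a single accumulated value, which would explain why the example takes it with .first.

```ruby
require "json"

# Hypothetical sample data; only the field names mirror the example above.
conn_stats = <<~NDJSON
  {"tid": 1, "conn": "a", "late_acked": true}
  {"tid": 1, "conn": "b", "late_acked": false}
  {"tid": 2, "conn": "a", "late_acked": true}
NDJSON

events = <<~NDJSON
  {"tid": 1, "conn": "a", "msg": "x"}
  {"tid": 1, "conn": "b", "msg": "y"}
  {"tid": 2, "conn": "a", "msg": "z"}
NDJSON

# Parse one JSON value per non-blank line (NDJSON).
parse_ndjson = ->(text) { text.each_line.reject { |l| l.strip.empty? }.map { |l| JSON.parse(l) } }

# Same shape as the reduce({}) accumulator: [tid, conn] => late_acked.
lookup = parse_ndjson.(conn_stats).each_with_object({}) do |v, a|
  a[[v["tid"], v["conn"]]] = v["late_acked"]
end

# Hypothetical filter rule: keep events whose connection was late-acked.
late = parse_ndjson.(events).select { |e| lookup[[e["tid"], e["conn"]]] }
late.map { |e| e["msg"] } # => ["x", "z"]
```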

Parameters:

  • paths (Array<String>)

    one or more file paths

  • lax (Boolean) (defaults to: false)

    enable lax (multiline / whitespace-delimited) parsing

Yield Parameters:

  • value

    output value

Returns:

  • (Array, nil)

    output values (without block), or nil (with block)

Raises:

  • (ArgumentError)

    if no paths are given


# File 'lib/jrf/pipeline.rb', line 31

def read(*paths, lax: false, &on_output)
  raise ArgumentError, "at least one path is required" if paths.empty?

  input = Enumerator.new do |y|
    paths.each do |path|
      InputReader.open_path(path) do |stream|
        InputReader.each_value(stream, lax: lax) { |value| y << value }
      end
    end
  end
  call(input, &on_output)
end
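The following self-contained sketch models the input side of read. It builds one lazy Enumerator that walks the paths in order, transparently gunzips .gz files, and yields one parsed value per non-blank NDJSON line. open_maybe_gz and ndjson_values are hypothetical helpers. Deciding compression from the .gz extension is an assumption, since InputReader.open_path may detect it differently. Lax parsing is not covered.

```ruby
require "json"
require "zlib"

# Hypothetical helper: yield a readable IO, gunzipping when the path ends
# in ".gz" (an assumption about how compression is detected).
def open_maybe_gz(path, &block)
  if path.end_with?(".gz")
    Zlib::GzipReader.open(path, &block) # closes the reader after the block
  else
    File.open(path, "r", &block)        # closes the file after the block
  end
end

# One lazy stream of NDJSON values across all paths, in argument order,
# analogous to the Enumerator that Pipeline#read passes to #call.
def ndjson_values(*paths)
  raise ArgumentError, "at least one path is required" if paths.empty?

  Enumerator.new do |y|
    paths.each do |path|
      open_maybe_gz(path) do |io|
        io.each_line do |line|
          y << JSON.parse(line) unless line.strip.empty?
        end
      end
    end
  end
end
```

The Enumerator is built before anything is read, so files are opened only as values are pulled from it. The ArgumentError for an empty path list is raised eagerly, as in read.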