Class: Jrf::Runner

Inherits:
Object
  • Object
show all
Defined in:
lib/jrf/runner.rb

Defined Under Namespace

Classes: ProbeValue

Constant Summary collapse

PROBE_VALUE =
ProbeValue.new

Instance Method Summary collapse

Constructor Details

#initialize(input: ARGF, out: $stdout, err: $stderr) ⇒ Runner

Returns a new instance of Runner.



27
28
29
30
31
# File 'lib/jrf/runner.rb', line 27

# Builds a runner around the given input and output objects.
#
# @param input [#each_line] line source that #run iterates with +each_line+;
#   defaults to ARGF
# @param out [Object] kept in +@out+; defaults to $stdout
# @param err [Object] kept in +@err+; defaults to $stderr
def initialize(input: ARGF, out: $stdout, err: $stderr)
  @input, @out, @err = input, out, err
end

Instance Method Details

#run(expression, verbose: false) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/jrf/runner.rb', line 33

# Parses +expression+ into pipeline stages, then feeds every non-blank input
# line through the compiled stages as a parsed JSON value.
#
# Reducers are flushed from an +ensure+ clause, so they are flushed whether
# the input loop finishes normally or is aborted by an exception; the
# exception then propagates to the caller unchanged. If flushing itself
# raises while another error is in flight, Ruby attaches the in-flight error
# as the new exception's +cause+, so the original failure stays reachable.
#
# @param expression pipeline expression passed to PipelineParser.new
# @param verbose [Boolean] when truthy, calls +dump_stages+ with the parsed
#   stages before anything is compiled
# @return [nil]
# @raise [JSON::ParserError] when a non-blank input line is not valid JSON
# @raise [StandardError] whatever +process_value+ or +flush_reducers+ raises
def run(expression, verbose: false)
  stages = PipelineParser.new(expression).parse[:stages]
  dump_stages(stages) if verbose

  ctx = RowContext.new
  compiled = compile_stages(stages, ctx)
  initialize_reducers(compiled, ctx)

  begin
    @input.each_line do |line|
      line = line.strip
      # Whitespace-only lines carry no JSON value and are skipped.
      next if line.empty?

      process_value(JSON.parse(line), compiled, ctx)
    end
  ensure
    # Runs on success and on failure alike, so reducer state accumulated
    # before an error is still flushed.
    flush_reducers(compiled, ctx)
  end

  # Keep the explicit nil result instead of leaking each_line's return value.
  nil
end