Class: Bemi::Ingester

Inherits:
Object
  • Object
show all
Defined in:
lib/bemi/ingester.rb

Constant Summary collapse

IGNORE_TABLES =
%w[bemi_changesets bemi_contexts].freeze

Class Method Summary collapse

Class Method Details

.daemonize!Object



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/bemi/ingester.rb', line 5

def daemonize!
  Open3.popen3(stream_binlong_command) do |stdin, stdout, stderr, thread|
    stdin.close

    stderr_thread = Thread.new do
      while line = stderr.gets
        puts "ERROR: #{line}"
      end
    end
    stderr_thread.abort_on_exception = true

    stdout_thread = Thread.new do
      while line = stdout.gets
        begin
          payload = JSON.parse(line)
          next if IGNORE_TABLES.include?(payload['table'])

          Bemi::Storage.create_changeset!(payload)
        rescue JSON::ParserError => e
          puts "ERROR: #{line}"
          raise "Failed"
        end
      end
    end
    stdout_thread.abort_on_exception = true

    thread.join
  end
end

.stream_binlong_commandObject



35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/bemi/ingester.rb', line 35

def stream_binlong_command
  init_position = ''

  if changeset = Bemi::Changeset.last
    init_position = " --init_position=#{changeset.binlog_position}"
    puts "Started ingester with: #{changeset.binlog_position}"
  else
    puts "Started ingester with no init position"
  end

  "maxwell --host=127.0.0.1 --user=root --password=mysql --port=3366 --producer=stdout --output_row_query=true --output_binlog_position=true --output_commit_info=false#{init_position}"
end