Class: Bemi::Ingester
- Inherits:
-
Object
- Object
- Bemi::Ingester
- Defined in:
- lib/bemi/ingester.rb
Constant Summary collapse
- IGNORE_TABLES =
%w[bemi_changesets bemi_contexts].freeze
Class Method Summary collapse
Class Method Details
.daemonize! ⇒ Object
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/bemi/ingester.rb', line 5 def daemonize! Open3.popen3(stream_binlong_command) do |stdin, stdout, stderr, thread| stdin.close stderr_thread = Thread.new do while line = stderr.gets puts "ERROR: #{line}" end end stderr_thread.abort_on_exception = true stdout_thread = Thread.new do while line = stdout.gets begin payload = JSON.parse(line) next if IGNORE_TABLES.include?(payload['table']) Bemi::Storage.create_changeset!(payload) rescue JSON::ParserError => e puts "ERROR: #{line}" raise "Failed" end end end stdout_thread.abort_on_exception = true thread.join end end |
.stream_binlong_command ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/bemi/ingester.rb', line 35 def stream_binlong_command init_position = '' if changeset = Bemi::Changeset.last init_position = " --init_position=#{changeset.binlog_position}" puts "Started ingester with: #{changeset.binlog_position}" else puts "Started ingester with no init position" end "maxwell --host=127.0.0.1 --user=root --password=mysql --port=3366 --producer=stdout --output_row_query=true --output_binlog_position=true --output_commit_info=false#{init_position}" end |