Class: Tina4::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/tina4/queue.rb

Instance Method Summary collapse

Constructor Details

#initialize(topic:, backend: nil, max_retries: 3) ⇒ Consumer

Returns a new instance of Consumer.



57
58
59
60
61
62
63
# File 'lib/tina4/queue.rb', line 57

def initialize(topic:, backend: nil, max_retries: 3)
  @topic = topic
  @backend = backend || Tina4::QueueBackends::LiteBackend.new
  @max_retries = max_retries
  @handlers = []
  @running = false
end

Instance Method Details

#on_message(&block) ⇒ Object



65
66
67
# File 'lib/tina4/queue.rb', line 65

def on_message(&block)
  @handlers << block
end

#process_oneObject



88
89
90
91
# File 'lib/tina4/queue.rb', line 88

def process_one
  message = @backend.dequeue(@topic)
  process_message(message) if message
end

#start(poll_interval: 1) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/tina4/queue.rb', line 69

def start(poll_interval: 1)
  @running = true
  Tina4::Debug.info("Consumer started for topic: #{@topic}")

  while @running
    message = @backend.dequeue(@topic)
    if message
      process_message(message)
    else
      sleep(poll_interval)
    end
  end
end

#stopObject



83
84
85
86
# File 'lib/tina4/queue.rb', line 83

def stop
  @running = false
  Tina4::Debug.info("Consumer stopped for topic: #{@topic}")
end