Class: Sbmt::KafkaProducer::BaseProducer

Inherits:
Object
  • Object
show all
Extended by:
Dry::Initializer
Defined in:
lib/sbmt/kafka_producer/base_producer.rb

Direct Known Subclasses

OutboxProducer

Constant Summary collapse

MSG_SUCCESS =
"Message has been successfully sent to Kafka"

Instance Method Summary collapse

Instance Method Details

#async_publish(payload, options = {}) ⇒ Object



39
40
41
42
43
44
45
# File 'lib/sbmt/kafka_producer/base_producer.rb', line 39

def async_publish(payload, options = {})
  async_publish!(payload, options)
  true
rescue WaterDrop::Errors::ProduceError => e
  log_error(e)
  false
end

#async_publish!(payload, options = {}) ⇒ Object



32
33
34
35
36
37
# File 'lib/sbmt/kafka_producer/base_producer.rb', line 32

def async_publish!(payload, options = {})
  around_publish do
    do_produce(:async, payload, options.merge(topic: topic))
  end
  true
end

#sync_publish(payload, options = {}) ⇒ Object



24
25
26
27
28
29
30
# File 'lib/sbmt/kafka_producer/base_producer.rb', line 24

def sync_publish(payload, options = {})
  sync_publish!(payload, options)
  true
rescue WaterDrop::Errors::ProduceError => e
  log_error(e)
  false
end

#sync_publish!(payload, options = {}) ⇒ Object



14
15
16
17
18
19
20
21
22
# File 'lib/sbmt/kafka_producer/base_producer.rb', line 14

def sync_publish!(payload, options = {})
  report, produce_duration = around_publish do
    measure_time do
      do_produce(:sync, payload, options.merge(topic: topic))
    end
  end
  log_success(report, produce_duration)
  true
end