Class: Sbmt::KafkaProducer::BaseProducer
- Inherits:
-
Object
- Object
- Sbmt::KafkaProducer::BaseProducer
- Extended by:
- Dry::Initializer
- Defined in:
- lib/sbmt/kafka_producer/base_producer.rb
Direct Known Subclasses
Constant Summary collapse
- MSG_SUCCESS =
"Message has been successfully sent to Kafka"
Instance Method Summary collapse
- #async_publish(payload, options = {}) ⇒ Object
- #async_publish!(payload, options = {}) ⇒ Object
- #sync_publish(payload, options = {}) ⇒ Object
- #sync_publish!(payload, options = {}) ⇒ Object
Instance Method Details
#async_publish(payload, options = {}) ⇒ Object
39 40 41 42 43 44 45 |
# File 'lib/sbmt/kafka_producer/base_producer.rb', line 39 def async_publish(payload, = {}) async_publish!(payload, ) true rescue WaterDrop::Errors::ProduceError => e log_error(e) false end |
#async_publish!(payload, options = {}) ⇒ Object
32 33 34 35 36 37 |
# File 'lib/sbmt/kafka_producer/base_producer.rb', line 32 def async_publish!(payload, = {}) around_publish do do_produce(:async, payload, .merge(topic: topic)) end true end |
#sync_publish(payload, options = {}) ⇒ Object
24 25 26 27 28 29 30 |
# File 'lib/sbmt/kafka_producer/base_producer.rb', line 24 def sync_publish(payload, = {}) sync_publish!(payload, ) true rescue WaterDrop::Errors::ProduceError => e log_error(e) false end |
#sync_publish!(payload, options = {}) ⇒ Object
14 15 16 17 18 19 20 21 22 |
# File 'lib/sbmt/kafka_producer/base_producer.rb', line 14 def sync_publish!(payload, = {}) report, produce_duration = around_publish do measure_time do do_produce(:sync, payload, .merge(topic: topic)) end end log_success(report, produce_duration) true end |