Class: BPS::Publisher::Kafka
- Inherits:
-
Abstract
- Object
- Abstract
- BPS::Publisher::Kafka
- Defined in:
- lib/bps/publisher/kafka.rb
Direct Known Subclasses
Defined Under Namespace
Classes: Topic
Constant Summary collapse
- CLIENT_OPTS =
{ client_id: :string, connect_timeout: :float, socket_timeout: :float, ssl_ca_cert_file_path: :string, ssl_ca_cert: :string, ssl_client_cert: :string, ssl_client_cert_key: :string, ssl_client_cert_key_password: :string, ssl_client_cert_chain: :string, sasl_gssapi_principal: :string, sasl_gssapi_keytab: :string, sasl_plain_authzid: :string, sasl_plain_username: :string, sasl_plain_password: :string, sasl_scram_username: :string, sasl_scram_password: :string, sasl_scram_mechanism: :string, sasl_over_ssl: :bool, ssl_ca_certs_from_system: :bool, ssl_verify_hostname: :bool, }.freeze
- PRODUCER_OPTS =
{ # standard retry_backoff: :float, compression_codec: :symbol, compression_threshold: :int, ack_timeout: :float, required_acks: :symbol, max_retries: :int, max_buffer_size: :int, max_buffer_bytesize: :int, idempotent: :bool, transactional: :bool, transactional_id: :string, transactional_timeout: :bool, # async delivery_interval: :float, delivery_threshold: :int, max_queue_size: :int, }.freeze
Class Method Summary collapse
-
.coercer ⇒ BPS::Coercer
The options coercer.
Instance Method Summary collapse
- #close ⇒ Object
-
#initialize(broker_addrs, **opts) ⇒ Kafka
constructor
A new instance of Kafka.
- #topic(name) ⇒ Object
Constructor Details
#initialize(broker_addrs, **opts) ⇒ Kafka
Returns a new instance of Kafka.
82 83 84 85 86 87 88 89 |
# File 'lib/bps/publisher/kafka.rb', line 82 def initialize(broker_addrs, **opts) super() broker_addrs = parse_url(broker_addrs) if broker_addrs.is_a?(URI) @topics = {} @client = ::Kafka.new(broker_addrs, **opts.slice(*CLIENT_OPTS.keys)) @producer = init_producer(**opts.slice(*PRODUCER_OPTS.keys)) end |
Class Method Details
.coercer ⇒ BPS::Coercer
Returns the options coercer.
75 76 77 |
# File 'lib/bps/publisher/kafka.rb', line 75 def self.coercer @coercer ||= BPS::Coercer.new(CLIENT_OPTS.merge(PRODUCER_OPTS)).freeze end |
Instance Method Details
#close ⇒ Object
95 96 97 98 99 100 |
# File 'lib/bps/publisher/kafka.rb', line 95 def close super @producer.shutdown @client.close end |
#topic(name) ⇒ Object
91 92 93 |
# File 'lib/bps/publisher/kafka.rb', line 91 def topic(name) @topics[name] ||= self.class::Topic.new(@producer, name) end |