Class: Pulsar::Client
- Inherits:
-
Object
- Object
- Pulsar::Client
- Defined in:
- lib/pulsar/client.rb
Overview
Entry point for creating producers and consumers against a Pulsar broker.
Constant Summary collapse
- DEFAULT_OPERATION_TIMEOUT =
30
- DEFAULT_CONNECTION_TIMEOUT =
10
Instance Attribute Summary collapse
-
#connection_timeout ⇒ Object
readonly
Returns the value of attribute connection_timeout.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#operation_timeout ⇒ Object
readonly
Returns the value of attribute operation_timeout.
-
#service_url ⇒ Object
readonly
Returns the value of attribute service_url.
Class Method Summary collapse
- .open(service_url, **options) ⇒ Object
Instance Method Summary collapse
- #close ⇒ Object
- #closed? ⇒ Boolean
- #consumer(topic:, subscription:, **_options) ⇒ Object
-
#initialize(service_url, operation_timeout: DEFAULT_OPERATION_TIMEOUT, connection_timeout: DEFAULT_CONNECTION_TIMEOUT, logger: nil, **options) ⇒ Client
constructor
A new instance of Client.
- #producer(topic:, max_pending_messages: 1000, **_options) ⇒ Object
Constructor Details
#initialize(service_url, operation_timeout: DEFAULT_OPERATION_TIMEOUT, connection_timeout: DEFAULT_CONNECTION_TIMEOUT, logger: nil, **options) ⇒ Client
Returns a new instance of Client.
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/pulsar/client.rb', line 25 def initialize(service_url, operation_timeout: DEFAULT_OPERATION_TIMEOUT, connection_timeout: DEFAULT_CONNECTION_TIMEOUT, logger: nil, **) () @service_url = normalize_service_url(service_url) @service_uri = URI(@service_url) @operation_timeout = operation_timeout @connection_timeout = connection_timeout @logger = logger @producers = Set.new @consumers = Set.new @closed = false @producer_id = 0 @consumer_id = 0 end |
Instance Attribute Details
#connection_timeout ⇒ Object (readonly)
Returns the value of attribute connection_timeout.
12 13 14 |
# File 'lib/pulsar/client.rb', line 12 def connection_timeout @connection_timeout end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
12 13 14 |
# File 'lib/pulsar/client.rb', line 12 def logger @logger end |
#operation_timeout ⇒ Object (readonly)
Returns the value of attribute operation_timeout.
12 13 14 |
# File 'lib/pulsar/client.rb', line 12 def operation_timeout @operation_timeout end |
#service_url ⇒ Object (readonly)
Returns the value of attribute service_url.
12 13 14 |
# File 'lib/pulsar/client.rb', line 12 def service_url @service_url end |
Class Method Details
.open(service_url, **options) ⇒ Object
14 15 16 17 18 19 20 21 22 23 |
# File 'lib/pulsar/client.rb', line 14 def self.open(service_url, **) client = new(service_url, **) return client unless block_given? begin yield client ensure client.close end end |
Instance Method Details
#close ⇒ Object
70 71 72 73 74 75 76 77 78 |
# File 'lib/pulsar/client.rb', line 70 def close return if closed? @closed = true close_all(@producers) close_all(@consumers) @connection&.close nil end |
#closed? ⇒ Boolean
80 81 82 |
# File 'lib/pulsar/client.rb', line 80 def closed? @closed end |
#consumer(topic:, subscription:, **_options) ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/pulsar/client.rb', line 55 def consumer(topic:, subscription:, **) ensure_open! lookup_topic(topic) impl = Internal::ConsumerImpl.create( connection_provider: -> { connection }, topic: topic, subscription: subscription, consumer_id: next_consumer_id, operation_timeout: operation_timeout, receiver_queue_size: 1000 ) Consumer.new(topic: topic, subscription: subscription, impl: impl).tap { |consumer| @consumers.add(consumer) } end |
#producer(topic:, max_pending_messages: 1000, **_options) ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/pulsar/client.rb', line 41 def producer(topic:, max_pending_messages: 1000, **) ensure_open! lookup_topic(topic) impl = Internal::ProducerImpl.create( connection_provider: -> { connection }, topic: topic, producer_id: next_producer_id, operation_timeout: operation_timeout, max_pending_messages: ) Producer.new(topic: topic, impl: impl).tap { |producer| @producers.add(producer) } end |