Class: Pulsar::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/pulsar/client.rb

Overview

Entry point for creating producers and consumers against a Pulsar broker.

Constant Summary collapse

DEFAULT_OPERATION_TIMEOUT =
30
DEFAULT_CONNECTION_TIMEOUT =
10

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(service_url, operation_timeout: DEFAULT_OPERATION_TIMEOUT, connection_timeout: DEFAULT_CONNECTION_TIMEOUT, logger: nil, **options) ⇒ Client

Returns a new instance of Client.



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/pulsar/client.rb', line 25

def initialize(service_url, operation_timeout: DEFAULT_OPERATION_TIMEOUT,
               connection_timeout: DEFAULT_CONNECTION_TIMEOUT, logger: nil, **options)
  validate_unsupported_options!(options)

  @service_url = normalize_service_url(service_url)
  @service_uri = URI(@service_url)
  @operation_timeout = operation_timeout
  @connection_timeout = connection_timeout
  @logger = logger
  @producers = Set.new
  @consumers = Set.new
  @closed = false
  @producer_id = 0
  @consumer_id = 0
end

Instance Attribute Details

#connection_timeoutObject (readonly)

Returns the value of attribute connection_timeout.



12
13
14
# File 'lib/pulsar/client.rb', line 12

def connection_timeout
  @connection_timeout
end

#loggerObject (readonly)

Returns the value of attribute logger.



12
13
14
# File 'lib/pulsar/client.rb', line 12

def logger
  @logger
end

#operation_timeoutObject (readonly)

Returns the value of attribute operation_timeout.



12
13
14
# File 'lib/pulsar/client.rb', line 12

def operation_timeout
  @operation_timeout
end

#service_urlObject (readonly)

Returns the value of attribute service_url.



12
13
14
# File 'lib/pulsar/client.rb', line 12

def service_url
  @service_url
end

Class Method Details

.open(service_url, **options) ⇒ Object



14
15
16
17
18
19
20
21
22
23
# File 'lib/pulsar/client.rb', line 14

def self.open(service_url, **options)
  client = new(service_url, **options)
  return client unless block_given?

  begin
    yield client
  ensure
    client.close
  end
end

Instance Method Details

#closeObject



70
71
72
73
74
75
76
77
78
# File 'lib/pulsar/client.rb', line 70

def close
  return if closed?

  @closed = true
  close_all(@producers)
  close_all(@consumers)
  @connection&.close
  nil
end

#closed?Boolean

Returns:

  • (Boolean)


80
81
82
# File 'lib/pulsar/client.rb', line 80

def closed?
  @closed
end

#consumer(topic:, subscription:, **_options) ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/pulsar/client.rb', line 55

def consumer(topic:, subscription:, **_options)
  ensure_open!
  lookup_topic(topic)

  impl = Internal::ConsumerImpl.create(
    connection_provider: -> { connection },
    topic: topic,
    subscription: subscription,
    consumer_id: next_consumer_id,
    operation_timeout: operation_timeout,
    receiver_queue_size: 1000
  )
  Consumer.new(topic: topic, subscription: subscription, impl: impl).tap { |consumer| @consumers.add(consumer) }
end

#producer(topic:, max_pending_messages: 1000, **_options) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/pulsar/client.rb', line 41

def producer(topic:, max_pending_messages: 1000, **_options)
  ensure_open!
  lookup_topic(topic)

  impl = Internal::ProducerImpl.create(
    connection_provider: -> { connection },
    topic: topic,
    producer_id: next_producer_id,
    operation_timeout: operation_timeout,
    max_pending_messages: max_pending_messages
  )
  Producer.new(topic: topic, impl: impl).tap { |producer| @producers.add(producer) }
end