Class: Pulsar::Internal::ProducerImpl
- Inherits:
-
Object
- Object
- Pulsar::Internal::ProducerImpl
- Defined in:
- lib/pulsar/internal/producer_impl.rb
Overview
Implements broker-side producer creation and unbatched sends.
Instance Attribute Summary collapse
-
#producer_id ⇒ Object
readonly
Returns the value of attribute producer_id.
-
#producer_name ⇒ Object
readonly
Returns the value of attribute producer_name.
-
#topic ⇒ Object
readonly
Returns the value of attribute topic.
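Class Method Summary collapse
- .create(topic:, producer_id:, operation_timeout:, max_pending_messages: 1000, connection: nil, connection_provider: nil) ⇒ Object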
Instance Method Summary collapse
- #close ⇒ Object
- #closed? ⇒ Boolean
-
#initialize(connection_provider:, topic:, producer_id:, operation_timeout:, max_pending_messages:) ⇒ ProducerImpl
constructor
A new instance of ProducerImpl.
- #send(payload, properties: {}, key: nil, event_time: nil, timeout: nil) ⇒ Object
Constructor Details
#initialize(connection_provider:, topic:, producer_id:, operation_timeout:, max_pending_messages:) ⇒ ProducerImpl
Returns a new instance of ProducerImpl.
21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/pulsar/internal/producer_impl.rb', line 21 def initialize(connection_provider:, topic:, producer_id:, operation_timeout:, max_pending_messages:) @connection_provider = connection_provider @connection = nil @topic = topic @producer_id = producer_id @producer_name = nil @operation_timeout = operation_timeout @max_pending_messages = max_pending_messages @pending_sends = 0 @pending_condition = ConditionVariable.new @sequence_id = -1 @mutex = Mutex.new @closed = false end |
Instance Attribute Details
#producer_id ⇒ Object (readonly)
Returns the value of attribute producer_id.
7 8 9 |
# File 'lib/pulsar/internal/producer_impl.rb', line 7 def producer_id @producer_id end |
#producer_name ⇒ Object (readonly)
Returns the value of attribute producer_name.
7 8 9 |
# File 'lib/pulsar/internal/producer_impl.rb', line 7 def producer_name @producer_name end |
#topic ⇒ Object (readonly)
Returns the value of attribute topic.
7 8 9 |
# File 'lib/pulsar/internal/producer_impl.rb', line 7 def topic @topic end |
Class Method Details
.create(topic:, producer_id:, operation_timeout:, max_pending_messages: 1000, connection: nil, connection_provider: nil) ⇒ Object
9 10 11 12 13 14 15 16 17 18 19 |
# File 'lib/pulsar/internal/producer_impl.rb', line 9 def self.create(topic:, producer_id:, operation_timeout:, max_pending_messages: 1000, connection: nil, connection_provider: nil) connection_provider ||= -> { connection } new( connection_provider: connection_provider, topic: topic, producer_id: producer_id, operation_timeout: operation_timeout, max_pending_messages: max_pending_messages ).tap(&:attach) end |
Instance Method Details
#close ⇒ Object
64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/pulsar/internal/producer_impl.rb', line 64 def close return nil if closed? if attached? request_id = @connection.next_request_id command = CommandFactory.close_producer(producer_id: producer_id, request_id: request_id) response = @connection.request(command, timeout: @operation_timeout) raise BrokerError, "producer close failed: #{response.type}" unless response.type == :SUCCESS end @closed = true nil end |
#closed? ⇒ Boolean
78 79 80 |
# File 'lib/pulsar/internal/producer_impl.rb', line 78 def closed? @closed end |
#send(payload, properties: {}, key: nil, event_time: nil, timeout: nil) ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/pulsar/internal/producer_impl.rb', line 36 def send(payload, properties: {}, key: nil, event_time: nil, timeout: nil) raise ClosedError, 'producer is closed' if closed? send_timeout = timeout || @operation_timeout acquire_pending_send(timeout: send_timeout) begin attach unless attached? sequence_id = next_sequence_id command, = CommandFactory.( producer_id: producer_id, sequence_id: sequence_id, producer_name: producer_name, properties: properties, key: key, event_time: event_time, publish_time: current_time_millis ) response = @connection.(command, , String(payload).b, timeout: send_timeout) raise BrokerError, "send failed: #{response.type}" unless response.type == :SEND_RECEIPT (response.send_receipt.) ensure release_pending_send end end |