Class: Pulsar::Internal::ConsumerImpl
- Inherits:
-
Object
- Object
- Pulsar::Internal::ConsumerImpl
- Defined in:
- lib/pulsar/internal/consumer_impl.rb
Overview
Implements broker-side subscription, flow, receive, and ack behavior.
Instance Attribute Summary collapse
-
#consumer_id ⇒ Object
readonly
Returns the value of attribute consumer_id.
-
#subscription ⇒ Object
readonly
Returns the value of attribute subscription.
-
#topic ⇒ Object
readonly
Returns the value of attribute topic.
Class Method Summary collapse
Instance Method Summary collapse
- #ack(message_or_message_id) ⇒ Object
- #close ⇒ Object
- #closed? ⇒ Boolean
- #flow(permits) ⇒ Object
- #handle_message(command_message, headers_and_payload) ⇒ Object
-
#initialize(connection_provider:, topic:, subscription:, consumer_id:, operation_timeout:, receiver_queue_size:) ⇒ ConsumerImpl
constructor
A new instance of ConsumerImpl.
- #receive(timeout: nil) ⇒ Object
Constructor Details
#initialize(connection_provider:, topic:, subscription:, consumer_id:, operation_timeout:, receiver_queue_size:) ⇒ ConsumerImpl
Returns a new instance of ConsumerImpl.
22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/pulsar/internal/consumer_impl.rb', line 22 def initialize(connection_provider:, topic:, subscription:, consumer_id:, operation_timeout:, receiver_queue_size:) @connection_provider = connection_provider @connection = nil @topic = topic @subscription = subscription @consumer_id = consumer_id @operation_timeout = operation_timeout @receiver_queue_size = receiver_queue_size @receiver_queue = BoundedQueue.new(capacity: receiver_queue_size) @closed = false end |
Instance Attribute Details
#consumer_id ⇒ Object (readonly)
Returns the value of attribute consumer_id.
7 8 9 |
# File 'lib/pulsar/internal/consumer_impl.rb', line 7 def consumer_id @consumer_id end |
#subscription ⇒ Object (readonly)
Returns the value of attribute subscription.
7 8 9 |
# File 'lib/pulsar/internal/consumer_impl.rb', line 7 def subscription @subscription end |
#topic ⇒ Object (readonly)
Returns the value of attribute topic.
7 8 9 |
# File 'lib/pulsar/internal/consumer_impl.rb', line 7 def topic @topic end |
Class Method Details
.create(topic:, subscription:, consumer_id:, operation_timeout:, receiver_queue_size:, connection: nil, connection_provider: nil) ⇒ Object
9 10 11 12 13 14 15 16 17 18 19 20 |
# File 'lib/pulsar/internal/consumer_impl.rb', line 9 def self.create(topic:, subscription:, consumer_id:, operation_timeout:, receiver_queue_size:, connection: nil, connection_provider: nil) connection_provider ||= -> { connection } new( connection_provider: connection_provider, topic: topic, subscription: subscription, consumer_id: consumer_id, operation_timeout: operation_timeout, receiver_queue_size: receiver_queue_size ).tap(&:attach) end |
Instance Method Details
#ack(message_or_message_id) ⇒ Object
60 61 62 63 64 65 66 67 |
# File 'lib/pulsar/internal/consumer_impl.rb', line 60 def ack() raise ClosedError, 'consumer is closed' if closed? attach unless attached? = .respond_to?(:message_id) ? . : @connection.write_command(CommandFactory.ack(consumer_id: consumer_id, message_id: )) nil end |
#close ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/pulsar/internal/consumer_impl.rb', line 69 def close return nil if closed? if attached? request_id = @connection.next_request_id command = CommandFactory.close_consumer(consumer_id: consumer_id, request_id: request_id) response = @connection.request(command, timeout: @operation_timeout) raise BrokerError, "consumer close failed: #{response.type}" unless response.type == :SUCCESS @connection.unregister_consumer(consumer_id) end @receiver_queue.close @closed = true nil end |
#closed? ⇒ Boolean
86 87 88 |
# File 'lib/pulsar/internal/consumer_impl.rb', line 86 def closed? @closed end |
#flow(permits) ⇒ Object
90 91 92 93 94 95 |
# File 'lib/pulsar/internal/consumer_impl.rb', line 90 def flow(permits) raise ClosedError, 'consumer is closed' if closed? attach unless attached? @connection.write_command(CommandFactory.flow(consumer_id: consumer_id, permits: permits)) end |
#handle_message(command_message, headers_and_payload) ⇒ Object
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/pulsar/internal/consumer_impl.rb', line 34 def (, headers_and_payload) raise ClosedError, 'consumer is closed' if closed? decoded = FrameCodec.(headers_and_payload) @receiver_queue.push( Message.new( payload: decoded.payload, message_id: (.), properties: decoded..properties.to_h { |property| [property.key, property.value] }, key: decoded..partition_key, publish_time: decoded..publish_time, event_time: decoded..event_time ), timeout: @operation_timeout ) end |
#receive(timeout: nil) ⇒ Object
51 52 53 54 55 56 57 58 |
# File 'lib/pulsar/internal/consumer_impl.rb', line 51 def receive(timeout: nil) raise ClosedError, 'consumer is closed' if closed? attach unless attached? @receiver_queue.pop(timeout: timeout || @operation_timeout).tap do flow(1) end end |