Class: Pulsar::Internal::ConsumerImpl

Inherits:
Object
  • Object
show all
Defined in:
lib/pulsar/internal/consumer_impl.rb

Overview

Implements broker-side subscription, flow, receive, and ack behavior.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection_provider:, topic:, subscription:, consumer_id:, operation_timeout:, receiver_queue_size:) ⇒ ConsumerImpl

Returns a new instance of ConsumerImpl.



22
23
24
25
26
27
28
29
30
31
32
# File 'lib/pulsar/internal/consumer_impl.rb', line 22

# Sets up the state for a consumer on a single topic/subscription pair.
#
# @param connection_provider [Object] stored as given; not invoked here
# @param topic [Object] topic this consumer reads from
# @param subscription [Object] subscription this consumer belongs to
# @param consumer_id [Object] identifier sent with consumer commands
# @param operation_timeout [Object] timeout applied to requests and queue operations
# @param receiver_queue_size [Object] capacity handed to the BoundedQueue
def initialize(connection_provider:, topic:, subscription:, consumer_id:, operation_timeout:, receiver_queue_size:)
  # Identity of the consumer.
  @topic = topic
  @subscription = subscription
  @consumer_id = consumer_id

  # No connection is resolved at construction time; @connection starts out nil.
  @connection_provider = connection_provider
  @connection = nil

  # Timeouts and local buffering of received messages.
  @operation_timeout = operation_timeout
  @receiver_queue_size = receiver_queue_size
  @receiver_queue = BoundedQueue.new(capacity: receiver_queue_size)

  @closed = false
end

Instance Attribute Details

#consumer_id ⇒ Object (readonly)

Returns the value of attribute consumer_id.



7
8
9
# File 'lib/pulsar/internal/consumer_impl.rb', line 7

# Reader for the consumer id stored by the constructor.
#
# @return [Object] the current value of @consumer_id
def consumer_id
  @consumer_id
end

#subscription ⇒ Object (readonly)

Returns the value of attribute subscription.



7
8
9
# File 'lib/pulsar/internal/consumer_impl.rb', line 7

# Reader for the subscription stored by the constructor.
#
# @return [Object] the current value of @subscription
def subscription
  @subscription
end

#topic ⇒ Object (readonly)

Returns the value of attribute topic.



7
8
9
# File 'lib/pulsar/internal/consumer_impl.rb', line 7

# Reader for the topic stored by the constructor.
#
# @return [Object] the current value of @topic
def topic
  @topic
end

Class Method Details

.create(topic:, subscription:, consumer_id:, operation_timeout:, receiver_queue_size:, connection: nil, connection_provider: nil) ⇒ Object



9
10
11
12
13
14
15
16
17
18
19
20
# File 'lib/pulsar/internal/consumer_impl.rb', line 9

# Builds a consumer and attaches it before handing it back.
#
# Either +connection_provider+ or +connection+ may be given. When no provider
# is supplied, one is synthesized that simply returns +connection+.
#
# @param connection [Object, nil] connection wrapped in a provider when none is given
# @param connection_provider [#call, nil] takes precedence over +connection+
# @return [Object] the newly built instance, after #attach has been called on it
def self.create(topic:, subscription:, consumer_id:, operation_timeout:, receiver_queue_size:,
                connection: nil, connection_provider: nil)
  provider = connection_provider || -> { connection }

  consumer = new(
    connection_provider: provider,
    topic: topic,
    subscription: subscription,
    consumer_id: consumer_id,
    operation_timeout: operation_timeout,
    receiver_queue_size: receiver_queue_size
  )
  consumer.attach
  consumer
end

Instance Method Details

#ack(message_or_message_id) ⇒ Object

Raises:

  • (ClosedError)



60
61
62
63
64
65
66
67
# File 'lib/pulsar/internal/consumer_impl.rb', line 60

# Writes an ack command for one message over the connection.
#
# Accepts either an object responding to +message_id+ (its id is used) or a
# bare message id. Attaches first when the consumer is not attached.
#
# @param message_or_message_id [#message_id, Object] message or message id to ack
# @return [nil]
# @raise [ClosedError] if the consumer has been closed
def ack(message_or_message_id)
  raise ClosedError, 'consumer is closed' if closed?

  attach unless attached?

  acked_id = message_or_message_id
  acked_id = acked_id.message_id if acked_id.respond_to?(:message_id)

  ack_command = CommandFactory.ack(consumer_id: consumer_id, message_id: acked_id)
  @connection.write_command(ack_command)
  nil
end

#close ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/pulsar/internal/consumer_impl.rb', line 69

# Closes the consumer; calling it again once closed is a no-op.
#
# When attached, a close-consumer request is sent and the consumer is
# unregistered from the connection only after a :SUCCESS response. The
# receiver queue is then closed and the consumer is marked closed. If the
# request fails, the error propagates and the consumer stays open.
#
# @return [nil]
# @raise [BrokerError] if the response type is anything other than :SUCCESS
def close
  return if closed?

  if attached?
    close_request_id = @connection.next_request_id
    reply = @connection.request(
      CommandFactory.close_consumer(consumer_id: consumer_id, request_id: close_request_id),
      timeout: @operation_timeout
    )
    unless reply.type == :SUCCESS
      raise BrokerError, "consumer close failed: #{reply.type}"
    end

    @connection.unregister_consumer(consumer_id)
  end

  @receiver_queue.close
  @closed = true
  nil
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


86
87
88
# File 'lib/pulsar/internal/consumer_impl.rb', line 86

# Reports whether the consumer has been closed.
#
# @return [Boolean] the @closed flag: false after construction, true once #close completes
def closed?
  @closed
end

#flow(permits) ⇒ Object

Raises:

  • (ClosedError)



90
91
92
93
94
95
# File 'lib/pulsar/internal/consumer_impl.rb', line 90

# Writes a flow command carrying the given number of permits.
#
# Attaches first when the consumer is not attached.
#
# @param permits [Object] permit count forwarded to CommandFactory.flow
# @return [Object] whatever the connection's write_command returns
# @raise [ClosedError] if the consumer has been closed
def flow(permits)
  raise ClosedError, 'consumer is closed' if closed?

  attach unless attached?

  flow_command = CommandFactory.flow(consumer_id: consumer_id, permits: permits)
  @connection.write_command(flow_command)
end

#handle_message(command_message, headers_and_payload) ⇒ Object

Raises:

  • (ClosedError)



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/pulsar/internal/consumer_impl.rb', line 34

# Decodes an incoming message frame and pushes the resulting Message onto
# the receiver queue.
#
# @param command_message [#message_id] command carrying the raw message id
# @param headers_and_payload [Object] frame handed to FrameCodec.decode_message_data
# @return [Object] whatever the receiver queue's push returns
# @raise [ClosedError] if the consumer has been closed
def handle_message(command_message, headers_and_payload)
  raise ClosedError, 'consumer is closed' if closed?

  decoded = FrameCodec.decode_message_data(headers_and_payload)
  # The listing read `decoded..properties` (and likewise for the other
  # fields), which parses as a Range over an undefined name and raises
  # NameError. The per-message fields are read from the decoded metadata.
  # NOTE(review): assumes the decoded object exposes #metadata — confirm
  # against FrameCodec.decode_message_data.
  metadata = decoded.metadata

  message = Message.new(
    payload: decoded.payload,
    message_id: message_id_from(command_message.message_id),
    properties: metadata.properties.to_h { |property| [property.key, property.value] },
    key: metadata.partition_key,
    publish_time: metadata.publish_time,
    event_time: metadata.event_time
  )

  # The push is bounded by the operation timeout.
  @receiver_queue.push(message, timeout: @operation_timeout)
end

#receive(timeout: nil) ⇒ Object

Raises:

  • (ClosedError)



51
52
53
54
55
56
57
58
# File 'lib/pulsar/internal/consumer_impl.rb', line 51

# Pops the next item from the receiver queue and then grants one more permit.
#
# Attaches first when the consumer is not attached.
#
# @param timeout [Object, nil] pop timeout; falls back to the operation timeout when nil
# @return [Object] whatever the receiver queue's pop returned
# @raise [ClosedError] if the consumer has been closed
def receive(timeout: nil)
  raise ClosedError, 'consumer is closed' if closed?

  attach unless attached?

  wait = timeout || @operation_timeout
  popped = @receiver_queue.pop(timeout: wait)
  # Issue the single permit only after the pop has returned.
  flow(1)
  popped
end