Class: Pulsar::Internal::ProducerImpl

Inherits:
Object
  • Object
show all
Defined in:
lib/pulsar/internal/producer_impl.rb

Overview

Implements broker-side producer creation and unbatched sends.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection_provider:, topic:, producer_id:, operation_timeout:, max_pending_messages:) ⇒ ProducerImpl

Returns a new instance of ProducerImpl.



21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/pulsar/internal/producer_impl.rb', line 21

# Builds a producer in its initial, not-yet-closed state. Only instance
# variables are assigned here; no connection is obtained and nothing is sent.
#
# @param connection_provider [Object] stored as-is; the visible factory
#   (.create) passes a zero-argument lambda
# @param topic [Object] topic the producer is bound to (exposed via #topic)
# @param producer_id [Object] producer identifier (exposed via #producer_id)
# @param operation_timeout [Object] default timeout, used by #send and #close
# @param max_pending_messages [Object] stored as-is; presumably the cap on
#   concurrent sends -- confirm against acquire_pending_send
def initialize(connection_provider:, topic:, producer_id:, operation_timeout:, max_pending_messages:)
  # Connection handling: the connection itself starts out unset.
  @connection_provider, @connection = connection_provider, nil

  # Identity: the producer name starts out unset.
  @topic, @producer_id, @producer_name = topic, producer_id, nil

  # Limits supplied by the caller.
  @operation_timeout, @max_pending_messages = operation_timeout, max_pending_messages

  # Bookkeeping for in-flight sends.
  @pending_sends, @pending_condition = 0, ConditionVariable.new

  # Sequence counter starts at -1; the producer starts open.
  @sequence_id, @mutex, @closed = -1, Mutex.new, false
end

Instance Attribute Details

#producer_id ⇒ Object (readonly)

Returns the value of attribute producer_id.



7
8
9
# File 'lib/pulsar/internal/producer_impl.rb', line 7

# Reader for @producer_id, which the constructor sets from its +producer_id:+
# keyword argument.
#
# @return [Object] the stored producer id
def producer_id
  @producer_id
end

#producer_name ⇒ Object (readonly)

Returns the value of attribute producer_name.



7
8
9
# File 'lib/pulsar/internal/producer_impl.rb', line 7

# Reader for @producer_name. The constructor initialises it to nil.
# NOTE(review): nothing in the visible code assigns it afterwards; presumably
# it is filled in when the producer attaches -- confirm.
#
# @return [Object, nil] the stored producer name
def producer_name
  @producer_name
end

#topic ⇒ Object (readonly)

Returns the value of attribute topic.



7
8
9
# File 'lib/pulsar/internal/producer_impl.rb', line 7

# Reader for @topic, which the constructor sets from its +topic:+ keyword
# argument.
#
# @return [Object] the stored topic
def topic
  @topic
end

Class Method Details

.create(topic:, producer_id:, operation_timeout:, max_pending_messages: 1000, connection: nil, connection_provider: nil) ⇒ Object



9
10
11
12
13
14
15
16
17
18
19
# File 'lib/pulsar/internal/producer_impl.rb', line 9

# Factory: instantiates a producer, calls #attach on it, and returns it.
#
# @param topic [Object] forwarded to the constructor
# @param producer_id [Object] forwarded to the constructor
# @param operation_timeout [Object] forwarded to the constructor
# @param max_pending_messages [Integer] forwarded to the constructor (default 1000)
# @param connection [Object, nil] consulted only when no +connection_provider+
#   is given; it is then wrapped in a lambda that returns it
# @param connection_provider [Object, nil] used as-is when truthy
# @return [Object] the new instance (not the result of #attach)
def self.create(topic:, producer_id:, operation_timeout:, max_pending_messages: 1000,
                connection: nil, connection_provider: nil)
  # Fall back to a provider that always hands out the fixed connection.
  provider = connection_provider || -> { connection }

  producer = new(
    connection_provider: provider,
    topic: topic,
    producer_id: producer_id,
    operation_timeout: operation_timeout,
    max_pending_messages: max_pending_messages
  )
  producer.attach
  producer
end

Instance Method Details

#close ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/pulsar/internal/producer_impl.rb', line 64

# Closes the producer. Does nothing when it is already closed. When the
# producer is attached, a close-producer command is sent over the connection
# and the reply must be of type :SUCCESS; the producer is marked closed only
# after that succeeds (or immediately when it was never attached).
#
# @return [nil]
# @raise [BrokerError] if the reply to the close request is not :SUCCESS
def close
  unless closed?
    if attached?
      request_id = @connection.next_request_id
      close_command = CommandFactory.close_producer(producer_id: producer_id, request_id: request_id)
      reply = @connection.request(close_command, timeout: @operation_timeout)
      unless reply.type == :SUCCESS
        raise BrokerError, "producer close failed: #{reply.type}"
      end
    end

    @closed = true
  end

  nil
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


78
79
80
# File 'lib/pulsar/internal/producer_impl.rb', line 78

# Reports the @closed flag. The constructor initialises it to false and
# #close sets it to true once closing has succeeded.
#
# @return [Boolean]
def closed?
  @closed
end

#send(payload, properties: {}, key: nil, event_time: nil, timeout: nil) ⇒ Object

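Raises:

  • (ClosedError) — if the producer has already been closed
  • (BrokerError) — if the broker's response is not a SEND_RECEIPT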



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/pulsar/internal/producer_impl.rb', line 36

# Sends one message over the connection and returns its message id.
#
# A pending-send slot is acquired before the send and always released
# afterwards, whether the send succeeds or raises.
# NOTE(review): acquire_pending_send is presumably what enforces
# max_pending_messages -- confirm against its definition.
#
# @param payload [Object] message body; coerced with String() and sent as a
#   binary-encoded copy
# @param properties [Hash] forwarded to CommandFactory.send_message
# @param key [Object, nil] forwarded to CommandFactory.send_message
# @param event_time [Object, nil] forwarded to CommandFactory.send_message
# @param timeout [Object, nil] per-call timeout; falls back to the producer's
#   operation timeout when nil
# @return [Object] whatever message_id_from builds from the send receipt
# @raise [ClosedError] if the producer is closed
# @raise [BrokerError] if the response type is not :SEND_RECEIPT
def send(payload, properties: {}, key: nil, event_time: nil, timeout: nil)
  raise ClosedError, 'producer is closed' if closed?

  send_timeout = timeout || @operation_timeout
  acquire_pending_send(timeout: send_timeout)

  begin
    attach unless attached?
    sequence_id = next_sequence_id
    # CommandFactory.send_message returns a pair. Both halves are handed to the
    # connection: the command first, then the second element (presumably the
    # message metadata -- confirm against CommandFactory).
    command, metadata = CommandFactory.send_message(
      producer_id: producer_id,
      sequence_id: sequence_id,
      producer_name: producer_name,
      properties: properties,
      key: key,
      event_time: event_time,
      publish_time: current_time_millis
    )
    response = @connection.send_message(command, metadata, String(payload).b, timeout: send_timeout)

    raise BrokerError, "send failed: #{response.type}" unless response.type == :SEND_RECEIPT

    message_id_from(response.send_receipt.message_id)
  ensure
    release_pending_send
  end
end