Class: Pulsar::Internal::CommandFactory

Inherits:
Object
  • Object
show all
Defined in:
lib/pulsar/internal/command_factory.rb

Overview

Builds protobuf commands for the Pulsar binary protocol.

Class Method Summary collapse

Class Method Details

.ack(consumer_id:, message_id:) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/pulsar/internal/command_factory.rb', line 62

def self.ack(consumer_id:, message_id:)
  Proto::BaseCommand.new(
    type: :ACK,
    ack: Proto::CommandAck.new(
      consumer_id: consumer_id,
      ack_type: :Individual,
      message_id: [
        Proto::MessageIdData.new(
          ledgerId: message_id.ledger_id,
          entryId: message_id.entry_id,
          partition: message_id.partition_index,
          batch_index: message_id.batch_index
        )
      ]
    )
  )
end

.close_consumer(consumer_id:, request_id:) ⇒ Object



100
101
102
103
104
105
106
107
108
# File 'lib/pulsar/internal/command_factory.rb', line 100

def self.close_consumer(consumer_id:, request_id:)
  Proto::BaseCommand.new(
    type: :CLOSE_CONSUMER,
    close_consumer: Proto::CommandCloseConsumer.new(
      consumer_id: consumer_id,
      request_id: request_id
    )
  )
end

.close_producer(producer_id:, request_id:) ⇒ Object



90
91
92
93
94
95
96
97
98
# File 'lib/pulsar/internal/command_factory.rb', line 90

def self.close_producer(producer_id:, request_id:)
  Proto::BaseCommand.new(
    type: :CLOSE_PRODUCER,
    close_producer: Proto::CommandCloseProducer.new(
      producer_id: producer_id,
      request_id: request_id
    )
  )
end

.flow(consumer_id:, permits:) ⇒ Object



52
53
54
55
56
57
58
59
60
# File 'lib/pulsar/internal/command_factory.rb', line 52

def self.flow(consumer_id:, permits:)
  Proto::BaseCommand.new(
    type: :FLOW,
    flow: Proto::CommandFlow.new(
      consumer_id: consumer_id,
      messagePermits: permits
    )
  )
end

.lookup(topic:, request_id:) ⇒ Object



80
81
82
83
84
85
86
87
88
# File 'lib/pulsar/internal/command_factory.rb', line 80

def self.lookup(topic:, request_id:)
  Proto::BaseCommand.new(
    type: :LOOKUP,
    lookupTopic: Proto::CommandLookupTopic.new(
      topic: topic,
      request_id: request_id
    )
  )
end

.pongObject



110
111
112
113
114
115
# File 'lib/pulsar/internal/command_factory.rb', line 110

def self.pong
  Proto::BaseCommand.new(
    type: :PONG,
    pong: Proto::CommandPong.new
  )
end

.producer(topic:, producer_id:, request_id:) ⇒ Object



7
8
9
10
11
12
13
14
15
16
# File 'lib/pulsar/internal/command_factory.rb', line 7

def self.producer(topic:, producer_id:, request_id:)
  Proto::BaseCommand.new(
    type: :PRODUCER,
    producer: Proto::CommandProducer.new(
      topic: topic,
      producer_id: producer_id,
      request_id: request_id
    )
  )
end

.send_message(producer_id:, sequence_id:, producer_name:, publish_time:, properties: {}, key: nil, event_time: nil) ⇒ Object



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/pulsar/internal/command_factory.rb', line 18

def self.send_message(producer_id:, sequence_id:, producer_name:, publish_time:, properties: {}, key: nil,
                      event_time: nil)
  command = Proto::BaseCommand.new(
    type: :SEND,
    send: Proto::CommandSend.new(
      producer_id: producer_id,
      sequence_id: sequence_id
    )
  )
   = Proto::MessageMetadata.new(
    producer_name: producer_name,
    sequence_id: sequence_id,
    publish_time: publish_time,
    properties: properties.map { |key_value, value| Proto::KeyValue.new(key: key_value.to_s, value: value.to_s) }
  )
  .partition_key = key if key
  .event_time = event_time if event_time

  [command, ]
end

.subscribe(topic:, subscription:, consumer_id:, request_id:, subscription_type: :Exclusive) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/pulsar/internal/command_factory.rb', line 39

def self.subscribe(topic:, subscription:, consumer_id:, request_id:, subscription_type: :Exclusive)
  Proto::BaseCommand.new(
    type: :SUBSCRIBE,
    subscribe: Proto::CommandSubscribe.new(
      topic: topic,
      subscription: subscription,
      subType: subscription_type,
      consumer_id: consumer_id,
      request_id: request_id
    )
  )
end