Class: Pulsar::Internal::CommandFactory
- Inherits: Object
- Object
- Pulsar::Internal::CommandFactory
- Defined in:
- lib/pulsar/internal/command_factory.rb
Overview
Builds protobuf commands for the Pulsar binary protocol.
Class Method Summary
- .ack(consumer_id:, message_id:) ⇒ Object
- .close_consumer(consumer_id:, request_id:) ⇒ Object
- .close_producer(producer_id:, request_id:) ⇒ Object
- .flow(consumer_id:, permits:) ⇒ Object
- .lookup(topic:, request_id:) ⇒ Object
- .pong ⇒ Object
- .producer(topic:, producer_id:, request_id:) ⇒ Object
- .send_message(producer_id:, sequence_id:, producer_name:, publish_time:, properties: {}, key: nil, event_time: nil) ⇒ Object
- .subscribe(topic:, subscription:, consumer_id:, request_id:, subscription_type: :Exclusive) ⇒ Object
Class Method Details
.ack(consumer_id:, message_id:) ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/pulsar/internal/command_factory.rb', line 62

# Builds an ACK command that acknowledges a single message.
#
# The command uses `ack_type: :Individual` and carries exactly one
# `Proto::MessageIdData` entry copied from +message_id+.
#
# @param consumer_id value stored in the command's `consumer_id` field
# @param message_id object responding to `ledger_id`, `entry_id`,
#   `partition_index` and `batch_index`
# @return [Proto::BaseCommand] command with `type: :ACK`
def self.ack(consumer_id:, message_id:)
  Proto::BaseCommand.new(
    type: :ACK,
    ack: Proto::CommandAck.new(
      consumer_id: consumer_id,
      ack_type: :Individual,
      message_id: [
        # Field names mix camelCase and snake_case on purpose: they are the
        # keys this code passes to Proto::MessageIdData.
        Proto::MessageIdData.new(
          ledgerId: message_id.ledger_id,
          entryId: message_id.entry_id,
          partition: message_id.partition_index,
          batch_index: message_id.batch_index
        )
      ]
    )
  )
end
.close_consumer(consumer_id:, request_id:) ⇒ Object
100 101 102 103 104 105 106 107 108 |
# File 'lib/pulsar/internal/command_factory.rb', line 100

# Builds a CLOSE_CONSUMER command.
#
# @param consumer_id value stored in the payload's `consumer_id` field
# @param request_id value stored in the payload's `request_id` field
# @return [Proto::BaseCommand] command with `type: :CLOSE_CONSUMER`
def self.close_consumer(consumer_id:, request_id:)
  payload = Proto::CommandCloseConsumer.new(consumer_id: consumer_id, request_id: request_id)
  Proto::BaseCommand.new(type: :CLOSE_CONSUMER, close_consumer: payload)
end
.close_producer(producer_id:, request_id:) ⇒ Object
90 91 92 93 94 95 96 97 98 |
# File 'lib/pulsar/internal/command_factory.rb', line 90

# Builds a CLOSE_PRODUCER command.
#
# @param producer_id value stored in the payload's `producer_id` field
# @param request_id value stored in the payload's `request_id` field
# @return [Proto::BaseCommand] command with `type: :CLOSE_PRODUCER`
def self.close_producer(producer_id:, request_id:)
  payload = Proto::CommandCloseProducer.new(producer_id: producer_id, request_id: request_id)
  Proto::BaseCommand.new(type: :CLOSE_PRODUCER, close_producer: payload)
end
.flow(consumer_id:, permits:) ⇒ Object
52 53 54 55 56 57 58 59 60 |
# File 'lib/pulsar/internal/command_factory.rb', line 52

# Builds a FLOW command.
#
# @param consumer_id value stored in the payload's `consumer_id` field
# @param permits value stored in the payload's `messagePermits` field
# @return [Proto::BaseCommand] command with `type: :FLOW`
def self.flow(consumer_id:, permits:)
  payload = Proto::CommandFlow.new(consumer_id: consumer_id, messagePermits: permits)
  Proto::BaseCommand.new(type: :FLOW, flow: payload)
end
.lookup(topic:, request_id:) ⇒ Object
80 81 82 83 84 85 86 87 88 |
# File 'lib/pulsar/internal/command_factory.rb', line 80

# Builds a LOOKUP command for a topic.
#
# @param topic value stored in the payload's `topic` field
# @param request_id value stored in the payload's `request_id` field
# @return [Proto::BaseCommand] command with `type: :LOOKUP`; the payload is
#   attached under the `lookupTopic` field
def self.lookup(topic:, request_id:)
  payload = Proto::CommandLookupTopic.new(topic: topic, request_id: request_id)
  Proto::BaseCommand.new(type: :LOOKUP, lookupTopic: payload)
end
.pong ⇒ Object
110 111 112 113 114 115 |
# File 'lib/pulsar/internal/command_factory.rb', line 110

# Builds a PONG command with a field-less `Proto::CommandPong` payload.
#
# @return [Proto::BaseCommand] command with `type: :PONG`
def self.pong
  payload = Proto::CommandPong.new
  Proto::BaseCommand.new(type: :PONG, pong: payload)
end
.producer(topic:, producer_id:, request_id:) ⇒ Object
7 8 9 10 11 12 13 14 15 16 |
# File 'lib/pulsar/internal/command_factory.rb', line 7

# Builds a PRODUCER command for a topic.
#
# @param topic value stored in the payload's `topic` field
# @param producer_id value stored in the payload's `producer_id` field
# @param request_id value stored in the payload's `request_id` field
# @return [Proto::BaseCommand] command with `type: :PRODUCER`
def self.producer(topic:, producer_id:, request_id:)
  payload = Proto::CommandProducer.new(
    topic: topic,
    producer_id: producer_id,
    request_id: request_id
  )
  Proto::BaseCommand.new(type: :PRODUCER, producer: payload)
end
.send_message(producer_id:, sequence_id:, producer_name:, publish_time:, properties: {}, key: nil, event_time: nil) ⇒ Object
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/pulsar/internal/command_factory.rb', line 18

# Builds the SEND command together with the message metadata that goes with it.
#
# @param producer_id value stored in the SEND payload's `producer_id` field
# @param sequence_id value stored in both the SEND payload and the metadata
# @param producer_name value stored in the metadata's `producer_name` field
# @param publish_time value stored in the metadata's `publish_time` field
# @param properties enumerable of key/value pairs; each key and value is
#   converted with `to_s` and wrapped in a `Proto::KeyValue`
# @param key when truthy, assigned to the metadata's `partition_key`
# @param event_time when truthy, assigned to the metadata's `event_time`
# @return [Array] two-element array `[command, metadata]`, where `command` is a
#   `Proto::BaseCommand` with `type: :SEND` and `metadata` is a
#   `Proto::MessageMetadata`
def self.send_message(producer_id:, sequence_id:, producer_name:, publish_time:,
                      properties: {}, key: nil, event_time: nil)
  command = Proto::BaseCommand.new(
    type: :SEND,
    send: Proto::CommandSend.new(
      producer_id: producer_id,
      sequence_id: sequence_id
    )
  )

  metadata = Proto::MessageMetadata.new(
    producer_name: producer_name,
    sequence_id: sequence_id,
    publish_time: publish_time,
    properties: properties.map do |name, value|
      Proto::KeyValue.new(key: name.to_s, value: value.to_s)
    end
  )
  # Optional fields are assigned only when supplied, so they stay unset otherwise.
  metadata.partition_key = key if key
  metadata.event_time = event_time if event_time

  [command, metadata]
end
.subscribe(topic:, subscription:, consumer_id:, request_id:, subscription_type: :Exclusive) ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/pulsar/internal/command_factory.rb', line 39

# Builds a SUBSCRIBE command.
#
# @param topic value stored in the payload's `topic` field
# @param subscription value stored in the payload's `subscription` field
# @param consumer_id value stored in the payload's `consumer_id` field
# @param request_id value stored in the payload's `request_id` field
# @param subscription_type value stored in the payload's `subType` field;
#   defaults to `:Exclusive`
# @return [Proto::BaseCommand] command with `type: :SUBSCRIBE`
def self.subscribe(topic:, subscription:, consumer_id:, request_id:, subscription_type: :Exclusive)
  payload = Proto::CommandSubscribe.new(
    topic: topic,
    subscription: subscription,
    subType: subscription_type,
    consumer_id: consumer_id,
    request_id: request_id
  )
  Proto::BaseCommand.new(type: :SUBSCRIBE, subscribe: payload)
end