Module: DeliveryBoy

Defined in:
lib/delivery_boy.rb,
lib/delivery_boy/fake.rb,
lib/delivery_boy/config.rb,
lib/delivery_boy/datadog.rb,
lib/delivery_boy/railtie.rb,
lib/delivery_boy/version.rb,
lib/delivery_boy/instance.rb,
lib/delivery_boy/config_error.rb,
lib/delivery_boy/instrumenter.rb,
lib/generators/delivery_boy/install_generator.rb

Defined Under Namespace

Modules: Datadog, Generators Classes: Config, Fake, Instance, Instrumenter, NullInstrumenter, Railtie

Constant Summary collapse

VERSION =
"2.0.0.alpha.2"
ConfigError =
Class.new(StandardError)

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.instrumenterDeliveryBoy::Instrumenter, DeliveryBoy::NullInstrumenter

The instrumenter used by DeliveryBoy for emitting metrics.



121
122
123
# File 'lib/delivery_boy.rb', line 121

def instrumenter
  @instrumenter ||= NullInstrumenter.new
end

.loggerLogger

The logger used by DeliveryBoy.

Returns:

  • (Logger)


108
109
110
111
112
113
114
# File 'lib/delivery_boy.rb', line 108

def logger
  @logger ||= Logger.new($stdout).tap do |logger|
    if config.log_level
      logger.level = Object.const_get("Logger::#{config.log_level.upcase}")
    end
  end
end

Class Method Details

.buffer_sizeObject

Return the number of messages in the buffer



91
92
93
# File 'lib/delivery_boy.rb', line 91

def buffer_size
  instance.buffer_size
end

.clear_bufferObject

Clear any buffered messages generated by produce or produce! methods.



86
87
88
# File 'lib/delivery_boy.rb', line 86

def clear_buffer
  instance.clear_buffer
end

.clear_config!Object



136
137
138
# File 'lib/delivery_boy.rb', line 136

def clear_config!
  @config = nil
end

.configDeliveryBoy::Config

The configuration used by DeliveryBoy.

Returns:



130
131
132
133
134
# File 'lib/delivery_boy.rb', line 130

def config
  @config ||= DeliveryBoy::Config.new(env: ENV)
rescue KingKonf::ConfigError => e
  raise ConfigError, e.message
end

.configure {|DeliveryBoy::Config| ... } ⇒ nil

Configure DeliveryBoy in a block.

DeliveryBoy.configure do |config|
  config.client_id = "yolo"
end

Yields:

Returns:

  • (nil)


148
149
150
# File 'lib/delivery_boy.rb', line 148

def configure
  yield config
end

.deliver(value, topic:, **options) ⇒ nil

Write a message to a specified Kafka topic synchronously.

Keep in mind that the client will block until the message has been delivered.

Parameters:

  • value (String)

    the message value.

  • topic (String)

    the topic that the message should be written to.

  • key (String, nil)

    the message key.

  • partition (Integer, nil)

    the topic partition that the message should be written to.

  • partition_key (String, nil)

    a key used to deterministically assign a partition to the message.

Returns:

  • (nil)

Raises:

  • (Rdkafka::RdkafkaError)

    if delivery failed for some reason.



29
30
31
# File 'lib/delivery_boy.rb', line 29

def deliver(value, topic:, **options)
  instance.deliver(value, topic: topic, **options)
end

.deliver_async(value, topic:, **options) ⇒ nil

Like deliver_async!, but handles Rdkafka::RdkafkaError errors by logging them and just going on with normal business.

Returns:

  • (nil)


37
38
39
40
41
# File 'lib/delivery_boy.rb', line 37

def deliver_async(value, topic:, **options)
  deliver_async!(value, topic: topic, **options)
rescue Rdkafka::RdkafkaError => e
  logger.error "Message for `#{topic}` dropped due to error: #{e.message}"
end

.deliver_async!(value, topic:, **options) ⇒ nil

Like deliver, but returns immediately.

The actual delivery takes place in a background thread.

Returns:

  • (nil)


48
49
50
# File 'lib/delivery_boy.rb', line 48

def deliver_async!(value, topic:, **options)
  instance.deliver_async!(value, topic: topic, **options)
end

.deliver_messagesnil

Delivers the items currently in the producer buffer.

Returns:

  • (nil)

Raises:

  • (Rdkafka::RdkafkaError)

    if delivery failed for some reason.



81
82
83
# File 'lib/delivery_boy.rb', line 81

def deliver_messages
  instance.deliver_messages
end

.produce(value, topic:, **options) ⇒ nil

Like produce!, but handles Rdkafka::RdkafkaError errors by logging them and just going on with normal business.

Returns:

  • (nil)


56
57
58
59
60
# File 'lib/delivery_boy.rb', line 56

def produce(value, topic:, **options)
  produce!(value, topic: topic, **options)
rescue Rdkafka::RdkafkaError => e
  logger.error "Message for `#{topic}` dropped due to error: #{e.message}"
end

.produce!(value, topic:, **options) ⇒ nil

Appends the given message to the producer buffer but does not send it until deliver_messages is called.

Parameters:

  • value (String)

    the message value.

  • topic (String)

    the topic that the message should be written to.

  • key (String, nil)

    the message key.

  • partition (Integer, nil)

    the topic partition that the message should be written to.

  • partition_key (String, nil)

    a key used to deterministically assign a partition to the message.

Returns:

  • (nil)

Raises:

  • (Rdkafka::RdkafkaError)

    if the producer’s buffer is full.



73
74
75
# File 'lib/delivery_boy.rb', line 73

def produce!(value, topic:, **options)
  instance.produce(value, topic: topic, **options)
end

.shutdownnil

Shut down DeliveryBoy.

Automatically called when the process exits.

Returns:

  • (nil)


100
101
102
103
# File 'lib/delivery_boy.rb', line 100

def shutdown
  instance.shutdown
  @instance = nil
end

.test_mode!Object



152
153
154
# File 'lib/delivery_boy.rb', line 152

def test_mode!
  @instance = testing
end

.testingObject



156
157
158
# File 'lib/delivery_boy.rb', line 156

def testing
  @testing ||= Fake.new
end