Class: DeliveryBoy::Instance

Inherits:
Object
  • Object
show all
Defined in:
lib/delivery_boy/instance.rb

Overview

This class implements the actual logic of DeliveryBoy. The DeliveryBoy module has a module-level singleton instance.

Instance Method Summary collapse

Constructor Details

#initialize(config, logger, instrumenter: NullInstrumenter.new) ⇒ Instance

Returns a new instance of Instance.



7
8
9
10
11
# File 'lib/delivery_boy/instance.rb', line 7

# Stores the collaborators this instance works with. Nothing is connected,
# validated or created here beyond the default instrumenter.
#
# @param config [Object] kept in @config
# @param logger [Object] kept in @logger
# @param instrumenter [#instrument] kept in @instrumenter; when omitted a
#   fresh NullInstrumenter is built
def initialize(config, logger, instrumenter: NullInstrumenter.new)
  @logger = logger
  @config = config
  @instrumenter = instrumenter
end

Instance Method Details

#buffer_size ⇒ Object



102
103
104
# File 'lib/delivery_boy/instance.rb', line 102

# Reports how many entries `handles` currently holds.
#
# `produce` pushes one handle per message and `deliver_messages` /
# `clear_buffer` empty the collection, so this is the count of messages
# produced since the last flush or clear.
#
# @return [Object] the result of `handles.size` (presumably an Integer —
#   `handles` is defined outside this listing)
def buffer_size
  handles.size
end

#clear_buffer ⇒ Object



98
99
100
# File 'lib/delivery_boy/instance.rb', line 98

# Empties the `handles` collection without touching the producer.
#
# NOTE(review): `produce` has already handed each message to
# `sync_producer.produce` before pushing its handle, so this only forgets the
# handles; it does not appear to recall the messages — confirm that is intended.
#
# @return [Object] the result of `handles.clear`
def clear_buffer
  handles.clear
end

#deliver(value, topic:, **options) ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/delivery_boy/instance.rb', line 13

# Synchronously delivers a single message.
#
# The message is handed to `sync_producer.produce` and the returned handle is
# waited on, all inside the "deliver" instrumentation block.
#
# @param value [#to_s] message payload; `value.to_s.bytesize` is reported as
#   `message_size` in the instrumentation payload
# @param topic [Object] forwarded as `topic:` and reported in the payload
# @param options [Hash] extra keyword arguments forwarded to `produce`.
#   `:create_time` is never forwarded: a truthy value is converted with
#   `Time.at` and sent as `:timestamp` (replacing any `:timestamp` given),
#   while a nil/false value is simply dropped.
# @return [Object] whatever `@instrumenter.instrument` returns for the block
#   (presumably the result of `wait` — depends on the instrumenter)
def deliver(value, topic:, **options)
  # Always strip :create_time. Previously a nil/false value stayed in the
  # forwarded options and reached `produce` as a `create_time:` keyword, which
  # the producer presumably does not accept.
  produce_options = options.reject { |key, _| key == :create_time }
  create_time = options[:create_time]
  produce_options[:timestamp] = Time.at(create_time) if create_time

  message_size = value.to_s.bytesize

  instrumentation_payload = {
    client_id: config.client_id,
    topic: topic,
    message_size: message_size
  }

  @instrumenter.instrument("deliver", instrumentation_payload) do
    sync_producer
      .produce(payload: value, topic: topic, **produce_options)
      .wait
  end
end

#deliver_async!(value, topic:, **options) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/delivery_boy/instance.rb', line 35

# Hands a single message to the asynchronous producer without waiting on it.
#
# The call to `async_producer.produce` runs inside the "deliver_async"
# instrumentation block; no `wait` is performed here.
#
# @param value [#to_s] message payload; `value.to_s.bytesize` is reported as
#   `message_size` in the instrumentation payload
# @param topic [Object] forwarded as `topic:` and reported in the payload
# @param options [Hash] extra keyword arguments forwarded to `produce`.
#   `:create_time` is never forwarded: a truthy value is converted with
#   `Time.at` and sent as `:timestamp` (replacing any `:timestamp` given),
#   while a nil/false value is simply dropped.
# @return [Object] whatever `@instrumenter.instrument` returns for the block
#   (presumably the value returned by `produce` — depends on the instrumenter)
def deliver_async!(value, topic:, **options)
  # Always strip :create_time. Previously a nil/false value stayed in the
  # forwarded options and reached `produce` as a `create_time:` keyword, which
  # the producer presumably does not accept.
  produce_options = options.reject { |key, _| key == :create_time }
  create_time = options[:create_time]
  produce_options[:timestamp] = Time.at(create_time) if create_time

  message_size = value.to_s.bytesize

  instrumentation_payload = {
    client_id: config.client_id,
    topic: topic,
    message_size: message_size,
    queue_size: async_producer_queue_size
  }

  @instrumenter.instrument("deliver_async", instrumentation_payload) do
    async_producer.produce(payload: value, topic: topic, **produce_options)
  end
end

#deliver_messages ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/delivery_boy/instance.rb', line 84

# Flushes the synchronous producer and then forgets every buffered handle.
#
# The number of handles is sampled before flushing and reported as
# `delivered_message_count` on the "deliver_messages" instrumentation event.
#
# @return [Object] whatever `@instrumenter.instrument` returns for the block
#   (the block itself ends with `handles.clear`)
def deliver_messages
  pending_count = handles.size
  payload = { client_id: config.client_id, delivered_message_count: pending_count }

  @instrumenter.instrument("deliver_messages", payload) do
    sync_producer.flush
    handles.clear
  end
end

#produce(value, topic:, **options) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/delivery_boy/instance.rb', line 62

# Produces a message through the synchronous producer and remembers its
# handle in `handles` instead of waiting on it.
#
# The instrumentation payload's `buffer_size` is `handles.size` sampled
# before the new handle is pushed.
#
# @param value [#to_s] message payload; `value.to_s.bytesize` is reported as
#   `message_size` in the instrumentation payload
# @param topic [Object] forwarded as `topic:` and reported in the payload
# @param options [Hash] extra keyword arguments forwarded to
#   `sync_producer.produce`. `:create_time` is never forwarded: a truthy value
#   is converted with `Time.at` and sent as `:timestamp` (replacing any
#   `:timestamp` given), while a nil/false value is simply dropped.
# @return [Object] whatever `@instrumenter.instrument` returns for the block
#   (the block itself ends with `handles.push(handle)`)
def produce(value, topic:, **options)
  # Always strip :create_time. Previously a nil/false value stayed in the
  # forwarded options and reached the producer as a `create_time:` keyword,
  # which it presumably does not accept.
  produce_options = options.reject { |key, _| key == :create_time }
  create_time = options[:create_time]
  produce_options[:timestamp] = Time.at(create_time) if create_time

  message_size = value.to_s.bytesize

  instrumentation_payload = {
    client_id: config.client_id,
    topic: topic,
    message_size: message_size,
    buffer_size: handles.size
  }

  @instrumenter.instrument("produce_message", instrumentation_payload) do
    handle = sync_producer.produce(payload: value, topic: topic, **produce_options)
    handles.push(handle)
  end
end

#shutdown ⇒ Object



57
58
59
60
# File 'lib/delivery_boy/instance.rb', line 57

# Closes whichever producers the `sync_producer?` / `async_producer?`
# predicates report as present; a producer whose predicate is falsy is never
# touched. The synchronous producer is handled first.
#
# @return [Object, nil] the result of `async_producer.close`, or nil when
#   `async_producer?` is falsy
def shutdown
  if sync_producer?
    sync_producer.close
  end

  if async_producer?
    async_producer.close
  end
end