Class: Racecar::ConsumerSet

Inherits:
Object
  • Object
show all
Defined in:
lib/racecar/consumer_set.rb

Constant Summary collapse

MAX_POLL_TRIES =
10

Instance Method Summary collapse

Constructor Details

#initialize(config, logger, partition_processors, instrumenter = NullInstrumenter) ⇒ ConsumerSet

Returns a new instance of ConsumerSet.

Raises:

  • (ArgumentError)


9
10
11
12
13
14
15
16
17
18
19
20
21
22
# File 'lib/racecar/consumer_set.rb', line 9

# Sets up the bookkeeping for one consumer per configured subscription.
#
# No Kafka consumer is created here. Consumers are built lazily by #current
# and stored in @consumers.
#
# @param config object exposing +subscriptions+ (must be non-empty)
# @param logger receives the info/warn messages emitted by this class
# @param partition_processors handed to every RebalanceListener created in #current
# @param instrumenter receives the 'join_group' event and the delivery callback
# @raise [ArgumentError] when config.subscriptions is empty
def initialize(config, logger, partition_processors, instrumenter = NullInstrumenter)
  @config = config
  @logger = logger
  @instrumenter = instrumenter
  @partition_processors = partition_processors

  if @config.subscriptions.empty?
    raise ArgumentError, "Subscriptions must not be empty when subscribing"
  end

  # Consumer slots, indexed like @config.subscriptions and filled on demand.
  @consumers = []
  # Endless round-robin over subscription indices; #current peeks at it.
  @consumer_id_iterator = (0...@config.subscriptions.size).cycle
  # Serialises lazy creation and teardown of the shared producer.
  @producer_mutex = Mutex.new
  @previous_retries = 0
  @last_poll_read_nil_message = false
end

Instance Method Details

#batch_poll(max_wait_time_ms = @config.max_wait_time_ms, max_messages = @config.fetch_messages) ⇒ Object

batch_poll collects messages until any of the following occurs:

  • max_wait_time_ms time has passed

  • max_messages have been collected

  • a nil message was polled (end of topic, Kafka stalled, etc.)

The messages are from a single topic, but potentially from more than one partition.

Any errors during polling are retried with exponential backoff. If an error occurs but there is no time left to back off and retry, the messages collected so far are returned, and the retry happens on the next call.



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/racecar/consumer_set.rb', line 38

# Gathers a batch of messages from the currently selected consumer.
#
# Polling stops once the time budget is used up, once +max_messages+ have been
# gathered, or as soon as a poll comes back nil.
#
# @param max_wait_time_ms overall time budget for the whole batch, in milliseconds
# @param max_messages upper bound on the number of messages returned
# @return [Array] the gathered messages (possibly empty)
def batch_poll(max_wait_time_ms = @config.max_wait_time_ms, max_messages = @config.fetch_messages)
  poll_started = Time.now
  time_left_ms = max_wait_time_ms
  maybe_select_next_consumer

  batch = []
  while time_left_ms > 0 && batch.size < max_messages
    # Each poll may only use whatever remains of the overall budget.
    time_left_ms = remaining_time_ms(max_wait_time_ms, poll_started)
    message = poll_with_retries(time_left_ms)
    # A nil poll ends the batch early instead of polling again.
    break if message.nil?

    batch.push(message)
  end
  batch
end

#close ⇒ Object



71
72
73
74
# File 'lib/racecar/consumer_set.rb', line 71

# Closes every consumer created so far, then closes and forgets the shared
# producer.
#
# @return [nil] the result of #reset_producer!
def close
  each_subscribed { |consumer| consumer.close }
  reset_producer!
end

#commit ⇒ Object



65
66
67
68
69
# File 'lib/racecar/consumer_set.rb', line 65

# Commits on every consumer created so far. Per-consumer error handling is
# delegated to #commit_rescue_no_offset.
#
# @return whatever #each_subscribed returns when given a block
def commit
  each_subscribed(&method(:commit_rescue_no_offset))
end

#current ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/racecar/consumer_set.rb', line 91

# Returns the consumer for the subscription the round-robin cursor points at.
#
# On first use for a slot, this method:
# - builds the consumer from rdkafka_config,
# - attaches a RebalanceListener,
# - subscribes it to the subscription's topic inside a 'join_group'
#   instrumentation event.
#
# The consumer is cached only after subscribing succeeds.
def current
  slot = @consumer_id_iterator.peek
  existing = @consumers[slot]
  return existing if existing

  rdkafka_settings = Rdkafka::Config.new(rdkafka_config(current_subscription))
  rebalance_listener = RebalanceListener.new(@config, @instrumenter, @partition_processors)
  rdkafka_settings.consumer_rebalance_listener = rebalance_listener
  fresh_consumer = rdkafka_settings.consumer
  # Give the listener a reference to the consumer it was registered on.
  rebalance_listener.rdkafka_consumer = fresh_consumer

  @instrumenter.instrument('join_group') do
    fresh_consumer.subscribe(current_subscription.topic)
  end

  @consumers[slot] = fresh_consumer
end

#each_subscribed ⇒ Object (also known as: #each)



106
107
108
109
110
111
112
# File 'lib/racecar/consumer_set.rb', line 106

# Iterates over the consumers created so far, skipping slots that are still
# empty.
#
# @yieldparam consumer a created consumer
# @return [Array] the created consumers when a block is given, otherwise an Enumerator over them
def each_subscribed
  subscribed = @consumers.compact
  return subscribed.each unless block_given?

  subscribed.each { |consumer| yield consumer }
end

#pause(topic, partition, offset = nil) ⇒ Object



114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/racecar/consumer_set.rb', line 114

# Pauses +topic+/+partition+ on the consumer that owns it.
#
# When +offset+ is given, the consumer then seeks that partition to +offset+.
# If no created consumer owns the partition, this logs at info level and
# returns nil.
#
# @return the result of the seek call when +offset+ is given, otherwise nil
def pause(topic, partition, offset = nil)
  owner, partition_list = find_consumer_by(topic, partition)
  if owner
    owner.pause(partition_list)
    return unless offset

    # seek is handed a message-like stand-in carrying only topic/partition/offset.
    owner.seek(OpenStruct.new(topic: topic, partition: partition, offset: offset))
  else
    @logger.info "Attempted to pause #{topic}/#{partition}, but we're not subscribed to it"
    nil
  end
end

#poll(max_wait_time_ms = @config.max_wait_time_ms) ⇒ Object



24
25
26
# File 'lib/racecar/consumer_set.rb', line 24

# Polls for at most one message, as a one-message #batch_poll.
#
# @param max_wait_time_ms time budget in milliseconds
# @return the message, or nil when the batch came back empty
def poll(max_wait_time_ms = @config.max_wait_time_ms)
  single = batch_poll(max_wait_time_ms, 1)
  single.first
end

#producer ⇒ Object



76
77
78
79
80
81
82
# File 'lib/racecar/consumer_set.rb', line 76

# Returns the shared producer, building it on first use.
#
# Creation and lookup happen under @producer_mutex, so callers share a single
# instance. A newly built producer gets a Racecar::DeliveryCallback as its
# delivery_callback.
def producer
  @producer_mutex.synchronize do
    unless @producer
      built = Rdkafka::Config.new(producer_config).producer
      built.delivery_callback = Racecar::DeliveryCallback.new(instrumenter: @instrumenter)
      @producer = built
    end
    @producer
  end
end

#reset_producer! ⇒ Object



84
85
86
87
88
89
# File 'lib/racecar/consumer_set.rb', line 84

# Closes the shared producer, if one exists, and forgets it so the next call
# to #producer builds a new one. Runs under @producer_mutex.
#
# @return [nil]
def reset_producer!
  @producer_mutex.synchronize do
    @producer.close unless @producer.nil?
    @producer = nil
  end
end

#resume(topic, partition) ⇒ Object



128
129
130
131
132
133
134
135
136
137
# File 'lib/racecar/consumer_set.rb', line 128

# Resumes +topic+/+partition+ on the consumer that owns it.
#
# If no created consumer owns the partition, this logs at info level and
# returns nil.
#
# @return the consumer's resume result, or nil when not subscribed
def resume(topic, partition)
  owner, partition_list = find_consumer_by(topic, partition)

  if owner
    owner.resume(partition_list)
  else
    @logger.info "Attempted to resume #{topic}/#{partition}, but we're not subscribed to it"
    nil
  end
end

#store_offset(message, raw_consumer = nil) ⇒ Object



54
55
56
57
58
59
60
61
62
63
# File 'lib/racecar/consumer_set.rb', line 54

# Stores +message+'s offset on +raw_consumer+, or on #current when none is
# given.
#
# An Rdkafka::RdkafkaError whose code is :state (-172) is logged as a warning
# and swallowed. Any other Rdkafka::RdkafkaError is re-raised.
#
# @return the consumer's store_offset result, or nil when the error was swallowed
def store_offset(message, raw_consumer = nil)
  (raw_consumer || current).store_offset(message)
rescue Rdkafka::RdkafkaError => e
  raise unless e.code == :state # -172

  @logger.warn "Attempted to store_offset, but we're not subscribed to it: #{ErroneousStateError.new(e)}"
  nil
end

#subscribe_all ⇒ Object

Subscribe to all topics eagerly, even if there are still messages elsewhere. This is usually not needed, and Kafka might rebalance if topics are not polled frequently enough.



144
145
146
147
148
149
# File 'lib/racecar/consumer_set.rb', line 144

# Calls #current once per subscription, advancing with #select_next_consumer
# after each call. This creates (and subscribes) consumers eagerly.
# NOTE(review): assumes select_next_consumer moves the round-robin cursor by
# one slot; that helper is not visible here.
#
# @return [Integer] the number of subscriptions
def subscribe_all
  subscription_count = @config.subscriptions.size
  subscription_count.times do
    current
    select_next_consumer
  end
end