Class: Racecar::ConsumerSet

Inherits:
Object
  • Object
show all
Defined in:
lib/racecar/consumer_set.rb

Constant Summary collapse

MAX_POLL_TRIES =
10

Instance Method Summary collapse

Constructor Details

#initialize(config, logger, partition_processors, instrumenter = NullInstrumenter) ⇒ ConsumerSet

Returns a new instance of ConsumerSet.

Raises:

  • (ArgumentError)


9
10
11
12
13
14
15
16
17
18
19
20
21
22
# File 'lib/racecar/consumer_set.rb', line 9

# Builds a consumer set around the given configuration.
#
# @param config [Object] configuration; must respond to +subscriptions+
# @param logger [Object] logger kept for later diagnostics
# @param partition_processors [Object] stored as-is for later use
# @param instrumenter [Object] instrumentation sink (defaults to NullInstrumenter)
# @raise [ArgumentError] when +config.subscriptions+ is empty
def initialize(config, logger, partition_processors, instrumenter = NullInstrumenter)
  @config = config
  @logger = logger
  @instrumenter = instrumenter
  @partition_processors = partition_processors

  if @config.subscriptions.empty?
    raise ArgumentError, "Subscriptions must not be empty when subscribing"
  end

  # One slot per subscription; slots are filled on demand.
  @consumers = []
  # Endless round-robin over the subscription indexes.
  @consumer_id_iterator = (0...@config.subscriptions.size).cycle
  @producer_mutex = Mutex.new

  @previous_retries = 0

  @last_poll_read_nil_message = false
end

Instance Method Details

#batch_poll(max_wait_time_ms = @config.max_wait_time_ms, max_messages = @config.fetch_messages) ⇒ Object

batch_poll collects messages until any of the following occurs:

  • max_wait_time_ms time has passed

  • max_messages have been collected

  • a nil message was polled (end of topic, Kafka stalled, etc.)

The messages are from a single topic, but potentially from more than one partition.

Any errors during polling are retried in an exponential backoff fashion. If an error occurs, but there is no time left for a backoff and retry, it will return the already collected messages and only retry on the next call.



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/racecar/consumer_set.rb', line 38

# Gathers messages until the time budget is used up, +max_messages+ have been
# collected, or a poll yields nil. Polling (and any retrying it does) is
# delegated to poll_with_retries.
#
# @param max_wait_time_ms [Numeric] overall time budget in milliseconds
# @param max_messages [Integer] upper bound on the number of messages returned
# @return [Array] the messages collected so far (possibly empty)
def batch_poll(max_wait_time_ms = @config.max_wait_time_ms, max_messages = @config.fetch_messages)
  poll_started_at = Time.now
  time_left_ms = max_wait_time_ms
  maybe_select_next_consumer
  collected = []

  while time_left_ms > 0 && collected.size < max_messages
    # Refresh the budget before each poll so the poll only waits for what is left.
    time_left_ms = remaining_time_ms(max_wait_time_ms, poll_started_at)
    polled = poll_with_retries(time_left_ms)
    break if polled.nil?

    collected.push(polled)
  end

  collected
end

#close ⇒ Object



71
72
73
74
75
# File 'lib/racecar/consumer_set.rb', line 71

# Closes each consumer yielded by each_subscribed. The producer is reset
# afterwards even if closing a consumer raises.
def close
  each_subscribed { |consumer| consumer.close }
ensure
  reset_producer!
end

#commit ⇒ Object



65
66
67
68
69
# File 'lib/racecar/consumer_set.rb', line 65

# Runs commit_rescue_no_offset for each consumer yielded by each_subscribed.
def commit
  each_subscribed { |subscribed| commit_rescue_no_offset(subscribed) }
end

#current ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/racecar/consumer_set.rb', line 92

# Returns the consumer for the slot the iterator currently points at,
# building and subscribing it on first use.
#
# @return [Object] the consumer stored in the current slot
def current
  slot = @consumer_id_iterator.peek
  cached = @consumers[slot]
  return cached if cached

  rdkafka = Rdkafka::Config.new(rdkafka_config(current_subscription))
  rebalance_listener = RebalanceListener.new(@config, @instrumenter, @partition_processors)
  rdkafka.consumer_rebalance_listener = rebalance_listener
  new_consumer = rdkafka.consumer
  # The listener gets a handle on the consumer it was registered for.
  rebalance_listener.rdkafka_consumer = new_consumer

  @instrumenter.instrument('join_group') do
    new_consumer.subscribe(current_subscription.topic)
  end

  @consumers[slot] = new_consumer
end

#each_subscribed ⇒ Object Also known as: each



107
108
109
110
111
112
113
# File 'lib/racecar/consumer_set.rb', line 107

# Iterates over the consumers that have been created so far (nil slots are
# skipped).
#
# @yield [consumer] each non-nil entry of the consumer list
# @return [Enumerator] when called without a block
# @return [Array] the non-nil consumers when called with a block
def each_subscribed
  live_consumers = @consumers.compact
  return live_consumers.each unless block_given?

  live_consumers.each { |consumer| yield consumer }
end

#pause(topic, partition, offset = nil) ⇒ Object



115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/racecar/consumer_set.rb', line 115

# Pauses consumption of a topic/partition and optionally seeks to an offset.
#
# @param topic [String] topic name
# @param partition [Integer] partition number
# @param offset [Integer, nil] when given, the consumer seeks here after pausing
# @return [Object, nil] result of the seek when +offset+ is given, otherwise nil
def pause(topic, partition, offset = nil)
  consumer, filtered_tpl = find_consumer_by(topic, partition)

  if consumer
    consumer.pause(filtered_tpl)
    return unless offset

    # seek is handed a message-like object carrying topic, partition and offset.
    consumer.seek(OpenStruct.new(topic: topic, partition: partition, offset: offset))
  else
    @logger.info "Attempted to pause #{topic}/#{partition}, but we're not subscribed to it"
    nil
  end
end

#poll(max_wait_time_ms = @config.max_wait_time_ms) ⇒ Object



24
25
26
# File 'lib/racecar/consumer_set.rb', line 24

# Polls for a single message by requesting a batch of at most one.
#
# @param max_wait_time_ms [Numeric] time budget passed on to batch_poll
# @return [Object, nil] the first message of the batch, or nil if it is empty
def poll(max_wait_time_ms = @config.max_wait_time_ms)
  single_batch = batch_poll(max_wait_time_ms, 1)
  single_batch.first
end

#producer ⇒ Object



77
78
79
80
81
82
83
# File 'lib/racecar/consumer_set.rb', line 77

# Returns the memoized producer, creating it on first use while holding the
# producer mutex.
#
# @return [Object] the producer built from producer_config
def producer
  @producer_mutex.synchronize do
    unless @producer
      fresh_producer = Rdkafka::Config.new(producer_config).producer
      fresh_producer.delivery_callback = Racecar::DeliveryCallback.new(instrumenter: @instrumenter)
      # Memoize only once the callback is attached.
      @producer = fresh_producer
    end

    @producer
  end
end

#reset_producer! ⇒ Object



85
86
87
88
89
90
# File 'lib/racecar/consumer_set.rb', line 85

# Closes the memoized producer (if one exists) and forgets it, all while
# holding the producer mutex.
#
# @return [nil]
def reset_producer!
  @producer_mutex.synchronize do
    @producer.close unless @producer.nil?
    @producer = nil
  end
end

#resume(topic, partition) ⇒ Object



129
130
131
132
133
134
135
136
137
138
# File 'lib/racecar/consumer_set.rb', line 129

# Resumes consumption of a topic/partition on the consumer that owns it.
#
# @param topic [String] topic name
# @param partition [Integer] partition number
# @return [Object, nil] result of the consumer's resume call, or nil when no
#   consumer is found for the topic/partition
def resume(topic, partition)
  consumer, filtered_tpl = find_consumer_by(topic, partition)

  if consumer
    consumer.resume(filtered_tpl)
  else
    @logger.info "Attempted to resume #{topic}/#{partition}, but we're not subscribed to it"
    nil
  end
end

#store_offset(message, raw_consumer = nil) ⇒ Object



54
55
56
57
58
59
60
61
62
63
# File 'lib/racecar/consumer_set.rb', line 54

# Stores the offset of +message+ on the given consumer, or on the current one
# when none is passed.
#
# @param message [Object] message whose offset should be stored
# @param raw_consumer [Object, nil] consumer to use instead of +current+
# @return [Object, nil] result of the consumer's store_offset; nil when the
#   error code is :state
# @raise [Rdkafka::RdkafkaError] for any error code other than :state
def store_offset(message, raw_consumer = nil)
  (raw_consumer || current).store_offset(message)
rescue Rdkafka::RdkafkaError => e
  # Only the :state code (-172) is tolerated; everything else propagates.
  raise unless e.code == :state

  @logger.warn "Attempted to store_offset, but we're not subscribed to it: #{ErroneousStateError.new(e)}"
  nil
end

#subscribe_all ⇒ Object

Subscribe to all topics eagerly, even if there are still messages elsewhere. Usually that’s not needed, and Kafka might rebalance if topics are not polled frequently enough.



145
146
147
148
149
150
# File 'lib/racecar/consumer_set.rb', line 145

# Walks once through every subscription: materializes the consumer for the
# current slot via +current+, then advances with select_next_consumer.
#
# @return [Integer] the number of subscriptions visited
def subscribe_all
  subscription_count = @config.subscriptions.size
  subscription_count.times do |_index|
    current
    select_next_consumer
  end
end