Class: Racecar::PartitionProcessor

Inherits:
Object
  • Object
show all
Defined in:
lib/racecar/partition_processor.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config:, logger:, instrumenter:, consumer_class_instance:, consumer:, topic:, partition:, pause:, rdkafka_consumer: nil) ⇒ PartitionProcessor

Returns a new instance of PartitionProcessor.



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/racecar/partition_processor.rb', line 12

# Builds a processor for one topic/partition pair.
#
# Every argument is kept in an instance variable of the same name. When
# +config.multithreaded_processing_enabled+ is truthy, the consumer class
# instance is also configured with the consumer's producer, the consumer,
# the instrumenter and the config.
#
# @param config [Object] must respond to +multithreaded_processing_enabled+
# @param logger [Object] stored as +@logger+
# @param instrumenter [Object] stored as +@instrumenter+
# @param consumer_class_instance [Object] must respond to +configure+ when
#   multithreaded processing is enabled
# @param consumer [Object] must respond to +producer+ when multithreaded
#   processing is enabled
# @param topic [Object] stored as +@topic+
# @param partition [Object] stored as +@partition+
# @param pause [Object] stored as +@pause+
# @param rdkafka_consumer [Object, nil] optional; stored as +@rdkafka_consumer+
def initialize(config:, logger:, instrumenter:, consumer_class_instance:, consumer:, topic:, partition:, pause:, rdkafka_consumer: nil)
  # Identity of the partition this processor is bound to.
  @topic     = topic
  @partition = partition
  @pause     = pause

  # Collaborators.
  @config                  = config
  @logger                  = logger
  @instrumenter            = instrumenter
  @consumer                = consumer
  @rdkafka_consumer        = rdkafka_consumer
  @consumer_class_instance = consumer_class_instance

  if @config.multithreaded_processing_enabled
    @consumer_class_instance.configure(
      producer:     @consumer.producer,
      consumer:     @consumer,
      instrumenter: @instrumenter,
      config:       @config,
    )
  end

  # Signalled by #rebalance! and #shut_down!; the waiting side is not part of
  # this constructor.
  @sleep_mutex = Mutex.new
  @sleep_cv    = ConditionVariable.new
end

Instance Attribute Details

#config ⇒ Object (readonly)

Returns the value of attribute config.



9
10
11
# File 'lib/racecar/partition_processor.rb', line 9

# Reader for the +config+ object that was passed to the constructor.
#
# @return [Object] the value held in +@config+
def config
  @config
end

#consumer ⇒ Object (readonly)

Returns the value of attribute consumer.



9
10
11
# File 'lib/racecar/partition_processor.rb', line 9

# Reader for the +consumer+ object that was passed to the constructor.
# Other methods in this class use it to store offsets and resume partitions.
#
# @return [Object] the value held in +@consumer+
def consumer
  @consumer
end

#consumer_class_instance ⇒ Object (readonly)

Returns the value of attribute consumer_class_instance.



9
10
11
# File 'lib/racecar/partition_processor.rb', line 9

# Reader for the consumer class instance that was passed to the constructor.
# It is the object on which +process+, +process_batch+, +deliver!+ and
# +teardown+ are invoked by this class.
#
# @return [Object] the value held in +@consumer_class_instance+
def consumer_class_instance
  @consumer_class_instance
end

#instrumenter ⇒ Object (readonly)

Returns the value of attribute instrumenter.



9
10
11
# File 'lib/racecar/partition_processor.rb', line 9

# Reader for the +instrumenter+ object that was passed to the constructor.
#
# @return [Object] the value held in +@instrumenter+
def instrumenter
  @instrumenter
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



9
10
11
# File 'lib/racecar/partition_processor.rb', line 9

# Reader for the +logger+ object that was passed to the constructor.
#
# @return [Object] the value held in +@logger+
def logger
  @logger
end

#partition ⇒ Object (readonly)

Returns the value of attribute partition.



9
10
11
# File 'lib/racecar/partition_processor.rb', line 9

# Reader for the +partition+ value that was passed to the constructor.
#
# @return [Object] the value held in +@partition+
def partition
  @partition
end

#pause ⇒ Object (readonly)

Returns the value of attribute pause.



9
10
11
# File 'lib/racecar/partition_processor.rb', line 9

# Reader for the +pause+ object that was passed to the constructor.
# #resume_paused_partition queries it with +paused?+, +expired?+ and
# +pause_duration+, and calls +resume!+ on it.
#
# @return [Object] the value held in +@pause+
def pause
  @pause
end

#rebalancing ⇒ Object

Returns the value of attribute rebalancing.



10
11
12
# File 'lib/racecar/partition_processor.rb', line 10

# Reader for the rebalancing flag. #rebalance! sets +@rebalancing+ to +true+;
# the constructor does not assign it, so it reads as +nil+ until then.
# NOTE(review): the page does not mark this attribute readonly, so a writer
# presumably exists as well — not visible here.
#
# @return [Object, nil] the value held in +@rebalancing+
def rebalancing
  @rebalancing
end

#shutting_down ⇒ Object

Returns the value of attribute shutting_down.



10
11
12
# File 'lib/racecar/partition_processor.rb', line 10

# Reader for the shutdown flag. #shut_down! sets +@shutting_down+ to +true+;
# the constructor does not assign it, so it reads as +nil+ until then.
# NOTE(review): the page does not mark this attribute readonly, so a writer
# presumably exists as well — not visible here.
#
# @return [Object, nil] the value held in +@shutting_down+
def shutting_down
  @shutting_down
end

#topic ⇒ Object (readonly)

Returns the value of attribute topic.



9
10
11
# File 'lib/racecar/partition_processor.rb', line 9

# Reader for the +topic+ value that was passed to the constructor.
#
# @return [Object] the value held in +@topic+
def topic
  @topic
end

Instance Method Details

#process(message) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/racecar/partition_processor.rb', line 36

# Processes a single message through the consumer class instance.
#
# Emits a "start_process_message" event, then runs the work inside
# +with_error_handling+ and a "process_message" instrumentation block:
# the message is wrapped in a Racecar::Message, handed to the consumer class
# instance, pending deliveries are flushed with +deliver!+, and the offset is
# stored unless a rebalance is flagged.
#
# @param message [Object] raw message; must respond to +topic+, +partition+,
#   +offset+, +timestamp+, +key+, +payload+ and +headers+
# @return [Object] whatever +with_error_handling+ returns
def process(message)
  event = {
    consumer_class: consumer_class_instance.class.to_s,
    topic:          message.topic,
    partition:      message.partition,
    offset:         message.offset,
    create_time:    message.timestamp,
    key:            message.key,
    value:          message.payload,
    headers:        message.headers,
  }
  @instrumenter.instrument("start_process_message", event)

  with_error_handling(message, event) do |pause_state|
    @instrumenter.instrument("process_message", event) do
      # The producer is only inspected when multithreaded processing is on.
      if @config.multithreaded_processing_enabled
        producer = consumer_class_instance.instance_variable_get(:@producer)
        reconfigure_consumer_class_instance! if producer&.closed?
      end

      wrapped = Racecar::Message.new(message, retries_count: pause_state.pauses_count)
      consumer_class_instance.process(wrapped)
      consumer_class_instance.deliver!

      unless rebalancing
        consumer.store_offset(message, @rdkafka_consumer)
      end
    end
  end
end

#process_batch(messages) ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/racecar/partition_processor.rb', line 61

# Processes a batch of messages through the consumer class instance.
#
# Emits a "start_process_batch" event, then runs the work inside
# +with_error_handling+ and a "process_batch" instrumentation block:
# each message is wrapped in a Racecar::Message, the batch is handed to the
# consumer class instance, pending deliveries are flushed with +deliver!+, and
# the offset of the last message is stored unless a rebalance is flagged.
#
# @param messages [Array] raw messages; elements must respond to +topic+,
#   +partition+, +offset+ and +timestamp+
# @return [Object, nil] whatever +with_error_handling+ returns, or +nil+ when
#   +messages+ is empty
def process_batch(messages)
  # An empty batch has nothing to instrument, process or commit. Without this
  # guard +messages.first+ is nil and building the payload raises NoMethodError.
  return if messages.empty?

  first, last = messages.first, messages.last
  payload = {
    consumer_class:   consumer_class_instance.class.to_s,
    topic:            first.topic,
    partition:        first.partition,
    first_offset:     first.offset,
    last_offset:      last.offset,
    last_create_time: last.timestamp,
    message_count:    messages.size,
  }
  @instrumenter.instrument("start_process_batch", payload)

  with_error_handling(messages, payload) do |pause|
    @instrumenter.instrument("process_batch", payload) do
      racecar_messages = messages.map do |message|
        Racecar::Message.new(message, retries_count: pause.pauses_count)
      end
      # The producer is only inspected when multithreaded processing is on.
      if @config.multithreaded_processing_enabled && consumer_class_instance.instance_variable_get(:@producer)&.closed?
        reconfigure_consumer_class_instance!
      end
      consumer_class_instance.process_batch(racecar_messages)
      consumer_class_instance.deliver!
      consumer.store_offset(last, @rdkafka_consumer) unless rebalancing
    end
  end
end

#rebalance! ⇒ Object



112
113
114
115
# File 'lib/racecar/partition_processor.rb', line 112

# Flags this processor as rebalancing and signals +@sleep_cv+ while holding
# +@sleep_mutex+.
#
# @return [Object] the result of the +synchronize+ block
def rebalance!
  @rebalancing = true

  @sleep_mutex.synchronize do
    @sleep_cv.signal
  end
end

#rebalancing_or_shutting_down? ⇒ Boolean

Returns:

  • (Boolean)


123
124
125
# File 'lib/racecar/partition_processor.rb', line 123

# Tells whether either the rebalancing flag or the shutdown flag is set.
#
# @return [Object] the +rebalancing+ value when it is truthy, otherwise the
#   +shutting_down+ value (which may be +nil+ or +false+)
def rebalancing_or_shutting_down?
  rebalance_flag = rebalancing
  return rebalance_flag if rebalance_flag

  shutting_down
end

#resume_paused_partition ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/racecar/partition_processor.rb', line 95

# Reports the pause status of this partition and resumes it once the pause
# has expired.
#
# Does nothing when +config.pause_timeout+ equals 0 or the partition is not
# paused. Otherwise a "pause_status" event is emitted, and if the pause is
# still active and expired, the resume is logged, the consumer is asked to
# resume the partition, and the pause object is reset with +resume!+.
#
# @return [Object, nil] the result of +pause.resume!+ when the partition was
#   resumed, otherwise +nil+
def resume_paused_partition
  return if config.pause_timeout == 0
  return unless pause.paused?

  status = {
    topic:          topic,
    partition:      partition,
    duration:       pause.pause_duration,
    consumer_class: consumer_class_instance.class.to_s,
  }
  @instrumenter.instrument("pause_status", status)

  # The paused state is re-checked after instrumenting, as in the original flow.
  return unless pause.paused? && pause.expired?

  logger.info "Automatically resuming partition #{topic}/#{partition}, pause timeout expired"
  consumer.resume(topic, partition)
  pause.resume!
end

#shut_down! ⇒ Object



117
118
119
120
121
# File 'lib/racecar/partition_processor.rb', line 117

# Flags this processor as shutting down, signals +@sleep_cv+ while holding
# +@sleep_mutex+, and then gives a paused partition the chance to resume.
#
# @return [Object, nil] the result of #resume_paused_partition
def shut_down!
  @shutting_down = true

  @sleep_mutex.synchronize do
    @sleep_cv.signal
  end

  resume_paused_partition
end

#teardown ⇒ Object



89
90
91
92
93
# File 'lib/racecar/partition_processor.rb', line 89

# Flushes pending deliveries (skipped while rebalancing) and always tears the
# consumer class instance down, even when the delivery raises.
#
# @return [Object, nil] the result of +deliver!+, or +nil+ while rebalancing
def teardown
  unless rebalancing
    consumer_class_instance.deliver!
  end
ensure
  # Runs on both the normal and the error path.
  consumer_class_instance.teardown
end