Class: Racecar::PartitionProcessor
- Inherits:
-
Object
- Object
- Racecar::PartitionProcessor
- Defined in:
- lib/racecar/partition_processor.rb
Instance Attribute Summary
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#consumer ⇒ Object
readonly
Returns the value of attribute consumer.
-
#consumer_class_instance ⇒ Object
readonly
Returns the value of attribute consumer_class_instance.
-
#instrumenter ⇒ Object
readonly
Returns the value of attribute instrumenter.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#partition ⇒ Object
readonly
Returns the value of attribute partition.
-
#pause ⇒ Object
readonly
Returns the value of attribute pause.
-
#rebalancing ⇒ Object
Returns the value of attribute rebalancing.
-
#shutting_down ⇒ Object
Returns the value of attribute shutting_down.
-
#topic ⇒ Object
readonly
Returns the value of attribute topic.
Instance Method Summary
-
#initialize(config:, logger:, instrumenter:, consumer_class_instance:, consumer:, topic:, partition:, pause:, rdkafka_consumer: nil) ⇒ PartitionProcessor
constructor
A new instance of PartitionProcessor.
- #process(message) ⇒ Object
- #process_batch(messages) ⇒ Object
- #rebalance! ⇒ Object
- #rebalancing_or_shutting_down? ⇒ Boolean
- #resume_paused_partition ⇒ Object
- #shut_down! ⇒ Object
- #teardown ⇒ Object
Constructor Details
#initialize(config:, logger:, instrumenter:, consumer_class_instance:, consumer:, topic:, partition:, pause:, rdkafka_consumer: nil) ⇒ PartitionProcessor
Returns a new instance of PartitionProcessor.
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/racecar/partition_processor.rb', line 12
#
# Builds a processor for one topic/partition pair.
#
# Every collaborator is kept in an instance variable. A mutex and a condition
# variable are created as well; #rebalance! and #shut_down! signal that
# condition variable. When +config.multithreaded_processing_enabled+ is
# truthy, the consumer class instance is configured with the consumer's
# producer, the consumer, the instrumenter and the config.
#
# @param config [Object] must respond to #multithreaded_processing_enabled
# @param logger [Object] stored as @logger
# @param instrumenter [Object] stored as @instrumenter
# @param consumer_class_instance [Object] must respond to #configure when
#   multithreaded processing is enabled
# @param consumer [Object] must respond to #producer when multithreaded
#   processing is enabled
# @param topic [Object] stored as @topic
# @param partition [Object] stored as @partition
# @param pause [Object] stored as @pause
# @param rdkafka_consumer [Object, nil] stored as @rdkafka_consumer
def initialize(
  config:,
  logger:,
  instrumenter:,
  consumer_class_instance:,
  consumer:,
  topic:,
  partition:,
  pause:,
  rdkafka_consumer: nil
)
  # Synchronisation primitives used to wake a sleeping processor.
  @sleep_mutex = Mutex.new
  @sleep_cv = ConditionVariable.new

  # Where this processor reads from.
  @topic = topic
  @partition = partition
  @pause = pause

  # Collaborators.
  @config = config
  @logger = logger
  @instrumenter = instrumenter
  @consumer = consumer
  @rdkafka_consumer = rdkafka_consumer
  @consumer_class_instance = consumer_class_instance

  return unless config.multithreaded_processing_enabled

  consumer_class_instance.configure(
    producer: consumer.producer,
    consumer: @consumer,
    instrumenter: @instrumenter,
    config: @config,
  )
end
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
9 10 11 |
# File 'lib/racecar/partition_processor.rb', line 9
#
# Reader for @config, which the constructor assigns from its +config:+ keyword.
#
# @return [Object] the current value of @config
def config
  @config
end
#consumer ⇒ Object (readonly)
Returns the value of attribute consumer.
9 10 11 |
# File 'lib/racecar/partition_processor.rb', line 9
#
# Reader for @consumer, which the constructor assigns from its +consumer:+
# keyword.
#
# @return [Object] the current value of @consumer
def consumer
  @consumer
end
#consumer_class_instance ⇒ Object (readonly)
Returns the value of attribute consumer_class_instance.
9 10 11 |
# File 'lib/racecar/partition_processor.rb', line 9
#
# Reader for @consumer_class_instance, which the constructor assigns from its
# +consumer_class_instance:+ keyword.
#
# @return [Object] the current value of @consumer_class_instance
def consumer_class_instance
  @consumer_class_instance
end
#instrumenter ⇒ Object (readonly)
Returns the value of attribute instrumenter.
9 10 11 |
# File 'lib/racecar/partition_processor.rb', line 9
#
# Reader for @instrumenter, which the constructor assigns from its
# +instrumenter:+ keyword.
#
# @return [Object] the current value of @instrumenter
def instrumenter
  @instrumenter
end
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
9 10 11 |
# File 'lib/racecar/partition_processor.rb', line 9
#
# Reader for @logger, which the constructor assigns from its +logger:+ keyword.
#
# @return [Object] the current value of @logger
def logger
  @logger
end
#partition ⇒ Object (readonly)
Returns the value of attribute partition.
9 10 11 |
# File 'lib/racecar/partition_processor.rb', line 9
#
# Reader for @partition, which the constructor assigns from its +partition:+
# keyword.
#
# @return [Object] the current value of @partition
def partition
  @partition
end
#pause ⇒ Object (readonly)
Returns the value of attribute pause.
9 10 11 |
# File 'lib/racecar/partition_processor.rb', line 9
#
# Reader for @pause, which the constructor assigns from its +pause:+ keyword.
#
# @return [Object] the current value of @pause
def pause
  @pause
end
#rebalancing ⇒ Object
Returns the value of attribute rebalancing.
10 11 12 |
# File 'lib/racecar/partition_processor.rb', line 10
#
# Reader for @rebalancing. The constructor does not assign this variable, so
# it reads as nil until something sets it; #rebalance! sets it to true.
#
# @return [Object, nil] the current value of @rebalancing
def rebalancing
  @rebalancing
end
#shutting_down ⇒ Object
Returns the value of attribute shutting_down.
10 11 12 |
# File 'lib/racecar/partition_processor.rb', line 10
#
# Reader for @shutting_down. The constructor does not assign this variable, so
# it reads as nil until something sets it; #shut_down! sets it to true.
#
# @return [Object, nil] the current value of @shutting_down
def shutting_down
  @shutting_down
end
#topic ⇒ Object (readonly)
Returns the value of attribute topic.
9 10 11 |
# File 'lib/racecar/partition_processor.rb', line 9
#
# Reader for @topic, which the constructor assigns from its +topic:+ keyword.
#
# @return [Object] the current value of @topic
def topic
  @topic
end
Instance Method Details
#process(message) ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/racecar/partition_processor.rb', line 36
#
# Processes a single message through the consumer class instance.
#
# Emits a "start_process_message" instrumentation event, then, inside
# with_error_handling and a "process_message" instrumentation block:
# reconfigures the consumer class instance if multithreaded processing is
# enabled and its @producer reports closed, hands the wrapped message to the
# consumer class instance, calls deliver! on it, and stores the offset unless
# a rebalance is in progress.
#
# @param message [Object] must respond to #topic, #partition, #offset, #key,
#   #payload, #headers and the create-time accessor noted below
# @return [Object] whatever with_error_handling returns
def process(message)
  payload = {
    consumer_class: consumer_class_instance.class.to_s,
    topic: message.topic,
    partition: message.partition,
    offset: message.offset,
    # NOTE(review): the accessor name was lost from this listing; `timestamp`
    # is the create-time accessor on Rdkafka::Consumer::Message - confirm
    # against the real file.
    create_time: message.timestamp,
    key: message.key,
    value: message.payload,
    headers: message.headers,
  }

  @instrumenter.instrument("start_process_message", payload)

  # NOTE(review): the first argument was stripped from this listing; it is
  # reconstructed as the message itself - confirm against with_error_handling.
  with_error_handling(message, payload) do |pause|
    @instrumenter.instrument("process_message", payload) do
      if @config.multithreaded_processing_enabled && consumer_class_instance.instance_variable_get(:@producer)&.closed?
        reconfigure_consumer_class_instance!
      end

      consumer_class_instance.process(Racecar::Message.new(message, retries_count: pause.pauses_count))
      consumer_class_instance.deliver!
      # Offsets are not stored once a rebalance has been flagged.
      consumer.store_offset(message, @rdkafka_consumer) unless rebalancing
    end
  end
end
#process_batch(messages) ⇒ Object
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/racecar/partition_processor.rb', line 61
#
# Processes a batch of messages through the consumer class instance.
#
# Emits a "start_process_batch" instrumentation event, then, inside
# with_error_handling and a "process_batch" instrumentation block: wraps each
# message in a Racecar::Message, reconfigures the consumer class instance if
# multithreaded processing is enabled and its @producer reports closed, hands
# the wrapped batch to the consumer class instance, calls deliver! on it, and
# stores the offset of the last message unless a rebalance is in progress.
#
# @param messages [#first, #last, #size, #map] the batch; the first and last
#   elements must respond to #topic/#partition/#offset. An empty batch would
#   make +first+ nil and raise NoMethodError on +first.topic+.
# @return [Object] whatever with_error_handling returns
def process_batch(messages)
  first, last = messages.first, messages.last
  payload = {
    consumer_class: consumer_class_instance.class.to_s,
    topic: first.topic,
    partition: first.partition,
    first_offset: first.offset,
    last_offset: last.offset,
    # NOTE(review): the accessor name was lost from this listing; `timestamp`
    # is the create-time accessor on Rdkafka::Consumer::Message - confirm
    # against the real file.
    last_create_time: last.timestamp,
    message_count: messages.size,
  }

  @instrumenter.instrument("start_process_batch", payload)

  # NOTE(review): the first argument was stripped from this listing; it is
  # reconstructed as the batch itself - confirm against with_error_handling.
  with_error_handling(messages, payload) do |pause|
    @instrumenter.instrument("process_batch", payload) do
      racecar_messages = messages.map do |message|
        Racecar::Message.new(message, retries_count: pause.pauses_count)
      end

      if @config.multithreaded_processing_enabled && consumer_class_instance.instance_variable_get(:@producer)&.closed?
        reconfigure_consumer_class_instance!
      end

      consumer_class_instance.process_batch(racecar_messages)
      consumer_class_instance.deliver!
      # Offsets are not stored once a rebalance has been flagged.
      consumer.store_offset(messages.last, @rdkafka_consumer) unless rebalancing
    end
  end
end
#rebalance! ⇒ Object
112 113 114 115 |
# File 'lib/racecar/partition_processor.rb', line 112
#
# Flags this processor as rebalancing and signals the sleep condition
# variable while holding the sleep mutex.
#
# @return [Object] the result of the synchronized block (the value of
#   @sleep_cv.signal)
def rebalance!
  @rebalancing = true

  @sleep_mutex.synchronize do
    @sleep_cv.signal
  end
end
#rebalancing_or_shutting_down? ⇒ Boolean
123 124 125 |
# File 'lib/racecar/partition_processor.rb', line 123
#
# Tells whether either the rebalancing flag or the shutting-down flag is set.
#
# @return [Object, nil] the value of +rebalancing+ when it is truthy,
#   otherwise the value of +shutting_down+ (nil/false when neither is set)
def rebalancing_or_shutting_down?
  rebalancing_flag = rebalancing
  return rebalancing_flag if rebalancing_flag

  shutting_down
end
#resume_paused_partition ⇒ Object
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/racecar/partition_processor.rb', line 95
#
# Resumes the partition when its pause has expired.
#
# Does nothing when +config.pause_timeout+ equals 0 or the partition is not
# paused. Otherwise a "pause_status" instrumentation event is emitted, and if
# the pause is still active and has expired, the resume is logged, the
# consumer resumes the topic/partition, and the pause is reset.
#
# @return [Object, nil] the result of +pause.resume!+ when the partition was
#   resumed, nil otherwise
def resume_paused_partition
  return if config.pause_timeout == 0
  return unless pause.paused?

  status = {
    topic: topic,
    partition: partition,
    duration: pause.pause_duration,
    consumer_class: consumer_class_instance.class.to_s,
  }
  @instrumenter.instrument("pause_status", status)

  # paused? is asked again on purpose: this mirrors the original check order.
  return unless pause.paused? && pause.expired?

  logger.info "Automatically resuming partition #{topic}/#{partition}, pause timeout expired"
  consumer.resume(topic, partition)
  pause.resume!
end
#shut_down! ⇒ Object
117 118 119 120 121 |
# File 'lib/racecar/partition_processor.rb', line 117
#
# Flags this processor as shutting down, signals the sleep condition variable
# while holding the sleep mutex, then calls #resume_paused_partition.
#
# @return [Object, nil] whatever #resume_paused_partition returns
def shut_down!
  @shutting_down = true

  @sleep_mutex.synchronize do
    @sleep_cv.signal
  end

  resume_paused_partition
end
#teardown ⇒ Object
89 90 91 92 93 |
# File 'lib/racecar/partition_processor.rb', line 89
#
# Tears down the consumer class instance.
#
# Calls deliver! on the consumer class instance first unless a rebalance has
# been flagged. The instance's teardown runs in an ensure clause, so it
# happens even if deliver! raises.
#
# @return [Object, nil] the result of deliver!, or nil when rebalancing
def teardown
  begin
    unless rebalancing
      consumer_class_instance.deliver!
    end
  ensure
    consumer_class_instance.teardown
  end
end