Class: Racecar::AsyncPartitionProcessor

Inherits:
Object
  • Object
show all
Defined in:
lib/racecar/async_partition_processor.rb

Constant Summary collapse

THREAD_KEY_IDENTIFIER =
'racecar_topic_partition_identifier'.freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(topic:, partition:, logger:, config:, consumer:, consumer_class:, instrumenter:, rdkafka_consumer:) ⇒ AsyncPartitionProcessor

Returns a new instance of AsyncPartitionProcessor.



16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/racecar/async_partition_processor.rb', line 16

def initialize(topic:, partition:, logger:, config:, consumer:, consumer_class:, instrumenter:, rdkafka_consumer:)
  @topic      = topic
  @partition  = partition
  @logger     = logger
  @config     = config
  @consumer   = consumer
  @consumer_class = consumer_class
  @instrumenter = instrumenter
  @rdkafka_consumer = rdkafka_consumer
  @backpressure_paused = Concurrent::AtomicBoolean.new
  @tpl = build_tpl(topic, partition)
  setup_async_processing
end

Instance Attribute Details

#threadObject (readonly)

Returns the value of attribute thread.



8
9
10
# File 'lib/racecar/async_partition_processor.rb', line 8

def thread
  @thread
end

Class Method Details

.thread_key(topic, partition) ⇒ Object



12
13
14
# File 'lib/racecar/async_partition_processor.rb', line 12

def self.thread_key(topic, partition)
  "#{topic}/#{partition}"
end

Instance Method Details

#process(message) ⇒ Object



30
31
32
# File 'lib/racecar/async_partition_processor.rb', line 30

def process(message)
  push(message)
end

#process_batch(messages) ⇒ Object



34
35
36
# File 'lib/racecar/async_partition_processor.rb', line 34

def process_batch(messages)
  push(messages)
end

#rebalance!Object



38
39
40
41
# File 'lib/racecar/async_partition_processor.rb', line 38

def rebalance!
  processor.rebalance!
  @queue << nil
end

#rebalancing_or_shutting_down?Boolean

Returns:

  • (Boolean)


48
49
50
# File 'lib/racecar/async_partition_processor.rb', line 48

def rebalancing_or_shutting_down?
  processor.rebalancing_or_shutting_down?
end

#resume_paused_partitionObject



52
53
54
# File 'lib/racecar/async_partition_processor.rb', line 52

def resume_paused_partition
  processor.resume_paused_partition
end

#shut_down!Object



43
44
45
46
# File 'lib/racecar/async_partition_processor.rb', line 43

def shut_down!
  processor.shut_down!
  @queue << nil
end