Class: Racecar::AsyncPartitionProcessor

Inherits:
Object
  • Object
show all
Defined in:
lib/racecar/async_partition_processor.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(topic:, partition:, logger:, config:, consumer:, consumer_class:, instrumenter:, rdkafka_consumer:) ⇒ AsyncPartitionProcessor

Returns a new instance of AsyncPartitionProcessor.



18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/racecar/async_partition_processor.rb', line 18

def initialize(topic:, partition:, logger:, config:, consumer:, consumer_class:, instrumenter:, rdkafka_consumer:)
  @topic      = topic
  @partition  = partition
  @logger     = logger
  @config     = config
  @consumer   = consumer
  @consumer_class = consumer_class
  @instrumenter = instrumenter
  @rdkafka_consumer = rdkafka_consumer
  @backpressure_paused = Concurrent::AtomicBoolean.new
  @tpl = build_tpl(topic, partition)
  setup_async_processing
end

Instance Attribute Details

#threadObject (readonly)

Returns the value of attribute thread.



8
9
10
# File 'lib/racecar/async_partition_processor.rb', line 8

def thread
  @thread
end

Class Method Details

.thread_key(topic, partition) ⇒ Object



10
11
12
# File 'lib/racecar/async_partition_processor.rb', line 10

def self.thread_key(topic, partition)
  "#{topic}/#{partition}"
end

Instance Method Details

#process(message) ⇒ Object



32
33
34
# File 'lib/racecar/async_partition_processor.rb', line 32

def process(message)
  push(message)
end

#process_batch(messages) ⇒ Object



36
37
38
# File 'lib/racecar/async_partition_processor.rb', line 36

def process_batch(messages)
  push(messages)
end

#rebalance!Object



40
41
42
43
# File 'lib/racecar/async_partition_processor.rb', line 40

def rebalance!
  processor.rebalance!
  @queue << nil
end

#rebalancing_or_shutting_down?Boolean

Returns:

  • (Boolean)


50
51
52
# File 'lib/racecar/async_partition_processor.rb', line 50

def rebalancing_or_shutting_down?
  processor.rebalancing_or_shutting_down?
end

#resume_paused_partitionObject



54
55
56
# File 'lib/racecar/async_partition_processor.rb', line 54

def resume_paused_partition
  processor.resume_paused_partition
end

#shut_down!Object



45
46
47
48
# File 'lib/racecar/async_partition_processor.rb', line 45

def shut_down!
  processor.shut_down!
  @queue << nil
end

#thread_keyObject



14
15
16
# File 'lib/racecar/async_partition_processor.rb', line 14

def thread_key
  self.class.thread_key(@topic, @partition)
end