Module: Karafka::Patches::Rdkafka::Bindings
- Includes:
 - Rdkafka::Bindings
 
- Defined in:
 - lib/karafka/patches/rdkafka/bindings.rb
 
Overview
Binding patches that slightly change how rdkafka operates in certain places
Constant Summary collapse
- RB =
          
Alias internally
 ::Rdkafka::Bindings
- RebalanceCallback =
          
This patch changes few things:
- 
it commits offsets (if any) upon partition revocation, so less jobs need to be reprocessed if they are assigned to a different process
 - 
reports callback errors into the errors instrumentation instead of the logger
 - 
catches only StandardError instead of Exception as we fully control the directly executed callbacks
 
 - 
 FFI::Function.new( :void, %i[pointer int pointer pointer] ) do |client_ptr, code, partitions_ptr, opaque_ptr| # Patch reference pr = ::Karafka::Patches::Rdkafka::Bindings tpl = ::Rdkafka::Consumer::TopicPartitionList.from_native_tpl(partitions_ptr).freeze opaque = ::Rdkafka::Config.opaques[opaque_ptr.to_i] if RB.rd_kafka_rebalance_protocol(client_ptr) == 'COOPERATIVE' pr.on_cooperative_rebalance(client_ptr, code, partitions_ptr, tpl, opaque) else pr.on_eager_rebalance(client_ptr, code, partitions_ptr, tpl, opaque) end end
Class Method Summary collapse
- 
  
    
      .on_cooperative_rebalance(client_ptr, code, partitions_ptr, tpl, opaque)  ⇒ Object 
    
    
  
  
  
  
  
  
  
  
  
    
Handle assignments on cooperative rebalance.
 - 
  
    
      .on_eager_rebalance(client_ptr, code, partitions_ptr, tpl, opaque)  ⇒ Object 
    
    
  
  
  
  
  
  
  
  
  
    
Handle assignments on a eager rebalance.
 
Class Method Details
.on_cooperative_rebalance(client_ptr, code, partitions_ptr, tpl, opaque) ⇒ Object
Handle assignments on cooperative rebalance
      23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39  | 
    
      # File 'lib/karafka/patches/rdkafka/bindings.rb', line 23 def on_cooperative_rebalance(client_ptr, code, partitions_ptr, tpl, opaque) case code when RB::RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS opaque&.call_on_partitions_assign(tpl) RB.rd_kafka_incremental_assign(client_ptr, partitions_ptr) opaque&.call_on_partitions_assigned(tpl) when RB::RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS opaque&.call_on_partitions_revoke(tpl) RB.rd_kafka_commit(client_ptr, nil, false) RB.rd_kafka_incremental_unassign(client_ptr, partitions_ptr) opaque&.call_on_partitions_revoked(tpl) else opaque&.call_on_partitions_assign(tpl) RB.rd_kafka_assign(client_ptr, FFI::Pointer::NULL) opaque&.call_on_partitions_assigned(tpl) end end  | 
  
.on_eager_rebalance(client_ptr, code, partitions_ptr, tpl, opaque) ⇒ Object
Handle assignments on a eager rebalance
      48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64  | 
    
      # File 'lib/karafka/patches/rdkafka/bindings.rb', line 48 def on_eager_rebalance(client_ptr, code, partitions_ptr, tpl, opaque) case code when RB::RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS opaque&.call_on_partitions_assign(tpl) RB.rd_kafka_assign(client_ptr, partitions_ptr) opaque&.call_on_partitions_assigned(tpl) when RB::RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS opaque&.call_on_partitions_revoke(tpl) RB.rd_kafka_commit(client_ptr, nil, false) RB.rd_kafka_assign(client_ptr, FFI::Pointer::NULL) opaque&.call_on_partitions_revoked(tpl) else opaque&.call_on_partitions_assign(tpl) RB.rd_kafka_assign(client_ptr, FFI::Pointer::NULL) opaque&.call_on_partitions_assigned(tpl) end end  |