Class: Rdkafka::Callbacks::BackgroundEventCallback

Inherits:
Object
  • Object
show all
Defined in:
lib/rdkafka/callbacks.rb

Class Method Summary collapse

Class Method Details

.call(_client_ptr, event_ptr, _opaque_ptr) ⇒ Object

Handles background events from librdkafka

Parameters:

  • _client_ptr (FFI::Pointer)

    unused client pointer

  • event_ptr (FFI::Pointer)

    pointer to the event

  • _opaque_ptr (FFI::Pointer)

    unused opaque pointer



210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
# File 'lib/rdkafka/callbacks.rb', line 210

# Dispatches a librdkafka background event to the processor matching its event type.
#
# Event types without a matching branch are ignored and the method returns nil.
#
# @param _client_ptr [FFI::Pointer] unused client pointer
# @param event_ptr [FFI::Pointer] pointer to the event
# @param _opaque_ptr [FFI::Pointer] unused opaque pointer
def self.call(_client_ptr, event_ptr, _opaque_ptr)
  bindings = Rdkafka::Bindings
  event_type = bindings.rd_kafka_event_type(event_ptr)

  case event_type
  when bindings::RD_KAFKA_EVENT_CREATETOPICS_RESULT then process_create_topic(event_ptr)
  when bindings::RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT then process_describe_configs(event_ptr)
  when bindings::RD_KAFKA_EVENT_INCREMENTALALTERCONFIGS_RESULT then process_incremental_alter_configs(event_ptr)
  when bindings::RD_KAFKA_EVENT_DELETETOPICS_RESULT then process_delete_topic(event_ptr)
  # NOTE(review): this constant is named ADMIN_OP rather than EVENT, unlike its siblings - confirm in Bindings.
  when bindings::RD_KAFKA_ADMIN_OP_CREATEPARTITIONS_RESULT then process_create_partitions(event_ptr)
  when bindings::RD_KAFKA_EVENT_CREATEACLS_RESULT then process_create_acl(event_ptr)
  when bindings::RD_KAFKA_EVENT_DELETEACLS_RESULT then process_delete_acl(event_ptr)
  when bindings::RD_KAFKA_EVENT_DESCRIBEACLS_RESULT then process_describe_acl(event_ptr)
  when bindings::RD_KAFKA_EVENT_DELETEGROUPS_RESULT then process_delete_groups(event_ptr)
  when bindings::RD_KAFKA_EVENT_LISTOFFSETS_RESULT then process_list_offsets(event_ptr)
  end
end

.process_create_acl(event_ptr) ⇒ Object

Processes create ACL result event

Parameters:

  • event_ptr (FFI::Pointer)

    pointer to the event



359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
# File 'lib/rdkafka/callbacks.rb', line 359

# Processes a create ACL result event and stores its outcome on the registered handle.
#
# @param event_ptr [FFI::Pointer] pointer to the event
def self.process_create_acl(event_ptr)
  create_acls_result = Rdkafka::Bindings.rd_kafka_event_CreateAcls_result(event_ptr)

  # librdkafka writes the number of results through a `size_t *` out-parameter, so the
  # buffer has to be size_t-wide; an :int32 buffer is too small on 64-bit platforms.
  result_count_ptr = FFI::MemoryPointer.new(:size_t)
  create_acl_result_array = Rdkafka::Bindings.rd_kafka_CreateAcls_result_acls(create_acls_result, result_count_ptr)
  create_acl_results = CreateAclResult.create_acl_results_from_array(result_count_ptr.read(:size_t), create_acl_result_array)
  create_acl_handle_ptr = Rdkafka::Bindings.rd_kafka_event_opaque(event_ptr)

  # Nothing to do when no handle is registered under this address.
  create_acl_handle = Rdkafka::Admin::CreateAclHandle.remove(create_acl_handle_ptr.address)
  return unless create_acl_handle

  # Only the first result is reported on the handle.
  first_result = create_acl_results[0]
  create_acl_handle[:response] = first_result.result_error
  create_acl_handle[:response_string] = first_result.error_string

  create_acl_handle.unlock
end

.process_create_partitions(event_ptr) ⇒ Object

Processes create partitions result event

Parameters:

  • event_ptr (FFI::Pointer)

    pointer to the event



339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
# File 'lib/rdkafka/callbacks.rb', line 339

# Processes a create partitions result event and stores its outcome on the registered handle.
#
# @param event_ptr [FFI::Pointer] pointer to the event
def self.process_create_partitions(event_ptr)
  create_partitions_result = Rdkafka::Bindings.rd_kafka_event_CreatePartitions_result(event_ptr)

  # librdkafka writes the number of create partitions results through a `size_t *`
  # out-parameter, so the buffer has to be size_t-wide; an :int32 buffer is too small
  # on 64-bit platforms.
  result_count_ptr = FFI::MemoryPointer.new(:size_t)
  create_partitions_result_array = Rdkafka::Bindings.rd_kafka_CreatePartitions_result_topics(create_partitions_result, result_count_ptr)
  create_partitions_results = TopicResult.create_topic_results_from_array(result_count_ptr.read(:size_t), create_partitions_result_array)
  create_partitions_handle_ptr = Rdkafka::Bindings.rd_kafka_event_opaque(event_ptr)

  # Nothing to do when no handle is registered under this address.
  create_partitions_handle = Rdkafka::Admin::CreatePartitionsHandle.remove(create_partitions_handle_ptr.address)
  return unless create_partitions_handle

  # Only the first result is reported on the handle.
  first_result = create_partitions_results[0]
  create_partitions_handle[:response] = first_result.result_error
  create_partitions_handle[:error_string] = first_result.error_string
  create_partitions_handle[:result_name] = first_result.result_name

  create_partitions_handle.unlock
end

.process_create_topic(event_ptr) ⇒ Object

Processes create topic result event

Parameters:

  • event_ptr (FFI::Pointer)

    pointer to the event



239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
# File 'lib/rdkafka/callbacks.rb', line 239

# Processes a create topic result event and stores its outcome on the registered handle.
#
# @param event_ptr [FFI::Pointer] pointer to the event
def self.process_create_topic(event_ptr)
  create_topics_result = Rdkafka::Bindings.rd_kafka_event_CreateTopics_result(event_ptr)

  # librdkafka writes the number of create topic results through a `size_t *`
  # out-parameter, so the buffer has to be size_t-wide; an :int32 buffer is too small
  # on 64-bit platforms.
  result_count_ptr = FFI::MemoryPointer.new(:size_t)
  create_topic_result_array = Rdkafka::Bindings.rd_kafka_CreateTopics_result_topics(create_topics_result, result_count_ptr)
  create_topic_results = TopicResult.create_topic_results_from_array(result_count_ptr.read(:size_t), create_topic_result_array)
  create_topic_handle_ptr = Rdkafka::Bindings.rd_kafka_event_opaque(event_ptr)

  # Nothing to do when no handle is registered under this address.
  create_topic_handle = Rdkafka::Admin::CreateTopicHandle.remove(create_topic_handle_ptr.address)
  return unless create_topic_handle

  # Only the first result is reported on the handle.
  first_result = create_topic_results[0]
  create_topic_handle[:response] = first_result.result_error
  create_topic_handle[:error_string] = first_result.error_string
  create_topic_handle[:result_name] = first_result.result_name

  create_topic_handle.unlock
end

.process_delete_acl(event_ptr) ⇒ Object

Processes delete ACL result event

Parameters:

  • event_ptr (FFI::Pointer)

    pointer to the event



378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
# File 'lib/rdkafka/callbacks.rb', line 378

# Processes a delete ACL result event and stores its outcome on the registered handle.
#
# The matching ACLs and their count are only copied to the handle when the first
# result reports RD_KAFKA_RESP_ERR_NO_ERROR.
#
# @param event_ptr [FFI::Pointer] pointer to the event
def self.process_delete_acl(event_ptr)
  delete_acls_result = Rdkafka::Bindings.rd_kafka_event_DeleteAcls_result(event_ptr)

  # librdkafka writes the number of responses through a `size_t *` out-parameter, so the
  # buffer has to be size_t-wide; an :int32 buffer is too small on 64-bit platforms.
  result_count_ptr = FFI::MemoryPointer.new(:size_t)
  delete_acl_result_responses = Rdkafka::Bindings.rd_kafka_DeleteAcls_result_responses(delete_acls_result, result_count_ptr)
  delete_acl_results = DeleteAclResult.delete_acl_results_from_array(result_count_ptr.read(:size_t), delete_acl_result_responses)
  delete_acl_handle_ptr = Rdkafka::Bindings.rd_kafka_event_opaque(event_ptr)

  # Nothing to do when no handle is registered under this address.
  delete_acl_handle = Rdkafka::Admin::DeleteAclHandle.remove(delete_acl_handle_ptr.address)
  return unless delete_acl_handle

  # Only the first result is reported on the handle.
  first_result = delete_acl_results[0]
  delete_acl_handle[:response] = first_result.result_error
  delete_acl_handle[:response_string] = first_result.error_string

  if first_result.result_error == Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR
    delete_acl_handle[:matching_acls] = first_result.matching_acls
    delete_acl_handle[:matching_acls_count] = first_result.matching_acls_count
  end

  delete_acl_handle.unlock
end

.process_delete_groups(event_ptr) ⇒ Object

Processes delete groups result event

Parameters:

  • event_ptr (FFI::Pointer)

    pointer to the event



299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
# File 'lib/rdkafka/callbacks.rb', line 299

# Processes a delete groups result event and stores its outcome on the registered handle.
#
# @param event_ptr [FFI::Pointer] pointer to the event
def self.process_delete_groups(event_ptr)
  delete_groups_result = Rdkafka::Bindings.rd_kafka_event_DeleteGroups_result(event_ptr)

  # The number of delete group results comes back through a size_t buffer. Read it with
  # the same width it was allocated with instead of `read_int`, which only looks at the
  # first four bytes.
  result_count_ptr = FFI::MemoryPointer.new(:size_t)
  delete_group_result_array = Rdkafka::Bindings.rd_kafka_DeleteGroups_result_groups(delete_groups_result, result_count_ptr)
  delete_group_results = GroupResult.create_group_results_from_array(result_count_ptr.read(:size_t), delete_group_result_array)
  delete_group_handle_ptr = Rdkafka::Bindings.rd_kafka_event_opaque(event_ptr)

  # Nothing to do when no handle is registered under this address.
  delete_group_handle = Rdkafka::Admin::DeleteGroupsHandle.remove(delete_group_handle_ptr.address)
  return unless delete_group_handle

  # Only the first result is reported on the handle.
  first_result = delete_group_results[0]
  delete_group_handle[:response] = first_result.result_error
  delete_group_handle[:error_string] = first_result.error_string
  delete_group_handle[:result_name] = first_result.result_name

  delete_group_handle.unlock
end

.process_delete_topic(event_ptr) ⇒ Object

Processes delete topic result event

Parameters:

  • event_ptr (FFI::Pointer)

    pointer to the event



319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
# File 'lib/rdkafka/callbacks.rb', line 319

# Processes a delete topic result event and stores its outcome on the registered handle.
#
# @param event_ptr [FFI::Pointer] pointer to the event
def self.process_delete_topic(event_ptr)
  delete_topics_result = Rdkafka::Bindings.rd_kafka_event_DeleteTopics_result(event_ptr)

  # librdkafka writes the number of topic results through a `size_t *` out-parameter,
  # so the buffer has to be size_t-wide; an :int32 buffer is too small on 64-bit
  # platforms.
  result_count_ptr = FFI::MemoryPointer.new(:size_t)
  delete_topic_result_array = Rdkafka::Bindings.rd_kafka_DeleteTopics_result_topics(delete_topics_result, result_count_ptr)
  delete_topic_results = TopicResult.create_topic_results_from_array(result_count_ptr.read(:size_t), delete_topic_result_array)
  delete_topic_handle_ptr = Rdkafka::Bindings.rd_kafka_event_opaque(event_ptr)

  # Nothing to do when no handle is registered under this address.
  delete_topic_handle = Rdkafka::Admin::DeleteTopicHandle.remove(delete_topic_handle_ptr.address)
  return unless delete_topic_handle

  # Only the first result is reported on the handle.
  first_result = delete_topic_results[0]
  delete_topic_handle[:response] = first_result.result_error
  delete_topic_handle[:error_string] = first_result.error_string
  delete_topic_handle[:result_name] = first_result.result_name

  delete_topic_handle.unlock
end

.process_describe_acl(event_ptr) ⇒ Object

Processes describe ACL result event

Parameters:

  • event_ptr (FFI::Pointer)

    pointer to the event



402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
# File 'lib/rdkafka/callbacks.rb', line 402

# Processes a describe ACL result event and stores its outcome on the registered handle.
#
# The error code and error string are always copied to the handle; the matching ACLs
# and their count are copied only when the result reports RD_KAFKA_RESP_ERR_NO_ERROR.
# Returns nil without touching anything when no handle is registered for the event.
#
# @param event_ptr [FFI::Pointer] pointer to the event
def self.process_describe_acl(event_ptr)
  result = DescribeAclResult.new(event_ptr)
  opaque = Rdkafka::Bindings.rd_kafka_event_opaque(event_ptr)

  handle = Rdkafka::Admin::DescribeAclHandle.remove(opaque.address)
  return unless handle

  handle[:response] = result.result_error
  handle[:response_string] = result.error_string

  if result.result_error == Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR
    handle[:acls] = result.matching_acls
    handle[:acls_count] = result.matching_acls_count
  end

  handle.unlock
end

.process_describe_configs(event_ptr) ⇒ Object

Processes describe configs result event

Parameters:

  • event_ptr (FFI::Pointer)

    pointer to the event



259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
# File 'lib/rdkafka/callbacks.rb', line 259

# Processes a describe configs result event and stores its outcome on the registered handle.
#
# The error code and error string are always copied to the handle and :pending is set
# to false; the config entries and their count are copied only when the result reports
# RD_KAFKA_RESP_ERR_NO_ERROR. Returns nil when no handle is registered for the event.
#
# @param event_ptr [FFI::Pointer] pointer to the event
def self.process_describe_configs(event_ptr)
  result = DescribeConfigsResult.new(event_ptr)
  opaque = Rdkafka::Bindings.rd_kafka_event_opaque(event_ptr)

  handle = Rdkafka::Admin::DescribeConfigsHandle.remove(opaque.address)
  return unless handle

  handle[:response] = result.result_error
  handle[:response_string] = result.error_string
  handle[:pending] = false

  if result.result_error == Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR
    handle[:config_entries] = result.results
    handle[:entry_count] = result.results_count
  end

  handle.unlock
end

.process_incremental_alter_configs(event_ptr) ⇒ Object

Processes incremental alter configs result event

Parameters:

  • event_ptr (FFI::Pointer)

    pointer to the event



279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
# File 'lib/rdkafka/callbacks.rb', line 279

# Processes an incremental alter configs result event and stores its outcome on the
# registered handle.
#
# The error code and error string are always copied to the handle and :pending is set
# to false; the config entries and their count are copied only when the result reports
# RD_KAFKA_RESP_ERR_NO_ERROR. Returns nil when no handle is registered for the event.
#
# @param event_ptr [FFI::Pointer] pointer to the event
def self.process_incremental_alter_configs(event_ptr)
  result = IncrementalAlterConfigsResult.new(event_ptr)
  opaque = Rdkafka::Bindings.rd_kafka_event_opaque(event_ptr)

  handle = Rdkafka::Admin::IncrementalAlterConfigsHandle.remove(opaque.address)
  return unless handle

  handle[:response] = result.result_error
  handle[:response_string] = result.error_string
  handle[:pending] = false

  if result.result_error == Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR
    handle[:config_entries] = result.results
    handle[:entry_count] = result.results_count
  end

  handle.unlock
end

.process_list_offsets(event_ptr) ⇒ Object

Processes list offsets result event

Parameters:

  • event_ptr (FFI::Pointer)

    pointer to the event



421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
# File 'lib/rdkafka/callbacks.rb', line 421

# Processes a list offsets result event and stores its outcome on the registered handle.
#
# The error code and error string are always copied to the handle and :pending is set
# to false; the result infos and their count are copied only when the result reports
# RD_KAFKA_RESP_ERR_NO_ERROR. Returns nil when no handle is registered for the event.
#
# @param event_ptr [FFI::Pointer] pointer to the event
def self.process_list_offsets(event_ptr)
  result = ListOffsetsResult.new(event_ptr)
  opaque = Rdkafka::Bindings.rd_kafka_event_opaque(event_ptr)

  handle = Rdkafka::Admin::ListOffsetsHandle.remove(opaque.address)
  return unless handle

  handle[:response] = result.result_error
  handle[:response_string] = result.error_string
  handle[:pending] = false

  if result.result_error == Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR
    handle[:result_infos] = result.result_infos
    handle[:result_count] = result.result_count
  end

  handle.unlock
end