Class: Rdkafka::AbstractHandle
- Inherits: FFI::Struct
  - Object
    - FFI::Struct
      - Rdkafka::AbstractHandle
- Defined in: lib/rdkafka/abstract_handle.rb
Direct Known Subclasses
Rdkafka::Admin::CreatePartitionsHandle, Rdkafka::Admin::CreateTopicHandle, Rdkafka::Admin::DeleteTopicHandle, Producer::DeliveryHandle
Defined Under Namespace
Classes: WaitTimeoutError
Constant Summary
- REGISTRY = {}
  Subclasses must define their own layout, and the layout must start with:
  layout :pending, :bool, :response, :int
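Class Method Summary
-
.register(handle) ⇒ Object
Stores the handle in REGISTRY, keyed by its pointer address.
-
.remove(address) ⇒ Object
Deletes the handle stored under the given address from REGISTRY.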
Instance Method Summary
-
#create_result ⇒ Object
Operation-specific result.
-
#operation_name ⇒ String
The name of the operation (e.g. "delivery").
-
#pending? ⇒ Boolean
Whether the handle is still pending.
-
#raise_error ⇒ Object
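Validates self[:response] with RdkafkaError.validate!, which is expected to raise for a non-zero response. Subclasses may override this.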
-
#wait(max_wait_timeout: 60, wait_timeout: 0.1, raise_response_error: true) ⇒ Object
Wait for the operation to complete or raise an error if this takes longer than the timeout.
Class Method Details
.register(handle) ⇒ Object
# File 'lib/rdkafka/abstract_handle.rb', line 18
def self.register(handle)
  address = handle.to_ptr.address
  REGISTRY[address] = handle
end
.remove(address) ⇒ Object
# File 'lib/rdkafka/abstract_handle.rb', line 23
def self.remove(address)
  REGISTRY.delete(address)
end
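The register/remove pair above can be sketched in plain Ruby, without FFI or rdkafka. This is only an illustration: RegistrySketchHandle and its integer address are hypothetical stand-ins, whereas the real class derives the key from handle.to_ptr.address. Presumably the registry lets code that only knows a pointer address find the handle object again, although the source shown here does not state that purpose.

```ruby
# Pure-Ruby stand-in for the registry pattern; no FFI or rdkafka required.
# RegistrySketchHandle and its fake address are hypothetical.
class RegistrySketchHandle
  REGISTRY = {}

  attr_reader :address

  def initialize(address)
    @address = address
  end

  # Mirrors AbstractHandle.register: key the handle by its address.
  def self.register(handle)
    REGISTRY[handle.address] = handle
  end

  # Mirrors AbstractHandle.remove: delete and return the handle, or nil.
  def self.remove(address)
    REGISTRY.delete(address)
  end
end

handle = RegistrySketchHandle.new(0x1000)
RegistrySketchHandle.register(handle)

# Code that only knows the address can recover the handle object.
found = RegistrySketchHandle.remove(0x1000)

# A second removal finds nothing, because Hash#delete returns nil for a missing key.
missing = RegistrySketchHandle.remove(0x1000)
```

Because .remove delegates to Hash#delete, it returns the removed handle, so a caller can look up and unregister in one step.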
Instance Method Details
#create_result ⇒ Object
Returns operation-specific result.
# File 'lib/rdkafka/abstract_handle.rb', line 72
def create_result
  raise "Must be implemented by subclass!"
end
#operation_name ⇒ String
Returns the name of the operation (e.g. "delivery").
# File 'lib/rdkafka/abstract_handle.rb', line 67
def operation_name
  raise "Must be implemented by subclass!"
end
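Both #create_result and #operation_name raise until a subclass supplies them. The contract can be sketched with plain Ruby classes; SketchBase, SketchDeliveryHandle, and the returned hash are hypothetical and do not reflect what the real subclasses return.

```ruby
# Stand-in base class; the real AbstractHandle is an FFI::Struct.
class SketchBase
  def operation_name
    raise "Must be implemented by subclass!"
  end

  def create_result
    raise "Must be implemented by subclass!"
  end
end

# Hypothetical subclass that supplies both hooks.
class SketchDeliveryHandle < SketchBase
  def operation_name
    "delivery"
  end

  def create_result
    { status: :done } # placeholder result, not the real DeliveryHandle output
  end
end

# Calling a hook on the base class raises a RuntimeError.
base_error =
  begin
    SketchBase.new.operation_name
  rescue RuntimeError => e
    e.message
  end

sub = SketchDeliveryHandle.new
name = sub.operation_name
result = sub.create_result
```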
#pending? ⇒ Boolean
Whether the handle is still pending.
# File 'lib/rdkafka/abstract_handle.rb', line 30
def pending?
  self[:pending]
end
#raise_error ⇒ Object
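Validates self[:response] with RdkafkaError.validate!, which is expected to raise for a non-zero response. Subclasses may override this.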
# File 'lib/rdkafka/abstract_handle.rb', line 77
def raise_error
  RdkafkaError.validate!(self[:response])
end
#wait(max_wait_timeout: 60, wait_timeout: 0.1, raise_response_error: true) ⇒ Object
Wait for the operation to complete or raise an error if this takes longer than the timeout. A timeout does not mean the operation failed; rdkafka might still be working on it, so it is possible to call wait again.
# File 'lib/rdkafka/abstract_handle.rb', line 46
def wait(max_wait_timeout: 60, wait_timeout: 0.1, raise_response_error: true)
  timeout = if max_wait_timeout
              CURRENT_TIME.call + max_wait_timeout
            else
              nil
            end
  loop do
    if pending?
      if timeout && timeout <= CURRENT_TIME.call
        raise WaitTimeoutError.new("Waiting for #{operation_name} timed out after #{max_wait_timeout} seconds")
      end
      sleep wait_timeout
    elsif self[:response] != 0 && raise_response_error
      raise_error
    else
      return create_result
    end
  end
end
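The retry-after-timeout behaviour described above can be tried out with a pure-Ruby stand-in that copies the control flow of #wait. Everything here is an assumption made for illustration: WaitSketchHandle, SketchTimeout, complete!, and the :result value are hypothetical, and Process.clock_gettime stands in for CURRENT_TIME, whose definition is not shown in this section.

```ruby
# Pure-Ruby stand-in that copies the control flow of #wait; no rdkafka needed.
class SketchTimeout < RuntimeError; end

class WaitSketchHandle
  def initialize
    @pending = true
    @response = 0
  end

  def pending?
    @pending
  end

  # Simulates the completion that rdkafka would perform in the background.
  def complete!(response = 0)
    @response = response
    @pending = false
  end

  def wait(max_wait_timeout: 60, wait_timeout: 0.1, raise_response_error: true)
    deadline = max_wait_timeout ? monotonic_now + max_wait_timeout : nil
    loop do
      if pending?
        if deadline && deadline <= monotonic_now
          raise SketchTimeout, "Waiting timed out after #{max_wait_timeout} seconds"
        end
        sleep wait_timeout
      elsif @response != 0 && raise_response_error
        raise "operation failed with response #{@response}"
      else
        return :result # placeholder for create_result
      end
    end
  end

  private

  # Assumed clock source for this sketch.
  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

handle = WaitSketchHandle.new

# The first wait times out because nothing has completed the handle yet.
first_attempt =
  begin
    handle.wait(max_wait_timeout: 0.05, wait_timeout: 0.01)
  rescue SketchTimeout
    :timed_out
  end

# The operation finishes later; waiting again on the same handle succeeds.
handle.complete!
second_attempt = handle.wait(max_wait_timeout: 1, wait_timeout: 0.01)

# With raise_response_error: false, a non-zero response still yields the result.
failed = WaitSketchHandle.new
failed.complete!(5)
lenient = failed.wait(raise_response_error: false)
```

Passing max_wait_timeout: nil skips the deadline entirely, so the loop polls until the handle is no longer pending.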