Class: Rdkafka::AbstractHandle
- Inherits:
- FFI::Struct
  - Object
  - FFI::Struct
  - Rdkafka::AbstractHandle
- Includes:
- Helpers::Time
- Defined in:
- lib/rdkafka/abstract_handle.rb
Overview
Abstract base class for handles within the Rdkafka module.
As a subclass of FFI::Struct, it defines the state and behavior shared by all handles
(a pending flag, a response code, and the logic for waiting on completion) that concrete handle classes inherit.
Subclasses must define their own layout, and the layout must start with:
layout :pending, :bool, :response, :int
Direct Known Subclasses
Rdkafka::Admin::CreateAclHandle, Rdkafka::Admin::CreatePartitionsHandle, Rdkafka::Admin::CreateTopicHandle, Rdkafka::Admin::DeleteAclHandle, Rdkafka::Admin::DeleteGroupsHandle, Rdkafka::Admin::DeleteTopicHandle, Rdkafka::Admin::DescribeAclHandle, Rdkafka::Admin::DescribeConfigsHandle, Rdkafka::Admin::IncrementalAlterConfigsHandle, Producer::DeliveryHandle
Defined Under Namespace
Classes: WaitTimeoutError
Constant Summary
- REGISTRY =
Registry for registering all the handles.
{}
- WAIT_TIMEOUT_DEPRECATION_MESSAGE =
Deprecation message for wait_timeout argument in wait method
"The 'wait_timeout' argument is deprecated and will be removed in future versions without replacement. " \
"We don't rely on it's value anymore. Please refactor your code to remove references to it."
Class Method Summary
-
.register(handle) ⇒ Object
Adds the handle to the registry.
-
.remove(address) ⇒ Object
Removes the handle from the registry based on the handle address.
Instance Method Summary
-
#create_result ⇒ Object
Operation-specific result.
-
#initialize ⇒ AbstractHandle
constructor
A new instance of AbstractHandle.
-
#operation_name ⇒ String
The name of the operation (e.g. "delivery").
-
#pending? ⇒ Boolean
Whether the handle is still pending.
-
#raise_error ⇒ Object
Raises an error for the handle's response code. Subclasses may override this.
-
#unlock ⇒ Object
Unlock the resources.
-
#wait(max_wait_timeout: 60, wait_timeout: nil, raise_response_error: true) ⇒ Object
Wait for the operation to complete or raise an error if this takes longer than the timeout.
Methods included from Helpers::Time
Constructor Details
#initialize ⇒ AbstractHandle
Returns a new instance of AbstractHandle.
# File 'lib/rdkafka/abstract_handle.rb', line 42
def initialize
  @mutex = Thread::Mutex.new
  @resource = Thread::ConditionVariable.new
  super
end
Class Method Details
.register(handle) ⇒ Object
Adds the handle to the registry, keyed by the address of its underlying pointer.
# File 'lib/rdkafka/abstract_handle.rb', line 29
def register(handle)
  address = handle.to_ptr.address
  REGISTRY[address] = handle
end
.remove(address) ⇒ Object
Removes the handle from the registry based on the handle address.
# File 'lib/rdkafka/abstract_handle.rb', line 37
def remove(address)
  REGISTRY.delete(address)
end
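The two class methods above amount to a hash keyed by pointer address, which lets code that only knows an address look the handle up again. A minimal sketch of that pattern follows. FakePointer, FakeHandle, HandleRegistry, and the address value are hypothetical stand-ins for illustration, not part of rdkafka; a real handle is an FFI::Struct whose to_ptr.address is the address of its native memory.

```ruby
# Hypothetical stand-ins: a real handle is an FFI::Struct, not a Struct.
FakePointer = Struct.new(:address)
FakeHandle = Struct.new(:name, :to_ptr)

# Hypothetical class that mirrors the register/remove pair shown above.
class HandleRegistry
  REGISTRY = {}

  class << self
    # Store the handle under its pointer address.
    def register(handle)
      address = handle.to_ptr.address
      REGISTRY[address] = handle
    end

    # Delete the entry and return the handle (nil if the address is unknown).
    def remove(address)
      REGISTRY.delete(address)
    end
  end
end

handle = FakeHandle.new("delivery", FakePointer.new(0x1000)) # made-up address
HandleRegistry.register(handle)

found = HandleRegistry.remove(0x1000)   # the handle registered above
missing = HandleRegistry.remove(0x1000) # nil, the entry is already gone
```

Because remove returns whatever Hash#delete returns, a caller gets the handle back on the first call and nil on any later call for the same address.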
Instance Method Details
#create_result ⇒ Object
Returns operation-specific result.
# File 'lib/rdkafka/abstract_handle.rb', line 109
def create_result
  raise "Must be implemented by subclass!"
end
#operation_name ⇒ String
Returns the name of the operation (e.g. "delivery").
# File 'lib/rdkafka/abstract_handle.rb', line 104
def operation_name
  raise "Must be implemented by subclass!"
end
#pending? ⇒ Boolean
Whether the handle is still pending.
# File 'lib/rdkafka/abstract_handle.rb', line 52
def pending?
  self[:pending]
end
#raise_error ⇒ Object
Raises an error for the handle's response code via RdkafkaError.validate!. Subclasses may override this.
# File 'lib/rdkafka/abstract_handle.rb', line 114
def raise_error
  RdkafkaError.validate!(self[:response])
end
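To illustrate what overriding raise_error can look like, here is a small pure-Ruby sketch. Every name in it (SketchError, BaseSketchHandle, TopicSketchHandle, the result method) is hypothetical. How RdkafkaError.validate! maps response codes to errors is not shown in this section, so the sketch substitutes its own error class and an arbitrary non-zero code.

```ruby
# Hypothetical error type standing in for the library's error class.
class SketchError < StandardError
  attr_reader :code

  def initialize(code, message = nil)
    @code = code
    super(message || "operation failed with response code #{code}")
  end
end

# Hypothetical base handle that keeps the response code in an instance variable.
class BaseSketchHandle
  attr_reader :response

  def initialize(response)
    @response = response
  end

  # Default hook: raise a generic error for the response code.
  def raise_error
    raise SketchError.new(response)
  end

  # Models the non-pending branches of #wait: raise on a non-zero response
  # unless the caller opted out, otherwise return a result.
  def result(raise_response_error: true)
    raise_error if response != 0 && raise_response_error
    "result (response=#{response})"
  end
end

# Hypothetical subclass that overrides the hook to add operation-specific detail.
class TopicSketchHandle < BaseSketchHandle
  def raise_error
    raise SketchError.new(response, "topic operation failed (code #{response})")
  end
end

message =
  begin
    TopicSketchHandle.new(7).result # 7 is an arbitrary non-zero code
  rescue SketchError => e
    e.message
  end

lenient = TopicSketchHandle.new(7).result(raise_response_error: false)
```

The base class only decides when the hook runs; the subclass decides what the raised error says.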
#unlock ⇒ Object
Unlock the resources
# File 'lib/rdkafka/abstract_handle.rb', line 96
def unlock
  @mutex.synchronize do
    self[:pending] = false
    @resource.broadcast
  end
end
#wait(max_wait_timeout: 60, wait_timeout: nil, raise_response_error: true) ⇒ Object
Wait for the operation to complete or raise an error if this takes longer than the timeout. A timeout does not mean the operation failed; rdkafka might still be working on it, in which case wait can be called again. Passing wait_timeout only emits a deprecation warning, and with raise_response_error: false a non-zero response code does not raise.
# File 'lib/rdkafka/abstract_handle.rb', line 69
def wait(max_wait_timeout: 60, wait_timeout: nil, raise_response_error: true)
  Kernel.warn(WAIT_TIMEOUT_DEPRECATION_MESSAGE) unless wait_timeout.nil?

  timeout = max_wait_timeout ? monotonic_now + max_wait_timeout : MAX_WAIT_TIMEOUT_FOREVER

  @mutex.synchronize do
    loop do
      if pending?
        to_wait = (timeout - monotonic_now)

        if to_wait.positive?
          @resource.wait(@mutex, to_wait)
        else
          raise WaitTimeoutError.new(
            "Waiting for #{operation_name} timed out after #{max_wait_timeout} seconds"
          )
        end
      elsif self[:response] != 0 && raise_response_error
        raise_error
      else
        return create_result
      end
    end
  end
end
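The interplay between wait and unlock, including the point that a timed-out wait can be retried, can be modelled with the standard library alone. The sketch below is a simplified illustration and not the library's implementation: SketchHandle is hypothetical, it stores the pending flag in an instance variable instead of an FFI layout, it returns a fixed :done symbol instead of calling create_result, and it leaves out response-code handling.

```ruby
# Hypothetical pure-Ruby model of the wait/unlock handshake.
class SketchHandle
  class WaitTimeoutError < RuntimeError; end

  def initialize
    @mutex = Thread::Mutex.new
    @resource = Thread::ConditionVariable.new
    @pending = true
  end

  def pending?
    @pending
  end

  # Mark the operation as finished and wake every waiting thread.
  def unlock
    @mutex.synchronize do
      @pending = false
      @resource.broadcast
    end
  end

  # Block until unlock is called or the deadline passes.
  def wait(max_wait_timeout: 60)
    deadline = monotonic_now + max_wait_timeout

    @mutex.synchronize do
      loop do
        return :done unless pending?

        to_wait = deadline - monotonic_now
        unless to_wait.positive?
          raise WaitTimeoutError, "timed out after #{max_wait_timeout} seconds"
        end

        # Releases the mutex while sleeping; returns on broadcast or after to_wait seconds.
        @resource.wait(@mutex, to_wait)
      end
    end
  end

  private

  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

handle = SketchHandle.new

# Nothing has completed the operation yet, so a short wait times out.
first_attempt =
  begin
    handle.wait(max_wait_timeout: 0.05)
  rescue SketchHandle::WaitTimeoutError
    :timed_out
  end

# Simulate the operation completing on another thread, then wait again.
completer = Thread.new do
  sleep 0.05
  handle.unlock
end
second_attempt = handle.wait(max_wait_timeout: 5)
completer.join
```

Re-checking pending? in a loop after every wake-up is what makes the condition variable safe to use here: a wake-up with the flag still set simply goes back to sleep for the remaining time.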