Class: Rdkafka::AbstractHandle
- Inherits:
- FFI::Struct
- Object
- FFI::Struct
- Rdkafka::AbstractHandle
- Includes:
- Helpers::Time
- Defined in:
- lib/rdkafka/abstract_handle.rb
Overview
This class is the abstract base for handle classes in the Rdkafka module. As a subclass of `FFI::Struct`, it provides the blueprint that specific handle classes inherit from, so they all share the same leading struct fields and the same waiting behavior.
Subclasses must define their own layout, and the layout must start with:
    layout :pending, :bool,
           :response, :int
Direct Known Subclasses
Rdkafka::Admin::CreateAclHandle, Rdkafka::Admin::CreatePartitionsHandle, Rdkafka::Admin::CreateTopicHandle, Rdkafka::Admin::DeleteAclHandle, Rdkafka::Admin::DeleteGroupsHandle, Rdkafka::Admin::DeleteTopicHandle, Rdkafka::Admin::DescribeAclHandle, Rdkafka::Admin::DescribeConfigsHandle, Rdkafka::Admin::IncrementalAlterConfigsHandle, Rdkafka::Admin::ListOffsetsHandle, Producer::DeliveryHandle
Defined Under Namespace
Classes: WaitTimeoutError
Constant Summary
- REGISTRY =
  Registry of all registered handles, keyed by handle address.
  {}
Class Method Summary
- .register(handle) ⇒ Object
  Adds the handle to the registry.
- .remove(address) ⇒ Object
  Removes the handle from the registry based on the handle address.
Instance Method Summary
- #create_result ⇒ Object
  Operation-specific result.
- #initialize ⇒ AbstractHandle
  constructor
  A new instance of AbstractHandle.
- #operation_name ⇒ String
  The name of the operation (e.g. “delivery”).
- #pending? ⇒ Boolean
  Whether the handle is still pending.
- #raise_error ⇒ Object
  Raises an RdkafkaError built from the response code; subclasses may override.
- #unlock ⇒ Object
  Marks the handle as no longer pending and wakes all waiting threads.
- #wait(max_wait_timeout: :not_provided, max_wait_timeout_ms: :not_provided, raise_response_error: true) ⇒ Object
  Wait for the operation to complete or raise an error if this takes longer than the timeout.
Methods included from Helpers::Time
#monotonic_now, #monotonic_now_ms
Constructor Details
#initialize ⇒ AbstractHandle
Returns a new instance of AbstractHandle.
    # File 'lib/rdkafka/abstract_handle.rb', line 39
    def initialize
      @mutex = Thread::Mutex.new
      @resource = Thread::ConditionVariable.new
      super
    end
Class Method Details
.register(handle) ⇒ Object
Adds the handle to the registry, keyed by the address of its underlying pointer.
    # File 'lib/rdkafka/abstract_handle.rb', line 26
    def register(handle)
      address = handle.to_ptr.address
      REGISTRY[address] = handle
    end
.remove(address) ⇒ Object
Removes the handle from the registry based on the handle address.
    # File 'lib/rdkafka/abstract_handle.rb', line 34
    def remove(address)
      REGISTRY.delete(address)
    end
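The register and remove pair can be illustrated without FFI. The following is a minimal sketch, not the library's code: `FakeHandle` and `HandleRegistry` are hypothetical names, and the `address` field stands in for `handle.to_ptr.address`. A likely use (an assumption; this page does not say so) is that code which only knows a pointer address can recover the Ruby object later.

```ruby
# Hypothetical stand-in for a handle; `address` plays the role of
# `handle.to_ptr.address` in the real class.
FakeHandle = Struct.new(:address, :payload)

# Hypothetical module mirroring the two class methods shown above.
module HandleRegistry
  REGISTRY = {}

  # Store the handle under its address so it stays reachable.
  def self.register(handle)
    REGISTRY[handle.address] = handle
  end

  # Hash#delete returns the removed value, or nil for an unknown key.
  def self.remove(address)
    REGISTRY.delete(address)
  end
end

handle = FakeHandle.new(0x1000, "message-1")
HandleRegistry.register(handle)

# Later, code that only knows the address can recover the handle.
found = HandleRegistry.remove(0x1000)
missing = HandleRegistry.remove(0x1000) # already removed, so nil
```

Because `remove` is built on `Hash#delete`, it both unregisters the handle and returns it, so a lookup and cleanup happen in one call.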
Instance Method Details
#create_result ⇒ Object
Returns the operation-specific result. Subclasses must implement this method.
    # File 'lib/rdkafka/abstract_handle.rb', line 121
    def create_result
      raise "Must be implemented by subclass!"
    end
#operation_name ⇒ String
Returns the name of the operation (e.g. “delivery”).
    # File 'lib/rdkafka/abstract_handle.rb', line 116
    def operation_name
      raise "Must be implemented by subclass!"
    end
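Both #create_result and #operation_name raise in the base class, so a concrete handle has to supply them. A rough sketch of that contract follows, using plain Ruby classes instead of `FFI::Struct`. `SketchHandle` and `SketchDeliveryHandle` are hypothetical names, and the hash returned by `create_result` is invented for illustration; real subclasses return their own result objects.

```ruby
# Hypothetical stand-in base class; the real AbstractHandle is an FFI::Struct.
class SketchHandle
  def initialize(response)
    @response = response
  end

  def operation_name
    raise "Must be implemented by subclass!"
  end

  def create_result
    raise "Must be implemented by subclass!"
  end
end

# Hypothetical subclass supplying both hooks.
class SketchDeliveryHandle < SketchHandle
  def operation_name
    "delivery"
  end

  # Invented result shape, for illustration only.
  def create_result
    { operation: operation_name, response: @response }
  end
end

# Calling a hook on the base class raises a RuntimeError.
base_error =
  begin
    SketchHandle.new(0).create_result
  rescue RuntimeError => e
    e.message
  end

result = SketchDeliveryHandle.new(0).create_result
```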
#pending? ⇒ Boolean
Whether the handle is still pending.
    # File 'lib/rdkafka/abstract_handle.rb', line 49
    def pending?
      self[:pending]
    end
#raise_error ⇒ Object
Raises an RdkafkaError built from the handle's response code. Subclasses may override this to raise a more specific error.
    # File 'lib/rdkafka/abstract_handle.rb', line 126
    def raise_error
      raise RdkafkaError.new(self[:response])
    end
#unlock ⇒ Object
Unlocks the resources: marks the handle as no longer pending and wakes every thread blocked in #wait.
    # File 'lib/rdkafka/abstract_handle.rb', line 108
    def unlock
      @mutex.synchronize do
        self[:pending] = false
        @resource.broadcast
      end
    end
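The interplay between #unlock and a waiting thread can be tried out in isolation. The sketch below is a pure-Ruby approximation under stated assumptions: `SketchWaitable` is a hypothetical class, the pending flag is an instance variable rather than an FFI struct field, and the result is a placeholder symbol. It also shows that a timed-out wait can simply be retried.

```ruby
# Hypothetical pure-Ruby stand-in; the real class stores :pending in an FFI struct.
class SketchWaitable
  class WaitTimeoutError < RuntimeError; end

  def initialize
    @mutex = Thread::Mutex.new
    @resource = Thread::ConditionVariable.new
    @pending = true
  end

  # Called by the completing side once the operation has finished.
  def unlock
    @mutex.synchronize do
      @pending = false
      @resource.broadcast
    end
  end

  # Blocks until unlock is called or timeout_ms elapses.
  def wait(timeout_ms)
    deadline = now + timeout_ms / 1000.0
    @mutex.synchronize do
      loop do
        return :done unless @pending

        to_wait = deadline - now
        raise WaitTimeoutError, "timed out after #{timeout_ms} ms" unless to_wait.positive?

        # The wait may wake early, so the loop re-checks @pending.
        @resource.wait(@mutex, to_wait)
      end
    end
  end

  private

  # Monotonic clock in seconds, unaffected by wall-clock adjustments.
  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

handle = SketchWaitable.new

# Nothing has completed the handle yet, so a short wait times out.
first_attempt =
  begin
    handle.wait(10)
  rescue SketchWaitable::WaitTimeoutError
    :timed_out
  end

# Another thread completes the operation a little later.
completer = Thread.new do
  sleep 0.05
  handle.unlock
end

# Waiting again on the same handle now succeeds.
second_attempt = handle.wait(2_000)
completer.join
```

Using `broadcast` rather than `signal` means every thread waiting on the same handle is woken, not just one.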
#wait(max_wait_timeout: :not_provided, max_wait_timeout_ms: :not_provided, raise_response_error: true) ⇒ Object
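Waits for the operation to complete, or raises WaitTimeoutError if this takes longer than the timeout. A timeout does not mean the operation failed; rdkafka might still be working on it, in which case wait can be called again. The max_wait_timeout parameter (seconds) is deprecated in favor of max_wait_timeout_ms (milliseconds); if both are given, max_wait_timeout_ms is used. When raise_response_error is false, a non-zero response code does not raise and the result is returned instead.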
    # File 'lib/rdkafka/abstract_handle.rb', line 67
    def wait(max_wait_timeout: :not_provided, max_wait_timeout_ms: :not_provided, raise_response_error: true)
      # Determine which timeout value to use
      if max_wait_timeout != :not_provided && max_wait_timeout_ms != :not_provided
        warn "DEPRECATION WARNING: Both max_wait_timeout and max_wait_timeout_ms were provided. " \
          "Using max_wait_timeout_ms. The max_wait_timeout parameter is deprecated and will be removed in v1.0.0."
        timeout_ms = max_wait_timeout_ms
      elsif max_wait_timeout != :not_provided
        warn "DEPRECATION WARNING: max_wait_timeout (seconds) is deprecated. " \
          "Use max_wait_timeout_ms (milliseconds) instead. This parameter will be removed in v1.0.0."
        timeout_ms = max_wait_timeout ? (max_wait_timeout * 1000).to_i : nil
      elsif max_wait_timeout_ms == :not_provided
        timeout_ms = Defaults::HANDLE_WAIT_TIMEOUT_MS
      else
        timeout_ms = max_wait_timeout_ms
      end

      timeout_s = timeout_ms ? timeout_ms / 1000.0 : nil
      timeout = timeout_s ? monotonic_now + timeout_s : MAX_WAIT_TIMEOUT_FOREVER

      @mutex.synchronize do
        loop do
          if pending?
            to_wait = (timeout - monotonic_now)
            if to_wait.positive?
              @resource.wait(@mutex, to_wait)
            else
              raise WaitTimeoutError.new(
                "Waiting for #{operation_name} timed out after #{timeout_ms} ms"
              )
            end
          elsif self[:response] != 0 && raise_response_error
            raise_error
          else
            return create_result
          end
        end
      end
    end
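The first half of #wait only decides which timeout applies. As a worked illustration, the sketch below mirrors that branching in a standalone method with the deprecation warnings left out. `resolve_timeout_ms` is a hypothetical helper, and the `default_ms` value of 60_000 is an assumption; the actual value of `Defaults::HANDLE_WAIT_TIMEOUT_MS` is not shown on this page.

```ruby
# Hypothetical helper mirroring the timeout selection in #wait.
# default_ms is an assumed placeholder for Defaults::HANDLE_WAIT_TIMEOUT_MS.
def resolve_timeout_ms(max_wait_timeout: :not_provided,
                       max_wait_timeout_ms: :not_provided,
                       default_ms: 60_000)
  if max_wait_timeout != :not_provided && max_wait_timeout_ms != :not_provided
    # Both given: the millisecond value wins.
    max_wait_timeout_ms
  elsif max_wait_timeout != :not_provided
    # Deprecated seconds value: convert to milliseconds, keep nil as nil.
    max_wait_timeout ? (max_wait_timeout * 1000).to_i : nil
  elsif max_wait_timeout_ms == :not_provided
    # Neither given: fall back to the default.
    default_ms
  else
    max_wait_timeout_ms
  end
end

both         = resolve_timeout_ms(max_wait_timeout: 5, max_wait_timeout_ms: 250) # => 250
seconds_only = resolve_timeout_ms(max_wait_timeout: 1.5)                         # => 1500
no_limit     = resolve_timeout_ms(max_wait_timeout: nil)                         # => nil
fallback     = resolve_timeout_ms                                                # => 60000
ms_only      = resolve_timeout_ms(max_wait_timeout_ms: 250)                      # => 250
```

The `:not_provided` sentinel is what lets the method tell an omitted argument apart from an explicit `nil`, which the original code treats as "no timeout".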