Class: Rdkafka::AbstractHandle

Inherits:
FFI::Struct
  • Object
Includes:
Helpers::Time
Defined in:
lib/rdkafka/abstract_handle.rb

Overview

This class is the abstract base class for handles in the Rdkafka module. As a subclass of `FFI::Struct`, it defines the structure and behavior that the specific handle classes inherit.

Subclasses must define their own layout, and the layout must start with:

layout :pending, :bool,
       :response, :int

Defined Under Namespace

Classes: WaitTimeoutError

Constant Summary

REGISTRY = {}

Registry of all handles, keyed by handle address.


Methods included from Helpers::Time

#monotonic_now, #monotonic_now_ms

Constructor Details

#initialize ⇒ AbstractHandle

Returns a new instance of AbstractHandle.



# File 'lib/rdkafka/abstract_handle.rb', line 39

def initialize
  @mutex = Thread::Mutex.new
  @resource = Thread::ConditionVariable.new

  super
end

Class Method Details

.register(handle) ⇒ Object

Adds a handle to the registry, keyed by the address of its underlying pointer.

Parameters:

  • handle (AbstractHandle)

    handle to register



# File 'lib/rdkafka/abstract_handle.rb', line 26

def register(handle)
  address = handle.to_ptr.address
  REGISTRY[address] = handle
end

.remove(address) ⇒ Object

Removes a handle from the registry by its address.

Parameters:

  • address (Integer)

    address of the registered handle we want to remove



# File 'lib/rdkafka/abstract_handle.rb', line 34

def remove(address)
  REGISTRY.delete(address)
end
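
The two class methods above amount to a small address-keyed registry. Below is a minimal sketch of that pattern in plain Ruby. `FakeHandle`, `FAKE_REGISTRY`, `fake_register` and `fake_remove` are hypothetical names used only for illustration and are not part of rdkafka; a real handle would expose its address through `to_ptr.address`.

```ruby
# Hypothetical stand-in for a handle; the integer address replaces a real
# FFI pointer address.
FakeHandle = Struct.new(:address, :label)

# Address-keyed registry, mirroring the shape of REGISTRY.
FAKE_REGISTRY = {}

def fake_register(handle)
  FAKE_REGISTRY[handle.address] = handle
end

def fake_remove(address)
  # Hash#delete returns the removed value, or nil when the key is absent.
  FAKE_REGISTRY.delete(address)
end

handle = FakeHandle.new(0x1000, "delivery")
fake_register(handle)
found = fake_remove(0x1000)   # code that only knows the address recovers the handle
missing = fake_remove(0x1000) # a second removal finds nothing and returns nil
```

Keying by address lets code that only receives a raw address, such as a native callback, look up the Ruby object it belongs to. That is a plausible reading of the design rather than something this page states.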

Instance Method Details

#create_result ⇒ Object

Returns operation-specific result.

Returns:

  • (Object)

    operation-specific result



# File 'lib/rdkafka/abstract_handle.rb', line 121

def create_result
  raise "Must be implemented by subclass!"
end

#operation_name ⇒ String

Returns the name of the operation (e.g. “delivery”).

Returns:

  • (String)

    the name of the operation (e.g. “delivery”)



# File 'lib/rdkafka/abstract_handle.rb', line 116

def operation_name
  raise "Must be implemented by subclass!"
end
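
Both `#create_result` and `#operation_name` raise in the base class, so every concrete handle has to override them. Below is a minimal sketch of that contract in plain Ruby. `SketchHandle` and `SketchDeliveryHandle` are hypothetical classes, not rdkafka classes, and the returned result value is made up.

```ruby
# Hypothetical base class mirroring the two abstract hooks.
class SketchHandle
  def operation_name
    raise "Must be implemented by subclass!"
  end

  def create_result
    raise "Must be implemented by subclass!"
  end
end

# Hypothetical concrete handle that fills in both hooks.
class SketchDeliveryHandle < SketchHandle
  def operation_name
    "delivery"
  end

  def create_result
    :delivered # placeholder; a real handle would build a report object
  end
end

# Calling a hook on the base class raises a RuntimeError.
begin
  SketchHandle.new.operation_name
rescue RuntimeError => e
  base_error = e.message
end
```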

#pending? ⇒ Boolean

Whether the handle is still pending.

Returns:

  • (Boolean)


# File 'lib/rdkafka/abstract_handle.rb', line 49

def pending?
  self[:pending]
end

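#raise_error ⇒ Object

Raises an RdkafkaError built from the handle's response code. Subclasses may override this to raise a more specific error.

Raises:

  • (RdkafkaError)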



# File 'lib/rdkafka/abstract_handle.rb', line 126

def raise_error
  raise RdkafkaError.new(self[:response])
end

#unlock ⇒ Object

Marks the handle as no longer pending and wakes every thread blocked in #wait.



# File 'lib/rdkafka/abstract_handle.rb', line 108

def unlock
  @mutex.synchronize do
    self[:pending] = false
    @resource.broadcast
  end
end
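
The handshake between `#unlock` and a waiting thread can be sketched with the same two primitives the constructor creates, a `Thread::Mutex` and a `Thread::ConditionVariable`. `SketchLatch` and `wait_until_unlocked` are hypothetical names, and a plain instance variable stands in for the struct's `:pending` field.

```ruby
# Hypothetical pure-Ruby stand-in: a flag guarded by a mutex, with a condition
# variable used to wake waiters once the flag flips.
class SketchLatch
  def initialize
    @mutex = Thread::Mutex.new
    @resource = Thread::ConditionVariable.new
    @pending = true
  end

  # Same shape as #unlock: flip the flag and wake every waiter.
  def unlock
    @mutex.synchronize do
      @pending = false
      @resource.broadcast
    end
  end

  # Blocks until unlocked or until timeout_s seconds pass.
  # Returns true when unlocked, false on timeout.
  def wait_until_unlocked(timeout_s)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout_s
    @mutex.synchronize do
      while @pending
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return false unless remaining.positive?
        # Releases the mutex while sleeping and reacquires it on wake-up.
        @resource.wait(@mutex, remaining)
      end
      true
    end
  end
end

latch = SketchLatch.new
waiter = Thread.new { latch.wait_until_unlocked(5) }
latch.unlock
unlocked = waiter.value # the waiter returns as soon as the flag flips
```

The flag is re-checked in a loop after every wake-up because a condition variable can wake a thread before the condition it is waiting for holds.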

#wait(max_wait_timeout: :not_provided, max_wait_timeout_ms: :not_provided, raise_response_error: true) ⇒ Object

Waits for the operation to complete, and raises an error if this takes longer than the timeout. A timeout does not mean the operation failed: rdkafka might still be working on it, in which case wait can be called again.

Parameters:

  • max_wait_timeout (Numeric, nil) (defaults to: :not_provided)

    DEPRECATED: Use max_wait_timeout_ms instead. Amount of time in seconds to wait before timing out. Will be removed in v1.0.0.

  • max_wait_timeout_ms (Numeric, nil) (defaults to: :not_provided)

    Amount of time in milliseconds to wait before timing out. If this is nil we will wait forever. Defaults to 60,000ms (60 seconds).

  • raise_response_error (Boolean) (defaults to: true)

Whether to raise an error when the operation completes with a non-zero response code.

Returns:

  • (Object)

    Operation-specific result

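Raises:

  • (RdkafkaError)

    when the operation completed with a non-zero response and raise_response_error is true

  • (WaitTimeoutError)

    when the timeout elapses while the handle is still pending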



# File 'lib/rdkafka/abstract_handle.rb', line 67

def wait(max_wait_timeout: :not_provided, max_wait_timeout_ms: :not_provided, raise_response_error: true)
  # Determine which timeout value to use
  if max_wait_timeout != :not_provided && max_wait_timeout_ms != :not_provided
    warn "DEPRECATION WARNING: Both max_wait_timeout and max_wait_timeout_ms were provided. " \
         "Using max_wait_timeout_ms. The max_wait_timeout parameter is deprecated and will be removed in v1.0.0."
    timeout_ms = max_wait_timeout_ms
  elsif max_wait_timeout != :not_provided
    warn "DEPRECATION WARNING: max_wait_timeout (seconds) is deprecated. " \
         "Use max_wait_timeout_ms (milliseconds) instead. This parameter will be removed in v1.0.0."
    timeout_ms = max_wait_timeout ? (max_wait_timeout * 1000).to_i : nil
  elsif max_wait_timeout_ms == :not_provided
    timeout_ms = Defaults::HANDLE_WAIT_TIMEOUT_MS
  else
    timeout_ms = max_wait_timeout_ms
  end

  timeout_s = timeout_ms ? timeout_ms / 1000.0 : nil
  timeout = timeout_s ? monotonic_now + timeout_s : MAX_WAIT_TIMEOUT_FOREVER

  @mutex.synchronize do
    loop do
      if pending?
        to_wait = (timeout - monotonic_now)

        if to_wait.positive?
          @resource.wait(@mutex, to_wait)
        else
          raise WaitTimeoutError.new(
            "Waiting for #{operation_name} timed out after #{timeout_ms} ms"
          )
        end
      elsif self[:response] != 0 && raise_response_error
        raise_error
      else
        return create_result
      end
    end
  end
end
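
The timeout selection at the top of `#wait` reduces to a precedence rule: the millisecond argument wins, then the deprecated seconds argument, then the default. Below is a minimal sketch of that rule as a standalone function, leaving out the deprecation warnings. `resolve_timeout_ms` and `SKETCH_DEFAULT_TIMEOUT_MS` are hypothetical names, and the 60,000 ms default is taken from the parameter description above.

```ruby
# Assumed default, taken from the documented "Defaults to 60,000ms".
SKETCH_DEFAULT_TIMEOUT_MS = 60_000

# Returns the effective timeout in milliseconds, or nil for "wait forever".
def resolve_timeout_ms(max_wait_timeout: :not_provided, max_wait_timeout_ms: :not_provided)
  if max_wait_timeout_ms != :not_provided
    # The millisecond value always wins, even when both are given.
    max_wait_timeout_ms
  elsif max_wait_timeout != :not_provided
    # Deprecated seconds value: convert to whole milliseconds, keep nil as nil.
    max_wait_timeout ? (max_wait_timeout * 1000).to_i : nil
  else
    SKETCH_DEFAULT_TIMEOUT_MS
  end
end

resolve_timeout_ms                                                  # default
resolve_timeout_ms(max_wait_timeout: 1.5)                           # seconds converted to ms
resolve_timeout_ms(max_wait_timeout: 2, max_wait_timeout_ms: 250)   # ms takes precedence
resolve_timeout_ms(max_wait_timeout_ms: nil)                        # nil means wait forever
```

The `:not_provided` sentinel is what lets an explicit `nil` (wait forever) be told apart from an omitted argument (use the default).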