Class: Rdkafka::AbstractHandle

Inherits:
FFI::Struct
  • Object
show all
Defined in:
lib/rdkafka/abstract_handle.rb

Defined Under Namespace

Classes: WaitTimeoutError

Constant Summary collapse

REGISTRY =

Subclasses must define their own layout, and the layout must start with:

layout :pending, :bool, :response, :int

{}

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.register(handle) ⇒ Object



18
19
20
21
# File 'lib/rdkafka/abstract_handle.rb', line 18

def self.register(handle)
  address = handle.to_ptr.address
  REGISTRY[address] = handle
end

.remove(address) ⇒ Object



23
24
25
# File 'lib/rdkafka/abstract_handle.rb', line 23

def self.remove(address)
  REGISTRY.delete(address)
end

Instance Method Details

#create_resultObject

Returns operation-specific result.

Returns:

  • (Object)

    operation-specific result



72
73
74
# File 'lib/rdkafka/abstract_handle.rb', line 72

def create_result
  raise "Must be implemented by subclass!"
end

#operation_nameString

Returns the name of the operation (e.g. "delivery").

Returns:

  • (String)

    the name of the operation (e.g. "delivery")



67
68
69
# File 'lib/rdkafka/abstract_handle.rb', line 67

def operation_name
  raise "Must be implemented by subclass!"
end

#pending?Boolean

Whether the handle is still pending.

Returns:

  • (Boolean)


30
31
32
# File 'lib/rdkafka/abstract_handle.rb', line 30

def pending?
  self[:pending]
end

#raise_errorObject

Allow subclasses to override



77
78
79
# File 'lib/rdkafka/abstract_handle.rb', line 77

def raise_error
  RdkafkaError.validate!(self[:response])
end

#wait(max_wait_timeout: 60, wait_timeout: 0.1, raise_response_error: true) ⇒ Object

Wait for the operation to complete or raise an error if this takes longer than the timeout. If there is a timeout this does not mean the operation failed, rdkafka might still be working on the operation. In this case it is possible to call wait again.

Parameters:

  • max_wait_timeout (Numeric, nil) (defaults to: 60)

    Amount of time to wait before timing out. If this is nil it does not time out.

  • wait_timeout (Numeric) (defaults to: 0.1)

    Amount of time we should wait before we recheck if the operation has completed

  • raise_response_error (Boolean) (defaults to: true)

    should we raise error when waiting finishes

Returns:

  • (Object)

    Operation-specific result

Raises:



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/rdkafka/abstract_handle.rb', line 46

def wait(max_wait_timeout: 60, wait_timeout: 0.1, raise_response_error: true)
  timeout = if max_wait_timeout
              CURRENT_TIME.call + max_wait_timeout
            else
              nil
            end
  loop do
    if pending?
      if timeout && timeout <= CURRENT_TIME.call
        raise WaitTimeoutError.new("Waiting for #{operation_name} timed out after #{max_wait_timeout} seconds")
      end
      sleep wait_timeout
    elsif self[:response] != 0 && raise_response_error
      raise_error
    else
      return create_result
    end
  end
end