Exception: Rdkafka::RdkafkaError

Inherits:
BaseError
  • Object
show all
Defined in:
lib/rdkafka/error.rb

Overview

Error returned by the underlying rdkafka library.

Direct Known Subclasses

RdkafkaTopicPartitionListError

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#broker_message ⇒ String (readonly)

Error message sent by the broker

Returns:

  • (String)


19
20
21
# File 'lib/rdkafka/error.rb', line 19

# Error message sent by the broker.
#
# @return [String] the stored broker message (may be nil if none was supplied — confirm against the constructor)
def broker_message
  @broker_message
end

#message_prefix ⇒ String (readonly)

Prefix to be used for human readable representation

Returns:

  • (String)


15
16
17
# File 'lib/rdkafka/error.rb', line 15

# Prefix to be used for the human readable representation of this error.
#
# @return [String] the stored prefix (may be nil if none was supplied — confirm against the constructor)
def message_prefix
  @message_prefix
end

#rdkafka_response ⇒ Integer (readonly)

The underlying raw error response

Returns:

  • (Integer)


11
12
13
# File 'lib/rdkafka/error.rb', line 11

# The underlying raw error response.
#
# @return [Integer] the response code this error was created with
def rdkafka_response
  @rdkafka_response
end

Class Method Details

.build(response_ptr_or_code, message_prefix = nil, broker_message: nil) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/rdkafka/error.rb', line 44

# Builds an error out of an integer response code, a Bindings::Message, or
# any other object, which is handed over to build_from_c.
#
# @param response_ptr_or_code [Integer, Bindings::Message, Object] source of the error
# @param message_prefix [String, nil] optional prefix for the human readable representation;
#   for a Bindings::Message without an explicit prefix, the message payload (when present)
#   is used instead
# @param broker_message [String, nil] optional error message sent by the broker
# @return [RdkafkaError, false] the built error, or false when the response carries no error
def build(response_ptr_or_code, message_prefix = nil, broker_message: nil)
  case response_ptr_or_code
  when Integer
    # A zero code means "no error"
    return false if response_ptr_or_code.zero?

    new(response_ptr_or_code, message_prefix, broker_message: broker_message)
  when Bindings::Message
    return false if response_ptr_or_code[:err].zero?

    # Fall back to the message payload as the prefix only when the caller gave none
    unless response_ptr_or_code[:payload].null?
      message_prefix ||= response_ptr_or_code[:payload].read_string(response_ptr_or_code[:len])
    end

    new(response_ptr_or_code[:err], message_prefix, broker_message: broker_message)
  else
    # Forward broker_message too, so that it is not silently dropped on this path
    build_from_c(response_ptr_or_code, message_prefix, broker_message: broker_message)
  end
end

.build_from_c(response_ptr, message_prefix = nil, broker_message: nil) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/rdkafka/error.rb', line 22

# Builds an error from a native error pointer.
#
# The code and the fatal / retriable / requires-abort flags are read through
# Rdkafka::Bindings, after which the native error is destroyed.
#
# @param response_ptr [Object] pointer to the native error
# @param message_prefix [String, nil] optional prefix for the human readable representation
# @param broker_message [String, nil] message to use instead of the rd_kafka_err2str text
# @return [RdkafkaError, false] the built error, or false when the error code is zero
def build_from_c(response_ptr, message_prefix = nil, broker_message: nil)
  bindings = Rdkafka::Bindings
  code = bindings.rd_kafka_error_code(response_ptr)
  return false if code.zero?

  # Everything has to be read before the native error is destroyed below
  details = {
    broker_message: broker_message || bindings.rd_kafka_err2str(code),
    fatal: !bindings.rd_kafka_error_is_fatal(response_ptr).zero?,
    retryable: !bindings.rd_kafka_error_is_retriable(response_ptr).zero?,
    abortable: !bindings.rd_kafka_error_txn_requires_abort(response_ptr).zero?
  }

  bindings.rd_kafka_error_destroy(response_ptr)

  new(code, message_prefix, **details)
end

.validate!(response_ptr_or_code, message_prefix = nil, broker_message: nil) ⇒ Object



63
64
65
66
# File 'lib/rdkafka/error.rb', line 63

# Builds an error from the given response and raises it when there is one.
#
# @param response_ptr_or_code [Object] passed through to build
# @param message_prefix [String, nil] passed through to build
# @param broker_message [String, nil] passed through to build
# @return [false] when build produced no error
# @raise [RdkafkaError] whatever error build produced
def validate!(response_ptr_or_code, message_prefix = nil, broker_message: nil)
  failure = build(response_ptr_or_code, message_prefix, broker_message: broker_message)
  raise failure if failure

  false
end

Instance Method Details

#==(another_error) ⇒ Object

Error comparison



116
117
118
# File 'lib/rdkafka/error.rb', line 116

# Error comparison: two errors are equal when the other one is an instance of
# this error's class (or a subclass) and both render to the same string.
#
# @param another_error [Object] object to compare against
# @return [Boolean]
def ==(another_error)
  return false unless another_error.is_a?(self.class)

  to_s == another_error.to_s
end

#abortable? ⇒ Boolean

Returns:

  • (Boolean)


128
129
130
# File 'lib/rdkafka/error.rb', line 128

# Returns the stored abortable flag.
#
# NOTE(review): presumably set from the `abortable:` option passed to the
# constructor (see build_from_c) — confirm against initialize.
#
# @return [Boolean]
def abortable?
  @abortable
end

#code ⇒ Symbol

This error's code, for example :partition_eof, :msg_size_too_large.

Returns:

  • (Symbol)


89
90
91
92
93
94
95
96
# File 'lib/rdkafka/error.rb', line 89

# This error's code, for example :partition_eof, :msg_size_too_large.
#
# The name reported by rd_kafka_err2name is lowercased and a single leading
# underscore, when present, is dropped before converting to a symbol.
#
# @return [Symbol]
def code
  name = Rdkafka::Bindings.rd_kafka_err2name(@rdkafka_response).downcase
  name.delete_prefix("_").to_sym
end

#fatal? ⇒ Boolean

Returns:

  • (Boolean)


120
121
122
# File 'lib/rdkafka/error.rb', line 120

# Returns the stored fatal flag.
#
# NOTE(review): presumably set from the `fatal:` option passed to the
# constructor (see build_from_c) — confirm against initialize.
#
# @return [Boolean]
def fatal?
  @fatal
end

#is_partition_eof? ⇒ Boolean

Whether this error indicates the partition is EOF.

Returns:

  • (Boolean)


111
112
113
# File 'lib/rdkafka/error.rb', line 111

# Whether this error indicates the partition is EOF.
#
# @return [Boolean] true when #code is :partition_eof
def is_partition_eof?
  code == :partition_eof
end

#retryable? ⇒ Boolean

Returns:

  • (Boolean)


124
125
126
# File 'lib/rdkafka/error.rb', line 124

# Returns the stored retryable flag.
#
# NOTE(review): presumably set from the `retryable:` option passed to the
# constructor (see build_from_c) — confirm against initialize.
#
# @return [Boolean]
def retryable?
  @retryable
end

#to_s ⇒ String

Human readable representation of this error.

Returns:

  • (String)


100
101
102
103
104
105
106
107
# File 'lib/rdkafka/error.rb', line 100

# Human readable representation of this error.
#
# Format: "<message_prefix> - <rd_kafka_err2str text> (<code>)"; the
# "<message_prefix> - " part is omitted when no prefix is set.
#
# @return [String]
def to_s
  prefix = message_prefix ? "#{message_prefix} - " : ""
  description = Rdkafka::Bindings.rd_kafka_err2str(@rdkafka_response)

  "#{prefix}#{description} (#{code})"
end