Exception: Rdkafka::RdkafkaError

Inherits:
BaseError
  • Object
show all
Defined in:
lib/rdkafka/error.rb

Overview

Error returned by the underlying rdkafka library.

Direct Known Subclasses

RdkafkaTopicPartitionListError

Constant Summary collapse

EMPTY_HASH =

Empty hash for details default allocation

{}.freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(response, message_prefix = nil, broker_message: nil, fatal: false, retryable: false, abortable: false, details: EMPTY_HASH) ⇒ RdkafkaError

Returns a new instance of RdkafkaError.

Parameters:

  • response (Integer)

    the raw error response code from librdkafka

  • message_prefix (String, nil) (defaults to: nil)

    optional prefix for error messages

  • broker_message (String, nil) (defaults to: nil)

    optional error message from the broker

  • fatal (Boolean) (defaults to: false)

    whether this is a fatal error

  • retryable (Boolean) (defaults to: false)

    whether this error is retryable

  • abortable (Boolean) (defaults to: false)

    whether this error requires transaction abort

  • details (Hash) (defaults to: EMPTY_HASH)

    additional error details

Raises:

  • (TypeError)


161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/rdkafka/error.rb', line 161

def initialize(
  response,
  message_prefix = nil,
  broker_message: nil,
  fatal: false,
  retryable: false,
  abortable: false,
  details: EMPTY_HASH
)
  raise TypeError.new("Response has to be an integer") unless response.is_a? Integer
  @rdkafka_response = response
  @message_prefix = message_prefix
  @broker_message = broker_message
  @fatal = fatal
  @retryable = retryable
  @abortable = abortable
  @details = details
end

Instance Attribute Details

#broker_messageString (readonly)

Error message sent by the broker

Returns:

  • (String)


22
23
24
# File 'lib/rdkafka/error.rb', line 22

def broker_message
  @broker_message
end

#detailsHash (readonly)

Optional details hash specific to a given error or empty hash if none or not supported

Returns:

  • (Hash)


26
27
28
# File 'lib/rdkafka/error.rb', line 26

def details
  @details
end

#message_prefixString (readonly)

Prefix to be used for human readable representation

Returns:

  • (String)


18
19
20
# File 'lib/rdkafka/error.rb', line 18

def message_prefix
  @message_prefix
end

#rdkafka_responseInteger (readonly)

The underlying raw error response

Returns:

  • (Integer)


14
15
16
# File 'lib/rdkafka/error.rb', line 14

def rdkafka_response
  @rdkafka_response
end

Class Method Details

.build(response_ptr_or_code, message_prefix = nil, broker_message: nil) ⇒ RdkafkaError, false

Build an error instance from various input types

Parameters:

  • response_ptr_or_code (Integer, FFI::Pointer, Bindings::Message)

    Error code, pointer, or message struct

  • message_prefix (String, nil) (defaults to: nil)

    Optional prefix for the error message

  • broker_message (String, nil) (defaults to: nil)

    Optional broker error message

Returns:

  • (RdkafkaError, false)

    Error instance or false if no error



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/rdkafka/error.rb', line 64

def build(response_ptr_or_code, message_prefix = nil, broker_message: nil)
  case response_ptr_or_code
  when Integer
    return false if response_ptr_or_code == Bindings::RD_KAFKA_RESP_ERR_NO_ERROR

    new(response_ptr_or_code, message_prefix, broker_message: broker_message)
  when Bindings::Message
    return false if response_ptr_or_code[:err] == Bindings::RD_KAFKA_RESP_ERR_NO_ERROR

    unless response_ptr_or_code[:payload].null?
      message_prefix ||= response_ptr_or_code[:payload].read_string(response_ptr_or_code[:len])
    end

    details = if response_ptr_or_code[:rkt].null?
      EMPTY_HASH
    else
      {
        partition: response_ptr_or_code[:partition],
        offset: response_ptr_or_code[:offset],
        topic: Bindings.rd_kafka_topic_name(response_ptr_or_code[:rkt])
      }.freeze
    end
    new(
      response_ptr_or_code[:err],
      message_prefix,
      broker_message: broker_message,
      details: details
    )
  else
    build_from_c(response_ptr_or_code, message_prefix)
  end
end

.build_fatal(client_ptr, fallback_error_code: -150,, fallback_message: nil) ⇒ RdkafkaError

Build a fatal error from librdkafka’s fatal error state. Calls rd_kafka_fatal_error() to get the actual underlying error code and description.

Parameters:

  • client_ptr (FFI::Pointer)

    Pointer to rd_kafka_t client

  • fallback_error_code (Integer) (defaults to: -150,)

    Error code to use if no fatal error found (default: -150)

  • fallback_message (String, nil) (defaults to: nil)

    Message to use if no fatal error found

Returns:

  • (RdkafkaError)

    Error object with fatal flag set to true



132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/rdkafka/error.rb', line 132

def build_fatal(client_ptr, fallback_error_code: -150, fallback_message: nil)
  fatal_error_details = Rdkafka::Bindings.extract_fatal_error(client_ptr)

  if fatal_error_details
    new(
      fatal_error_details[:error_code],
      broker_message: fatal_error_details[:error_string],
      fatal: true
    )
  else
    # Fallback: if extract_fatal_error returns nil (shouldn't happen in practice),
    # the error code itself still indicates a fatal condition
    new(
      fallback_error_code,
      broker_message: fallback_message,
      fatal: true
    )
  end
end

.build_from_c(response_ptr, message_prefix = nil, broker_message: nil) ⇒ RdkafkaError, false

Build an error instance from a rd_kafka_error_t pointer

Parameters:

  • response_ptr (FFI::Pointer)

    Pointer to rd_kafka_error_t

  • message_prefix (String, nil) (defaults to: nil)

    Optional prefix for the error message

  • broker_message (String, nil) (defaults to: nil)

    Optional broker error message

Returns:

  • (RdkafkaError, false)

    Error instance or false if no error



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/rdkafka/error.rb', line 35

def build_from_c(response_ptr, message_prefix = nil, broker_message: nil)
  code = Rdkafka::Bindings.rd_kafka_error_code(response_ptr)

  return false if code == Bindings::RD_KAFKA_RESP_ERR_NO_ERROR

  message = broker_message || Rdkafka::Bindings.rd_kafka_err2str(code)
  fatal = !Rdkafka::Bindings.rd_kafka_error_is_fatal(response_ptr).zero?
  retryable = !Rdkafka::Bindings.rd_kafka_error_is_retriable(response_ptr).zero?
  abortable = !Rdkafka::Bindings.rd_kafka_error_txn_requires_abort(response_ptr).zero?

  Rdkafka::Bindings.rd_kafka_error_destroy(response_ptr)

  new(
    code,
    message_prefix,
    broker_message: message,
    fatal: fatal,
    retryable: retryable,
    abortable: abortable
  )
end

.validate!(response_ptr_or_code, message_prefix = nil, broker_message: nil, client_ptr: nil) ⇒ false

Validate a response and raise an error if it indicates a failure

Parameters:

  • response_ptr_or_code (Integer, FFI::Pointer, Bindings::Message)

    Error code, pointer, or message struct

  • message_prefix (String, nil) (defaults to: nil)

    Optional prefix for the error message

  • broker_message (String, nil) (defaults to: nil)

    Optional broker error message

  • client_ptr (FFI::Pointer, nil) (defaults to: nil)

    Optional pointer to rd_kafka_t client for fatal error detection

Returns:

  • (false)

    Returns false if no error

Raises:



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/rdkafka/error.rb', line 107

def validate!(response_ptr_or_code, message_prefix = nil, broker_message: nil, client_ptr: nil)
  error = build(response_ptr_or_code, message_prefix, broker_message: broker_message)

  return false unless error

  # Auto-detect and handle fatal errors (-150)
  if error.rdkafka_response == Bindings::RD_KAFKA_RESP_ERR__FATAL && client_ptr
    # Discover the underlying fatal error from librdkafka
    error = build_fatal(
      client_ptr,
      fallback_error_code: error.rdkafka_response,
      fallback_message: broker_message
    )
  end

  raise error
end

Instance Method Details

#==(other) ⇒ Boolean

Error comparison

Parameters:

  • other (Object)

    object to compare with

Returns:

  • (Boolean)


218
219
220
# File 'lib/rdkafka/error.rb', line 218

def ==(other)
  other.is_a?(self.class) && (to_s == other.to_s)
end

#abortable?Boolean

Whether this error requires the current transaction to be aborted

Returns:

  • (Boolean)


236
237
238
# File 'lib/rdkafka/error.rb', line 236

def abortable?
  @abortable
end

#codeSymbol

This error’s code, for example ‘:partition_eof`, `:msg_size_too_large`.

Returns:

  • (Symbol)


182
183
184
185
186
187
188
189
# File 'lib/rdkafka/error.rb', line 182

def code
  code = Rdkafka::Bindings.rd_kafka_err2name(@rdkafka_response).downcase
  if code[0] == "_"
    code[1..].to_sym
  else
    code.to_sym
  end
end

#fatal?Boolean

Whether this error is fatal and the client instance is no longer usable

Returns:

  • (Boolean)


224
225
226
# File 'lib/rdkafka/error.rb', line 224

def fatal?
  @fatal
end

#is_partition_eof?Boolean

Whether this error indicates the partition is EOF.

Returns:

  • (Boolean)


211
212
213
# File 'lib/rdkafka/error.rb', line 211

def is_partition_eof?
  code == :partition_eof
end

#retryable?Boolean

Whether this error is retryable and the operation may succeed if retried

Returns:

  • (Boolean)


230
231
232
# File 'lib/rdkafka/error.rb', line 230

def retryable?
  @retryable
end

#to_sString

Human readable representation of this error.

Returns:

  • (String)


193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
# File 'lib/rdkafka/error.rb', line 193

def to_s
  message_prefix_part = if message_prefix
    "#{message_prefix} - "
  else
    ""
  end

  err_str = Rdkafka::Bindings.rd_kafka_err2str(@rdkafka_response)
  base = "#{message_prefix_part}#{err_str} (#{code})"

  return base if broker_message.nil?
  return base if broker_message.empty?

  "#{base}\n#{broker_message}"
end