Exception: Rdkafka::RdkafkaError
- Defined in: lib/rdkafka/error.rb
Overview
Error returned by the underlying rdkafka library.
Direct Known Subclasses
Constant Summary
- EMPTY_HASH = {}.freeze
  Empty hash used as the default allocation for details.
Instance Attribute Summary
- #broker_message ⇒ String (readonly)
  Error message sent by the broker.
- #details ⇒ Hash (readonly)
  Optional details hash specific to a given error, or an empty hash if none or not supported.
- #instance_name ⇒ String? (readonly)
  The name of the rdkafka instance that generated this error.
- #message_prefix ⇒ String (readonly)
  Prefix to be used for human readable representation.
- #rdkafka_response ⇒ Integer (readonly)
  The underlying raw error response.
Class Method Summary
- .build(response_ptr_or_code, message_prefix = nil, broker_message: nil, instance_name: nil) ⇒ RdkafkaError, false
  Build an error instance from various input types.
- .build_fatal(client_ptr, fallback_error_code: -150, fallback_message: nil, instance_name: nil) ⇒ RdkafkaError
  Build a fatal error from librdkafka’s fatal error state.
- .build_from_c(response_ptr, message_prefix = nil, broker_message: nil) ⇒ RdkafkaError, false
  Build an error instance from a rd_kafka_error_t pointer.
- .validate!(response_ptr_or_code, message_prefix = nil, broker_message: nil, client_ptr: nil) ⇒ false
  Validate a response and raise an error if it indicates a failure.
Instance Method Summary
- #==(other) ⇒ Boolean
  Error comparison.
- #abortable? ⇒ Boolean
  Whether this error requires the current transaction to be aborted.
- #code ⇒ Symbol
  This error’s code, for example `:partition_eof`, `:msg_size_too_large`.
- #fatal? ⇒ Boolean
  Whether this error is fatal and the client instance is no longer usable.
- #initialize(response, message_prefix = nil, broker_message: nil, fatal: false, retryable: false, abortable: false, details: EMPTY_HASH, instance_name: nil) ⇒ RdkafkaError (constructor)
  A new instance of RdkafkaError.
- #is_partition_eof? ⇒ Boolean
  Whether this error indicates the partition is EOF.
- #retryable? ⇒ Boolean
  Whether this error is retryable and the operation may succeed if retried.
- #to_s ⇒ String
  Human readable representation of this error.
Constructor Details
#initialize(response, message_prefix = nil, broker_message: nil, fatal: false, retryable: false, abortable: false, details: EMPTY_HASH, instance_name: nil) ⇒ RdkafkaError
Returns a new instance of RdkafkaError.
    # File 'lib/rdkafka/error.rb', line 170

    def initialize(
      response,
      message_prefix = nil,
      broker_message: nil,
      fatal: false,
      retryable: false,
      abortable: false,
      details: EMPTY_HASH,
      instance_name: nil
    )
      raise TypeError.new("Response has to be an integer") unless response.is_a? Integer
      @rdkafka_response = response
      @message_prefix = message_prefix
      @broker_message = broker_message
      @fatal = fatal
      @retryable = retryable
      @abortable = abortable
      @details = details
      @instance_name = instance_name
    end
Instance Attribute Details
#broker_message ⇒ String (readonly)
Error message sent by the broker
    # File 'lib/rdkafka/error.rb', line 22

    def broker_message
      @broker_message
    end
#details ⇒ Hash (readonly)
Optional details hash specific to a given error or empty hash if none or not supported
    # File 'lib/rdkafka/error.rb', line 26

    def details
      @details
    end
#instance_name ⇒ String? (readonly)
The name of the rdkafka instance that generated this error
    # File 'lib/rdkafka/error.rb', line 30

    def instance_name
      @instance_name
    end
#message_prefix ⇒ String (readonly)
Prefix to be used for human readable representation
    # File 'lib/rdkafka/error.rb', line 18

    def message_prefix
      @message_prefix
    end
#rdkafka_response ⇒ Integer (readonly)
The underlying raw error response
    # File 'lib/rdkafka/error.rb', line 14

    def rdkafka_response
      @rdkafka_response
    end
Class Method Details
.build(response_ptr_or_code, message_prefix = nil, broker_message: nil, instance_name: nil) ⇒ RdkafkaError, false
Build an error instance from various input types
    # File 'lib/rdkafka/error.rb', line 69

    def build(response_ptr_or_code, message_prefix = nil, broker_message: nil, instance_name: nil)
      case response_ptr_or_code
      when Integer
        return false if response_ptr_or_code == Bindings::RD_KAFKA_RESP_ERR_NO_ERROR

        new(response_ptr_or_code, message_prefix, broker_message: broker_message, instance_name: instance_name)
      when Bindings::Message
        return false if response_ptr_or_code[:err] == Bindings::RD_KAFKA_RESP_ERR_NO_ERROR

        unless response_ptr_or_code[:payload].null?
          message_prefix ||= response_ptr_or_code[:payload].read_string(response_ptr_or_code[:len])
        end

        details = if response_ptr_or_code[:rkt].null?
          EMPTY_HASH
        else
          {
            partition: response_ptr_or_code[:partition],
            offset: response_ptr_or_code[:offset],
            topic: Bindings.rd_kafka_topic_name(response_ptr_or_code[:rkt])
          }.freeze
        end

        new(
          response_ptr_or_code[:err],
          message_prefix,
          broker_message: broker_message,
          details: details
        )
      else
        build_from_c(response_ptr_or_code, message_prefix)
      end
    end
.build_fatal(client_ptr, fallback_error_code: -150, fallback_message: nil, instance_name: nil) ⇒ RdkafkaError
Build a fatal error from librdkafka’s fatal error state. Calls rd_kafka_fatal_error() to get the actual underlying error code and description.
    # File 'lib/rdkafka/error.rb', line 138

    def build_fatal(client_ptr, fallback_error_code: -150, fallback_message: nil, instance_name: nil)
      fatal_error_details = Rdkafka::Bindings.extract_fatal_error(client_ptr)

      if fatal_error_details
        new(
          fatal_error_details[:error_code],
          broker_message: fatal_error_details[:error_string],
          fatal: true,
          instance_name: instance_name
        )
      else
        # Fallback: if extract_fatal_error returns nil (shouldn't happen in practice),
        # the error code itself still indicates a fatal condition
        new(
          fallback_error_code,
          broker_message: fallback_message,
          fatal: true,
          instance_name: instance_name
        )
      end
    end
.build_from_c(response_ptr, message_prefix = nil, broker_message: nil) ⇒ RdkafkaError, false
Build an error instance from a rd_kafka_error_t pointer
    # File 'lib/rdkafka/error.rb', line 39

    def build_from_c(response_ptr, message_prefix = nil, broker_message: nil)
      code = Rdkafka::Bindings.rd_kafka_error_code(response_ptr)

      return false if code == Bindings::RD_KAFKA_RESP_ERR_NO_ERROR

      message = broker_message || Rdkafka::Bindings.rd_kafka_err2str(code)
      fatal = !Rdkafka::Bindings.rd_kafka_error_is_fatal(response_ptr).zero?
      retryable = !Rdkafka::Bindings.rd_kafka_error_is_retriable(response_ptr).zero?
      abortable = !Rdkafka::Bindings.rd_kafka_error_txn_requires_abort(response_ptr).zero?

      Rdkafka::Bindings.rd_kafka_error_destroy(response_ptr)

      new(
        code,
        message_prefix,
        broker_message: message,
        fatal: fatal,
        retryable: retryable,
        abortable: abortable
      )
    end
.validate!(response_ptr_or_code, message_prefix = nil, broker_message: nil, client_ptr: nil) ⇒ false
Validate a response and raise an error if it indicates a failure
    # File 'lib/rdkafka/error.rb', line 112

    def validate!(response_ptr_or_code, message_prefix = nil, broker_message: nil, client_ptr: nil)
      error = build(response_ptr_or_code, message_prefix, broker_message: broker_message)
      return false unless error

      # Auto-detect and handle fatal errors (-150)
      if error.rdkafka_response == Bindings::RD_KAFKA_RESP_ERR__FATAL && client_ptr
        # Discover the underlying fatal error from librdkafka
        error = build_fatal(
          client_ptr,
          fallback_error_code: error.rdkafka_response,
          fallback_message: broker_message
        )
      end

      raise error
    end
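The contract of validate! (return false when the response signals success, raise otherwise) can be illustrated without loading the gem. The sketch below is a stand-in only: `SketchError`, `SKETCH_NO_ERROR`, and `sketch_validate!` are hypothetical names, and it assumes that the no-error response code is 0.

```ruby
# Hypothetical stand-in for Rdkafka::RdkafkaError; not the library class.
class SketchError < StandardError
  attr_reader :response

  def initialize(response)
    # Same guard as the documented constructor: only integers are accepted.
    raise TypeError, "Response has to be an integer" unless response.is_a?(Integer)

    @response = response
    super("response code #{response}")
  end
end

# Assumed value of the no-error response code.
SKETCH_NO_ERROR = 0

# Returns false on success and raises on any other response code.
def sketch_validate!(response)
  return false if response == SKETCH_NO_ERROR

  raise SketchError.new(response)
end

# Calling code can branch on the false return or rescue the raised error.
begin
  sketch_validate!(-150)
rescue SketchError => e
  puts "failed with response #{e.response}"
end
```

Because success is reported as `false` rather than `nil` or `true`, callers that only care about failures can ignore the return value and rely on the exception.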
Instance Method Details
#==(other) ⇒ Boolean
Error comparison
    # File 'lib/rdkafka/error.rb', line 235

    def ==(other)
      other.is_a?(self.class) && (to_s == other.to_s)
    end
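Since the comparison relies only on the class and on to_s, two errors that render to the same string compare equal even if other attributes differ. A minimal sketch of that behavior, using a hypothetical `SketchEqError` class rather than the real one:

```ruby
# Hypothetical stand-in; only the == definition is copied from the listing.
class SketchEqError < StandardError
  attr_reader :retryable

  def initialize(text, retryable: false)
    @text = text
    @retryable = retryable
    super(text)
  end

  def to_s
    @text
  end

  def ==(other)
    other.is_a?(self.class) && (to_s == other.to_s)
  end
end

a = SketchEqError.new("timed out", retryable: true)
b = SketchEqError.new("timed out", retryable: false)

a == b           # => true, the retryable flag is not part of the comparison
a == "timed out" # => false, a String is not a SketchEqError
```

Code that needs to distinguish errors by flags or details would have to compare those attributes explicitly.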
#abortable? ⇒ Boolean
Whether this error requires the current transaction to be aborted
    # File 'lib/rdkafka/error.rb', line 253

    def abortable?
      @abortable
    end
#code ⇒ Symbol
This error’s code, for example `:partition_eof`, `:msg_size_too_large`.
    # File 'lib/rdkafka/error.rb', line 193

    def code
      code = Rdkafka::Bindings.rd_kafka_err2name(@rdkafka_response).downcase
      if code[0] == "_"
        code[1..].to_sym
      else
        code.to_sym
      end
    end
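The name normalization in #code (downcase, strip one leading underscore, convert to a symbol) can be tried in isolation. In this sketch `normalize_error_name` is a hypothetical helper, and its argument stands in for the string that rd_kafka_err2name would return.

```ruby
# Hypothetical helper mirroring the normalization in #code, without
# calling into librdkafka.
def normalize_error_name(name)
  code = name.downcase
  # Internal librdkafka errors carry a leading underscore; drop it.
  code = code[1..] if code[0] == "_"
  code.to_sym
end

normalize_error_name("_PARTITION_EOF")     # => :partition_eof
normalize_error_name("MSG_SIZE_TOO_LARGE") # => :msg_size_too_large
```

One consequence of this scheme is that the symbol alone does not reveal whether the original name had the underscore prefix.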
#fatal? ⇒ Boolean
Whether this error is fatal and the client instance is no longer usable
    # File 'lib/rdkafka/error.rb', line 241

    def fatal?
      @fatal
    end
#is_partition_eof? ⇒ Boolean
Whether this error indicates the partition is EOF.
    # File 'lib/rdkafka/error.rb', line 228

    def is_partition_eof?
      code == :partition_eof
    end
#retryable? ⇒ Boolean
Whether this error is retryable and the operation may succeed if retried
    # File 'lib/rdkafka/error.rb', line 247

    def retryable?
      @retryable
    end
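The three predicates fatal?, abortable?, and retryable? lend themselves to a small decision helper in calling code. The sketch below uses a hypothetical `SketchFlags` struct in place of a real error object, and the precedence it applies (fatal first, then abortable, then retryable) is an assumption made for illustration, not a policy defined by the library.

```ruby
# Hypothetical stand-in exposing the same three predicates as the error class.
SketchFlags = Struct.new(:fatal, :retryable, :abortable, keyword_init: true) do
  def fatal?
    fatal
  end

  def retryable?
    retryable
  end

  def abortable?
    abortable
  end
end

# Assumed precedence: a fatal error outranks everything else, an abortable
# error outranks a retry, and anything unflagged is re-raised.
def next_action(error)
  return :recreate_client if error.fatal?
  return :abort_transaction if error.abortable?
  return :retry if error.retryable?

  :raise
end

next_action(SketchFlags.new(fatal: false, retryable: true, abortable: false))
# => :retry
```

The returned symbols are placeholders; a real application would map them to its own recovery steps.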
#to_s ⇒ String
Human readable representation of this error.
    # File 'lib/rdkafka/error.rb', line 204

    def to_s
      message_prefix_part = if message_prefix
        "#{message_prefix} - "
      else
        ""
      end

      instance_name_part = if instance_name
        " [#{instance_name}]"
      else
        ""
      end

      err_str = Rdkafka::Bindings.rd_kafka_err2str(@rdkafka_response)
      base = "#{message_prefix_part}#{err_str} (#{code})#{instance_name_part}"

      return base if broker_message.nil?
      return base if broker_message.empty?

      "#{base}\n#{broker_message}"
    end
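The string layout produced by to_s can be reproduced with a pure function, which makes the optional parts easier to see. `format_error` is a hypothetical helper; the error text and code are passed in as arguments here, whereas the real method obtains them from librdkafka, and "Example error text" is a placeholder rather than an actual librdkafka message.

```ruby
# Hypothetical helper reproducing the layout built by #to_s.
def format_error(err_str, code, message_prefix: nil, instance_name: nil, broker_message: nil)
  prefix_part = message_prefix ? "#{message_prefix} - " : ""
  instance_part = instance_name ? " [#{instance_name}]" : ""
  base = "#{prefix_part}#{err_str} (#{code})#{instance_part}"

  # The broker message is appended on its own line only when present.
  return base if broker_message.nil? || broker_message.empty?

  "#{base}\n#{broker_message}"
end

format_error("Example error text", :partition_eof)
# => "Example error text (partition_eof)"

format_error(
  "Example error text",
  :partition_eof,
  message_prefix: "Poll",
  instance_name: "consumer-1",
  broker_message: "details"
)
# => "Poll - Example error text (partition_eof) [consumer-1]\ndetails"
```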