Exception: Rdkafka::RdkafkaError
- Defined in:
- lib/rdkafka/error.rb
Overview
Error returned by the underlying rdkafka library.
Direct Known Subclasses
Constant Summary collapse
- EMPTY_HASH =
Empty hash for details default allocation
{}.freeze
Instance Attribute Summary collapse
-
#broker_message ⇒ String
readonly
Error message sent by the broker.
-
#details ⇒ Hash
readonly
Optional details hash specific to a given error or empty hash if none or not supported.
-
#message_prefix ⇒ String
readonly
Prefix to be used for human readable representation.
-
#rdkafka_response ⇒ Integer
readonly
The underlying raw error response.
Class Method Summary collapse
-
.build(response_ptr_or_code, message_prefix = nil, broker_message: nil) ⇒ RdkafkaError, false
Build an error instance from various input types.
-
.build_fatal(client_ptr, fallback_error_code: -150,, fallback_message: nil) ⇒ RdkafkaError
Build a fatal error from librdkafka’s fatal error state.
-
.build_from_c(response_ptr, message_prefix = nil, broker_message: nil) ⇒ RdkafkaError, false
Build an error instance from a rd_kafka_error_t pointer.
-
.validate!(response_ptr_or_code, message_prefix = nil, broker_message: nil, client_ptr: nil) ⇒ false
Validate a response and raise an error if it indicates a failure.
Instance Method Summary collapse
-
#==(other) ⇒ Boolean
Error comparison.
-
#abortable? ⇒ Boolean
Whether this error requires the current transaction to be aborted.
-
#code ⇒ Symbol
This error’s code, for example ‘:partition_eof`, `:msg_size_too_large`.
-
#fatal? ⇒ Boolean
Whether this error is fatal and the client instance is no longer usable.
-
#initialize(response, message_prefix = nil, broker_message: nil, fatal: false, retryable: false, abortable: false, details: EMPTY_HASH) ⇒ RdkafkaError
constructor
A new instance of RdkafkaError.
-
#is_partition_eof? ⇒ Boolean
Whether this error indicates the partition is EOF.
-
#retryable? ⇒ Boolean
Whether this error is retryable and the operation may succeed if retried.
-
#to_s ⇒ String
Human readable representation of this error.
Constructor Details
#initialize(response, message_prefix = nil, broker_message: nil, fatal: false, retryable: false, abortable: false, details: EMPTY_HASH) ⇒ RdkafkaError
Returns a new instance of RdkafkaError.
161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 |
# File 'lib/rdkafka/error.rb', line 161 def initialize( response, = nil, broker_message: nil, fatal: false, retryable: false, abortable: false, details: EMPTY_HASH ) raise TypeError.new("Response has to be an integer") unless response.is_a? Integer @rdkafka_response = response @message_prefix = @broker_message = @fatal = fatal @retryable = retryable @abortable = abortable @details = details end |
Instance Attribute Details
#broker_message ⇒ String (readonly)
Error message sent by the broker
22 23 24 |
# File 'lib/rdkafka/error.rb', line 22 def @broker_message end |
#details ⇒ Hash (readonly)
Optional details hash specific to a given error or empty hash if none or not supported
26 27 28 |
# File 'lib/rdkafka/error.rb', line 26 def details @details end |
#message_prefix ⇒ String (readonly)
Prefix to be used for human readable representation
18 19 20 |
# File 'lib/rdkafka/error.rb', line 18 def @message_prefix end |
#rdkafka_response ⇒ Integer (readonly)
The underlying raw error response
14 15 16 |
# File 'lib/rdkafka/error.rb', line 14 def rdkafka_response @rdkafka_response end |
Class Method Details
.build(response_ptr_or_code, message_prefix = nil, broker_message: nil) ⇒ RdkafkaError, false
Build an error instance from various input types
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/rdkafka/error.rb', line 64 def build(response_ptr_or_code, = nil, broker_message: nil) case response_ptr_or_code when Integer return false if response_ptr_or_code == Bindings::RD_KAFKA_RESP_ERR_NO_ERROR new(response_ptr_or_code, , broker_message: ) when Bindings::Message return false if response_ptr_or_code[:err] == Bindings::RD_KAFKA_RESP_ERR_NO_ERROR unless response_ptr_or_code[:payload].null? ||= response_ptr_or_code[:payload].read_string(response_ptr_or_code[:len]) end details = if response_ptr_or_code[:rkt].null? EMPTY_HASH else { partition: response_ptr_or_code[:partition], offset: response_ptr_or_code[:offset], topic: Bindings.rd_kafka_topic_name(response_ptr_or_code[:rkt]) }.freeze end new( response_ptr_or_code[:err], , broker_message: , details: details ) else build_from_c(response_ptr_or_code, ) end end |
.build_fatal(client_ptr, fallback_error_code: -150,, fallback_message: nil) ⇒ RdkafkaError
Build a fatal error from librdkafka’s fatal error state. Calls rd_kafka_fatal_error() to get the actual underlying error code and description.
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
# File 'lib/rdkafka/error.rb', line 132 def build_fatal(client_ptr, fallback_error_code: -150, fallback_message: nil) fatal_error_details = Rdkafka::Bindings.extract_fatal_error(client_ptr) if fatal_error_details new( fatal_error_details[:error_code], broker_message: fatal_error_details[:error_string], fatal: true ) else # Fallback: if extract_fatal_error returns nil (shouldn't happen in practice), # the error code itself still indicates a fatal condition new( fallback_error_code, broker_message: , fatal: true ) end end |
.build_from_c(response_ptr, message_prefix = nil, broker_message: nil) ⇒ RdkafkaError, false
Build an error instance from a rd_kafka_error_t pointer
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/rdkafka/error.rb', line 35 def build_from_c(response_ptr, = nil, broker_message: nil) code = Rdkafka::Bindings.rd_kafka_error_code(response_ptr) return false if code == Bindings::RD_KAFKA_RESP_ERR_NO_ERROR = || Rdkafka::Bindings.rd_kafka_err2str(code) fatal = !Rdkafka::Bindings.rd_kafka_error_is_fatal(response_ptr).zero? retryable = !Rdkafka::Bindings.rd_kafka_error_is_retriable(response_ptr).zero? abortable = !Rdkafka::Bindings.rd_kafka_error_txn_requires_abort(response_ptr).zero? Rdkafka::Bindings.rd_kafka_error_destroy(response_ptr) new( code, , broker_message: , fatal: fatal, retryable: retryable, abortable: abortable ) end |
.validate!(response_ptr_or_code, message_prefix = nil, broker_message: nil, client_ptr: nil) ⇒ false
Validate a response and raise an error if it indicates a failure
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
# File 'lib/rdkafka/error.rb', line 107 def validate!(response_ptr_or_code, = nil, broker_message: nil, client_ptr: nil) error = build(response_ptr_or_code, , broker_message: ) return false unless error # Auto-detect and handle fatal errors (-150) if error.rdkafka_response == Bindings::RD_KAFKA_RESP_ERR__FATAL && client_ptr # Discover the underlying fatal error from librdkafka error = build_fatal( client_ptr, fallback_error_code: error.rdkafka_response, fallback_message: ) end raise error end |
Instance Method Details
#==(other) ⇒ Boolean
Error comparison
218 219 220 |
# File 'lib/rdkafka/error.rb', line 218 def ==(other) other.is_a?(self.class) && (to_s == other.to_s) end |
#abortable? ⇒ Boolean
Whether this error requires the current transaction to be aborted
236 237 238 |
# File 'lib/rdkafka/error.rb', line 236 def abortable? @abortable end |
#code ⇒ Symbol
This error’s code, for example ‘:partition_eof`, `:msg_size_too_large`.
182 183 184 185 186 187 188 189 |
# File 'lib/rdkafka/error.rb', line 182 def code code = Rdkafka::Bindings.rd_kafka_err2name(@rdkafka_response).downcase if code[0] == "_" code[1..].to_sym else code.to_sym end end |
#fatal? ⇒ Boolean
Whether this error is fatal and the client instance is no longer usable
224 225 226 |
# File 'lib/rdkafka/error.rb', line 224 def fatal? @fatal end |
#is_partition_eof? ⇒ Boolean
Whether this error indicates the partition is EOF.
211 212 213 |
# File 'lib/rdkafka/error.rb', line 211 def is_partition_eof? code == :partition_eof end |
#retryable? ⇒ Boolean
Whether this error is retryable and the operation may succeed if retried
230 231 232 |
# File 'lib/rdkafka/error.rb', line 230 def retryable? @retryable end |
#to_s ⇒ String
Human readable representation of this error.
193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 |
# File 'lib/rdkafka/error.rb', line 193 def to_s = if "#{} - " else "" end err_str = Rdkafka::Bindings.rd_kafka_err2str(@rdkafka_response) base = "#{}#{err_str} (#{code})" return base if .nil? return base if .empty? "#{base}\n#{}" end |