Class: Karafka::Pro::Processing::Coordinators::ErrorsTracker

Inherits:
Object
Includes:
Enumerable
Defined in:
lib/karafka/pro/processing/coordinators/errors_tracker.rb

Overview

Object used to track errors in between executions to be able to build error-type based recovery flows.
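As a rough illustration of an error-type based recovery flow, the sketch below picks an action from the last tracked error and the total error count. `RecoveryPolicy`, `TransientError`, `FakeTracker`, and the returned symbols are hypothetical names introduced for this example. Only `last` and `size` correspond to methods documented on this page.

```ruby
# Hypothetical error class used only for this illustration.
class TransientError < StandardError; end

# Minimal stand-in exposing the two tracker methods the policy relies on
# (`last` and `size`). It is not the Karafka class itself.
class FakeTracker
  def initialize(errors)
    @errors = errors
  end

  def last
    @errors.last
  end

  def size
    @errors.size
  end
end

# Hypothetical policy: decides what to do based on the tracked errors.
class RecoveryPolicy
  MAX_TRANSIENT_RETRIES = 3

  # @param tracker [#last, #size] any object behaving like the errors tracker
  # @return [Symbol] :retry or :give_up (illustrative values, not a Karafka API)
  def call(tracker)
    return :retry if tracker.size.zero?

    if tracker.last.is_a?(TransientError) && tracker.size <= MAX_TRANSIENT_RETRIES
      :retry
    else
      :give_up
    end
  end
end

policy = RecoveryPolicy.new

puts policy.call(FakeTracker.new([TransientError.new("timeout")]))
puts policy.call(FakeTracker.new([ArgumentError.new("bad payload")]))
```

The policy only depends on duck-typed `last` and `size`, so the same decision logic could be exercised in tests without a running consumer.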

Constructor Details

#initialize(topic, partition, limit: STORAGE_LIMIT) ⇒ ErrorsTracker

Note:

`limit` does not apply to the counts. They keep increasing even after the number of errors exceeds the storage limit.

Returns a new instance of ErrorsTracker.

Parameters:

  • topic (Karafka::Routing::Topic)
  • partition (Integer)
  • limit (Integer) (defaults to: STORAGE_LIMIT)

    max number of errors we want to keep for reference when implementing custom error handling.



# File 'lib/karafka/pro/processing/coordinators/errors_tracker.rb', line 66

def initialize(topic, partition, limit: STORAGE_LIMIT)
  @errors = []
  @counts = Hash.new { |hash, key| hash[key] = 0 }
  @topic = topic
  @partition = partition
  @limit = limit
  @trace_id = SecureRandom.uuid
end

Instance Attribute Details

#counts ⇒ Hash (readonly)

Returns:

  • (Hash)

    number of occurrences per error class


# File 'lib/karafka/pro/processing/coordinators/errors_tracker.rb', line 48

def counts
  @counts
end

#partition ⇒ Integer (readonly)

Returns partition of this error tracker.

Returns:

  • (Integer)

    partition of this error tracker



# File 'lib/karafka/pro/processing/coordinators/errors_tracker.rb', line 45

def partition
  @partition
end

#topic ⇒ Karafka::Routing::Topic (readonly)

Returns topic of this error tracker.

Returns:

  • (Karafka::Routing::Topic)

    topic of this error tracker


# File 'lib/karafka/pro/processing/coordinators/errors_tracker.rb', line 42

def topic
  @topic
end

#trace_id ⇒ String (readonly)

Returns:

  • (String)

    UUID assigned at initialization and regenerated each time an error is added


# File 'lib/karafka/pro/processing/coordinators/errors_tracker.rb', line 51

def trace_id
  @trace_id
end

Instance Method Details

#<<(error) ⇒ Object

Parameters:

  • error (StandardError)

    error to add to the tracker; once `limit` errors are stored, the oldest one is dropped



# File 'lib/karafka/pro/processing/coordinators/errors_tracker.rb', line 82

def <<(error)
  @errors.shift if @errors.size >= @limit
  @errors << error
  @counts[error.class] += 1
  @trace_id = SecureRandom.uuid
end
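
To make the eviction and trace id behavior concrete, here is a minimal sketch. `MiniTracker` is a hypothetical stand-in that mirrors the listing above so the example runs without Karafka Pro. It is not the library class, and it omits `topic` and `partition`.

```ruby
require "securerandom"

# Hypothetical stand-in mirroring the `#initialize` and `#<<` logic shown above.
class MiniTracker
  attr_reader :counts, :trace_id

  def initialize(limit: 2)
    @errors = []
    @counts = Hash.new(0)
    @limit = limit
    @trace_id = SecureRandom.uuid
  end

  def <<(error)
    @errors.shift if @errors.size >= @limit # drop the oldest stored error
    @errors << error
    @counts[error.class] += 1
    @trace_id = SecureRandom.uuid # a fresh id for every tracked error
  end

  def all
    @errors
  end
end

tracker = MiniTracker.new(limit: 2)
initial_trace_id = tracker.trace_id

tracker << ArgumentError.new("first")
tracker << KeyError.new("second")
tracker << ArgumentError.new("third")

puts tracker.all.map(&:message).inspect   # ["second", "third"]
puts tracker.counts[ArgumentError]        # 2
puts tracker.trace_id != initial_trace_id # true
```

The oldest error ("first") is evicted from storage, yet it still contributes to the `ArgumentError` count.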

#all ⇒ Array<StandardError>

Returns array with the stored errors (at most `limit` of the most recent ones).

Returns:

  • (Array<StandardError>)

    array with the stored errors (at most `limit` of the most recent ones)



# File 'lib/karafka/pro/processing/coordinators/errors_tracker.rb', line 112

def all
  @errors
end

#clear ⇒ Object

Clears all the stored errors and their counts.



# File 'lib/karafka/pro/processing/coordinators/errors_tracker.rb', line 76

def clear
  @errors.clear
  @counts.clear
end

#each ⇒ Object

Iterates over the stored errors.



# File 'lib/karafka/pro/processing/coordinators/errors_tracker.rb', line 107

def each(&)
  @errors.each(&)
end
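
Because the class includes Enumerable and defines `each`, the usual Enumerable helpers become available. The sketch below shows this with `ErrorList`, a hypothetical stand-in that uses the same delegation pattern. It is not the library class.

```ruby
# Hypothetical stand-in with the same `each` delegation to an internal array.
class ErrorList
  include Enumerable

  def initialize(errors)
    @errors = errors
  end

  def each(&block)
    @errors.each(&block)
  end
end

errors = ErrorList.new(
  [
    ArgumentError.new("bad input"),
    IOError.new("disk"),
    ArgumentError.new("bad input again")
  ]
)

# Enumerable methods work because they are all built on top of `each`.
puts errors.map(&:message).inspect
puts errors.grep(ArgumentError).size      # 2
puts errors.any? { |e| e.is_a?(IOError) } # true
```

Since `each` walks only the stored errors, helpers such as `map` or `select` see at most `limit` entries.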

#empty? ⇒ Boolean

Returns whether the error tracker is empty.

Returns:

  • (Boolean)

    whether the error tracker is empty



# File 'lib/karafka/pro/processing/coordinators/errors_tracker.rb', line 90

def empty?
  @errors.empty?
end

#last ⇒ StandardError?

Returns last error that occurred or nil if no errors.

Returns:

  • (StandardError, nil)

    last error that occurred or nil if no errors



# File 'lib/karafka/pro/processing/coordinators/errors_tracker.rb', line 102

def last
  @errors.last
end

#size ⇒ Integer

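Returns the total number of errors tracked, which can exceed the number of stored errors.

Returns:

  • (Integer)

    total number of errors tracked, which can exceed the number of stored errors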



# File 'lib/karafka/pro/processing/coordinators/errors_tracker.rb', line 95

def size
  # We use counts reference of all errors and not the `@errors` array because it allows
  # us to go beyond the whole errors storage limit
  @counts.values.sum
end
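
The difference between `size` and the number of stored errors can be seen in a short sketch. `CountingTracker` is a hypothetical stand-in reproducing only the storage and counting logic shown on this page, so it runs without Karafka Pro.

```ruby
# Hypothetical stand-in: bounded storage plus unbounded per-class counts.
class CountingTracker
  def initialize(limit:)
    @errors = []
    @counts = Hash.new(0)
    @limit = limit
  end

  def <<(error)
    @errors.shift if @errors.size >= @limit
    @errors << error
    @counts[error.class] += 1
  end

  def all
    @errors
  end

  def size
    @counts.values.sum
  end
end

tracker = CountingTracker.new(limit: 3)
5.times { |i| tracker << RuntimeError.new("failure #{i}") }

puts tracker.all.size # 3, storage is capped by the limit
puts tracker.size     # 5, counts keep growing past the limit
```

Code that needs the true number of failures should therefore rely on `size` or `counts`, not on the length of `all`.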