Class: Karafka::Pro::Processing::Filters::Base

Inherits:
Object
Includes:
Core::Helpers::Time
Defined in:
lib/karafka/pro/processing/filters/base.rb

Overview

Base for all the filters. All filters (including custom) need to use this API.

Because filters can limit data in ways that require the consumer to pause or seek (throttling, for example), the API does more than "remove some things from the batch": it also provides ways to control any post-filtering operations that may be needed.
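A minimal sketch of what a custom filter could look like under this API. `FilterBase` is a stand-in that mirrors only the defaults documented on this page, so the example runs without Karafka; in a real application the filter would inherit from Karafka::Pro::Processing::Filters::Base. `Message` and `EvenOffsetFilter` are hypothetical names, and removing messages from the array in place is an assumption of this sketch.

```ruby
# Stand-in mirroring the documented defaults (not the real Karafka class).
class FilterBase
  attr_reader :cursor

  def initialize
    @applied = false
    @cursor = nil
  end

  def applied?
    @applied
  end

  def action
    :skip
  end
end

# Hypothetical message type; real batches hold Karafka::Messages::Message.
Message = Struct.new(:offset, :payload)

# Hypothetical filter that drops messages with odd offsets.
class EvenOffsetFilter < FilterBase
  def apply!(messages)
    # Reset state in case the same instance sees another batch (assumption).
    @applied = false
    @cursor = nil

    messages.delete_if do |message|
      next false if message.offset.even?

      @applied = true
      # Remember the most recently removed message as the cursor.
      @cursor = message
      true
    end
  end
end

messages = [Message.new(0, "a"), Message.new(1, "b"), Message.new(2, "c")]
filter = EvenOffsetFilter.new
filter.apply!(messages)

puts messages.map(&:offset).inspect # prints [0, 2]
puts filter.applied?                # prints true
```

This filter keeps the default `:skip` action, so the cursor is recorded but nothing is paused or sought.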

Constructor Details

#initialize ⇒ Base

Initializes the filter as not yet applied



# File 'lib/karafka/pro/processing/filters/base.rb', line 49

def initialize
  @applied = false
  @cursor = nil
end

Instance Attribute Details

#cursor ⇒ Karafka::Messages::Message? (readonly)

Returns the message to use as the cursor for pausing or seeking, or nil if not applicable.

Returns:

  • (Karafka::Messages::Message, nil)

    the message to use as the cursor for pausing or seeking, or nil if not applicable.



# File 'lib/karafka/pro/processing/filters/base.rb', line 44

def cursor
  @cursor
end

Instance Method Details

#action ⇒ Symbol

Returns the post-execution action that the filter requests on the consumer: `:skip`, `:pause` or `:seek`.

Returns:

  • (Symbol)

    the post-execution action that the filter requests on the consumer: `:skip`, `:pause` or `:seek`.



# File 'lib/karafka/pro/processing/filters/base.rb', line 62

def action
  :skip
end
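
As an illustration of overriding this default, the sketch below shows a hypothetical filter that requests a pause only when it actually removed messages. `FilterBase` is a stand-in for the documented defaults rather than the real Karafka class, and `MaxPerBatchFilter`, `limit` and `pause_timeout` are names invented for this example.

```ruby
# Stand-in for the documented defaults (not the real Karafka class).
class FilterBase
  attr_reader :cursor

  def initialize
    @applied = false
    @cursor = nil
  end

  def applied?
    @applied
  end

  def action
    :skip
  end

  def timeout
    nil
  end
end

# Hypothetical message type with only the field this sketch needs.
Message = Struct.new(:offset)

# Hypothetical filter: lets at most `limit` messages through per batch and
# asks the consumer to pause, using the first rejected message as the cursor.
class MaxPerBatchFilter < FilterBase
  def initialize(limit:, pause_timeout:)
    super()
    @limit = limit
    @pause_timeout = pause_timeout
  end

  def apply!(messages)
    @applied = false
    @cursor = nil
    return if messages.size <= @limit

    # First message that did not fit into the limit.
    @cursor = messages[@limit]
    # Drop it and everything after it, in place.
    messages.slice!(@limit..-1)
    @applied = true
  end

  # Pause only when something was removed; otherwise keep the default.
  def action
    applied? ? :pause : :skip
  end

  # nil (not 0) when the filter does not want to pause.
  def timeout
    applied? ? @pause_timeout : nil
  end
end

filter = MaxPerBatchFilter.new(limit: 2, pause_timeout: 5_000)
batch = [Message.new(10), Message.new(11), Message.new(12)]
filter.apply!(batch)

puts filter.action        # prints pause
puts filter.cursor.offset # prints 12
```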

#applied? ⇒ Boolean

Returns whether this filter changed the messages in any way.

Returns:

  • (Boolean)

    whether this filter changed the messages in any way



# File 'lib/karafka/pro/processing/filters/base.rb', line 67

def applied?
  @applied
end

#apply!(messages) ⇒ Object

Parameters:

  • messages (Array<Karafka::Messages::Message>)

    array of messages. Keep in mind that it may already be partial because previous filters have run.

Raises:

  • (NotImplementedError)


# File 'lib/karafka/pro/processing/filters/base.rb', line 56

def apply!(messages)
  raise NotImplementedError, "Implement in a subclass"
end

#mark_as_consumed? ⇒ Boolean

Returns whether the cursor value should be used to mark as consumed. If any of the filters returns true, the lowest applicable cursor value (if any) is used.

Returns:

  • (Boolean)

    whether the cursor value should be used to mark as consumed. If any of the filters returns true, the lowest applicable cursor value (if any) is used



# File 'lib/karafka/pro/processing/filters/base.rb', line 80

def mark_as_consumed?
  false
end
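
The "lowest applicable cursor" rule can be pictured with a small sketch. This is not Karafka's internal code: `Message`, `FakeFilter` and `lowest_marking_cursor` are hypothetical names, used only to show how a lowest-offset cursor might be selected among the filters that request marking.

```ruby
# Hypothetical message type with only the field this sketch needs.
Message = Struct.new(:offset)

# Hypothetical stand-in exposing only the two methods used below.
FakeFilter = Struct.new(:marking_cursor, :wants_marking) do
  def mark_as_consumed?
    wants_marking
  end
end

# Picks the cursor with the lowest offset among filters that request marking.
# Returns nil when no filter requests marking or none of them has a cursor.
def lowest_marking_cursor(filters)
  filters
    .select(&:mark_as_consumed?)
    .map(&:marking_cursor)
    .compact
    .min_by(&:offset)
end

filters = [
  FakeFilter.new(Message.new(42), true),
  FakeFilter.new(Message.new(17), true),
  FakeFilter.new(Message.new(5), false), # does not request marking, ignored
  FakeFilter.new(nil, true)              # requests marking but has no cursor
]

puts lowest_marking_cursor(filters).offset # prints 17
```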

#marking_cursor ⇒ Karafka::Messages::Message?

Returns cursor message for marking or nil if no marking.

Returns:

  • (Karafka::Messages::Message, nil)

    cursor message for marking or nil if no marking


# File 'lib/karafka/pro/processing/filters/base.rb', line 92

def marking_cursor
  cursor
end

#marking_method ⇒ Symbol

Returns `:mark_as_consumed` or `:mark_as_consumed!`. Applicable only if marking is requested.

Returns:

  • (Symbol)

    `:mark_as_consumed` or `:mark_as_consumed!`. Applicable only if marking is requested



# File 'lib/karafka/pro/processing/filters/base.rb', line 86

def marking_method
  :mark_as_consumed
end

#timeout ⇒ Integer?

Note:

Do not return `0` when your filter is not pausing, as it may interfere with other filters that want to pause. Return nil instead.

Returns default timeout for pausing (if applicable) or nil if not.

Returns:

  • (Integer, nil)

    default timeout for pausing (if applicable) or nil if not



# File 'lib/karafka/pro/processing/filters/base.rb', line 74

def timeout
  nil
end
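
To see why `0` is a poor "not pausing" value, consider a sketch in which the effective pause is the smallest non-nil timeout across filters. That combining rule is an assumption made for illustration, not a statement about Karafka's internals, and `effective_timeout` is a hypothetical helper.

```ruby
# Hypothetical helper: the smallest non-nil timeout wins. A nil timeout means
# "this filter has no opinion about pausing" and is ignored.
def effective_timeout(timeouts)
  timeouts.compact.min
end

# A non-pausing filter that returns nil does not affect the result.
puts effective_timeout([nil, 5_000]).inspect # prints 5000

# A non-pausing filter that returns 0 would cancel the other filter's pause.
puts effective_timeout([0, 5_000]).inspect   # prints 0
```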