Class: Karafka::Core::Instrumentation::CallbacksManager

Inherits:
Object
Defined in:
lib/karafka/core/instrumentation/callbacks_manager.rb

Overview

This manager allows us to register multiple callbacks on a hook that is supposed to support only a single callback.

Constructor Details

#initialize ⇒ Karafka::Core::Instrumentation::CallbacksManager



# File 'lib/karafka/core/instrumentation/callbacks_manager.rb', line 11

def initialize
  @callbacks = {}
  @values = [].freeze
  @mutex = Mutex.new
end

Instance Method Details

#add(id, callable) ⇒ Object

Adds a callback to the manager

Parameters:

  • id (String)

    id of the callback (used when deleting it)

  • callable (#call)

    object that responds to a #call method



# File 'lib/karafka/core/instrumentation/callbacks_manager.rb', line 34

def add(id, callable)
  @mutex.synchronize do
    @callbacks[id] = callable
    @values = @callbacks.values.freeze
  end
end

#call(*args) ⇒ Object

Note:

Copy-on-write: #call iterates an immutable snapshot that #add and #delete rebuild and swap in under a mutex. Because #call never mutates shared state, it needs neither a lock nor a per-call #values allocation. A callback registered or removed from another thread is never lost; it simply takes effect on the next #call. The alternative, a cache invalidated from within #call, could not be updated atomically with respect to this read, so a stale write-back would permanently drop callbacks.

Invokes all the callbacks registered one after another

Parameters:

  • args (Object)

    any args that should go to the callbacks



# File 'lib/karafka/core/instrumentation/callbacks_manager.rb', line 26

def call(*args)
  @values.each { |callback| callback.call(*args) }
end
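
The snapshot behavior described in the note can be illustrated with a minimal sketch. It assumes a stand-in class, SketchCallbacksManager (a hypothetical name), that copies the #initialize, #call and #add bodies shown on this page so the example runs without the karafka-core gem. The callback ids and event symbols are illustrative only.

```ruby
# Stand-in mirroring the methods documented on this page; the class name is
# hypothetical and exists only so the sketch runs without the gem installed.
class SketchCallbacksManager
  def initialize
    @callbacks = {}
    @values = [].freeze
    @mutex = Mutex.new
  end

  # Lock-free read: iterates whichever frozen snapshot @values points at
  def call(*args)
    @values.each { |callback| callback.call(*args) }
  end

  # Rebuilds the snapshot and swaps it in under the mutex
  def add(id, callable)
    @mutex.synchronize do
      @callbacks[id] = callable
      @values = @callbacks.values.freeze
    end
  end
end

manager = SketchCallbacksManager.new
seen = []

# The first callback registers a second one while a dispatch is in progress.
# This does not deadlock because #call never takes the mutex.
manager.add('first', lambda { |event|
  seen << [:first, event]
  manager.add('second', ->(e) { seen << [:second, e] })
})

manager.call(:a) # iterates the snapshot built before 'second' existed
after_first_dispatch = seen.dup

manager.call(:b) # iterates the snapshot that #add swapped in
```

In this sketch the in-flight dispatch keeps iterating the old frozen array, so 'second' is first invoked by the second #call rather than partway through the first.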

#delete(id) ⇒ Object

Removes the callback from the manager

Parameters:

  • id (String)

    id of the callback we want to remove



# File 'lib/karafka/core/instrumentation/callbacks_manager.rb', line 43

def delete(id)
  @mutex.synchronize do
    @callbacks.delete(id)
    @values = @callbacks.values.freeze
  end
end
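
A short usage sketch of the registration lifecycle may help here. It assumes the same kind of stand-in, SketchCallbacksManager (a hypothetical name), copying the method bodies shown on this page. The ids 'audit', 'metrics' and 'unknown' and the message strings are made up for illustration.

```ruby
# Stand-in mirroring the methods documented on this page; the class name is
# hypothetical and exists only so the sketch runs without the gem installed.
class SketchCallbacksManager
  def initialize
    @callbacks = {}
    @values = [].freeze
    @mutex = Mutex.new
  end

  def call(*args)
    @values.each { |callback| callback.call(*args) }
  end

  def add(id, callable)
    @mutex.synchronize do
      @callbacks[id] = callable
      @values = @callbacks.values.freeze
    end
  end

  def delete(id)
    @mutex.synchronize do
      @callbacks.delete(id)
      @values = @callbacks.values.freeze
    end
  end
end

manager = SketchCallbacksManager.new
log = []

manager.add('audit', ->(msg) { log << "audit: #{msg}" })
manager.add('metrics', ->(msg) { log << "metrics: #{msg}" })
manager.call('started')

# Re-adding under an existing id replaces that callback and keeps its position
manager.add('audit', ->(msg) { log << "audit v2: #{msg}" })
manager.delete('metrics')
manager.delete('unknown') # Hash#delete on a missing key is a no-op
manager.call('stopped')
```

Because the callbacks live in a Hash keyed by id, the id serves both as the handle for #delete and as a way to replace a callback without registering it twice.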