Class: Karafka::Core::Instrumentation::CallbacksManager
- Inherits: Object
  - Object
  - Karafka::Core::Instrumentation::CallbacksManager
- Defined in: lib/karafka/core/instrumentation/callbacks_manager.rb
Overview
This manager allows us to register multiple callbacks into a hook that is supposed to support only a single callback.
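As a rough illustration of that idea, the sketch below installs one manager into a hook that keeps only a single callback and then fans out to several registered callables. `CallbacksManagerSketch` is a stand-in that mirrors the source listed on this page so the example runs without the gem installed; `SingleHookClient`, `on_event`, and `emit` are hypothetical names invented for the example and are not part of Karafka.

```ruby
# Stand-in mirroring the source shown on this page (assumption: used only
# so the sketch runs without the karafka-core gem).
class CallbacksManagerSketch
  def initialize
    @callbacks = {}
    @values = [].freeze
    @mutex = Mutex.new
  end

  def add(id, callable)
    @mutex.synchronize do
      @callbacks[id] = callable
      @values = @callbacks.values.freeze
    end
  end

  def delete(id)
    @mutex.synchronize do
      @callbacks.delete(id)
      @values = @callbacks.values.freeze
    end
  end

  def call(*args)
    @values.each { |callback| callback.call(*args) }
  end
end

# Hypothetical client whose hook remembers only the last assigned callback.
class SingleHookClient
  attr_writer :on_event

  def emit(payload)
    @on_event&.call(payload)
  end
end

manager = CallbacksManagerSketch.new
seen = []
manager.add(:logger, ->(payload) { seen << [:logger, payload] })
manager.add(:metrics, ->(payload) { seen << [:metrics, payload] })

client = SingleHookClient.new
# The manager responds to #call, so it can occupy the single slot itself.
client.on_event = manager

client.emit("connected")  # both callbacks run, in registration order
manager.delete(:metrics)
client.emit("closed")     # only :logger is still registered
```

Because callbacks are keyed by id, registering a second callable under an existing id replaces the first rather than adding another entry.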
Instance Method Summary
- #add(id, callable) ⇒ Object
  Adds a callback to the manager.
- #call(*args) ⇒ Object
  Invokes all the callbacks registered one after another.
- #delete(id) ⇒ Object
  Removes the callback from the manager.
- #initialize ⇒ ::Karafka::Core::Instrumentation::CallbacksManager constructor
Constructor Details
#initialize ⇒ ::Karafka::Core::Instrumentation::CallbacksManager
# File 'lib/karafka/core/instrumentation/callbacks_manager.rb', line 11

def initialize
  @callbacks = {}
  @values = [].freeze
  @mutex = Mutex.new
end
Instance Method Details
#add(id, callable) ⇒ Object
Adds a callback to the manager
# File 'lib/karafka/core/instrumentation/callbacks_manager.rb', line 34

def add(id, callable)
  @mutex.synchronize do
    @callbacks[id] = callable
    @values = @callbacks.values.freeze
  end
end
#call(*args) ⇒ Object
Invokes all the registered callbacks one after another.
Dispatch is copy-on-write: #call iterates an immutable snapshot (@values) that #add and #delete rebuild and swap in under a mutex. Because #call never mutates shared state, it needs neither a lock nor a fresh @callbacks.values array on every call. A callback registered or removed from another thread is never lost; the change simply takes effect on the next #call. The alternative, a cache invalidated and rebuilt from within #call, could not be updated atomically with respect to this lock-free read, so a stale write-back would permanently drop callbacks.
# File 'lib/karafka/core/instrumentation/callbacks_manager.rb', line 26

def call(*args)
  @values.each { |callback| callback.call(*args) }
end
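The snapshot behaviour described for #call can be observed with a small sketch. `SnapshotDispatcher` is a hypothetical stand-in that copies only the #add and #call logic listed on this page; the callback ids and the `log` array are likewise made up for the example. A callback that registers another callback while dispatch is in progress does not disturb the running iteration, and the new callback first runs on the following call.

```ruby
# Hypothetical stand-in reproducing the #add/#call logic from this page.
class SnapshotDispatcher
  def initialize
    @callbacks = {}
    @values = [].freeze
    @mutex = Mutex.new
  end

  def add(id, callable)
    @mutex.synchronize do
      @callbacks[id] = callable
      # Build a new frozen array instead of mutating the one in use.
      @values = @callbacks.values.freeze
    end
  end

  def call(*args)
    # Iterates whichever snapshot was current when dispatch started.
    @values.each { |callback| callback.call(*args) }
  end
end

dispatcher = SnapshotDispatcher.new
log = []

first = lambda do |round|
  log << [:first, round]
  # Registering during dispatch swaps in a new snapshot. #call holds no
  # lock, so taking the mutex here cannot deadlock.
  dispatcher.add(:late, ->(r) { log << [:late, r] })
end
dispatcher.add(:first, first)

dispatcher.call(1) # snapshot contains only :first
dispatcher.call(2) # snapshot now also contains :late
```

The cost of this design is that every #add or #delete allocates a new array, which seems a reasonable trade when registrations are rare and dispatch is frequent.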
#delete(id) ⇒ Object
Removes the callback from the manager
# File 'lib/karafka/core/instrumentation/callbacks_manager.rb', line 43

def delete(id)
  @mutex.synchronize do
    @callbacks.delete(id)
    @values = @callbacks.values.freeze
  end
end
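To see why #add and #delete both rebuild the snapshot inside the mutex, the following sketch registers and removes callbacks from several threads at once and then checks that exactly the surviving callbacks are dispatched. `LockedRegistry` is a hypothetical stand-in mirroring the methods listed on this page; the thread and iteration counts are arbitrary.

```ruby
# Hypothetical stand-in mirroring the methods listed on this page.
class LockedRegistry
  def initialize
    @callbacks = {}
    @values = [].freeze
    @mutex = Mutex.new
  end

  def add(id, callable)
    @mutex.synchronize do
      @callbacks[id] = callable
      @values = @callbacks.values.freeze
    end
  end

  def delete(id)
    @mutex.synchronize do
      @callbacks.delete(id)
      @values = @callbacks.values.freeze
    end
  end

  def call(*args)
    @values.each { |callback| callback.call(*args) }
  end
end

registry = LockedRegistry.new
hits = 0

threads = 4.times.map do |t|
  Thread.new do
    100.times do |i|
      # Each thread uses its own ids, so no registration overwrites another.
      registry.add([t, i], -> { hits += 1 })
      # Remove every odd-numbered registration again.
      registry.delete([t, i]) if i.odd?
    end
  end
end
threads.each(&:join)

# Dispatch runs on the main thread after all writers have finished.
registry.call
```

Each thread leaves its 50 even-numbered callbacks registered, so `hits` ends at 200 after the single dispatch.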