Class: Karafka::Core::Monitoring::Monitor

Inherits:
Object
  • Object
show all
Defined in:
lib/karafka/core/monitoring/monitor.rb

Overview

Karafka monitor that can be used to pass through instrumentation calls to selected notifications bus.

It provides abstraction layer that allows us to use both our internal notifications as well as ‘ActiveSupport::Notifications`.

Instance Method Summary collapse

Constructor Details

#initialize(notifications_bus, namespace = nil) ⇒ Monitor

Returns a new instance of Monitor.

Parameters:

  • notifications_bus (Object)

    either our internal notifications bus or ‘ActiveSupport::Notifications`

  • namespace (String, nil) (defaults to: nil)

    namespace for events or nil if no namespace



20
21
22
23
24
# File 'lib/karafka/core/monitoring/monitor.rb', line 20

def initialize(notifications_bus, namespace = nil)
  @notifications_bus = notifications_bus
  @namespace = namespace
  @mapped_events = {}
end

Instance Method Details

#instrument(event_id, payload = EMPTY_HASH) ⇒ Object

Passes the instrumentation block (if any) into the notifications bus

Parameters:

  • event_id (String, Symbol)

    event id

  • payload (Hash) (defaults to: EMPTY_HASH)


30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/karafka/core/monitoring/monitor.rb', line 30

def instrument(event_id, payload = EMPTY_HASH, &)
  # With no namespace, string event ids already are the full event names. This is the
  # case for all the events in the Karafka ecosystem, so we can skip the mapping hash
  # lookup on this hot path. Symbols still go through the mapping to be converted into
  # strings without allocating on each call.
  full_event_name = if @namespace.nil? && event_id.is_a?(String)
    event_id
  else
    @mapped_events[event_id] ||= [event_id, @namespace].compact.join(".")
  end

  @notifications_bus.instrument(full_event_name, payload, &)
end

#listenersHash{String => Array}

Note:

Please do not modify this hash. It should be used only for debugging.

Returns hash where keys are events and values are arrays with listeners subscribed to particular events. Since different events may have different listeners, this is returned that way.

Examples:

If you need to get only classes of listeners, you can run following code:

monitor.listeners.map(&:class)

Returns:

  • (Hash{String => Array})

    hash where keys are events and values are arrays with listeners subscribed to particular events. Since different events may have different listeners, this is returned that way.



65
66
67
# File 'lib/karafka/core/monitoring/monitor.rb', line 65

def listeners
  @notifications_bus.listeners
end

#subscribeObject

Allows us to subscribe to the notification bus



45
46
47
# File 'lib/karafka/core/monitoring/monitor.rb', line 45

def subscribe(*, &)
  @notifications_bus.subscribe(*, &)
end

#unsubscribe(listener_or_block) ⇒ Object

Allows for removal of whatever was subscribed

Parameters:

  • listener_or_block (Object)

    object that is subscribed whether this is a listener instance or a block.



53
54
55
# File 'lib/karafka/core/monitoring/monitor.rb', line 53

def unsubscribe(listener_or_block)
  @notifications_bus.unsubscribe(listener_or_block)
end