Class: Karafka::Core::Monitoring::Notifications

Inherits:
Object
Includes:
Helpers::Time
Defined in:
lib/karafka/core/monitoring/notifications.rb

Overview

A simple notifications layer for the Karafka ecosystem that aims to provide an API compatible with both ActiveSupport::Notifications and dry-monitor.

We use neither of them by default, as our use case is fairly simple and we do not want to have too many external dependencies.

Constant Summary

EventNotRegistered =

Raised when someone wants to publish an event that was not registered

Class.new(StandardError)

Instance Attribute Summary

Instance Method Summary

Methods included from Helpers::Time

#float_now, #monotonic_now

Constructor Details

#initialize ⇒ Notifications

Returns a new instance of Notifications.



# File 'lib/karafka/core/monitoring/notifications.rb', line 24

def initialize
  @listeners = {}
  @mutex = Mutex.new
  # This allows us to optimize the method calling lookups
  @events_methods_map = {}
end

Instance Attribute Details

#listeners ⇒ Object (readonly)

Returns the value of attribute listeners.



# File 'lib/karafka/core/monitoring/notifications.rb', line 14

def listeners
  @listeners
end

#name ⇒ Object (readonly)

Returns the value of attribute name.



# File 'lib/karafka/core/monitoring/notifications.rb', line 14

def name
  @name
end

Instance Method Details

#available_events ⇒ Array<String>

Returns list of available events.

Returns:

  • (Array<String>)

    list of available events



# File 'lib/karafka/core/monitoring/notifications.rb', line 32

def available_events
  @events_methods_map.keys
end

#clear(event_id = nil) ⇒ Object

Clears all subscribed listeners. If an event id is given, clears only the listeners for that event and raises EventNotRegistered if the event was never registered.

Parameters:

  • event_id (String) (defaults to: nil)

    the key of the event to clear listeners for.



# File 'lib/karafka/core/monitoring/notifications.rb', line 49

def clear(event_id = nil)
  @mutex.synchronize do
    # Copy-on-write: replace the per-event arrays rather than mutating them in place, so a
    # dispatch iterating a previously captured array is unaffected (see #notify_listeners).
    unless event_id
      @listeners.transform_values! { [] }
      return
    end

    if @listeners.key?(event_id)
      @listeners[event_id] = []
      return
    end

    raise(EventNotRegistered, "#{event_id} not registered!")
  end
end
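
The copy-on-write comments in #clear can be illustrated with plain Ruby. This is a minimal sketch rather than the library code: the `listeners` hash and the symbols standing in for listeners are hypothetical. It suggests that replacing the per-event arrays keeps the events registered while a previously captured array stays untouched.

```ruby
# Hypothetical stand-in for the @listeners hash; symbols play the role of listeners.
listeners = { 'sleeping' => %i[a b], 'waking' => %i[c] }

# A dispatch in progress would hold a reference to the array it is iterating.
captured = listeners['sleeping']

# Same call as in #clear without an event id: each value is replaced by a new empty array.
listeners.transform_values! { [] }

remaining_keys = listeners.keys

puts remaining_keys.inspect        # event ids are still present
puts captured.inspect              # the captured array was not mutated
puts listeners['sleeping'].inspect # new subscribers start from an empty array
```

Because the keys survive, a cleared event can still be instrumented and subscribed to without being registered again.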

#instrument(event_id, payload = EMPTY_HASH) {|Proc| ... } ⇒ Object

Allows for code instrumentation. Runs the provided code and sends the instrumentation details to all registered listeners.

Examples:

Instrument some code

instrument('sleeping') do
  sleep(1)
end

Parameters:

  • event_id (String)
  • payload (Hash) (defaults to: EMPTY_HASH)

    payload for the instrumentation

Yields:

  • (Proc)

    instrumented code

Returns:

  • (Object)

    whatever the provided block (if any) returns

Raises:

  • (EventNotRegistered)

    when the given event was not registered



# File 'lib/karafka/core/monitoring/notifications.rb', line 130

def instrument(event_id, payload = EMPTY_HASH)
  assigned_listeners = @listeners[event_id]

  # Allow for instrumentation of only events we registered. If listeners array does not
  # exist, it means the event was not registered.
  raise EventNotRegistered, event_id unless assigned_listeners

  if block_given?
    # No point in instrumentation when no one is listening
    # Since the outcome will be ignored, we may as well save on allocations
    # There are many events that happen often like (`message.acknowledged`) that most
    # users do not subscribe to. Such check prevents us from publishing events that would
    # not be used at all saving on time measurements and objects allocations
    return yield if assigned_listeners.empty?

    start = monotonic_now
    result = yield
    time = monotonic_now - start
  elsif assigned_listeners.empty?
    # Skip measuring or doing anything if no one listening
    return
  end

  event = Event.new(event_id, payload, time)

  notify_listeners(@events_methods_map[event_id], event, assigned_listeners)

  result
end
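
The short-circuit in #instrument (skip all measurement when nobody listens) can be sketched in isolation. The helper names `monotonic_ms` and `timed_unless_idle` below are hypothetical, and the millisecond unit is an assumption: the implementation of Helpers::Time#monotonic_now is not shown in this document.

```ruby
# Assumption: a monotonic clock read in milliseconds. The unit returned by
# Helpers::Time#monotonic_now is not shown in this document.
def monotonic_ms
  Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond)
end

# Hypothetical helper: returns [block result, elapsed time or nil].
def timed_unless_idle(listeners)
  # Nobody is listening: run the block without measuring anything.
  return [yield, nil] if listeners.empty?

  start = monotonic_ms
  result = yield
  [result, monotonic_ms - start]
end

idle_result, idle_time = timed_unless_idle([]) { :done }
busy_result, busy_time = timed_unless_idle([:listener]) do
  sleep(0.01)
  :done
end

puts idle_time.inspect # nil, since no measurement took place
puts busy_time.class   # Float
```

In both branches the block's return value is passed through, which matches the documented return value of #instrument.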

#register_event(event_id) ⇒ Object

Registers a new event that can then be published and subscribed to.

Parameters:

  • event_id (String)

    event id



# File 'lib/karafka/core/monitoring/notifications.rb', line 39

def register_event(event_id)
  @mutex.synchronize do
    @listeners[event_id] = []
    @events_methods_map[event_id] = :"on_#{event_id.to_s.tr(".", "_")}"
  end
end
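
The listener method name stored in @events_methods_map is derived from the event id. A minimal sketch of that derivation, using a hypothetical helper `listener_method_for` that wraps the same expression as #register_event:

```ruby
# Hypothetical helper wrapping the expression used in #register_event.
def listener_method_for(event_id)
  :"on_#{event_id.to_s.tr('.', '_')}"
end

dotted = listener_method_for('message.acknowledged')
plain = listener_method_for(:sleeping)

puts dotted.inspect # => :on_message_acknowledged
puts plain.inspect  # => :on_sleeping
```

A listener object therefore opts in to an event such as `message.acknowledged` by defining `on_message_acknowledged`.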

#subscribe(event_id_or_listener, &block) ⇒ Object

Allows for subscription to an event. There are two ways you can subscribe: via block or via listener.

Examples:

Subscribe using listener

subscribe(MyListener.new)

Subscribe via block

subscribe('sleeping') do |event|
  puts event
end

Parameters:

  • event_id_or_listener (Object)

    the event id when subscribing to a particular event with a block, or a listener object when subscribing with a general listener

  • block (Proc)

    block of code if we want to subscribe with it



# File 'lib/karafka/core/monitoring/notifications.rb', line 81

def subscribe(event_id_or_listener, &block)
  @mutex.synchronize do
    if block
      event_id = event_id_or_listener

      raise EventNotRegistered, event_id unless @listeners.key?(event_id)

      # Copy-on-write: append by replacing the array, never mutating the one a concurrent
      # dispatch may be iterating (see #notify_listeners).
      @listeners[event_id] += [block]
    else
      listener = event_id_or_listener

      @listeners.each_key do |reg_event_id|
        next unless listener.respond_to?(@events_methods_map[reg_event_id])

        @listeners[reg_event_id] += [listener]
      end
    end
  end
end
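
The listener branch of #subscribe attaches an object only to the events it has a matching method for. The sketch below reproduces that `respond_to?` check with stand-ins: `SleepListener`, the local `events_methods_map` and the `:fake_event` payload are all hypothetical, and the final dispatch line is an assumption, since #notify_listeners is not shown in this document.

```ruby
# Hypothetical listener that handles a single event.
class SleepListener
  attr_reader :seen

  def initialize
    @seen = []
  end

  def on_sleeping(event)
    @seen << event
  end
end

# Stand-in for @events_methods_map after registering two events.
events_methods_map = {
  'sleeping' => :on_sleeping,
  'message.acknowledged' => :on_message_acknowledged
}

listener = SleepListener.new

# Same check as in #subscribe: keep only events the listener has a method for.
matched = events_methods_map.select do |_event_id, method_name|
  listener.respond_to?(method_name)
end.keys

# Assumed dispatch: call the mapped method with the event.
listener.public_send(events_methods_map['sleeping'], :fake_event)

puts matched.inspect
puts listener.seen.inspect
```

A listener that defines no `on_*` method for any registered event ends up subscribed to nothing, and no error is raised.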

#unsubscribe(listener_or_block) ⇒ Object

Allows for unsubscription from events. This method will remove the listener/block from all events where it's currently subscribed.

Examples:

Unsubscribe using listener (removes from all events where it's subscribed)

unsubscribe(my_listener)

Parameters:

  • listener_or_block (Object)

    listener object or block to remove from all events



# File 'lib/karafka/core/monitoring/notifications.rb', line 110

def unsubscribe(listener_or_block)
  @mutex.synchronize do
    # Copy-on-write: rebuild each array without the listener instead of deleting in place,
    # so a dispatch iterating a previously captured array still sees the full set.
    @listeners.transform_values! { |event_listeners| event_listeners - [listener_or_block] }
  end
end
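
One practical consequence of removing entries with `Array#-`, which compares elements via `hash` and `eql?`, is that a block can only be unsubscribed through the very same Proc object. A minimal sketch with a hypothetical `listeners` hash standing in for @listeners:

```ruby
handler = proc { |event| event }
lookalike = proc { |event| event } # same body, but a different Proc object

# Hypothetical stand-in for the @listeners hash.
listeners = { 'sleeping' => [handler, lookalike] }

# Same expression as in #unsubscribe.
listeners.transform_values! { |event_listeners| event_listeners - [handler] }

left = listeners['sleeping']

puts left.size
puts left.first.equal?(lookalike)
```

Under this reading, a block that may need to be removed later should be stored in a variable and passed with `&handler` when subscribing.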