Class: Easyop::Events::Bus::Memory

Inherits:
Base
  • Object
show all
Defined in:
lib/easyop/events/bus/memory.rb

Overview

In-process, synchronous bus. Default adapter — requires no external gems.

Thread-safe: subscriptions are protected by a Mutex, and publish snapshots the subscriber list before calling handlers so the lock is not held during handler execution (prevents deadlocks).

Examples:

bus = Easyop::Events::Bus::Memory.new
bus.subscribe("order.placed") { |e| puts e.name }
bus.publish(Easyop::Events::Event.new(name: "order.placed"))

Instance Method Summary collapse

Constructor Details

#initializeMemory

Returns a new instance of Memory.



17
18
19
20
# File 'lib/easyop/events/bus/memory.rb', line 17

def initialize
  @subscribers = []
  @mutex       = Mutex.new
end

Instance Method Details

#clear!Object

Remove all subscriptions. Useful in tests.



53
54
55
# File 'lib/easyop/events/bus/memory.rb', line 53

def clear!
  @mutex.synchronize { @subscribers.clear }
end

#publish(event) ⇒ Object

Deliver event to all matching subscribers. Handlers are called outside the lock; failures in individual handlers are swallowed so other handlers still run.

Parameters:



27
28
29
30
31
32
33
34
# File 'lib/easyop/events/bus/memory.rb', line 27

def publish(event)
  subs = @mutex.synchronize { @subscribers.dup }
  subs.each do |sub|
    sub[:handler].call(event) if _pattern_matches?(sub[:pattern], event.name)
  rescue StandardError
    # Individual handler failures must not prevent other handlers from running.
  end
end

#subscribe(pattern, &block) ⇒ Hash

Register a handler block for events matching pattern.

Parameters:

  • pattern (String, Regexp)

    exact name, glob, or Regexp

Returns:

  • (Hash)

    subscription handle (pass to #unsubscribe to remove)



40
41
42
43
44
# File 'lib/easyop/events/bus/memory.rb', line 40

def subscribe(pattern, &block)
  entry = { pattern: pattern, handler: block }
  @mutex.synchronize { @subscribers << entry }
  entry
end

#subscriber_countInteger

Returns number of active subscriptions.

Returns:

  • (Integer)

    number of active subscriptions



58
59
60
# File 'lib/easyop/events/bus/memory.rb', line 58

def subscriber_count
  @mutex.synchronize { @subscribers.size }
end

#unsubscribe(handle) ⇒ Object

Remove a previously registered subscription.

Parameters:

  • handle (Hash)

    the value returned by #subscribe



48
49
50
# File 'lib/easyop/events/bus/memory.rb', line 48

def unsubscribe(handle)
  @mutex.synchronize { @subscribers.delete(handle) }
end