Module: RSMP::Distributor

Included in:
Proxy
Defined in:
lib/rsmp/collect/distributor.rb

Overview

Class which distributes messages to receivers

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#receiversObject (readonly)

Returns the value of attribute receivers.



4
5
6
# File 'lib/rsmp/collect/distributor.rb', line 4

def receivers
  @receivers
end

Instance Method Details

#add_receiver(receiver) ⇒ Object

Raises:

  • (ArgumentError)


36
37
38
39
40
# File 'lib/rsmp/collect/distributor.rb', line 36

def add_receiver(receiver)
  raise ArgumentError unless receiver

  @receivers << receiver unless @receivers.include? receiver
end

#clear_deferred_distributionObject



16
17
18
# File 'lib/rsmp/collect/distributor.rb', line 16

def clear_deferred_distribution
  @deferred_messages = []
end

#distribute(message) ⇒ Object

Raises:

  • (ArgumentError)


48
49
50
51
52
53
54
55
56
# File 'lib/rsmp/collect/distributor.rb', line 48

def distribute(message)
  raise ArgumentError unless message

  if @defer_distribution
    @deferred_messages << message
  else
    distribute_immediately message
  end
end

#distribute_error(error, options = {}) ⇒ Object



62
63
64
# File 'lib/rsmp/collect/distributor.rb', line 62

def distribute_error(error, options = {})
  @receivers.each { |receiver| receiver.receive_error error, options }
end

#distribute_immediately(message) ⇒ Object



58
59
60
# File 'lib/rsmp/collect/distributor.rb', line 58

def distribute_immediately(message)
  @receivers.each { |receiver| receiver.receive message }
end

#distribute_queuedObject



30
31
32
33
34
# File 'lib/rsmp/collect/distributor.rb', line 30

def distribute_queued
  @deferred_messages.each { |message| distribute_immediately message }
ensure
  @deferred_messages = []
end

#initialize_distributorObject



10
11
12
13
14
# File 'lib/rsmp/collect/distributor.rb', line 10

def initialize_distributor
  @receivers = []
  @defer_distribution = false
  @deferred_messages = []
end

#inspectObject



6
7
8
# File 'lib/rsmp/collect/distributor.rb', line 6

def inspect
  "#<#{self.class.name}:#{object_id}}>"
end

#remove_receiver(receiver) ⇒ Object

Raises:

  • (ArgumentError)


42
43
44
45
46
# File 'lib/rsmp/collect/distributor.rb', line 42

def remove_receiver(receiver)
  raise ArgumentError unless receiver

  @receivers.delete receiver
end

#with_deferred_distributionObject



20
21
22
23
24
25
26
27
28
# File 'lib/rsmp/collect/distributor.rb', line 20

def with_deferred_distribution
  was = @defer_distribution
  @defer_distribution = true
  yield
  distribute_queued
ensure
  @defer_distribution = was
  @deferred_messages = []
end