Class: OMQ::Routing::Router

Inherits:
Object
  • Object
show all
Includes:
FairRecv
Defined in:
lib/omq/routing/router.rb

Overview

ROUTER socket routing: identity-based routing.

Prepends peer identity frame on receive. Uses first frame as routing identity on send.

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from FairRecv

#dequeue_recv, #unblock_recv

Constructor Details

#initialize(engine) ⇒ Router

Returns a new instance of Router.

Parameters:



24
25
26
27
28
29
30
31
32
# File 'lib/omq/routing/router.rb', line 24

def initialize(engine)
  @engine                  = engine
  @recv_queue              = FairQueue.new
  @connections_by_identity = {}
  @identity_by_connection  = {}
  @conn_queues             = {}  # connection => per-connection send queue
  @conn_send_tasks         = {}  # connection => send pump task
  @tasks                   = []
end

Instance Attribute Details

#recv_queueFairQueue (readonly)

Returns:



19
20
21
# File 'lib/omq/routing/router.rb', line 19

def recv_queue
  @recv_queue
end

Instance Method Details

#connection_added(connection) ⇒ Object

Parameters:

  • connection (Connection)


37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/omq/routing/router.rb', line 37

def connection_added(connection)
  identity = connection.peer_identity
  identity = SecureRandom.bytes(5) if identity.nil? || identity.empty?
  @connections_by_identity[identity] = connection
  @identity_by_connection[connection] = identity

  add_fair_recv_connection(connection) { |msg| [identity, *msg] }

  q = Routing.build_queue(@engine.options.send_hwm, :block)
  @conn_queues[connection] = q
  @conn_send_tasks[connection] = ConnSendPump.start(@engine, connection, q, @tasks)
end

#connection_removed(connection) ⇒ Object

Parameters:

  • connection (Connection)


53
54
55
56
57
58
59
# File 'lib/omq/routing/router.rb', line 53

def connection_removed(connection)
  identity = @identity_by_connection.delete(connection)
  @connections_by_identity.delete(identity) if identity
  @recv_queue.remove_queue(connection)
  @conn_queues.delete(connection)
  @conn_send_tasks.delete(connection)&.stop
end

#enqueue(parts) ⇒ Object

Enqueues a message for sending. The first frame is the routing identity.

Parameters:

  • parts (Array<String>)


66
67
68
69
70
71
72
73
74
75
76
# File 'lib/omq/routing/router.rb', line 66

def enqueue(parts)
  identity = parts.first
  if @engine.options.router_mandatory?
    unless @connections_by_identity[identity]
      raise SocketError, "no route to identity #{identity.inspect}"
    end
  end
  conn = @connections_by_identity[identity]
  return unless conn  # silently drop if peer disconnected
  @conn_queues[conn]&.enqueue(parts[1..])
end

#send_queues_drained?Boolean

Returns true when all per-connection send queues are empty.

Returns:

  • (Boolean)

    true when all per-connection send queues are empty



91
92
93
# File 'lib/omq/routing/router.rb', line 91

def send_queues_drained?
  @conn_queues.values.all?(&:empty?)
end

#stopvoid

This method returns an undefined value.

Stops all background tasks.



83
84
85
86
# File 'lib/omq/routing/router.rb', line 83

def stop
  @tasks.each(&:stop)
  @tasks.clear
end