Class: OMQ::Routing::Server

Inherits:
Object
  • Object
show all
Includes:
FairRecv
Defined in:
lib/omq/routing/server.rb

Overview

SERVER socket routing: identity-based routing with auto-generated 4-byte routing IDs.

Prepends routing ID on receive. Strips routing ID on send and routes to the identified connection.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(engine) ⇒ Server

Returns a new instance of Server.

Parameters:

  • engine (Engine)


18
19
20
21
22
23
24
25
26
# File 'lib/omq/routing/server.rb', line 18

# Sets up empty routing state for a SERVER socket.
#
# @param engine [Engine] stored as-is; other methods read options from it
def initialize(engine)
  @engine = engine
  @recv_queue = FairQueue.new

  # Routing ID <-> connection lookup, kept in both directions.
  @connections_by_routing_id = {}
  @routing_id_by_connection = {}

  # Per-connection send queue and the pump started for it.
  @conn_queues = {}
  @conn_send_tasks = {}

  # Collection handed to ConnSendPump.start; #stop iterates it.
  @tasks = []
end

Instance Attribute Details

#recv_queue ⇒ FairQueue (readonly)

Returns:

  • (FairQueue)


31
32
33
# File 'lib/omq/routing/server.rb', line 31

# Reader for the receive queue created in the constructor.
#
# @return [FairQueue]
def recv_queue = @recv_queue

Instance Method Details

#connection_added(connection) ⇒ Object

Parameters:

  • connection (Connection)


35
36
37
38
39
40
41
42
43
44
45
# File 'lib/omq/routing/server.rb', line 35

# Registers a connection under a fresh random 4-byte routing ID.
#
# Records the routing ID <-> connection mapping in both directions, passes the
# connection to +add_fair_recv_connection+ with a block that prepends the
# routing ID to each received message, and starts a ConnSendPump fed by a new
# send queue built from the engine's +send_hwm+ option.
#
# @param connection [Connection]
# @return [Object] whatever ConnSendPump.start returns
def connection_added(connection)
  # Four random bytes can repeat. Redraw until the ID is unused so an existing
  # peer's routing entry is never silently overwritten.
  routing_id = SecureRandom.bytes(4)
  routing_id = SecureRandom.bytes(4) while @connections_by_routing_id.key?(routing_id)

  @connections_by_routing_id[routing_id] = connection
  @routing_id_by_connection[connection]  = routing_id

  add_fair_recv_connection(connection) { |msg| [routing_id, *msg] }

  q = Routing.build_queue(@engine.options.send_hwm, :block)
  @conn_queues[connection] = q
  @conn_send_tasks[connection] = ConnSendPump.start(@engine, connection, q, @tasks)
end

#connection_removed(connection) ⇒ Object

Parameters:

  • connection (Connection)


50
51
52
53
54
55
56
# File 'lib/omq/routing/server.rb', line 50

# Drops all routing state held for a connection.
#
# Removes both routing-ID map entries, asks the receive queue to remove the
# connection's queue, discards its send queue, and stops its send task if one
# was recorded.
#
# @param connection [Connection]
def connection_removed(connection)
  if (stale_id = @routing_id_by_connection.delete(connection))
    @connections_by_routing_id.delete(stale_id)
  end

  @recv_queue.remove_queue(connection)
  @conn_queues.delete(connection)

  send_task = @conn_send_tasks.delete(connection)
  send_task.stop unless send_task.nil?
end

#enqueue(parts) ⇒ Object

Parameters:

  • parts (Array<String>)


61
62
63
64
65
66
# File 'lib/omq/routing/server.rb', line 61

# Routes an outgoing message to the connection named by its first part.
#
# The first element of +parts+ is looked up as a routing ID; the remaining
# elements are enqueued on that connection's send queue. Nothing is enqueued
# when the routing ID is unknown or the connection has no send queue.
#
# @param parts [Array<String>] routing ID followed by the message parts
def enqueue(parts)
  if (target = @connections_by_routing_id[parts.first])
    @conn_queues[target]&.enqueue(parts[1..])
  end
end

#send_queues_drained? ⇒ Boolean

True when all per-connection send queues are empty.

Returns:

  • (Boolean)


78
79
80
# File 'lib/omq/routing/server.rb', line 78

# Reports whether every per-connection send queue is empty.
#
# @return [Boolean] true when no send queue holds anything, including when
#   there are no send queues at all
def send_queues_drained?
  @conn_queues.each_value.all? { |queue| queue.empty? }
end

#stop ⇒ Object

Stops all background tasks (send pumps).



70
71
72
73
# File 'lib/omq/routing/server.rb', line 70

# Stops every task in @tasks, then empties the list.
#
# @return [Array] the now-empty task list
def stop
  @tasks.each { |task| task.stop }.clear
end