Class: OMQ::Routing::Dealer

Inherits:
Object
Includes:
FairRecv, RoundRobin
Defined in:
lib/omq/routing/dealer.rb

Overview

DEALER socket routing: round-robin send, fair-queue receive.

No envelope manipulation — messages pass through unchanged.
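
As a rough illustration of the two policies named above, the following self-contained sketch models round-robin distribution on send and per-connection fair queuing on receive. `ToyRoundRobin` and `ToyFairQueue` are hypothetical names invented for this example; they are not OMQ classes and do not reflect how `RoundRobin` or `FairQueue` are actually implemented.

```ruby
# Toy model only: ToyRoundRobin and ToyFairQueue are hypothetical names,
# not part of OMQ. Messages are arrays of frames and are never modified.

class ToyRoundRobin
  def initialize
    @peers = []   # each peer is a plain Array acting as an outbox
    @next  = 0
  end

  def add(peer)
    @peers << peer
  end

  # Hand each message to the next peer in rotation.
  def enqueue(parts)
    raise "no peers" if @peers.empty?
    peer  = @peers[@next % @peers.size]
    @next = (@next + 1) % @peers.size
    peer << parts
  end
end

class ToyFairQueue
  def initialize
    @queues = {}  # peer name => pending messages, in insertion order
    @cursor = 0
  end

  def push(peer, parts)
    (@queues[peer] ||= []) << parts
  end

  # Visit peers in turn, returning the first pending message found,
  # so a busy peer cannot starve the others. Returns nil when empty.
  def dequeue
    names = @queues.keys
    names.size.times do
      name    = names[@cursor % names.size]
      @cursor = (@cursor + 1) % names.size
      queue   = @queues[name]
      return queue.shift unless queue.empty?
    end
    nil
  end
end
```

In this model, three messages sent to two peers land on the first, second, and first peer in that order, and a receive side holding two messages from one peer and one from another alternates between the peers before draining the remainder.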

Constant Summary

Constants included from RoundRobin

RoundRobin::BATCH_BYTE_CAP, RoundRobin::BATCH_MSG_CAP

Instance Attribute Summary

Instance Method Summary

Methods included from FairRecv

#dequeue_recv, #unblock_recv

Methods included from RoundRobin

#send_queues_drained?

Constructor Details

#initialize(engine) ⇒ Dealer

Returns a new instance of Dealer.

Parameters:

  • engine

# File 'lib/omq/routing/dealer.rb', line 21

def initialize(engine)
  @engine     = engine
  @recv_queue = FairQueue.new
  @tasks      = []
  init_round_robin(engine)
end

Instance Attribute Details

#recv_queue ⇒ FairQueue (readonly)

Returns:

  • (FairQueue)

# File 'lib/omq/routing/dealer.rb', line 16

def recv_queue
  @recv_queue
end

Instance Method Details

#connection_added(connection) ⇒ Object

Parameters:

  • connection (Connection)


# File 'lib/omq/routing/dealer.rb', line 31

def connection_added(connection)
  add_fair_recv_connection(connection)
  add_round_robin_send_connection(connection)
end

#connection_removed(connection) ⇒ Object

Parameters:

  • connection (Connection)


# File 'lib/omq/routing/dealer.rb', line 39

def connection_removed(connection)
  @connections.delete(connection)
  @recv_queue.remove_queue(connection)
  remove_round_robin_send_connection(connection)
end
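
The add/remove pair above registers a connection on both the receive and send paths and later withdraws it from both. A minimal sketch of that symmetry, assuming a hypothetical `ToyDealerRouting` class with plain Ruby collections standing in for the fair queue and the round-robin send set (neither is OMQ's real data structure):

```ruby
# Hypothetical stand-in for the routing object; not OMQ code.
class ToyDealerRouting
  attr_reader :recv_queues, :send_targets

  def initialize
    @recv_queues  = {}  # connection => pending inbound messages
    @send_targets = []  # connections eligible for round-robin send
  end

  # A new connection becomes visible to both directions at once.
  def connection_added(connection)
    @recv_queues[connection] = []
    @send_targets << connection
  end

  # A dropped connection is withdrawn from both directions.
  def connection_removed(connection)
    @recv_queues.delete(connection)
    @send_targets.delete(connection)
  end
end
```

Keeping the two registrations in one method each means a connection is never reachable for sending while invisible for receiving, or the reverse, at least in this simplified model.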

#enqueue(parts) ⇒ Object

Parameters:

  • parts (Array<String>)


# File 'lib/omq/routing/dealer.rb', line 48

def enqueue(parts)
  enqueue_round_robin(parts)
end

#stop ⇒ void

This method returns an undefined value.

Stops all background tasks.



# File 'lib/omq/routing/dealer.rb', line 57

def stop
  @tasks.each(&:stop)
  @tasks.clear
end
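
Because the task list is cleared after each task is stopped, a second call to `stop` has nothing left to act on. The sketch below shows that pattern in isolation; `ToyTask` and `ToyRouting` are hypothetical stand-ins, and the only assumption about real tasks is that they respond to `stop`, as the method body above implies.

```ruby
# Hypothetical task type: anything that responds to #stop would do.
ToyTask = Struct.new(:name, :stopped) do
  def stop
    self.stopped = true
  end
end

# Hypothetical holder mirroring the stop/clear pattern shown above.
class ToyRouting
  attr_reader :tasks

  def initialize(tasks)
    @tasks = tasks
  end

  def stop
    @tasks.each(&:stop)  # ask every background task to stop
    @tasks.clear         # forget them so a repeat call is a no-op
  end
end

recv = ToyTask.new("recv", false)
send_task = ToyTask.new("send", false)
routing = ToyRouting.new([recv, send_task])
routing.stop
routing.stop  # safe to call again: the list is already empty
```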