Class: OMQ::Routing::Req

Inherits:
Object
  • Object
show all
Includes:
FairRecv, RoundRobin
Defined in:
lib/omq/routing/req.rb

Overview

REQ socket routing: round-robin send with strict send/recv alternation.

REQ prepends an empty delimiter frame on send and strips it on receive.

Constant Summary collapse

EMPTY_BINARY =

Shared frozen empty binary string to avoid repeated allocations.

::Protocol::ZMTP::Codec::EMPTY_BINARY

Constants included from RoundRobin

OMQ::Routing::RoundRobin::BATCH_BYTE_CAP, OMQ::Routing::RoundRobin::BATCH_MSG_CAP

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from FairRecv

#dequeue_recv, #unblock_recv

Methods included from RoundRobin

#send_queues_drained?

Constructor Details

#initialize(engine) ⇒ Req

Returns a new instance of Req.

Parameters:



24
25
26
27
28
29
30
# File 'lib/omq/routing/req.rb', line 24

def initialize(engine)
  @engine          = engine
  @recv_queue      = FairQueue.new
  @tasks           = []
  @state           = :ready        # :ready or :waiting_reply
  init_round_robin(engine)
end

Instance Attribute Details

#recv_queueFairQueue (readonly)

Returns:



19
20
21
# File 'lib/omq/routing/req.rb', line 19

def recv_queue
  @recv_queue
end

Instance Method Details

#connection_added(connection) ⇒ Object

Parameters:

  • connection (Connection)


35
36
37
38
39
40
41
# File 'lib/omq/routing/req.rb', line 35

def connection_added(connection)
  add_fair_recv_connection(connection) do |msg|
    @state = :ready
    msg.first&.empty? ? msg[1..] : msg
  end
  add_round_robin_send_connection(connection)
end

#connection_removed(connection) ⇒ Object

Parameters:

  • connection (Connection)


46
47
48
49
50
# File 'lib/omq/routing/req.rb', line 46

def connection_removed(connection)
  @connections.delete(connection)
  @recv_queue.remove_queue(connection)
  remove_round_robin_send_connection(connection)
end

#enqueue(parts) ⇒ Object

Parameters:

  • parts (Array<String>)

Raises:

  • (SocketError)


55
56
57
58
59
# File 'lib/omq/routing/req.rb', line 55

def enqueue(parts)
  raise SocketError, "REQ socket expects send/recv/send/recv order" unless @state == :ready
  @state = :waiting_reply
  enqueue_round_robin(parts)
end

#stopvoid

This method returns an undefined value.

Stops all background tasks.



66
67
68
69
# File 'lib/omq/routing/req.rb', line 66

def stop
  @tasks.each(&:stop)
  @tasks.clear
end