Class: OMQ::Routing::Push

Inherits:
Object
  • Object
show all
Includes:
RoundRobin
Defined in:
lib/omq/routing/push.rb

Overview

PUSH socket routing: round-robin send to PULL peers.

Constant Summary

Constants included from RoundRobin

RoundRobin::BATCH_BYTE_CAP, RoundRobin::BATCH_MSG_CAP

Instance Method Summary collapse

Methods included from RoundRobin

#send_queues_drained?

Constructor Details

#initialize(engine) ⇒ Push

Returns a new instance of Push.

Parameters:



13
14
15
16
17
# File 'lib/omq/routing/push.rb', line 13

def initialize(engine)
  @engine = engine
  @tasks  = []
  init_round_robin(engine)
end

Instance Method Details

#connection_added(connection) ⇒ Object

Parameters:

  • connection (Connection)


39
40
41
42
# File 'lib/omq/routing/push.rb', line 39

def connection_added(connection)
  add_round_robin_send_connection(connection)
  start_reaper(connection)
end

#connection_removed(connection) ⇒ Object

Parameters:

  • connection (Connection)


47
48
49
50
# File 'lib/omq/routing/push.rb', line 47

def connection_removed(connection)
  @connections.delete(connection)
  remove_round_robin_send_connection(connection)
end

#dequeue_recvObject



28
29
30
# File 'lib/omq/routing/push.rb', line 28

def dequeue_recv
  raise "PUSH sockets cannot receive"
end

#enqueue(parts) ⇒ Object

Parameters:

  • parts (Array<String>)


55
56
57
# File 'lib/omq/routing/push.rb', line 55

def enqueue(parts)
  enqueue_round_robin(parts)
end

#recv_queueObject

PUSH is write-only. Engine-facing recv contract: dequeue raises, unblock is a no-op (fatal-error propagation still calls it).



23
24
25
# File 'lib/omq/routing/push.rb', line 23

def recv_queue
  raise "PUSH sockets cannot receive"
end

#stopObject

Stops all background tasks (send pumps, reapers).



62
63
64
65
# File 'lib/omq/routing/push.rb', line 62

def stop
  @tasks.each(&:stop)
  @tasks.clear
end

#unblock_recvObject



33
34
# File 'lib/omq/routing/push.rb', line 33

def unblock_recv
end