Class: OMQ::Routing::Client

Inherits:
Object
Includes:
RoundRobin
Defined in:
lib/omq/routing/client.rb

Overview

CLIENT socket routing: round-robin send, fair-queue receive.

Behaves like DEALER: messages pass through with no envelope manipulation.
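The two policies named above can be illustrated with a toy model. This is only a sketch using the Ruby standard library: `ToyRoundRobin`, `peer_a`, `peer_b`, and `inbox` are hypothetical names, plain arrays stand in for connections, and none of it reflects how the `RoundRobin` mixin is actually implemented.

```ruby
# Toy model only: plain arrays stand in for peer connections.
class ToyRoundRobin
  def initialize(peers)
    @peers = peers
    @next  = 0
  end

  # Hand the message to the next peer in rotation.
  def send_message(parts)
    peer = @peers[@next % @peers.size]
    @next += 1
    peer << parts
    peer
  end
end

peer_a = []
peer_b = []
rr = ToyRoundRobin.new([peer_a, peer_b])
%w[m1 m2 m3].each { |body| rr.send_message([body]) }

# Fair-queue receive: every peer feeds one shared queue, and the
# reader takes messages in arrival order regardless of the sender.
inbox = Queue.new
inbox << ["from-a"]
inbox << ["from-b"]
received = [inbox.pop, inbox.pop]
```

Outgoing messages alternate between the peers, while incoming messages from all peers merge into a single queue that the reader drains.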

Constructor Details

#initialize(engine) ⇒ Client

Returns a new instance of Client.

Parameters:

  • engine (Engine)


# File 'lib/omq/routing/client.rb', line 14

def initialize(engine)
  @engine     = engine
  @recv_queue = Routing.build_queue(engine.options.recv_hwm, :block)
  @tasks      = []
  init_round_robin(engine)
end
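The constructor builds the receive queue from `recv_hwm` with a `:block` policy. The internals of `Routing.build_queue` are not shown here, so the following is a sketch of the general idea only: it assumes the result is a bounded queue whose producers wait once the high-water mark is reached, and uses the standard library's `SizedQueue` as a stand-in for `Async::LimitedQueue`.

```ruby
# Assumption: recv_hwm acts as a capacity limit on the receive queue.
hwm   = 2
queue = SizedQueue.new(hwm)

queue.push(["a"])
queue.push(["b"])

# A normal push would block here. The non-blocking form raises
# ThreadError instead, which makes the limit observable.
full = begin
  queue.push(["c"], true)
  false
rescue ThreadError
  true
end

# Consuming one message frees a slot, so the producer can proceed.
queue.pop
queue.push(["c"], true)
```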

Instance Attribute Details

#recv_queue ⇒ Async::LimitedQueue (readonly)

Returns:

  • (Async::LimitedQueue)


# File 'lib/omq/routing/client.rb', line 24

def recv_queue
  @recv_queue
end

Instance Method Details

#connection_added(connection) ⇒ Object

Registers a new connection: starts its receive pump and adds it to the round-robin send rotation.

Parameters:

  • connection (Connection)


# File 'lib/omq/routing/client.rb', line 47

def connection_added(connection)
  @connections << connection
  task = @engine.start_recv_pump(connection, @recv_queue)
  @tasks << task if task
  add_round_robin_send_connection(connection)
end

#connection_removed(connection) ⇒ Object

Unregisters a connection and removes it from the round-robin send rotation.

Parameters:

  • connection (Connection)


# File 'lib/omq/routing/client.rb', line 57

def connection_removed(connection)
  @connections.delete(connection)
  remove_round_robin_send_connection(connection)
end

#dequeue_recv ⇒ Array<String>?

Dequeues the next received message, blocking until one is available. Returns nil when woken by #unblock_recv.

Returns:

  • (Array<String>, nil)


# File 'lib/omq/routing/client.rb', line 31

def dequeue_recv
  @recv_queue.dequeue
end

#enqueue(parts) ⇒ Object

Enqueues a message for sending via the round-robin send path.

Parameters:

  • parts (Array<String>)


# File 'lib/omq/routing/client.rb', line 65

def enqueue(parts)
  enqueue_round_robin(parts)
end

#stop ⇒ Object

Stops all background tasks.



# File 'lib/omq/routing/client.rb', line 71

def stop
  @tasks.each(&:stop)
  @tasks.clear
end
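The task bookkeeping used by #connection_added and #stop can be sketched in isolation. `FakeTask` is a hypothetical stand-in for whatever `start_recv_pump` returns; the only assumption is that a task responds to `stop` and that the pump may return nil, which the `if task` guard in #connection_added suggests.

```ruby
# Hypothetical stand-in for a pump task; only #stop matters here.
class FakeTask
  attr_reader :stopped

  def initialize
    @stopped = false
  end

  def stop
    @stopped = true
  end
end

tasks = []

# Mirror the `@tasks << task if task` guard: nil results are skipped.
[FakeTask.new, nil, FakeTask.new].each { |task| tasks << task if task }

snapshot = tasks.dup   # kept only so the result can be inspected
tasks.each(&:stop)     # stop every tracked task
tasks.clear            # drop the references afterwards
```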

#unblock_recv ⇒ void

This method returns an undefined value.

Wakes a blocked #dequeue_recv with a nil sentinel.



# File 'lib/omq/routing/client.rb', line 40

def unblock_recv
  @recv_queue.enqueue(nil)
end
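The nil-sentinel wake-up can be demonstrated with plain threads. This is a minimal sketch that swaps in the standard library's `Queue` for `Async::LimitedQueue` and a `Thread` for an Async task; the names `queue`, `results`, and `consumer` are illustrative only.

```ruby
queue   = Queue.new
results = []

# The consumer blocks in pop, as #dequeue_recv does, and treats
# nil as a request to stop waiting.
consumer = Thread.new do
  loop do
    msg = queue.pop
    break if msg.nil?
    results << msg
  end
  :stopped
end

queue << ["hello"]   # a regular message
queue << nil         # what #unblock_recv enqueues

outcome = consumer.value   # joins the thread and returns its result
```

Because the sentinel travels through the same queue as regular messages, anything enqueued before it is still delivered before the reader wakes up with nil.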