Class: OMQ::Routing::Channel

Inherits:
Object
  • Object
show all
Defined in:
lib/omq/routing/channel.rb

Overview

CHANNEL socket routing: exclusive 1-to-1 bidirectional.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(engine) ⇒ Channel

Returns a new instance of Channel.

Parameters:

  • engine (Engine)


10
11
12
13
14
15
16
17
18
# File 'lib/omq/routing/channel.rb', line 10

# Prepares the routing state for a CHANNEL socket with no peer attached.
#
# The receive and staging queues are built up front via
# Routing.build_queue, sized by the engine's recv_hwm / send_hwm options
# and created with the :block policy. The connection, send queue and
# send pump all start out as nil.
#
# @param engine [Engine] engine whose options supply the high-water marks
def initialize(engine)
  @engine = engine

  # Nothing is connected yet.
  @connection = nil
  @send_queue = nil
  @send_pump  = nil
  @tasks      = []

  @recv_queue    = Routing.build_queue(engine.options.recv_hwm, :block)
  @staging_queue = Routing.build_queue(engine.options.send_hwm, :block)
end

Instance Attribute Details

#recv_queue ⇒ Async::LimitedQueue (readonly)

Returns:

  • (Async::LimitedQueue)


23
24
25
# File 'lib/omq/routing/channel.rb', line 23

# Queue that holds messages received from the peer.
#
# @return [Async::LimitedQueue]
def recv_queue = @recv_queue

Instance Method Details

#connection_added(connection) ⇒ Object

Parameters:

  • connection (Connection)

Raises:

  • (RuntimeError)

    if a connection already exists



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/omq/routing/channel.rb', line 47

# Registers the single peer connection of this CHANNEL socket.
#
# A receive pump is requested from the engine for every connection; the
# returned task (if any) is tracked in @tasks. For anything other than an
# inproc DirectPipe, a fresh send queue is built, messages staged so far
# are moved into it, and the send pump is started.
#
# @param connection [Connection]
# @raise [RuntimeError] if a connection already exists
def connection_added(connection)
  raise "CHANNEL allows only one peer" if @connection

  @connection = connection

  recv_pump = @engine.start_recv_pump(connection, @recv_queue)
  @tasks << recv_pump if recv_pump

  # DirectPipe peers get neither a send queue nor a send pump here.
  return if connection.is_a?(Transport::Inproc::DirectPipe)

  @send_queue = Routing.build_queue(@engine.options.send_hwm, :block)

  # Non-blocking drain: stops at the first falsy dequeue result.
  while (staged = @staging_queue.dequeue(timeout: 0))
    @send_queue.enqueue(staged)
  end

  start_send_pump(connection)
end

#connection_removed(connection) ⇒ Object

Parameters:

  • connection (Connection)


66
67
68
69
70
71
72
73
# File 'lib/omq/routing/channel.rb', line 66

# Detaches the peer if +connection+ is the one currently registered.
#
# Clears the connection and send queue, stops the send pump when one is
# set, and forgets it. Any other connection is ignored.
#
# @param connection [Connection]
def connection_removed(connection)
  return unless @connection == connection

  @connection = @send_queue = nil
  @send_pump&.stop
  @send_pump = nil
end

#dequeue_recv ⇒ Array<String>?

Dequeues the next received message. Blocks until one is available.

Returns:

  • (Array<String>, nil)


30
31
32
# File 'lib/omq/routing/channel.rb', line 30

# Takes the next message off the receive queue.
#
# @return [Array<String>, nil] whatever the receive queue's #dequeue yields
def dequeue_recv = @recv_queue.dequeue

#enqueue(parts) ⇒ Object

Parameters:

  • parts (Array<String>)


78
79
80
81
82
83
84
85
86
87
# File 'lib/omq/routing/channel.rb', line 78

# Hands an outgoing message to the right place.
#
# * An inproc DirectPipe peer with a direct receive queue gets the message
#   via #send_message.
# * Otherwise the message goes to the send queue when one exists, and to
#   the staging queue when it does not.
#
# @param parts [Array<String>]
def enqueue(parts)
  peer   = @connection
  direct = peer.is_a?(Transport::Inproc::DirectPipe) && peer.direct_recv_queue
  return peer.send_message(parts) if direct

  (@send_queue || @staging_queue).enqueue(parts)
end

#send_queues_drained? ⇒ Boolean

True when the staging and send queues are empty.

Returns:

  • (Boolean)


99
100
101
# File 'lib/omq/routing/channel.rb', line 99

# Reports whether nothing is waiting to be sent.
#
# The staging queue is checked first; the send queue is only consulted
# when it exists (it is nil while no send queue has been built).
#
# @return [Boolean] true when the staging and send queues are empty
def send_queues_drained?
  [@staging_queue, @send_queue].compact.all?(&:empty?)
end

#stop ⇒ Object

Stops all tracked background tasks (pump tasks, including the recv pump registered in #connection_added) and clears the task list.



91
92
93
94
# File 'lib/omq/routing/channel.rb', line 91

# Stops every task tracked in @tasks, then empties the list.
#
# @return [Array] the (now empty) task list
def stop
  @tasks.each(&:stop).clear
end

#unblock_recv ⇒ void

This method returns an undefined value.

Wakes a blocked #dequeue_recv with a nil sentinel.



39
40
41
# File 'lib/omq/routing/channel.rb', line 39

# Pushes a nil sentinel onto the receive queue so that a caller waiting in
# #dequeue_recv gets nil back.
#
# @return [void]
def unblock_recv = @recv_queue.enqueue(nil)