Module: OMQ::QueueReadable

Included in:
Readable
Defined in:
lib/omq/queue_interface.rb

Overview

Async::Queue-compatible read interface.

Automatically included by Readable. Provides #dequeue, #pop, #wait, and #each so sockets can be used where an Async::Queue is expected.

Instance Method Details

#dequeue(timeout: @options.read_timeout) ⇒ Array<String>

Also known as: pop

Dequeues the next message.

Parameters:

  • timeout (Numeric, nil) (defaults to: @options.read_timeout)

    timeout in seconds; overrides the socket’s read_timeout for this call, and nil waits indefinitely

Returns:

  • (Array<String>)

    message parts

Raises:

  • (IO::TimeoutError)

    if no message arrives before the timeout elapses



# File 'lib/omq/queue_interface.rb', line 18

def dequeue(timeout: @options.read_timeout)
  Reactor.run(timeout:) { @engine.dequeue_recv }
end
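
A caller will usually want to decide what a timed-out read means. The sketch below shows one way to wrap #dequeue with a per-call timeout and a fallback value. StubSocket and dequeue_or are hypothetical names used only for illustration; they are not part of OMQ, and the stub raises immediately when empty instead of actually waiting.

```ruby
# Hypothetical stand-in for a readable socket; NOT part of OMQ.
# It mimics only the documented contract of #dequeue: return the message
# parts, or raise a timeout error when nothing arrives in time.
class StubSocket
  # IO::TimeoutError exists on Ruby 3.2+; fall back to a local class otherwise.
  TIMEOUT_ERROR = defined?(IO::TimeoutError) ? IO::TimeoutError : Class.new(IOError)

  def initialize(messages)
    @messages = messages.dup
  end

  # The stub does not sleep: an empty buffer raises straight away.
  def dequeue(timeout: 1)
    return @messages.shift unless @messages.empty?

    raise TIMEOUT_ERROR, "no message within #{timeout.inspect}s"
  end
  alias pop dequeue
end

# Returns the next message, or the fallback when the read times out.
def dequeue_or(socket, fallback, timeout:)
  socket.dequeue(timeout: timeout)
rescue StubSocket::TIMEOUT_ERROR
  fallback
end

sock = StubSocket.new([["topic", "payload"]])
p dequeue_or(sock, :idle, timeout: 0.1) # prints ["topic", "payload"]
p dequeue_or(sock, :idle, timeout: 0.1) # prints :idle
```

With a real socket the same rescue clause would apply, since the documented error class is IO::TimeoutError.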

#each {|Array<String>| ... } ⇒ void

This method returns an undefined value.

Yields each received message until the socket is closed or a receive timeout expires.

Yields:

  • (Array<String>)

    message parts



# File 'lib/omq/queue_interface.rb', line 41

def each
  while (msg = receive)
    yield msg
  end
rescue IO::TimeoutError
  nil
end
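
Because #each rescues the timeout itself, a consuming loop ends quietly instead of raising. The following sketch illustrates that behaviour with a stub. StubReader and drain are hypothetical names, and the stub's receive is only an assumed stand-in for the real receive method, which this page does not document.

```ruby
# Hypothetical reader used for illustration; NOT part of OMQ.
class StubReader
  # IO::TimeoutError exists on Ruby 3.2+; fall back to a local class otherwise.
  TIMEOUT_ERROR = defined?(IO::TimeoutError) ? IO::TimeoutError : Class.new(IOError)

  def initialize(messages)
    @messages = messages.dup
  end

  # Assumed stand-in for receive: next message, or a timeout once drained.
  def receive
    @messages.shift or raise TIMEOUT_ERROR, "drained"
  end

  # Mirrors the #each listing, using the stub's timeout class.
  def each
    while (msg = receive)
      yield msg
    end
  rescue TIMEOUT_ERROR
    nil
  end
end

# Collects every message yielded before the loop ends.
def drain(reader)
  seen = []
  reader.each { |parts| seen << parts }
  seen
end

p drain(StubReader.new([["a"], ["b", "c"]])) # prints [["a"], ["b", "c"]]
```

One consequence of this design is that the block cannot distinguish a closed socket from an expired timeout; code that needs to tell them apart may prefer calling #dequeue directly.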

#wait ⇒ Array<String>

Waits for the next message indefinitely (ignores read_timeout).

Returns:

  • (Array<String>)

    message parts



# File 'lib/omq/queue_interface.rb', line 30

def wait
  dequeue(timeout: nil)
end
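
To make the difference between #dequeue and #wait concrete, here is a rough sketch using a thread-backed stub. StubMailbox, its push method, its 0.05 second default timeout, and demo_wait are all hypothetical; only the relationship wait == dequeue(timeout: nil) is taken from the listing above.

```ruby
# Hypothetical in-memory mailbox; NOT part of OMQ.
class StubMailbox
  # IO::TimeoutError exists on Ruby 3.2+; fall back to a local class otherwise.
  TIMEOUT_ERROR = defined?(IO::TimeoutError) ? IO::TimeoutError : Class.new(IOError)

  def initialize
    @items = []
    @lock  = Mutex.new
    @ready = ConditionVariable.new
  end

  def push(parts)
    @lock.synchronize do
      @items << parts
      @ready.signal
    end
  end

  # Blocks until a message arrives; a nil timeout means no deadline.
  def dequeue(timeout: 0.05)
    deadline = timeout && (Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout)
    @lock.synchronize do
      while @items.empty?
        remaining = deadline && (deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC))
        raise TIMEOUT_ERROR, "read timed out" if remaining && remaining <= 0

        @ready.wait(@lock, remaining)
      end
      @items.shift
    end
  end

  # Same shape as the documented #wait.
  def wait
    dequeue(timeout: nil)
  end
end

# A message arrives after 0.2 s: the default dequeue gives up, wait does not.
def demo_wait
  box = StubMailbox.new
  producer = Thread.new { sleep 0.2; box.push(["late"]) }
  timed_out = begin
    box.dequeue
    false
  rescue StubMailbox::TIMEOUT_ERROR
    true
  end
  message = box.wait
  producer.join
  [timed_out, message]
end

p demo_wait # prints [true, ["late"]]
```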