Module: OMQ::Readable

Includes:
QueueReadable
Included in:
CHANNEL, CLIENT, DEALER, DISH, GATHER, PAIR, PEER, PULL, REP, REQ, ROUTER, SERVER, SUB, XPUB, XSUB
Defined in:
lib/omq/readable.rb

Overview

Pure Ruby Readable mixin. Dequeues messages from the engine’s recv queue.

Instance Method Summary

Methods included from QueueReadable

#dequeue, #each, #wait

Instance Method Details

#receive ⇒ Array<String>

Receives the next message directly from the engine recv queue.

Returns:

  • (Array<String>)

    message parts

Raises:

  • (IO::TimeoutError)

    if no message arrives within read_timeout



# File 'lib/omq/readable.rb', line 16

def receive
  if @engine.on_io_thread?
    Reactor.run(timeout: @options.read_timeout) { @engine.dequeue_recv }
  elsif (timeout = @options.read_timeout)
    Async::Task.current.with_timeout(timeout, IO::TimeoutError) { @engine.dequeue_recv }
  else
    @engine.dequeue_recv
  end
end
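
The timeout handling in #receive can be illustrated without OMQ itself. The following is a minimal sketch, assuming a stand-in engine built on Ruby's Queue and the stdlib Timeout module in place of Async::Task#with_timeout. FakeEngine, FakeSocket, enqueue_recv, and SketchTimeoutError are hypothetical names introduced for illustration only. The reactor branch (on_io_thread?) is omitted.

```ruby
require "timeout"

# Hypothetical stand-ins; none of these classes come from OMQ itself.
class SketchTimeoutError < StandardError; end

class FakeEngine
  def initialize
    @recv_queue = Queue.new
  end

  # Push a multipart message (an Array of Strings) onto the recv queue.
  def enqueue_recv(parts)
    @recv_queue << parts
  end

  # Block until a message is available, then return it.
  def dequeue_recv
    @recv_queue.pop
  end
end

class FakeSocket
  def initialize(engine, read_timeout: nil)
    @engine = engine
    @read_timeout = read_timeout
  end

  # Mirrors the two non-reactor branches of #receive: a bounded wait when a
  # read timeout is configured, an unbounded wait otherwise.
  def receive
    if (timeout = @read_timeout)
      Timeout.timeout(timeout, SketchTimeoutError) { @engine.dequeue_recv }
    else
      @engine.dequeue_recv
    end
  end
end

engine = FakeEngine.new
socket = FakeSocket.new(engine, read_timeout: 0.05)

# A queued message is returned as its array of parts.
engine.enqueue_recv(["topic", "payload"])
message = socket.receive

# With the queue empty, the second call gives up after read_timeout.
timed_out =
  begin
    socket.receive
    false
  rescue SketchTimeoutError
    true
  end
```

In the real mixin the caller would rescue IO::TimeoutError instead of the hypothetical SketchTimeoutError. A read_timeout of nil means the call blocks until a message arrives.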

#wait_readable(timeout = @options.read_timeout) ⇒ true

Waits until the socket is readable. The implementation shown below returns true immediately and does not use the timeout argument.

Parameters:

  • timeout (Numeric, nil) (defaults to: @options.read_timeout)

    timeout in seconds

Returns:

  • (true)


# File 'lib/omq/readable.rb', line 32

def wait_readable(timeout = @options.read_timeout)
  true
end
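
Because #wait_readable returns true unconditionally in the source above, a caller cannot use it as a gate before #receive. The sketch below illustrates that consequence with a hypothetical StubReadable class. Its #receive is a simplification that returns nil once drained, which is an assumption made for the example and not the behavior of the real method (the real method blocks or raises on timeout).

```ruby
# Hypothetical stand-in; only the shape of #wait_readable follows the source.
class StubReadable
  def initialize(messages, read_timeout: nil)
    @pending = messages.dup
    @read_timeout = read_timeout
  end

  # Same contract as the documented method: accepts a timeout, returns true.
  def wait_readable(timeout = @read_timeout)
    true
  end

  # Simplified receive (assumption): next message, or nil when drained.
  def receive
    @pending.shift
  end
end

socket = StubReadable.new([["a"], ["b", "c"]])
received = []

# wait_readable never reports "not readable", so the loop has to rely on
# receive itself to learn whether a message was actually available.
while socket.wait_readable(0.1)
  message = socket.receive
  break if message.nil?

  received << message
end
```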