Module: OMQ::Writable

Includes:
QueueWritable
Included in:
CHANNEL, CLIENT, DEALER, PAIR, PEER, PUB, PUSH, RADIO, REP, REQ, ROUTER, SCATTER, SERVER, XPUB, XSUB
Defined in:
lib/omq/writable.rb

Overview

Pure Ruby Writable mixin. Enqueues messages to the engine’s send path.

Instance Method Summary

Methods included from QueueWritable

#enqueue

Instance Method Details

#<<(message) ⇒ self

Sends a message by delegating to #send. Returns self, so calls can be chained.

Parameters:

  • message (String, Array<String>)

Returns:

  • (self)


# File 'lib/omq/writable.rb', line 43

def <<(message)
  send(message)
end
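As a rough illustration of why returning self makes `<<` chainable, the sketch below uses a stand-in module and class. `DemoWritable`, `DemoSocket`, and the `sent` array are hypothetical names for this example only; they replace the real engine with an in-memory array and are not part of OMQ.

```ruby
# Hypothetical stand-in that mirrors the shape of the mixin shown above.
# It records messages in an array instead of handing them to an engine.
module DemoWritable
  def send(message)
    # Normalize a single String into a one-part message.
    parts = message.is_a?(Array) ? message : [message]
    raise ArgumentError, "message has no parts" if parts.empty?

    sent << parts
    self # returning self is what allows chaining
  end

  def <<(message)
    send(message)
  end
end

# Hypothetical socket class; a real socket would own an engine instead.
class DemoSocket
  include DemoWritable

  attr_reader :sent

  def initialize
    @sent = []
  end
end

socket = DemoSocket.new
socket << "hello" << ["topic", "payload"]
```

Each `<<` call evaluates to the socket itself, so the second `<<` is applied to the same socket, and both messages end up recorded in order.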

#send(message) ⇒ self

Sends a message.

The caller owns the message parts. Do not mutate them after sending, especially with the inproc transport or PUB fan-out, where a single reference can be shared across peers and read later by the send pump.

Parameters:

  • message (String, Array<String>)

    message parts

Returns:

  • (self)

Raises:

  • (ArgumentError)

    if message is an empty Array (no parts)

  • (IO::TimeoutError)

    if write_timeout is exceeded



# File 'lib/omq/writable.rb', line 22

def send(message)
  parts = message.is_a?(Array) ? message : [message]
  raise ArgumentError, "message has no parts" if parts.empty?

  if @engine.on_io_thread?
    Reactor.run(timeout: @options.write_timeout) { @engine.enqueue_send(parts) }
  elsif (timeout = @options.write_timeout)
    Async::Task.current.with_timeout(timeout, IO::TimeoutError) { @engine.enqueue_send(parts) }
  else
    @engine.enqueue_send(parts)
  end

  self
end
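The warning about mutating parts after sending can be sketched without a socket. In the example below, a plain array named `queue` is a hypothetical stand-in for the engine's send path; the point is only that a queued part and the caller's variable can refer to the same String object.

```ruby
# Hypothetical in-memory queue standing in for the engine's send path.
queue = []

# Unsafe: the queued part is the same String object the caller still holds.
body = +"order-1"            # unary plus yields a mutable String
queue << [body]
body << "-MUTATED"           # mutation after "sending"
unsafe_seen = queue.last.first

# Safer: hand over a frozen copy, so later edits cannot reach the queue.
body = +"order-2"
queue << [body.dup.freeze]
body << "-MUTATED"
safe_seen = queue.last.first

puts unsafe_seen             # the queued part changed along with body
puts safe_seen               # the queued copy is unaffected
```

Passing a frozen duplicate is one possible convention, not something the mixin requires; building a fresh String per message works just as well.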

#wait_writable(timeout = @options.write_timeout) ⇒ true

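Waits until the socket is writable. This pure Ruby implementation returns true immediately; the timeout argument is accepted but not used.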

Parameters:

  • timeout (Numeric, nil) (defaults to: @options.write_timeout)

    timeout in seconds

Returns:

  • (true)


# File 'lib/omq/writable.rb', line 53

def wait_writable(timeout = @options.write_timeout)
  true
end