Module: OMQ::Writable

Includes:
QueueWritable
Included in:
CHANNEL, CLIENT, DEALER, PAIR, PEER, PUB, PUSH, RADIO, REP, REQ, ROUTER, SCATTER, SERVER, XPUB, XSUB
Defined in:
lib/omq/writable.rb

Overview

Pure Ruby Writable mixin. Enqueues messages to the engine’s send path.

Instance Method Summary

Methods included from QueueWritable

#enqueue

Instance Method Details

#<<(message) ⇒ self

Sends a message (chainable).

Parameters:

  • message (String, Array<String>)

Returns:

  • (self)


# File 'lib/omq/writable.rb', line 63

def <<(message)
  send(message)
end
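Because `#<<` delegates to `#send`, which returns `self`, calls can be chained. The sketch below illustrates this with `RecordingSocket`, a hypothetical stand-in class that records messages instead of handing them to an engine; it is not part of OMQ and only mirrors the return-`self` shape shown in the source above.

```ruby
# Hypothetical stand-in for a socket (not an OMQ class).
# It records messages instead of enqueueing them on an engine.
class RecordingSocket
  attr_reader :sent

  def initialize
    @sent = []
  end

  # Same shape as Writable#send: accept a message, return self.
  def send(message)
    @sent << message
    self
  end

  # Same delegation as Writable#<<.
  def <<(message)
    send(message)
  end
end

socket = RecordingSocket.new

# Each << returns the socket, so single-part, multi-part, and
# empty-frame messages can be queued in one expression.
socket << "first" << ["multi", "part"] << ""
```

With a real socket class that includes this module, the same chaining pattern should apply, subject to the `#send` rules documented below.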

#send(message) ⇒ self

Sends a message.

Parts must be String-like (respond to `#to_str`). Use an empty string to send an empty frame; `nil` raises `NoMethodError`, so accidental nils surface instead of silently producing a zero-byte frame. Invariants after `#send` returns:

  • every part is a frozen String

  • unfrozen String parts are re-tagged to `Encoding::BINARY` in place (a flag flip, no copy)

  • the parts array (if the caller passed one) is frozen

The receiver always gets frozen `BINARY`-tagged parts: on TCP/IPC via byteslice on the wire, on inproc via Pipe#send_message, which duplicates the one pathological case (frozen non-BINARY parts) so the receiver sees BINARY like every other transport.
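The sender-side invariants can be checked in isolation with a small sketch. `normalize_parts` is a hypothetical helper that copies the normalization steps from the `#send` source shown on this page; it is not an OMQ method and does not involve a socket, engine, or transport.

```ruby
# Hypothetical helper mirroring the normalization steps in #send.
# Not part of OMQ; nothing is actually sent.
def normalize_parts(message)
  parts = message.is_a?(Array) ? message : [message]
  raise ArgumentError, "message has no parts" if parts.empty?

  # Convert String-like objects only when at least one part is not a String.
  parts = parts.map { |p| p.to_str } if parts.any? { |p| !p.is_a?(String) }

  parts.each do |part|
    # Re-tag in place (no copy) unless the part is frozen or already BINARY.
    part.force_encoding(Encoding::BINARY) unless part.frozen? || part.encoding == Encoding::BINARY
    part.freeze
  end
  parts.freeze
end

unfrozen = "hello".dup      # mutable, UTF-8 tagged
frozen   = "world".freeze   # already frozen, UTF-8 tagged
input    = [unfrozen, frozen]

result = normalize_parts(input)

result.equal?(input)  # the caller's own array is returned, now frozen
unfrozen.encoding     # re-tagged to BINARY (ASCII-8BIT) in place
frozen.encoding       # still UTF-8: frozen parts cannot be re-tagged
```

The last line is the "pathological case" mentioned above: a part that was already frozen with a non-BINARY encoding keeps its tag on the sender side, which is why the inproc path has to duplicate it.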

Parameters:

  • message (String, #to_str, Array<String, #to_str>)

Returns:

  • (self)

Raises:

  • (ArgumentError)

    if the message is an empty Array

  • (IO::TimeoutError)

    if write_timeout is exceeded

  • (NoMethodError)

    if a part is not String-like
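The argument checks can be illustrated without a socket. In this sketch, `coerce_parts` is a hypothetical helper that reproduces only the first three lines of the `#send` source on this page, and `Topic` is a made-up String-like wrapper; neither is part of OMQ.

```ruby
# Hypothetical helper: the argument checks from #send, nothing else.
def coerce_parts(message)
  parts = message.is_a?(Array) ? message : [message]
  raise ArgumentError, "message has no parts" if parts.empty?

  parts = parts.map { |p| p.to_str } if parts.any? { |p| !p.is_a?(String) }
  parts
end

# Made-up String-like object: anything responding to #to_str is accepted.
Topic = Struct.new(:name) do
  def to_str
    name
  end
end

coerced = coerce_parts([Topic.new("weather"), "sunny"])  # => ["weather", "sunny"]

# nil does not respond to #to_str, so it raises instead of
# silently becoming an empty frame.
nil_error = begin
  coerce_parts(["header", nil])
rescue NoMethodError => e
  e
end

# An empty Array has no parts to send.
empty_error = begin
  coerce_parts([])
rescue ArgumentError => e
  e
end
```

To send a deliberately empty frame, pass `""` as the part rather than `nil`.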



# File 'lib/omq/writable.rb', line 34

def send(message)
  parts = message.is_a?(Array) ? message : [message]
  raise ArgumentError, "message has no parts" if parts.empty?

  parts = parts.map { |p| p.to_str } if parts.any? { |p| !p.is_a?(String) }

  parts.each do |part|
    part.force_encoding(Encoding::BINARY) unless part.frozen? || part.encoding == Encoding::BINARY
    part.freeze
  end
  parts.freeze

  if @engine.on_io_thread?
    Reactor.run(timeout: @options.write_timeout) { @engine.enqueue_send(parts) }
  elsif (timeout = @options.write_timeout)
    Async::Task.current.with_timeout(timeout, IO::TimeoutError) { @engine.enqueue_send(parts) }
  else
    @engine.enqueue_send(parts)
  end

  self
end

#wait_writable(timeout = @options.write_timeout) ⇒ true

Waits until the socket is writable. In this implementation the method returns true immediately and does not use the timeout.

Parameters:

  • timeout (Numeric, nil) (defaults to: @options.write_timeout)

    timeout in seconds

Returns:

  • (true)


# File 'lib/omq/writable.rb', line 73

def wait_writable(timeout = @options.write_timeout)
  true
end