Module: OMQ::Writable
- Includes:
- QueueWritable
- Included in:
- CHANNEL, CLIENT, DEALER, PAIR, PEER, PUB, PUSH, RADIO, REP, REQ, ROUTER, SCATTER, SERVER, XPUB, XSUB
- Defined in:
- lib/omq/writable.rb
Overview
Pure Ruby Writable mixin. Enqueues messages to the engine’s send path.
Instance Method Summary collapse
-
#<<(message) ⇒ self
Sends a message (chainable).
-
#send(message) ⇒ self
Sends a message.
-
#wait_writable(timeout = @options.write_timeout) ⇒ true
Waits until the socket is writable.
Methods included from QueueWritable
Instance Method Details
#<<(message) ⇒ self
Sends a message (chainable).
63 64 65 |
# File 'lib/omq/writable.rb', line 63 def <<() send() end |
#send(message) ⇒ self
Sends a message.
Parts must be String-like (respond to ‘#to_str`). Use an empty string to send an empty frame — `nil` raises `NoMethodError` so accidental nils surface instead of silently producing a zero-byte frame. Invariants after `#send` returns:
-
every part is a frozen String
-
unfrozen String parts are re-tagged to ‘Encoding::BINARY` in place (a flag flip, no copy)
-
the parts array (if the caller passed one) is frozen
The receiver always gets frozen ‘BINARY`-tagged parts — on TCP/IPC via byteslice on the wire, on inproc via Pipe#send_message which duplicates the one pathological case (frozen non-BINARY parts) so the receiver sees BINARY like every other transport.
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/omq/writable.rb', line 34 def send() parts = .is_a?(Array) ? : [] raise ArgumentError, "message has no parts" if parts.empty? parts = parts.map { |p| p.to_str } if parts.any? { |p| !p.is_a?(String) } parts.each do |part| part.force_encoding(Encoding::BINARY) unless part.frozen? || part.encoding == Encoding::BINARY part.freeze end parts.freeze if @engine.on_io_thread? Reactor.run(timeout: @options.write_timeout) { @engine.enqueue_send(parts) } elsif (timeout = @options.write_timeout) Async::Task.current.with_timeout(timeout, IO::TimeoutError) { @engine.enqueue_send(parts) } else @engine.enqueue_send(parts) end self end |
#wait_writable(timeout = @options.write_timeout) ⇒ true
Waits until the socket is writable.
73 74 75 |
# File 'lib/omq/writable.rb', line 73 def wait_writable(timeout = @options.write_timeout) true end |