Module: OMQ::Writable
- Includes:
- QueueWritable
- Included in:
- CHANNEL, CLIENT, DEALER, PAIR, PEER, PUB, PUSH, RADIO, REP, REQ, ROUTER, SCATTER, SERVER, XPUB, XSUB
- Defined in:
- lib/omq/writable.rb
Overview
Pure Ruby Writable mixin. Enqueues messages to the engine’s send path.
Instance Method Summary collapse
-
#<<(message) ⇒ self
Sends a message (chainable).
-
#send(message) ⇒ self
Sends a message.
-
#wait_writable(timeout = @options.write_timeout) ⇒ true
Waits until the socket is writable.
Methods included from QueueWritable
Instance Method Details
#<<(message) ⇒ self
Sends a message (chainable).
43 44 45 |
# File 'lib/omq/writable.rb', line 43 def <<() send() end |
#send(message) ⇒ self
Sends a message.
Caller owns the message parts. Don’t mutate them after sending — especially with inproc transport or PUB fan-out, where a single reference can be shared across peers and read later by the send pump.
22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/omq/writable.rb', line 22 def send() parts = .is_a?(Array) ? : [] raise ArgumentError, "message has no parts" if parts.empty? if @engine.on_io_thread? Reactor.run(timeout: @options.write_timeout) { @engine.enqueue_send(parts) } elsif (timeout = @options.write_timeout) Async::Task.current.with_timeout(timeout, IO::TimeoutError) { @engine.enqueue_send(parts) } else @engine.enqueue_send(parts) end self end |
#wait_writable(timeout = @options.write_timeout) ⇒ true
Waits until the socket is writable.
53 54 55 |
# File 'lib/omq/writable.rb', line 53 def wait_writable(timeout = @options.write_timeout) true end |