Module: OMQ::QueueReadable
- Included in: Readable
- Defined in: lib/omq/queue_interface.rb
Overview
Async::Queue-compatible read interface.
Automatically included by Readable. Provides #dequeue, #pop, #wait, and #each so sockets can be used where an Async::Queue is expected.
Instance Method Summary
- #dequeue(timeout: @options.read_timeout) ⇒ Array<String> (also: #pop)
  Dequeues the next message.
- #each {|Array<String>| ... } ⇒ void
  Yields each received message until the socket is closed or a receive timeout expires.
- #wait ⇒ Array<String>
  Waits for the next message indefinitely (ignores read_timeout).
Instance Method Details
#dequeue(timeout: @options.read_timeout) ⇒ Array<String>
Also known as: pop
Dequeues the next message.

    # File 'lib/omq/queue_interface.rb', line 18
    def dequeue(timeout: @options.read_timeout)
      Reactor.run(timeout:) { @engine.dequeue_recv }
    end
#each {|Array<String>| ... } ⇒ void
This method returns an undefined value.
Yields each received message until the socket is closed or a receive timeout expires.

    # File 'lib/omq/queue_interface.rb', line 41
    def each
      while (msg = receive)
        yield msg
      end
    rescue IO::TimeoutError
      nil
    end
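
The loop in #each ends in two ways: #receive returns a falsy value, or it raises IO::TimeoutError, which is rescued so the caller sees a normal return. The sketch below illustrates the timeout case with a stand-in class. FakeReadable and collect_messages are hypothetical names made up for this example and are not part of OMQ; the stand-in's #receive only imitates a timeout once its canned messages run out.

```ruby
# IO::TimeoutError ships with Ruby 3.2+. Define a fallback on older Rubies
# so this sketch still runs (assumption: real OMQ code needs no such shim).
unless IO.const_defined?(:TimeoutError, false)
  class IO
    class TimeoutError < IOError; end
  end
end

# Hypothetical stand-in for a readable socket; NOT the real OMQ class.
class FakeReadable
  def initialize(messages)
    @messages = messages.dup
  end

  # Returns the next canned message, or raises once the list is drained,
  # imitating a receive timeout.
  def receive
    raise IO::TimeoutError, "read timeout" if @messages.empty?

    @messages.shift
  end

  # Same shape as the #each documented above.
  def each
    while (msg = receive)
      yield msg
    end
  rescue IO::TimeoutError
    nil
  end
end

# Gathers everything #each yields before it stops.
def collect_messages(readable)
  seen = []
  readable.each { |msg| seen << msg }
  seen
end

collected = collect_messages(FakeReadable.new([["a"], ["b", "c"]]))
```

Because the timeout is swallowed inside #each, a caller cannot tell from #each alone whether iteration stopped on a timeout or on a closed socket.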
#wait ⇒ Array<String>
Waits for the next message indefinitely (ignores read_timeout).

    # File 'lib/omq/queue_interface.rb', line 30
    def wait
      dequeue(timeout: nil)
    end
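
A minimal sketch of how the three entry points relate: #dequeue and #pop use the configured read timeout by default, while #wait always passes timeout: nil. FakeSocket is a hypothetical stand-in, not the OMQ implementation. It records the timeout each call receives instead of blocking, and it does not model Reactor.run or the engine.

```ruby
# Hypothetical stand-in that mirrors the method signatures documented above.
class FakeSocket
  attr_reader :timeouts_seen

  def initialize(read_timeout:, messages:)
    @read_timeout  = read_timeout
    @messages      = messages.dup
    @timeouts_seen = []
  end

  # The default is read from instance state at call time,
  # like `timeout: @options.read_timeout` in the real method.
  def dequeue(timeout: @read_timeout)
    @timeouts_seen << timeout
    @messages.shift
  end
  alias_method :pop, :dequeue

  # Ignores the configured read timeout by passing nil explicitly.
  def wait
    dequeue(timeout: nil)
  end
end

sock = FakeSocket.new(read_timeout: 0.5, messages: [["a"], ["b"], ["c"]])
first  = sock.dequeue # uses the default timeout
second = sock.pop     # alias of #dequeue, same default
third  = sock.wait    # forces timeout: nil
```

After these calls, sock.timeouts_seen holds the timeout each call used, which makes the difference between #dequeue and #wait visible without any real I/O.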