Class: DatagroutConduit::Transport::Ws::Subscription
- Inherits: Object
  - Object
    - DatagroutConduit::Transport::Ws::Subscription
- Includes: Enumerable
- Defined in: lib/datagrout_conduit/transport/ws.rb
Overview
Per-subscription event stream delivered via a thread-safe Queue. Call recv to block until the next event, or iterate with each.
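A minimal sketch of the consumption pattern described above. So that it runs without the gem or a live WebSocket connection, it uses a hypothetical stand-in class, `DemoSubscription`, that mirrors the source listed further down this page. The producer thread is also an assumption: it plays the role of the transport, which is expected to call the private `_enqueue` and `_close` methods in real use.

```ruby
require "timeout"

# Hypothetical stand-in mirroring the Subscription source listed on this page.
class DemoSubscription
  include Enumerable

  attr_reader :sub_id, :topic

  def initialize(sub_id, topic)
    @sub_id = sub_id
    @topic = topic
    @queue = ::Thread::Queue.new
    @closed = false
  end

  def recv(timeout: nil)
    event = timeout ? Timeout.timeout(timeout) { @queue.pop } : @queue.pop
    raise StopIteration if event.nil?
    event
  rescue Timeout::Error
    nil
  end

  def each(&block)
    loop do
      event = @queue.pop
      break if event.nil? # nil is the close sentinel
      block.call(event)
    end
  end

  def closed?
    @closed
  end

  def _enqueue(event)
    @queue.push(event) unless @closed
  end

  def _close
    return if @closed
    @closed = true
    @queue.push(nil)
  end
end

sub = DemoSubscription.new("sub-1", "orders")

# Assumed producer: enqueues three events, then closes the subscription.
producer = Thread.new do
  3.times { |i| sub._enqueue({ "seq" => i }) }
  sub._close
end

# Consumer: blocks on each event and returns once the subscription closes.
received = []
sub.each { |event| received << event }
producer.join
```

The consumer needs no polling or explicit locking, because the queue handles the hand-off between the two threads.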
Instance Attribute Summary
- #sub_id ⇒ Object readonly
  Returns the value of attribute sub_id.
- #topic ⇒ Object readonly
  Returns the value of attribute topic.
Instance Method Summary
- #_close ⇒ Object private
- #_enqueue(event) ⇒ Object private
- #closed? ⇒ Boolean
- #each(&block) ⇒ Object
  Iterate over events until the subscription is closed.
- #initialize(sub_id, topic) ⇒ Subscription constructor
  A new instance of Subscription.
- #recv(timeout: nil) ⇒ Object
  Block until the next event arrives.
Constructor Details
#initialize(sub_id, topic) ⇒ Subscription
Returns a new instance of Subscription.
    # File 'lib/datagrout_conduit/transport/ws.rb', line 50
    def initialize(sub_id, topic)
      @sub_id = sub_id
      @topic = topic
      @queue = ::Thread::Queue.new
      @closed = false
    end
Instance Attribute Details
#sub_id ⇒ Object (readonly)
Returns the value of attribute sub_id.
    # File 'lib/datagrout_conduit/transport/ws.rb', line 48
    def sub_id
      @sub_id
    end
#topic ⇒ Object (readonly)
Returns the value of attribute topic.
    # File 'lib/datagrout_conduit/transport/ws.rb', line 48
    def topic
      @topic
    end
Instance Method Details
#_close ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
    # File 'lib/datagrout_conduit/transport/ws.rb', line 96
    def _close
      return if @closed
      @closed = true
      @queue.push(nil)
    end
#_enqueue(event) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
    # File 'lib/datagrout_conduit/transport/ws.rb', line 91
    def _enqueue(event)
      @queue.push(event) unless @closed
    end
#closed? ⇒ Boolean
Returns true once the subscription has been closed.
    # File 'lib/datagrout_conduit/transport/ws.rb', line 86
    def closed?
      @closed
    end
#each(&block) ⇒ Object
Iterate over events until the subscription is closed.
    # File 'lib/datagrout_conduit/transport/ws.rb', line 76
    def each(&block)
      loop do
        event = @queue.pop
        break if event.nil?
        block.call(event)
      end
    end
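Because the class includes Enumerable, the usual collection helpers are built on top of each. The sketch below illustrates this with a hypothetical stand-in, `EachDemo`, which reproduces only the queue, each, _enqueue, and _close from the listed source. The events are plain strings chosen for the example.

```ruby
# Hypothetical stand-in: only the parts of Subscription that iteration relies on.
class EachDemo
  include Enumerable

  def initialize
    @queue = ::Thread::Queue.new
    @closed = false
  end

  def each(&block)
    loop do
      event = @queue.pop
      break if event.nil? # close sentinel pushed by _close
      block.call(event)
    end
  end

  def _enqueue(event)
    @queue.push(event) unless @closed
  end

  def _close
    return if @closed
    @closed = true
    @queue.push(nil)
  end
end

sub = EachDemo.new
%w[a b c d].each { |name| sub._enqueue(name) }
sub._close

head = sub.first(2)      # stops iterating after two events
rest = sub.map(&:upcase) # drains the remainder, ending at the close sentinel
```

Going by the listed source, _close pushes the nil sentinel only once. After one iteration has consumed it, a further call to each (or to a helper such as to_a) would block indefinitely, so a single consumer per subscription appears to be the intended pattern.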
#recv(timeout: nil) ⇒ Object
Block until the next event arrives. Returns nil if the timeout elapses before an event arrives, and raises StopIteration once the subscription has been closed.
    # File 'lib/datagrout_conduit/transport/ws.rb', line 60
    def recv(timeout: nil)
      event = if timeout
                Timeout.timeout(timeout) { @queue.pop }
              else
                @queue.pop
              end
      raise StopIteration if event.nil?
      event
    rescue Timeout::Error
      nil
    end
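The two exit paths of recv can be told apart as sketched below. `RecvDemo` is a hypothetical stand-in that copies only recv, _enqueue, and _close from the listed source. The 0.05-second timeout and the string events are arbitrary values chosen for the example.

```ruby
require "timeout"

# Hypothetical stand-in: only the parts of Subscription that recv relies on.
class RecvDemo
  def initialize
    @queue = ::Thread::Queue.new
    @closed = false
  end

  def recv(timeout: nil)
    event = if timeout
              Timeout.timeout(timeout) { @queue.pop }
            else
              @queue.pop
            end
    raise StopIteration if event.nil?
    event
  rescue Timeout::Error
    nil
  end

  def _enqueue(event)
    @queue.push(event) unless @closed
  end

  def _close
    return if @closed
    @closed = true
    @queue.push(nil)
  end
end

sub = RecvDemo.new

# Nothing is queued yet, so the call gives up after the timeout and returns nil.
timed_out = sub.recv(timeout: 0.05)

sub._enqueue("tick")
sub._enqueue("tock")
sub._close

first_event = sub.recv(timeout: 0.05)

# Kernel#loop rescues StopIteration, so this loop ends when recv hits the close sentinel.
seen = []
loop { seen << sub.recv }
```

A nil return therefore means "no event yet", while StopIteration means "no more events". Callers who poll with a timeout inside a loop should skip nil results rather than treat them as events.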