Class: DatagroutConduit::Transport::Ws::Subscription
- Inherits: Object
- Includes: Enumerable
- Defined in: lib/datagrout_conduit/transport/ws.rb
Overview
Per-subscription event stream delivered via a thread-safe Queue. Call recv to block until the next event, or iterate with each.
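A minimal sketch of the each style of consumption described above. So that it runs without the gem, it uses a stand-in class copied from the source listings on this page; the id "sub-1" and topic "orders" are hypothetical, and the direct calls to _enqueue and _close only simulate what the transport is assumed to do internally (both are private API).

```ruby
# Stand-in mirroring the listings on this page; not the gem's own file.
class Subscription
  include Enumerable
  attr_reader :sub_id, :topic

  def initialize(sub_id, topic)
    @sub_id = sub_id
    @topic = topic
    @queue = ::Thread::Queue.new
    @closed = false
  end

  def closed?
    @closed
  end

  def each(&block)
    loop do
      event = @queue.pop
      break if event.nil? # nil is the close sentinel
      block.call(event)
    end
  end

  def _enqueue(event)
    @queue.push(event) unless @closed
  end

  def _close
    return if @closed
    @closed = true
    @queue.push(nil)
  end
end

sub = Subscription.new("sub-1", "orders") # hypothetical id and topic

# Consume on a separate thread; each blocks on the queue between events.
received = []
consumer = Thread.new { sub.each { |event| received << event } }

# Simulate the transport delivering two events and then closing.
sub._enqueue({ "id" => 1 })
sub._enqueue({ "id" => 2 })
sub._close

consumer.join # returns once each has seen the close sentinel
```

Because each blocks until the subscription closes, running it on its own thread keeps the caller free to do other work.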
Instance Attribute Summary
- #sub_id ⇒ Object (readonly): Returns the value of attribute sub_id.
- #topic ⇒ Object (readonly): Returns the value of attribute topic.
Instance Method Summary
- #_close ⇒ Object (private)
- #_enqueue(event) ⇒ Object (private)
- #closed? ⇒ Boolean
- #each(&block) ⇒ Object: Iterate over events until the subscription is closed.
- #initialize(sub_id, topic) ⇒ Subscription (constructor): A new instance of Subscription.
- #recv(timeout: nil) ⇒ Object: Block until the next event arrives.
Constructor Details
#initialize(sub_id, topic) ⇒ Subscription
Returns a new instance of Subscription.
    # File 'lib/datagrout_conduit/transport/ws.rb', line 42
    def initialize(sub_id, topic)
      @sub_id = sub_id
      @topic = topic
      @queue = ::Thread::Queue.new
      @closed = false
    end
Instance Attribute Details
#sub_id ⇒ Object (readonly)
Returns the value of attribute sub_id.
    # File 'lib/datagrout_conduit/transport/ws.rb', line 40
    def sub_id
      @sub_id
    end
#topic ⇒ Object (readonly)
Returns the value of attribute topic.
    # File 'lib/datagrout_conduit/transport/ws.rb', line 40
    def topic
      @topic
    end
Instance Method Details
#_close ⇒ Object
This method is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.
Marks the subscription as closed and pushes a nil sentinel onto the queue so that blocked consumers wake up. Calling it again has no effect.

    # File 'lib/datagrout_conduit/transport/ws.rb', line 88
    def _close
      return if @closed
      @closed = true
      @queue.push(nil)
    end
#_enqueue(event) ⇒ Object
This method is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.
Pushes event onto the queue unless the subscription is already closed.

    # File 'lib/datagrout_conduit/transport/ws.rb', line 83
    def _enqueue(event)
      @queue.push(event) unless @closed
    end
#closed? ⇒ Boolean
Returns true once the subscription has been closed.

    # File 'lib/datagrout_conduit/transport/ws.rb', line 78
    def closed?
      @closed
    end
#each(&block) ⇒ Object
Iterate over events until the subscription is closed.
    # File 'lib/datagrout_conduit/transport/ws.rb', line 68
    def each(&block)
      loop do
        event = @queue.pop
        break if event.nil?
        block.call(event)
      end
    end
#recv(timeout: nil) ⇒ Object
Block until the next event arrives. Returns nil if timeout (in seconds) elapses first, and raises StopIteration once the subscription is closed. Iterating via each ends quietly on close instead of raising.
    # File 'lib/datagrout_conduit/transport/ws.rb', line 52
    def recv(timeout: nil)
      event = if timeout
                Timeout.timeout(timeout) { @queue.pop }
              else
                @queue.pop
              end
      raise StopIteration if event.nil?
      event
    rescue Timeout::Error
      nil
    end
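The two outcomes of recv other than an event (nil on timeout, StopIteration on close) can be sketched as follows. This is an illustration under assumptions, not the gem's documented usage: the class is a stand-in copied from the listings on this page, the id, topic, and event values are hypothetical, and _enqueue and _close are called directly only to simulate the transport.

```ruby
require "timeout"

# Stand-in mirroring the listings on this page; not the gem's own file.
class Subscription
  def initialize(sub_id, topic)
    @sub_id = sub_id
    @topic = topic
    @queue = ::Thread::Queue.new
    @closed = false
  end

  def closed?
    @closed
  end

  def recv(timeout: nil)
    event = if timeout
              Timeout.timeout(timeout) { @queue.pop }
            else
              @queue.pop
            end
    raise StopIteration if event.nil? # nil is the close sentinel
    event
  rescue Timeout::Error
    nil
  end

  def _enqueue(event)
    @queue.push(event) unless @closed
  end

  def _close
    return if @closed
    @closed = true
    @queue.push(nil)
  end
end

sub = Subscription.new("sub-1", "orders") # hypothetical id and topic

# Nothing is queued yet, so the short timeout elapses and recv returns nil.
timed_out = sub.recv(timeout: 0.05)

# Simulate the transport delivering two events and then closing.
sub._enqueue("tick")
sub._enqueue("tock")
sub._close

# Kernel#loop rescues StopIteration, so this loop ends cleanly when recv
# reaches the close sentinel.
seen = []
loop { seen << sub.recv }
```

In the listing above, _close pushes a single nil sentinel, so once one recv call has raised StopIteration a later recv without a timeout would block. Checking closed? before calling recv again avoids that.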