Class: DatagroutConduit::Transport::Ws::Subscription

Inherits:
Object
Includes:
Enumerable
Defined in:
lib/datagrout_conduit/transport/ws.rb

Overview

Per-subscription event stream delivered via a thread-safe Queue. Call recv to block until the next event, or iterate with each.

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(sub_id, topic) ⇒ Subscription

Returns a new instance of Subscription.



# File 'lib/datagrout_conduit/transport/ws.rb', line 42

def initialize(sub_id, topic)
  @sub_id = sub_id
  @topic  = topic
  @queue  = ::Thread::Queue.new
  @closed = false
end

Instance Attribute Details

#sub_id ⇒ Object (readonly)

Returns the value of attribute sub_id.



# File 'lib/datagrout_conduit/transport/ws.rb', line 40

def sub_id
  @sub_id
end

#topic ⇒ Object (readonly)

Returns the value of attribute topic.



# File 'lib/datagrout_conduit/transport/ws.rb', line 40

def topic
  @topic
end

Instance Method Details

#_close ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/datagrout_conduit/transport/ws.rb', line 88

def _close
  return if @closed

  @closed = true
  @queue.push(nil)
end

#_enqueue(event) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/datagrout_conduit/transport/ws.rb', line 83

def _enqueue(event)
  @queue.push(event) unless @closed
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/datagrout_conduit/transport/ws.rb', line 78

def closed?
  @closed
end

#each(&block) ⇒ Object

Yields each event in arrival order, blocking while the queue is empty, and returns once the subscription is closed.



# File 'lib/datagrout_conduit/transport/ws.rb', line 68

def each(&block)
  loop do
    event = @queue.pop
    break if event.nil?
    block.call(event)
  end
end
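
Because the class includes Enumerable and #each blocks on the queue until close, iteration usually runs alongside whatever delivers the events. The sketch below is a minimal illustration, not the library's own usage: `EachStandIn` is a hypothetical stand-in that copies the queue logic shown on this page so the example runs without a live WebSocket connection, and the event hashes are made up. In real use the transport is assumed to create the Subscription and call the private #_enqueue and #_close methods itself.

```ruby
# Hypothetical stand-in mirroring the queue logic documented on this page.
# It exists only so the example runs without a live connection.
class EachStandIn
  include Enumerable

  def initialize
    @queue  = ::Thread::Queue.new
    @closed = false
  end

  # Same loop as Subscription#each: pop until the nil close sentinel.
  def each(&block)
    loop do
      event = @queue.pop
      break if event.nil?
      block.call(event)
    end
  end

  def _enqueue(event)
    @queue.push(event) unless @closed
  end

  def _close
    return if @closed

    @closed = true
    @queue.push(nil)
  end
end

sub = EachStandIn.new

# Simulate the transport delivering three (made-up) events, then closing.
producer = Thread.new do
  3.times { |i| sub._enqueue({ "seq" => i }) }
  sub._close
end

# Enumerable methods are built on #each, so #map blocks until close too.
seqs = sub.map { |event| event["seq"] }
producer.join
```

Enumerable methods that stop early, such as first(n), return as soon as they have enough events instead of waiting for the subscription to close.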

#recv(timeout: nil) ⇒ Object

Blocks until the next event arrives and returns it. Returns nil if the timeout expires first, and raises StopIteration once the subscription has been closed.

Parameters:

  • timeout (Numeric, nil) (defaults to: nil)

    optional timeout in seconds; returns nil on expiry



# File 'lib/datagrout_conduit/transport/ws.rb', line 52

def recv(timeout: nil)
  event =
    if timeout
      Timeout.timeout(timeout) { @queue.pop }
    else
      @queue.pop
    end

  raise StopIteration if event.nil?

  event
rescue Timeout::Error
  nil
end