Class: Beachcomber::WatchStream

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/beachcomber/watch_stream.rb

Instance Method Summary collapse

Constructor Details

#initialize(socket) ⇒ WatchStream

Returns a new instance of WatchStream.



5
6
7
# File 'lib/beachcomber/watch_stream.rb', line 5

def initialize(socket)
  @socket = socket
end

Instance Method Details

#closeObject



46
47
48
# File 'lib/beachcomber/watch_stream.rb', line 46

def close
  @socket.close
end

#eachObject

Yields a WatchEvent per emitted change.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/beachcomber/watch_stream.rb', line 10

def each
  return enum_for(:each) unless block_given?
  while (line = @socket.gets)
    line.strip!
    next if line.empty?
    resp = JSON.parse(line)
    unless resp["ok"]
      raise ServerError, resp["error"] || "watch error"
    end
    yield WatchEvent.new(
      data:   resp["data"],
      age_ms: Integer(resp["age_ms"] || 0),
      stale:  resp["stale"] == true,
    )
  end
end

#next_eventObject

Read the next event; returns nil on connection close.



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/beachcomber/watch_stream.rb', line 28

def next_event
  loop do
    line = @socket.gets
    return nil if line.nil?
    line.strip!
    next if line.empty?
    resp = JSON.parse(line)
    unless resp["ok"]
      raise ServerError, resp["error"] || "watch error"
    end
    return WatchEvent.new(
      data:   resp["data"],
      age_ms: Integer(resp["age_ms"] || 0),
      stale:  resp["stale"] == true,
    )
  end
end