Class: Cosmo::API::Stream

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/cosmo/api/stream.rb

Constant Summary collapse

LIMIT =
20

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name) ⇒ Stream

Returns a new instance of Stream.



27
28
29
# File 'lib/cosmo/api/stream.rb', line 27

def initialize(name)
  @name = name
end

Instance Attribute Details

#nameObject (readonly)

Returns the value of attribute name.



25
26
27
# File 'lib/cosmo/api/stream.rb', line 25

def name
  @name
end

Class Method Details

.allObject



12
13
14
# File 'lib/cosmo/api/stream.rb', line 12

def self.all
  client.list_streams.filter_map { new(_1) }
end

.clientObject



21
22
23
# File 'lib/cosmo/api/stream.rb', line 21

def self.client
  @client ||= Client.instance
end

.jobsObject



16
17
18
19
# File 'lib/cosmo/api/stream.rb', line 16

def self.jobs
  names = Config[:setup][:jobs].keys - %i[scheduled dead]
  all.select { names.include?(_1.name.to_sym) }
end

Instance Method Details

#delete(seq) ⇒ Object



99
100
101
# File 'lib/cosmo/api/stream.rb', line 99

def delete(seq)
  client.delete_message(name, seq)
end

#eachObject



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/cosmo/api/stream.rb', line 47

def each
  return if total.zero?

  state = info[:state]
  current = @offset || state.first_seq.to_i
  last = state.last_seq.to_i

  loop do
    break if current > last

    job = message(current)
    break unless job

    yield job
    current += 1
  end
end

#infoObject



31
32
33
34
# File 'lib/cosmo/api/stream.rb', line 31

def info
  info = client.stream_info(name)
  { state: info.state, config: info.config }
end

#message(seq) ⇒ Object



85
86
87
88
89
# File 'lib/cosmo/api/stream.rb', line 85

def message(seq)
  Job.new(name, client.get_message(name, seq: seq, direct: true))
rescue NATS::JetStream::Error::NotFound
  # nop, acked/nacked
end

#messages(page: nil, limit: nil) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/cosmo/api/stream.rb', line 70

def messages(page: nil, limit: nil)
  jobs = []
  limit = (limit || LIMIT).to_i
  state = info[:state]
  start = state.first_seq.to_i
  start += (page.to_i - 1) * limit if page

  offset(start).each do |message|
    jobs << message
    break if jobs.size >= limit
  end

  jobs
end

#offset(value) ⇒ Object



65
66
67
68
# File 'lib/cosmo/api/stream.rb', line 65

def offset(value)
  @offset = value.to_i
  self
end

#retriesObject



43
44
45
# File 'lib/cosmo/api/stream.rb', line 43

def retries
  client.list_consumers(name).sum { it["num_redelivered"].to_i }
end

#retry(seq) ⇒ Object



91
92
93
94
95
96
97
# File 'lib/cosmo/api/stream.rb', line 91

def retry(seq)
  job = message(seq)
  return unless job

  client.publish(job.x_subject, job.message.data)
  delete(seq)
end

#totalObject Also known as: size



36
37
38
39
40
# File 'lib/cosmo/api/stream.rb', line 36

def total
  info[:state].messages.to_i
rescue StandardError
  0
end