Class: Cosmo::API::Stream
- Inherits:
-
Object
- Object
- Cosmo::API::Stream
- Includes:
- Enumerable
- Defined in:
- lib/cosmo/api/stream.rb
Constant Summary collapse
- LIMIT =
20
Instance Attribute Summary collapse
-
#name ⇒ Object
readonly
Returns the value of attribute name.
Class Method Summary collapse
Instance Method Summary collapse
- #delete(seq) ⇒ Object
- #each ⇒ Object
- #info ⇒ Object
-
#initialize(name) ⇒ Stream
constructor
A new instance of Stream.
- #message(seq) ⇒ Object
- #messages(page: nil, limit: nil) ⇒ Object
- #offset(value) ⇒ Object
- #retries ⇒ Object
- #retry(seq) ⇒ Object
- #total ⇒ Object (also: #size)
Constructor Details
#initialize(name) ⇒ Stream
Returns a new instance of Stream.
27 28 29 |
# File 'lib/cosmo/api/stream.rb', line 27 def initialize(name) @name = name end |
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
25 26 27 |
# File 'lib/cosmo/api/stream.rb', line 25 def name @name end |
Class Method Details
.all ⇒ Object
12 13 14 |
# File 'lib/cosmo/api/stream.rb', line 12 def self.all client.list_streams.filter_map { new(_1) } end |
.client ⇒ Object
21 22 23 |
# File 'lib/cosmo/api/stream.rb', line 21 def self.client @client ||= Client.instance end |
.jobs ⇒ Object
16 17 18 19 |
# File 'lib/cosmo/api/stream.rb', line 16 def self.jobs names = Config[:setup][:jobs].keys - %i[scheduled dead] all.select { names.include?(_1.name.to_sym) } end |
Instance Method Details
#delete(seq) ⇒ Object
99 100 101 |
# File 'lib/cosmo/api/stream.rb', line 99 def delete(seq) client.(name, seq) end |
#each ⇒ Object
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/cosmo/api/stream.rb', line 47 def each return if total.zero? state = info[:state] current = @offset || state.first_seq.to_i last = state.last_seq.to_i loop do break if current > last job = message(current) break unless job yield job current += 1 end end |
#info ⇒ Object
31 32 33 34 |
# File 'lib/cosmo/api/stream.rb', line 31 def info info = client.stream_info(name) { state: info.state, config: info.config } end |
#message(seq) ⇒ Object
85 86 87 88 89 |
# File 'lib/cosmo/api/stream.rb', line 85 def message(seq) Job.new(name, client.(name, seq: seq, direct: true)) rescue NATS::JetStream::Error::NotFound # nop, acked/nacked end |
#messages(page: nil, limit: nil) ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/cosmo/api/stream.rb', line 70 def messages(page: nil, limit: nil) jobs = [] limit = (limit || LIMIT).to_i state = info[:state] start = state.first_seq.to_i start += (page.to_i - 1) * limit if page offset(start).each do |message| jobs << message break if jobs.size >= limit end jobs end |
#offset(value) ⇒ Object
65 66 67 68 |
# File 'lib/cosmo/api/stream.rb', line 65 def offset(value) @offset = value.to_i self end |
#retries ⇒ Object
43 44 45 |
# File 'lib/cosmo/api/stream.rb', line 43 def retries client.list_consumers(name).sum { it["num_redelivered"].to_i } end |
#retry(seq) ⇒ Object
91 92 93 94 95 96 97 |
# File 'lib/cosmo/api/stream.rb', line 91 def retry(seq) job = message(seq) return unless job client.publish(job.x_subject, job.message.data) delete(seq) end |
#total ⇒ Object Also known as: size
36 37 38 39 40 |
# File 'lib/cosmo/api/stream.rb', line 36 def total info[:state].messages.to_i rescue StandardError 0 end |