Class: Cosmo::API::Stream
- Inherits: Object
- Inheritance chain: Object → Cosmo::API::Stream
- Includes:
- Enumerable
- Defined in:
- lib/cosmo/api/stream.rb
Constant Summary
- LIMIT = 20
Instance Attribute Summary
- #name ⇒ Object (readonly): Returns the value of attribute name.
Class Method Summary
- .all ⇒ Object
- .client ⇒ Object
- .jobs ⇒ Object
Instance Method Summary
- #delete(seq) ⇒ Object
- #each ⇒ Object
- #info ⇒ Object
- #initialize(name) ⇒ Stream (constructor): A new instance of Stream.
- #message(seq) ⇒ Object
- #messages(page: nil, limit: nil) ⇒ Object
- #offset(value) ⇒ Object
- #pause! ⇒ Object
- #paused? ⇒ Boolean
- #retries ⇒ Object
- #retry(seq) ⇒ Object
- #total ⇒ Object (also: #size)
- #unpause! ⇒ Object
Constructor Details
#initialize(name) ⇒ Stream
Returns a new instance of Stream.
28 29 30 |
# File 'lib/cosmo/api/stream.rb', line 28
# Builds a handle for one stream. Nothing is looked up here; the given
# name is only stored and later exposed through #name.
#
# @param name [Object] stream name, kept exactly as passed in
def initialize(name)
  @name = name
end
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
26 27 28 |
# File 'lib/cosmo/api/stream.rb', line 26
# Read-only accessor for the stream name captured by the constructor.
#
# @return [Object] the value stored in @name
def name
  @name
end
Class Method Details
.all ⇒ Object
12 13 14 |
# File 'lib/cosmo/api/stream.rb', line 12
# Wraps every stream reported by the client in a new instance, keyed by
# the "name" entry found under each stream's "config".
#
# @return [Array] one instance per entry returned by client.list_streams
def self.all
  client.list_streams.map do |stream|
    new(stream.dig("config", "name"))
  end
end
.client ⇒ Object
22 23 24 |
# File 'lib/cosmo/api/stream.rb', line 22
# Memoized access to Client.instance: the first call stores the result in
# the class-level @client and later calls reuse it.
#
# @return [Object] whatever Client.instance returned
def self.client
  return @client if @client

  @client = Client.instance
end
.jobs ⇒ Object
16 17 18 19 20 |
# File 'lib/cosmo/api/stream.rb', line 16
# Lists the streams whose config metadata marks them as "jobs", leaving
# out the ones named "scheduled" and "dead".
#
# @return [Array] one instance per remaining stream
def self.jobs
  job_streams = client.list_streams.select do |stream|
    stream.dig("config", "metadata", "_cosmo.type") == "jobs"
  end

  visible = job_streams.reject do |stream|
    %w[scheduled dead].include?(stream.dig("config", "name"))
  end

  visible.map { |stream| new(stream.dig("config", "name")) }
end
Instance Method Details
#delete(seq) ⇒ Object
100 101 102 |
# File 'lib/cosmo/api/stream.rb', line 100
# Asks the client to remove the message at sequence +seq+ from this stream.
#
# NOTE(review): the client method name was lost in the extracted listing
# (it read `client.(name, seq)`); `delete_message` is the presumed
# original -- confirm against Cosmo::Client.
#
# @param seq [Object] stream sequence number of the message to remove
# @return [Object] whatever the client call returns
def delete(seq)
  client.delete_message(name, seq)
end
#each ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/cosmo/api/stream.rb', line 48
# Yields the stream's jobs in sequence order.
#
# Iteration starts at @offset when one has been set through #offset,
# otherwise at the stream state's first_seq, and runs up to last_seq.
# It stops early at the first sequence for which #message returns nil.
#
# @yieldparam job [Object] the value returned by #message for a sequence
# @return [Enumerator] when called without a block
# @return [nil] otherwise
def each
  # Without a block the yield below would raise; hand back an enumerator.
  return enum_for(:each) unless block_given?
  return if total.zero?

  state = info[:state]
  current = @offset || state.first_seq.to_i
  last = state.last_seq.to_i

  loop do
    break if current > last

    job = message(current)
    break unless job

    yield job
    current += 1
  end
end
#info ⇒ Object
32 33 34 35 |
# File 'lib/cosmo/api/stream.rb', line 32
# Fetches the stream info from the client and repackages the two parts
# used by this class.
#
# @return [Hash] with keys :state and :config taken from client.stream_info
def info
  details = client.stream_info(name)
  { state: details.state, config: details.config }
end
#message(seq) ⇒ Object
86 87 88 89 90 |
# File 'lib/cosmo/api/stream.rb', line 86
# Loads the message at sequence +seq+ and wraps it in a Job.
#
# NOTE(review): the client method name was lost in the extracted listing
# (it read `client.(name, seq: seq, direct: true)`); `get_message` is the
# presumed original -- confirm against Cosmo::Client.
#
# @param seq [Object] stream sequence number to load
# @return [Job] built from the stream name and the client's result
# @return [nil] when the client raises NATS::JetStream::Error::NotFound
def message(seq)
  Job.new(name, client.get_message(name, seq: seq, direct: true))
rescue NATS::JetStream::Error::NotFound
  # nop, acked/nacked
  nil
end
#messages(page: nil, limit: nil) ⇒ Object
71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/cosmo/api/stream.rb', line 71
# Returns one page of jobs from the stream.
#
# The page starts at the stream state's first_seq, shifted forward by
# (page - 1) * limit when +page+ is given, and holds at most +limit+ jobs.
#
# NOTE(review): the start position is handed to #offset; whether that
# state should be reset after paging is not decided here.
#
# @param page [Object, nil] 1-based page number; values below 1 are
#   treated as page 1 so the start never falls before first_seq
# @param limit [Object, nil] page size; defaults to LIMIT
# @return [Array] the collected jobs; empty when limit is not positive
def messages(page: nil, limit: nil)
  limit = (limit || LIMIT).to_i
  # Previously a zero or negative limit still returned one job, because
  # the size check only ran after the first append.
  return [] unless limit.positive?

  start = info[:state].first_seq.to_i
  start += ([page.to_i, 1].max - 1) * limit if page

  jobs = []
  offset(start).each do |message|
    jobs << message
    break if jobs.size >= limit
  end
  jobs
end
#offset(value) ⇒ Object
66 67 68 69 |
# File 'lib/cosmo/api/stream.rb', line 66
# Records the sequence number to start from, converted with to_i, in
# @offset and returns the receiver so calls can be chained.
#
# @param value [#to_i] starting sequence number
# @return [self]
def offset(value)
  tap { @offset = value.to_i }
end
#pause! ⇒ Object
104 105 106 |
# File 'lib/cosmo/api/stream.rb', line 104
# Forwards a pause request for this stream to the client.
#
# @return [Object] whatever client.pause_stream returns
def pause!
  client.pause_stream(name)
end
#paused? ⇒ Boolean
112 113 114 |
# File 'lib/cosmo/api/stream.rb', line 112
# Asks the client whether this stream is paused.
#
# @return [Boolean] the result of client.stream_paused?
def paused?
  client.stream_paused?(name)
end
#retries ⇒ Object
44 45 46 |
# File 'lib/cosmo/api/stream.rb', line 44
# Adds up the "num_redelivered" value, converted with to_i, of every
# consumer the client lists for this stream.
#
# @return [Integer] the summed redelivery count
def retries
  consumers = client.list_consumers(name)
  consumers.sum { |consumer| consumer["num_redelivered"].to_i }
end
#retry(seq) ⇒ Object
92 93 94 95 96 97 98 |
# File 'lib/cosmo/api/stream.rb', line 92
# Re-publishes the job stored at sequence +seq+ and then deletes the
# original entry. Does nothing when #message finds no job there.
#
# NOTE(review): the payload expression read `job..data` in the extracted
# listing; `job.message.data` is the presumed original -- confirm against
# the Job class.
#
# @param seq [Object] stream sequence number of the job to retry
# @return [Object] the result of #delete
# @return [nil] when no job exists at +seq+
def retry(seq)
  job = message(seq)
  return unless job

  client.publish(job.x_subject, job.message.data)
  delete(seq)
end
#total ⇒ Object Also known as: size
37 38 39 40 41 |
# File 'lib/cosmo/api/stream.rb', line 37
# Number of messages held by the stream, read from the state's message
# count and converted with to_i.
#
# Any StandardError raised while fetching or reading the state is
# swallowed and reported as 0.
#
# @return [Integer] message count, or 0 on error
def total
  info[:state].messages.to_i
rescue StandardError
  0
end
#unpause! ⇒ Object
108 109 110 |
# File 'lib/cosmo/api/stream.rb', line 108
# Forwards an unpause request for this stream to the client.
#
# @return [Object] whatever client.unpause_stream returns
def unpause!
  client.unpause_stream(name)
end