Class: Cosmo::API::Stream
- Inherits:
-
Object
- Object
- Cosmo::API::Stream
- Includes:
- Enumerable
- Defined in:
- lib/cosmo/api/stream.rb
Constant Summary collapse
- LIMIT =
20
Instance Attribute Summary collapse
-
#name ⇒ Object
readonly
Returns the value of attribute name.
Class Method Summary collapse
Instance Method Summary collapse
- #delete(seq) ⇒ Object
- #each ⇒ Object
- #info ⇒ Object
-
#initialize(name) ⇒ Stream
constructor
A new instance of Stream.
- #message(seq) ⇒ Object
- #messages(page: nil, limit: nil) ⇒ Object
- #offset(value) ⇒ Object
- #pause! ⇒ Object
- #paused? ⇒ Boolean
- #retries ⇒ Object
- #retry(seq) ⇒ Object
- #total ⇒ Object (also: #size)
- #unpause! ⇒ Object
Constructor Details
#initialize(name) ⇒ Stream
Returns a new instance of Stream.
28 29 30 |
# File 'lib/cosmo/api/stream.rb', line 28 def initialize(name) @name = name end |
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
26 27 28 |
# File 'lib/cosmo/api/stream.rb', line 26 def name @name end |
Class Method Details
.all ⇒ Object
12 13 14 |
# File 'lib/cosmo/api/stream.rb', line 12 def self.all client.list_streams.map { new(_1.dig("config", "name")) } end |
.client ⇒ Object
22 23 24 |
# File 'lib/cosmo/api/stream.rb', line 22 def self.client @client ||= Client.instance end |
.jobs ⇒ Object
16 17 18 19 20 |
# File 'lib/cosmo/api/stream.rb', line 16 def self.jobs client.list_streams.select { _1.dig("config", "metadata", "_cosmo.type") == "jobs" } .reject { %w[scheduled dead].include?(_1.dig("config", "name")) } .map { new(_1.dig("config", "name")) } end |
Instance Method Details
#delete(seq) ⇒ Object
105 106 107 |
# File 'lib/cosmo/api/stream.rb', line 105 def delete(seq) client.(name, seq) end |
#each ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/cosmo/api/stream.rb', line 50 def each return if total.zero? state = info[:state] current = @offset || state.first_seq.to_i last = state.last_seq.to_i loop do break if current > last job = (current) current += 1 next unless job yield job end end |
#info ⇒ Object
32 33 34 35 |
# File 'lib/cosmo/api/stream.rb', line 32 def info info = client.stream_info(name) { state: info.state, config: info.config } end |
#message(seq) ⇒ Object
88 89 90 91 92 93 94 95 |
# File 'lib/cosmo/api/stream.rb', line 88 def (seq) job = Job.new(name, client.(name, seq: seq, direct: true)) return if job.subject.to_s.start_with?(Cron::Entry::SUBJECT_PREFIX) job rescue NATS::JetStream::Error::NotFound # nop, acked/nacked end |
#messages(page: nil, limit: nil) ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/cosmo/api/stream.rb', line 73 def (page: nil, limit: nil) jobs = [] limit = (limit || LIMIT).to_i state = info[:state] start = state.first_seq.to_i start += (page.to_i - 1) * limit if page offset(start).each do || jobs << break if jobs.size >= limit end jobs end |
#offset(value) ⇒ Object
68 69 70 71 |
# File 'lib/cosmo/api/stream.rb', line 68 def offset(value) @offset = value.to_i self end |
#pause! ⇒ Object
109 110 111 |
# File 'lib/cosmo/api/stream.rb', line 109 def pause! client.pause_stream(name) end |
#paused? ⇒ Boolean
117 118 119 |
# File 'lib/cosmo/api/stream.rb', line 117 def paused? client.stream_paused?(name) end |
#retries ⇒ Object
46 47 48 |
# File 'lib/cosmo/api/stream.rb', line 46 def retries client.list_consumers(name).sum { _1["num_redelivered"].to_i } end |
#retry(seq) ⇒ Object
97 98 99 100 101 102 103 |
# File 'lib/cosmo/api/stream.rb', line 97 def retry(seq) job = (seq) return unless job client.publish(job.x_subject, job..data) delete(seq) end |
#total ⇒ Object Also known as: size
37 38 39 40 41 42 43 |
# File 'lib/cosmo/api/stream.rb', line 37 def total all_msgs = info[:state]..to_i cron_count = Client.instance.cron_subjects_in_stream(name, "#{Cron::Entry::SUBJECT_PREFIX}.#{name}.>").size [all_msgs - cron_count, 0].max rescue NATS::Error 0 end |
#unpause! ⇒ Object
113 114 115 |
# File 'lib/cosmo/api/stream.rb', line 113 def unpause! client.unpause_stream(name) end |