Class: Cosmo::API::Stream

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/cosmo/api/stream.rb

Constant Summary collapse

LIMIT =
20

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name) ⇒ Stream

Returns a new instance of Stream.



28
29
30
# File 'lib/cosmo/api/stream.rb', line 28

def initialize(name)
  @name = name
end

Instance Attribute Details

#nameObject (readonly)

Returns the value of attribute name.



26
27
28
# File 'lib/cosmo/api/stream.rb', line 26

def name
  @name
end

Class Method Details

.allObject



12
13
14
# File 'lib/cosmo/api/stream.rb', line 12

def self.all
  client.list_streams.map { new(_1.dig("config", "name")) }
end

.clientObject



22
23
24
# File 'lib/cosmo/api/stream.rb', line 22

def self.client
  @client ||= Client.instance
end

.jobsObject



16
17
18
19
20
# File 'lib/cosmo/api/stream.rb', line 16

def self.jobs
  client.list_streams.select { _1.dig("config", "metadata", "_cosmo.type") == "jobs" }
                     .reject { %w[scheduled dead].include?(_1.dig("config", "name")) }
                     .map { new(_1.dig("config", "name")) }
end

Instance Method Details

#delete(seq) ⇒ Object



105
106
107
# File 'lib/cosmo/api/stream.rb', line 105

def delete(seq)
  client.delete_message(name, seq)
end

#eachObject



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/cosmo/api/stream.rb', line 50

def each
  return if total.zero?

  state = info[:state]
  current = @offset || state.first_seq.to_i
  last = state.last_seq.to_i

  loop do
    break if current > last

    job = message(current)
    current += 1
    next unless job

    yield job
  end
end

#infoObject



32
33
34
35
# File 'lib/cosmo/api/stream.rb', line 32

def info
  info = client.stream_info(name)
  { state: info.state, config: info.config }
end

#message(seq) ⇒ Object



88
89
90
91
92
93
94
95
# File 'lib/cosmo/api/stream.rb', line 88

def message(seq)
  job = Job.new(name, client.get_message(name, seq: seq, direct: true))
  return if job.subject.to_s.start_with?(Cron::Entry::SUBJECT_PREFIX)

  job
rescue NATS::JetStream::Error::NotFound
  # nop, acked/nacked
end

#messages(page: nil, limit: nil) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/cosmo/api/stream.rb', line 73

def messages(page: nil, limit: nil)
  jobs = []
  limit = (limit || LIMIT).to_i
  state = info[:state]
  start = state.first_seq.to_i
  start += (page.to_i - 1) * limit if page

  offset(start).each do |message|
    jobs << message
    break if jobs.size >= limit
  end

  jobs
end

#offset(value) ⇒ Object



68
69
70
71
# File 'lib/cosmo/api/stream.rb', line 68

def offset(value)
  @offset = value.to_i
  self
end

#pause!Object



109
110
111
# File 'lib/cosmo/api/stream.rb', line 109

def pause!
  client.pause_stream(name)
end

#paused?Boolean

Returns:

  • (Boolean)


117
118
119
# File 'lib/cosmo/api/stream.rb', line 117

def paused?
  client.stream_paused?(name)
end

#retriesObject



46
47
48
# File 'lib/cosmo/api/stream.rb', line 46

def retries
  client.list_consumers(name).sum { _1["num_redelivered"].to_i }
end

#retry(seq) ⇒ Object



97
98
99
100
101
102
103
# File 'lib/cosmo/api/stream.rb', line 97

def retry(seq)
  job = message(seq)
  return unless job

  client.publish(job.x_subject, job.message.data)
  delete(seq)
end

#totalObject Also known as: size



37
38
39
40
41
42
43
# File 'lib/cosmo/api/stream.rb', line 37

def total
  all_msgs = info[:state].messages.to_i
  cron_count = Client.instance.cron_subjects_in_stream(name, "#{Cron::Entry::SUBJECT_PREFIX}.#{name}.>").size
  [all_msgs - cron_count, 0].max
rescue NATS::Error
  0
end

#unpause!Object



113
114
115
# File 'lib/cosmo/api/stream.rb', line 113

def unpause!
  client.unpause_stream(name)
end