Class: Cosmo::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/cosmo/client.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(nats_url: ENV.fetch("NATS_URL", "nats://localhost:4222")) ⇒ Client

Returns a new instance of Client.



14
15
16
17
18
19
# File 'lib/cosmo/client.rb', line 14

def initialize(nats_url: ENV.fetch("NATS_URL", "nats://localhost:4222"))
  Logger.debug "Connecting to NATS server at #{nats_url}..."
  @nc = NATS.connect(nats_url)
  Logger.debug "Connection established"
  @js = @nc.jetstream
end

Instance Attribute Details

#jsObject (readonly)

Returns the value of attribute js.



12
13
14
# File 'lib/cosmo/client.rb', line 12

def js
  @js
end

#ncObject (readonly)

Returns the value of attribute nc.



12
13
14
# File 'lib/cosmo/client.rb', line 12

def nc
  @nc
end

Class Method Details

.instanceObject



8
9
10
# File 'lib/cosmo/client.rb', line 8

def self.instance
  @instance ||= Client.new
end

Instance Method Details

#closeObject



128
129
130
# File 'lib/cosmo/client.rb', line 128

def close
  nc.close
end

#consumer_info(stream_name, consumer_name) ⇒ Object



100
101
102
# File 'lib/cosmo/client.rb', line 100

def consumer_info(stream_name, consumer_name)
  js.consumer_info(stream_name, consumer_name)
end

#create_stream(name, config) ⇒ Object



33
34
35
# File 'lib/cosmo/client.rb', line 33

def create_stream(name, config)
  js.add_stream(name: name, **config)
end

#cron_subjects_in_stream(stream_name, filter) ⇒ Array<String>

Return all subjects in stream_name that match filter using NATS’s subjects_filter on STREAM.INFO (requires NATS ≥ 2.9).

Returns:

  • (Array<String>)


57
58
59
60
61
62
63
64
# File 'lib/cosmo/client.rb', line 57

def cron_subjects_in_stream(stream_name, filter)
  payload = Utils::Json.dump({ subjects_filter: filter })
  resp = nc.request("$JS.API.STREAM.INFO.#{stream_name}", payload)
  data = Utils::Json.parse(resp.data, symbolize_names: false)
  (data&.dig("state", "subjects") || {}).keys
rescue StandardError
  []
end

#delete_message(name, seq) ⇒ Object



108
109
110
111
# File 'lib/cosmo/client.rb', line 108

def delete_message(name, seq)
  response = nc.request("$JS.API.STREAM.MSG.DELETE.#{name}", JSON.dump({ seq: seq }))
  Utils::Json.parse(response.data, symbolize_names: false)
end

#delete_stream(name, params = {}) ⇒ Object



37
38
39
# File 'lib/cosmo/client.rb', line 37

def delete_stream(name, params = {})
  js.delete_stream(name, params)
end

#get_message(name, **options) ⇒ Object



104
105
106
# File 'lib/cosmo/client.rb', line 104

def get_message(name, **options)
  js.get_msg(name, **options)
end

#kv(name, allow_msg_ttl: false, **options) ⇒ Object



122
123
124
125
126
# File 'lib/cosmo/client.rb', line 122

def kv(name, allow_msg_ttl: false, **options)
  js.key_value(name)
rescue NATS::KeyValue::BucketNotFoundError
  allow_msg_ttl ? create_kv_with_msg_ttl(name, **options) : js.create_key_value({ bucket: name }.merge(options))
end

#list_consumers(stream_name) ⇒ Object



94
95
96
97
98
# File 'lib/cosmo/client.rb', line 94

def list_consumers(stream_name)
  response = nc.request("$JS.API.CONSUMER.LIST.#{stream_name}", "")
  data = Utils::Json.parse(response.data, default: {}, symbolize_names: false)
  Array(data["consumers"])
end

#list_streamsObject



66
67
68
69
70
71
72
# File 'lib/cosmo/client.rb', line 66

def list_streams
  response = nc.request("$JS.API.STREAM.LIST", "")
  data = Utils::Json.parse(response.data, symbolize_names: false)
  return [] if data.nil? || data["streams"].nil?

  data["streams"]
end

#pause_stream(name) ⇒ Object



74
75
76
77
78
79
# File 'lib/cosmo/client.rb', line 74

def pause_stream(name)
  config = stream_info(name).config.to_h
  config[:metadata] ||= {}
  config[:metadata][:"_cosmo.paused"] = "true"
  update_stream(name, config)
end

#publish(subject, payload, **params) ⇒ Object



21
22
23
# File 'lib/cosmo/client.rb', line 21

def publish(subject, payload, **params)
  js.publish(subject, payload, **params)
end

#purge(stream_name, subject) ⇒ Object

Raises:

  • (NATS::JetStream::Error)


113
114
115
116
117
118
119
120
# File 'lib/cosmo/client.rb', line 113

def purge(stream_name, subject)
  payload = subject ? Utils::Json.dump({ filter: subject }) : ""
  response = @nc.request("$JS.API.STREAM.PURGE.#{stream_name}", payload)
  result = Utils::Json.parse(response.data, default: {}, symbolize_names: false)
  raise NATS::JetStream::Error, result.dig("error", "description") if result["error"]

  result["purged"] # number of messages purged
end

#setup_stream(name, config) ⇒ Object

Create/update a stream, falling back to create when there’s no stream.

Parameters:

  • name (String)

    Stream name

  • config (Hash)

    Full desired stream configuration



48
49
50
51
52
# File 'lib/cosmo/client.rb', line 48

def setup_stream(name, config)
  update_stream(name, config)
rescue NATS::JetStream::Error::StreamNotFound
  create_stream(name, config)
end

#stream_info(name) ⇒ Object



29
30
31
# File 'lib/cosmo/client.rb', line 29

def stream_info(name)
  js.stream_info(name)
end

#stream_paused?(name) ⇒ Boolean

Returns:

  • (Boolean)


88
89
90
91
92
# File 'lib/cosmo/client.rb', line 88

def stream_paused?(name)
  stream_info(name).config.&.[](:"_cosmo.paused") == "true"
rescue NATS::IO::Timeout
  false
end

#subscribe(subject, consumer_name, config) ⇒ Object



25
26
27
# File 'lib/cosmo/client.rb', line 25

def subscribe(subject, consumer_name, config)
  js.pull_subscribe(subject, consumer_name, config: config)
end

#unpause_stream(name) ⇒ Object



81
82
83
84
85
86
# File 'lib/cosmo/client.rb', line 81

def unpause_stream(name)
  config = stream_info(name).config.to_h
  config[:metadata] ||= {}
  config[:metadata].delete(:"_cosmo.paused")
  update_stream(name, config)
end

#update_stream(name, config) ⇒ Object



41
42
43
# File 'lib/cosmo/client.rb', line 41

def update_stream(name, config)
  js.update_stream(name: name, **config)
end