Class: Cosmo::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/cosmo/client.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(nats_url: ENV.fetch("NATS_URL", "nats://localhost:4222")) ⇒ Client

Returns a new instance of Client.



14
15
16
17
18
19
# File 'lib/cosmo/client.rb', line 14

# Connects to a NATS server and obtains a JetStream context from that connection.
#
# @param nats_url [String] server URL; defaults to the NATS_URL environment
#   variable, falling back to "nats://localhost:4222" when it is not set
# NOTE(review): the URL is written to the debug log verbatim — confirm it never
#   carries embedded credentials.
def initialize(nats_url: ENV.fetch("NATS_URL", "nats://localhost:4222"))
  Logger.debug("Connecting to NATS server at #{nats_url}...")
  connection = NATS.connect(nats_url)
  @nc = connection
  Logger.debug("Connection established")
  @js = connection.jetstream
end

Instance Attribute Details

#js ⇒ Object (readonly)

Returns the value of attribute js.



12
13
14
# File 'lib/cosmo/client.rb', line 12

# Reader for @js, which #initialize sets to the result of calling `jetstream`
# on the NATS connection.
#
# @return [Object] the stored JetStream context
def js
  @js
end

#nc ⇒ Object (readonly)

Returns the value of attribute nc.



12
13
14
# File 'lib/cosmo/client.rb', line 12

# Reader for @nc, which #initialize sets to the value returned by NATS.connect.
#
# @return [Object] the stored NATS connection
def nc
  @nc
end

Class Method Details

.instance ⇒ Object



8
9
10
# File 'lib/cosmo/client.rb', line 8

# Returns the memoized Client, constructing it with default arguments on the
# first call and reusing that object afterwards.
#
# @return [Client]
def self.instance
  return @instance if @instance

  @instance = Client.new
end

Instance Method Details

#close ⇒ Object



83
84
85
# File 'lib/cosmo/client.rb', line 83

# Closes the NATS connection exposed by #nc.
#
# @return [Object] whatever the connection's `close` returns
def close
  connection = nc
  connection.close
end

#consumer_info(stream_name, consumer_name) ⇒ Object



55
56
57
# File 'lib/cosmo/client.rb', line 55

# Asks the JetStream context for information about one consumer of a stream.
#
# @param stream_name [String] stream the consumer belongs to
# @param consumer_name [String] consumer to look up
# @return [Object] whatever `js.consumer_info` returns
def consumer_info(stream_name, consumer_name)
  jetstream = js
  jetstream.consumer_info(stream_name, consumer_name)
end

#create_stream(name, config) ⇒ Object



33
34
35
# File 'lib/cosmo/client.rb', line 33

# Adds a stream through the JetStream context.
#
# @param name [String] stream name, passed as the `name:` keyword
# @param config [Hash] further settings, splatted as keywords after `name:`
# @return [Object] whatever `js.add_stream` returns
def create_stream(name, config)
  jetstream = js
  jetstream.add_stream(name: name, **config)
end

#delete_message(name, seq) ⇒ Object



63
64
65
66
# File 'lib/cosmo/client.rb', line 63

# Deletes a single message from a stream by sending a request to the
# "$JS.API.STREAM.MSG.DELETE.<name>" subject.
#
# The request body is encoded with Utils::Json, the same helper #purge uses for
# its payload and that this method already uses to decode the reply.
#
# @param name [String] stream name, interpolated into the request subject
# @param seq [Integer] value sent as the "seq" field of the request body
# @return [Object] the reply data parsed with string keys; it is returned as-is
#   and not inspected for an "error" entry
def delete_message(name, seq)
  payload = Utils::Json.dump({ seq: seq })
  response = nc.request("$JS.API.STREAM.MSG.DELETE.#{name}", payload)
  Utils::Json.parse(response.data, symbolize_names: false)
end

#delete_stream(name, params = {}) ⇒ Object



37
38
39
# File 'lib/cosmo/client.rb', line 37

# Deletes a stream through the JetStream context.
#
# @param name [String] stream to delete
# @param params [Hash] passed through unchanged as the second argument
# @return [Object] whatever `js.delete_stream` returns
def delete_stream(name, params = {})
  jetstream = js
  jetstream.delete_stream(name, params)
end

#get_message(name, **options) ⇒ Object



59
60
61
# File 'lib/cosmo/client.rb', line 59

# Fetches a message from a stream by delegating to `js.get_msg`.
#
# @param name [String] stream name
# @param options [Hash] keyword options forwarded unchanged
# @return [Object] whatever `js.get_msg` returns
def get_message(name, **options)
  jetstream = js
  jetstream.get_msg(name, **options)
end

#kv(name, **options) ⇒ Object



77
78
79
80
81
# File 'lib/cosmo/client.rb', line 77

# Returns the key-value bucket called `name`, creating it when the lookup
# raises NATS::KeyValue::BucketNotFoundError.
#
# @param name [String] bucket name
# @param options [Hash] extra settings, used only when the bucket is created;
#   they are merged over `{ bucket: name }`
# @return [Object] the bucket handle returned by `js.key_value`, or by
#   `js.create_key_value` on the creation path
def kv(name, **options)
  js.key_value(name)
rescue NATS::KeyValue::BucketNotFoundError
  bucket_config = { bucket: name, **options }
  js.create_key_value(bucket_config)
end

#list_consumers(stream_name) ⇒ Object



49
50
51
52
53
# File 'lib/cosmo/client.rb', line 49

# Lists the consumers of a stream by sending a request to the
# "$JS.API.CONSUMER.LIST.<stream_name>" subject.
#
# Mirrors #list_streams: when the reply cannot be parsed (nil) or carries no
# "consumers" entry, an empty array is returned instead of raising
# NoMethodError or returning nil.
#
# @param stream_name [String] stream name, interpolated into the request subject
# @return [Array] the "consumers" entry of the parsed reply, or [] when absent
def list_consumers(stream_name)
  response = nc.request("$JS.API.CONSUMER.LIST.#{stream_name}", "")
  data = Utils::Json.parse(response.data, symbolize_names: false)
  return [] if data.nil?

  data["consumers"] || []
end

#list_streams ⇒ Object



41
42
43
44
45
46
47
# File 'lib/cosmo/client.rb', line 41

# Lists stream names by sending a request to the "$JS.API.STREAM.LIST" subject.
#
# @return [Array] the "config" -> "name" value of every entry under "streams",
#   skipping entries where that lookup is nil; [] when the parsed reply is nil
#   or has no "streams" entry
def list_streams
  reply = nc.request("$JS.API.STREAM.LIST", "")
  parsed = Utils::Json.parse(reply.data, symbolize_names: false)
  return [] if parsed.nil?

  streams = parsed["streams"]
  return [] if streams.nil?

  streams.filter_map { |stream| stream.dig("config", "name") }
end

#publish(subject, payload, **params) ⇒ Object



21
22
23
# File 'lib/cosmo/client.rb', line 21

# Publishes a payload on a subject through the JetStream context.
#
# @param subject [String] subject to publish on
# @param payload [Object] message body, passed through unchanged
# @param params [Hash] keyword options forwarded unchanged
# @return [Object] whatever `js.publish` returns
def publish(subject, payload, **params)
  jetstream = js
  jetstream.publish(subject, payload, **params)
end

#purge(stream_name, subject) ⇒ Object

Raises:

  • (NATS::JetStream::Error)


68
69
70
71
72
73
74
75
# File 'lib/cosmo/client.rb', line 68

# Purges messages from a stream by sending a request to the
# "$JS.API.STREAM.PURGE.<stream_name>" subject.
#
# @param stream_name [String] stream name, interpolated into the request subject
# @param subject [String, nil] when truthy, sent as the "filter" field of the
#   request body; when nil (the default) the request body is empty
# @return [Object] the "purged" entry of the parsed reply
# @raise [NATS::JetStream::Error] when the reply contains an "error" entry; the
#   message is that entry's "description"
def purge(stream_name, subject = nil)
  payload = subject ? Utils::Json.dump({ filter: subject }) : ""
  # Use the reader rather than @nc, like every other method in this class.
  response = nc.request("$JS.API.STREAM.PURGE.#{stream_name}", payload)
  # default: {} keeps the lookups below safe when the reply cannot be parsed.
  result = Utils::Json.parse(response.data, default: {}, symbolize_names: false)
  raise NATS::JetStream::Error, result.dig("error", "description") if result["error"]

  result["purged"] # number of messages purged
end

#stream_info(name) ⇒ Object



29
30
31
# File 'lib/cosmo/client.rb', line 29

# Asks the JetStream context for information about a stream.
#
# @param name [String] stream name
# @return [Object] whatever `js.stream_info` returns
def stream_info(name)
  jetstream = js
  jetstream.stream_info(name)
end

#subscribe(subject, consumer_name, config) ⇒ Object



25
26
27
# File 'lib/cosmo/client.rb', line 25

# Creates a pull subscription by delegating to `js.pull_subscribe`.
#
# @param subject [String] subject to subscribe to
# @param consumer_name [String] passed as the second positional argument
# @param config [Object] passed as the `config:` keyword
# @return [Object] whatever `js.pull_subscribe` returns
def subscribe(subject, consumer_name, config)
  jetstream = js
  jetstream.pull_subscribe(subject, consumer_name, config: config)
end