Class: Cosmo::API::Busy
- Inherits: Object
- Object
- Cosmo::API::Busy
- Defined in:
- lib/cosmo/api/busy.rb
Constant Summary collapse
- TTL = 70
- HEARTBEAT = 30
- BUCKET = "cosmostats"
Class Method Summary collapse
Instance Method Summary collapse
- #add(message) ⇒ Object
- #delete(message) ⇒ Object
- #initialize ⇒ Busy (constructor): A new instance of Busy.
- #list(limit: 25) ⇒ Object
- #size ⇒ Object
- #with(message) ⇒ Object
Constructor Details
Class Method Details
.instance ⇒ Object
12 13 14 |
# File 'lib/cosmo/api/busy.rb', line 12 def self.instance @instance ||= new end |
Instance Method Details
#add(message) ⇒ Object
28 29 30 31 32 33 34 |
# File 'lib/cosmo/api/busy.rb', line 28 def add(message) @thread ||= Thread.new { heartbeat_loop } seq = message.metadata.sequence.stream value = Utils::Json.dump({ data: message.data, stream: message.metadata.stream, worker: worker_id, started_at: Time.now.to_i }) @messages[seq] = value @kv.set(seq, value) end |
#delete(message) ⇒ Object
36 37 38 39 40 |
# File 'lib/cosmo/api/busy.rb', line 36 def delete(message) seq = message.metadata.sequence.stream @messages.delete(seq) @kv.purge(seq) end |
#list(limit: 25) ⇒ Object
42 43 44 |
# File 'lib/cosmo/api/busy.rb', line 42 def list(limit: 25) @kv.keys(limit:).filter_map { Utils::Json.parse(@kv.get(_1)) }.map { _1.merge(data: Utils::Json.parse(_1[:data])) } end |
#size ⇒ Object
46 47 48 |
# File 'lib/cosmo/api/busy.rb', line 46 def size @kv.size end |
#with(message) ⇒ Object
21 22 23 24 25 26 |
# File 'lib/cosmo/api/busy.rb', line 21 def with(message) add(message) yield ensure delete(message) end |