Class: Cosmo::API::Busy

Inherits:
Object
  • Object
show all
Defined in:
lib/cosmo/api/busy.rb

Constant Summary collapse

TTL =
70
HEARTBEAT =
30
BUCKET =
"cosmostats"

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeBusy

Returns a new instance of Busy.



16
17
18
19
# File 'lib/cosmo/api/busy.rb', line 16

def initialize
  @messages = {}
  @kv = KV.new(BUCKET, { ttl: TTL })
end

Class Method Details

.instanceObject



12
13
14
# File 'lib/cosmo/api/busy.rb', line 12

def self.instance
  @instance ||= new
end

Instance Method Details

#add(message) ⇒ Object



28
29
30
31
32
33
34
# File 'lib/cosmo/api/busy.rb', line 28

def add(message)
  @thread ||= Thread.new { heartbeat_loop }
  seq = message..sequence.stream
  value = Utils::Json.dump({ data: message.data, stream: message..stream, worker: worker_id, started_at: Time.now.to_i })
  @messages[seq] = value
  @kv.set(seq, value)
end

#delete(message) ⇒ Object



36
37
38
39
40
# File 'lib/cosmo/api/busy.rb', line 36

def delete(message)
  seq = message..sequence.stream
  @messages.delete(seq)
  @kv.purge(seq)
end

#list(limit: 25) ⇒ Object



42
43
44
# File 'lib/cosmo/api/busy.rb', line 42

def list(limit: 25)
  @kv.keys(limit:).filter_map { Utils::Json.parse(@kv.get(_1)) }.map { _1.merge(data: Utils::Json.parse(_1[:data])) }
end

#sizeObject



46
47
48
# File 'lib/cosmo/api/busy.rb', line 46

def size
  @kv.size
end

#with(message) ⇒ Object



21
22
23
24
25
26
# File 'lib/cosmo/api/busy.rb', line 21

def with(message)
  add(message)
  yield
ensure
  delete(message)
end