Class: JRPC::SharedClient::OutboundQueue
- Inherits:
-
Object
- Object
- JRPC::SharedClient::OutboundQueue
- Defined in:
- lib/jrpc/shared_client/outbound_queue.rb
Instance Method Summary collapse
-
#close_and_drain ⇒ Object
Sets closed = true, returns and clears all remaining tickets.
-
#delete(ticket) ⇒ Object
Removes by object identity.
-
#each_snapshot ⇒ Object
Yields each ticket from a snapshot; does not hold the mutex during the block.
-
#earliest_deadline ⇒ Object
Returns the earliest expires_at across all queued tickets, or nil.
- #empty? ⇒ Boolean
-
#initialize(capacity: nil) ⇒ OutboundQueue
constructor
A new instance of OutboundQueue.
-
#pop_nonblock ⇒ Object
Returns next Ticket or nil if empty.
-
#push_nonblock(ticket) ⇒ Object
Raises ClientError(“queue full”) or ClientError(“queue closed”) on failure.
- #size ⇒ Object
Constructor Details
#initialize(capacity: nil) ⇒ OutboundQueue
Returns a new instance of OutboundQueue.
6 7 8 9 10 11 |
# File 'lib/jrpc/shared_client/outbound_queue.rb', line 6 def initialize(capacity: nil) @mutex = Mutex.new @arr = [] @capacity = capacity @closed = false end |
Instance Method Details
#close_and_drain ⇒ Object
Sets closed = true, returns and clears all remaining tickets. Idempotent: returns [] on second call.
63 64 65 66 67 68 |
# File 'lib/jrpc/shared_client/outbound_queue.rb', line 63 def close_and_drain @mutex.synchronize do @closed = true @arr.slice!(0..) end end |
#delete(ticket) ⇒ Object
Removes by object identity. Returns true if removed, false if not found.
35 36 37 38 39 40 41 42 43 |
# File 'lib/jrpc/shared_client/outbound_queue.rb', line 35 def delete(ticket) @mutex.synchronize do idx = @arr.index { |t| t.equal?(ticket) } return false unless idx @arr.delete_at(idx) true end end |
#each_snapshot ⇒ Object
Yields each ticket from a snapshot; does not hold the mutex during the block.
29 30 31 32 |
# File 'lib/jrpc/shared_client/outbound_queue.rb', line 29 def each_snapshot(&) snapshot = @mutex.synchronize { @arr.dup } snapshot.each(&) end |
#earliest_deadline ⇒ Object
Returns the earliest expires_at across all queued tickets, or nil.
46 47 48 49 50 51 |
# File 'lib/jrpc/shared_client/outbound_queue.rb', line 46 def earliest_deadline @mutex.synchronize do deadlines = @arr.filter_map(&:expires_at) deadlines.min end end |
#empty? ⇒ Boolean
53 54 55 |
# File 'lib/jrpc/shared_client/outbound_queue.rb', line 53 def empty? @mutex.synchronize { @arr.empty? } end |
#pop_nonblock ⇒ Object
Returns next Ticket or nil if empty.
24 25 26 |
# File 'lib/jrpc/shared_client/outbound_queue.rb', line 24 def pop_nonblock @mutex.synchronize { @arr.shift } end |
#push_nonblock(ticket) ⇒ Object
Raises ClientError(“queue full”) or ClientError(“queue closed”) on failure.
14 15 16 17 18 19 20 21 |
# File 'lib/jrpc/shared_client/outbound_queue.rb', line 14 def push_nonblock(ticket) @mutex.synchronize do raise Errors::ClientError, 'queue closed' if @closed raise Errors::ClientError, 'queue full' if @capacity && @arr.size >= @capacity @arr << ticket end end |
#size ⇒ Object
57 58 59 |
# File 'lib/jrpc/shared_client/outbound_queue.rb', line 57 def size @mutex.synchronize { @arr.size } end |