Class: JRPC::SharedClient::OutboundQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/jrpc/shared_client/outbound_queue.rb

Instance Method Summary collapse

Constructor Details

#initialize(capacity: nil) ⇒ OutboundQueue

Returns a new instance of OutboundQueue.



6
7
8
9
10
11
# File 'lib/jrpc/shared_client/outbound_queue.rb', line 6

def initialize(capacity: nil)
  @mutex = Mutex.new
  @arr = []
  @capacity = capacity
  @closed = false
end

Instance Method Details

#close_and_drainObject

Sets closed = true, returns and clears all remaining tickets. Idempotent: returns [] on second call.



63
64
65
66
67
68
# File 'lib/jrpc/shared_client/outbound_queue.rb', line 63

def close_and_drain
  @mutex.synchronize do
    @closed = true
    @arr.slice!(0..)
  end
end

#delete(ticket) ⇒ Object

Removes by object identity. Returns true if removed, false if not found.



35
36
37
38
39
40
41
42
43
# File 'lib/jrpc/shared_client/outbound_queue.rb', line 35

def delete(ticket)
  @mutex.synchronize do
    idx = @arr.index { |t| t.equal?(ticket) }
    return false unless idx

    @arr.delete_at(idx)
    true
  end
end

#each_snapshotObject

Yields each ticket from a snapshot; does not hold the mutex during the block.



29
30
31
32
# File 'lib/jrpc/shared_client/outbound_queue.rb', line 29

def each_snapshot(&)
  snapshot = @mutex.synchronize { @arr.dup }
  snapshot.each(&)
end

#earliest_deadlineObject

Returns the earliest expires_at across all queued tickets, or nil.



46
47
48
49
50
51
# File 'lib/jrpc/shared_client/outbound_queue.rb', line 46

def earliest_deadline
  @mutex.synchronize do
    deadlines = @arr.filter_map(&:expires_at)
    deadlines.min
  end
end

#empty?Boolean

Returns:

  • (Boolean)


53
54
55
# File 'lib/jrpc/shared_client/outbound_queue.rb', line 53

def empty?
  @mutex.synchronize { @arr.empty? }
end

#pop_nonblockObject

Returns next Ticket or nil if empty.



24
25
26
# File 'lib/jrpc/shared_client/outbound_queue.rb', line 24

def pop_nonblock
  @mutex.synchronize { @arr.shift }
end

#push_nonblock(ticket) ⇒ Object

Raises ClientError(“queue full”) or ClientError(“queue closed”) on failure.



14
15
16
17
18
19
20
21
# File 'lib/jrpc/shared_client/outbound_queue.rb', line 14

def push_nonblock(ticket)
  @mutex.synchronize do
    raise Errors::ClientError, 'queue closed' if @closed
    raise Errors::ClientError, 'queue full' if @capacity && @arr.size >= @capacity

    @arr << ticket
  end
end

#sizeObject



57
58
59
# File 'lib/jrpc/shared_client/outbound_queue.rb', line 57

def size
  @mutex.synchronize { @arr.size }
end