Class: Upkeep::Delivery::BroadcastTransport

Inherits:
Object
  • Object
show all
Defined in:
lib/upkeep/delivery/broadcast_transport.rb

Instance Method Summary collapse

Constructor Details

#initialize(adapter: ActionCableAdapter.new, max_queue_size: 100, retry_limit: 3) ⇒ BroadcastTransport

Returns a new instance of BroadcastTransport.



6
7
8
9
10
11
12
# File 'lib/upkeep/delivery/broadcast_transport.rb', line 6

# Builds a transport with an empty pending queue and no per-subscriber
# adapter overrides.
#
# @param adapter [Object] adapter stored on the instance; a fresh
#   ActionCableAdapter is built when the argument is omitted
# @param max_queue_size [Integer] stored as given (presumably the cap on
#   queued envelopes; enforcement is not visible in this method)
# @param retry_limit [Integer] stored as given (presumably the maximum
#   number of delivery attempts; enforcement is not visible in this method)
def initialize(adapter: ActionCableAdapter.new, max_queue_size: 100, retry_limit: 3)
  # Mutable bookkeeping always starts out empty.
  @queue = []
  @adapter_overrides = {}

  # Configuration is kept exactly as supplied; nothing is validated here.
  @retry_limit = retry_limit
  @max_queue_size = max_queue_size
  @adapter = adapter
end

Instance Method Details

#connect(subscriber_id:, adapter:) ⇒ Object



14
15
16
# File 'lib/upkeep/delivery/broadcast_transport.rb', line 14

# Records +adapter+ as the override for +subscriber_id+, replacing any
# override previously recorded under the same id.
#
# @param subscriber_id [Object] key the override is stored under
# @param adapter [Object] adapter to associate with that subscriber
# @return [Object] the adapter that was just stored
def connect(subscriber_id:, adapter:)
  adapter_overrides.store(subscriber_id, adapter)
end

#connected?(subscriber_id) ⇒ Boolean

Returns:

  • (Boolean)


39
40
41
# File 'lib/upkeep/delivery/broadcast_transport.rb', line 39

# Reports whether an adapter override is currently recorded for the
# subscriber. Only the presence of the key matters, not the stored value.
#
# @param subscriber_id [Object] id to look up among the adapter overrides
# @return [Boolean] true when an override entry exists for the id
def connected?(subscriber_id)
  adapter_overrides.include?(subscriber_id)
end

#deliver(batch) ⇒ Object



26
27
28
# File 'lib/upkeep/delivery/broadcast_transport.rb', line 26

# Hands every envelope in +batch+ to +deliver_envelope+ as a first attempt
# (attempt counter 0) and wraps the per-envelope results in a dispatch
# report, in the same order as the batch.
#
# @param batch [#envelopes] object exposing the envelopes to send
# @return [Transport::DispatchReport] report built from the collected results
def deliver(batch)
  outcomes = batch.envelopes.map do |envelope|
    deliver_envelope(envelope, attempts: 0)
  end

  Transport::DispatchReport.new(outcomes)
end

#disconnect(subscriber_id) ⇒ Object



18
19
20
21
22
23
24
# File 'lib/upkeep/delivery/broadcast_transport.rb', line 18

# Forgets the subscriber: removes its adapter override (if any) and discards
# every queued item whose envelope belongs to it. Items for other
# subscribers stay queued in their original order.
#
# @param subscriber_id [Object] id of the subscriber being disconnected
# @return [Transport::Cleanup] built from the subscriber id, the
#   +:disconnected+ marker, and the number of queued items discarded
def disconnect(subscriber_id)
  adapter_overrides.delete(subscriber_id)

  discarded, kept = queue.partition do |entry|
    entry.envelope.subscriber_id == subscriber_id
  end
  @queue = kept

  Transport::Cleanup.new(subscriber_id, :disconnected, discarded.size)
end

#retry_pending(subscriber_id: nil) ⇒ Object



30
31
32
33
34
35
36
37
# File 'lib/upkeep/delivery/broadcast_transport.rb', line 30

# Re-attempts delivery of queued items, either for one subscriber or, when
# no id is given, for everything in the queue.
#
# The chosen items are taken off the queue before being handed to
# +deliver_envelope+ together with their recorded attempt count. Whether an
# item returns to the queue afterwards is decided by +deliver_envelope+,
# which is not shown here.
#
# @param subscriber_id [Object, nil] limit the retry to this subscriber;
#   +nil+ retries every queued item
# @return [Transport::DispatchReport] report built from the retry results
def retry_pending(subscriber_id: nil)
  retry_everything = subscriber_id.nil?

  due, remaining = queue.partition do |entry|
    retry_everything || entry.envelope.subscriber_id == subscriber_id
  end
  # Replace the queue first so the re-delivery below sees only the leftovers.
  @queue = remaining

  outcomes = due.map do |entry|
    deliver_envelope(entry.envelope, attempts: entry.attempts)
  end

  Transport::DispatchReport.new(outcomes)
end

#summary ⇒ Object



43
44
45
46
47
48
49
50
# File 'lib/upkeep/delivery/broadcast_transport.rb', line 43

# Snapshot of the transport's current bookkeeping and configuration.
#
# @return [Hash{Symbol => Integer}] number of adapter overrides, number of
#   queued items, and the configured +max_queue_size+ and +retry_limit+
def summary
  override_count = adapter_overrides.size
  pending_count = queue.size

  {
    adapter_overrides: override_count,
    queued_envelopes: pending_count,
    max_queue_size: max_queue_size,
    retry_limit: retry_limit
  }
end