Class: Upkeep::Delivery::Transport

Inherits:
Object
  • Object
show all
Defined in:
lib/upkeep/delivery/transport.rb

Defined Under Namespace

Classes: Cleanup, Connection, DispatchReport, Outcome, RetryItem

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(max_queue_size: 100, retry_limit: 3) ⇒ Transport

Returns a new instance of Transport.



121
122
123
124
125
# File 'lib/upkeep/delivery/transport.rb', line 121

# Builds a transport with an empty connection registry.
#
# @param max_queue_size [Object] stored as-is in @max_queue_size (default 100);
#   presumably an Integer queue bound - not validated here
# @param retry_limit [Object] stored as-is in @retry_limit (default 3);
#   presumably an Integer retry count - not validated here
def initialize(max_queue_size: 100, retry_limit: 3)
  # Registry starts empty; entries are keyed by whatever id callers register with.
  @connections = {}
  @retry_limit = retry_limit
  @max_queue_size = max_queue_size
end

Class Method Details

.envelope_digest(envelope) ⇒ Object



116
117
118
# File 'lib/upkeep/delivery/transport.rb', line 116

# Computes the SHA-256 hex digest of an envelope's body.
#
# @param envelope [#body] object whose +body+ is handed to Digest::SHA256
# @return [String] hexadecimal digest of +envelope.body+
def envelope_digest(envelope)
  payload = envelope.body
  Digest::SHA256.hexdigest(payload)
end

Instance Method Details

#connect(subscriber_id:, adapter:) ⇒ Object



127
128
129
130
131
132
133
134
# File 'lib/upkeep/delivery/transport.rb', line 127

# Creates a Connection for the subscriber and registers it under its id.
#
# Any entry already stored under +subscriber_id+ is overwritten.
#
# @param subscriber_id [Object] key under which the connection is stored
# @param adapter [Object] passed through to Connection.new unchanged
# @return [Connection] the newly created connection
def connect(subscriber_id:, adapter:)
  new_connection = Connection.new(
    subscriber_id: subscriber_id,
    adapter: adapter,
    max_queue_size: max_queue_size,
    retry_limit: retry_limit
  )
  connections[subscriber_id] = new_connection
  new_connection
end

#connected?(subscriber_id) ⇒ Boolean

Returns:

  • (Boolean)


157
158
159
# File 'lib/upkeep/delivery/transport.rb', line 157

# Reports whether a connection is currently registered for the subscriber.
#
# @param subscriber_id [Object] id to look up in the connection registry
# @return [Boolean] true when the registry has an entry for +subscriber_id+
def connected?(subscriber_id)
  connections.include?(subscriber_id)
end

#deliver(batch) ⇒ Object



143
144
145
# File 'lib/upkeep/delivery/transport.rb', line 143

# Delivers every envelope in the batch and collects the results.
#
# Each envelope is handed to +deliver_envelope+ (defined elsewhere in the
# class); the per-envelope results are wrapped, in order, in a DispatchReport.
#
# @param batch [#envelopes] object exposing the envelopes to deliver
# @return [DispatchReport] report built from the per-envelope results
def deliver(batch)
  outcomes = batch.envelopes.map do |envelope|
    deliver_envelope(envelope)
  end

  DispatchReport.new(outcomes)
end

#disconnect(subscriber_id) ⇒ Object



136
137
138
139
140
141
# File 'lib/upkeep/delivery/transport.rb', line 136

# Removes the subscriber's connection from the registry and disconnects it.
#
# The Cleanup's third field is whatever the connection's +disconnect+
# returned, or 0 when no connection was registered or +disconnect+ returned
# nil/false. (Presumably a count of dropped envelopes - confirm against
# Connection#disconnect.)
#
# @param subscriber_id [Object] id whose connection should be removed
# @return [Cleanup] built as Cleanup.new(subscriber_id, :disconnected, dropped)
def disconnect(subscriber_id)
  removed = connections.delete(subscriber_id)

  dropped_count =
    if removed.nil?
      0
    else
      removed.disconnect || 0
    end

  Cleanup.new(subscriber_id, :disconnected, dropped_count)
end

#retry_pending(subscriber_id: nil) ⇒ Object



147
148
149
150
151
152
153
154
155
# File 'lib/upkeep/delivery/transport.rb', line 147

# Asks connections to retry their pending work and gathers the results.
#
# With a +subscriber_id+, only that subscriber's connection is retried; an
# unknown id yields an empty report. Without one, every registered
# connection is retried.
#
# @param subscriber_id [Object, nil] id of a single connection to retry, or
#   nil to retry all registered connections
# @return [DispatchReport] report built from the flattened results of each
#   selected connection's +retry_pending+
def retry_pending(subscriber_id: nil)
  targets =
    if subscriber_id
      # Wrap explicitly rather than using Kernel#Array: Array(obj) converts via
      # to_ary/to_a, which would explode a connection that responds to to_a
      # (e.g. a Struct-based class) into its fields instead of wrapping it.
      [connections[subscriber_id]].compact
    else
      connections.values
    end

  DispatchReport.new(targets.flat_map(&:retry_pending))
end

#summary ⇒ Object



161
162
163
164
165
166
167
168
# File 'lib/upkeep/delivery/transport.rb', line 161

# Snapshot of the transport's current state and configuration.
#
# @return [Hash{Symbol => Object}] with keys:
#   - :connections      number of registered connections
#   - :queued_envelopes sum of +queue_depth+ over all registered connections
#   - :max_queue_size   the configured max_queue_size
#   - :retry_limit      the configured retry_limit
def summary
  registry = connections
  total_queued = registry.each_value.sum { |connection| connection.queue_depth }

  {
    connections: registry.size,
    queued_envelopes: total_queued,
    max_queue_size: max_queue_size,
    retry_limit: retry_limit
  }
end