Class: DispatchPolicy::ThrottleBucket

Inherits:
ApplicationRecord show all
Defined in:
app/models/dispatch_policy/throttle_bucket.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.lock(policy_name:, gate_name:, partition_key:, burst:) ⇒ Object



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'app/models/dispatch_policy/throttle_bucket.rb', line 7

def self.lock(policy_name:, gate_name:, partition_key:, burst:)
  now = Time.current
  seed_sql = <<~SQL.squish
    INSERT INTO #{quoted_table_name}
      (policy_name, gate_name, partition_key, tokens, refilled_at, created_at, updated_at)
    VALUES (?, ?, ?, ?, ?, ?, ?)
    ON CONFLICT (policy_name, gate_name, partition_key) DO NOTHING
  SQL
  connection.exec_update(
    sanitize_sql_array([
      seed_sql, policy_name, gate_name.to_s, partition_key.to_s,
      burst.to_f, now, now, now
    ])
  )

  where(policy_name: policy_name, gate_name: gate_name.to_s, partition_key: partition_key.to_s)
    .lock("FOR UPDATE")
    .first!
end

Instance Method Details

#consume(n = 1) ⇒ Object



35
36
37
38
39
# File 'app/models/dispatch_policy/throttle_bucket.rb', line 35

def consume(n = 1)
  return false if tokens < n
  self.tokens -= n
  true
end

#refill!(rate:, per:, burst:) ⇒ Object



27
28
29
30
31
32
33
# File 'app/models/dispatch_policy/throttle_bucket.rb', line 27

def refill!(rate:, per:, burst:)
  now = Time.current
  elapsed = (now - refilled_at).to_f
  new_tokens = tokens + (rate * elapsed / per)
  self.tokens = [ new_tokens, burst.to_f ].min
  self.refilled_at = now
end