Class: DispatchPolicy::PartitionInflightCount

Inherits:
ApplicationRecord show all
Defined in:
app/models/dispatch_policy/partition_inflight_count.rb

Class Method Summary collapse

Class Method Details

.decrement(policy_name:, gate_name:, partition_key:, by: 1) ⇒ Object



35
36
37
38
39
40
# File 'app/models/dispatch_policy/partition_inflight_count.rb', line 35

# Lowers the in-flight counter of the row(s) matching the given policy,
# gate and partition, never letting the stored value drop below zero.
#
# Only rows that already match are updated; nothing is inserted here.
#
# @param policy_name [Object] value matched against the policy_name column
# @param gate_name [#to_s] gate identifier; compared in its string form
# @param partition_key [#to_s] partition identifier; compared in its string form
# @param by [Integer] amount to subtract (defaults to 1)
# @return [Object] whatever update_all returns (presumably the number of
#   affected rows — confirm against the ActiveRecord version in use)
def self.decrement(policy_name:, gate_name:, partition_key:, by: 1)
  scope = where(
    policy_name: policy_name,
    gate_name: gate_name.to_s,
    partition_key: partition_key.to_s
  )

  # GREATEST(..., 0) clamps the counter in SQL, so subtracting more than
  # the current value leaves it at 0 instead of going negative.
  assignments = "in_flight = GREATEST(in_flight - ?, 0), updated_at = ?"
  scope.update_all([assignments, by, Time.current])
end

.fetch_many(policy_name:, gate_name:, partition_keys:) ⇒ Object



7
8
9
10
11
12
13
# File 'app/models/dispatch_policy/partition_inflight_count.rb', line 7

# Looks up the in-flight counters for several partitions of one gate in a
# single query.
#
# Partition keys are compared in their string form, the same way
# increment and decrement address rows, so a caller may pass Symbols or
# Integers and still get the stored count back under the key it passed.
#
# @param policy_name [Object] value matched against the policy_name column
# @param gate_name [#to_s] gate identifier; compared in its string form
# @param partition_keys [Enumerable<#to_s>] partitions to look up
# @return [Hash] in_flight per partition; every requested key is present
#   and maps to 0 when no row (or a nil count) was found. Keys as stored in
#   the database are also present for the rows that matched.
def self.fetch_many(policy_name:, gate_name:, partition_keys:)
  return {} if partition_keys.empty?

  counts = where(
    policy_name: policy_name,
    gate_name: gate_name.to_s,
    partition_key: partition_keys.map(&:to_s)
  ).pluck(:partition_key, :in_flight).to_h

  # Resolve each requested key through its string form: plucked keys are
  # the stored values, which would never equal a Symbol/Integer key, and
  # the caller's key would otherwise be reported as 0.
  partition_keys.each { |key| counts[key] = counts[key.to_s] || 0 }
  counts
end

.increment(policy_name:, gate_name:, partition_key:, by: 1) ⇒ Object



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'app/models/dispatch_policy/partition_inflight_count.rb', line 19

# Adds `by` to the in-flight counter for one partition of a gate, creating
# the row when it does not exist yet.
#
# This is a single INSERT ... ON CONFLICT DO UPDATE statement: a fresh row
# starts with in_flight = by, while a conflicting row on
# (policy_name, gate_name, partition_key) has `by` added to its current
# value and its updated_at refreshed.
# NOTE(review): ON CONFLICT presumably relies on a unique index over those
# three columns — confirm against the schema.
#
# @param policy_name [Object] value stored in the policy_name column
# @param gate_name [#to_s] gate identifier; stored in its string form
# @param partition_key [#to_s] partition identifier; stored in its string form
# @param by [Integer] amount to add (defaults to 1)
# @return [Object] whatever connection.exec_update returns
def self.increment(policy_name:, gate_name:, partition_key:, by: 1)
  timestamp = Time.current
  table = quoted_table_name

  upsert = <<~SQL.squish
    INSERT INTO #{table}
      (policy_name, gate_name, partition_key, in_flight, created_at, updated_at)
    VALUES (?, ?, ?, ?, ?, ?)
    ON CONFLICT (policy_name, gate_name, partition_key)
    DO UPDATE SET
      in_flight  = #{table}.in_flight + EXCLUDED.in_flight,
      updated_at = EXCLUDED.updated_at
  SQL

  # The same timestamp fills created_at and updated_at on insert.
  binds = [policy_name, gate_name.to_s, partition_key.to_s, by, timestamp, timestamp]
  connection.exec_update(sanitize_sql_array([upsert, *binds]))
end

.total_for(policy_name:, gate_name:) ⇒ Object



15
16
17
# File 'app/models/dispatch_policy/partition_inflight_count.rb', line 15

# Sums the in_flight counters of every row belonging to one gate of a
# policy, across all of its partitions.
#
# @param policy_name [Object] value matched against the policy_name column
# @param gate_name [#to_s] gate identifier; compared in its string form
# @return [Object] the result of summing the in_flight column over the
#   matching rows
def self.total_for(policy_name:, gate_name:)
  gate_rows = where(policy_name: policy_name, gate_name: gate_name.to_s)
  gate_rows.sum(:in_flight)
end