Class: SpreeCmCommissioner::InventoryHolds::Acquire

Inherits:
Object
  • Object
show all
Extended by:
ServiceModuleThrowable
Includes:
Spree::ServiceModule::Base
Defined in:
app/services/spree_cm_commissioner/inventory_holds/acquire.rb

Constant Summary collapse

AcquireRedisDeductionError =
Class.new(StandardError)
AcquireHoldSaveError =
Class.new(StandardError)
AcquireHoldLockConflictError =
Class.new(StandardError)
AcquireInsufficientStockError =
Class.new(StandardError)
AcquireLimitExceededError =
Class.new(StandardError)

Instance Method Summary collapse

Methods included from ServiceModuleThrowable

call!

Instance Method Details

#acquire_hold_for_order!(order, line_items, expires_at) ⇒ Object

Acquires an active hold for the order, or returns the existing hold unchanged when it is already active or payment-locked.

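Creates (or re-activates) the hold with :active status and reserves the quantities in Redis by incrementing the per-item hold counters. On DB save failure: the transaction rolls back. On Redis failure or insufficient stock: the transaction rolls back the hold (Redis is left untouched, so no manual release is needed).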

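Risk: the Redis reservation could be orphaned if an exception is raised after the Redis script has committed but before the DB transaction commits. Accepted: the advisory lock ensures that only one process per order runs this path at a time, which limits the impact.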



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'app/services/spree_cm_commissioner/inventory_holds/acquire.rb', line 66

# Returns the inventory hold for +order+, creating or re-activating it when needed.
#
# A persisted hold that is already active or payment-locked is returned untouched.
# Otherwise the hold is written with :active status and stock is reserved through
# acquire_redis_stock!, both inside a single DB transaction so that any error
# raised here rolls the hold row back.
#
# @param order the order owning the hold; its user and last_ip_address are copied onto the hold
# @param line_items [Enumerable] line items whose quantities are summed and reserved
# @param expires_at expiry timestamp stored on the hold
# @return the existing or freshly saved hold
# @raise [AcquireHoldSaveError] when saving raises RecordInvalid or RecordNotUnique
# @raise [AcquireInsufficientStockError] when acquire_redis_stock! returns a falsy value
# @raise [AcquireRedisDeductionError] when the reservation raises a Redis error or IOError
def acquire_hold_for_order!(order, line_items, expires_at)
  hold = SpreeCmCommissioner::InventoryHold.find_or_initialize_by(order: order)

  # Nothing to do for a hold that is already in force.
  return hold if hold.persisted? && (hold.active? || hold.payment_locked?)

  SpreeCmCommissioner::InventoryHold.transaction do
    # Step 1: persist the hold row; DB-level failures become a domain error.
    begin
      hold.assign_attributes(
        user: order.user,
        status: :active,
        # Keep the original held_at when an existing hold is being re-activated.
        held_at: hold.held_at || Time.current,
        expires_at: expires_at,
        payment_locked_at: nil,
        release_reason: nil,
        total_quantity_on_hold: line_items.sum(&:quantity),
        ip_address: order.last_ip_address
      )
      hold.save!
    rescue ActiveRecord::RecordInvalid, ActiveRecord::RecordNotUnique
      raise AcquireHoldSaveError, I18n.t('inventory_hold.acquire.db_save_error')
    end

    # Step 2: reserve stock in Redis; raising here rolls the hold row back.
    begin
      unless acquire_redis_stock!(line_items)
        raise AcquireInsufficientStockError, I18n.t('inventory_hold.acquire.insufficient_stock')
      end
    rescue Redis::BaseError, Redis::CommandError, IOError
      raise AcquireRedisDeductionError, I18n.t('inventory_hold.acquire.redis_error')
    end

    hold
  end
end

#acquire_lua_script ⇒ Object

Lua script atomically reserves inventory across items:

  1. Validate: Ensure available stock (stock - held) >= quantity for ALL items

  2. Commit: If all pass, increment hold counters atomically

  3. Return: 1 (success) or 0 (failure/insufficient stock)

Atomicity prevents overselling and race conditions.



158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'app/services/spree_cm_commissioner/inventory_holds/acquire.rb', line 158

# Builds the Lua script that atomically reserves inventory across items.
#
# The script receives interleaved keys (stock key, hold key per item) and one
# quantity per item. It first validates every item, then increments the hold
# counters, and returns 1 on success or 0 when any item lacks available stock.
#
# Validation tracks the cumulative quantity requested per hold key. Without that,
# the same inventory item appearing more than once in KEYS could pass each
# individual check while the combined INCRBY calls over-reserve it. When every
# key is unique the result is the same as checking each item on its own.
#
# @return [String] Lua source to pass to Redis EVAL
def acquire_lua_script
  <<~LUA
    -- KEYS = [stock_key_1, hold_key_1, stock_key_2, hold_key_2, ...]
    -- ARGV = [qty_1, qty_2, ...]
    local pending = {}
    for i = 1, #ARGV do
      local hold_key = KEYS[(i-1)*2+2]
      local stock = tonumber(redis.call('GET', KEYS[(i-1)*2+1]) or 0)
      local held  = tonumber(redis.call('GET', hold_key) or 0)
      -- Quantity requested so far for this hold key, including this entry.
      local wanted = (pending[hold_key] or 0) + tonumber(ARGV[i])
      if stock - held < wanted then return 0 end
      pending[hold_key] = wanted
    end
    for i = 1, #ARGV do
      redis.call('INCRBY', KEYS[(i-1)*2+2], tonumber(ARGV[i]))
    end
    return 1
  LUA
end

#acquire_redis_stock!(line_items) ⇒ Object



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'app/services/spree_cm_commissioner/inventory_holds/acquire.rb', line 100

# Reserves stock in Redis for every active inventory item of the given line items.
#
# Each active inventory item contributes its stock key, its hold key and the
# quantity of the line item it belongs to. The keys are warmed via
# warm_redis_keys! and then passed to the Lua script in one EVAL call.
#
# @param line_items [Enumerable] line items responding to inventory_items and quantity
# @return [Boolean] true when there is nothing to reserve or the script result is
#   positive, false otherwise
def acquire_redis_stock!(line_items)
  # One [inventory_item, quantity] pair per active inventory item, in line-item order.
  reservations = line_items.flat_map do |line_item|
    line_item.inventory_items.active.map { |inventory_item| [inventory_item, line_item.quantity] }
  end
  return true if reservations.empty?

  # KEYS layout expected by the script: stock key followed by hold key, per item.
  interleaved_keys = reservations.flat_map do |inventory_item, _quantity|
    [inventory_item.redis_key, inventory_item.redis_hold_key]
  end
  quantities = reservations.map(&:last)

  warm_redis_keys!(reservations.map(&:first))

  script_result = SpreeCmCommissioner.inventory_redis_pool.with do |redis|
    redis.eval(acquire_lua_script, keys: interleaved_keys, argv: quantities)
  end

  script_result.positive?
end

#call(order:, expires_at:) ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'app/services/spree_cm_commissioner/inventory_holds/acquire.rb', line 15

# Entry point: acquires an inventory hold for the order's holdable line items.
#
# Succeeds with nil when no line item needs a hold. Otherwise validates hold
# limits, then — under the per-order advisory lock — acquires the hold, enqueues
# the release and sync jobs, and logs the outcome. Any StandardError raised along
# the way is logged and converted into a failure result carrying its message.
#
# @param order the order to hold inventory for
# @param expires_at expiry timestamp passed on to the hold
# @return the result of success(hold), success(nil), or failure(nil, message)
def call(order:, expires_at:)
  holdable_items = order.line_items.select(&:should_hold_inventory?)
  return success(nil) if holdable_items.empty?

  limits = SpreeCmCommissioner::InventoryHolds::ValidateLimits.call(order: order)
  raise AcquireLimitExceededError, limits.error.to_s unless limits.success?

  with_advisory_lock!(order) do
    acquired_hold = acquire_hold_for_order!(order, holdable_items, expires_at)

    enqueue_release_job(acquired_hold)
    enqueue_sync_inventory_on_hold(order)

    CmAppLogger.log(
      label: "#{self.class.name}#call success",
      data: { order_id: order.id, hold_id: acquired_hold.id }
    )

    success(acquired_hold)
  end
rescue StandardError => e
  # Report every failure as a failed result instead of letting it propagate.
  CmAppLogger.error(
    label: "#{self.class.name}#call failed",
    data: { error_type: e.class.name.demodulize, order_id: order.id, message: e.message }
  )
  failure(nil, e.message)
end

#enqueue_release_job(hold) ⇒ Object



131
132
133
# File 'app/services/spree_cm_commissioner/inventory_holds/acquire.rb', line 131

# Schedules the release job for the hold so that it runs at the hold's expiry time.
#
# @param hold the hold whose expires_at and id are passed to the job
# @return whatever perform_later returns
def enqueue_release_job(hold)
  scheduled_job = SpreeCmCommissioner::InventoryHolds::ReleaseJob.set(wait_until: hold.expires_at)
  scheduled_job.perform_later(hold_id: hold.id)
end

#enqueue_sync_inventory_on_hold(order) ⇒ Object



135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'app/services/spree_cm_commissioner/inventory_holds/acquire.rb', line 135

# Enqueues the job that adjusts quantities on hold for the order's inventory items.
#
# Builds one { inventory_id:, quantity: } entry per active inventory item of each
# line item that should hold inventory; the quantity is the line item's quantity.
#
# @param order the order whose line items are inspected
# @return whatever perform_later returns
def enqueue_sync_inventory_on_hold(order)
  holdable_line_items = order.line_items.select(&:should_hold_inventory?)

  adjustments = holdable_line_items.flat_map do |line_item|
    line_item.inventory_items.active.map do |inventory_item|
      { inventory_id: inventory_item.id, quantity: line_item.quantity }
    end
  end

  SpreeCmCommissioner::InventoryItems::BulkAdjustQuantitiesOnHoldJob.perform_later(
    order_id: order.id,
    inventory_id_and_quantities: adjustments
  )
end

#warm_redis_keys!(inventory_items) ⇒ Object



127
128
129
# File 'app/services/spree_cm_commissioner/inventory_holds/acquire.rb', line 127

# Runs CachedInventoryItemsBuilder over the given inventory items.
# NOTE(review): presumably this populates the Redis stock keys before the
# reservation script reads them — confirm against the builder.
#
# @param inventory_items [Array] inventory items handed to the builder
# @return whatever the builder's call returns
def warm_redis_keys!(inventory_items)
  builder = SpreeCmCommissioner::RedisStock::CachedInventoryItemsBuilder.new(inventory_items)
  builder.call
end

#with_advisory_lock!(order) ⇒ Object

Uses PostgreSQL advisory locks to ensure only one process per order can acquire a hold at a time.



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'app/services/spree_cm_commissioner/inventory_holds/acquire.rb', line 42

# Runs the block while holding a PostgreSQL advisory lock derived from the order id.
#
# The lock id is the CRC32 of a namespaced string containing order.id. The lock is
# attempted with pg_try_advisory_lock (non-blocking) and always released in an
# ensure clause once it has been obtained.
#
# @param order the order whose id scopes the lock
# @yield the work to perform while the lock is held
# @return the block's return value
# @raise [AcquireHoldLockConflictError] when the lock could not be obtained
def with_advisory_lock!(order)
  lock_id = Zlib.crc32("spree_cm_commissioner:inventory_hold_acquire:#{order.id}")

  raw_answer = ActiveRecord::Base.connection.select_value("SELECT pg_try_advisory_lock(#{lock_id})")

  # select_value may yield a Ruby boolean or the string "t"/"f" depending on the
  # adapter; anything other than true / "t" counts as "not locked".
  lock_obtained =
    case raw_answer
    when true, 't' then true
    else false
    end

  raise AcquireHoldLockConflictError, I18n.t('inventory_hold.acquire.lock_timeout') unless lock_obtained

  begin
    yield
  ensure
    ActiveRecord::Base.connection.execute("SELECT pg_advisory_unlock(#{lock_id})")
  end
end