Class: SpreeCmCommissioner::InventoryHolds::Acquire
- Inherits:
-
Object
- Object
- SpreeCmCommissioner::InventoryHolds::Acquire
- Extended by:
- ServiceModuleThrowable
- Includes:
- Spree::ServiceModule::Base
- Defined in:
- app/services/spree_cm_commissioner/inventory_holds/acquire.rb
Constant Summary collapse
- AcquireRedisDeductionError =
Class.new(StandardError)
- AcquireHoldSaveError =
Class.new(StandardError)
- AcquireHoldLockConflictError =
Class.new(StandardError)
- AcquireInsufficientStockError =
Class.new(StandardError)
- AcquireLimitExceededError =
Class.new(StandardError)
Instance Method Summary collapse
-
#acquire_hold_for_order!(order, line_items, expires_at) ⇒ Object
Acquires active hold for the order, or returns existing if active.
-
#acquire_lua_script ⇒ Object
Lua script atomically reserves inventory across items: 1.
- #acquire_redis_stock!(line_items) ⇒ Object
- #call(order:, expires_at:) ⇒ Object
- #enqueue_release_job(hold) ⇒ Object
- #enqueue_sync_inventory_on_hold(order) ⇒ Object
- #warm_redis_keys!(inventory_items) ⇒ Object
-
#with_advisory_lock!(order) ⇒ Object
Uses PostgreSQL advisory locks to ensure only one process per order can acquire a hold at a time.
Methods included from ServiceModuleThrowable
Instance Method Details
#acquire_hold_for_order!(order, line_items, expires_at) ⇒ Object
Acquires active hold for the order, or returns existing if active.
Creates new hold with :active status and deducts from Redis stock. On DB save failure: transaction rolls back. On Redis failure: transaction rolls back hold (Redis untouched, no manual release needed).
Risk: Redis deduction could orphan if exception raised after Redis commit but before DB. Accepted—advisory lock ensures single process per order prevents impact.
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'app/services/spree_cm_commissioner/inventory_holds/acquire.rb', line 66 def acquire_hold_for_order!(order, line_items, expires_at) hold = SpreeCmCommissioner::InventoryHold.find_or_initialize_by(order: order) return hold if hold.persisted? && hold.active? return hold if hold.persisted? && hold.payment_locked? SpreeCmCommissioner::InventoryHold.transaction do begin hold.assign_attributes( user: order.user, status: :active, held_at: hold.held_at || Time.current, expires_at: expires_at, payment_locked_at: nil, release_reason: nil, total_quantity_on_hold: line_items.sum(&:quantity), ip_address: order.last_ip_address ) hold.save! rescue ActiveRecord::RecordInvalid, ActiveRecord::RecordNotUnique raise AcquireHoldSaveError, I18n.t('inventory_hold.acquire.db_save_error') end begin positive = acquire_redis_stock!(line_items) raise AcquireInsufficientStockError, I18n.t('inventory_hold.acquire.insufficient_stock') unless positive rescue Redis::BaseError, Redis::CommandError, IOError raise AcquireRedisDeductionError, I18n.t('inventory_hold.acquire.redis_error') end hold end end |
#acquire_lua_script ⇒ Object
Lua script atomically reserves inventory across items:
-
Validate: Ensure available stock (stock - held) >= quantity for ALL items
-
Commit: If all pass, increment hold counters atomically
-
Return: 1 (success) or 0 (failure/insufficient stock)
Atomicity prevents overselling and race conditions.
158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
# File 'app/services/spree_cm_commissioner/inventory_holds/acquire.rb', line 158 def acquire_lua_script <<~LUA -- KEYS = [stock_key_1, hold_key_1, stock_key_2, hold_key_2, ...] -- ARGV = [qty_1, qty_2, ...] for i = 1, #ARGV do local stock = tonumber(redis.call('GET', KEYS[(i-1)*2+1]) or 0) local held = tonumber(redis.call('GET', KEYS[(i-1)*2+2]) or 0) if stock - held < tonumber(ARGV[i]) then return 0 end end for i = 1, #ARGV do redis.call('INCRBY', KEYS[(i-1)*2+2], tonumber(ARGV[i])) end return 1 LUA end |
#acquire_redis_stock!(line_items) ⇒ Object
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'app/services/spree_cm_commissioner/inventory_holds/acquire.rb', line 100 def acquire_redis_stock!(line_items) stock_keys = [] hold_keys = [] quantities = [] inventory_items = [] line_items.each do |li| li.inventory_items.active.each do |item| stock_keys << item.redis_key hold_keys << item.redis_hold_key quantities << li.quantity inventory_items << item end end return true if hold_keys.empty? warm_redis_keys!(inventory_items) interleaved_keys = stock_keys.zip(hold_keys).flatten result = SpreeCmCommissioner.inventory_redis_pool.with do |redis| redis.eval(acquire_lua_script, keys: interleaved_keys, argv: quantities) end result.positive? end |
#call(order:, expires_at:) ⇒ Object
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'app/services/spree_cm_commissioner/inventory_holds/acquire.rb', line 15 def call(order:, expires_at:) line_items = order.line_items.select(&:should_hold_inventory?) return success(nil) if line_items.empty? result = SpreeCmCommissioner::InventoryHolds::ValidateLimits.call(order: order) raise AcquireLimitExceededError, result.error.to_s unless result.success? with_advisory_lock!(order) do hold = acquire_hold_for_order!(order, line_items, expires_at) enqueue_release_job(hold) enqueue_sync_inventory_on_hold(order) CmAppLogger.log( label: "#{self.class.name}#call success", data: { order_id: order.id, hold_id: hold.id } ) success(hold) end rescue StandardError => e error = { error_type: e.class.name.demodulize, order_id: order.id, message: e. } CmAppLogger.error(label: "#{self.class.name}#call failed", data: error) failure(nil, e.) end |
#enqueue_release_job(hold) ⇒ Object
131 132 133 |
# File 'app/services/spree_cm_commissioner/inventory_holds/acquire.rb', line 131 def enqueue_release_job(hold) SpreeCmCommissioner::InventoryHolds::ReleaseJob.set(wait_until: hold.expires_at).perform_later(hold_id: hold.id) end |
#enqueue_sync_inventory_on_hold(order) ⇒ Object
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
# File 'app/services/spree_cm_commissioner/inventory_holds/acquire.rb', line 135 def enqueue_sync_inventory_on_hold(order) inventory_id_and_quantities = order.line_items.flat_map do |line_item| next [] unless line_item.should_hold_inventory? line_item.inventory_items.active.map do |inventory_item| { inventory_id: inventory_item.id, quantity: line_item.quantity } end end SpreeCmCommissioner::InventoryItems::BulkAdjustQuantitiesOnHoldJob.perform_later( order_id: order.id, inventory_id_and_quantities: inventory_id_and_quantities ) end |
#warm_redis_keys!(inventory_items) ⇒ Object
127 128 129 |
# File 'app/services/spree_cm_commissioner/inventory_holds/acquire.rb', line 127 def warm_redis_keys!(inventory_items) SpreeCmCommissioner::RedisStock::CachedInventoryItemsBuilder.new(inventory_items).call end |
#with_advisory_lock!(order) ⇒ Object
Uses PostgreSQL advisory locks to ensure only one process per order can acquire a hold at a time.
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'app/services/spree_cm_commissioner/inventory_holds/acquire.rb', line 42 def with_advisory_lock!(order) lock_id = Zlib.crc32("spree_cm_commissioner:inventory_hold_acquire:#{order.id}") # select_value returns a Ruby boolean (pg adapter >= 1.0) or string "t"/"f" (older adapters). # Explicitly check both forms so lock failures are never silently treated as success. locked = [true, 't'].include?(ActiveRecord::Base.connection.select_value("SELECT pg_try_advisory_lock(#{lock_id})")) raise AcquireHoldLockConflictError, I18n.t('inventory_hold.acquire.lock_timeout') unless locked begin yield ensure ActiveRecord::Base.connection.execute("SELECT pg_advisory_unlock(#{lock_id})") end end |