Module: Pgbus::Concurrency::BlockedExecution

Defined in:
lib/pgbus/concurrency/blocked_execution.rb

Class Method Summary collapse

Class Method Details

.count_for(concurrency_key) ⇒ Object

Count blocked executions for a given key. Useful for testing/monitoring.



52
53
54
# File 'lib/pgbus/concurrency/blocked_execution.rb', line 52

def count_for(concurrency_key)
  Pgbus::BlockedExecution.where(concurrency_key: concurrency_key).count
end

.expire_staleObject

Delete blocked executions that have expired. Returns the count of deleted rows.



47
48
49
# File 'lib/pgbus/concurrency/blocked_execution.rb', line 47

def expire_stale
  Pgbus::BlockedExecution.expired(Time.current).delete_all
end

.insert(concurrency_key:, queue_name:, payload:, duration:, priority: 0) ⇒ Object

Insert a blocked execution for a job that hit the concurrency limit.



10
11
12
13
14
15
16
17
18
# File 'lib/pgbus/concurrency/blocked_execution.rb', line 10

def insert(concurrency_key:, queue_name:, payload:, duration:, priority: 0)
  Pgbus::BlockedExecution.create!(
    concurrency_key: concurrency_key,
    queue_name: queue_name,
    payload: JSON.generate(payload),
    priority: priority,
    expires_at: Time.current + duration
  )
end

.promote_next(concurrency_key, client:, delay: 0) ⇒ Object

Atomically promote the next blocked execution: delete the row and enqueue the job in a single transaction. Returns true if a job was promoted, false otherwise. This avoids losing a blocked row if enqueue fails.



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/pgbus/concurrency/blocked_execution.rb', line 29

def promote_next(concurrency_key, client:, delay: 0)
  released = nil
  Pgbus::BlockedExecution.transaction do
    released = release_next(concurrency_key)
    raise ActiveRecord::Rollback unless released

    actual_delay = resolve_delay(released[:payload], delay)
    client.send_message(released[:queue_name], released[:payload], delay: actual_delay)
  end

  !!released
rescue StandardError => e
  Pgbus.logger.warn { "[Pgbus] Promote blocked execution failed for #{concurrency_key}: #{e.message}" }
  false
end

.release_next(concurrency_key) ⇒ Object

Release the next blocked execution for a given concurrency key. Returns the released row (queue_name, payload) or nil if none.



22
23
24
# File 'lib/pgbus/concurrency/blocked_execution.rb', line 22

def release_next(concurrency_key)
  Pgbus::BlockedExecution.release_next!(concurrency_key)
end