Module: Pgbus::Concurrency::BlockedExecution
Defined in: lib/pgbus/concurrency/blocked_execution.rb
Class Method Summary
- .count_for(concurrency_key) ⇒ Object
  Count blocked executions for a given key.
- .expire_stale ⇒ Object
  Delete blocked executions that have expired.
- .insert(concurrency_key:, queue_name:, payload:, duration:, priority: 0) ⇒ Object
  Insert a blocked execution for a job that hit the concurrency limit.
- .promote_next(concurrency_key, client:, delay: 0) ⇒ Object
  Atomically promote the next blocked execution: delete the row and enqueue the job in a single transaction.
- .release_next(concurrency_key) ⇒ Object
  Release the next blocked execution for a given concurrency key.
Class Method Details
.count_for(concurrency_key) ⇒ Object
Count blocked executions for a given key. Useful for testing/monitoring.
# File 'lib/pgbus/concurrency/blocked_execution.rb', line 52
def count_for(concurrency_key)
  Pgbus::BlockedExecution.where(concurrency_key: concurrency_key).count
end
.expire_stale ⇒ Object
Delete blocked executions that have expired. Returns the count of deleted rows.
# File 'lib/pgbus/concurrency/blocked_execution.rb', line 47
def expire_stale
  Pgbus::BlockedExecution.expired(Time.current).delete_all
end
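The counting and expiry behaviour described above can be illustrated without a database. The following is a minimal sketch, not the library's implementation: `FakeBlockedStore` and `StaleRow` are hypothetical in-memory stand-ins for the `Pgbus::BlockedExecution` model, and it assumes that "expired" means `expires_at` is earlier than the time passed in.

```ruby
# Hypothetical in-memory stand-in for the blocked-executions table.
StaleRow = Struct.new(:concurrency_key, :expires_at, keyword_init: true)

class FakeBlockedStore
  def initialize
    @rows = []
  end

  def add(concurrency_key:, expires_at:)
    @rows << StaleRow.new(concurrency_key: concurrency_key, expires_at: expires_at)
  end

  # Mirrors count_for: number of rows blocked under one key.
  def count_for(concurrency_key)
    @rows.count { |row| row.concurrency_key == concurrency_key }
  end

  # Mirrors expire_stale: delete expired rows and return how many were deleted.
  # Assumption: a row is expired when expires_at is earlier than `now`.
  def expire_stale(now = Time.now)
    before = @rows.size
    @rows.reject! { |row| row.expires_at < now }
    before - @rows.size
  end
end

now = Time.now
store = FakeBlockedStore.new
store.add(concurrency_key: "report:42", expires_at: now - 60) # already stale
store.add(concurrency_key: "report:42", expires_at: now + 60) # still valid

deleted = store.expire_stale(now)
remaining = store.count_for("report:42")
puts "deleted=#{deleted} remaining=#{remaining}"
```

In this sketch, as in the documented method, the return value of `expire_stale` is the number of deleted rows, which makes it convenient to log from a periodic cleanup task.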
.insert(concurrency_key:, queue_name:, payload:, duration:, priority: 0) ⇒ Object
Insert a blocked execution for a job that hit the concurrency limit.
# File 'lib/pgbus/concurrency/blocked_execution.rb', line 10
def insert(concurrency_key:, queue_name:, payload:, duration:, priority: 0)
  Pgbus::BlockedExecution.create!(
    concurrency_key: concurrency_key,
    queue_name: queue_name,
    payload: JSON.generate(payload),
    priority: priority,
    expires_at: Time.current + duration
  )
end
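To make the stored attributes concrete, here is a small sketch of what `.insert` hands to `create!`. The helper `blocked_attributes` is hypothetical and exists only for illustration. It uses plain `Time` with `duration` given in seconds rather than `Time.current`, and the key, queue name, and payload values are invented examples.

```ruby
require "json"

# Hypothetical helper that builds the same attribute hash .insert passes to create!.
# Assumption: duration is a number of seconds; `now` is injectable for determinism.
def blocked_attributes(concurrency_key:, queue_name:, payload:, duration:, priority: 0, now: Time.now)
  {
    concurrency_key: concurrency_key,
    queue_name: queue_name,
    payload: JSON.generate(payload), # the payload is stored as a JSON string
    priority: priority,
    expires_at: now + duration       # the row becomes stale after this instant
  }
end

attrs = blocked_attributes(
  concurrency_key: "report:42",
  queue_name: "default",
  payload: { "job_class" => "ReportJob", "arguments" => [42] },
  duration: 300,
  now: Time.at(0).utc
)

puts attrs[:payload]
puts attrs[:expires_at]
```

Because the payload is serialized on the way in, whatever reads the row back would need to parse it again before handing it to job code.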
.promote_next(concurrency_key, client:, delay: 0) ⇒ Object
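Atomically promote the next blocked execution: delete the row and enqueue the job in a single transaction. Returns true if a job was promoted, false otherwise. Because both steps share one transaction, a failed enqueue rolls back the delete, so the blocked row is not lost. Any StandardError raised along the way is logged as a warning and the method returns false.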
# File 'lib/pgbus/concurrency/blocked_execution.rb', line 29
def promote_next(concurrency_key, client:, delay: 0)
  released = nil
  Pgbus::BlockedExecution.transaction do
    released = release_next(concurrency_key)
    raise ActiveRecord::Rollback unless released

    actual_delay = resolve_delay(released[:payload], delay)
    client.(released[:queue_name], released[:payload], delay: actual_delay)
  end
  !!released
rescue StandardError => e
  Pgbus.logger.warn { "[Pgbus] Promote blocked execution failed for #{concurrency_key}: #{e.}" }
  false
end
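The delete-then-enqueue pattern can be sketched in isolation to show why a failed enqueue leaves the blocked row in place. Everything here is an assumption made for illustration: `FakeQueueTable` is a hypothetical in-memory table whose `transaction` restores a snapshot on error (standing in for a database transaction), `promote_next_sketch` mirrors only the control flow of `.promote_next`, delay resolution is left out, and "next" is taken to mean the first inserted row, which the source does not specify.

```ruby
# Hypothetical in-memory table; its transaction restores the rows on error.
class FakeQueueTable
  class Rollback < StandardError; end

  attr_reader :rows

  def initialize(rows)
    @rows = rows.dup
  end

  def transaction
    snapshot = @rows.dup
    yield
  rescue Rollback
    @rows = snapshot # silent rollback, similar in spirit to ActiveRecord::Rollback
    nil
  rescue StandardError
    @rows = snapshot # roll back, then let the caller see the error
    raise
  end

  # Deletes and returns the first row for the key, or nil if there is none.
  # Assumption: first-inserted ordering; the real ordering is not documented here.
  def release_next(key)
    index = @rows.index { |row| row[:concurrency_key] == key }
    index && @rows.delete_at(index)
  end
end

# Mirrors the control flow of promote_next, without delay handling or logging.
def promote_next_sketch(table, key, enqueue:)
  released = nil
  table.transaction do
    released = table.release_next(key)
    raise FakeQueueTable::Rollback unless released

    enqueue.call(released[:queue_name], released[:payload])
  end
  !!released
rescue StandardError
  false
end

table = FakeQueueTable.new([{ concurrency_key: "k", queue_name: "default", payload: "{}" }])
enqueued = []
failing = ->(_queue, _payload) { raise "queue unavailable" }
working = ->(queue, payload) { enqueued << [queue, payload] }

failed = promote_next_sketch(table, "k", enqueue: failing)
rows_after_failure = table.rows.size
promoted = promote_next_sketch(table, "k", enqueue: working)
rows_after_success = table.rows.size
nothing_left = promote_next_sketch(table, "k", enqueue: working)
puts [failed, rows_after_failure, promoted, rows_after_success, nothing_left].inspect
```

The failing enqueue returns false and the row is still present afterwards, so a later call can retry the promotion. The call on an empty table also returns false, this time through the silent rollback path.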
.release_next(concurrency_key) ⇒ Object
Release the next blocked execution for a given concurrency key. Returns the released row (queue_name, payload) or nil if none.
# File 'lib/pgbus/concurrency/blocked_execution.rb', line 22
def release_next(concurrency_key)
  Pgbus::BlockedExecution.release_next!(concurrency_key)
end