Module: Pgbus::Concurrency
- Extended by: ActiveSupport::Concern
- Defined in: lib/pgbus/concurrency.rb, lib/pgbus/concurrency/semaphore.rb, lib/pgbus/concurrency/blocked_execution.rb
Defined Under Namespace
Modules: BlockedExecution, Semaphore
Constant Summary
- METADATA_KEY = "pgbus_concurrency_key"
Class Method Summary
- .extract_key(payload) ⇒ Object
  Extract the concurrency key from a deserialized payload.
- .inject_metadata(active_job, payload_hash) ⇒ Object
  Inject the resolved concurrency key into the job’s serialized payload.
- .resolve_key(active_job) ⇒ Object
  Resolve the concurrency key for a given job instance.
Class Method Details
.extract_key(payload) ⇒ Object
Extract the concurrency key from a deserialized payload.
# File 'lib/pgbus/concurrency.rb', line 68
def extract_key(payload)
  payload[METADATA_KEY]
end
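Because METADATA_KEY is a String, the lookup only matches string-keyed payloads, which is what a JSON round trip produces. The following is a minimal sketch of that behavior, not the gem itself: the method body is copied from the listing above into a top-level stand-in, and the "job_class" field and the "report-42" key value are made up for illustration.

```ruby
require "json"

# Same value as the documented constant.
METADATA_KEY = "pgbus_concurrency_key"

# Stand-in that mirrors the documented method body.
def extract_key(payload)
  payload[METADATA_KEY]
end

# A payload that went through JSON comes back with String keys.
raw = JSON.generate("job_class" => "ReportJob", METADATA_KEY => "report-42")
payload = JSON.parse(raw)

with_key = extract_key(payload)

# A payload without the metadata entry yields nil.
without_key = extract_key({ "job_class" => "ReportJob" })

# A Symbol-keyed hash does not match the String constant, so this is nil too.
symbol_keyed = extract_key({ pgbus_concurrency_key: "report-42" })
```

A nil result therefore means either that the job has no concurrency key or that the payload was not string-keyed, so callers may want to normalize keys before calling.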
.inject_metadata(active_job, payload_hash) ⇒ Object
Inject the resolved concurrency key into the job’s serialized payload.
# File 'lib/pgbus/concurrency.rb', line 60
def inject_metadata(active_job, payload_hash)
  key = resolve_key(active_job)
  return payload_hash unless key

  payload_hash.merge(METADATA_KEY => key)
end
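Two properties of this method follow from its body: `Hash#merge` returns a new hash, so the caller's payload is not mutated, and when no key resolves the very same payload object is handed back. A small sketch under stated assumptions: `ConcurrencySketch` and `FakeJob` are hypothetical stand-ins, and `resolve_key` is simplified here to read a precomputed value instead of consulting a job class's concurrency config.

```ruby
# Hypothetical stand-in module; inject_metadata is copied from the listing above.
module ConcurrencySketch
  METADATA_KEY = "pgbus_concurrency_key"

  class << self
    def inject_metadata(active_job, payload_hash)
      key = resolve_key(active_job)
      return payload_hash unless key

      payload_hash.merge(METADATA_KEY => key)
    end

    # Simplified for the sketch: read a precomputed key off the fake job.
    def resolve_key(active_job)
      active_job.concurrency_key
    end
  end
end

# Hypothetical job object that only carries a key (or nil).
FakeJob = Struct.new(:concurrency_key)

original = { "job_class" => "ReportJob" }

# A job with a key gets a new hash that includes the metadata entry.
tagged = ConcurrencySketch.inject_metadata(FakeJob.new("report-42"), original)

# A job without a key gets the original hash back, untouched.
untagged = ConcurrencySketch.inject_metadata(FakeJob.new(nil), original)
```

Since the input hash is never modified, the return value has to be used; calling the method and discarding the result has no effect on the payload.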
.resolve_key(active_job) ⇒ Object
Resolve the concurrency key for a given job instance. Returns nil if the job class has no concurrency config.
# File 'lib/pgbus/concurrency.rb', line 44
def resolve_key(active_job)
  return nil unless active_job.class.respond_to?(:pgbus_concurrency)

  config = active_job.class.pgbus_concurrency
  return nil unless config

  args = active_job.arguments
  last = args.last
  if last.is_a?(Hash) && last.each_key.all?(Symbol)
    config[:key].call(*args[...-1], **last)
  else
    config[:key].call(*args)
  end
end
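The final branch decides how the job's arguments reach the key callable: a trailing Hash whose keys are all Symbols is splatted as keyword arguments, and anything else is passed positionally. The sketch below isolates that dispatch in a hypothetical helper named `call_key`; the lambdas and argument values are illustrative assumptions, not part of Pgbus.

```ruby
# Hypothetical helper reproducing the dispatch at the end of resolve_key.
def call_key(key_proc, args)
  last = args.last
  if last.is_a?(Hash) && last.each_key.all?(Symbol)
    # Trailing Symbol-keyed hash: forward it as keyword arguments.
    key_proc.call(*args[...-1], **last)
  else
    # Otherwise every argument stays positional.
    key_proc.call(*args)
  end
end

# Illustrative key callable that accepts a keyword argument.
by_account = ->(account_id, region: "default") { "account-#{account_id}-#{region}" }

# Illustrative key callable that expects a String-keyed options hash.
by_options = ->(account_id, options) { "account-#{account_id}-#{options.fetch("region")}" }

positional  = call_key(by_account, [42])
keyword     = call_key(by_account, [42, { region: "eu" }])
string_keys = call_key(by_options, [42, { "region" => "eu" }])
```

One consequence of this rule is that a key callable written for a positional hash argument will not receive it positionally when that hash happens to be Symbol-keyed, so the callable's signature should match how the job is enqueued.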