Module: Pgbus::Concurrency

Extended by:
ActiveSupport::Concern
Defined in:
lib/pgbus/concurrency.rb,
lib/pgbus/concurrency/semaphore.rb,
lib/pgbus/concurrency/blocked_execution.rb

Defined Under Namespace

Modules: BlockedExecution, Semaphore

Constant Summary collapse

METADATA_KEY =
"pgbus_concurrency_key"

Class Method Summary collapse

Class Method Details

.extract_key(payload) ⇒ Object

Extract the concurrency key from a deserialized payload.



68
69
70
# File 'lib/pgbus/concurrency.rb', line 68

# Read the concurrency key back out of a deserialized payload.
#
# The lookup uses the string METADATA_KEY, so a payload that stores the
# entry under a symbol key will not match.
#
# @param payload [#[]] deserialized payload, looked up with METADATA_KEY
# @return [Object, nil] whatever is stored under METADATA_KEY, or nil
#   when the payload has no such entry
def extract_key(payload)
  concurrency_key = payload[METADATA_KEY]
  concurrency_key
end

.inject_metadata(active_job, payload_hash) ⇒ Object

Inject the resolved concurrency key into the job’s serialized payload.



60
61
62
63
64
65
# File 'lib/pgbus/concurrency.rb', line 60

# Inject the resolved concurrency key into the job's serialized payload.
#
# The key is obtained from resolve_key. The payload passed in is never
# mutated: Hash#merge builds a new hash when a key is present.
#
# @param active_job [Object] job instance handed to resolve_key
# @param payload_hash [Hash] serialized job payload
# @return [Hash] payload_hash itself (same object) when resolve_key
#   returns nil or false; otherwise a new hash that additionally maps
#   METADATA_KEY to the resolved key
def inject_metadata(active_job, payload_hash)
  key = resolve_key(active_job)
  return payload_hash unless key

  payload_hash.merge(METADATA_KEY => key)
end

.resolve_key(active_job) ⇒ Object

Resolve the concurrency key for a given job instance. Returns nil if the job class has no concurrency config.



44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/pgbus/concurrency.rb', line 44

# Resolve the concurrency key for a job instance.
#
# The job's class is asked for its pgbus_concurrency config, and the
# callable stored under config[:key] is invoked with the job's arguments.
# A trailing Hash whose keys are all Symbols is passed as keyword
# arguments. Every other argument list is passed positionally, unchanged.
#
# Note: an empty trailing Hash also satisfies the "all keys are Symbols"
# test, so it is splatted as (no) keywords rather than passed positionally.
#
# @param active_job [#arguments] job instance; its class may respond to
#   pgbus_concurrency
# @return [Object, nil] the value returned by the key callable, or nil
#   when the class does not respond to pgbus_concurrency or that config
#   is nil/false
def resolve_key(active_job)
  job_class = active_job.class
  return nil unless job_class.respond_to?(:pgbus_concurrency)

  settings = job_class.pgbus_concurrency
  return nil unless settings

  arguments = active_job.arguments
  trailing = arguments.last
  keyword_hash = trailing.is_a?(Hash) && trailing.keys.all? { |k| k.is_a?(Symbol) }
  return settings[:key].call(*arguments) unless keyword_hash

  # Everything before the trailing hash stays positional.
  settings[:key].call(*arguments[0..-2], **trailing)
end