Class: Karafka::Pro::ActiveJob::Dispatcher
- Inherits:
-
ActiveJob::Dispatcher
- Object
- ActiveJob::Dispatcher
- Karafka::Pro::ActiveJob::Dispatcher
- Defined in:
- lib/karafka/pro/active_job/dispatcher.rb
Overview
Pro dispatcher that sends the ActiveJob job to a proper topic based on the queue name and that allows to inject additional options into the producer, effectively allowing for a much better and more granular control over the dispatch and consumption process.
Instance Method Summary collapse
- #dispatch(job) ⇒ Object
-
#dispatch_many(jobs) ⇒ Object
Bulk dispatches multiple jobs using the Rails 7.1+ API.
Instance Method Details
#dispatch(job) ⇒ Object
46 47 48 49 50 51 52 53 54 |
# File 'lib/karafka/pro/active_job/dispatcher.rb', line 46 def dispatch(job) producer(job).public_send( fetch_option(job, :dispatch_method, DEFAULTS), dispatch_details(job).merge!( topic: job.queue_name, payload: ::ActiveSupport::JSON.encode(serialize_job(job)) ) ) end |
#dispatch_many(jobs) ⇒ Object
Bulk dispatches multiple jobs using the Rails 7.1+ API
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/karafka/pro/active_job/dispatcher.rb', line 58 def dispatch_many(jobs) # First level is type of dispatch and second is the producer we want to use to dispatch dispatches = Hash.new do |hash, key| hash[key] = Hash.new do |hash2, key2| hash2[key2] = [] end end jobs.each do |job| d_method = fetch_option(job, :dispatch_many_method, DEFAULTS) producer = producer(job) dispatches[d_method][producer] << dispatch_details(job).merge!( topic: job.queue_name, payload: ::ActiveSupport::JSON.encode(serialize_job(job)) ) end dispatches.each do |d_method, producers| producers.each do |producer, | producer.public_send( d_method, ) end end end |