Class: Karafka::Processing::JobsQueue
- Inherits: Object
  - Object
    - Karafka::Processing::JobsQueue
- Defined in: lib/karafka/processing/jobs_queue.rb
Overview
This job queue also keeps track of the number of busy workers, because we use a single workers pool that supports granular scheduling.
This is the key work component for Karafka jobs distribution. It provides an API for running jobs in parallel across more than one subscription group.
Because more than one subscription group can operate on this queue, we internally keep track of processing per group.
We work under the assumption that partition data is evenly distributed.
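The design described above (one shared queue feeding all workers, with in-flight jobs tracked separately per subscription group) can be sketched with plain Ruby primitives. This is a minimal illustration, not Karafka's implementation: `Job`, the group names and the two worker threads are hypothetical stand-ins.

```ruby
# Hypothetical sketch: one shared Queue fed by several subscription groups,
# with in-flight jobs tracked per group so each group can be checked on its own.
Job = Struct.new(:group_id, :id)

queue = Queue.new
mutex = Mutex.new
in_processing = Hash.new { |h, k| h[k] = [] }
processed = []

workers = Array.new(2) do
  Thread.new do
    # Queue#pop returns nil once the queue is closed and drained, ending the loop.
    while (job = queue.pop)
      mutex.synchronize do
        in_processing[job.group_id].delete(job)
        processed << job
      end
    end
  end
end

%w[group_a group_b].each do |group_id|
  3.times do |i|
    job = Job.new(group_id, i)
    # Track the job under its group before it becomes visible to workers.
    mutex.synchronize { in_processing[group_id] << job }
    queue << job
  end
end

queue.close
workers.each(&:join)
```

Both groups share the same workers, yet each group's in-flight list can be inspected independently, which is what per-group waiting builds on.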
Direct Known Subclasses
Instance Attribute Summary
- #pool ⇒ Object (writeonly)
  Set via writer because of a circular dependency: the queue must exist before the pool (workers need the queue at construction), but the queue needs the pool for concurrency.
Instance Method Summary
- #<<(job) ⇒ Object
  Adds the job to the internal main queue, scheduling it for execution by a worker, and marks the job as being in the processing pipeline.
- #clear(group_id) ⇒ Object
  Clears the processing states for a provided group.
- #close ⇒ Object
  Stops the whole processing queue.
- #complete(job) ⇒ Object
  Marks a given job from a given group as completed.
- #empty?(group_id) ⇒ Boolean
  Returns true when a given group has no jobs in processing (including jobs enqueued but not yet picked up).
- #in_processing ⇒ Hash{String => Array<Jobs::Base>}
  Returns a snapshot of all jobs currently in processing per group.
- #initialize ⇒ Karafka::Processing::JobsQueue (constructor)
- #pop ⇒ Jobs::Base?
  Waits for a job from the main queue and returns it once available, or returns nil if the queue has been stopped and there will never be anything more to process.
- #register(group_id) ⇒ Object
  Registers a given subscription group id in the queue.
- #statistics ⇒ Hash
  - `busy` - number of jobs that are currently being processed (active work)
  - `enqueued` - number of jobs in the queue that are waiting to be picked up by a worker
- #tick(group_id) ⇒ Object
  Causes the wait lock to re-check its lock conditions and potentially unlock.
- #wait(group_id) {|block| ... } ⇒ Object
  Blocks while there are jobs in the queue for a given group and waits until all the blocking jobs from that group are completed.
Constructor Details
#initialize ⇒ Karafka::Processing::JobsQueue
    # File 'lib/karafka/processing/jobs_queue.rb', line 25
    def initialize
      @queue = Queue.new

      # Those queues will act as semaphores internally. Since we need an indicator for waiting
      # we could use Thread.pass but this is expensive. Instead we can just lock until any
      # of the workers finishes their work and we can re-check. This means that in the worst
      # scenario, we will context switch 10 times per poll instead of getting this thread
      # scheduled by Ruby hundreds of thousands of times per group.
      # We cannot use a single semaphore as it could potentially block in listeners that should
      # process with their data and also could unlock when a given group needs to remain locked
      @semaphores = {}
      @in_processing = Hash.new { |h, k| h[k] = [] }
      @statistics = { busy: 0, enqueued: 0 }
      @mutex = Mutex.new
    end
Instance Attribute Details
#pool=(value) ⇒ Object (writeonly)
Set via writer because of a circular dependency: the queue must exist before the pool (workers need the queue at construction), but the queue needs the pool for concurrency.
    # File 'lib/karafka/processing/jobs_queue.rb', line 22
    def pool=(value)
      @pool = value
    end
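The circular dependency behind the write-only `pool` attribute is resolved by construction order: create the queue, build the pool around it, then hand the pool back to the queue. The sketch below uses hypothetical stand-ins (`SketchQueue`, `SketchPool`, `Worker`); they are not Karafka's `JobsQueue` or `WorkersPool`.

```ruby
# Hypothetical stand-ins showing the construction order; these are illustrative
# names, not Karafka classes.
Worker = Struct.new(:queue)

class SketchQueue
  # Writer-only, like JobsQueue#pool=: assigned after the pool exists.
  attr_writer :pool

  # The queue needs the pool to know how much concurrency is available.
  def concurrency
    @pool.size
  end
end

class SketchPool
  attr_reader :workers

  # Each worker needs the queue at construction time.
  def initialize(queue, size)
    @workers = Array.new(size) { Worker.new(queue) }
  end

  def size
    @workers.size
  end
end

queue = SketchQueue.new          # 1. create the queue first
pool  = SketchPool.new(queue, 3) # 2. build the pool (and its workers) around it
queue.pool = pool                # 3. close the loop through the writer
```

Until step 3 runs, calling `concurrency` would fail on `nil`, so this pattern relies on wiring being finished before any job is enqueued.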
Instance Method Details
#<<(job) ⇒ Object
Adds the job to the internal main queue, scheduling it for execution by a worker, and marks the job as being in the processing pipeline.
    # File 'lib/karafka/processing/jobs_queue.rb', line 61
    def <<(job)
      return if @queue.closed?

      # nil is used by WorkersPool to signal a worker to exit during downscaling.
      # Passed straight through to the raw queue, bypassing statistics and tracking.
      unless job
        @queue << job
        return
      end

      @mutex.synchronize do
        group = @in_processing[job.group_id]

        raise(Errors::JobsQueueSynchronizationError, job.group_id) if group.include?(job)

        group << job

        # Assume that moving to queue means being picked up immediately not to create stats
        # race conditions because of pop overhead. If there are workers available, we assume
        # work is going to be handled as we never reject enqueued jobs
        if @statistics[:busy] < @pool.size
          @statistics[:busy] += 1
        else
          # If system is fully loaded, it means this job is indeed enqueued
          @statistics[:enqueued] += 1
        end

        @queue << job
      end
    end
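The statistics branch in `#<<` counts a job as busy while the pool has spare workers and as enqueued only once every worker is taken. A minimal sketch of just that branch, assuming a pool of two workers; `on_enqueue` is a hypothetical helper, not part of Karafka.

```ruby
# Minimal sketch of the busy/enqueued accounting in #<<, assuming a pool of two
# workers. on_enqueue is a hypothetical helper mirroring the branch in the source.
on_enqueue = lambda do |stats, pool_size|
  if stats[:busy] < pool_size
    stats[:busy] += 1      # a free worker is assumed to pick the job up right away
  else
    stats[:enqueued] += 1  # every worker is taken, so the job really waits
  end
end

stats = { busy: 0, enqueued: 0 }
3.times { on_enqueue.call(stats, 2) }
```

After three jobs, two count as busy and one as enqueued. Counting optimistically at enqueue time, rather than when a worker pops, avoids briefly reporting a job as enqueued while a free worker is about to take it.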
#clear(group_id) ⇒ Object
Clears the processing states for a provided group. Useful when a recovery happens and we need to clean up state but only for a given subscription group.
    # File 'lib/karafka/processing/jobs_queue.rb', line 131
    def clear(group_id)
      @mutex.synchronize do
        @in_processing[group_id].clear
        # We unlock it just in case it was blocked when clearing started
        tick(group_id)
      end
    end
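Clearing touches only one group's state, and the tick at the end lets a thread blocked in a wait loop for that group re-check immediately instead of sleeping out its timeout. A minimal sketch (Ruby 3.2+ for `Queue#pop(timeout:)`), using plain local variables as hypothetical stand-ins for the instance variables shown above.

```ruby
# Hypothetical sketch (Ruby >= 3.2): per-group state plus per-group semaphores,
# showing that clearing one group leaves the others untouched and that the tick
# lets a blocked waiter re-check right away.
mutex = Mutex.new
in_processing = Hash.new { |h, k| h[k] = [] }
semaphores = { "group_a" => Queue.new, "group_b" => Queue.new }

mutex.synchronize do
  in_processing["group_a"].push(:stuck_job_1, :stuck_job_2)
  in_processing["group_b"].push(:healthy_job)
end

waiter = Thread.new do
  # Long timeout on purpose: only a tick can release this loop quickly.
  while mutex.synchronize { !in_processing["group_a"].empty? }
    semaphores.fetch("group_a").pop(timeout: 5)
  end
end

# Recovery for group_a only: drop its tracking state and tick its semaphore.
mutex.synchronize do
  in_processing["group_a"].clear
  semaphores.fetch("group_a") << true
end

finished = waiter.join(1) # nil if the waiter were still blocked after 1 s
```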
#close ⇒ Object
Stops the whole processing queue.
    # File 'lib/karafka/processing/jobs_queue.rb', line 140
    def close
      @mutex.synchronize do
        return if @queue.closed?

        @queue.close
        @semaphores.each_value(&:close)
      end
    end
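`#close` relies on standard `Thread::Queue#close` behavior: already-queued items are still delivered, a closed and empty queue returns nil from `pop`, threads blocked in `pop` are woken with nil, and pushing afterwards raises. The last point is presumably why `#<<` returns early when the queue is closed. This sketch uses only the Ruby standard library.

```ruby
# Minimal sketch of Thread::Queue#close semantics that #close and #<< rely on.
queue = Queue.new
queue << :last_job
queue.close

drained = queue.pop   # items pushed before closing are still delivered
after   = queue.pop   # then a closed, empty queue returns nil without blocking

push_error = begin
  queue << :too_late  # pushing to a closed queue raises ClosedQueueError
  nil
rescue ClosedQueueError => e
  e
end

# A thread already blocked in #pop is woken up and receives nil on close.
blocked = Queue.new
waiter = Thread.new { blocked.pop }
Thread.pass until waiter.status == "sleep"
blocked.close
woken = waiter.value
```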
#complete(job) ⇒ Object
Marks a given job from a given group as completed. When there are no more jobs from a given group to be executed, we won’t wait.
    # File 'lib/karafka/processing/jobs_queue.rb', line 111
    def complete(job)
      @mutex.synchronize do
        # We finish one job and if there is another, we pick it up
        if @statistics[:enqueued].positive?
          @statistics[:enqueued] -= 1
        # If no more enqueued jobs, we will be just less busy
        else
          @statistics[:busy] -= 1
        end

        @in_processing[job.group_id].delete(job)

        tick(job.group_id)
      end
    end
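`#complete` mirrors the enqueue accounting: while jobs are waiting, a finishing worker is assumed to pick up the next one, so only `enqueued` drops; otherwise one worker becomes idle. A minimal sketch starting from two busy workers and one waiting job; `on_complete` is a hypothetical helper, not Karafka's API.

```ruby
# Minimal sketch of the counter update in #complete, starting from a pool of two
# workers with two busy and one enqueued. on_complete is a hypothetical helper
# mirroring the branch in the source.
on_complete = lambda do |stats|
  if stats[:enqueued].positive?
    stats[:enqueued] -= 1  # the freed worker takes a waiting job, so busy stays the same
  else
    stats[:busy] -= 1      # nothing waiting, so one worker goes idle
  end
end

stats = { busy: 2, enqueued: 1 }
history = 3.times.map { on_complete.call(stats); stats.dup }
```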
#empty?(group_id) ⇒ Boolean
Returns true when a given group has no jobs in processing (including jobs enqueued but not yet picked up).
    # File 'lib/karafka/processing/jobs_queue.rb', line 153
    def empty?(group_id)
      @mutex.synchronize do
        @in_processing[group_id].empty?
      end
    end
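Because `@in_processing` is built with a default block (`Hash.new { |h, k| h[k] = [] }`), asking about a group that has never had jobs still answers true, and as a side effect the lookup stores an empty array under that key. A minimal sketch of that standard Ruby `Hash` behavior; the group name is hypothetical.

```ruby
# Minimal sketch of the default-block Hash used for @in_processing: reading an
# unseen group id (as #empty? does) answers true and also stores an empty array.
in_processing = Hash.new { |h, k| h[k] = [] }

had_key_before = in_processing.key?("group_new")
unseen_empty = in_processing["group_new"].empty?
```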
#in_processing ⇒ Hash{String => Array<Jobs::Base>}
Returns a snapshot of all jobs currently in processing per group. Useful for diagnostics during forceful shutdown to understand what is blocking.
    # File 'lib/karafka/processing/jobs_queue.rb', line 195
    def in_processing
      @mutex.synchronize do
        @in_processing.transform_values(&:dup).freeze
      end
    end
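The snapshot is one level deep: `transform_values(&:dup)` copies each per-group array and `freeze` freezes only the outer hash. Later changes to the live state therefore do not leak into the snapshot, but the copied arrays themselves are not frozen. A minimal sketch, with symbols standing in for job objects.

```ruby
# Minimal sketch of the snapshot taken by #in_processing. Symbols stand in for
# job objects; the group names are hypothetical.
live = Hash.new { |h, k| h[k] = [] }
live["group_a"] << :job_1
live["group_b"] << :job_2

snapshot = live.transform_values(&:dup).freeze

live["group_a"] << :job_3   # the live state keeps changing...
live["group_b"].clear       # ...without affecting the snapshot's arrays
```

This is enough for read-only diagnostics such as inspecting what blocks a forceful shutdown. The job objects inside the arrays are shared, not copied.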
#pop ⇒ Jobs::Base?
This command is blocking and will wait until any job is available on the main queue.
Returns the next job from the main queue once one is available, or nil if the queue has been closed and there will never be anything more to process. A nil is also returned when WorkersPool pushes nil to signal a single worker to exit during downscaling.
    # File 'lib/karafka/processing/jobs_queue.rb', line 96
    def pop
      @queue.pop
    end
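A consumer of `#pop` treats nil as "stop", and nil can arrive for two reasons: a single nil pushed to downscale one worker, or the queue being closed. The hypothetical worker loop below uses only `Thread::Queue`. It tells the two cases apart via `closed?` purely for illustration; that check is not something Karafka's workers are claimed to do.

```ruby
# Hypothetical worker loop: #pop returns nil both when one worker is told to exit
# (a nil pushed during downscaling) and when the queue is closed for good.
queue = Queue.new
processed = Queue.new     # thread-safe collector
exit_reasons = Queue.new

workers = Array.new(2) do
  Thread.new do
    while (job = queue.pop)
      processed << job
    end
    # The loop ends on nil; closed? distinguishes the two cases in this sketch.
    exit_reasons << (queue.closed? ? :closed : :downscaled)
  end
end

queue << nil                                     # downscale: exactly one worker exits
Thread.pass until workers.any? { |t| !t.alive? }

3.times { |i| queue << i }                       # the remaining worker keeps processing
queue.close                                      # shutdown: the last worker exits too
workers.each(&:join)

reasons = Array.new(exit_reasons.size) { exit_reasons.pop }.sort
```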
#register(group_id) ⇒ Object
Registers a given subscription group id in the queue. This is needed so that we do not create semaphores dynamically, which avoids potential race conditions.
    # File 'lib/karafka/processing/jobs_queue.rb', line 45
    def register(group_id)
      @mutex.synchronize do
        # Ruby prior to 3.2 did not have queue with a timeout on `#pop`, that is why for those
        # versions we use our custom queue wrapper
        #
        # Initializes this semaphore from the mutex, so it is never auto-created
        # Since we always schedule a job before waiting using semaphores, there won't be any
        # concurrency problems
        @semaphores[group_id] = Queue.new
      end
    end
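Eager registration pairs with the `Hash#fetch` lookups in `#tick` and `#wait`: semaphores are created once, under the mutex, and using an unregistered group raises `KeyError` instead of silently creating a fresh semaphore. A minimal sketch with hypothetical `register` and `tick` lambdas standing in for the methods.

```ruby
# Hypothetical sketch of eager semaphore registration followed by fetch-based
# lookups; register and tick are illustrative lambdas, not Karafka's methods.
mutex = Mutex.new
semaphores = {}

register = lambda do |group_id|
  mutex.synchronize { semaphores[group_id] = Queue.new }
end

tick = lambda do |group_id|
  semaphores.fetch(group_id) << true
end

register.call("group_a")
tick.call("group_a")

error = begin
  tick.call("group_unknown")
  nil
rescue KeyError => e
  e
end
```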
#statistics ⇒ Hash
Returns a frozen copy of the queue statistics:
- `busy` - number of jobs that are currently being processed (active work)
- `enqueued` - number of jobs in the queue that are waiting to be picked up by a worker
    # File 'lib/karafka/processing/jobs_queue.rb', line 184
    def statistics
      # Ensures there are no race conditions when returning this data
      @mutex.synchronize do
        @statistics.dup.freeze
      end
    end
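Returning `@statistics.dup.freeze` from inside the mutex gives callers a consistent, read-only view. Each call captures the counters at that moment, and writing to the result raises `FrozenError`. A minimal sketch with a hypothetical `snapshot` lambda standing in for the method.

```ruby
# Minimal sketch of the #statistics snapshot: a frozen copy taken under the mutex.
# snapshot is a hypothetical lambda standing in for the method.
mutex = Mutex.new
stats = { busy: 0, enqueued: 0 }

snapshot = -> { mutex.synchronize { stats.dup.freeze } }

mutex.synchronize { stats[:busy] += 2 }
first = snapshot.call
mutex.synchronize { stats[:enqueued] += 1 }
second = snapshot.call

mutation_error = begin
  first[:busy] = 0
  nil
rescue FrozenError => e
  e
end
```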
#tick(group_id) ⇒ Object
This does not release the wait lock; it only causes the conditions to be re-checked.
Causes the wait lock to re-check its lock conditions and potentially unlock.
    # File 'lib/karafka/processing/jobs_queue.rb', line 103
    def tick(group_id)
      @semaphores.fetch(group_id) << true
    end
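The semaphore behind `#tick` is a plain `Queue` combined with `pop(timeout:)` (available since Ruby 3.2). Without a tick, the pop times out and returns nil. With a tick, it returns the pushed value right away. Either way the waiter only re-checks its condition, which is why a tick does not by itself release the lock. A minimal sketch using only the standard library.

```ruby
# Minimal sketch (Ruby >= 3.2) of a Queue used as a tick semaphore.
semaphore = Queue.new

no_tick = semaphore.pop(timeout: 0.01)   # nothing pushed: times out and returns nil

semaphore << true                        # what #tick does
with_tick = semaphore.pop(timeout: 0.01) # returns immediately with the pushed value
```

A tick pushed while nobody is waiting stays in the queue, so a later wait loop would consume it as one extra, harmless re-check.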
#wait(group_id) {|block| ... } ⇒ Object
This method is blocking.
Blocks while there are jobs in the queue for a given group and waits until all the blocking jobs from that group are completed. While waiting, it yields to the given block (if any) before each re-check.
    # File 'lib/karafka/processing/jobs_queue.rb', line 168
    def wait(group_id)
      interval_in_seconds = tick_interval / 1_000.0

      # Go doing other things while we cannot process and wait for anyone to finish their work
      # and re-check the wait status
      while wait?(group_id)
        yield if block_given?
        @semaphores.fetch(group_id).pop(timeout: interval_in_seconds)
      end
    end
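The wait loop combines three pieces: a condition checked under a mutex, an optional block run on every iteration, and a semaphore pop that wakes early on a tick or after the interval at the latest. Below is a single-group sketch (Ruby 3.2+). `MiniGroupWaiter` and `tick_interval_ms` are hypothetical; the latter stands in for the `tick_interval` value the source reads but does not show. The `wait?` here assumes "wait while jobs are pending", because the source's `wait?` is not shown.

```ruby
# Hypothetical single-group sketch of the wait/tick loop (Ruby >= 3.2).
class MiniGroupWaiter
  def initialize(tick_interval_ms: 100)
    @interval = tick_interval_ms / 1_000.0
    @mutex = Mutex.new
    @jobs = []
    @semaphore = Queue.new
  end

  def add(job)
    @mutex.synchronize { @jobs << job }
  end

  # Called by a worker when it finishes: remove the job and tick the waiter.
  def complete(job)
    @mutex.synchronize { @jobs.delete(job) }
    @semaphore << true
  end

  # Assumed condition: keep waiting while any job is still pending.
  def wait?
    @mutex.synchronize { !@jobs.empty? }
  end

  # Blocks while jobs are pending, yielding once per re-check.
  def wait
    while wait?
      yield if block_given?
      # Wakes early on a tick, or after the interval at the latest, then re-checks.
      @semaphore.pop(timeout: @interval)
    end
  end
end

waiter = MiniGroupWaiter.new(tick_interval_ms: 50)
waiter.add(:job_a)

worker = Thread.new do
  sleep 0.02 # pretend to process the job
  waiter.complete(:job_a)
end

yields = 0
waiter.wait { yields += 1 }
worker.join
```

The timeout acts as a safety net. If a tick were ever missed, the loop would still re-check every interval instead of blocking forever.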