Class: Karafka::Pro::Processing::JobsQueue
- Inherits:
- Karafka::Processing::JobsQueue (full chain: Karafka::Pro::Processing::JobsQueue < Karafka::Processing::JobsQueue < Object)
- Includes:
- Core::Helpers::Time
- Defined in:
- lib/karafka/pro/processing/jobs_queue.rb
Overview
Enhanced processing queue that provides the ability to build complex work-distribution schedulers dedicated to particular job types.
Aside from the OSS queue capabilities, it allows for jobless locking for advanced schedulers.
Instance Attribute Summary
-
#in_processing ⇒ Object
Returns the value of attribute in_processing.
Attributes inherited from Karafka::Processing::JobsQueue
Instance Method Summary
-
#clear(group_id) ⇒ Object
Clears the processing states for a provided group.
-
#empty?(group_id) ⇒ Boolean
Tells whether a given group has nothing in processing, nothing waiting, and no active async locks.
- #initialize ⇒ Karafka::Pro::Processing::JobsQueue constructor
-
#lock(job) ⇒ Object
Method that allows us to lock the queue on a given subscription group without enqueuing a job.
-
#lock_async(group_id, lock_id, timeout: WAIT_TIMEOUT) ⇒ Object
Allows for explicit locking of the queue of a given subscription group.
-
#register(group_id) ⇒ Object
Registers semaphore and a lock hash.
-
#unlock(job) ⇒ Object
Method for unlocking the given subscription group queue space that was locked with a given job that was not added to the queue but used via `#lock`.
-
#unlock_async(group_id, lock_id) ⇒ Object
Allows for explicit unlocking of locked queue of a group.
-
#wait(group_id) ⇒ Object
Blocks when there are things in the queue in a given group and waits until all the blocking jobs from a given group are completed or any of the locks times out.
Methods inherited from Karafka::Processing::JobsQueue
#<<, #close, #complete, #pop, #statistics, #tick
Constructor Details
#initialize ⇒ Karafka::Pro::Processing::JobsQueue
# File 'lib/karafka/pro/processing/jobs_queue.rb', line 49
def initialize
  super

  @in_waiting = Hash.new { |h, k| h[k] = [] }
  @locks = Hash.new { |h, k| h[k] = {} }
  @async_locking = false

  @statistics[:waiting] = 0
end
Instance Attribute Details
#in_processing ⇒ Object
Returns the value of attribute in_processing.
# File 'lib/karafka/pro/processing/jobs_queue.rb', line 41
def in_processing
  @in_processing
end
Instance Method Details
#clear(group_id) ⇒ Object
Clears the processing states for a provided group. Useful when a recovery happens and we need to clean up state but only for a given subscription group.
# File 'lib/karafka/pro/processing/jobs_queue.rb', line 147
def clear(group_id)
  @mutex.synchronize do
    @in_processing[group_id].clear

    @statistics[:waiting] -= @in_waiting[group_id].size
    @in_waiting[group_id].clear
    @locks[group_id].clear
    @async_locking = false

    # We unlock it just in case it was blocked when clearing started
    tick(group_id)
  end
end
#empty?(group_id) ⇒ Boolean
Tells whether a given group has nothing in processing, nothing waiting, and no active async locks.
# File 'lib/karafka/pro/processing/jobs_queue.rb', line 165
def empty?(group_id)
  @mutex.synchronize do
    @in_processing[group_id].empty? &&
      @in_waiting[group_id].empty? &&
      !locked_async?(group_id)
  end
end
#lock(job) ⇒ Object
Method that allows us to lock the queue on a given subscription group without enqueuing a job. This can be used when building complex schedulers that want to postpone enqueuing until certain conditions are met.
# File 'lib/karafka/pro/processing/jobs_queue.rb', line 74
def lock(job)
  @mutex.synchronize do
    group = @in_waiting[job.group_id]

    # This should never happen. Same job should not be locked twice
    raise(Errors::JobsQueueSynchronizationError, job.group_id) if group.include?(job)

    @statistics[:waiting] += 1

    group << job
  end
end
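The jobless locking described above can be illustrated with a minimal, self-contained sketch. It does not use Karafka itself: `WaitingRegistry`, `SyncError`, `Job` and `waiting?` are hypothetical names made up for this example, and the sketch validates before decrementing the counter, which is a simplification rather than a copy of the listing.

```ruby
# Hypothetical stand-in for a "waiting" registry; this is not Karafka's class.
class WaitingRegistry
  SyncError = Class.new(StandardError)
  # Illustrative job object; only group_id matters for locking
  Job = Struct.new(:group_id, :name)

  attr_reader :statistics

  def initialize
    @mutex = Mutex.new
    # Each group lazily gets its own array of waiting jobs
    @in_waiting = Hash.new { |h, k| h[k] = [] }
    @statistics = { waiting: 0 }
  end

  # Marks the group as busy without enqueuing any work
  def lock(job)
    @mutex.synchronize do
      group = @in_waiting[job.group_id]
      raise SyncError, job.group_id if group.include?(job)

      @statistics[:waiting] += 1
      group << job
    end
  end

  # Releases a lock taken with #lock; unknown jobs are an error
  def unlock(job)
    @mutex.synchronize do
      raise SyncError, job.group_id unless @in_waiting[job.group_id].delete(job)

      @statistics[:waiting] -= 1
    end
  end

  def waiting?(group_id)
    @mutex.synchronize { !@in_waiting[group_id].empty? }
  end
end

registry = WaitingRegistry.new
job = WaitingRegistry::Job.new("group-1", "consume")
registry.lock(job)
registry.waiting?("group-1") # => true
registry.unlock(job)
registry.waiting?("group-1") # => false
```

A scheduler built on this idea could hold such a lock while it decides whether to enqueue the job, so the group does not look idle in the meantime.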
#lock_async(group_id, lock_id, timeout: WAIT_TIMEOUT) ⇒ Object
Unlike `#lock`, this method does not raise `Errors::JobsQueueSynchronizationError`, because we want to be able to prolong time-limited locks.
Allows for explicit locking of the queue of a given subscription group.
This can be used for cross-topic synchronization.
# File 'lib/karafka/pro/processing/jobs_queue.rb', line 115
def lock_async(group_id, lock_id, timeout: WAIT_TIMEOUT)
  return if @queue.closed?

  @async_locking = true

  @mutex.synchronize do
    @locks[group_id][lock_id] = monotonic_now + timeout

    # We need to tick so our new time sensitive lock can reload time constraints on sleep
    tick(group_id)
  end
end
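The reason a repeated lock is not treated as an error can be shown with a small standalone sketch: storing a deadline under the same lock id simply overwrites the previous one, which prolongs the lock. `TimedLocks`, `now_ms`, `deadline` and `locked?` are hypothetical names for this illustration, and the millisecond monotonic clock is an assumption about how such deadlines could be tracked.

```ruby
# Hypothetical registry of time-limited locks; names are illustrative only.
class TimedLocks
  def initialize
    @mutex = Mutex.new
    # group_id => { lock_id => deadline in monotonic milliseconds }
    @locks = Hash.new { |h, k| h[k] = {} }
  end

  # Monotonic clock in milliseconds, unaffected by wall-clock adjustments
  def now_ms
    Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond)
  end

  # Locking again with the same lock_id overwrites the deadline (prolongs it)
  def lock(group_id, lock_id, timeout_ms)
    @mutex.synchronize { @locks[group_id][lock_id] = now_ms + timeout_ms }
  end

  def deadline(group_id, lock_id)
    @mutex.synchronize { @locks[group_id][lock_id] }
  end

  # A group counts as locked while any of its deadlines is still in the future
  def locked?(group_id, at: now_ms)
    @mutex.synchronize { @locks[group_id].any? { |_id, deadline| deadline > at } }
  end
end

locks = TimedLocks.new
locks.lock("group-1", :sync, 1_000)
locks.lock("group-1", :sync, 5_000) # prolongs the same lock
locks.locked?("group-1")            # => true
```

Keying locks by id is what makes cross-topic synchronization practical: each party can refresh its own lock without affecting locks held by others in the same group.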
#register(group_id) ⇒ Object
Registers semaphore and a lock hash.
# File 'lib/karafka/pro/processing/jobs_queue.rb', line 62
def register(group_id)
  super
  @mutex.synchronize do
    @locks[group_id]
  end
end
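Reading `@locks[group_id]` without assigning anything may look like a no-op. A plausible explanation, based on how the constructor builds the hash, is that the read triggers the default block and stores an empty hash for the group. The snippet below demonstrates that standard Ruby behavior in isolation; the variable name and key are illustrative.

```ruby
# A hash whose default block stores a fresh inner hash on first access
locks = Hash.new { |hash, key| hash[key] = {} }

before = locks.key?("group-1") # => false, nothing registered yet
locks["group-1"]               # a plain read runs the default block
after = locks.key?("group-1")  # => true, the entry now exists

puts "before: #{before}, after: #{after}"
```

Creating the entry up front, under the mutex, means later readers find an existing key instead of mutating the hash on first access.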
#unlock(job) ⇒ Object
Method for unlocking the given subscription group queue space that was locked with a given job that was not added to the queue but used via `#lock`.
# File 'lib/karafka/pro/processing/jobs_queue.rb', line 91
def unlock(job)
  @mutex.synchronize do
    @statistics[:waiting] -= 1

    return if @in_waiting[job.group_id].delete(job)

    # This should never happen. It means there was a job being unlocked that was never
    # locked in the first place
    raise(Errors::JobsQueueSynchronizationError, job.group_id)
  end
end
#unlock_async(group_id, lock_id) ⇒ Object
Allows for explicit unlocking of a locked queue of a group.
# File 'lib/karafka/pro/processing/jobs_queue.rb', line 131
def unlock_async(group_id, lock_id)
  @mutex.synchronize do
    if @locks[group_id].delete(lock_id)
      tick(group_id)

      return
    end

    raise(Errors::JobsQueueSynchronizationError, [group_id, lock_id])
  end
end
#wait(group_id) ⇒ Object
Because the check for whether async locking is on happens during regular ticking, the first lock on a group can take up to one tick to take effect. That is expected.
This implementation takes temporary async locks into consideration. Because we use the minimum remaining lock time as the timeout, we do not have to wait a whole ticking period to unlock async locks.
Blocks when there are things in the queue in a given group and waits until all the blocking jobs from a given group are completed or any of the locks times out.
# File 'lib/karafka/pro/processing/jobs_queue.rb', line 184
def wait(group_id)
  return super unless @async_locking

  # We do not generalize this flow because this one is more expensive as it has to allocate
  # extra objects. That's why we only use it when locks are actually in use
  base_interval = tick_interval / 1_000.0

  while wait?(group_id)
    yield if block_given?

    now = monotonic_now

    wait_times = @locks[group_id].values.map! do |lock_time|
      # Convert ms to seconds, seconds are required by Ruby queue engine
      (lock_time - now) / 1_000
    end

    wait_times.delete_if(&:negative?)
    wait_times << base_interval

    @semaphores.fetch(group_id).pop(timeout: wait_times.min)
  end
end
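The timeout selection in the loop above can be isolated into a small pure function for illustration. `next_wait_seconds` and its parameters are hypothetical names, not part of Karafka; the sketch assumes lock deadlines and the current time are expressed in milliseconds and that the result is needed in seconds.

```ruby
# Hypothetical helper: picks how long (in seconds) to sleep before re-checking.
#   deadlines_ms     - lock expiry times in milliseconds
#   now_ms           - current time in milliseconds (same clock as deadlines)
#   tick_interval_ms - regular ticking period in milliseconds
def next_wait_seconds(deadlines_ms, now_ms, tick_interval_ms)
  base_interval = tick_interval_ms / 1_000.0

  # Remaining time of each lock, converted from milliseconds to seconds
  wait_times = deadlines_ms.map { |deadline| (deadline - now_ms) / 1_000.0 }

  # Locks that already expired must not produce a negative timeout
  wait_times.reject!(&:negative?)

  # Never sleep longer than one regular tick
  wait_times << base_interval

  wait_times.min
end

next_wait_seconds([10_500, 12_000], 10_000, 5_000) # => 0.5
next_wait_seconds([9_000], 10_000, 5_000)          # => 5.0
```

The first call wakes up after half a second because the nearest lock expires before the next tick; the second falls back to the tick interval because its only lock has already expired. In Ruby 3.2 and later, `Thread::Queue#pop` accepts a `timeout:` keyword in seconds, which is why a conversion from milliseconds is needed.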