Class: Karafka::Pro::Processing::JobsQueue

Inherits:
Karafka::Processing::JobsQueue
Includes:
Core::Helpers::Time
Defined in:
lib/karafka/pro/processing/jobs_queue.rb

Overview

Enhanced processing queue that provides the ability to build complex work-distribution schedulers dedicated to particular job types.

Aside from the OSS queue capabilities, it allows for jobless locking, which advanced schedulers can use.

Instance Attribute Summary

Attributes inherited from Karafka::Processing::JobsQueue

#pool

Instance Method Summary

Methods inherited from Karafka::Processing::JobsQueue

#<<, #close, #complete, #pop, #statistics, #tick

Constructor Details

#initialize ⇒ Karafka::Pro::Processing::JobsQueue



# File 'lib/karafka/pro/processing/jobs_queue.rb', line 49

def initialize
  super

  @in_waiting = Hash.new { |h, k| h[k] = [] }
  @locks = Hash.new { |h, k| h[k] = {} }
  @async_locking = false

  @statistics[:waiting] = 0
end
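The constructor builds its per-group state with the block form of `Hash.new`, so the first access to an unknown group id creates and stores an empty container for that group. A minimal standalone illustration of that Ruby pattern (the group ids are made up, this is not Karafka state), contrasted with the default-object form, which would share one array across all groups:

```ruby
# Block form: each missing key gets its own fresh array, stored on first access.
in_waiting = Hash.new { |h, k| h[k] = [] }
# Same idea for per-group lock maps (lock_id => expiry time).
locks = Hash.new { |h, k| h[k] = {} }

# Illustrative group ids only
in_waiting['group-a'] << :job1
locks['group-b'] # merely reading the key registers an empty hash for it

# Contrast: the default-object form shares ONE array and never stores the key,
# so per-group state would leak between groups.
shared = Hash.new([])
shared['x'] << :oops
```

This is also why `#register` only needs to read `@locks[group_id]` to create the group's lock hash.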

Instance Attribute Details

#in_processing ⇒ Object

Returns the value of attribute in_processing.



# File 'lib/karafka/pro/processing/jobs_queue.rb', line 41

def in_processing
  @in_processing
end

Instance Method Details

#clear(group_id) ⇒ Object

Clears the processing state for the given group. Useful when recovery happens and we need to clean up state, but only for a given subscription group.

Parameters:

  • group_id (String)


# File 'lib/karafka/pro/processing/jobs_queue.rb', line 147

def clear(group_id)
  @mutex.synchronize do
    @in_processing[group_id].clear

    @statistics[:waiting] -= @in_waiting[group_id].size
    @in_waiting[group_id].clear
    @locks[group_id].clear
    @async_locking = false

    # We unlock it just in case it was blocked when clearing started
    tick(group_id)
  end
end
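`@statistics[:waiting]` is one counter shared by all groups, while `@in_waiting` is kept per group, which is why `#clear` subtracts the cleared group's list size before emptying it. A simplified sketch of just that bookkeeping, assuming plain symbols as stand-in jobs and leaving out the mutex, async locks and ticking:

```ruby
# Simplified stand-ins for the queue's internal state (no mutex, no real jobs).
statistics = { waiting: 0 }
in_waiting = Hash.new { |h, k| h[k] = [] }

# Jobless locks in two different groups bump the shared counter.
%i[a1 a2].each do |job|
  in_waiting['group-a'] << job
  statistics[:waiting] += 1
end
in_waiting['group-b'] << :b1
statistics[:waiting] += 1

clear_group = lambda do |group_id|
  # Adjust the global counter first, while the group's size is still known...
  statistics[:waiting] -= in_waiting[group_id].size
  # ...then drop only that group's entries.
  in_waiting[group_id].clear
end

clear_group.call('group-a')
```

Clearing `group-a` leaves the counter reflecting only the job still waiting in `group-b`.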

#empty?(group_id) ⇒ Boolean

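Checks whether the given group has no jobs in processing or waiting for processing and is not async-locked.

Parameters:

  • group_id (String)

Returns:

  • (Boolean)

    true if nothing from the given group is in processing (or waiting for processing) and the group is not async-locked; false otherwise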



# File 'lib/karafka/pro/processing/jobs_queue.rb', line 165

def empty?(group_id)
  @mutex.synchronize do
    @in_processing[group_id].empty? &&
      @in_waiting[group_id].empty? &&
      !locked_async?(group_id)
  end
end

#lock(job) ⇒ Object

Locks the queue of a given subscription group without enqueuing a job. This can be used when building complex schedulers that want to postpone enqueuing until certain conditions are met.

Parameters:

  • job (Jobs::Base)

    job used for locking



# File 'lib/karafka/pro/processing/jobs_queue.rb', line 74

def lock(job)
  @mutex.synchronize do
    group = @in_waiting[job.group_id]

    # This should never happen. Same job should not be locked twice
    raise(Errors::JobsQueueSynchronizationError, job.group_id) if group.include?(job)

    @statistics[:waiting] += 1

    group << job
  end
end
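To make jobless locking concrete, here is a minimal self-contained sketch. `Job`, `SyncError` and `MiniLockQueue` are hypothetical stand-ins, not Karafka classes; the sketch only mirrors the duplicate check and the waiting counter of `#lock`, plus a hypothetical `locked?` helper showing that a group counts as locked while any jobless lock is held for it.

```ruby
# Hypothetical stand-in: a job only needs a group_id for this sketch.
Job = Struct.new(:id, :group_id)

# Hypothetical error playing the role of Errors::JobsQueueSynchronizationError.
class SyncError < StandardError; end

# Sketch of jobless locking: the job is recorded as "waiting" for its group
# but is never pushed onto a work queue.
class MiniLockQueue
  attr_reader :waiting

  def initialize
    @mutex = Mutex.new
    @in_waiting = Hash.new { |h, k| h[k] = [] }
    @waiting = 0
  end

  def lock(job)
    @mutex.synchronize do
      group = @in_waiting[job.group_id]
      # Locking the same job twice indicates a scheduler bug
      raise(SyncError, job.group_id) if group.include?(job)

      @waiting += 1
      group << job
    end
  end

  # Hypothetical helper: true while at least one jobless lock is held for the group
  def locked?(group_id)
    @mutex.synchronize { !@in_waiting[group_id].empty? }
  end
end

queue = MiniLockQueue.new
job = Job.new(1, 'group-a')
queue.lock(job)

begin
  queue.lock(job)
rescue SyncError => e
  puts "duplicate lock rejected for #{e.message}"
end
```

The duplicate check runs before the counter is incremented, so a rejected lock leaves `waiting` unchanged. The sketch's `Struct` compares by value, so two distinct jobs with identical fields would also count as duplicates here.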

#lock_async(group_id, lock_id, timeout: WAIT_TIMEOUT) ⇒ Object

Note:

We do not raise `Errors::JobsQueueSynchronizationError` here (unlike `#lock`) because we want to be able to prolong time-limited locks.

Allows for explicit locking of the queue of a given subscription group.

This can be used for cross-topic synchronization.

Parameters:

  • group_id (String)

    id of the group we want to lock

  • lock_id (Object)

    unique id we want to use to identify our lock

  • timeout (Integer) (defaults to: WAIT_TIMEOUT)

    number of milliseconds for which this lock should be valid. Useful for auto-expiring locks that delay further processing without explicitly pausing the consumer



# File 'lib/karafka/pro/processing/jobs_queue.rb', line 115

def lock_async(group_id, lock_id, timeout: WAIT_TIMEOUT)
  return if @queue.closed?

  @async_locking = true

  @mutex.synchronize do
    @locks[group_id][lock_id] = monotonic_now + timeout

    # We need to tick so our new time sensitive lock can reload time constraints on sleep
    tick(group_id)
  end
end
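Async locks are plain deadlines: `lock_async` stores `monotonic_now + timeout` under the lock id, so locking again with the same id replaces the deadline, which is how a time-limited lock gets prolonged instead of raising. The sketch below assumes that `monotonic_now` returns milliseconds from a monotonic clock (consistent with `timeout` being given in ms); `monotonic_now_ms` and the `lock_async` helper are hypothetical stand-ins, not Karafka's implementation.

```ruby
# Assumption: monotonic time in milliseconds, matching the ms-based timeout.
def monotonic_now_ms
  Process.clock_gettime(Process::CLOCK_MONOTONIC) * 1_000
end

# group_id => { lock_id => deadline in monotonic ms }
locks = Hash.new { |h, k| h[k] = {} }

# Hypothetical helper mirroring lock_async: store an expiry deadline per lock_id.
def lock_async(locks, group_id, lock_id, timeout:)
  locks[group_id][lock_id] = monotonic_now_ms + timeout
end

lock_async(locks, 'group-a', :throttle, timeout: 100)
first_deadline = locks['group-a'][:throttle]

# Same lock_id again: the deadline is overwritten, i.e. the lock is prolonged.
lock_async(locks, 'group-a', :throttle, timeout: 5_000)
second_deadline = locks['group-a'][:throttle]

# A lock counts as active while its deadline lies in the future.
active = locks['group-a'].count { |_id, deadline| deadline > monotonic_now_ms }
```

A monotonic clock is used rather than wall-clock time so that system clock adjustments cannot shorten or extend a lock.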

#register(group_id) ⇒ Object

Registers a semaphore and a lock hash for the given group.

Parameters:

  • group_id (String)


# File 'lib/karafka/pro/processing/jobs_queue.rb', line 62

def register(group_id)
  super
  @mutex.synchronize do
    @locks[group_id]
  end
end

#unlock(job) ⇒ Object

Unlocks the queue space of a given subscription group that was locked via `#lock` with a job that was not added to the queue.

Parameters:

  • job (Jobs::Base)

    job that locked the queue



# File 'lib/karafka/pro/processing/jobs_queue.rb', line 91

def unlock(job)
  @mutex.synchronize do
    @statistics[:waiting] -= 1

    return if @in_waiting[job.group_id].delete(job)

    # This should never happen. It means there was a job being unlocked that was never
    # locked in the first place
    raise(Errors::JobsQueueSynchronizationError, job.group_id)
  end
end
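`#unlock` detects an unmatched unlock through the return value of `Array#delete`, which returns the removed element when it was present and `nil` otherwise. A short standalone illustration, with plain symbols standing in for jobs:

```ruby
waiting_jobs = %i[job1 job2]

# Present: delete removes it and returns the element (truthy), so unlock returns early.
removed = waiting_jobs.delete(:job1)

# Absent: delete returns nil, which is the "never locked" case that makes
# #unlock raise a synchronization error.
missing = waiting_jobs.delete(:job3)
```

`Array#delete` removes every matching entry, but because `#lock` refuses duplicates, at most one entry per job can exist, so exactly one job is released.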

#unlock_async(group_id, lock_id) ⇒ Object

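Allows for explicit unlocking of a group's queue that was locked with `#lock_async`. Raises `Errors::JobsQueueSynchronizationError` if no lock with the given `lock_id` exists for the group.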

Parameters:

  • group_id (String)

    id of the group we want to unlock

  • lock_id (Object)

    unique id of the lock to release, as passed to `#lock_async`



# File 'lib/karafka/pro/processing/jobs_queue.rb', line 131

def unlock_async(group_id, lock_id)
  @mutex.synchronize do
    if @locks[group_id].delete(lock_id)
      tick(group_id)

      return
    end

    raise(Errors::JobsQueueSynchronizationError, [group_id, lock_id])
  end
end

#wait(group_id) ⇒ Object

Note:

Because the check for async locking happens on regular ticks, the first lock on a group can take up to one tick to take effect. This is expected.

Note:

This implementation accounts for temporary async locks. Because the shortest remaining lock time is used as the timeout, we do not have to wait a whole tick period for an expiring async lock to be released.

Blocks when there are things in the queue in a given group and waits until all the blocking jobs from a given group are completed or any of the locks times out.

Parameters:

  • group_id (String)

    id of the group whose jobs we are interested in

See Also:

  • Karafka::Processing::JobsQueue


# File 'lib/karafka/pro/processing/jobs_queue.rb', line 184

def wait(group_id)
  return super unless @async_locking

  # We do not generalize this flow because this one is more expensive as it has to allocate
  # extra objects. That's why we only use it when locks are actually in use
  base_interval = tick_interval / 1_000.0

  while wait?(group_id)
    yield if block_given?

    now = monotonic_now

    wait_times = @locks[group_id].values.map! do |lock_time|
      # Convert ms to seconds, seconds are required by Ruby queue engine
      (lock_time - now) / 1_000
    end

    wait_times.delete_if(&:negative?)
    wait_times << base_interval

    @semaphores.fetch(group_id).pop(timeout: wait_times.min)
  end
end
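The timeout selection in `#wait` can be isolated as a pure function: convert each lock's remaining time to seconds, drop locks that have already expired, add the regular tick interval as a fallback, and wait for the smallest value. This is a sketch under assumptions: `next_wait_timeout` is a hypothetical name, all inputs are in milliseconds, and it divides by `1_000.0` so integer inputs still yield fractional seconds.

```ruby
# Pure-function sketch of how #wait picks its timeout (no real queue involved).
# lock_deadlines_ms: expiry times (monotonic ms) of the group's async locks
# now_ms:            current monotonic time in ms
# tick_interval_ms:  the queue's regular tick interval in ms
def next_wait_timeout(lock_deadlines_ms, now_ms, tick_interval_ms)
  wait_times = lock_deadlines_ms.map do |deadline|
    # Remaining lock time, converted from ms to seconds
    (deadline - now_ms) / 1_000.0
  end

  # Already-expired locks must not produce a negative timeout
  wait_times.delete_if(&:negative?)
  # The regular tick interval is always a candidate, so the list is never empty
  wait_times << tick_interval_ms / 1_000.0

  wait_times.min
end

now = 10_000.0
# One lock expires in 250 ms, one has already expired, tick interval of 5 s
timeout = next_wait_timeout([now + 250, now - 50], now, 5_000)
```

With a lock 250 ms from expiring and a 5 s tick, the waiter wakes after 0.25 s rather than a full tick; with no active locks it falls back to the tick interval.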