Class: Karafka::Processing::JobsQueue

Inherits:
Object
Defined in:
lib/karafka/processing/jobs_queue.rb

Overview

Note:

This job queue also keeps track of and understands the number of busy workers. This is because we use a single workers pool that can have granular scheduling.

This is the key work component for Karafka jobs distribution. It provides an API for running jobs in parallel while operating within more than one subscription group.

We need to take into consideration the fact that more than one subscription group can operate on this queue; that is why internally we keep track of processing per group.

We work with the assumption that partition data is evenly distributed.
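To illustrate the per-group bookkeeping described above, here is a minimal, self-contained sketch in plain Ruby. It is not Karafka's actual classes: `Job`, `enqueue` and the group ids are illustration-only names. One shared queue feeds all workers, while jobs are additionally tracked per subscription group.

```ruby
# Hypothetical, simplified sketch of per-group job tracking.
Job = Struct.new(:group_id, :payload)

queue = Queue.new
in_processing = Hash.new { |h, k| h[k] = [] }

enqueue = lambda do |job|
  # Track the job under its subscription group, then push it to the shared queue
  in_processing[job.group_id] << job
  queue << job
end

enqueue.call(Job.new("group_a", 1))
enqueue.call(Job.new("group_b", 2))
enqueue.call(Job.new("group_a", 3))

in_processing["group_a"].size # => 2
queue.size                    # => 3
```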

Direct Known Subclasses

Karafka::Pro::Processing::JobsQueue

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize ⇒ Karafka::Processing::JobsQueue



# File 'lib/karafka/processing/jobs_queue.rb', line 25

def initialize
  @queue = Queue.new
  # Those queues will act as semaphores internally. Since we need an indicator for waiting
  # we could use Thread.pass but this is expensive. Instead we can just lock until any
  # of the workers finishes their work and we can re-check. This means that in the worst
  # scenario, we will context switch 10 times per poll instead of getting this thread
  # scheduled by Ruby hundreds of thousands of times per group.
  # We cannot use a single semaphore as it could potentially block in listeners that should
  # process with their data and also could unlock when a given group needs to remain locked
  @semaphores = {}
  @in_processing = Hash.new { |h, k| h[k] = [] }
  @statistics = { busy: 0, enqueued: 0 }

  @mutex = Mutex.new
end

Instance Attribute Details

#pool=(value) ⇒ Object (writeonly)

Set via writer because of a circular dependency: the queue must exist before the pool (workers need the queue at construction), but the queue needs the pool for concurrency.



# File 'lib/karafka/processing/jobs_queue.rb', line 22

def pool=(value)
  @pool = value
end

Instance Method Details

#<<(job) ⇒ Object

Adds the job to the internal main queue, scheduling it for execution in a worker and marks this job as in processing pipeline.

Parameters:

  • job (Jobs::Base)


# File 'lib/karafka/processing/jobs_queue.rb', line 61

def <<(job)
  return if @queue.closed?

  # nil is used by WorkersPool to signal a worker to exit during downscaling.
  # Passed straight through to the raw queue, bypassing statistics and tracking.
  unless job
    @queue << job
    return
  end

  @mutex.synchronize do
    group = @in_processing[job.group_id]

    raise(Errors::JobsQueueSynchronizationError, job.group_id) if group.include?(job)

    group << job

    # Assume that moving to queue means being picked up immediately not to create stats
    # race conditions because of pop overhead. If there are workers available, we assume
    # work is going to be handled as we never reject enqueued jobs
    if @statistics[:busy] < @pool.size
      @statistics[:busy] += 1
    else
      # If system is fully loaded, it means this job is indeed enqueued
      @statistics[:enqueued] += 1
    end

    @queue << job
  end
end
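The busy/enqueued split from the code above can be illustrated in isolation. A minimal sketch, assuming a pool of two workers (illustrative values only, not Karafka's API):

```ruby
pool_size = 2
stats = { busy: 0, enqueued: 0 }

add_job = lambda do
  if stats[:busy] < pool_size
    # A free worker is assumed to pick the job up immediately
    stats[:busy] += 1
  else
    # Pool fully loaded, so the job really waits in the queue
    stats[:enqueued] += 1
  end
end

3.times { add_job.call }
stats # => { busy: 2, enqueued: 1 }
```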

#clear(group_id) ⇒ Object

Clears the processing states for a provided group. Useful when a recovery happens and we need to clean up state but only for a given subscription group.

Parameters:

  • group_id (String)


# File 'lib/karafka/processing/jobs_queue.rb', line 131

def clear(group_id)
  @mutex.synchronize do
    @in_processing[group_id].clear
    # We unlock it just in case it was blocked when clearing started
    tick(group_id)
  end
end

#close ⇒ Object

Stops the whole processing queue.



# File 'lib/karafka/processing/jobs_queue.rb', line 140

def close
  @mutex.synchronize do
    return if @queue.closed?

    @queue.close
    @semaphores.each_value(&:close)
  end
end

#complete(job) ⇒ Object

Marks a given job from a given group as completed. When there are no more jobs from a given group to be executed, we won’t wait.

Parameters:

  • job (Jobs::Base)


# File 'lib/karafka/processing/jobs_queue.rb', line 111

def complete(job)
  @mutex.synchronize do
    # We finish one job and if there is another, we pick it up
    if @statistics[:enqueued].positive?
      @statistics[:enqueued] -= 1
    # If no more enqueued jobs, we will be just less busy
    else
      @statistics[:busy] -= 1
    end

    @in_processing[job.group_id].delete(job)

    tick(job.group_id)
  end
end
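The counters are decremented symmetrically on completion: a queued job is assumed to take over the freed worker slot, so `enqueued` drains before `busy`. A standalone sketch of that accounting (illustrative starting values, not Karafka's API):

```ruby
stats = { busy: 2, enqueued: 1 }

complete_job = lambda do
  if stats[:enqueued].positive?
    # An enqueued job immediately takes over the freed worker slot
    stats[:enqueued] -= 1
  else
    stats[:busy] -= 1
  end
end

complete_job.call # queued job takes the slot: { busy: 2, enqueued: 0 }
complete_job.call # nothing waiting anymore:   { busy: 1, enqueued: 0 }
```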

#empty?(group_id) ⇒ Boolean

Tells us if we have anything in processing (or for processing) from a given group.

Parameters:

  • group_id (String)

Returns:

  • (Boolean)

    tells us if we have anything in the processing (or for processing) from a given group



# File 'lib/karafka/processing/jobs_queue.rb', line 153

def empty?(group_id)
  @mutex.synchronize do
    @in_processing[group_id].empty?
  end
end

#in_processing ⇒ Hash{String => Array<Jobs::Base>}

Returns a snapshot of all jobs currently in processing per group. Useful for diagnostics during forceful shutdown to understand what is blocking.

Returns:

  • (Hash{String => Array<Jobs::Base>})

    hash mapping group ids to arrays of jobs



# File 'lib/karafka/processing/jobs_queue.rb', line 195

def in_processing
  @mutex.synchronize do
    @in_processing.transform_values(&:dup).freeze
  end
end

#pop ⇒ Jobs::Base?

Note:

This command is blocking and will wait until any job is available on the main queue

Waits for a job from the main queue and returns it once available, or returns nil if the queue has been stopped and there won't be anything more to process ever.

Returns:

  • (Jobs::Base, nil)

    waits for a job from the main queue and returns it once available or returns nil if the queue has been stopped and there won’t be anything more to process ever.



# File 'lib/karafka/processing/jobs_queue.rb', line 96

def pop
  @queue.pop
end
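Since `#pop` delegates to Ruby's built-in `Queue#pop`, a worker loop can rely on the standard closed-queue semantics: after `#close`, remaining items are still drained and then `nil` is returned, which terminates the loop. A plain-Ruby sketch (symbols stand in for real job objects):

```ruby
queue = Queue.new
queue << :job_a
queue << :job_b
queue.close

processed = []
# `pop` keeps returning the drained items and then nil once the closed
# queue is empty, so the loop exits cleanly.
while (job = queue.pop)
  processed << job
end

processed # => [:job_a, :job_b]
```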

#register(group_id) ⇒ Object

Registers a given subscription group id in the queue. It is needed so we do not dynamically create semaphores, hence avoiding potential race conditions.

Parameters:

  • group_id (String)


# File 'lib/karafka/processing/jobs_queue.rb', line 45

def register(group_id)
  @mutex.synchronize do
    # Ruby prior to 3.2 did not have a queue with a timeout on `#pop`; for those
    # versions we use our custom queue wrapper.
    #
    # Initializing this semaphore under the mutex ensures it is never auto-created.
    # Since we always schedule a job before waiting using semaphores, there won't be any
    # concurrency problems.
    @semaphores[group_id] = Queue.new
  end
end

#statistics ⇒ Hash

  • `busy` - number of jobs that are currently being processed (active work)

  • `enqueued` - number of jobs in the queue that are waiting to be picked up by a worker

Returns:

  • (Hash)

    hash with basic usage statistics of this queue.



# File 'lib/karafka/processing/jobs_queue.rb', line 184

def statistics
  # Ensures there are no race conditions when returning this data
  @mutex.synchronize do
    @statistics.dup.freeze
  end
end
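Because the hash is duplicated and frozen under the mutex, callers receive an immutable snapshot that later counter updates cannot affect. A plain-Ruby illustration of the same pattern (illustrative values):

```ruby
stats = { busy: 1, enqueued: 0 }

# Same pattern as in #statistics: duplicate and freeze while holding the lock
snapshot = stats.dup.freeze

stats[:busy] += 1

snapshot[:busy] # => 1 (snapshot unaffected by the later update)
stats[:busy]    # => 2
```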

#tick(group_id) ⇒ Object

Note:

This does not release the wait lock. It just causes a re-check of the lock conditions.

Causes the wait lock to re-check the lock conditions and potentially unlock.

Parameters:

  • group_id (String)

    id of the group we want to unlock for one tick



# File 'lib/karafka/processing/jobs_queue.rb', line 103

def tick(group_id)
  @semaphores.fetch(group_id) << true
end

#wait(group_id) {|block| ... } ⇒ Object

Note:

This method is blocking.

Blocks when there are things in the queue in a given group and waits until all the blocking jobs from a given group are completed.

Parameters:

  • group_id (String)

    id of the group in which jobs we’re interested.

Yield Parameters:

  • block (Block)

    we want to run before each pop (in case of Ruby pre 3.2) or before each pop and on every tick interval. This allows us to run extra code that needs to be executed even when we are waiting on the work to be finished.



# File 'lib/karafka/processing/jobs_queue.rb', line 168

def wait(group_id)
  interval_in_seconds = tick_interval / 1_000.0

  # Go doing other things while we cannot process and wait for anyone to finish their work
  # and re-check the wait status
  while wait?(group_id)
    yield if block_given?

    @semaphores.fetch(group_id).pop(timeout: interval_in_seconds)
  end
end
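The tick/wait pair can be sketched with plain Ruby threads. A completing thread clears the group's work and pushes to the semaphore queue (a tick), which wakes the waiter so it can re-check its condition; the `timeout:` form of `Queue#pop` requires Ruby 3.2+. All names here are illustrative, not Karafka's API:

```ruby
in_processing = { "group_a" => [:job] }
semaphore = Queue.new

completer = Thread.new do
  sleep 0.05
  in_processing["group_a"].clear # the job finished
  semaphore << true              # tick: wake the waiter for a re-check
end

# wait: block until the group has nothing left in processing. The pop either
# returns a tick or times out (nil), and the condition is re-checked either way.
until in_processing["group_a"].empty?
  semaphore.pop(timeout: 0.1)
end
completer.join
```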