Class: Karafka::Pro::Processing::Schedulers::Base

Inherits:
Object
Defined in:
lib/karafka/pro/processing/schedulers/base.rb

Overview

Note:

All the `on_` methods can be redefined with non-thread-safe versions without locks if needed. However, when doing so, ensure that your scheduler is stateless.

Base for all the Pro custom schedulers.

It wraps the Scheduler API with a mutex to ensure that, while scheduling is in progress, we do not start scheduling other work that could affect decision making across multiple subscription groups running in separate threads.
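
To illustrate the locking described above, here is a minimal, self-contained sketch. `SketchBase` is a hypothetical stand-in that copies only the constructor and `#on_schedule_consumption` from this page (it is not the Karafka class), its `#schedule_consumption` body is made up for the demonstration, and a plain `Array` stands in for the jobs queue. Because each batch is pushed while the mutex is held, batches coming from concurrent threads should not interleave.

```ruby
# Hypothetical stand-in mirroring the methods listed on this page.
# It does not require Karafka and is not the real Base class.
class SketchBase
  def initialize(queue)
    @queue = queue
    @mutex = Mutex.new
  end

  # Serializes scheduling across threads
  def on_schedule_consumption(jobs_array)
    @mutex.synchronize { schedule_consumption(jobs_array) }
  end

  # Illustrative body: pushes the jobs in the order received
  def schedule_consumption(jobs_array)
    jobs_array.each do |job|
      @queue << job
      # Give other threads a chance to run; they stay blocked on the mutex
      Thread.pass
    end
  end
end

queue = []
scheduler = SketchBase.new(queue)

# Four threads play the role of subscription groups, one batch each
threads = 4.times.map do |group|
  Thread.new do
    batch = Array.new(5) { |i| "group#{group}-job#{i}" }
    scheduler.on_schedule_consumption(batch)
  end
end
threads.each(&:join)

# Every consecutive slice of 5 entries should belong to a single group
contiguous = queue.each_slice(5).all? do |slice|
  slice.map { |job| job.split("-").first }.uniq.size == 1
end
```

The order in which the groups acquire the mutex is not deterministic, so only the contiguity of each batch is checked, not the order of the batches.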

Direct Known Subclasses

Default

Constructor Details

#initialize(queue) ⇒ Base

Returns a new instance of Base.

Parameters:

  • queue

    Queue to which the scheduled jobs are pushed


# File 'lib/karafka/pro/processing/schedulers/base.rb', line 46

def initialize(queue)
  @queue = queue
  @mutex = Mutex.new
end

Instance Method Details

#clear(_group_id) ⇒ Object

By default schedulers are stateless, so nothing to clear.

Parameters:

  • _group_id (String)

    Subscription group id



# File 'lib/karafka/pro/processing/schedulers/base.rb', line 151

def clear(_group_id)
  nil
end
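
The default is a no-op because the base class keeps no per-group data. A scheduler that does keep state would need to override `#clear`. The sketch below shows one way this could look. Everything in it is hypothetical: `SketchBase` only mimics the methods documented on this page, `CountedJob` is a made-up struct, and the assumption that a job exposes a `group_id` is for illustration only.

```ruby
# Hypothetical stand-ins; these are not the Karafka classes.
CountedJob = Struct.new(:group_id, :name)

class SketchBase
  def initialize(queue)
    @queue = queue
    @mutex = Mutex.new
  end

  def on_clear(group_id)
    @mutex.synchronize { clear(group_id) }
  end

  def on_schedule_consumption(jobs_array)
    @mutex.synchronize { schedule_consumption(jobs_array) }
  end

  # Stateless default: nothing to clear
  def clear(_group_id)
    nil
  end
end

# Keeps a per-group counter, so it has state that must be dropped on clear
class CountingScheduler < SketchBase
  attr_reader :scheduled

  def initialize(queue)
    super
    @scheduled = Hash.new(0)
  end

  def schedule_consumption(jobs_array)
    jobs_array.each do |job|
      @scheduled[job.group_id] += 1
      @queue << job
    end
  end

  # Forget everything tracked for the given subscription group
  def clear(group_id)
    @scheduled.delete(group_id)
  end
end

scheduler = CountingScheduler.new([])
scheduler.on_schedule_consumption(
  [CountedJob.new("sg1", "a"), CountedJob.new("sg1", "b"), CountedJob.new("sg2", "c")]
)
scheduler.on_clear("sg1")
remaining = scheduler.scheduled
```

After `on_clear("sg1")`, only the counter for `"sg2"` remains. Since this scheduler is stateful, it keeps the mutex-wrapped `on_` methods rather than replacing them.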

#manage ⇒ Object

Should manage scheduling when job states change.

By default it does nothing, because the default schedulers are stateless.



# File 'lib/karafka/pro/processing/schedulers/base.rb', line 137

def manage
  nil
end
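
As a rough illustration of what a stateful scheduler might do in `#manage`, the sketch below buffers incoming jobs and releases a few of them on every manage call. `SketchBase`, `DripScheduler`, and the `per_tick` option are hypothetical names introduced for this example. They mimic the methods on this page and are not part of Karafka.

```ruby
# Hypothetical stand-in for the base class documented here.
class SketchBase
  def initialize(queue)
    @queue = queue
    @mutex = Mutex.new
  end

  def on_manage
    @mutex.synchronize { manage }
  end

  def on_schedule_consumption(jobs_array)
    @mutex.synchronize { schedule_consumption(jobs_array) }
  end

  # Stateless default: nothing to manage
  def manage
    nil
  end
end

# Buffers jobs and moves at most `per_tick` of them to the queue per manage call
class DripScheduler < SketchBase
  def initialize(queue, per_tick: 2)
    super(queue)
    @per_tick = per_tick
    @pending = []
  end

  # Buffer instead of enqueuing right away
  def schedule_consumption(jobs_array)
    @pending.concat(jobs_array)
  end

  # Release up to @per_tick buffered jobs, oldest first
  def manage
    @pending.shift(@per_tick).each { |job| @queue << job }
  end
end

queue = []
scheduler = DripScheduler.new(queue)
scheduler.on_schedule_consumption(%w[a b c])

scheduler.on_manage
after_first_tick = queue.dup

scheduler.on_manage
after_second_tick = queue.dup
```

Because `@pending` is shared state, a scheduler like this relies on the mutex in `#on_manage` and `#on_schedule_consumption` and would also need a matching `#clear`.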

#on_clear(group_id) ⇒ Object

Runs clearing under a mutex

Parameters:

  • group_id (String)

    Subscription group id



# File 'lib/karafka/pro/processing/schedulers/base.rb', line 144

def on_clear(group_id)
  @mutex.synchronize { clear(group_id) }
end

#on_manage ⇒ Object

Runs the manage tick under a mutex



# File 'lib/karafka/pro/processing/schedulers/base.rb', line 130

def on_manage
  @mutex.synchronize { manage }
end

#on_schedule_consumption(jobs_array) ⇒ Object

Runs the consumption jobs scheduling flow under a mutex

Parameters:

  • jobs_array (Array<Karafka::Processing::ConsumerGroups::Jobs::Consume, Karafka::Pro::Processing::ConsumerGroups::Jobs::ConsumeNonBlocking>)

    Jobs for scheduling



# File 'lib/karafka/pro/processing/schedulers/base.rb', line 65

def on_schedule_consumption(jobs_array)
  @mutex.synchronize do
    schedule_consumption(jobs_array)
  end
end
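
The class-level note says the `on_` methods may be redefined without locks as long as the scheduler is stateless. A minimal sketch of that idea follows. `SketchBase` and `LockFreeFifoScheduler` are hypothetical stand-ins, and a `Thread::Queue` from the Ruby standard library is used as the queue on the assumption that the target queue is itself safe for concurrent pushes.

```ruby
# Hypothetical stand-in for the base class documented here.
class SketchBase
  def initialize(queue)
    @queue = queue
    @mutex = Mutex.new
  end

  def on_schedule_consumption(jobs_array)
    @mutex.synchronize { schedule_consumption(jobs_array) }
  end
end

# Stateless: it touches nothing but the queue, which is thread-safe here
class LockFreeFifoScheduler < SketchBase
  # Redefined without the mutex
  def on_schedule_consumption(jobs_array)
    schedule_consumption(jobs_array)
  end

  def schedule_consumption(jobs_array)
    jobs_array.each { |job| @queue << job }
  end
end

queue = Thread::Queue.new
scheduler = LockFreeFifoScheduler.new(queue)

threads = 3.times.map do |i|
  Thread.new { scheduler.on_schedule_consumption(["t#{i}-a", "t#{i}-b"]) }
end
threads.each(&:join)

total = queue.size
```

All six jobs arrive, but batches from different threads may interleave. That trade-off is only acceptable when the scheduling decision does not depend on what other subscription groups are doing.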

#on_schedule_idle(jobs_array) ⇒ Object

Runs the idle jobs scheduling flow under a mutex

Parameters:

  • jobs_array (Array)

    Idle jobs for scheduling


# File 'lib/karafka/pro/processing/schedulers/base.rb', line 105

def on_schedule_idle(jobs_array)
  @mutex.synchronize do
    schedule_idle(jobs_array)
  end
end

#on_schedule_periodic(jobs_array) ⇒ Object

Runs the periodic jobs scheduling flow under a mutex

Parameters:

  • jobs_array (Array<Karafka::Pro::Processing::ConsumerGroups::Jobs::Periodic, Karafka::Pro::Processing::ConsumerGroups::Jobs::PeriodicNonBlocking>)

    Jobs for scheduling



# File 'lib/karafka/pro/processing/schedulers/base.rb', line 117

def on_schedule_periodic(jobs_array)
  @mutex.synchronize do
    schedule_periodic(jobs_array)
  end
end

#on_schedule_revocation(jobs_array) ⇒ Object

Runs the revocation jobs scheduling flow under a mutex

Parameters:

  • jobs_array (Array<Karafka::Processing::ConsumerGroups::Jobs::Revoked, Karafka::Pro::Processing::ConsumerGroups::Jobs::RevokedNonBlocking>)

    Jobs for scheduling



# File 'lib/karafka/pro/processing/schedulers/base.rb', line 87

def on_schedule_revocation(jobs_array)
  @mutex.synchronize do
    schedule_revocation(jobs_array)
  end
end

#on_schedule_shutdown(jobs_array) ⇒ Object

Runs the shutdown jobs scheduling flow under a mutex

Parameters:

  • jobs_array (Array)

    Shutdown jobs for scheduling


# File 'lib/karafka/pro/processing/schedulers/base.rb', line 96

def on_schedule_shutdown(jobs_array)
  @mutex.synchronize do
    schedule_shutdown(jobs_array)
  end
end

#schedule_consumption(_jobs_array) ⇒ Object

Should schedule the consumption jobs

Parameters:

  • _jobs_array (Array<Karafka::Processing::ConsumerGroups::Jobs::Consume, Karafka::Pro::Processing::ConsumerGroups::Jobs::ConsumeNonBlocking>)

    Jobs for scheduling

Raises:

  • (NotImplementedError)


# File 'lib/karafka/pro/processing/schedulers/base.rb', line 77

def schedule_consumption(_jobs_array)
  raise NotImplementedError, "Implement in a subclass"
end
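
Since the base implementation only raises, a concrete scheduler has to provide its own `#schedule_consumption`. The following is a sketch of one possible subclass, not the strategy used by any Karafka scheduler. `SketchBase`, `LargestFirstScheduler`, `CostedJob`, and its `cost` attribute are all hypothetical and exist only to make the example runnable without Karafka.

```ruby
# Hypothetical job type with a made-up `cost` attribute.
CostedJob = Struct.new(:name, :cost)

# Hypothetical stand-in for the base class documented here.
class SketchBase
  def initialize(queue)
    @queue = queue
    @mutex = Mutex.new
  end

  def on_schedule_consumption(jobs_array)
    @mutex.synchronize { schedule_consumption(jobs_array) }
  end

  def schedule_consumption(_jobs_array)
    raise NotImplementedError, "Implement in a subclass"
  end
end

# Enqueues the most expensive jobs first
class LargestFirstScheduler < SketchBase
  def schedule_consumption(jobs_array)
    jobs_array.sort_by { |job| -job.cost }.each { |job| @queue << job }
  end
end

queue = []
LargestFirstScheduler.new(queue).on_schedule_consumption(
  [CostedJob.new("small", 1), CostedJob.new("large", 10), CostedJob.new("medium", 5)]
)
order = queue.map(&:name)

# Calling the base directly surfaces the NotImplementedError
base_error =
  begin
    SketchBase.new([]).on_schedule_consumption([])
    nil
  rescue NotImplementedError => e
    e.message
  end
```

Note that `NotImplementedError` descends from `ScriptError`, not `StandardError`, so a bare `rescue` would not catch it. It has to be named explicitly, as above.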

#schedule_fifo(jobs_array) ⇒ Object Also known as: schedule_revocation, schedule_shutdown, schedule_idle, schedule_periodic

Schedules the provided jobs in FIFO (first in, first out) order.

Parameters:

  • jobs_array (Array)

    Jobs for scheduling


# File 'lib/karafka/pro/processing/schedulers/base.rb', line 53

def schedule_fifo(jobs_array)
  jobs_array.each do |job|
    @queue << job
  end
end
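
The "Also known as" line means that `schedule_revocation`, `schedule_shutdown`, `schedule_idle`, and `schedule_periodic` share this FIFO implementation. One consequence worth knowing, assuming the aliases are declared with Ruby's `alias` or `alias_method` (this page does not show the declaration): an alias keeps pointing at the original method body, so overriding `schedule_fifo` in a subclass does not change the aliased methods. The sketch below demonstrates this with hypothetical stand-in classes.

```ruby
# Hypothetical stand-in for the base class documented here.
class SketchBase
  def initialize(queue)
    @queue = queue
    @mutex = Mutex.new
  end

  def schedule_fifo(jobs_array)
    jobs_array.each { |job| @queue << job }
  end

  # Assumption: the real aliases are declared in a comparable way
  alias_method :schedule_idle, :schedule_fifo
end

class ReversedScheduler < SketchBase
  # Overrides schedule_fifo only; the inherited alias is untouched
  def schedule_fifo(jobs_array)
    super(jobs_array.reverse)
  end
end

via_alias = []
ReversedScheduler.new(via_alias).schedule_idle(%w[a b c])

via_override = []
ReversedScheduler.new(via_override).schedule_fifo(%w[a b c])
```

Here `via_alias` keeps the original order while `via_override` is reversed. A subclass that wants different behavior for idle, periodic, revocation, or shutdown jobs would therefore override those methods directly.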