Module: Gouda

Defined in:
lib/gouda.rb,
lib/gouda/bulk.rb,
lib/gouda/worker.rb,
lib/gouda/railtie.rb,
lib/gouda/version.rb,
lib/gouda/queue_constraints.rb,
lib/generators/gouda/install_generator.rb,
lib/gouda/active_job_extensions/interrupts.rb,
lib/gouda/active_job_extensions/concurrency.rb

Defined Under Namespace

Modules: ActiveJobExtensions, AnyQueue, BulkAdapterExtension, Scheduler

Classes: Adapter, CombinedShutdownCheck, ConcurrencyExceededError, Configuration, EmptyQueueShutdownCheck, ExceptQueueConstraint, InstallGenerator, InterruptError, JobFuse, OnlyQueuesConstraint, Railtie, ThreadSafeSet, TimerShutdownCheck, TrapShutdownCheck, Workload

Constant Summary collapse

POLL_INTERVAL_DURATION_SECONDS =
1
VERSION =
"0.1.2"

Class Method Summary collapse

Class Method Details

.config ⇒ Object



63
64
65
# File 'lib/gouda.rb', line 63

# Returns the module-wide configuration object, building it on first access.
#
# @return [Configuration] the same memoized instance on every call
def self.config
  return @config if @config

  @config = Configuration.new
end

.configure {|config| ... } ⇒ Object

Yields:



67
68
69
# File 'lib/gouda.rb', line 67

# Hands the shared configuration object to the given block so the caller
# can adjust settings on it.
#
# @yieldparam config [Object] the object returned by +config+
# @return [Object] whatever the block returns
def self.configure
  settings = config
  yield settings
end

.create_tables(active_record_schema) ⇒ Object



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/gouda.rb', line 75

# Declares the database objects Gouda uses on the given schema object:
# the +gouda_workload_state+ enum, the +gouda_workloads+ table with its
# indexes, and the +gouda_job_fuses+ table.
#
# @param active_record_schema [#create_enum, #create_table, #add_index]
#   the schema/migration object the definitions are issued against
# @return [Object] the result of the final +create_table+ call
def self.create_tables(active_record_schema)
  schema = active_record_schema

  schema.create_enum(:gouda_workload_state, %w[enqueued executing finished])

  schema.create_table(:gouda_workloads, id: :uuid) do |table|
    table.uuid(:active_job_id, null: false)
    table.timestamp(:scheduled_at, null: false)
    table.timestamp(:execution_started_at)
    table.timestamp(:execution_finished_at)
    table.timestamp(:last_execution_heartbeat_at)
    table.timestamp(:interrupted_at, null: true)

    table.string(:scheduler_key, null: true)
    table.string(:queue_name, null: false, default: "default")
    table.integer(:priority)
    table.string(:active_job_class_name, null: false)
    table.jsonb(:serialized_params)
    table.jsonb(:error, default: {}, null: false)
    table.enum(:state, enum_type: :gouda_workload_state, default: "enqueued", null: false)
    table.string(:execution_concurrency_key)
    table.string(:enqueue_concurrency_key)
    table.string(:executing_on)
    table.integer(:position_in_bulk)

    table.timestamps
  end

  # Each entry is [indexed columns, add_index options]; they are created in this order.
  # The three unique partial indexes (guard_double_*) only cover rows in the
  # state named in their WHERE clause that have a non-NULL key.
  workload_indexes = [
    [[:priority, :id, :scheduled_at], {where: "state = 'enqueued'", name: :gouda_checkout_all_index}],
    [[:id, :last_execution_heartbeat_at], {where: "state = 'executing'", name: :gouda_last_heartbeat_index}],
    [[:enqueue_concurrency_key], {where: "state = 'enqueued' AND enqueue_concurrency_key IS NOT NULL", unique: true, name: :guard_double_enqueue}],
    [[:scheduler_key], {where: "state = 'enqueued' AND scheduler_key IS NOT NULL", unique: true, name: :guard_double_schedule}],
    [[:execution_concurrency_key], {where: "state = 'executing' AND execution_concurrency_key IS NOT NULL", unique: true, name: :guard_double_exec}],
    [[:active_job_id], {name: :same_job_display_idx}],
    [[:priority], {order: {priority: "ASC NULLS LAST"}, name: :ordered_priority_idx}],
    [[:last_execution_heartbeat_at], {name: :index_gouda_workloads_on_last_execution_heartbeat_at}],
    [[:scheduler_key], {name: :index_gouda_workloads_on_scheduler_key}]
  ]
  workload_indexes.each do |columns, options|
    schema.add_index(:gouda_workloads, columns, **options)
  end

  schema.create_table(:gouda_job_fuses, id: false) do |table|
    table.string(:active_job_class_name, null: false)

    table.timestamps
  end
end

.enqueue_jobs_via_their_adapters(active_jobs) ⇒ Object

This method exists in edge Rails so probably can be replaced later: github.com/rails/rails/commit/9b62f88a2fde0d2bf8c4f6e3bcd06ecba7ca9d8d



18
19
20
21
22
23
24
25
26
27
# File 'lib/gouda/bulk.rb', line 18

# Groups the given jobs by the queue adapter of their job class and hands
# each group to its adapter. Adapters that respond to +enqueue_all+ receive
# the whole group in one call; for other adapters every job is enqueued
# individually.
#
# @param active_jobs [Array<#scheduled_at, nil>] jobs to enqueue; +nil+
#   entries are skipped
# @return [Hash] the jobs grouped by adapter
def self.enqueue_jobs_via_their_adapters(active_jobs)
  jobs_per_adapter = active_jobs.compact.group_by { |aj| aj.class.queue_adapter }
  jobs_per_adapter.each_pair do |adapter, jobs_for_adapter|
    if adapter.respond_to?(:enqueue_all)
      adapter.enqueue_all(jobs_for_adapter)
    else
      jobs_for_adapter.each do |aj|
        # Honor delayed jobs: a plain +enqueue+ would drop the requested
        # execution time and run the job right away.
        if aj.scheduled_at
          adapter.enqueue_at(aj, aj.scheduled_at.to_f)
        else
          adapter.enqueue(aj)
        end
      end
    end
  end
end

.in_bulk(&blk) ⇒ Object



4
5
6
7
8
9
10
11
12
13
14
# File 'lib/gouda/bulk.rb', line 4

# Runs the block with a thread-local buffer (+Thread.current[:gouda_bulk_buffer]+)
# in place and, once the outermost block completes, passes everything that
# was collected in the buffer to +enqueue_jobs_via_their_adapters+ in one go.
# Nested calls reuse the buffer opened by the outermost call and do not flush.
#
# If the block raises, the buffer is discarded without enqueueing and the
# thread-local is cleared, so a later +in_bulk+ on the same thread starts
# from a clean state instead of being mistaken for a nested call.
#
# @yield the code during which enqueues are buffered
# @return [Object] the value returned by the block
def self.in_bulk(&blk)
  # There already is an open bulk on this thread: the outermost call flushes.
  return yield unless Thread.current[:gouda_bulk_buffer].nil?

  Thread.current[:gouda_bulk_buffer] = []
  begin
    retval = yield
  ensure
    # Always close the bulk, even when the block raised or exited non-locally.
    buf, Thread.current[:gouda_bulk_buffer] = Thread.current[:gouda_bulk_buffer], nil
  end
  enqueue_jobs_via_their_adapters(buf)
  retval
end

.logger ⇒ Object



71
72
73
# File 'lib/gouda.rb', line 71

# Returns the logger held by the Gouda configuration.
#
# @return [Object] the value of +Gouda.config.logger+
def self.logger
  configuration = Gouda.config
  configuration.logger
end

.parse_queue_constraint(constraint_str_from_envvar) ⇒ Object



28
29
30
31
32
33
34
35
36
37
# File 'lib/gouda/queue_constraints.rb', line 28

# Turns a queue specification string (as accepted by +queue_parser+) into
# a queue constraint object.
#
# @param constraint_str_from_envvar [String, nil] the raw specification
# @return [OnlyQueuesConstraint, ExceptQueueConstraint, AnyQueue]
#   an include-list constraint, an exclude-list constraint, or +AnyQueue+
#   when the parsed result carries neither list
def self.parse_queue_constraint(constraint_str_from_envvar)
  parsed = queue_parser(constraint_str_from_envvar)

  only_these = parsed[:include]
  return OnlyQueuesConstraint.new(only_these) if only_these

  all_but_these = parsed[:exclude]
  return ExceptQueueConstraint.new(all_but_these) if all_but_these

  AnyQueue
end

.queue_parser(string) ⇒ Hash

Parse a string representing a group of queues into a more readable data structure.

Examples:

Gouda.queue_parser('-queue1,queue2')
=> { exclude: [ 'queue1', 'queue2' ] }

Parameters:

  • string (String)

    Queue string

Returns:

  • (Hash)

    How to match a given queue. It can have the following keys and values:

    • { all: true } indicates that all queues match.

    • { exclude: Array<String> } indicates the listed queue names should not match.

    • { include: Array<String> } indicates the listed queue names should match.



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/gouda/queue_constraints.rb', line 52

# Parses a comma-separated queue specification into a Hash describing how
# queue names should be matched.
#
# A leading "-" turns the list into an exclusion list, a leading "+" (or no
# prefix) makes it an inclusion list. A +nil+ or whitespace-only string, or
# any list containing "*", matches every queue. Whitespace around names is
# stripped and empty segments (for example from a trailing comma) are ignored.
#
# Uses only core Ruby, so it does not depend on ActiveSupport's String
# extensions being loaded.
#
# @param string [String, nil] the queue specification
# @return [Hash] one of +{all: true}+, +{exclude: Array<String>}+ or
#   +{include: Array<String>}+
def self.queue_parser(string)
  string = "*" if string.nil? || string.match?(/\A[[:space:]]*\z/)

  exclude_queues = false
  case string[0]
  when "-"
    exclude_queues = true
    string = string[1..]
  when "+"
    string = string[1..]
  end

  queues = string.split(",").map(&:strip).reject(&:empty?)

  if queues.include?("*")
    {all: true}
  elsif exclude_queues
    {exclude: queues}
  else
    {include: queues}
  end
end

.sleep_with_interruptions(n_seconds, must_abort_proc) ⇒ Object



178
179
180
181
182
183
184
185
186
187
# File 'lib/gouda/worker.rb', line 178

# Sleeps for up to +n_seconds+, waking up periodically to ask
# +must_abort_proc+ whether the wait should be cut short.
#
# The wake-up period is +Gouda.config.polling_sleep_interval_seconds+. Each
# nap is capped at the time still remaining, so the method does not overshoot
# +n_seconds+ by up to a whole polling interval.
#
# @param n_seconds [Numeric] maximum time to wait, measured on the monotonic clock
# @param must_abort_proc [#call] returns a truthy value when the wait must end early
# @return [nil]
def self.sleep_with_interruptions(n_seconds, must_abort_proc)
  start_time_seconds = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  check_interval_seconds = Gouda.config.polling_sleep_interval_seconds
  loop do
    return if must_abort_proc.call

    elapsed_seconds = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time_seconds
    remaining_seconds = n_seconds - elapsed_seconds
    return if remaining_seconds <= 0

    sleep([check_interval_seconds, remaining_seconds].min)
  end
end

.start ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/gouda.rb', line 48

# Entry point for running a worker: upserts the scheduler's workloads,
# picks the queue constraint from the +GOUDA_QUEUES+ environment variable
# (any queue when it is unset), logs the version and thread count, and then
# enters +Gouda.worker_loop+.
#
# @return [Object] whatever +Gouda.worker_loop+ returns
def self.start
  Gouda::Scheduler.upsert_workloads_from_entries_list!

  queues_from_env = ENV["GOUDA_QUEUES"]
  queue_constraint = queues_from_env ? Gouda.parse_queue_constraint(ENV["GOUDA_QUEUES"]) : Gouda::AnyQueue

  log = Gouda.logger
  log.info("Gouda version: #{Gouda::VERSION}")
  Gouda.logger.info("Worker threads: #{Gouda.config.worker_thread_count}")

  Gouda.worker_loop(
    n_threads: Gouda.config.worker_thread_count,
    queue_constraint: queue_constraint
  )
end

.worker_loop(n_threads:, check_shutdown: TrapShutdownCheck.new, queue_constraint: Gouda::AnyQueue) ⇒ Object

Start looping, taking work from the queue and performing it, over multiple worker threads. Once the `check_shutdown` callable returns `true`, the threads will cleanly terminate and the method will return (so it is blocking).

Parameters:

  • n_threads (Integer)

    how many worker threads to start. Housekeeping additionally runs on the calling thread, so ideally this should be the size of your connection pool minus 1

  • check_shutdown (#call) (defaults to: TrapShutdownCheck.new)

    A callable object (can be a Proc etc.). Once it starts returning `true`, the worker threads and the housekeeping loop will cleanly exit



116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/gouda/worker.rb', line 116

# Starts +n_threads+ worker threads that repeatedly check out and perform
# workloads, while the calling thread runs the housekeeping loop (heartbeats
# for executing workloads and zombie reaping). Blocks until +check_shutdown+
# returns a truthy value, then joins the worker threads.
#
# @param n_threads [Integer] number of worker threads to start, at least 1
# @param check_shutdown [#call, Array<#call>] callable polled by every loop;
#   an Array is wrapped in a CombinedShutdownCheck
# @param queue_constraint [Object] passed through to
#   +Gouda::Workload.checkout_and_perform_one+
# @raise [ArgumentError] if +n_threads+ is less than 1
# @return [nil]
def self.worker_loop(n_threads:, check_shutdown: TrapShutdownCheck.new, queue_constraint: Gouda::AnyQueue)
  # We need quite a few things when starting the loop - we have to be far enough into the Rails bootup sequence
  # that both the application and the executor are available
  #
  # raise "Rails is not loaded yet" unless defined?(Rails) && Rails.respond_to?(:application)
  # raise "Rails application is not loaded yet" unless Rails.application
  # raise "Rails executor not available yet" unless Rails.application.executor

  check_shutdown = CombinedShutdownCheck.new(*check_shutdown) if !check_shutdown.respond_to?(:call) && check_shutdown.is_a?(Array)

  # Identifies this worker_loop invocation: host, process and a random UUID
  worker_id = [Socket.gethostname, Process.pid, SecureRandom.uuid].join("-")

  # Shared with the worker threads via `in_progress:` and read by the housekeeping loop below
  executing_workload_ids = ThreadSafeSet.new

  raise ArgumentError, "You need at least 1 worker thread, but you requested #{n_threads}" if n_threads < 1
  worker_threads = n_threads.times.map do
    Thread.new do
      worker_id_and_thread_id = [worker_id, "t0x#{Thread.current.object_id.to_s(16)}"].join("-")
      loop do
        break if check_shutdown.call

        did_process = Gouda.config.app_executor.wrap do
          Gouda::Workload.checkout_and_perform_one(executing_on: worker_id_and_thread_id, queue_constraint: queue_constraint, in_progress: executing_workload_ids)
        end

        # If no job was retrieved the queue is likely empty. Relax the polling then and ease off.
        # If a job was retrieved it is likely that a burst has just been enqueued, and we do not
        # sleep but proceed to attempt to retrieve the next job right after.
        # NOTE(review): despite the name this interval is a fixed 1.25x of the poll interval, there is
        # no random component here (the housekeeping loop below does use `rand`) - confirm intent.
        jitter_sleep_interval = POLL_INTERVAL_DURATION_SECONDS + (POLL_INTERVAL_DURATION_SECONDS * 0.25)
        sleep_with_interruptions(jitter_sleep_interval, check_shutdown) unless did_process
      rescue => e
        # NOTE(review): after an exception the loop retries right away without sleeping - confirm
        # that a persistent failure (e.g. an unreachable database) cannot turn this into a hot loop.
        warn "Uncaught exception during perform (#{e.class} - #{e})"
      end
    end
  end

  # Do the housekeeping tasks on main
  loop do
    break if check_shutdown.call

    Gouda.config.app_executor.wrap do
      # Mark known executing jobs as such. If a worker process is killed or the machine it is running on dies,
      # a stale timestamp can indicate to us that the job was orphaned and is marked as "executing"
      # even though the worker it was running on has failed for whatever reason.
      # Later on we can figure out what to do with those jobs (re-enqueue them or toss them)
      Gouda::Workload.where(id: executing_workload_ids.to_a, state: "executing").update_all(executing_on: worker_id, last_execution_heartbeat_at: Time.now.utc)

      # Find jobs which just hung and clean them up (mark them as "finished" and enqueue replacement workloads if possible)
      Gouda::Workload.reap_zombie_workloads
    rescue => e
      # Appsignal.add_exception(e)
      warn "Uncaught exception during housekeeping (#{e.class} - #{e})"
    end

    # Jitter the sleep so that the workers booted at the same time do not all dogpile
    randomized_sleep_duration_s = POLL_INTERVAL_DURATION_SECONDS + (POLL_INTERVAL_DURATION_SECONDS.to_f * rand)
    sleep_with_interruptions(randomized_sleep_duration_s, check_shutdown)
  end
ensure
  # Wait for the worker threads to finish; worker_threads is nil if we failed before starting them
  worker_threads&.map(&:join)
end