Module: Gouda
- Defined in:
- lib/gouda.rb,
lib/gouda/bulk.rb,
lib/gouda/worker.rb,
lib/gouda/railtie.rb,
lib/gouda/version.rb,
lib/gouda/queue_constraints.rb,
lib/generators/gouda/install_generator.rb,
lib/gouda/active_job_extensions/interrupts.rb,
lib/gouda/active_job_extensions/concurrency.rb
Defined Under Namespace
Modules: ActiveJobExtensions, AnyQueue, BulkAdapterExtension, Scheduler
Classes: Adapter, CombinedShutdownCheck, ConcurrencyExceededError, Configuration, EmptyQueueShutdownCheck, ExceptQueueConstraint, InstallGenerator, InterruptError, JobFuse, OnlyQueuesConstraint, Railtie, ThreadSafeSet, TimerShutdownCheck, TrapShutdownCheck, Workload
Constant Summary collapse
- POLL_INTERVAL_DURATION_SECONDS =
1
- VERSION =
"0.1.2"
Class Method Summary collapse
- .config ⇒ Object
- .configure {|config| ... } ⇒ Object
- .create_tables(active_record_schema) ⇒ Object
-
.enqueue_jobs_via_their_adapters(active_jobs) ⇒ Object
This method already exists in edge Rails, so it can probably be replaced later; see https://github.com/rails/rails/commit/9b62f88a2fde0d2bf8c4f6e3bcd06ecba7ca9d8d
- .in_bulk(&blk) ⇒ Object
- .logger ⇒ Object
- .parse_queue_constraint(constraint_str_from_envvar) ⇒ Object
-
.queue_parser(string) ⇒ Hash
Parse a string representing a group of queues into a more readable data structure.
- .sleep_with_interruptions(n_seconds, must_abort_proc) ⇒ Object
- .start ⇒ Object
-
.worker_loop(n_threads:, check_shutdown: TrapShutdownCheck.new, queue_constraint: Gouda::AnyQueue) ⇒ Object
Start looping, taking work from the queue and performing it, over multiple worker threads.
Class Method Details
.config ⇒ Object
63 64 65 |
# File 'lib/gouda.rb', line 63

# Returns the module-level configuration object, building it with
# `Configuration.new` on first access and reusing that instance afterwards.
#
# @return [Configuration] the memoized configuration instance
def self.config
  return @config if @config

  @config = Configuration.new
end
.configure {|config| ... } ⇒ Object
67 68 69 |
# File 'lib/gouda.rb', line 67

# Yields the object returned by `config` to the given block so that the
# caller can adjust settings in place.
#
# @yieldparam config [Object] whatever `config` returns
# @return [Object] the value of the block
def self.configure
  settings = config
  yield settings
end
.create_tables(active_record_schema) ⇒ Object
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/gouda.rb', line 75

# Creates the database objects Gouda needs: the `gouda_workload_state` enum,
# the `gouda_workloads` table with its indexes, and the `gouda_job_fuses` table.
#
# NOTE(review): in the text this was restored from, the column-type token of
# the time columns was missing (`t. :scheduled_at`, bare `t.`). They are
# written here as `t.timestamp` / `t.timestamps` - confirm against lib/gouda.rb.
#
# @param active_record_schema [#create_enum, #create_table, #add_index]
#   the schema/migration object the statements are issued on
# @return [Object] whatever the final `create_table` call returns
def self.create_tables(active_record_schema)
  active_record_schema.create_enum :gouda_workload_state, %w[enqueued executing finished]

  active_record_schema.create_table :gouda_workloads, id: :uuid do |t|
    t.uuid :active_job_id, null: false
    t.timestamp :scheduled_at, null: false
    t.timestamp :execution_started_at
    t.timestamp :execution_finished_at
    t.timestamp :last_execution_heartbeat_at
    t.timestamp :interrupted_at, null: true

    t.string :scheduler_key, null: true
    t.string :queue_name, null: false, default: "default"
    t.integer :priority
    t.string :active_job_class_name, null: false
    t.jsonb :serialized_params
    t.jsonb :error, default: {}, null: false
    t.enum :state, enum_type: :gouda_workload_state, default: "enqueued", null: false
    t.string :execution_concurrency_key
    t.string :enqueue_concurrency_key
    t.string :executing_on
    t.integer :position_in_bulk

    t.timestamps
  end

  # Partial indexes scoped to a single workload state.
  active_record_schema.add_index :gouda_workloads, [:priority, :id, :scheduled_at], where: "state = 'enqueued'", name: :gouda_checkout_all_index
  active_record_schema.add_index :gouda_workloads, [:id, :last_execution_heartbeat_at], where: "state = 'executing'", name: :gouda_last_heartbeat_index

  # Unique partial indexes: at most one row per key while in the given state.
  active_record_schema.add_index :gouda_workloads, [:enqueue_concurrency_key], where: "state = 'enqueued' AND enqueue_concurrency_key IS NOT NULL", unique: true, name: :guard_double_enqueue
  active_record_schema.add_index :gouda_workloads, [:scheduler_key], where: "state = 'enqueued' AND scheduler_key IS NOT NULL", unique: true, name: :guard_double_schedule
  active_record_schema.add_index :gouda_workloads, [:execution_concurrency_key], where: "state = 'executing' AND execution_concurrency_key IS NOT NULL", unique: true, name: :guard_double_exec

  # Plain lookup / ordering indexes.
  active_record_schema.add_index :gouda_workloads, [:active_job_id], name: :same_job_display_idx
  active_record_schema.add_index :gouda_workloads, [:priority], order: {priority: "ASC NULLS LAST"}, name: :ordered_priority_idx
  active_record_schema.add_index :gouda_workloads, [:last_execution_heartbeat_at], name: :index_gouda_workloads_on_last_execution_heartbeat_at
  active_record_schema.add_index :gouda_workloads, [:scheduler_key], name: :index_gouda_workloads_on_scheduler_key

  active_record_schema.create_table :gouda_job_fuses, id: false do |t|
    t.string :active_job_class_name, null: false

    t.timestamps
  end
end
.enqueue_jobs_via_their_adapters(active_jobs) ⇒ Object
This method already exists in edge Rails, so it can probably be replaced later; see https://github.com/rails/rails/commit/9b62f88a2fde0d2bf8c4f6e3bcd06ecba7ca9d8d
18 19 20 21 22 23 24 25 26 27 |
# File 'lib/gouda/bulk.rb', line 18

# Groups the given jobs by the queue adapter of their class and hands each
# group to its adapter. Adapters that respond to `enqueue_all` receive the
# whole group in one call; other adapters receive the jobs one at a time via
# `enqueue`. `nil` entries in the input are skipped.
#
# @param active_jobs [Array] job instances whose class responds to `queue_adapter`
# @return [Hash] the adapter => jobs grouping that was processed
def self.enqueue_jobs_via_their_adapters(active_jobs)
  jobs_per_adapter = active_jobs.compact.group_by { |job| job.class.queue_adapter }

  # The block parameter gets its own name so it does not shadow the
  # `active_jobs` method parameter.
  jobs_per_adapter.each_pair do |adapter, jobs_for_adapter|
    if adapter.respond_to?(:enqueue_all)
      adapter.enqueue_all(jobs_for_adapter)
    else
      jobs_for_adapter.each { |job| adapter.enqueue(job) }
    end
  end
end
.in_bulk(&blk) ⇒ Object
4 5 6 7 8 9 10 11 12 13 14 |
# File 'lib/gouda/bulk.rb', line 4

# Runs the block with a per-thread bulk buffer open
# (`Thread.current[:gouda_bulk_buffer]`). When the outermost `in_bulk` block
# finishes, the buffer contents are passed to `enqueue_jobs_via_their_adapters`
# in one go. Nested calls just run their block and leave flushing to the
# outermost call.
#
# If the block raises, the buffer is discarded (nothing is enqueued) and the
# thread-local is always reset. Otherwise a stale buffer would make every later
# `in_bulk` on this thread look nested and never flush.
#
# @yield the work during which the buffer is open
# @return [Object] the value of the block
def self.in_bulk(&blk)
  # There already is an open bulk - the outermost call will flush.
  return yield unless Thread.current[:gouda_bulk_buffer].nil?

  begin
    Thread.current[:gouda_bulk_buffer] = []
    retval = yield
    buffered_jobs = Thread.current[:gouda_bulk_buffer]
  ensure
    # Close the bulk before enqueueing, and also when the block raised.
    Thread.current[:gouda_bulk_buffer] = nil
  end

  enqueue_jobs_via_their_adapters(buffered_jobs)
  retval
end
.parse_queue_constraint(constraint_str_from_envvar) ⇒ Object
28 29 30 31 32 33 34 35 36 37 |
# File 'lib/gouda/queue_constraints.rb', line 28

# Turns a queue specification string into a queue constraint object.
# The string is first parsed with `queue_parser`. An `:include` list yields an
# `OnlyQueuesConstraint`, an `:exclude` list yields an `ExceptQueueConstraint`,
# and anything else yields `AnyQueue`.
#
# @param constraint_str_from_envvar [String, nil] the raw queue specification
# @return [OnlyQueuesConstraint, ExceptQueueConstraint, AnyQueue]
def self.parse_queue_constraint(constraint_str_from_envvar)
  spec = queue_parser(constraint_str_from_envvar)
  included = spec[:include]
  return OnlyQueuesConstraint.new(included) if included

  excluded = spec[:exclude]
  return ExceptQueueConstraint.new(excluded) if excluded

  AnyQueue
end
.queue_parser(string) ⇒ Hash
Parse a string representing a group of queues into a more readable data structure.
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/gouda/queue_constraints.rb', line 52

# Parse a string representing a group of queues into a more readable data
# structure.
#
# A blank input is treated as "*". A leading "-" marks the list as an
# exclusion list; a leading "+" is accepted and ignored. Queue names are
# comma-separated and stripped of surrounding whitespace. If any name is "*",
# all queues match regardless of the prefix.
#
# @param string [String, nil] the queue specification
# @return [Hash] `{all: true}`, `{exclude: [...]}` or `{include: [...]}`
def self.queue_parser(string)
  spec = string.presence || "*"

  exclude_queues = spec.start_with?("-")
  spec = spec[1..] if exclude_queues || spec.start_with?("+")

  names = spec.split(",").map(&:strip)
  return {all: true} if names.include?("*")

  exclude_queues ? {exclude: names} : {include: names}
end
.sleep_with_interruptions(n_seconds, must_abort_proc) ⇒ Object
178 179 180 181 182 183 184 185 186 187 |
# File 'lib/gouda/worker.rb', line 178

# Sleeps for up to `n_seconds`, waking up periodically to ask
# `must_abort_proc` whether to stop early. The wake-up period is
# `Gouda.config.polling_sleep_interval_seconds`. Each individual sleep is
# capped to the time remaining, so the total does not overshoot `n_seconds`
# by up to a whole polling interval.
#
# @param n_seconds [Numeric] maximum time to sleep, in seconds
# @param must_abort_proc [#call] returns a truthy value when sleeping should stop
# @return [nil]
def self.sleep_with_interruptions(n_seconds, must_abort_proc)
  # The monotonic clock is unaffected by wall-clock adjustments.
  deadline_seconds = Process.clock_gettime(Process::CLOCK_MONOTONIC) + n_seconds
  check_interval_seconds = Gouda.config.polling_sleep_interval_seconds

  loop do
    return if must_abort_proc.call

    remaining_seconds = deadline_seconds - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    return if remaining_seconds <= 0

    sleep([check_interval_seconds, remaining_seconds].min)
  end
end
.start ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/gouda.rb', line 48

# Entry point for a worker process. It upserts the scheduler workloads,
# derives the queue constraint from the `GOUDA_QUEUES` environment variable
# (falling back to `Gouda::AnyQueue` when it is unset), logs the version and
# the thread count, and then calls `Gouda.worker_loop`.
#
# @return [Object] whatever `Gouda.worker_loop` returns
def self.start
  Gouda::Scheduler.upsert_workloads_from_entries_list!

  queues_spec = ENV["GOUDA_QUEUES"]
  queue_constraint = queues_spec ? Gouda.parse_queue_constraint(queues_spec) : Gouda::AnyQueue

  Gouda.logger.info("Gouda version: #{Gouda::VERSION}")
  Gouda.logger.info("Worker threads: #{Gouda.config.worker_thread_count}")

  Gouda.worker_loop(
    n_threads: Gouda.config.worker_thread_count,
    queue_constraint: queue_constraint
  )
end
.worker_loop(n_threads:, check_shutdown: TrapShutdownCheck.new, queue_constraint: Gouda::AnyQueue) ⇒ Object
Start looping, taking work from the queue and performing it, over multiple worker threads. Once the `check_shutdown` callable returns `true`, the threads will cleanly terminate and the method will return (so it is blocking).
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
# File 'lib/gouda/worker.rb', line 116

# Start looping, taking work from the queue and performing it, over multiple
# worker threads. Once the `check_shutdown` callable returns `true`, the
# threads will cleanly terminate and the method will return (so it is blocking).
#
# While the worker threads run, the calling thread does housekeeping: it
# refreshes the heartbeat of the workloads this process is executing and
# calls `Gouda::Workload.reap_zombie_workloads`.
#
# @param n_threads [Integer] number of worker threads to start (at least 1)
# @param check_shutdown [#call, Array<#call>] shutdown check; an Array that is
#   not itself callable is wrapped in a `CombinedShutdownCheck`
# @param queue_constraint [Object] passed through to
#   `Gouda::Workload.checkout_and_perform_one`
# @raise [ArgumentError] if `n_threads` is less than 1
# @return [nil] once shutdown was requested and all worker threads were joined
def self.worker_loop(n_threads:, check_shutdown: TrapShutdownCheck.new, queue_constraint: Gouda::AnyQueue)
  # Fail fast on a bad thread count, before any other setup work is done.
  raise ArgumentError, "You need at least 1 worker thread, but you requested #{n_threads}" if n_threads < 1

  # We need quite a few things when starting the loop - we have to be far enough into the Rails bootup sequence
  # that both the application and the executor are available
  #
  # raise "Rails is not loaded yet" unless defined?(Rails) && Rails.respond_to?(:application)
  # raise "Rails application is not loaded yet" unless Rails.application
  # raise "Rails executor not available yet" unless Rails.application.executor

  check_shutdown = CombinedShutdownCheck.new(*check_shutdown) if !check_shutdown.respond_to?(:call) && check_shutdown.is_a?(Array)

  worker_id = [Socket.gethostname, Process.pid, SecureRandom.uuid].join("-")
  executing_workload_ids = ThreadSafeSet.new

  worker_threads = n_threads.times.map do
    Thread.new do
      worker_id_and_thread_id = [worker_id, "t0x#{Thread.current.object_id.to_s(16)}"].join("-")
      loop do
        break if check_shutdown.call

        did_process = Gouda.config.app_executor.wrap do
          Gouda::Workload.checkout_and_perform_one(executing_on: worker_id_and_thread_id, queue_constraint: queue_constraint, in_progress: executing_workload_ids)
        end

        # If no job was retrieved the queue is likely empty. Relax the polling then and ease off.
        # If a job was retrieved it is likely that a burst has just been enqueued, and we do not
        # sleep but proceed to attempt to retrieve the next job right after.
        # Note: despite the name, this interval is a fixed 1.25x of the poll interval.
        jitter_sleep_interval = POLL_INTERVAL_DURATION_SECONDS + (POLL_INTERVAL_DURATION_SECONDS * 0.25)
        sleep_with_interruptions(jitter_sleep_interval, check_shutdown) unless did_process
      rescue => e
        # Keep the thread alive: report the error and go on to the next iteration.
        warn "Uncaught exception during perform (#{e.class} - #{e})"
      end
    end
  end

  # Do the housekeeping tasks on main
  loop do
    break if check_shutdown.call

    Gouda.config.app_executor.wrap do
      # Mark known executing jobs as such. If a worker process is killed or the machine it is running on dies,
      # a stale timestamp can indicate to us that the job was orphaned and is marked as "executing"
      # even though the worker it was running on has failed for whatever reason.
      # Later on we can figure out what to do with those jobs (re-enqueue them or toss them)
      Gouda::Workload.where(id: executing_workload_ids.to_a, state: "executing").update_all(executing_on: worker_id, last_execution_heartbeat_at: Time.now.utc)

      # Find jobs which just hung and clean them up (mark them as "finished" and enqueue replacement workloads if possible)
      Gouda::Workload.reap_zombie_workloads
    rescue => e
      # Appsignal.add_exception(e)
      warn "Uncaught exception during housekeeping (#{e.class} - #{e})"
    end

    # Jitter the sleep so that the workers booted at the same time do not all dogpile
    randomized_sleep_duration_s = POLL_INTERVAL_DURATION_SECONDS + (POLL_INTERVAL_DURATION_SECONDS.to_f * rand)
    sleep_with_interruptions(randomized_sleep_duration_s, check_shutdown)
  end
ensure
  # `worker_threads` is nil when we bailed out before the threads were started.
  worker_threads&.each(&:join)
end