Module: Cosmo::Job::ClassMethods
- Defined in:
- lib/cosmo/job.rb
Instance Method Summary collapse
-
#concurrency_key(args) ⇒ Object
Derive the fully-scoped concurrency key for a given args array.
-
#concurrency_options ⇒ Object
Returns a normalized concurrency config hash, or
nil when not configured.
- #default_options ⇒ Object
- #limits_concurrency? ⇒ Boolean
- #options(**config) ⇒ Object (also: #cosmo_options)
- #perform(*args, async: true, **options) ⇒ Object
- #perform_async(*args) ⇒ Object
- #perform_at(timestamp, *args) ⇒ Object
- #perform_in(interval, *args) ⇒ Object
- #perform_sync(*args) ⇒ Object
Instance Method Details
#concurrency_key(args) ⇒ Object
Derive the fully-scoped concurrency key for a given args array.
57 58 59 60 61 62 63 64 |
# File 'lib/cosmo/job.rb', line 57
def concurrency_key(args)
  config = concurrency_options
  return unless config

  base = Utils::String.underscore(name)
  suffix = config[:key]&.call(*args)
  suffix ? "#{base}/#{suffix}" : base
end
#concurrency_options ⇒ Object
Returns a normalized concurrency config hash, or nil when not configured. Always contains :limit, :key, and :duration.
45 46 47 48 49 50 51 52 53 54 |
# File 'lib/cosmo/job.rb', line 45
def concurrency_options
  value = default_options.dig(:limit, :concurrency)
  duration = default_options.dig(:limit, :duration).to_i
  return unless value

  case value
  when Integer then { limit: value, key: nil, duration: duration }
  when Hash then { limit: value.fetch(:to), key: value[:key], duration: duration }
  end
end
#default_options ⇒ Object
95 96 97 |
# File 'lib/cosmo/job.rb', line 95
def default_options
  @default_options ||= (superclass.respond_to?(:default_options) ? superclass.default_options : Data::DEFAULTS).dup
end
#limits_concurrency? ⇒ Boolean
39 40 41 |
# File 'lib/cosmo/job.rb', line 39
def limits_concurrency?
  !!concurrency_options
end
#options(**config) ⇒ Object Also known as: cosmo_options
30 31 32 33 34 35 36 |
# File 'lib/cosmo/job.rb', line 30
def options(**config)
  if config[:limit] && config.dig(:limit, :concurrency) && !config.dig(:limit, :duration).to_i.positive?
    raise ArgumentError, "limit: duration is required when concurrency is set"
  end

  default_options.merge!(config)
end
#perform(*args, async: true, **options) ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/cosmo/job.rb', line 66
def perform(*args, async: true, **options)
  data = Data.new(name, args, default_options.merge(options))
  unless async
    payload = Utils::Json.parse(data.to_args[1])
    raise ArgumentError, "Cannot parse payload" unless payload

    new.perform(*payload[:args])
    return
  end

  Publisher.publish_job(data)
end
#perform_async(*args) ⇒ Object
79 80 81 |
# File 'lib/cosmo/job.rb', line 79
def perform_async(*args)
  perform(*args)
end
#perform_at(timestamp, *args) ⇒ Object
83 84 85 |
# File 'lib/cosmo/job.rb', line 83
def perform_at(timestamp, *args)
  perform(*args, at: timestamp)
end
#perform_in(interval, *args) ⇒ Object
87 88 89 |
# File 'lib/cosmo/job.rb', line 87
def perform_in(interval, *args)
  perform(*args, in: interval)
end
#perform_sync(*args) ⇒ Object
91 92 93 |
# File 'lib/cosmo/job.rb', line 91
def perform_sync(*args)
  perform(*args, async: false)
end