Module: Cosmo::Job::ClassMethods

Defined in:
lib/cosmo/job.rb

Instance Method Summary collapse

Instance Method Details

#concurrency_key(args) ⇒ Object

Derive the fully-scoped concurrency key for a given args array.



57
58
59
60
61
62
63
64
# File 'lib/cosmo/job.rb', line 57

def concurrency_key(args)
  config = concurrency_options
  return unless config

  base = Utils::String.underscore(name)
  suffix = config[:key]&.call(*args)
  suffix ? "#{base}/#{suffix}" : base
end

#concurrency_optionsObject

Returns a normalized concurrency config hash, or nil when not configured. Always contains :limit, :key, and :duration.



45
46
47
48
49
50
51
52
53
54
# File 'lib/cosmo/job.rb', line 45

def concurrency_options
  value = default_options.dig(:limit, :concurrency)
  duration = default_options.dig(:limit, :duration).to_i
  return unless value

  case value
  when Integer then { limit: value, key: nil, duration: duration }
  when Hash    then { limit: value.fetch(:to), key: value[:key], duration: duration }
  end
end

#default_optionsObject



95
96
97
# File 'lib/cosmo/job.rb', line 95

def default_options
  @default_options ||= (superclass.respond_to?(:default_options) ? superclass.default_options : Data::DEFAULTS).dup
end

#limits_concurrency?Boolean

Returns:

  • (Boolean)


39
40
41
# File 'lib/cosmo/job.rb', line 39

def limits_concurrency?
  !!concurrency_options
end

#options(**config) ⇒ Object Also known as: cosmo_options

Parameters:

  • config (Hash)

    a customizable set of options

Options Hash (**config):

  • :stream (Symbol)

    NATS stream to publish to (default: :default)

  • :retry (Integer)

    max delivery attempts before giving up (default: 3)

  • :dead (Boolean)

    move to dead-letter stream after retries exhausted (default: true)

  • :limit (Hash)

    execution limits:

    limit: { duration: 30 } limit: { duration: 30, concurrency: 3 } limit: { duration: 30, concurrency: { to: 3, key: ->(id) { id } } }

  • :"limit[:duration]" (Integer)

    hard execution timeout in seconds. The job thread is killed after this many seconds and counts as a failed attempt (retried with exponential backoff, moved to DLQ after retries exhausted).

  • :"limit[:concurrency]" (Integer, Hash)

    caps how many instances run at once across all workers. Jobs that cannot acquire a slot are NAK’d with a delay equal to duration so they are not re-delivered until the slot is guaranteed free. Requires duration. Pass an Integer for a class-wide cap, or { to: N, key: ->(args) {} } to scope per key.



30
31
32
33
34
35
36
# File 'lib/cosmo/job.rb', line 30

def options(**config)
  if config[:limit] && config.dig(:limit, :concurrency) && !config.dig(:limit, :duration).to_i.positive?
    raise ArgumentError, "limit: duration is required when concurrency is set"
  end

  default_options.merge!(config)
end

#perform(*args, async: true, **options) ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/cosmo/job.rb', line 66

def perform(*args, async: true, **options)
  data = Data.new(name, args, default_options.merge(options))
  unless async
    payload = Utils::Json.parse(data.to_args[1])
    raise ArgumentError, "Cannot parse payload" unless payload

    new.perform(*payload[:args])
    return
  end

  Publisher.publish_job(data)
end

#perform_async(*args) ⇒ Object



79
80
81
# File 'lib/cosmo/job.rb', line 79

def perform_async(*args)
  perform(*args)
end

#perform_at(timestamp, *args) ⇒ Object



83
84
85
# File 'lib/cosmo/job.rb', line 83

def perform_at(timestamp, *args)
  perform(*args, at: timestamp)
end

#perform_in(interval, *args) ⇒ Object



87
88
89
# File 'lib/cosmo/job.rb', line 87

def perform_in(interval, *args)
  perform(*args, in: interval)
end

#perform_sync(*args) ⇒ Object



91
92
93
# File 'lib/cosmo/job.rb', line 91

def perform_sync(*args)
  perform(*args, async: false)
end