8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
|
# File 'lib/lcp_ruby/background_jobs/enqueue.rb', line 8
def self.call(job_type:, params: {}, target_model: nil, target_id: nil,
triggered_by: nil, scheduled_at: nil)
definition = Registry.definition_by_name(job_type)
unless definition
raise JobDefinitionNotFoundError,
"Job definition '#{job_type}' not found. " \
"Define it in config/lcp_ruby/jobs/ or run the appropriate generator."
end
execution_model = resolve_execution_model
params = params.is_a?(Hash) ? params : {}
unique_key = nil
if definition.has_unique_by?
unique_attrs = {
"job_type" => job_type,
"target_model" => target_model,
"target_id" => target_id,
"triggered_by_id" => triggered_by&.id,
"params" => params
}
unique_key = UniqueKeyBuilder.build(definition.unique_by, unique_attrs)
existing = execution_model
.where(job_type: job_type, status: %w[pending running])
.where(unique_key: unique_key)
.first
return existing if existing
end
snapshot = if triggered_by
UserSnapshot.capture(triggered_by)
else
{ "system" => true, "source" => "api" }
end
execution = execution_model.create!(
job_type: job_type,
status: :pending,
progress: 0,
params: params,
target_model: target_model,
target_id: target_id,
triggered_by_id: triggered_by&.id,
triggered_by_snapshot: snapshot,
unique_key: unique_key,
scheduled_at: scheduled_at,
attempt: 0,
log: []
)
job_options = { queue: definition.queue }
job = if scheduled_at
ExecutorJob.set(**job_options, wait_until: scheduled_at).perform_later(execution.id)
else
ExecutorJob.set(**job_options).perform_later(execution.id)
end
execution.update_columns(active_job_id: job.provider_job_id) if job.respond_to?(:provider_job_id)
execution
end
|