Class: LcpRuby::BackgroundJobs::Enqueue

Inherits:
Object
Defined in:
lib/lcp_ruby/background_jobs/enqueue.rb

Class Method Summary

Class Method Details

.call(job_type:, params: {}, target_model: nil, target_id: nil, triggered_by: nil, scheduled_at: nil) ⇒ Object



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/lcp_ruby/background_jobs/enqueue.rb', line 8

# Records a background job execution and hands it over to ActiveJob.
#
# Steps, in order: look up the job definition, optionally short-circuit on an
# already active execution with the same unique key, persist a new pending
# execution record, enqueue ExecutorJob for it, and remember the provider job
# id on the record when the enqueued job exposes one.
#
# @param job_type name used to look up the definition via Registry.definition_by_name
# @param params [Hash] stored on the execution record; any non-Hash value is replaced by {}
# @param target_model stored on the execution record and fed into the unique key attributes
# @param target_id stored on the execution record and fed into the unique key attributes
# @param triggered_by optional object; its #id is stored and it is passed to UserSnapshot.capture
# @param scheduled_at optional; when truthy it is stored and passed to ExecutorJob.set as wait_until
# @return the first pending/running execution with the same unique key when one
#   exists, otherwise the newly created execution record
# @raise [JobDefinitionNotFoundError] when Registry has no definition for job_type
def self.call(job_type:, params: {}, target_model: nil, target_id: nil,
              triggered_by: nil, scheduled_at: nil)
  definition = Registry.definition_by_name(job_type)
  unless definition
    message = "Job definition '#{job_type}' not found. " \
              "Define it in config/lcp_ruby/jobs/ or run the appropriate generator."
    raise JobDefinitionNotFoundError, message
  end

  execution_model = resolve_execution_model
  params = {} unless params.is_a?(Hash)

  # De-duplication: only definitions that declare unique_by get a key, and only
  # then do we look for an execution that is still pending or running.
  # NOTE(review): the lookup and the create! below are separate statements;
  # presumably something else guards against concurrent enqueues — confirm.
  dedupe_key = nil
  if definition.has_unique_by?
    key_source = {
      "job_type" => job_type,
      "target_model" => target_model,
      "target_id" => target_id,
      "triggered_by_id" => triggered_by&.id,
      "params" => params
    }
    dedupe_key = UniqueKeyBuilder.build(definition.unique_by, key_source)

    active_scope = execution_model.where(job_type: job_type, status: %w[pending running])
    duplicate = active_scope.where(unique_key: dedupe_key).first
    return duplicate if duplicate
  end

  # Without a triggering user the execution is attributed to the system.
  actor_snapshot = triggered_by ? UserSnapshot.capture(triggered_by) : { "system" => true, "source" => "api" }

  record_attrs = {
    job_type: job_type,
    status: :pending,
    progress: 0,
    params: params,
    target_model: target_model,
    target_id: target_id,
    triggered_by_id: triggered_by&.id,
    triggered_by_snapshot: actor_snapshot,
    unique_key: dedupe_key,
    scheduled_at: scheduled_at,
    attempt: 0,
    log: []
  }
  execution = execution_model.create!(**record_attrs)

  # Enqueue on the definition's queue; delay until scheduled_at when given.
  enqueue_options = { queue: definition.queue }
  enqueue_options[:wait_until] = scheduled_at if scheduled_at
  enqueued = ExecutorJob.set(**enqueue_options).perform_later(execution.id)

  # Only touch the record when the enqueue result can report a provider job id.
  if enqueued.respond_to?(:provider_job_id)
    execution.update_columns(active_job_id: enqueued.provider_job_id)
  end

  execution
end