Class: CanvasSync::JobBatches::ManagedBatchJob

Inherits:
BaseJob
  • Object
show all
Defined in:
lib/canvas_sync/job_batches/jobs/managed_batch_job.rb

Defined Under Namespace

Classes: ManagedBatchProxy

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.cleanup_redis(status, options) ⇒ Object



76
77
78
79
80
81
82
83
84
# File 'lib/canvas_sync/job_batches/jobs/managed_batch_job.rb', line 76

def self.cleanup_redis(status, options)
  man_batch_id = options['managed_batch_id']
  Batch.redis do |r|
    r.del(
      "MNGBID-#{man_batch_id}",
      "MNGBID-#{man_batch_id}-jobs",
    )
  end
end

.job_succeeded_callback(status, options) ⇒ Object



86
87
88
89
# File 'lib/canvas_sync/job_batches/jobs/managed_batch_job.rb', line 86

def self.job_succeeded_callback(status, options)
  man_batch_id = options['managed_batch_id']
  perform_next_sequence_job(man_batch_id)
end

.make_batch(sub_jobs, ordered: true, concurrency: nil, context: nil, preflight_check: nil, desc_prefix: nil, &blk) ⇒ Object



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/canvas_sync/job_batches/jobs/managed_batch_job.rb', line 5

def self.make_batch(sub_jobs, ordered: true, concurrency: nil, context: nil, preflight_check: nil, desc_prefix: nil, &blk)
  desc_prefix ||= ''

  if concurrency == 0 || concurrency == nil || concurrency == true
    concurrency = sub_jobs.count
  elsif concurrency == false
    concurrency = 1
  end

  root_batch = Batch.new
  man_batch_id = nil

  if concurrency < sub_jobs.count
    man_batch_id = SecureRandom.urlsafe_base64(10)

    Batch.redis do |r|
      r.multi do |r|
        r.hset("MNGBID-#{man_batch_id}", "root_bid", root_batch.bid)
        r.hset("MNGBID-#{man_batch_id}", "ordered", ordered ? 1 : 0)
        r.hset("MNGBID-#{man_batch_id}", "concurrency", concurrency)
        r.hset("MNGBID-#{man_batch_id}", "preflight_check", preflight_check) if preflight_check.present?
        r.expire("MNGBID-#{man_batch_id}", Batch::BID_EXPIRE_TTL)

        mapped_sub_jobs = sub_jobs.each_with_index.map do |j, i|
          j['_mngbid_index_'] = i # This allows duplicate jobs when a Redis Set is used
          j = ::ActiveJob::Arguments.serialize([j])
          JSON.unparse(j)
        end
        if ordered
          r.rpush("MNGBID-#{man_batch_id}-jobs", mapped_sub_jobs)
        else
          r.sadd("MNGBID-#{man_batch_id}-jobs", mapped_sub_jobs)
        end
        r.expire("MNGBID-#{man_batch_id}-jobs", Batch::BID_EXPIRE_TTL)
      end
    end

    root_batch.allow_context_changes = (concurrency == 1)
    root_batch.on(:success, "#{to_s}.cleanup_redis", managed_batch_id: man_batch_id)

    desc_prefix = "MGD(#{man_batch_id}): #{desc_prefix}"
  end

  root_batch.context = context

  blk.call(ManagedBatchProxy.new(root_batch)) if blk.present?

  root_batch.description = "#{desc_prefix}#{root_batch.description || 'Root'}"

  root_batch.context["managed_batch_bid"] = man_batch_id if man_batch_id

  if concurrency < sub_jobs.count
    root_batch.jobs {}
    concurrency.times do
      perform_next_sequence_job(man_batch_id, skip_preflight: true)
    end
  else
    root_batch.jobs do
      sub_jobs.each do |j|
        ChainBuilder.enqueue_job(j)
      end
    end
  end

  root_batch
end

Instance Method Details

#perform(sub_jobs, **kwargs) ⇒ Object



72
73
74
# File 'lib/canvas_sync/job_batches/jobs/managed_batch_job.rb', line 72

def perform(sub_jobs, **kwargs)
  self.class.make_batch(sub_jobs, **kwargs)
end