5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70 
     | 
    
      # File 'lib/canvas_sync/job_batches/jobs/managed_batch_job.rb', line 5
# Builds a root Batch for +sub_jobs+ and starts it, optionally limiting how
# many sub-jobs are started up front.
#
# When the effective concurrency is lower than the number of sub-jobs (the
# "managed" path), the jobs are serialized into Redis under a freshly
# generated managed-batch id, the root batch is turned into a placeholder, and
# +perform_next_sequence_job+ is invoked +concurrency+ times. Otherwise every
# sub-job is enqueued directly inside the root batch.
#
# @param sub_jobs [#count, #each, #each_with_index] job definitions. In the
#   managed path each element is mutated in place: it gains a
#   "_mngbid_index_" entry holding its position in +sub_jobs+.
# @param ordered [Boolean] managed path only: when truthy the serialized jobs
#   are stored with RPUSH (a list), otherwise with SADD (a set).
# @param concurrency [Integer, Boolean, nil] +nil+, +0+ and +true+ mean
#   "as many as there are sub-jobs"; +false+ means 1.
# @param context [Object] assigned to the root batch's +context+. It must
#   support +[]=+ in the managed path, because the managed-batch id is stored
#   in it under "managed_batch_bid".
# @param preflight_check [Object, nil] stored in the managed-batch Redis hash
#   when present; how it is interpreted is decided elsewhere.
# @param desc_prefix [String, nil] text prepended to the root batch description.
# @yieldparam proxy [ManagedBatchProxy] wrapper around the root batch, yielded
#   before the description is finalized and before any job is started.
# @return [Batch] the root batch.
def self.make_batch(sub_jobs, ordered: true, concurrency: nil, context: nil, preflight_check: nil, desc_prefix: nil, &blk)
  desc_prefix ||= ''
  concurrency =
    case concurrency
    when nil, 0, true then sub_jobs.count
    when false then 1
    else concurrency
    end

  root_batch = Batch.new
  man_batch_id = nil

  if concurrency < sub_jobs.count
    man_batch_id = SecureRandom.urlsafe_base64(10)
    meta_key = "MNGBID-#{man_batch_id}"
    jobs_key = "#{meta_key}-jobs"

    serialized_jobs = sub_jobs.each_with_index.map do |job, index|
      # Tagging each job with its position keeps otherwise-identical jobs
      # distinct, which matters when they are stored in a Redis set.
      job['_mngbid_index_'] = index
      JSON.generate(::ActiveJob::Arguments.serialize([job]))
    end

    Batch.redis do |r|
      r.multi do |tx|
        tx.hset(meta_key, "root_bid", root_batch.bid)
        tx.hset(meta_key, "ordered", ordered ? 1 : 0)
        tx.hset(meta_key, "concurrency", concurrency)
        tx.hset(meta_key, "preflight_check", preflight_check) if preflight_check.present?
        tx.expire(meta_key, Batch::BID_EXPIRE_TTL)

        if ordered
          tx.rpush(jobs_key, serialized_jobs)
        else
          tx.sadd(jobs_key, serialized_jobs)
        end
        tx.expire(jobs_key, Batch::BID_EXPIRE_TTL)
      end
    end

    # Context changes are only allowed when jobs run strictly one at a time.
    root_batch.allow_context_changes = (concurrency == 1)
    root_batch.on(:success, "#{to_s}.cleanup_redis", managed_batch_id: man_batch_id)
    desc_prefix = "MGD(#{man_batch_id}): #{desc_prefix}"
  end

  root_batch.context = context
  blk.call(ManagedBatchProxy.new(root_batch)) if blk
  root_batch.description = "#{desc_prefix}#{root_batch.description || 'Root'}"

  if man_batch_id
    root_batch.context["managed_batch_bid"] = man_batch_id
    root_batch.placeholder!
    # Start the first +concurrency+ jobs; the preflight check is skipped for
    # these initial launches.
    concurrency.times do
      perform_next_sequence_job(man_batch_id, skip_preflight: true)
    end
  else
    root_batch.jobs do
      sub_jobs.each { |job| ChainBuilder.enqueue_job(job) }
    end
  end

  root_batch
end
     |