Class: Joblin::Batching::ManagedBatchJob

Inherits:
BaseJob
  • Object
show all
Defined in:
lib/joblin/batching/jobs/managed_batch_job.rb

Defined Under Namespace

Classes: ManagedBatchProxy

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.cleanup_redis(status, options) ⇒ Object



77
78
79
80
81
82
83
84
85
# File 'lib/joblin/batching/jobs/managed_batch_job.rb', line 77

def self.cleanup_redis(status, options)
  man_batch_id = options['managed_batch_id']
  Batch.redis do |r|
    r.del(
      "MNGBID-#{man_batch_id}",
      "MNGBID-#{man_batch_id}-jobs",
    )
  end
end

.job_succeeded_callback(status, options) ⇒ Object



87
88
89
90
# File 'lib/joblin/batching/jobs/managed_batch_job.rb', line 87

def self.job_succeeded_callback(status, options)
  man_batch_id = options['managed_batch_id']
  perform_next_sequence_job(man_batch_id)
end

.make_batch(sub_jobs, ordered: true, concurrency: nil, context: nil, preflight_check: nil, description: nil, desc_prefix: nil, &blk) ⇒ Object



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/joblin/batching/jobs/managed_batch_job.rb', line 5

def self.make_batch(sub_jobs, ordered: true, concurrency: nil, context: nil, preflight_check: nil, description: nil, desc_prefix: nil, &blk)
  desc_prefix ||= ''

  if concurrency == 0 || concurrency == nil || concurrency == true
    concurrency = sub_jobs.count
  elsif concurrency == false
    concurrency = 1
  end

  root_batch = Batch.new
  man_batch_id = nil

  if concurrency < sub_jobs.count
    man_batch_id = SecureRandom.urlsafe_base64(10)

    Batch.redis do |r|
      r.multi do |r|
        r.hset("MNGBID-#{man_batch_id}", "root_bid", root_batch.bid)
        r.hset("MNGBID-#{man_batch_id}", "ordered", ordered ? 1 : 0)
        r.hset("MNGBID-#{man_batch_id}", "concurrency", concurrency)
        r.hset("MNGBID-#{man_batch_id}", "preflight_check", preflight_check) if preflight_check.present?
        r.expire("MNGBID-#{man_batch_id}", Batch::BID_EXPIRE_TTL)

        mapped_sub_jobs = sub_jobs.each_with_index.map do |j, i|
          j['_mngbid_index_'] = i # This allows duplicate jobs when a Redis Set is used
          j = ::ActiveJob::Arguments.serialize([j])
          JSON.unparse(j)
        end
        if ordered
          r.rpush("MNGBID-#{man_batch_id}-jobs", mapped_sub_jobs)
        else
          r.sadd("MNGBID-#{man_batch_id}-jobs", mapped_sub_jobs)
        end
        r.expire("MNGBID-#{man_batch_id}-jobs", Batch::BID_EXPIRE_TTL)
      end
    end

    root_batch.allow_context_changes = (concurrency == 1)
    root_batch.on(:success, "#{to_s}.cleanup_redis", managed_batch_id: man_batch_id)

    desc_prefix = "MGD(#{man_batch_id}): #{desc_prefix}"
  end

  root_batch.context = context
  root_batch.description = description if description.present?

  blk.call(ManagedBatchProxy.new(root_batch)) if blk.present?

  root_batch.description = "#{desc_prefix}#{root_batch.description || 'Root'}"

  root_batch.context["managed_batch_bid"] = man_batch_id if man_batch_id

  if concurrency < sub_jobs.count
    root_batch.placeholder!
    concurrency.times do
      perform_next_sequence_job(man_batch_id, skip_preflight: true)
    end
  else
    root_batch.jobs do
      sub_jobs.each do |j|
        ChainBuilder.enqueue_job(j)
      end
    end
  end

  root_batch
end

.perform_next_sequence_job(man_batch_id, skip_preflight: false) ⇒ Object



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/joblin/batching/jobs/managed_batch_job.rb', line 94

def self.perform_next_sequence_job(man_batch_id, skip_preflight: false)
  root_bid, ordered, preflight_check = Batch.redis do |r|
    r.multi do |r|
      r.hget("MNGBID-#{man_batch_id}", "root_bid")
      r.hget("MNGBID-#{man_batch_id}", "ordered")
      r.hget("MNGBID-#{man_batch_id}", "preflight_check")
    end
  end

  if !skip_preflight && preflight_check.present?
    if preflight_check.include?(".")
      clazz, method_name = preflight_check.split('.')
      clazz = clazz.constantize
    else
      clazz = Object
      method_name = preflight_check
    end
    preflight_check = ->(*args) { clazz.send(method_name, *args) }
  else
    preflight_check = ->(*args) { true }
  end

  ordered = Joblin::MiscHelper.to_boolean(ordered)

  loop do
    next_job_json = Batch.redis do |r|
      if ordered
        r.lpop("MNGBID-#{man_batch_id}-jobs")
      else
        r.spop("MNGBID-#{man_batch_id}-jobs")
      end
    end

    break unless next_job_json.present?

    next_job = JSON.parse(next_job_json)
    next_job = ::ActiveJob::Arguments.deserialize(next_job)[0]

    preflight_result = preflight_check.call(next_job)
    if preflight_result == :abort
      cleanup_redis(nil, { "managed_batch_id" => man_batch_id })
      break
    elsif !preflight_check
      next
    end

    Batch.new(root_bid).jobs do
      Batch.new.tap do |batch|
        batch.description = "Managed Batch Fiber (#{man_batch_id})"
        batch.on(:success, "#{self.to_s}.job_succeeded_callback", managed_batch_id: man_batch_id)

        if next_job[:chain_link].present?
          # Annotate Batch with chain-step info
          batch.context["csb:chain_link"] = next_job[:chain_link]
          batch.on(:complete, "#{ChainBuilder.to_s}.chain_step_complete", chain_link: next_job[:chain_link])
        end

        batch.jobs do
          ChainBuilder.enqueue_job(next_job)
        end
      end
    end

    break
  end
end

Instance Method Details

#perform(sub_jobs, **kwargs) ⇒ Object



73
74
75
# File 'lib/joblin/batching/jobs/managed_batch_job.rb', line 73

def perform(sub_jobs, **kwargs)
  self.class.make_batch(sub_jobs, **kwargs)
end