Class: JobWorkflow::JobStatus

Inherits:
Object
  • Object
show all
Defined in:
lib/job_workflow/job_status.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(task_job_statuses: []) ⇒ JobStatus

: (?task_job_statuses: Array) -> void



18
19
20
21
# File 'lib/job_workflow/job_status.rb', line 18

# Starts with an empty status table and registers each given status in it.
#
# Inside this method the bare name `task_job_statuses` is the keyword
# argument, while `self.task_job_statuses =` goes through the attribute writer.
#
# @param task_job_statuses [Array] statuses to register, one by one, via
#   update_task_job_status
def initialize(task_job_statuses: [])
  self.task_job_statuses = {}
  task_job_statuses.each do |initial_status|
    update_task_job_status(initial_status)
  end
end

Class Method Details

.deserialize(hash) ⇒ Object

: (Hash[String, untyped]) -> JobStatus



12
13
14
# File 'lib/job_workflow/job_status.rb', line 12

# Builds an instance from a serialized hash.
#
# @param hash [Hash] read through the string key "task_job_statuses"; a
#   missing key is treated as an empty list
# @return [JobStatus] result of `new` given the deserialized entries
def deserialize(hash)
  serialized_entries = hash.fetch("task_job_statuses") { [] }
  restored = serialized_entries.map { |entry| TaskJobStatus.deserialize(entry) }
  new(task_job_statuses: restored)
end

.from_hash_array(array) ⇒ Object

: (Array[Hash[untyped, untyped]]) -> JobStatus



7
8
9
# File 'lib/job_workflow/job_status.rb', line 7

# Builds an instance from an array of hashes.
#
# @param array [Array<Hash>] each element is handed to TaskJobStatus.from_hash
# @return [JobStatus] result of `new` given the converted entries
def from_hash_array(array)
  converted = array.map { |attributes| TaskJobStatus.from_hash(attributes) }
  new(task_job_statuses: converted)
end

Instance Method Details

#fetch(task_name:, index:) ⇒ Object

: (task_name: Symbol, index: Integer) -> TaskJobStatus?



29
30
31
# File 'lib/job_workflow/job_status.rb', line 29

# Looks up a single status by task name and position.
#
# @param task_name [Symbol] key into task_job_statuses
# @param index [Integer] position within that task's status array
# @return [TaskJobStatus, nil] nil when the task is unknown or the slot is empty
def fetch(task_name:, index:)
  statuses_for_task = task_job_statuses.fetch(task_name) { [] }
  statuses_for_task[index]
end

#fetch_all(task_name:) ⇒ Object

: (task_name: Symbol) -> Array



24
25
26
# File 'lib/job_workflow/job_status.rb', line 24

# Returns every recorded status for a task, skipping empty (nil) slots.
#
# @param task_name [Symbol] key into task_job_statuses
# @return [Array] a new array; empty when the task is unknown
def fetch_all(task_name:)
  statuses_for_task = task_job_statuses.fetch(task_name) { [] }
  statuses_for_task.compact
end

#finished_job_ids(task_name:) ⇒ Object

: (task_name: Symbol) -> Array



34
35
36
# File 'lib/job_workflow/job_status.rb', line 34

# Collects the job ids of a task's statuses that report finished?.
#
# @param task_name [Symbol] forwarded to fetch_all
# @return [Array] job ids in the order fetch_all returns their statuses
def finished_job_ids(task_name:)
  fetch_all(task_name:).each_with_object([]) do |status, job_ids|
    job_ids << status.job_id if status.finished?
  end
end

#flat_task_job_statuses ⇒ Object

: () -> Array



39
40
41
# File 'lib/job_workflow/job_status.rb', line 39

# Returns the statuses of all tasks as one flat array.
#
# Empty (nil) slots are not removed here.
#
# @return [Array] flattened values of task_job_statuses
def flat_task_job_statuses
  per_task_arrays = task_job_statuses.values
  per_task_arrays.flatten
end

#needs_waiting?(task_name) ⇒ Boolean

Note:
  • If the array is empty, the task is not enqueued and is considered completed.

  • If we add a task existence check in the future, we’ll check here.

: (Symbol) -> bool

Returns:

  • (Boolean)


48
49
50
# File 'lib/job_workflow/job_status.rb', line 48

# Reports whether the given task still has recorded jobs that are not finished.
#
# @note
#   - If the array is empty, the task is not enqueued and is considered completed.
#   - If we add a task existence check in the future, we'll check here.
#
# @param task_name [Symbol] key into task_job_statuses
# @return [Boolean] true when at least one recorded status for the task is
#   not finished; false when all are finished or none are recorded
def needs_waiting?(task_name)
  # `any?` is false for an empty list, so a task with no recorded jobs never
  # waits. The earlier `all?(&:finished?)` form answered the opposite question
  # (and was true for an empty list).
  task_job_statuses.fetch(task_name, []).any? { |task_job_status| !task_job_status.finished? }
end

#update_task_job_status(task_job_status) ⇒ Object

: (TaskJobStatus) -> void



53
54
55
56
# File 'lib/job_workflow/job_status.rb', line 53

# Stores a status in its task's array at the position given by its each_index,
# creating the array on first use and replacing whatever occupied that slot.
#
# @param task_job_status [TaskJobStatus] read for task_name and each_index
# @return [void]
def update_task_job_status(task_job_status)
  slots = (task_job_statuses[task_job_status.task_name] ||= [])
  slots[task_job_status.each_index] = task_job_status
end

#update_task_job_statuses_from_db(task_name) ⇒ Object

: (Symbol) -> void



73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/job_workflow/job_status.rb', line 73

# Refreshes the not-yet-finished statuses of a task from the queue adapter.
#
# Does nothing when the task has no unfinished statuses. Statuses whose job id
# is absent from the adapter's answer are left as they are.
#
# NOTE(review): `index_by` is not core Ruby; presumably ActiveSupport's
# Enumerable#index_by — confirm it is loaded wherever this runs.
#
# @param task_name [Symbol] key into task_job_statuses
# @return [void]
def update_task_job_statuses_from_db(task_name)
  unfinished = task_job_statuses.fetch(task_name, []).reject(&:finished?)
  unfinished_by_job_id = unfinished.index_by(&:job_id)
  return if unfinished_by_job_id.empty?

  # One lookup for all unfinished job ids; the result is indexed by job id below.
  fetched_jobs = QueueAdapter.current.fetch_job_statuses(unfinished_by_job_id.keys)

  unfinished_by_job_id.each_pair do |job_id, status|
    fetched_job = fetched_jobs[job_id]
    next unless fetched_job

    status.update_status(QueueAdapter.current.job_status(fetched_job))
    update_task_job_status(status)
  end
end

#update_task_job_statuses_from_jobs(task_name:, jobs:) ⇒ Object

: (task_name: Symbol, jobs: Array) -> void



59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/job_workflow/job_status.rb', line 59

# Registers a :pending status for every given job, using the job's position
# in the array as its each_index.
#
# @param task_name [Symbol] task the jobs belong to
# @param jobs [Array] objects responding to job_id
# @return [void]
def update_task_job_statuses_from_jobs(task_name:, jobs:)
  jobs.each_with_index do |job, position|
    pending_status = TaskJobStatus.new(task_name:, job_id: job.job_id, each_index: position, status: :pending)
    update_task_job_status(pending_status)
  end
end