Class: JobWorkflow::JobStatus
- Inherits:
-
Object
- Object
- JobWorkflow::JobStatus
- Defined in:
- lib/job_workflow/job_status.rb
Class Method Summary collapse
-
.deserialize(hash) ⇒ Object
: (Hash[String, untyped]) -> JobStatus.
-
.from_hash_array(array) ⇒ Object
: (Array[Hash[untyped, untyped]]) -> JobStatus.
Instance Method Summary collapse
-
#fetch(task_name:, index:) ⇒ Object
: (task_name: Symbol, index: Integer) -> TaskJobStatus?.
-
#fetch_all(task_name:) ⇒ Object
: (task_name: Symbol) -> Array.
-
#finished_job_ids(task_name:) ⇒ Object
: (task_name: Symbol) -> Array.
-
#flat_task_job_statuses ⇒ Object
: () -> Array.
-
#initialize(task_job_statuses: []) ⇒ JobStatus
constructor
: (?task_job_statuses: Array) -> void.
-
#needs_waiting?(task_name) ⇒ Boolean
: (Symbol) -> bool.
-
#update_task_job_status(task_job_status) ⇒ Object
: (TaskJobStatus) -> void.
-
#update_task_job_statuses_from_db(task_name) ⇒ Object
: (Symbol) -> void.
-
#update_task_job_statuses_from_jobs(task_name:, jobs:) ⇒ Object
: (task_name: Symbol, jobs: Array) -> void.
Constructor Details
#initialize(task_job_statuses: []) ⇒ JobStatus
: (?task_job_statuses: Array) -> void
18 19 20 21 |
# File 'lib/job_workflow/job_status.rb', line 18
#
# Builds a JobStatus from an optional list of task job statuses.
#
# The internal store starts as an empty Hash and each given status is
# registered through #update_task_job_status.
#
# Note: inside this method `task_job_statuses` refers to the keyword
# argument, not the attribute; the attribute is reached via `self.`.
#
# @param task_job_statuses [Array] statuses to register (defaults to none)
#: (?task_job_statuses: Array) -> void
def initialize(task_job_statuses: [])
  self.task_job_statuses = {}
  task_job_statuses.each do |status|
    update_task_job_status(status)
  end
end
Class Method Details
.deserialize(hash) ⇒ Object
: (Hash[String, untyped]) -> JobStatus
12 13 14 |
# File 'lib/job_workflow/job_status.rb', line 12
#
# Rebuilds a JobStatus from a serialized Hash.
#
# Reads the "task_job_statuses" entry (an empty list when the key is
# absent) and passes each element to TaskJobStatus.deserialize.
#
# @param hash [Hash] serialized form with String keys
# @return [JobStatus]
#: (Hash[String, untyped]) -> JobStatus
def deserialize(hash)
  serialized_statuses = hash.fetch("task_job_statuses", [])
  statuses = serialized_statuses.map do |serialized|
    TaskJobStatus.deserialize(serialized)
  end
  new(task_job_statuses: statuses)
end
.from_hash_array(array) ⇒ Object
: (Array[Hash[untyped, untyped]]) -> JobStatus
7 8 9 |
# File 'lib/job_workflow/job_status.rb', line 7
#
# Builds a JobStatus from an array of hashes, converting each element
# with TaskJobStatus.from_hash.
#
# @param array [Array<Hash>] one hash per task job status
# @return [JobStatus]
#: (Array[Hash[untyped, untyped]]) -> JobStatus
def from_hash_array(array)
  statuses = array.map do |attributes|
    TaskJobStatus.from_hash(attributes)
  end
  new(task_job_statuses: statuses)
end
Instance Method Details
#fetch(task_name:, index:) ⇒ Object
: (task_name: Symbol, index: Integer) -> TaskJobStatus?
29 30 31 |
# File 'lib/job_workflow/job_status.rb', line 29
#
# Looks up the status stored for one task at one position.
#
# @param task_name [Symbol] task whose statuses are searched
# @param index [Integer] position within that task's status list
# @return [TaskJobStatus, nil] nil when the task is unknown or the
#   position holds no status
#: (task_name: Symbol, index: Integer) -> TaskJobStatus?
def fetch(task_name:, index:)
  statuses_for_task = task_job_statuses.fetch(task_name, [])
  statuses_for_task[index]
end
#fetch_all(task_name:) ⇒ Object
: (task_name: Symbol) -> Array
24 25 26 |
# File 'lib/job_workflow/job_status.rb', line 24
#
# Returns every status recorded for a task, skipping empty (nil) slots.
#
# @param task_name [Symbol] task whose statuses are requested
# @return [Array] a new array; empty when the task is unknown
#: (task_name: Symbol) -> Array
def fetch_all(task_name:)
  statuses_for_task = task_job_statuses.fetch(task_name, [])
  statuses_for_task.reject(&:nil?)
end
#finished_job_ids(task_name:) ⇒ Object
: (task_name: Symbol) -> Array
34 35 36 |
# File 'lib/job_workflow/job_status.rb', line 34
#
# Collects the job ids of a task's statuses that report finished?.
#
# @param task_name [Symbol] task whose finished jobs are requested
# @return [Array] job ids in the order returned by #fetch_all
#: (task_name: Symbol) -> Array
def finished_job_ids(task_name:)
  fetch_all(task_name:).each_with_object([]) do |status, job_ids|
    job_ids << status.job_id if status.finished?
  end
end
#flat_task_job_statuses ⇒ Object
: () -> Array
39 40 41 |
# File 'lib/job_workflow/job_status.rb', line 39
#
# Returns the statuses of all tasks as one flat array.
#
# Per-task arrays are filled by index in #update_task_job_status, so they
# may contain nil gaps. Those gaps are removed here, matching #fetch_all,
# so callers only ever receive status objects.
#
# @return [Array] all recorded statuses, grouped by task in Hash order
#: () -> Array
def flat_task_job_statuses
  task_job_statuses.values.flatten.compact
end
#needs_waiting?(task_name) ⇒ Boolean
-
If the array is empty, the task is not enqueued and is considered completed.
-
If we add a task existence check in the future, we’ll check here.
: (Symbol) -> bool
48 49 50 |
# File 'lib/job_workflow/job_status.rb', line 48
#
# Tells whether a task still has jobs that have not finished.
#
# If the array is empty, the task is not enqueued and is considered
# completed, so no waiting is needed. Empty (nil) slots in a sparse array
# are skipped, as in #fetch_all, because they hold no job to wait for.
#
# @param task_name [Symbol] task to check
# @return [Boolean] true when at least one recorded status is unfinished
#: (Symbol) -> bool
def needs_waiting?(task_name)
  task_job_statuses.fetch(task_name, []).compact.any? { |status| !status.finished? }
end
#update_task_job_status(task_job_status) ⇒ Object
: (TaskJobStatus) -> void
53 54 55 56 |
# File 'lib/job_workflow/job_status.rb', line 53
#
# Stores a status in its task's list at the position given by the
# status's own each_index, creating the list on first use and replacing
# whatever was at that position.
#
# @param task_job_status [TaskJobStatus] status to record
#: (TaskJobStatus) -> void
def update_task_job_status(task_job_status)
  slots = (task_job_statuses[task_job_status.task_name] ||= [])
  slots[task_job_status.each_index] = task_job_status
end
#update_task_job_statuses_from_db(task_name) ⇒ Object
: (Symbol) -> void
73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/job_workflow/job_status.rb', line 73
#
# Refreshes the unfinished statuses of a task from the queue adapter.
#
# Unfinished statuses are keyed by job id, the adapter is asked for those
# ids in one call, and each status whose job is returned has its status
# updated and is stored again. Statuses with no returned job are left
# unchanged. Does nothing when the task has no unfinished statuses.
#
# @param task_name [Symbol] task whose statuses are refreshed
#: (Symbol) -> void
def update_task_job_statuses_from_db(task_name)
  # Per-task arrays are filled by index and may contain nil gaps; drop them
  # before calling finished? so a sparse array does not raise NoMethodError.
  # NOTE(review): index_by is presumably ActiveSupport's Enumerable#index_by.
  statuses = task_job_statuses.fetch(task_name, []).compact.reject(&:finished?).index_by(&:job_id)
  return if statuses.empty?

  # NOTE(review): fetch_job_statuses is assumed to return a collection that
  # can be looked up by job id — confirm against QueueAdapter.
  task_jobs = QueueAdapter.current.fetch_job_statuses(statuses.keys)
  statuses.each do |job_id, task_job_status|
    task_job = task_jobs[job_id]
    next unless task_job

    task_job_status.update_status(QueueAdapter.current.job_status(task_job))
    update_task_job_status(task_job_status)
  end
end
#update_task_job_statuses_from_jobs(task_name:, jobs:) ⇒ Object
: (task_name: Symbol, jobs: Array) -> void
59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/job_workflow/job_status.rb', line 59
#
# Records a :pending status for every given job, using each job's
# position in `jobs` as its each_index.
#
# @param task_name [Symbol] task the jobs belong to
# @param jobs [Array] jobs responding to #job_id
#: (task_name: Symbol, jobs: Array) -> void
def update_task_job_statuses_from_jobs(task_name:, jobs:)
  jobs.each_with_index do |job, position|
    pending_status = TaskJobStatus.new(
      task_name:,
      job_id: job.job_id,
      each_index: position,
      status: :pending
    )
    update_task_job_status(pending_status)
  end
end