Class: Pindo::TaskSystem::TaskManager
- Inherits:
-
Object
- Object
- Pindo::TaskSystem::TaskManager
- Includes:
- Singleton
- Defined in:
- lib/pindo/module/task/task_manager.rb
Overview
TaskManager - 任务管理器
职责:
-
提供公共 API
-
组合各个模块(队列、依赖检查、执行器、报告、资源锁)
-
创建执行策略
Instance Attribute Summary collapse
-
#dependency_checker ⇒ Object
readonly
Returns the value of attribute dependency_checker.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#reporter ⇒ Object
readonly
Returns the value of attribute reporter.
-
#resource_lock_manager ⇒ Object
readonly
Returns the value of attribute resource_lock_manager.
Instance Method Summary collapse
-
#add_task(task, options = {}) ⇒ String
添加任务.
-
#add_tasks(tasks) ⇒ Array<String>
批量添加任务.
-
#cancel_task(task_id) ⇒ Object
取消任务.
-
#clear_all ⇒ Object
清空所有队列.
-
#disable_output_management ⇒ Object
禁用输出管理系统.
-
#enable_output_management(options = {}) ⇒ Object
启用输出管理系统.
-
#execution_report ⇒ Hash
获取执行报告.
-
#executor ⇒ TaskExecutor
获取执行器(延迟初始化).
-
#find_task(task_id) ⇒ PindoTask?
查找任务.
-
#initialize ⇒ TaskManager
constructor
A new instance of TaskManager.
-
#start(options = {}) ⇒ Object
开始执行任务.
-
#task_status(task_id) ⇒ Symbol?
获取任务状态.
Constructor Details
#initialize ⇒ TaskManager
Returns a new instance of TaskManager.
24 25 26 27 28 29 30 |
# File 'lib/pindo/module/task/task_manager.rb', line 24 def initialize @queue = TaskQueue.new # 队列管理 @resource_lock_manager = ResourceLockManager.new # 资源锁管理 @dependency_checker = DependencyChecker.new(@queue, @resource_lock_manager) # 依赖检查 @reporter = TaskReporter.new(@queue) # 报告输出 @output_manager = nil # 输出管理器 end |
Instance Attribute Details
#dependency_checker ⇒ Object (readonly)
Returns the value of attribute dependency_checker.
22 23 24 |
# File 'lib/pindo/module/task/task_manager.rb', line 22 def dependency_checker @dependency_checker end |
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
22 23 24 |
# File 'lib/pindo/module/task/task_manager.rb', line 22 def queue @queue end |
#reporter ⇒ Object (readonly)
Returns the value of attribute reporter.
22 23 24 |
# File 'lib/pindo/module/task/task_manager.rb', line 22 def reporter @reporter end |
#resource_lock_manager ⇒ Object (readonly)
Returns the value of attribute resource_lock_manager.
22 23 24 |
# File 'lib/pindo/module/task/task_manager.rb', line 22 def resource_lock_manager @resource_lock_manager end |
Instance Method Details
#add_task(task, options = {}) ⇒ String
添加任务
66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/pindo/module/task/task_manager.rb', line 66 def add_task(task, = {}) raise ArgumentError, "Task must be a PindoTask" unless task.is_a?(PindoTask) unless task.validate raise ArgumentError, "Task validation failed: #{task.name}" end if [:wait_for] task.dependencies.concat(Array([:wait_for])) end @queue.add(task, sort_by_priority: true) end |
#add_tasks(tasks) ⇒ Array<String>
批量添加任务
83 84 85 |
# File 'lib/pindo/module/task/task_manager.rb', line 83 def add_tasks(tasks) tasks.map { |task| add_task(task) } end |
#cancel_task(task_id) ⇒ Object
取消任务
169 170 171 172 |
# File 'lib/pindo/module/task/task_manager.rb', line 169 def cancel_task(task_id) task = find_task(task_id) task&.cancel end |
#clear_all ⇒ Object
清空所有队列
155 156 157 |
# File 'lib/pindo/module/task/task_manager.rb', line 155 def clear_all @queue.clear_all end |
#disable_output_management ⇒ Object
禁用输出管理系统
57 58 59 60 |
# File 'lib/pindo/module/task/task_manager.rb', line 57 def disable_output_management @output_manager = nil @executor.output_manager = nil if @executor end |
#enable_output_management(options = {}) ⇒ Object
启用输出管理系统
42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/pindo/module/task/task_manager.rb', line 42 def enable_output_management( = {}) # 如果已经有一个输出管理器,先关闭所有日志文件(避免文件句柄泄漏) @output_manager.close_all_logs if @output_manager @output_manager = MultiLineOutputManager.new( log_dir: [:log_dir] || './pindo_logs', max_lines_per_task: [:max_lines_per_task] || 5, max_recent_completed: [:max_recent_completed] || 3, auto_adjust: .fetch(:auto_adjust, true) ) # 更新执行器的输出管理器(如果已初始化) @executor.output_manager = @output_manager if @executor end |
#execution_report ⇒ Hash
获取执行报告
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
# File 'lib/pindo/module/task/task_manager.rb', line 132 def execution_report pending = @queue.pending_snapshot completed = @queue.completed_snapshot { pending: pending.count, completed: completed.count, success: completed.count { |t| t.status == TaskStatus::SUCCESS }, failed: completed.count { |t| t.status == TaskStatus::FAILED }, tasks: (pending + completed).map do |task| { id: task.id, name: task.name, type: task.type, status: task.status, error: task.error&., execution_time: task.execution_time } end } end |
#executor ⇒ TaskExecutor
获取执行器(延迟初始化)
36 37 38 |
# File 'lib/pindo/module/task/task_manager.rb', line 36 def executor @executor ||= TaskExecutor.new(@queue, @reporter, @resource_lock_manager, @output_manager) end |
#find_task(task_id) ⇒ PindoTask?
查找任务
177 178 179 |
# File 'lib/pindo/module/task/task_manager.rb', line 177 def find_task(task_id) @queue.find(task_id) end |
#start(options = {}) ⇒ Object
开始执行任务
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
# File 'lib/pindo/module/task/task_manager.rb', line 92 def start( = {}) mode = parse_execution_mode() strategy = ExecutionStrategy.create(mode, ) # 并发模式必须启用输出管理器,否则多线程输出会混乱 if mode == :concurrent && @output_manager.nil? enable_output_management( log_dir: [:log_dir] || './pindo_logs', max_lines_per_task: [:max_lines_per_task] || 5, max_recent_completed: [:max_recent_completed] || 3 ) end # 如果配置了输出管理器,注册所有任务 if @output_manager @queue.pending_snapshot.each do |task| @output_manager.register_task(task) end end # 输出任务执行计划 @reporter.print_execution_plan(strategy) # 执行策略 strategy.execute(self) # 输出执行摘要 @reporter.print_execution_summary # 检查失败任务,有失败则抛出异常(确保进程返回非零退出码) report = execution_report if report[:failed] > 0 failed_tasks = @queue.completed_snapshot.select { |t| t.status == TaskStatus::FAILED } failed_names = failed_tasks.map(&:name).join(', ') raise Informative, "#{report[:failed]} 个任务执行失败: #{failed_names}" end end |
#task_status(task_id) ⇒ Symbol?
获取任务状态
162 163 164 165 |
# File 'lib/pindo/module/task/task_manager.rb', line 162 def task_status(task_id) task = find_task(task_id) task&.status end |