Class: Pindo::TaskSystem::TaskManager

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
lib/pindo/module/task/task_manager.rb

Overview

TaskManager - 任务管理器

职责:

  • 提供公共 API
  • 组合各个模块(队列、依赖检查、执行器、报告、资源锁)
  • 创建执行策略

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ TaskManager

Returns a new instance of TaskManager.



24
25
26
27
28
29
30
# File 'lib/pindo/module/task/task_manager.rb', line 24

# Wires together the collaborators the manager composes.
#
# The queue and the resource lock manager are created first because the
# dependency checker receives both and the reporter receives the queue.
# The output manager starts out unset.
def initialize
  task_queue = TaskQueue.new
  lock_manager = ResourceLockManager.new

  @queue = task_queue                                                  # queue management
  @resource_lock_manager = lock_manager                                # resource locks
  @dependency_checker = DependencyChecker.new(task_queue, lock_manager) # dependency checks
  @reporter = TaskReporter.new(task_queue)                             # report output
  @output_manager = nil                                                # output manager
end

Instance Attribute Details

#dependency_checker ⇒ Object (readonly)

Returns the value of attribute dependency_checker.



22
23
24
# File 'lib/pindo/module/task/task_manager.rb', line 22

# Read-only accessor for the dependency checker, which #initialize builds
# from the task queue and the resource lock manager.
#
# @return [DependencyChecker]
def dependency_checker
  @dependency_checker
end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



22
23
24
# File 'lib/pindo/module/task/task_manager.rb', line 22

# Read-only accessor for the task queue created in #initialize.
#
# @return [TaskQueue]
def queue
  @queue
end

#reporter ⇒ Object (readonly)

Returns the value of attribute reporter.



22
23
24
# File 'lib/pindo/module/task/task_manager.rb', line 22

# Read-only accessor for the reporter, which #initialize builds from the
# task queue.
#
# @return [TaskReporter]
def reporter
  @reporter
end

#resource_lock_manager ⇒ Object (readonly)

Returns the value of attribute resource_lock_manager.



22
23
24
# File 'lib/pindo/module/task/task_manager.rb', line 22

# Read-only accessor for the resource lock manager created in #initialize.
#
# @return [ResourceLockManager]
def resource_lock_manager
  @resource_lock_manager
end

Instance Method Details

#add_task(task, options = {}) ⇒ String

添加任务

Parameters:

  • task (PindoTask)

    任务对象

  • options (Hash) (defaults to: {})

    选项

Returns:

  • (String)

    任务 ID

Raises:

  • (ArgumentError)


66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/pindo/module/task/task_manager.rb', line 66

# Checks a task and hands it to the queue with priority sorting enabled.
#
# @param task [PindoTask] the task to enqueue
# @param options [Hash] optional settings
# @option options [Object, Array] :wait_for one dependency or a list of
#   dependencies appended to +task.dependencies+ before queuing
# @return [String] the value returned by the queue's +add+ (documented as
#   the task ID)
# @raise [ArgumentError] if +task+ is not a PindoTask, or if
#   +task.validate+ returns a falsy value
def add_task(task, options = {})
  raise ArgumentError, "Task must be a PindoTask" unless task.is_a?(PindoTask)
  raise ArgumentError, "Task validation failed: #{task.name}" unless task.validate

  # Array() lets callers pass either a single dependency or a list.
  extra_dependencies = options[:wait_for]
  task.dependencies.concat(Array(extra_dependencies)) if extra_dependencies

  @queue.add(task, sort_by_priority: true)
end

#add_tasks(tasks) ⇒ Array<String>

批量添加任务

Parameters:

  • tasks (Array<PindoTask>)

Returns:

  • (Array<String>)

    任务 ID 数组



83
84
85
# File 'lib/pindo/module/task/task_manager.rb', line 83

# Adds several tasks, one #add_task call per element and with no extra
# options.
#
# @param tasks [Array<PindoTask>] tasks to enqueue
# @return [Array<String>] the #add_task result for each task, in input order
def add_tasks(tasks)
  tasks.map do |single_task|
    add_task(single_task)
  end
end

#cancel_task(task_id) ⇒ Object

取消任务

Parameters:

  • task_id (String)

    任务 ID



177
178
179
180
# File 'lib/pindo/module/task/task_manager.rb', line 177

# Cancels the task with the given ID, if #find_task returns one.
#
# @param task_id [String] task ID
# @return [Object, nil] the result of the task's +cancel+, or nil when no
#   task was found
def cancel_task(task_id)
  find_task(task_id)&.cancel
end

#clear_all ⇒ Object

清空所有队列



163
164
165
# File 'lib/pindo/module/task/task_manager.rb', line 163

# Clears all queues by delegating to the task queue's +clear_all+.
#
# @return [Object] whatever the queue's +clear_all+ returns
def clear_all
  @queue.clear_all
end

#disable_output_management ⇒ Object

禁用输出管理系统



57
58
59
60
# File 'lib/pindo/module/task/task_manager.rb', line 57

# Turns the output management system off.
#
# The current output manager (if any) has its log files closed before it is
# dropped, the same way #enable_output_management closes them before
# replacing a manager, so log file handles are not leaked. If an executor
# has already been created, its output manager is cleared as well.
#
# @return [nil]
def disable_output_management
  # Close logs first: once the reference is gone nothing else can close them.
  @output_manager&.close_all_logs
  @output_manager = nil
  @executor.output_manager = nil if @executor
end

#enable_output_management(options = {}) ⇒ Object

启用输出管理系统

Parameters:

  • options (Hash) (defaults to: {})

    配置选项



42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/pindo/module/task/task_manager.rb', line 42

# Turns the output management system on with a fresh MultiLineOutputManager.
#
# @param options [Hash] configuration
# @option options [String] :log_dir log directory (default './pindo_logs')
# @option options [Integer] :max_lines_per_task (default 5)
# @option options [Integer] :max_recent_completed (default 3)
# @option options [Boolean] :auto_adjust (default true; an explicit false is
#   honoured because the value is read with +fetch+)
# @return [MultiLineOutputManager, nil] the new manager when an executor
#   already exists, otherwise nil
def enable_output_management(options = {})
  # Close the previous manager's log files first to avoid leaking handles.
  @output_manager&.close_all_logs

  settings = {
    log_dir: options[:log_dir] || './pindo_logs',
    max_lines_per_task: options[:max_lines_per_task] || 5,
    max_recent_completed: options[:max_recent_completed] || 3,
    auto_adjust: options.fetch(:auto_adjust, true)
  }
  @output_manager = MultiLineOutputManager.new(**settings)

  # Keep an already-initialised executor pointing at the new manager.
  @executor.output_manager = @output_manager if @executor
end

#execution_report ⇒ Hash

获取执行报告

Returns:

  • (Hash)

    执行报告



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/pindo/module/task/task_manager.rb', line 139

# Summarises the queue from its pending and completed snapshots.
#
# @return [Hash] with the counts +:pending+, +:completed+, +:success+,
#   +:failed+ and +:cancelled+, plus +:tasks+, which holds one Hash per task
#   (pending tasks first, then completed ones) with the keys +:id+, +:name+,
#   +:type+, +:status+, +:error+ and +:execution_time+
def execution_report
  waiting = @queue.pending_snapshot
  finished = @queue.completed_snapshot

  # Number of completed tasks whose status equals the given one.
  finished_with = ->(status) { finished.count { |task| task.status == status } }

  # Flat description of a single task; :error is the error's message or nil.
  describe = lambda do |task|
    {
      id: task.id,
      name: task.name,
      type: task.type,
      status: task.status,
      error: task.error&.message,
      execution_time: task.execution_time
    }
  end

  {
    pending: waiting.count,
    completed: finished.count,
    success: finished_with.call(TaskStatus::SUCCESS),
    failed: finished_with.call(TaskStatus::FAILED),
    cancelled: finished_with.call(TaskStatus::CANCELLED),
    tasks: (waiting + finished).map { |task| describe.call(task) }
  }
end

#executor ⇒ TaskExecutor

获取执行器(延迟初始化)

Returns:

  • (TaskExecutor)



36
37
38
# File 'lib/pindo/module/task/task_manager.rb', line 36

# Returns the task executor, creating it on first use.
#
# The executor is built from the queue, the reporter, the resource lock
# manager and whatever output manager is set at that moment; later calls
# return the same instance.
#
# @return [TaskExecutor]
def executor
  return @executor if @executor

  @executor = TaskExecutor.new(@queue, @reporter, @resource_lock_manager, @output_manager)
end

#find_task(task_id) ⇒ PindoTask?

查找任务

Parameters:

  • task_id (String)

    任务 ID

Returns:

  • (PindoTask, nil)



185
186
187
# File 'lib/pindo/module/task/task_manager.rb', line 185

# Looks a task up by ID by delegating to the task queue's +find+.
#
# @param task_id [String] task ID
# @return [PindoTask, nil] whatever the queue's +find+ returns for that ID
def find_task(task_id)
  @queue.find(task_id)
end

#start(options = {}) ⇒ Object

开始执行任务

Parameters:

  • options (Hash) (defaults to: {})

    执行选项

Options Hash (options):

  • :mode (Symbol)

    执行模式 (:serial, :concurrent)

  • :concurrent (Boolean)

    快捷参数

  • :max_workers (Integer)

    最大工作线程数



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/pindo/module/task/task_manager.rb', line 92

# Executes the queued tasks and raises if any of them did not succeed.
#
# Sequence: resolve the execution mode and strategy, ensure an output
# manager exists for concurrent runs, register the pending tasks with the
# output manager, print the execution plan, run the strategy, print the
# summary, then apply the failure gate.
#
# @param options [Hash] execution options; the hash is also passed to
#   +parse_execution_mode+ and +ExecutionStrategy.create+
# @option options [Symbol] :mode execution mode (:serial, :concurrent)
# @option options [Boolean] :concurrent shortcut flag
# @option options [Integer] :max_workers maximum number of worker threads
# @option options [String] :log_dir used only when the output manager is
#   auto-enabled for concurrent mode (default './pindo_logs')
# @option options [Integer] :max_lines_per_task same condition (default 5)
# @option options [Integer] :max_recent_completed same condition (default 3)
# @return [nil] when every task completed successfully
# @raise [Informative] if any task is FAILED or CANCELLED, or is still
#   pending after execution
def start(options = {})
  mode = parse_execution_mode(options)
  strategy = ExecutionStrategy.create(mode, options)

  # Concurrent mode needs an output manager, otherwise output from multiple
  # threads gets interleaved.
  if mode == :concurrent && @output_manager.nil?
    enable_output_management(
      log_dir: options[:log_dir] || './pindo_logs',
      max_lines_per_task: options[:max_lines_per_task] || 5,
      max_recent_completed: options[:max_recent_completed] || 3
    )
  end

  # When an output manager is configured, register every pending task with it.
  if @output_manager
    @queue.pending_snapshot.each do |task|
      @output_manager.register_task(task)
    end
  end

  # Print the execution plan.
  @reporter.print_execution_plan(strategy)

  # Run the strategy.
  strategy.execute(self)

  # Print the execution summary.
  @reporter.print_execution_summary

  # Failure gate: FAILED, CANCELLED and leftover pending tasks (e.g. never run
  # because a dependency blocked them) all count as "not completed
  # successfully". Raising makes the process exit non-zero instead of
  # reporting success for tasks that never ran.
  completed = @queue.completed_snapshot
  failed_tasks = completed.select { |t| t.status == TaskStatus::FAILED }
  cancelled_tasks = completed.select { |t| t.status == TaskStatus::CANCELLED }
  unfinished_tasks = @queue.pending_snapshot

  if failed_tasks.any? || cancelled_tasks.any? || unfinished_tasks.any?
    parts = []
    parts << "失败: #{failed_tasks.map(&:name).join(', ')}" if failed_tasks.any?
    parts << "取消: #{cancelled_tasks.map(&:name).join(', ')}" if cancelled_tasks.any?
    parts << "未执行: #{unfinished_tasks.map(&:name).join(', ')}" if unfinished_tasks.any?
    # Separate the sections; joining with '' ran them together in the message.
    raise Informative, "存在未成功完成的任务 —— #{parts.join('; ')}"
  end
end

#task_status(task_id) ⇒ Symbol?

获取任务状态

Parameters:

  • task_id (String)

    任务 ID

Returns:

  • (Symbol, nil)

    任务状态



170
171
172
173
# File 'lib/pindo/module/task/task_manager.rb', line 170

# Reports the status of the task with the given ID.
#
# @param task_id [String] task ID
# @return [Symbol, nil] the task's status, or nil when #find_task finds
#   no task
def task_status(task_id)
  find_task(task_id)&.status
end