Class: Pindo::TaskSystem::TaskManager

Inherits:
Object
  • Object
Includes:
Singleton
Defined in:
lib/pindo/module/task/task_manager.rb

Overview

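TaskManager - Task manager

Responsibilities:

  • Provides the public API

  • Composes the individual modules (queue, dependency checker, executor, reporter, resource locks)

  • Creates the execution strategy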

Constructor Details

#initialize ⇒ TaskManager

Returns a new instance of TaskManager.



# File 'lib/pindo/module/task/task_manager.rb', line 24

def initialize
  @queue = TaskQueue.new                               # queue management
  @resource_lock_manager = ResourceLockManager.new     # resource lock management
  @dependency_checker = DependencyChecker.new(@queue, @resource_lock_manager)  # dependency checking
  @reporter = TaskReporter.new(@queue)                 # report output
  @output_manager = nil                                # output manager
end
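
Because the class includes Ruby's standard Singleton module, callers do not invoke this constructor directly; they obtain the shared instance through .instance. The following is a minimal sketch of that behavior, using a hypothetical stand-in class (DemoManager is not part of Pindo, and its array queue only stands in for TaskQueue).

```ruby
require 'singleton'

# Hypothetical stand-in for TaskManager; only the Singleton behavior is shown.
class DemoManager
  include Singleton

  attr_reader :queue

  def initialize
    @queue = [] # stand-in for TaskQueue.new
  end
end

# Every call to .instance returns the same object, so state is shared.
DemoManager.instance.queue << :build
puts DemoManager.instance.queue.inspect

# Singleton makes .new private, so direct construction is rejected.
begin
  DemoManager.new
rescue NoMethodError => e
  puts "rejected: #{e.class}"
end
```

One consequence of this design is that queued tasks and the output manager are process-wide state, so code that needs a clean slate would typically call clear_all rather than build a new manager.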

Instance Attribute Details

#dependency_checker ⇒ Object (readonly)

Returns the value of attribute dependency_checker.



# File 'lib/pindo/module/task/task_manager.rb', line 22

def dependency_checker
  @dependency_checker
end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



# File 'lib/pindo/module/task/task_manager.rb', line 22

def queue
  @queue
end

#reporter ⇒ Object (readonly)

Returns the value of attribute reporter.



# File 'lib/pindo/module/task/task_manager.rb', line 22

def reporter
  @reporter
end

#resource_lock_manager ⇒ Object (readonly)

Returns the value of attribute resource_lock_manager.



# File 'lib/pindo/module/task/task_manager.rb', line 22

def resource_lock_manager
  @resource_lock_manager
end

Instance Method Details

#add_task(task, options = {}) ⇒ String

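Adds a task to the queue.

Parameters:

  • task (PindoTask)

    The task object

  • options (Hash) (defaults to: {})

    Options; :wait_for takes a single value or an array of values that are appended to the task's dependencies

Returns:

  • (String)

    The task ID

Raises:

  • (ArgumentError)

    If task is not a PindoTask, or if task.validate returns a falsy value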


# File 'lib/pindo/module/task/task_manager.rb', line 66

def add_task(task, options = {})
  raise ArgumentError, "Task must be a PindoTask" unless task.is_a?(PindoTask)

  unless task.validate
    raise ArgumentError, "Task validation failed: #{task.name}"
  end

  if options[:wait_for]
    task.dependencies.concat(Array(options[:wait_for]))
  end

  @queue.add(task, sort_by_priority: true)
end
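
The validation and dependency-merging steps above can be sketched in isolation. Everything in this example is a hypothetical stand-in: DemoTask replaces PindoTask, a plain array replaces TaskQueue, and the sketch assumes that dependencies are task IDs and that the method returns the task's ID (the real return value is whatever TaskQueue#add returns).

```ruby
# Hypothetical stand-in for PindoTask; `valid` controls what validate returns.
DemoTask = Struct.new(:id, :name, :dependencies, :valid) do
  def validate
    valid
  end
end

# Mirrors the validation and dependency-merging steps of add_task.
def demo_add_task(queue, task, options = {})
  raise ArgumentError, 'Task must be a DemoTask' unless task.is_a?(DemoTask)
  raise ArgumentError, "Task validation failed: #{task.name}" unless task.validate

  # Array() wraps a single value and leaves an array unchanged.
  task.dependencies.concat(Array(options[:wait_for])) if options[:wait_for]

  queue << task # assumption: the real queue also sorts by priority here
  task.id
end

queue  = []
build  = DemoTask.new('t1', 'build', [], true)
upload = DemoTask.new('t2', 'upload', [], true)
notify = DemoTask.new('t3', 'notify', [], true)

demo_add_task(queue, build)
demo_add_task(queue, upload, wait_for: 't1')      # a single dependency
demo_add_task(queue, notify, wait_for: %w[t1 t2]) # several dependencies
puts notify.dependencies.inspect
```

Note that :wait_for appends to whatever dependencies the task already declares; it does not replace them.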

#add_tasks(tasks) ⇒ Array<String>

Adds several tasks in one call.

Parameters:

  • tasks (Array<PindoTask>)

    The tasks to add

Returns:

  • (Array<String>)

    Array of task IDs



# File 'lib/pindo/module/task/task_manager.rb', line 83

def add_tasks(tasks)
  tasks.map { |task| add_task(task) }
end

#cancel_task(task_id) ⇒ Object

Cancels a task. Does nothing if no task with the given ID is found.

Parameters:

  • task_id (String)

    The task ID



# File 'lib/pindo/module/task/task_manager.rb', line 169

def cancel_task(task_id)
  task = find_task(task_id)
  task&.cancel
end

#clear_all ⇒ Object

Clears all queues.



# File 'lib/pindo/module/task/task_manager.rb', line 155

def clear_all
  @queue.clear_all
end

#disable_output_management ⇒ Object

Disables the output management system.



# File 'lib/pindo/module/task/task_manager.rb', line 57

def disable_output_management
  @output_manager = nil
  @executor.output_manager = nil if @executor
end

#enable_output_management(options = {}) ⇒ Object

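Enables the output management system.

Parameters:

  • options (Hash) (defaults to: {})

    Configuration options: :log_dir (default './pindo_logs'), :max_lines_per_task (default 5), :max_recent_completed (default 3), :auto_adjust (default true)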



# File 'lib/pindo/module/task/task_manager.rb', line 42

def enable_output_management(options = {})
  # If an output manager already exists, close all of its log files first (avoids leaking file handles)
  @output_manager.close_all_logs if @output_manager

  @output_manager = MultiLineOutputManager.new(
    log_dir: options[:log_dir] || './pindo_logs',
    max_lines_per_task: options[:max_lines_per_task] || 5,
    max_recent_completed: options[:max_recent_completed] || 3,
    auto_adjust: options.fetch(:auto_adjust, true)
  )
  # Update the executor's output manager (if the executor has already been initialized)
  @executor.output_manager = @output_manager if @executor
end
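
The method uses two different defaulting idioms: `||` for the string and integer settings, and Hash#fetch for the boolean :auto_adjust. The sketch below isolates that logic in a hypothetical helper (resolve_output_options is not part of Pindo) to show why the distinction matters: with `||`, an explicit false would be silently replaced by the default.

```ruby
# Hypothetical helper applying the same defaulting rules as
# enable_output_management and returning the resolved settings.
def resolve_output_options(options = {})
  {
    log_dir: options[:log_dir] || './pindo_logs',
    max_lines_per_task: options[:max_lines_per_task] || 5,
    max_recent_completed: options[:max_recent_completed] || 3,
    # fetch keeps an explicit false; `options[:auto_adjust] || true` would not.
    auto_adjust: options.fetch(:auto_adjust, true)
  }
end

puts resolve_output_options.inspect
puts resolve_output_options(log_dir: '/tmp/logs', auto_adjust: false).inspect
```

A side effect of the `||` idiom is that passing nil for :log_dir, :max_lines_per_task or :max_recent_completed falls back to the default, whereas an explicit `auto_adjust: nil` is passed through unchanged by fetch.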

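#execution_report ⇒ Hash

Returns the execution report.

Returns:

  • (Hash)

    The execution report, with the keys :pending, :completed, :success, :failed and :tasks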



# File 'lib/pindo/module/task/task_manager.rb', line 132

def execution_report
  pending = @queue.pending_snapshot
  completed = @queue.completed_snapshot

  {
    pending: pending.count,
    completed: completed.count,
    success: completed.count { |t| t.status == TaskStatus::SUCCESS },
    failed: completed.count { |t| t.status == TaskStatus::FAILED },
    tasks: (pending + completed).map do |task|
      {
        id: task.id,
        name: task.name,
        type: task.type,
        status: task.status,
        error: task.error&.message,
        execution_time: task.execution_time
      }
    end
  }
end
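
A caller might consume the returned hash as sketched below. The sample data only imitates the shape built above; the status symbols, the type values and the nil execution_time for a pending task are assumptions, since the real TaskStatus constants are not shown here, and failure_lines is a hypothetical helper.

```ruby
# Sample data in the shape returned by execution_report.
# Status symbols are assumed stand-ins for the TaskStatus constants.
SAMPLE_REPORT = {
  pending: 1,
  completed: 2,
  success: 1,
  failed: 1,
  tasks: [
    { id: 't1', name: 'build', type: :build, status: :success, error: nil, execution_time: 12.5 },
    { id: 't2', name: 'upload', type: :upload, status: :failed, error: 'timeout', execution_time: 3.0 },
    { id: 't3', name: 'notify', type: :notify, status: :pending, error: nil, execution_time: nil }
  ]
}.freeze

# Hypothetical helper: formats one line per failed task in a report.
def failure_lines(report, failed_status: :failed)
  report[:tasks]
    .select { |task| task[:status] == failed_status }
    .map { |task| "#{task[:name]} (#{task[:id]}): #{task[:error]}" }
end

puts failure_lines(SAMPLE_REPORT)
```

Because :error holds only the exception message (or nil), a caller that needs the backtrace would have to look up the task itself, for example through find_task.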

#executor ⇒ TaskExecutor

Returns the executor (lazily initialized).

Returns:

  • (TaskExecutor)



# File 'lib/pindo/module/task/task_manager.rb', line 36

def executor
  @executor ||= TaskExecutor.new(@queue, @reporter, @resource_lock_manager, @output_manager)
end
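
The `||=` idiom builds the executor on first access and reuses it afterwards. That is why enable_output_management and disable_output_management push the new output manager into an existing executor explicitly: the memoized object is never rebuilt. The sketch below shows the pattern with hypothetical stand-ins (DemoExecutor and DemoHolder are not Pindo classes).

```ruby
# Hypothetical stand-in for TaskExecutor with a writable output_manager.
DemoExecutor = Struct.new(:output_manager)

# Hypothetical stand-in for the relevant part of TaskManager.
class DemoHolder
  def initialize
    @output_manager = nil
  end

  # Built on first access, then reused on every later call.
  def executor
    @executor ||= DemoExecutor.new(@output_manager)
  end

  # Mirrors enable_output_management: an existing executor must be updated
  # explicitly, because ||= never rebuilds it.
  def enable_output(manager)
    @output_manager = manager
    @executor.output_manager = manager if @executor
  end
end

holder = DemoHolder.new
early  = holder.executor        # created with a nil output manager
holder.enable_output(:manager)  # pushed into the existing executor
puts early.output_manager.inspect
```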

#find_task(task_id) ⇒ PindoTask?

Finds a task.

Parameters:

  • task_id (String)

    The task ID

Returns:

  • (PindoTask, nil)



# File 'lib/pindo/module/task/task_manager.rb', line 177

def find_task(task_id)
  @queue.find(task_id)
end

#start(options = {}) ⇒ Object

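Starts executing the queued tasks. Raises Informative if any task failed.

Parameters:

  • options (Hash) (defaults to: {})

    Execution options

Options Hash (options):

  • :mode (Symbol)

    Execution mode (:serial, :concurrent)

  • :concurrent (Boolean)

    Shortcut parameter

  • :max_workers (Integer)

    Maximum number of worker threads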



# File 'lib/pindo/module/task/task_manager.rb', line 92

def start(options = {})
  mode = parse_execution_mode(options)
  strategy = ExecutionStrategy.create(mode, options)

  # Concurrent mode requires the output manager; otherwise output from multiple threads gets interleaved
  if mode == :concurrent && @output_manager.nil?
    enable_output_management(
      log_dir: options[:log_dir] || './pindo_logs',
      max_lines_per_task: options[:max_lines_per_task] || 5,
      max_recent_completed: options[:max_recent_completed] || 3
    )
  end

  # If an output manager is configured, register all pending tasks
  if @output_manager
    @queue.pending_snapshot.each do |task|
      @output_manager.register_task(task)
    end
  end

  # Print the task execution plan
  @reporter.print_execution_plan(strategy)

  # Run the strategy
  strategy.execute(self)

  # Print the execution summary
  @reporter.print_execution_summary

  # Check for failed tasks; raise if any failed (ensures the process exits with a non-zero status)
  report = execution_report
  if report[:failed] > 0
    failed_tasks = @queue.completed_snapshot.select { |t| t.status == TaskStatus::FAILED }
    failed_names = failed_tasks.map(&:name).join(', ')
    # The message reads "<N> tasks failed: <names>"
    raise Informative, "#{report[:failed]} 个任务执行失败: #{failed_names}"
  end
end
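
Since start raises when the report lists failures, a command-line caller would typically rescue that exception and translate it into an exit status. The sketch below shows one way to do so under stated assumptions: DemoInformative is a hypothetical stand-in for Pindo's Informative class, and both helper methods are invented for illustration.

```ruby
# Hypothetical stand-in for Pindo's Informative error class.
class DemoInformative < StandardError; end

# Mirrors the final step of start: raise when the report lists failures.
def demo_check_failures(report, failed_names)
  return if report[:failed].zero?

  raise DemoInformative, "#{report[:failed]} task(s) failed: #{failed_names.join(', ')}"
end

# One way a caller might turn that exception into an exit status.
def demo_exit_status(report, failed_names)
  demo_check_failures(report, failed_names)
  0
rescue DemoInformative => e
  warn e.message # report the failure on stderr
  1
end

puts demo_exit_status({ failed: 0 }, [])
puts demo_exit_status({ failed: 2 }, %w[upload notify])
```

Note that the plan, the strategy run and the summary have all completed by the time the exception is raised, so rescuing it does not skip any reporting.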

#task_status(task_id) ⇒ Symbol?

Returns the status of a task.

Parameters:

  • task_id (String)

    The task ID

Returns:

  • (Symbol, nil)

    The task status, or nil if no task with the given ID is found



# File 'lib/pindo/module/task/task_manager.rb', line 162

def task_status(task_id)
  task = find_task(task_id)
  task&.status
end
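
The safe navigation operator (&.) is what makes an unknown ID return nil instead of raising NoMethodError. A minimal sketch with hypothetical stand-ins follows: DemoStatusTask, DEMO_TASKS and the :running status are all invented, and a hash lookup replaces TaskQueue#find.

```ruby
# Hypothetical stand-ins; the real lookup goes through TaskQueue#find.
DemoStatusTask = Struct.new(:id, :status)
DEMO_TASKS = { 't1' => DemoStatusTask.new('t1', :running) }.freeze

# Mirrors task_status: look the task up, then read its status if present.
def demo_task_status(task_id)
  task = DEMO_TASKS[task_id]
  task&.status # nil when the task is missing, instead of NoMethodError
end

puts demo_task_status('t1').inspect
puts demo_task_status('missing').inspect
```

Callers therefore need to treat nil as "not found" rather than as a status value.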