Class: JPSClient::UploadClient
- Inherits:
-
Object
- Object
- JPSClient::UploadClient
- Defined in:
- lib/jpsclient/upload/upload_client.rb
Instance Attribute Summary collapse
-
#file_size ⇒ Object
Returns the value of attribute file_size.
-
#jps_client ⇒ Object
readonly
Returns the value of attribute jps_client.
-
#progress_bar ⇒ Object
Returns the value of attribute progress_bar.
-
#upload_binary_file ⇒ Object
Returns the value of attribute upload_binary_file.
Instance Method Summary collapse
- #calculate_chunk_timeout(chunk_size) ⇒ Object
-
#cleanup_worker_threads ⇒ Object
清理所有工作线程.
- #continuous_upload_data_req(concurrency: 1) ⇒ Object
-
#convert_to_cloudflare_fetch_url(original_url) ⇒ Object
将 S3 预签名 URL 转换为 Cloudflare fetch 代理 URL 格式: fetch.goosoul.dev/filename?url=percent_encoded_original_url.
- #handle_retry(upload_params_item, error_reason) ⇒ Object
-
#initialize(jps_client) ⇒ UploadClient
constructor
A new instance of UploadClient.
- #process_upload_task(upload_params_item) ⇒ Object
-
#set_upload_failed(error_msg = nil) ⇒ Object
安全地设置上传失败状态.
-
#upload_failed? ⇒ Boolean
安全地检查上传失败状态.
- #upload_file(binary_file: nil, isAttach: false, enable_cloudflare: false) ⇒ Object
-
#worker_loop ⇒ Object
工作线程的主循环.
Constructor Details
#initialize(jps_client) ⇒ UploadClient
Returns a new instance of UploadClient.
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/jpsclient/upload/upload_client.rb', line 23 def initialize(jps_client) raise ExceptionError, "必须提供 Client 实例" unless jps_client @jps_client = jps_client # 从 Client 获取所需资源 config_json = @jps_client.config_json # 加载上传配置 @upload_config = UploadConfig.from_json(config_json["upload_config"]) unless @upload_config && @upload_config.valid? raise ExceptionError, "上传配置无效或不完整" end # 添加互斥锁用于线程安全 @upload_eTags_mutex = Mutex.new @tasks_queue_mutex = Mutex.new @active_tasks_mutex = Mutex.new @upload_failed_mutex = Mutex.new @upload_failed = false end |
Instance Attribute Details
#file_size ⇒ Object
Returns the value of attribute file_size.
19 20 21 |
# File 'lib/jpsclient/upload/upload_client.rb', line 19 def file_size @file_size end |
#jps_client ⇒ Object (readonly)
Returns the value of attribute jps_client.
21 22 23 |
# File 'lib/jpsclient/upload/upload_client.rb', line 21 def jps_client @jps_client end |
#progress_bar ⇒ Object
Returns the value of attribute progress_bar.
20 21 22 |
# File 'lib/jpsclient/upload/upload_client.rb', line 20 def @progress_bar end |
#upload_binary_file ⇒ Object
Returns the value of attribute upload_binary_file.
18 19 20 |
# File 'lib/jpsclient/upload/upload_client.rb', line 18 def upload_binary_file @upload_binary_file end |
Instance Method Details
#calculate_chunk_timeout(chunk_size) ⇒ Object
404 405 406 407 408 |
# File 'lib/jpsclient/upload/upload_client.rb', line 404 def calculate_chunk_timeout(chunk_size) # 使用配置的 timeout_seconds 作为单个分片的超时时间 # 默认600秒(10分钟) @upload_config.timeout_seconds || 600 end |
#cleanup_worker_threads ⇒ Object
清理所有工作线程
186 187 188 189 190 191 192 193 194 |
# File 'lib/jpsclient/upload/upload_client.rb', line 186 def cleanup_worker_threads return unless @worker_threads # 防止未初始化 @worker_threads.each do |thread| # 尝试安全终止线程 thread.exit if thread.alive? end @worker_threads.clear end |
#continuous_upload_data_req(concurrency: 1) ⇒ Object
196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 |
# File 'lib/jpsclient/upload/upload_client.rb', line 196 def continuous_upload_data_req(concurrency:1) # 使用固定大小的线程池,避免线程无限增长 @worker_threads = [] @active_tasks = 0 @stop_workers = false @task_complete_cv = ConditionVariable.new # 创建固定数量的工作线程 concurrency.times do @worker_threads << Thread.new { worker_loop } end # 主线程等待所有任务完成 # 没有总体超时限制,会一直等待直到: # 1. 所有分片上传成功 # 2. 某个分片在重试后彻底失败 start_time = Time.now @tasks_queue_mutex.synchronize do while (@active_tasks > 0 || !@tasks_queue.empty?) && !upload_failed? # 等待任务完成通知,每30秒唤醒一次检查状态 @task_complete_cv.wait(@tasks_queue_mutex, 30) end end # 停止所有工作线程 @stop_workers = true @tasks_queue_mutex.synchronize { @task_complete_cv.broadcast } # 等待所有线程结束(最多等待5秒) @worker_threads.each do |t| t.join(5) t.kill if t.alive? # 如果仍然存活,强制终止 end # 只在非失败情况下检查分片完整性 if !upload_failed? && @upload_eTags.length != @expected_parts set_upload_failed("部分分片上传失败,已上传#{@upload_eTags.length}/#{@expected_parts}") end end |
#convert_to_cloudflare_fetch_url(original_url) ⇒ Object
将 S3 预签名 URL 转换为 Cloudflare fetch 代理 URL 格式: fetch.goosoul.dev/filename?url=percent_encoded_original_url
428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 |
# File 'lib/jpsclient/upload/upload_client.rb', line 428 def convert_to_cloudflare_fetch_url(original_url) return original_url if original_url.nil? || original_url.empty? begin uri = URI.parse(original_url) # 从路径中提取文件名 path_parts = uri.path.split('/') filename = path_parts.last || 'file' # 对原始完整 URL 进行 percent 编码 encoded_url = CGI.escape(original_url) # 构建 Cloudflare fetch URL "https://fetch.goosoul.dev/#{filename}?url=#{encoded_url}" rescue => e # 解析失败时返回原始 URL Logger.instance.info("URL 转换失败: #{e.},使用原始 URL") original_url end end |
#handle_retry(upload_params_item, error_reason) ⇒ Object
410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 |
# File 'lib/jpsclient/upload/upload_client.rb', line 410 def handle_retry(upload_params_item, error_reason) part_no = upload_params_item["partNo"] @progress_bar.delete_upload_index(upload_part:part_no) upload_params_item["retryCount"] = upload_params_item["retryCount"] - 1 if upload_params_item["retryCount"] > 0 # 简单重试,直接放回队列(避免创建额外线程) # Logger.instance.info("分片 ##{part_no} 准备重试,剩余重试次数: #{upload_params_item["retryCount"]}") # 避免打断进度条 # 直接放回队列末尾,让其他分片先执行 @tasks_queue_mutex.synchronize { @tasks_queue.push(upload_params_item) } else set_upload_failed("文件#{@upload_binary_file} 分片#{part_no}上传失败: #{error_reason},已达最大重试次数") end end |
#process_upload_task(upload_params_item) ⇒ Object
275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 |
# File 'lib/jpsclient/upload/upload_client.rb', line 275 def process_upload_task(upload_params_item) part_no = upload_params_item["partNo"] s3_key = upload_params_item["s3Key"] upload_id = upload_params_item["uploadId"] retry_count = upload_params_item["retryCount"] # 记录重试信息(仅在重试时记录) # 注释掉以避免打断进度条 # current_attempt = @upload_config.max_retry_times - retry_count + 1 # if current_attempt > 1 # Logger.instance.info("上传分片 ##{part_no},第 #{current_attempt} 次尝试(共 #{@upload_config.max_retry_times + 1} 次机会)") # end # 步骤2: 获取分片的预签名URL sign_result = @jps_client.get_file_sign_url( s3_key: s3_key, upload_id: upload_id, part_id: part_no, upload_config: @upload_config ) if sign_result.nil? || !sign_result.dig("data", "url") # Logger.instance.info("分片 ##{part_no} 获取上传URL失败") # 避免打断进度条 handle_retry(upload_params_item, "获取上传URL失败") return end original_url = sign_result["data"]["url"] upload_url = original_url # 如果启用 Cloudflare 加速,转换 URL if @enable_cloudflare upload_url = convert_to_cloudflare_fetch_url(original_url) end # Debug 模式下打印加速前后 URL 对比 if ENV['PINDO_DEBUG'] == '1' puts "[PINDO_DEBUG] 分片 ##{part_no} 原始 URL: #{original_url}" if @enable_cloudflare puts "[PINDO_DEBUG] 分片 ##{part_no} 加速 URL: #{upload_url}" end end # 计算分片数据范围(使用配置的分片大小) chunk_size = @upload_config.chunk_size_bytes start_position = chunk_size * (part_no - 1) if part_no * chunk_size > @file_size read_length = @file_size - start_position else read_length = chunk_size end # 读取文件数据(每个线程独立打开文件,无需互斥锁) put_data = nil begin file = File.open(@upload_binary_file, "rb") begin file.seek(start_position) put_data = file.read(read_length) ensure file.close if file end rescue => e # Logger.instance.info("分片 ##{part_no} 读取文件失败: #{e.message}") # 避免打断进度条 handle_retry(upload_params_item, "文件读取失败: #{e.}") return end # 创建上传请求(直接使用预签名URL) # 对于单个分片,使用更合理的超时时间(根据分片大小动态调整) # 5MB 分片在慢速网络(100KB/s)下需要约50秒,所以基础超时应该更长 chunk_timeout = calculate_chunk_timeout(read_length) request = Typhoeus::Request.new( upload_url, method: :put, body: put_data, headers: { 'Content-Type' => 'application/octet-stream', 'Content-Length' => read_length.to_s }, timeout: chunk_timeout, connecttimeout: 30 # 连接超时30秒 ) # 设置上传进度回调 upload_size_last = 0 last_progress_time = Time.now request.on_progress do |dltotal, dlnow, ultotal, ulnow| if ulnow && ulnow > upload_size_last upload_size_last = ulnow # 限制进度更新频率,避免过于频繁的更新 if Time.now - last_progress_time > 0.5 # 每0.5秒更新一次 @progress_bar.update_upload_index(upload_part:part_no, upload_size:ulnow) @progress_bar.update_upload_progress() last_progress_time = Time.now end end end # 执行请求并等待完成 response = request.run # 处理响应结果 if response.success? @progress_bar.complete_upload_index(upload_part:part_no, complete_size:read_length) # 只有在未完成时才更新进度条,避免100%时重复显示 unless @progress_bar.is_done @progress_bar.update_upload_progress() end # 新API不再需要收集ETag,只记录成功的分片号 @upload_eTags_mutex.synchronize { @upload_eTags << part_no } # Logger.instance.info("分片 ##{part_no} 上传成功") # 避免打断进度条 elsif response.timed_out? # 超时处理 # Logger.instance.info("分片 ##{part_no} 上传超时,响应时间: #{response.time}秒") # 避免打断进度条 handle_retry(upload_params_item, "超时") elsif response.code == 0 # 网络错误 error_msg = response. || "未知网络错误" # Logger.instance.info("分片 ##{part_no} 网络错误: #{error_msg}") # 避免打断进度条 handle_retry(upload_params_item, error_msg) else # HTTP错误 # Logger.instance.info("分片 ##{part_no} HTTP错误: #{response.code}, Body: #{response.body[0..200] if response.body}") # 避免打断进度条 handle_retry(upload_params_item, "HTTP #{response.code}") end end |
#set_upload_failed(error_msg = nil) ⇒ Object
安全地设置上传失败状态
178 179 180 181 182 183 |
# File 'lib/jpsclient/upload/upload_client.rb', line 178 def set_upload_failed(error_msg = nil) @upload_failed_mutex.synchronize do @upload_failed = true Logger.instance.("上传失败: #{error_msg}") if error_msg end end |
#upload_failed? ⇒ Boolean
安全地检查上传失败状态
173 174 175 |
# File 'lib/jpsclient/upload/upload_client.rb', line 173 def upload_failed? @upload_failed_mutex.synchronize { @upload_failed } end |
#upload_file(binary_file: nil, isAttach: false, enable_cloudflare: false) ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 |
# File 'lib/jpsclient/upload/upload_client.rb', line 44 def upload_file(binary_file:nil, isAttach:false, enable_cloudflare:false) raise ExceptionError, "上传文件不能为空" if binary_file.nil? || !File.exist?(binary_file) @upload_binary_file = binary_file @file_size = File.size(@upload_binary_file) @enable_cloudflare = enable_cloudflare # 处理空文件 if @file_size == 0 raise ExceptionError, "不能上传空文件: #{binary_file}" end @progress_bar = UploadProgress.new(upload_total_size:@file_size) @upload_failed = false # 重置上传失败标志 # 生成S3 Key (保持UUID方式) file_uuid = SecureRandom.uuid extension = File.extname(@upload_binary_file) filename = File.basename(@upload_binary_file) # 使用配置中的路径前缀 path_prefix = isAttach ? @upload_config.attach_url : @upload_config.default_url s3_key = "#{path_prefix}#{file_uuid}#{extension}" # 计算文件大小和分片信息 total_mb = sprintf("%.2f", @file_size / 1024.0 / 1024.0) chunk_size = @upload_config.chunk_size_bytes chunks = (@file_size.to_f / chunk_size).ceil # 使用配置的并发工作线程数和重试次数 task_num = @upload_config.concurrent_workers retry_count = @upload_config.max_retry_times puts "文件路径: #{@upload_binary_file}" puts "文件大小: #{total_mb}M" puts "上传路径: #{s3_key}" puts puts "切片大小: #{@upload_config.chunk_size_mb}MB" upload_result = nil begin # 步骤1: 初始化多部分上传 Logger.instance.("初始化上传...") init_result = @jps_client.init_file_multipart( s3_key: s3_key, content_type: "", upload_config: @upload_config ) # Debug 模式下打印初始化结果 if ENV['PINDO_DEBUG'] == '1' puts "[PINDO_DEBUG] init_file_multipart 返回: #{init_result.inspect}" end if init_result.nil? || !init_result.dig("data", "upload_id") raise ExceptionError, "初始化上传失败,请检查网络或服务器状态" end upload_id = init_result["data"]["upload_id"] # 准备分片任务队列 @tasks_queue = Queue.new @worker_threads = [] @expected_parts = chunks # 保存预期的分片数量 # 创建分片任务 chunks.times do |i| task_item = { "partNo" => i + 1, "s3Key" => s3_key, "uploadId" => upload_id, "retryCount" => retry_count } @tasks_queue.push(task_item) end puts "文件分片数: #{chunks}" puts "并发上传线程数: #{task_num} (CPU核心数: #{Etc.nprocessors})" puts "失败重试次数: #{retry_count}" puts Logger.instance.("开始上传...") @upload_eTags = [] @active_tasks = 0 # 跟踪活动任务数量 continuous_upload_data_req(concurrency: task_num) # 检查上传是否全部成功 if upload_failed? || @upload_eTags.length != @expected_parts upload_result = nil Logger.instance.("文件#{@upload_binary_file} 上传失败! 😭😭😭") return upload_result end # 进度条已经在上传过程中自动更新到100%,不需要再手动设置 # 步骤3: 完成多部分上传 Logger.instance.("完成上传...") complete_result = @jps_client.complete_file_multipart( s3_key: s3_key, upload_id: upload_id, upload_config: @upload_config ) if complete_result && (complete_result["code"] == 0 || complete_result["code"] == 200) # 返回 URL 字符串以保持向后兼容 upload_result = complete_result.dig("data", "url") || s3_key Logger.instance.("文件#{@upload_binary_file} 上传成功! 😎😎😎") else upload_result = nil error_msg = complete_result ? (complete_result["msg"] || complete_result["message"] || "未知错误") : "未知错误" Logger.instance.("文件#{@upload_binary_file} 上传失败: #{error_msg} 😭😭😭") end rescue => e upload_result = nil Logger.instance.("文件上传过程发生异常: #{e.} 😭😭😭") ensure # 确保所有工作线程都被清理 cleanup_worker_threads end return upload_result end |
#worker_loop ⇒ Object
工作线程的主循环
238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 |
# File 'lib/jpsclient/upload/upload_client.rb', line 238 def worker_loop loop do upload_params_item = nil # 从队列获取任务 @tasks_queue_mutex.synchronize do return if @stop_workers || (upload_failed? && @tasks_queue.empty?) if @tasks_queue.empty? # 队列为空,等待新任务 @task_complete_cv.wait(@tasks_queue_mutex, 1) next end upload_params_item = @tasks_queue.pop end # 在锁外增加活跃任务计数,避免嵌套锁 @active_tasks_mutex.synchronize { @active_tasks += 1 } if upload_params_item # 处理任务 if upload_params_item begin process_upload_task(upload_params_item) rescue => e set_upload_failed("处理分片#{upload_params_item["partNo"]}时出错: #{e.}") ensure @active_tasks_mutex.synchronize { @active_tasks -= 1 } @tasks_queue_mutex.synchronize { @task_complete_cv.broadcast } end end # 检查是否应该退出 break if upload_failed? end end |