Class: JPSClient::UploadClient

Inherits: Object
Defined in:
lib/jpsclient/upload/upload_client.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(jps_client) ⇒ UploadClient

Returns a new instance of UploadClient.

Raises:



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/jpsclient/upload/upload_client.rb', line 23

# Builds an UploadClient bound to an existing JPS client instance.
#
# @param jps_client [Object] the parent client providing config and API calls
# @raise [ExceptionError] when jps_client is nil or the upload config is invalid
def initialize(jps_client)
    raise ExceptionError, "必须提供 Client 实例" unless jps_client

    @jps_client = jps_client

    # Pull shared resources off the parent client.
    config_json = @jps_client.config_json

    # Load and validate the upload configuration.
    @upload_config = UploadConfig.from_json(config_json["upload_config"])
    raise ExceptionError, "上传配置无效或不完整" unless @upload_config && @upload_config.valid?

    # Mutexes guarding state shared across upload worker threads.
    @upload_eTags_mutex = Mutex.new
    @tasks_queue_mutex = Mutex.new
    @active_tasks_mutex = Mutex.new
    @upload_failed_mutex = Mutex.new
    @upload_failed = false
end

Instance Attribute Details

#file_sizeObject

Returns the value of attribute file_size.



19
20
21
# File 'lib/jpsclient/upload/upload_client.rb', line 19

# @return [Integer, nil] size in bytes of the file being uploaded
#   (set from File.size in #upload_file)
def file_size
  @file_size
end

#jps_clientObject (readonly)

Returns the value of attribute jps_client.



21
22
23
# File 'lib/jpsclient/upload/upload_client.rb', line 21

# @return [Object] the parent JPS client this upload client was built from
def jps_client
  @jps_client
end

#progress_barObject

Returns the value of attribute progress_bar.



20
21
22
# File 'lib/jpsclient/upload/upload_client.rb', line 20

# @return [Object] the UploadProgress instance tracking overall upload progress
def progress_bar
  @progress_bar
end

#upload_binary_fileObject

Returns the value of attribute upload_binary_file.



18
19
20
# File 'lib/jpsclient/upload/upload_client.rb', line 18

# @return [String, nil] path of the local file currently being uploaded
def upload_binary_file
  @upload_binary_file
end

Instance Method Details

#calculate_chunk_timeout(chunk_size) ⇒ Object



404
405
406
407
408
# File 'lib/jpsclient/upload/upload_client.rb', line 404

# Timeout budget, in seconds, for uploading one chunk.
#
# @param chunk_size [Integer] size of the chunk in bytes
#   (currently unused — the timeout is flat, not size-scaled;
#   kept for interface stability)
# @return [Integer] configured timeout_seconds, or 600 (10 minutes) by default
def calculate_chunk_timeout(chunk_size)
    configured = @upload_config.timeout_seconds
    configured || 600
end

#cleanup_worker_threadsObject

清理所有工作线程



186
187
188
189
190
191
192
193
194
# File 'lib/jpsclient/upload/upload_client.rb', line 186

# Shuts down the worker thread pool: asks every live thread to exit and
# empties the thread list. No-op when the pool was never created.
def cleanup_worker_threads
  return unless @worker_threads  # pool may never have been initialized

  @worker_threads.each { |worker| worker.exit if worker.alive? }
  @worker_threads.clear
end

#continuous_upload_data_req(concurrency: 1) ⇒ Object



196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
# File 'lib/jpsclient/upload/upload_client.rb', line 196

# Drains the chunk task queue with a fixed-size pool of worker threads,
# blocking until every chunk has uploaded or the upload has failed.
#
# @param concurrency [Integer] number of worker threads to spawn
def continuous_upload_data_req(concurrency:1)
    # Fixed-size thread pool — avoids unbounded thread growth.
    @worker_threads = []
    @active_tasks = 0
    @stop_workers = false
    @task_complete_cv = ConditionVariable.new

    # Spawn the fixed number of workers; each runs worker_loop.
    concurrency.times do
        @worker_threads << Thread.new { worker_loop }
    end

    # The main thread waits for all tasks to complete. There is no overall
    # timeout — it waits until either:
    #   1. every chunk has uploaded successfully, or
    #   2. some chunk has exhausted its retries and failed permanently.
    start_time = Time.now  # NOTE(review): currently unused

    @tasks_queue_mutex.synchronize do
        while (@active_tasks > 0 || !@tasks_queue.empty?) && !upload_failed?
            # Wait for a completion notification; wake every 30s to re-check.
            @task_complete_cv.wait(@tasks_queue_mutex, 30)
        end
    end

    # Signal all workers to stop and wake any waiting on the CV.
    @stop_workers = true
    @tasks_queue_mutex.synchronize { @task_complete_cv.broadcast }

    # Give each thread up to 5 seconds to finish cleanly.
    @worker_threads.each do |t|
        t.join(5)
        t.kill if t.alive?  # force-terminate if still running
    end

    # Verify chunk completeness only when the upload has not already failed.
    if !upload_failed? && @upload_eTags.length != @expected_parts
        set_upload_failed("部分分片上传失败,已上传#{@upload_eTags.length}/#{@expected_parts}")
    end
end

#convert_to_cloudflare_fetch_url(original_url) ⇒ Object

将 S3 预签名 URL 转换为 Cloudflare fetch 代理 URL 格式: fetch.goosoul.dev/filename?url=percent_encoded_original_url



428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
# File 'lib/jpsclient/upload/upload_client.rb', line 428

# Converts an S3 presigned URL into a Cloudflare fetch-proxy URL of the form
# fetch.goosoul.dev/<filename>?url=<percent-encoded original URL>.
#
# @param original_url [String, nil] the presigned S3 URL
# @return [String, nil] the proxied URL, or the input unchanged when it is
#   nil/empty or cannot be parsed
def convert_to_cloudflare_fetch_url(original_url)
  return original_url if original_url.nil? || original_url.empty?

  begin
    uri = URI.parse(original_url)

    # Use the last path segment as the proxied filename.
    path_parts = uri.path.split('/')
    filename = path_parts.last || 'file'

    # Percent-encode the full original URL for the query parameter.
    encoded_url = CGI.escape(original_url)

    # Bug fix: filename was computed but never interpolated into the URL,
    # producing a broken literal path instead of "/<filename>?url=...".
    "https://fetch.goosoul.dev/#{filename}?url=#{encoded_url}"
  rescue => e
    # Fall back to the original URL when parsing fails.
    Logger.instance.info("URL 转换失败: #{e.message},使用原始 URL")
    original_url
  end
end

#handle_retry(upload_params_item, error_reason) ⇒ Object



410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
# File 'lib/jpsclient/upload/upload_client.rb', line 410

# Handles a failed chunk: requeues it for another attempt, or marks the
# whole upload as failed once its retry budget is exhausted.
#
# @param upload_params_item [Hash] task item with "partNo" / "retryCount"
# @param error_reason [String] human-readable cause of the failure
def handle_retry(upload_params_item, error_reason)
    part_no = upload_params_item["partNo"]
    @progress_bar.delete_upload_index(upload_part:part_no)

    remaining = upload_params_item["retryCount"] - 1
    upload_params_item["retryCount"] = remaining

    if remaining > 0
        # Push onto the tail of the queue so other chunks get a turn first
        # (no extra thread is spawned for the retry).
        @tasks_queue_mutex.synchronize { @tasks_queue.push(upload_params_item) }
    else
        set_upload_failed("文件#{@upload_binary_file} 分片#{part_no}上传失败: #{error_reason},已达最大重试次数")
    end
end

#process_upload_task(upload_params_item) ⇒ Object



275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
# File 'lib/jpsclient/upload/upload_client.rb', line 275

# Uploads a single chunk: fetches a presigned URL, reads the chunk bytes
# from disk, PUTs them via Typhoeus, and records success or schedules a
# retry on any failure.
#
# @param upload_params_item [Hash] task item with "partNo", "s3Key",
#   "uploadId" and "retryCount" keys
def process_upload_task(upload_params_item)
    part_no = upload_params_item["partNo"]
    s3_key = upload_params_item["s3Key"]
    upload_id = upload_params_item["uploadId"]
    retry_count = upload_params_item["retryCount"]

    # Retry-attempt logging is disabled so it does not break the
    # progress-bar display.
    # current_attempt = @upload_config.max_retry_times - retry_count + 1
    # if current_attempt > 1
    #     Logger.instance.info("上传分片 ##{part_no},第 #{current_attempt} 次尝试(共 #{@upload_config.max_retry_times + 1} 次机会)")
    # end

    # Step 2: obtain the presigned upload URL for this chunk.
    sign_result = @jps_client.get_file_sign_url(
        s3_key: s3_key,
        upload_id: upload_id,
        part_id: part_no,
        upload_config: @upload_config
    )

    if sign_result.nil? || !sign_result.dig("data", "url")
        # (logging suppressed to keep the progress bar intact)
        handle_retry(upload_params_item, "获取上传URL失败")
        return
    end

    original_url = sign_result["data"]["url"]
    upload_url = original_url

    # Route through the Cloudflare fetch proxy when acceleration is enabled.
    if @enable_cloudflare
      upload_url = convert_to_cloudflare_fetch_url(original_url)
    end

    # In debug mode, print the before/after URLs for comparison.
    if ENV['PINDO_DEBUG'] == '1'
      puts "[PINDO_DEBUG] 分片 ##{part_no} 原始 URL: #{original_url}"
      if @enable_cloudflare
        puts "[PINDO_DEBUG] 分片 ##{part_no} 加速 URL: #{upload_url}"
      end
    end

    # Compute this chunk's byte range (using the configured chunk size);
    # the final chunk may be shorter than chunk_size.
    chunk_size = @upload_config.chunk_size_bytes
    start_position = chunk_size * (part_no - 1)
    if part_no * chunk_size > @file_size
        read_length = @file_size - start_position
    else
        read_length = chunk_size
    end

    # Read the chunk bytes. Each thread opens its own file handle, so no
    # mutex is needed around file access.
    put_data = nil
    begin
        file = File.open(@upload_binary_file, "rb")
        begin
            file.seek(start_position)
            put_data = file.read(read_length)
        ensure
            file.close if file
        end
    rescue => e
        # (logging suppressed to keep the progress bar intact)
        handle_retry(upload_params_item, "文件读取失败: #{e.message}")
        return
    end

    # Build the PUT request against the presigned URL. The per-chunk timeout
    # comes from calculate_chunk_timeout — a 5MB chunk on a slow 100KB/s
    # link needs ~50s, hence the generous base timeout.
    chunk_timeout = calculate_chunk_timeout(read_length)

    request = Typhoeus::Request.new(
        upload_url,
        method: :put,
        body: put_data,
        headers: {
            'Content-Type' => 'application/octet-stream',
            'Content-Length' => read_length.to_s
        },
        timeout: chunk_timeout,
        connecttimeout: 30  # 30-second connect timeout
    )

    # Progress callback: forward upload progress to the shared bar,
    # throttled to avoid excessively frequent redraws.
    upload_size_last = 0
    last_progress_time = Time.now
    request.on_progress do |dltotal, dlnow, ultotal, ulnow|
        if ulnow && ulnow > upload_size_last
            upload_size_last = ulnow
            # Update at most once every 0.5 seconds.
            if Time.now - last_progress_time > 0.5
                @progress_bar.update_upload_index(upload_part:part_no, upload_size:ulnow)
                @progress_bar.update_upload_progress()
                last_progress_time = Time.now
            end
        end
    end

    # Run the request synchronously and wait for completion.
    response = request.run

    # Dispatch on the outcome.
    if response.success?
        @progress_bar.complete_upload_index(upload_part:part_no, complete_size:read_length)
        # Only redraw while not yet done, to avoid a duplicate 100% line.
        unless @progress_bar.is_done
          @progress_bar.update_upload_progress()
        end
        # The new API no longer needs ETags; just record the part number.
        @upload_eTags_mutex.synchronize { @upload_eTags << part_no }
    elsif response.timed_out?
        # Timed out — schedule a retry.
        handle_retry(upload_params_item, "超时")
    elsif response.code == 0
        # code 0 indicates a transport-level (network) error.
        error_msg = response.return_message || "未知网络错误"
        handle_retry(upload_params_item, error_msg)
    else
        # Any other code is an HTTP-level error.
        handle_retry(upload_params_item, "HTTP #{response.code}")
    end
end

#set_upload_failed(error_msg = nil) ⇒ Object

安全地设置上传失败状态



178
179
180
181
182
183
# File 'lib/jpsclient/upload/upload_client.rb', line 178

# Thread-safely marks the upload as failed and logs the reason.
#
# @param error_msg [String, nil] reason to log; nothing is logged when nil
def set_upload_failed(error_msg = nil)
  @upload_failed_mutex.synchronize do
    @upload_failed = true
    Logger.instance.fancyinfo_error("上传失败: #{error_msg}") if error_msg
  end
end

#upload_failed?Boolean

安全地检查上传失败状态

Returns:

  • (Boolean)


173
174
175
# File 'lib/jpsclient/upload/upload_client.rb', line 173

# Thread-safely reads the upload-failed flag.
#
# @return [Boolean] true once set_upload_failed has been called
def upload_failed?
  @upload_failed_mutex.synchronize { @upload_failed }
end

#upload_file(binary_file: nil, isAttach: false, enable_cloudflare: false) ⇒ Object

Raises:



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/jpsclient/upload/upload_client.rb', line 44

# Uploads a local file to S3 via multipart upload: initializes the session,
# splits the file into chunks, uploads them concurrently, then completes
# the multipart upload.
#
# @param binary_file [String, nil] path of the local file to upload
# @param isAttach [Boolean] when true, use the attach_url path prefix
#   instead of default_url
# @param enable_cloudflare [Boolean] route chunk PUTs through the
#   Cloudflare fetch proxy
# @return [String, nil] the uploaded file URL (or the s3_key) on success,
#   nil on failure
# @raise [ExceptionError] when the file is missing/empty or the multipart
#   initialization fails
def upload_file(binary_file:nil, isAttach:false, enable_cloudflare:false)

  raise ExceptionError, "上传文件不能为空" if binary_file.nil? || !File.exist?(binary_file)

  @upload_binary_file = binary_file
  @file_size = File.size(@upload_binary_file)
  @enable_cloudflare = enable_cloudflare

  # Reject empty files.
  if @file_size == 0
      raise ExceptionError, "不能上传空文件: #{binary_file}"
  end

  @progress_bar = UploadProgress.new(upload_total_size:@file_size)
  @upload_failed = false  # reset the failure flag for this run

  # Build the S3 key from a fresh UUID (keeps remote names collision-free).
  file_uuid = SecureRandom.uuid
  extension = File.extname(@upload_binary_file)
  filename = File.basename(@upload_binary_file)  # NOTE(review): currently unused

  # Choose the path prefix from configuration.
  path_prefix = isAttach ? @upload_config.attach_url : @upload_config.default_url
  s3_key = "#{path_prefix}#{file_uuid}#{extension}"

  # Compute the file size and chunking info.
  total_mb = sprintf("%.2f", @file_size / 1024.0 / 1024.0)
  chunk_size = @upload_config.chunk_size_bytes
  chunks = (@file_size.to_f / chunk_size).ceil

  # Concurrency and retry limits come from configuration.
  task_num = @upload_config.concurrent_workers
  retry_count = @upload_config.max_retry_times

  puts "文件路径: #{@upload_binary_file}"
  puts "文件大小: #{total_mb}M"
  puts "上传路径: #{s3_key}"
  puts
  puts "切片大小: #{@upload_config.chunk_size_mb}MB"

  upload_result = nil

  begin
    # Step 1: initialize the multipart upload session.
    Logger.instance.fancyinfo_start("初始化上传...")
    init_result = @jps_client.init_file_multipart(
        s3_key: s3_key,
        content_type: "",
        upload_config: @upload_config
    )

    # Debug: dump the raw init response.
    if ENV['PINDO_DEBUG'] == '1'
      puts "[PINDO_DEBUG] init_file_multipart 返回: #{init_result.inspect}"
    end

    if init_result.nil? || !init_result.dig("data", "upload_id")
        raise ExceptionError, "初始化上传失败,请检查网络或服务器状态"
    end

    upload_id = init_result["data"]["upload_id"]

    # Prepare the chunk task queue.
    @tasks_queue = Queue.new
    @worker_threads = []
    @expected_parts = chunks  # how many parts must succeed

    # Enqueue one task per chunk.
    chunks.times do |i|
        task_item = {
            "partNo" => i + 1,
            "s3Key" => s3_key,
            "uploadId" => upload_id,
            "retryCount" => retry_count
        }
        @tasks_queue.push(task_item)
    end

    puts "文件分片数: #{chunks}"
    puts "并发上传线程数: #{task_num} (CPU核心数: #{Etc.nprocessors})"
    puts "失败重试次数: #{retry_count}"
    puts

    Logger.instance.fancyinfo_start("开始上传...")
    @upload_eTags = []
    @active_tasks = 0  # tracks the number of in-flight tasks

    continuous_upload_data_req(concurrency: task_num)

    # Verify that every chunk uploaded successfully.
    if upload_failed? || @upload_eTags.length != @expected_parts
        upload_result = nil
        Logger.instance.fancyinfo_error("文件#{@upload_binary_file} 上传失败! 😭😭😭")
        return upload_result
    end

    # The progress bar has already reached 100% during upload; no manual set.

    # Step 3: complete the multipart upload.
    Logger.instance.fancyinfo_start("完成上传...")
    complete_result = @jps_client.complete_file_multipart(
        s3_key: s3_key,
        upload_id: upload_id,
        upload_config: @upload_config
    )

    if complete_result && (complete_result["code"] == 0 || complete_result["code"] == 200)
        # Return the URL string for backward compatibility.
        upload_result = complete_result.dig("data", "url") || s3_key
        Logger.instance.fancyinfo_success("文件#{@upload_binary_file} 上传成功! 😎😎😎")
    else
        upload_result = nil
        error_msg = complete_result ? (complete_result["msg"] || complete_result["message"] || "未知错误") : "未知错误"
        Logger.instance.fancyinfo_error("文件#{@upload_binary_file} 上传失败: #{error_msg} 😭😭😭")
    end

  rescue => e
    upload_result = nil
    Logger.instance.fancyinfo_error("文件上传过程发生异常: #{e.message} 😭😭😭")
  ensure
    # Always tear down worker threads, even on failure/exception paths.
    cleanup_worker_threads
  end

  return upload_result

end

#worker_loopObject

工作线程的主循环



238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
# File 'lib/jpsclient/upload/upload_client.rb', line 238

# Main loop executed by each worker thread: repeatedly pops a chunk task
# off the shared queue and processes it until told to stop or the upload
# fails permanently.
def worker_loop
    loop do
        upload_params_item = nil

        # Pop the next task under the queue lock.
        @tasks_queue_mutex.synchronize do
            return if @stop_workers || (upload_failed? && @tasks_queue.empty?)

            if @tasks_queue.empty?
                # Queue is empty — wait briefly for new tasks.
                # NOTE(review): `next` exits this synchronize block (releasing
                # the lock), not the inner if — the iteration then continues
                # below with upload_params_item still nil.
                @task_complete_cv.wait(@tasks_queue_mutex, 1)
                next
            end

            upload_params_item = @tasks_queue.pop
        end

        # Bump the active-task counter outside the queue lock to avoid
        # nesting the two mutexes.
        @active_tasks_mutex.synchronize { @active_tasks += 1 } if upload_params_item

        # Process the task; any unexpected error fails the whole upload.
        if upload_params_item
            begin
                process_upload_task(upload_params_item)
            rescue => e
                set_upload_failed("处理分片#{upload_params_item["partNo"]}时出错: #{e.message}")
            ensure
                @active_tasks_mutex.synchronize { @active_tasks -= 1 }
                @tasks_queue_mutex.synchronize { @task_complete_cv.broadcast }
            end
        end

        # Bail out as soon as a permanent failure is recorded.
        break if upload_failed?
    end
end