Class: ActiveCipherStorage::EncryptedMultipartUpload
- Inherits:
- Object
- Includes:
- KeyUtils
- Defined in:
- lib/active_cipher_storage/multipart_upload.rb
Overview
Session-based multipart upload where the caller sends plaintext chunks across separate HTTP requests. Each chunk is encrypted as an ACS frame before being accumulated and flushed to S3.
S3 requires every part except the last to be at least 5 MiB. Incoming chunks can be any size: this class buffers the encrypted frames and flushes an S3 part each time the buffer reaches chunk_size (default 5 MiB).
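The buffering rule above can be illustrated with a small stand-alone sketch. PartBuffer is a hypothetical class written for this example, not part of the library, and it uses an 8-byte part size instead of 5 MiB so the result is easy to follow.

```ruby
# Hypothetical stand-in for the buffering behaviour; not the library's code.
class PartBuffer
  attr_reader :parts

  def initialize(part_size)
    @part_size = part_size
    @pending = "".b
    @parts = []
  end

  # Append one encrypted frame, then cut off every full part.
  def <<(frame)
    @pending << frame.b
    while @pending.bytesize >= @part_size
      @parts << @pending.byteslice(0, @part_size)
      @pending = @pending.byteslice(@part_size..) || "".b
    end
    self
  end

  # Emit whatever is left as the final part, which may be short.
  def finish
    @parts << @pending unless @pending.empty?
    @pending = "".b
    @parts
  end
end

buffer = PartBuffer.new(8)
buffer << "abc" << "defgh" << "ij"
sizes = buffer.finish.map(&:bytesize) # one full 8-byte part, then a 2-byte tail
```

Only the final part is allowed to fall below the part size, which matches the S3 rule quoted above.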
Flow:
uploader = EncryptedMultipartUpload.new(s3_client:, bucket:)
session_id = uploader.initiate(key: "uploads/doc.pdf")
uploader.upload_part(session_id:, chunk_io: io1) # repeat per chunk
uploader.complete(session_id:)
Session state is kept in an in-memory store by default. For multi-process deployments pass store: Rails.cache (or any object that responds to read/write/delete with the same keyword signatures).
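As a rough illustration of the store interface, here is a minimal sketch of an object that responds to read/write/delete. HashSessionStore, its clock: argument, and the expires_in: keyword are assumptions modelled on ActiveSupport::Cache::Store; the exact signatures this class calls are not shown on this page, so check them against MemorySessionStore before relying on this shape. An in-process Hash like this does not solve the multi-process case; a real replacement would wrap a shared backend.

```ruby
# Hypothetical store; method signatures are assumptions, not the library's contract.
class HashSessionStore
  def initialize(clock: -> { Time.now.to_f })
    @data = {}
    @clock = clock
    @mutex = Mutex.new
  end

  # Returns the stored value, or nil if missing or expired.
  def read(key)
    @mutex.synchronize do
      value, expires_at = @data[key]
      next nil if value.nil?
      if expires_at && @clock.call >= expires_at
        @data.delete(key)
        next nil
      end
      value
    end
  end

  # Stores the value, optionally with a time-to-live in seconds.
  def write(key, value, expires_in: nil)
    @mutex.synchronize do
      expires_at = expires_in ? @clock.call + expires_in : nil
      @data[key] = [value, expires_at]
    end
    true
  end

  # Returns true if an entry was removed.
  def delete(key)
    @mutex.synchronize { !@data.delete(key).nil? }
  end
end

store = HashSessionStore.new
store.write("session-1", { seq: 0 }, expires_in: 24 * 3600)
session = store.read("session-1")
```

The injectable clock is only there to make expiry testable without sleeping.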
Defined Under Namespace
Classes: MemorySessionStore
Constant Summary
- SESSION_TTL = 24 * 3600
Instance Method Summary
- #abort(session_id:) ⇒ Object
  Aborts the in-progress S3 multipart upload and discards the session.
- #complete(session_id:) ⇒ Object
  Writes a zero-byte FINAL_SEQ sentinel frame, flushes remaining bytes as the last S3 part, and completes the multipart upload.
- #initialize(s3_client:, bucket:, config: nil, store: nil) ⇒ EncryptedMultipartUpload (constructor)
  A new instance of EncryptedMultipartUpload.
- #initiate(key:, metadata: {}) ⇒ Object
  Starts a new multipart upload.
- #upload_part(session_id:, chunk_io:) ⇒ Object
  Encrypts a chunk and buffers it.
Constructor Details
#initialize(s3_client:, bucket:, config: nil, store: nil) ⇒ EncryptedMultipartUpload
Returns a new instance of EncryptedMultipartUpload.
# File 'lib/active_cipher_storage/multipart_upload.rb', line 28

def initialize(s3_client:, bucket:, config: nil, store: nil)
  @s3 = s3_client
  @bucket = bucket
  @config = config || ActiveCipherStorage.configuration
  @store = store || MemorySessionStore.new
  @config.validate!
end
Instance Method Details
#abort(session_id:) ⇒ Object
Aborts the in-progress S3 multipart upload and discards the session.
# File 'lib/active_cipher_storage/multipart_upload.rb', line 115

def abort(session_id:)
  session = @store.read(session_id)
  return unless session

  abort_s3(session)
  @store.delete(session_id)
end
#complete(session_id:) ⇒ Object
Writes a zero-byte FINAL_SEQ sentinel frame, flushes remaining bytes as the last S3 part, and completes the multipart upload. Returns { status: :completed, key:, parts_count: }.
# File 'lib/active_cipher_storage/multipart_upload.rb', line 91

def complete(session_id:)
  session = load_session!(session_id)

  # Zero-byte final frame signals end-of-stream to the decryptor.
  session[:pending] = (session[:pending] +
    build_frame("".b, session[:encrypted_dek], Format::FINAL_SEQ)).b

  flush_part(session, session[:pending]) unless session[:pending].empty?
  session[:pending] = "".b

  @s3.complete_multipart_upload(
    bucket: @bucket,
    key: session[:key],
    upload_id: session[:upload_id],
    multipart_upload: { parts: session[:parts] }
  )
  @store.delete(session_id)

  { status: :completed, key: session[:key], parts_count: session[:parts].length }
rescue StandardError
  abort_s3(session)
  @store.delete(session_id)
  raise
end
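The listing passes session[:parts] straight to complete_multipart_upload, but flush_part, which builds that list, is not shown here. S3's CompleteMultipartUpload expects each entry to carry a part number and the ETag returned when the part was uploaded, listed in ascending part-number order. The sketch below shows one plausible way to assemble that payload; PartRecord and completion_payload are hypothetical names for illustration only.

```ruby
# Hypothetical helper types; flush_part's real bookkeeping is not shown in the source.
PartRecord = Struct.new(:part_number, :etag) do
  # Shape of one entry in the multipart_upload[:parts] array.
  def to_s3
    { part_number: part_number, etag: etag }
  end
end

# Builds the multipart_upload: argument with parts in ascending order.
def completion_payload(records)
  { parts: records.sort_by(&:part_number).map(&:to_s3) }
end

records = [PartRecord.new(2, '"etag-b"'), PartRecord.new(1, '"etag-a"')]
payload = completion_payload(records)
```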
#initiate(key:, metadata: {}) ⇒ Object
Starts a new multipart upload. Returns an opaque session_id.
# File 'lib/active_cipher_storage/multipart_upload.rb', line 37

def initiate(key:, metadata: {})
  dek_bundle = @config.provider.generate_data_key

  s3_opts = { content_type: "application/octet-stream" }
  s3_opts[:metadata] = metadata unless metadata.empty?

  upload_id = @s3.create_multipart_upload(bucket: @bucket, key: key, **s3_opts).upload_id

  header_io = StringIO.new("".b)
  Format.write_header(header_io, Format::Header.new(
    version: Format::VERSION,
    algorithm: Format::ALGO_AES256GCM,
    chunked: true,
    chunk_size: @config.chunk_size,
    provider_id: @config.provider.provider_id,
    encrypted_dek: dek_bundle[:encrypted_key]
  ))

  session_id = SecureRandom.urlsafe_base64(24)
  save_session(session_id, {
    upload_id: upload_id,
    key: key,
    encrypted_dek: dek_bundle[:encrypted_key],
    seq: 0,
    parts: [],
    pending: header_io.string
  })
  session_id
ensure
  zero_bytes!(dek_bundle&.dig(:plaintext_key))
end
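The session_id returned by initiate comes from SecureRandom.urlsafe_base64(24). A short sketch of what that call produces, using only the Ruby standard library:

```ruby
require "securerandom"

# 24 random bytes are 192 bits; at 6 bits per character that is 32 characters,
# so no "=" padding is needed.
session_id = SecureRandom.urlsafe_base64(24)

length_ok = session_id.length == 32
# The URL-safe alphabet replaces "+" and "/" with "-" and "_".
alphabet_ok = session_id.match?(/\A[A-Za-z0-9_-]+\z/)
```

Because the identifier contains only URL-safe characters, it can be placed in a path segment or query string without escaping.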
#upload_part(session_id:, chunk_io:) ⇒ Object
Encrypts a chunk and buffers it. Flushes complete S3 parts (>= chunk_size) automatically. Returns { status: :ok, parts_uploaded: N }.
# File 'lib/active_cipher_storage/multipart_upload.rb', line 69

def upload_part(session_id:, chunk_io:)
  session = load_session!(session_id)
  plaintext = chunk_io.read.b

  session[:seq] += 1
  session[:pending] = (session[:pending] +
    build_frame(plaintext, session[:encrypted_dek], session[:seq])).b

  while session[:pending].bytesize >= @config.chunk_size
    flush_part(session, session[:pending].byteslice(0, @config.chunk_size))
    session[:pending] = (session[:pending].byteslice(@config.chunk_size..) || "".b).b
  end

  save_session(session_id, session)
  { status: :ok, parts_uploaded: session[:parts].length }
ensure
  zero_bytes!(plaintext)
end
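Both initiate and upload_part call zero_bytes! in an ensure block, and initiate may pass it nil. Its implementation is not shown on this page. The sketch below is one way such a helper could work, under the assumption that it overwrites the string in place and tolerates nil; wipe! is a hypothetical name chosen to avoid implying it is the library's method.

```ruby
# Hypothetical in-place wipe; not the library's zero_bytes! implementation.
def wipe!(str)
  # Nothing to do for nil; a frozen string cannot be modified in place.
  return if str.nil? || str.frozen?

  # setbyte mutates the existing buffer rather than allocating a new string.
  str.bytesize.times { |i| str.setbyte(i, 0) }
  str
end

secret = "plaintext chunk".b
wipe!(secret)
wiped = secret.bytes.all?(&:zero?)
```

A wipe like this only affects the one String object it is given. Copies made elsewhere, for example by String#b, are untouched.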