Class: ActiveCipherStorage::EncryptedMultipartUpload

Inherits:
Object
  • Object
show all
Includes:
KeyUtils
Defined in:
lib/active_cipher_storage/multipart_upload.rb

Overview

Session-based multipart upload where the caller sends plaintext chunks across separate HTTP requests. Each chunk is encrypted as an ACS frame before being accumulated and flushed to S3.

S3 requires every part except the last to be >= 5 MiB. Chunks can be any size — this class buffers encrypted frames and flushes S3 parts only when the buffer reaches chunk_size (default 5 MiB).

Flow:

uploader = EncryptedMultipartUpload.new(s3_client:, bucket:)
session_id = uploader.initiate(key: "uploads/doc.pdf")
uploader.upload_part(session_id:, chunk_io: io1)   # repeat per chunk
uploader.complete(session_id:)

Session state is kept in an in-memory store by default. For multi-process deployments pass store: Rails.cache (or any object that responds to read/write/delete with the same keyword signatures).

Defined Under Namespace

Classes: MemorySessionStore

Constant Summary collapse

SESSION_TTL =
24 * 3600

Instance Method Summary collapse

Constructor Details

#initialize(s3_client:, bucket:, config: nil, store: nil) ⇒ EncryptedMultipartUpload

Returns a new instance of EncryptedMultipartUpload.



28
29
30
31
32
33
34
# File 'lib/active_cipher_storage/multipart_upload.rb', line 28

def initialize(s3_client:, bucket:, config: nil, store: nil)
  @s3     = s3_client
  @bucket = bucket
  @config = config || ActiveCipherStorage.configuration
  @store  = store || MemorySessionStore.new
  @config.validate!
end

Instance Method Details

#abort(session_id:) ⇒ Object

Aborts the in-progress S3 multipart upload and discards the session.



115
116
117
118
119
120
# File 'lib/active_cipher_storage/multipart_upload.rb', line 115

def abort(session_id:)
  session = @store.read(session_id)
  return unless session
  abort_s3(session)
  @store.delete(session_id)
end

#complete(session_id:) ⇒ Object

Writes a zero-byte FINAL_SEQ sentinel frame, flushes remaining bytes as the last S3 part, and completes the multipart upload. Returns { status: :completed, key:, parts_count: }.



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/active_cipher_storage/multipart_upload.rb', line 91

def complete(session_id:)
  session = load_session!(session_id)

  # Zero-byte final frame signals end-of-stream to the decryptor.
  session[:pending] = (session[:pending] +
    build_frame("".b, session[:encrypted_dek], Format::FINAL_SEQ)).b

  flush_part(session, session[:pending]) unless session[:pending].empty?
  session[:pending] = "".b

  @s3.complete_multipart_upload(
    bucket: @bucket, key: session[:key],
    upload_id: session[:upload_id],
    multipart_upload: { parts: session[:parts] }
  )
  @store.delete(session_id)
  { status: :completed, key: session[:key], parts_count: session[:parts].length }
rescue StandardError
  abort_s3(session)
  @store.delete(session_id)
  raise
end

#initiate(key:, metadata: {}) ⇒ Object

Starts a new multipart upload. Returns an opaque session_id.



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/active_cipher_storage/multipart_upload.rb', line 37

def initiate(key:, metadata: {})
  dek_bundle = @config.provider.generate_data_key
  s3_opts    = { content_type: "application/octet-stream" }
  s3_opts[:metadata] =  unless .empty?
  upload_id  = @s3.create_multipart_upload(bucket: @bucket, key: key, **s3_opts).upload_id

  header_io = StringIO.new("".b)
  Format.write_header(header_io, Format::Header.new(
    version:       Format::VERSION,
    algorithm:     Format::ALGO_AES256GCM,
    chunked:       true,
    chunk_size:    @config.chunk_size,
    provider_id:   @config.provider.provider_id,
    encrypted_dek: dek_bundle[:encrypted_key]
  ))

  session_id = SecureRandom.urlsafe_base64(24)
  save_session(session_id, {
    upload_id:     upload_id,
    key:           key,
    encrypted_dek: dek_bundle[:encrypted_key],
    seq:           0,
    parts:         [],
    pending:       header_io.string
  })
  session_id
ensure
  zero_bytes!(dek_bundle&.dig(:plaintext_key))
end

#upload_part(session_id:, chunk_io:) ⇒ Object

Encrypts a chunk and buffers it. Flushes complete S3 parts (>= chunk_size) automatically. Returns { status: :ok, parts_uploaded: N }.



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/active_cipher_storage/multipart_upload.rb', line 69

def upload_part(session_id:, chunk_io:)
  session   = load_session!(session_id)
  plaintext = chunk_io.read.b
  session[:seq] += 1

  session[:pending] = (session[:pending] +
    build_frame(plaintext, session[:encrypted_dek], session[:seq])).b

  while session[:pending].bytesize >= @config.chunk_size
    flush_part(session, session[:pending].byteslice(0, @config.chunk_size))
    session[:pending] = (session[:pending].byteslice(@config.chunk_size..) || "".b).b
  end

  save_session(session_id, session)
  { status: :ok, parts_uploaded: session[:parts].length }
ensure
  zero_bytes!(plaintext)
end