Module: ActiveCipherStorage::Format

Defined in:
lib/active_cipher_storage/format.rb

Overview

Binary format v1:

Header

[4]  Magic "ACS\x01"
[1]  Version (0x01)
[1]  Algorithm (0x01 = AES-256-GCM)
[1]  Flags    (bit 0 = chunked)
[4]  Chunk-size hint (uint32 BE; 0 if non-chunked)
[2]  Provider-ID length (uint16 BE)
[N]  Provider ID (UTF-8)
[2]  Encrypted DEK length (uint16 BE)
[M]  Encrypted DEK bytes

Non-chunked payload: [12 IV] [K ciphertext] [16 auth-tag]

Chunked payload (repeat until seq == FINAL_SEQ):

[4]  Sequence number (uint32 BE; 1-based; FINAL_SEQ = 0xFFFFFFFF marks last frame)
[12] Chunk IV
[4]  Ciphertext length (uint32 BE)
[K]  Ciphertext
[16] Auth tag

The final frame may carry zero-length ciphertext when the plaintext length is an exact multiple of chunk_size.

Defined Under Namespace

Classes: Header

Constant Summary collapse

MAGIC =
"ACS\x01".b.freeze
VERSION =
0x01
ALGO_AES256GCM =
0x01
FLAG_CHUNKED =
0x01
IV_SIZE =
12
AUTH_TAG_SIZE =
16
FINAL_SEQ =
0xFFFF_FFFF

Class Method Summary collapse

Class Method Details

.read_chunk(io) ⇒ Object

Returns { seq:, iv:, ciphertext:, auth_tag: } or nil on clean EOF.



91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/active_cipher_storage/format.rb', line 91

# Reads one chunk frame from +io+.
#
# Fields are consumed in this order: 4-byte sequence number (uint32 BE),
# IV_SIZE bytes of IV, 4-byte ciphertext length (uint32 BE), the ciphertext,
# then AUTH_TAG_SIZE bytes of auth tag.
#
# @param io [#read] stream positioned at the start of a frame
# @return [Hash, nil] { seq:, iv:, ciphertext:, auth_tag: }, or nil on clean
#   EOF (the stream has no bytes left where a sequence number would start)
# @raise [Errors::InvalidFormat] if the stream ends partway through the
#   4-byte sequence number
# NOTE(review): short reads of the remaining fields are left to safe_read,
# which is defined elsewhere — confirm how it reports them.
def self.read_chunk(io)
  seq_bytes = io.read(4)
  return nil if seq_bytes.nil? || seq_bytes.empty?

  # A 1-3 byte read means the stream was cut inside the sequence field.
  # unpack1("N") returns nil for such input, so reject it explicitly rather
  # than handing the caller a frame with seq: nil.
  unless seq_bytes.bytesize == 4
    raise Errors::InvalidFormat, "Truncated chunk sequence number"
  end

  seq        = seq_bytes.unpack1("N")
  iv         = safe_read(io, IV_SIZE)
  ct_len     = safe_read(io, 4).unpack1("N")
  # A zero length is accepted: skip the read and use an empty binary string.
  ciphertext = ct_len.positive? ? safe_read(io, ct_len) : "".b
  auth_tag   = safe_read(io, AUTH_TAG_SIZE)

  { seq: seq, iv: iv, ciphertext: ciphertext, auth_tag: auth_tag }
end

.read_header(io) ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/active_cipher_storage/format.rb', line 55

# Parses the file header from +io+ and returns it as a Header.
#
# Fields are consumed in wire order: 4-byte magic, one byte each for version,
# algorithm and flags, a uint32 BE chunk-size hint, then two length-prefixed
# (uint16 BE) fields: the provider ID and the encrypted DEK.
#
# @param io [Object] source handed to safe_read for every field
# @return [Header] with version, algorithm, chunked, chunk_size, provider_id
#   and encrypted_dek populated
# @raise [Errors::InvalidFormat] when the first four bytes are not MAGIC
def self.read_header(io)
  raise Errors::InvalidFormat, "Invalid magic bytes" if safe_read(io, 4) != MAGIC

  # Three single-byte fields, read one at a time in wire order.
  version, algorithm, flags = Array.new(3) { safe_read(io, 1).unpack1("C") }
  chunk_size = safe_read(io, 4).unpack1("N")

  # Checked only after the whole fixed-size part has been consumed.
  validate_header_fields!(version, algorithm, flags)

  read_uint16 = -> { safe_read(io, 2).unpack1("n") }

  # The provider ID is tagged as UTF-8; the DEK is returned as read.
  provider_id   = safe_read(io, read_uint16.call).force_encoding("UTF-8")
  encrypted_dek = safe_read(io, read_uint16.call)

  chunked = (flags & FLAG_CHUNKED) != 0

  Header.new(
    version: version,
    algorithm: algorithm,
    chunked: chunked,
    chunk_size: chunk_size,
    provider_id: provider_id,
    encrypted_dek: encrypted_dek
  )
end

.write_chunk(io, seq:, iv:, ciphertext:, auth_tag:) ⇒ Object



82
83
84
85
86
87
88
# File 'lib/active_cipher_storage/format.rb', line 82

# Serialises one chunk frame onto +io+.
#
# Five writes are issued in wire order: sequence number (uint32 BE), IV,
# ciphertext byte length (uint32 BE), ciphertext, auth tag.
#
# @param io [#write] destination stream
# @param seq [Integer] sequence number of the frame
# @param iv [String] IV bytes, written as given
# @param ciphertext [String] ciphertext bytes; its bytesize becomes the length field
# @param auth_tag [String] auth tag bytes, written as given
# @return [Object] whatever the final io.write (the auth tag) returns
def self.write_chunk(io, seq:, iv:, ciphertext:, auth_tag:)
  seq_field = [seq].pack("N")
  io.write(seq_field)
  io.write(iv)

  # The length prefix lets a reader know how many ciphertext bytes follow.
  length_field = [ciphertext.bytesize].pack("N")
  io.write(length_field)
  io.write(ciphertext)

  io.write(auth_tag)
end

.write_header(io, header) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/active_cipher_storage/format.rb', line 40

# Serialises +header+ onto +io+ in the v1 wire layout.
#
# Written in order: MAGIC, VERSION, ALGO_AES256GCM, flags (FLAG_CHUNKED when
# header.chunked is truthy, otherwise 0), chunk size (uint32 BE), provider-ID
# length (uint16 BE), provider ID as UTF-8 bytes, encrypted-DEK length
# (uint16 BE), encrypted DEK bytes. The version and algorithm bytes come from
# the module constants, not from +header+.
#
# @param io [#write] destination stream
# @param header [#provider_id, #chunked, #chunk_size, #encrypted_dek]
# @return [Object] whatever the final io.write (the encrypted DEK) returns
# @raise [ArgumentError] if the provider ID or encrypted DEK is longer than
#   65535 bytes, or chunk_size is outside 0..0xFFFFFFFF; nothing has been
#   written to +io+ when this is raised
def self.write_header(io, header)
  provider_bytes = header.provider_id.encode("UTF-8").b
  encrypted_dek  = header.encrypted_dek
  chunk_size     = header.chunk_size.to_i
  flags          = header.chunked ? FLAG_CHUNKED : 0x00

  # Array#pack silently wraps integers that do not fit the directive, which
  # would emit a header whose length fields disagree with the bytes that
  # follow. Check every packed value up front so a failure leaves io untouched.
  if provider_bytes.bytesize > 0xFFFF
    raise ArgumentError,
          "provider_id is #{provider_bytes.bytesize} bytes; maximum is 65535"
  end
  if encrypted_dek.bytesize > 0xFFFF
    raise ArgumentError,
          "encrypted_dek is #{encrypted_dek.bytesize} bytes; maximum is 65535"
  end
  unless chunk_size.between?(0, 0xFFFF_FFFF)
    raise ArgumentError,
          "chunk_size #{chunk_size} does not fit in an unsigned 32-bit field"
  end

  io.write(MAGIC)
  io.write([VERSION].pack("C"))
  io.write([ALGO_AES256GCM].pack("C"))
  io.write([flags].pack("C"))
  io.write([chunk_size].pack("N"))
  io.write([provider_bytes.bytesize].pack("n"))
  io.write(provider_bytes)
  io.write([encrypted_dek.bytesize].pack("n"))
  io.write(encrypted_dek)
end