Database Encryption Support

CouchDB can optionally encrypt databases and views.

We use AES in Counter Mode, which ensures we can encrypt and decrypt
any section of the file without padding or alignment. The ciphertext
is the same length as the plaintext. This mode provides
confidentiality but not authentication.

Key management is pluggable: a system administrator can supply any
module that implements the couch_encryption_manager behaviour and
select it with the "manager" setting in the [encryption] section.
diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl
index 486ed7c..c7dcf0d 100644
--- a/src/couch/src/couch_bt_engine.erl
+++ b/src/couch/src/couch_bt_engine.erl
@@ -827,14 +827,15 @@
         needs_commit = true
     }}.
 
-open_db_file(FilePath, Options) ->
-    case couch_file:open(FilePath, Options) of
+open_db_file(FilePath, Options0) ->
+    case couch_file:open(FilePath, Options0) of
         {ok, Fd} ->
             {ok, Fd};
         {error, enoent} ->
             % Couldn't find file. is there a compact version? This ca
             % happen (rarely) if we crashed during the file switch.
-            case couch_file:open(FilePath ++ ".compact", [nologifmissing]) of
+            Options1 = couch_encryption_manager:encryption_options(Options0),
+            case couch_file:open(FilePath ++ ".compact", [nologifmissing | Options1]) of
                 {ok, Fd} ->
                     Fmt = "Recovering from compaction file: ~s~s",
                     couch_log:info(Fmt, [FilePath, ".compact"]),
diff --git a/src/couch/src/couch_bt_engine_compactor.erl b/src/couch/src/couch_bt_engine_compactor.erl
index 8ed55b5..ea0bde9 100644
--- a/src/couch/src/couch_bt_engine_compactor.erl
+++ b/src/couch/src/couch_bt_engine_compactor.erl
@@ -54,7 +54,7 @@
     couch_db_engine:trigger_on_compact(DbName),
 
     ?COMP_EVENT(init),
-    {ok, InitCompSt} = open_compaction_files(DbName, St, Options),
+    {ok, InitCompSt} = open_compaction_files(DbName, St, [{db_name, DbName} | Options]),
     ?COMP_EVENT(files_opened),
 
     Stages = [
@@ -94,8 +94,9 @@
     } = OldSt,
     DataFile = DbFilePath ++ ".compact.data",
     MetaFile = DbFilePath ++ ".compact.meta",
-    {ok, DataFd, DataHdr} = open_compaction_file(DataFile),
-    {ok, MetaFd, MetaHdr} = open_compaction_file(MetaFile),
+    EncryptionOptions = couch_encryption_manager:encryption_options(Options),
+    {ok, DataFd, DataHdr} = open_compaction_file(DataFile, EncryptionOptions),
+    {ok, MetaFd, MetaHdr} = open_compaction_file(MetaFile, EncryptionOptions),
     DataHdrIsDbHdr = couch_bt_engine_header:is_header(DataHdr),
     CompSt =
         case {DataHdr, MetaHdr} of
@@ -623,15 +624,15 @@
         new_st = St1
     }.
 
-open_compaction_file(FilePath) ->
-    case couch_file:open(FilePath, [nologifmissing]) of
+open_compaction_file(FilePath, FileOpenOptions) ->
+    case couch_file:open(FilePath, [nologifmissing | FileOpenOptions]) of
         {ok, Fd} ->
             case couch_file:read_header(Fd) of
                 {ok, Header} -> {ok, Fd, Header};
                 no_valid_header -> {ok, Fd, nil}
             end;
         {error, enoent} ->
-            {ok, Fd} = couch_file:open(FilePath, [create]),
+            {ok, Fd} = couch_file:open(FilePath, [create | FileOpenOptions]),
             {ok, Fd, nil}
     end.
 
diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index 17a1e91..ffe8d60 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -37,7 +37,8 @@
     erlang:put(io_priority, {db_update, DbName}),
     update_idle_limit_from_config(),
     DefaultSecObj = default_security_object(DbName),
-    Options = [{default_security_object, DefaultSecObj} | Options0],
+    Options =
+        [{db_name, DbName}, {default_security_object, DefaultSecObj} | Options0],
     try
         {ok, EngineState} = couch_db_engine:init(Engine, FilePath, Options),
         Db = init_db(DbName, FilePath, EngineState, Options),
diff --git a/src/couch/src/couch_encryption_manager.erl b/src/couch/src/couch_encryption_manager.erl
new file mode 100644
index 0000000..140433a
--- /dev/null
+++ b/src/couch/src/couch_encryption_manager.erl
@@ -0,0 +1,56 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_encryption_manager).
+
+-export([new_dek/1, unwrap_dek/2, encryption_options/1]).
+
+-callback new_dek(DbName :: binary()) ->
+    {ok, KeyID :: binary(), DEK :: binary(), WEK :: binary()}
+    | dont_encrypt
+    | {error, Reason :: term()}.
+
+-callback unwrap_dek(KeyID :: binary(), WEK :: binary()) ->
+    {ok, DEK :: binary()}
+    | {ok, NewKeyID :: binary(), DEK :: binary(), NewWEK :: binary()}
+    | {error, Reason :: term()}.
+
+new_dek(DbName) ->
+    case manager() of
+        undefined ->
+            dont_encrypt;
+        Module ->
+            Module:new_dek(DbName)
+    end.
+
+unwrap_dek(KeyID, WEK) ->
+    case manager() of
+        undefined ->
+            {error, encryption_not_supported};
+        Manager ->
+            Manager:unwrap_dek(KeyID, WEK)
+    end.
+
+manager() ->
+    case config:get("encryption", "manager") of
+        undefined ->
+            undefined;
+        Module ->
+            list_to_atom(Module)
+    end.
+
+%% Keep only what file encryption needs: the first {db_name, _} entry, if any.
+encryption_options(Options) ->
+    case lists:keyfind(db_name, 1, Options) of
+        {db_name, _DbName} = Entry -> [Entry];
+        false -> []
+    end.
diff --git a/src/couch/src/couch_file.erl b/src/couch/src/couch_file.erl
index ba8d9c4..fc300fd 100644
--- a/src/couch/src/couch_file.erl
+++ b/src/couch/src/couch_file.erl
@@ -20,9 +20,35 @@
 -define(MONITOR_CHECK, 10000).
 % 4 KiB
 -define(SIZE_BLOCK, 16#1000).
+-define(ENCRYPTION_HEADER_SIZE, 16#800).
 -define(IS_OLD_STATE(S), is_pid(S#file.db_monitor)).
 -define(PREFIX_SIZE, 5).
 -define(DEFAULT_READ_COUNT, 1024).
+-define(ENCRYPTED_HEADER_MARKER, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15).
+
+%% Database encryption design details
+
+%% On file creation, if an encryption manager is configured, the
+%% manager is asked to generate a new data encryption key (DEK). The
+%% manager either returns a new DEK along with a wrapped form of the
+%% same value that is safe to store in plain view (WEK) or it returns
+%% `dont_encrypt` to indicate that the database should not be
+%% encrypted.
+
+%% When opening an encrypted file the encryption manager is asked to
+%% unwrap the WEK retrieved from the encryption header. The manager can
+%% return just the unwrapped DEK, or the unwrapped DEK together with a
+%% new KeyID and a new WEK (wrapping the same DEK value). The latter
+%% supports key rotation without re-encrypting the file contents.
+
+%% The first 4096 bytes of an encrypted file contain two copies of
+%% the encryption header, to protect against partial writes. If the two
+%% copies are valid but different the first copy is used.
+
+%% All bytes, excepting the first 4096 bytes, are encrypted with AES
+%% in Counter Mode. A random initialisation vector is generated at
+%% file creation and stored in the encryption header. This
+%% initialisation vector is re-randomised if the file is truncated.
 
 -type block_id() :: non_neg_integer().
 -type location() :: non_neg_integer().
@@ -33,9 +59,38 @@
     is_sys,
     eof = 0,
     db_monitor,
-    pread_limit = 0
+    pread_limit = 0,
+    iv,
+    dek
 }).
 
+%% Fallback for releases before OTP 22. crypto:stream_encrypt/2 and
+%% crypto:stream_decrypt/2 return {NewState, Data}; only Data is wanted.
+-define(encrypt_ctr(File, Pos, Data),
+    element(2, crypto:stream_encrypt(
+        crypto:stream_init(aes_ctr, File#file.dek, aes_ctr(File#file.iv, Pos)), Data))
+).
+-define(decrypt_ctr(File, Pos, Data),
+    element(2, crypto:stream_decrypt(
+        crypto:stream_init(aes_ctr, File#file.dek, aes_ctr(File#file.iv, Pos)), Data))
+).
+
+-ifdef(OTP_RELEASE).
+-if(?OTP_RELEASE >= 22).
+
+-undef(encrypt_ctr).
+-define(encrypt_ctr(File, Pos, Data),
+    crypto:crypto_one_time(aes_256_ctr, File#file.dek, aes_ctr(File#file.iv, Pos), Data, true)
+).
+
+-undef(decrypt_ctr).
+-define(decrypt_ctr(File, Pos, Data),
+    crypto:crypto_one_time(aes_256_ctr, File#file.dek, aes_ctr(File#file.iv, Pos), Data, false)
+).
+
+-endif.
+-endif.
+
 % public API
 -export([open/1, open/2, close/1, bytes/1, sync/1, truncate/2, set_db_pid/2]).
 -export([pread_term/2, pread_iolist/2, pread_binary/2]).
@@ -439,7 +494,20 @@
                                     ok = file:sync(Fd),
                                     maybe_track_open_os_files(Options),
                                     erlang:send_after(?INITIAL_WAIT, self(), maybe_close),
-                                    {ok, #file{fd = Fd, is_sys = IsSys, pread_limit = Limit}};
+                                    case
+                                        init_crypto(
+                                            Filepath,
+                                            #file{
+                                                fd = Fd, is_sys = IsSys, pread_limit = Limit
+                                            },
+                                            Options
+                                        )
+                                    of
+                                        {ok, File} ->
+                                            {ok, File};
+                                        Error ->
+                                            init_status_error(ReturnPid, Ref, Error)
+                                    end;
                                 false ->
                                     ok = file:close(Fd),
                                     init_status_error(ReturnPid, Ref, {error, eexist})
@@ -447,7 +515,20 @@
                         false ->
                             maybe_track_open_os_files(Options),
                             erlang:send_after(?INITIAL_WAIT, self(), maybe_close),
-                            {ok, #file{fd = Fd, is_sys = IsSys, pread_limit = Limit}}
+                            case
+                                init_crypto(
+                                    Filepath,
+                                    #file{
+                                        fd = Fd, is_sys = IsSys, pread_limit = Limit
+                                    },
+                                    Options
+                                )
+                            of
+                                {ok, File} ->
+                                    {ok, File};
+                                Error ->
+                                    init_status_error(ReturnPid, Ref, Error)
+                            end
                     end;
                 Error ->
                     init_status_error(ReturnPid, Ref, Error)
@@ -464,7 +545,20 @@
                             maybe_track_open_os_files(Options),
                             {ok, Eof} = file:position(Fd, eof),
                             erlang:send_after(?INITIAL_WAIT, self(), maybe_close),
-                            {ok, #file{fd = Fd, eof = Eof, is_sys = IsSys, pread_limit = Limit}};
+                            case
+                                init_crypto(
+                                    Filepath,
+                                    #file{
+                                        fd = Fd, eof = Eof, is_sys = IsSys, pread_limit = Limit
+                                    },
+                                    Options
+                                )
+                            of
+                                {ok, File} ->
+                                    {ok, File};
+                                Error ->
+                                    init_status_error(ReturnPid, Ref, Error)
+                            end;
                         Error ->
                             init_status_error(ReturnPid, Ref, Error)
                     end;
@@ -563,7 +657,9 @@
             % can't fathom.
             {stop, Error, Error, #file{fd = nil}}
     end;
-handle_call({truncate, Pos}, _From, #file{fd = Fd} = File) ->
+handle_call({truncate, Pos}, _From, #file{fd = Fd, dek = DEK} = File) when
+    DEK == undefined orelse Pos >= ?SIZE_BLOCK
+->
     {ok, Pos} = file:position(Fd, Pos),
     case file:truncate(Fd) of
         ok ->
@@ -571,16 +667,41 @@
         Error ->
             {reply, Error, File}
     end;
-handle_call({append_bin, Bin}, _From, #file{fd = Fd, eof = Pos} = File) ->
+%% Truncating an encrypted file to a position inside the encryption header block:
+%% the file is emptied and the header rewritten, reusing the wrapped key with a new IV.
+handle_call({truncate, Pos}, _From, #file{fd = Fd, dek = DEK} = File0) when
+    DEK /= undefined andalso Pos < ?SIZE_BLOCK
+->
+    case read_encryption_header(File0) of
+        {ok, {KeyID, WEK, _IV}} ->
+            {ok, 0} = file:position(Fd, 0),
+            case file:truncate(Fd) of
+                ok ->
+                    File1 = File0#file{eof = 0},
+                    IV = new_aes_iv(),
+                    case write_encryption_header(File1, KeyID, WEK, IV) of
+                        {ok, File2} ->
+                            ok = file:sync(File2#file.fd),
+                            {reply, ok, init_crypto_file(File2, DEK, IV)};
+                        {error, Reason} ->
+                            {reply, {error, Reason}, File1}
+                    end;
+                Error ->
+                    {reply, Error, File0}
+            end;
+        {error, Reason} ->
+            {reply, {error, Reason}, File0}
+    end;
+handle_call({append_bin, Bin}, _From, #file{eof = Pos} = File) ->
     Blocks = make_blocks(Pos rem ?SIZE_BLOCK, Bin),
     Size = iolist_size(Blocks),
-    case file:write(Fd, Blocks) of
+    case encrypted_write(File, Blocks) of
         ok ->
             {reply, {ok, Pos, Size}, File#file{eof = Pos + Size}};
         Error ->
             {reply, Error, reset_eof(File)}
     end;
-handle_call({append_bins, Bins}, _From, #file{fd = Fd, eof = Pos} = File) ->
+handle_call({append_bins, Bins}, _From, #file{eof = Pos} = File) ->
     {BlockResps, FinalPos} = lists:mapfoldl(
         fun(Bin, PosAcc) ->
             Blocks = make_blocks(PosAcc rem ?SIZE_BLOCK, Bin),
@@ -591,13 +712,13 @@
         Bins
     ),
     {AllBlocks, Resps} = lists:unzip(BlockResps),
-    case file:write(Fd, AllBlocks) of
+    case encrypted_write(File, AllBlocks) of
         ok ->
             {reply, {ok, Resps}, File#file{eof = FinalPos}};
         Error ->
             {reply, Error, reset_eof(File)}
     end;
-handle_call({write_header, Bin}, _From, #file{fd = Fd, eof = Pos} = File) ->
+handle_call({write_header, Bin}, _From, #file{eof = Pos} = File) ->
     BinSize = byte_size(Bin),
     case Pos rem ?SIZE_BLOCK of
         0 ->
@@ -606,14 +727,14 @@
             Padding = <<0:(8 * (?SIZE_BLOCK - BlockOffset))>>
     end,
     FinalBin = [Padding, <<1, BinSize:32/integer>> | make_blocks(5, [Bin])],
-    case file:write(Fd, FinalBin) of
+    case encrypted_write(File, FinalBin) of
         ok ->
             {reply, ok, File#file{eof = Pos + iolist_size(FinalBin)}};
         Error ->
             {reply, Error, reset_eof(File)}
     end;
-handle_call(find_header, _From, #file{fd = Fd, eof = Pos} = File) ->
-    {reply, find_header(Fd, Pos div ?SIZE_BLOCK), File}.
+handle_call(find_header, _From, #file{eof = Pos} = File) ->
+    {reply, find_header(File, Pos div ?SIZE_BLOCK), File}.
 
 handle_cast(close, Fd) ->
     {stop, normal, Fd}.
@@ -641,26 +762,26 @@
     {_Fd, FilePath} = couch_util:get_value(couch_file_fd, PDict),
     [{data, [{"State", File}, {"InitialFilePath", FilePath}]}].
 
-find_header(Fd, Block) ->
-    case (catch load_header(Fd, Block)) of
+find_header(#file{} = File, Block) ->
+    case (catch load_header(File, Block)) of
         {ok, Bin} ->
             {ok, Bin};
         _Error ->
             ReadCount = config:get_integer(
                 "couchdb", "find_header_read_count", ?DEFAULT_READ_COUNT
             ),
-            find_header(Fd, Block - 1, ReadCount)
+            find_header(File, Block - 1, ReadCount)
     end.
 
-load_header(Fd, Block) ->
+load_header(#file{} = File, Block) ->
     {ok, <<1, HeaderLen:32/integer, RestBlock/binary>>} =
-        file:pread(Fd, Block * ?SIZE_BLOCK, ?SIZE_BLOCK),
-    load_header(Fd, Block * ?SIZE_BLOCK, HeaderLen, RestBlock).
+        encrypted_pread(File, Block * ?SIZE_BLOCK, ?SIZE_BLOCK),
+    load_header(File, Block * ?SIZE_BLOCK, HeaderLen, RestBlock).
 
-load_header(Fd, Pos, HeaderLen) ->
-    load_header(Fd, Pos, HeaderLen, <<>>).
+load_header(#file{} = File, Pos, HeaderLen) ->
+    load_header(File, Pos, HeaderLen, <<>>).
 
-load_header(Fd, Pos, HeaderLen, RestBlock) ->
+load_header(#file{} = File, Pos, HeaderLen, RestBlock) ->
     TotalBytes = calculate_total_read_len(?PREFIX_SIZE, HeaderLen),
     RawBin =
         case TotalBytes =< byte_size(RestBlock) of
@@ -670,7 +791,7 @@
             false ->
                 ReadStart = Pos + ?PREFIX_SIZE + byte_size(RestBlock),
                 ReadLen = TotalBytes - byte_size(RestBlock),
-                {ok, Missing} = file:pread(Fd, ReadStart, ReadLen),
+                {ok, Missing} = encrypted_pread(File, ReadStart, ReadLen),
                 <<RestBlock/binary, Missing/binary>>
         end,
     <<Md5Sig:16/binary, HeaderBin/binary>> =
@@ -681,12 +802,12 @@
 %% Read multiple block locations using a single file:pread/2.
 -spec find_header(file:fd(), block_id(), non_neg_integer()) ->
     {ok, binary()} | no_valid_header.
-find_header(_Fd, Block, _ReadCount) when Block < 0 ->
+find_header(_File, Block, _ReadCount) when Block < 0 ->
     no_valid_header;
-find_header(Fd, Block, ReadCount) ->
+find_header(#file{} = File, Block, ReadCount) ->
     FirstBlock = max(0, Block - ReadCount + 1),
     BlockLocations = [?SIZE_BLOCK * B || B <- lists:seq(FirstBlock, Block)],
-    {ok, DataL} = file:pread(Fd, [{L, ?PREFIX_SIZE} || L <- BlockLocations]),
+    {ok, DataL} = encrypted_pread(File, [{L, ?PREFIX_SIZE} || L <- BlockLocations]),
     %% Since BlockLocations are ordered from oldest to newest, we rely
     %% on lists:foldl/3 to reverse the order, making HeaderLocations
     %% correctly ordered from newest to oldest.
@@ -700,27 +821,27 @@
         [],
         lists:zip(BlockLocations, DataL)
     ),
-    case find_newest_header(Fd, HeaderLocations) of
+    case find_newest_header(File, HeaderLocations) of
         {ok, _Location, HeaderBin} ->
             {ok, HeaderBin};
         _ ->
             ok = file:advise(
-                Fd, hd(BlockLocations), ReadCount * ?SIZE_BLOCK, dont_need
+                File#file.fd, hd(BlockLocations), ReadCount * ?SIZE_BLOCK, dont_need
             ),
             NextBlock = hd(BlockLocations) div ?SIZE_BLOCK - 1,
-            find_header(Fd, NextBlock, ReadCount)
+            find_header(File, NextBlock, ReadCount)
     end.
 
 -spec find_newest_header(file:fd(), [{location(), header_size()}]) ->
     {ok, location(), binary()} | not_found.
-find_newest_header(_Fd, []) ->
+find_newest_header(_File, []) ->
     not_found;
-find_newest_header(Fd, [{Location, Size} | LocationSizes]) ->
-    case (catch load_header(Fd, Location, Size)) of
+find_newest_header(#file{} = File, [{Location, Size} | LocationSizes]) ->
+    case (catch load_header(File, Location, Size)) of
         {ok, HeaderBin} ->
             {ok, Location, HeaderBin};
         _Error ->
-            find_newest_header(Fd, LocationSizes)
+            find_newest_header(File, LocationSizes)
     end.
 
 -spec read_raw_iolist_int(#file{}, Pos :: non_neg_integer(), Len :: non_neg_integer()) ->
@@ -728,9 +849,9 @@
 % 0110 UPGRADE CODE
 read_raw_iolist_int(Fd, {Pos, _Size}, Len) ->
     read_raw_iolist_int(Fd, Pos, Len);
-read_raw_iolist_int(#file{fd = Fd} = File, Pos, Len) ->
+read_raw_iolist_int(#file{} = File, Pos, Len) ->
     {Pos, TotalBytes} = get_pread_locnum(File, Pos, Len),
-    case catch file:pread(Fd, Pos, TotalBytes) of
+    case catch encrypted_pread(File, Pos, TotalBytes) of
         {ok, <<RawBin:TotalBytes/binary>>} ->
             {remove_block_prefixes(Pos rem ?SIZE_BLOCK, RawBin), Pos + TotalBytes};
         Else ->
@@ -744,15 +865,15 @@
             throw({file_truncate_error, Else, Filepath})
     end.
 
-% TODO: check if this is really unused
-read_multi_raw_iolists_int(#file{fd = Fd} = File, PosLens) ->
+% used in couch_bt_engine_compactor.erl via pread_terms/2
+read_multi_raw_iolists_int(#file{} = File, PosLens) ->
     LocNums = lists:map(
         fun({Pos, Len}) ->
             get_pread_locnum(File, Pos, Len)
         end,
         PosLens
     ),
-    {ok, Bins} = file:pread(Fd, LocNums),
+    {ok, Bins} = encrypted_pread(File, LocNums),
     lists:zipwith(
         fun({Pos, TotalBytes}, Bin) ->
             <<RawBin:TotalBytes/binary>> = Bin,
@@ -905,6 +1026,192 @@
     {ok, Eof} = file:position(File#file.fd, eof),
     File#file{eof = Eof}.
 
+%% new (or still empty) file: ask the manager whether it should be encrypted.
+init_crypto(_Filepath, #file{eof = 0, dek = undefined} = File0, Options) ->
+    case lists:keyfind(db_name, 1, Options) of
+        {db_name, DbName} ->
+            case couch_encryption_manager:new_dek(DbName) of
+                {ok, KeyID, DEK, WEK} ->
+                    IV = new_aes_iv(),
+                    case write_encryption_header(File0, KeyID, WEK, IV) of
+                        {ok, File1} ->
+                            ok = file:sync(File1#file.fd),
+                            {ok, init_crypto_file(File1, DEK, IV)};
+                        {error, Reason} ->
+                            {error, Reason}
+                    end;
+                dont_encrypt ->
+                    {ok, File0};
+                {error, Reason} ->
+                    {error, Reason}
+            end;
+        false ->
+            {ok, File0}
+    end;
+%% existing non-empty file, whatever its length: unwrap the key if header markers are found.
+init_crypto(Filepath, #file{dek = undefined} = File, _Options) ->
+    case read_encryption_header(File) of
+        {ok, {KeyID, WEK, IV}} ->
+            case couch_encryption_manager:unwrap_dek(KeyID, WEK) of
+                {ok, DEK} ->
+                    {ok, init_crypto_file(File, DEK, IV)};
+                {ok, NewKeyID, DEK, NewWEK} ->
+                    %% manager has rewrapped the DEK with a new key, update our header.
+                    case file:open(Filepath, [read, write, raw]) of
+                        {ok, Fd} ->
+                            case write_encryption_header(#file{fd = Fd}, NewKeyID, NewWEK, IV) of
+                                {ok, _File} ->
+                                    ok = file:sync(Fd),
+                                    ok = file:close(Fd),
+                                    {ok, init_crypto_file(File, DEK, IV)};
+                                {error, Reason} ->
+                                    ok = file:close(Fd),
+                                    {error, Reason}
+                            end;
+                        {error, Reason} ->
+                            {error, Reason}
+                    end;
+                {error, Reason} ->
+                    {error, Reason}
+            end;
+        not_encrypted ->
+            {ok, File};
+        {error, Reason} ->
+            {error, Reason}
+    end.
+
+init_crypto_file(#file{} = File, DEK, IV) when is_binary(DEK), is_binary(IV) ->
+    File#file{iv = crypto:bytes_to_integer(IV), dek = DEK}.
+
+write_encryption_header(#file{eof = 0} = File, KeyID, WrappedKey, IV) when
+    bit_size(KeyID) =< 128, bit_size(WrappedKey) == 512, bit_size(IV) == 128
+->
+    Header = [<<?ENCRYPTED_HEADER_MARKER>>, <<(byte_size(KeyID)):16>>, KeyID, IV, WrappedKey],
+    PaddedHeader = [Header, <<0:((?ENCRYPTION_HEADER_SIZE - iolist_size(Header) - 32) * 8)>>],
+    DigestHeader = [PaddedHeader, crypto:hash(sha256, PaddedHeader)],
+    ?ENCRYPTION_HEADER_SIZE = iolist_size(DigestHeader),
+    case file:write(File#file.fd, [DigestHeader, DigestHeader]) of
+        ok ->
+            {ok, File#file{eof = ?SIZE_BLOCK}};
+        {error, Reason} ->
+            {error, Reason}
+    end.
+
+%% Read both copies of the encryption header from the first ?SIZE_BLOCK bytes.
+%% Returns {ok, {KeyID, WEK, IV}}, not_encrypted when the two header markers
+%% are not both present, or {error, Reason}. A copy that fails validation is
+%% skipped; if both are valid but differ, the primary copy is used.
+read_encryption_header(#file{} = File) ->
+    case file:pread(File#file.fd, 0, ?SIZE_BLOCK) of
+        {ok,
+            <<?ENCRYPTED_HEADER_MARKER, Bin1:(?ENCRYPTION_HEADER_SIZE - 16)/binary,
+                ?ENCRYPTED_HEADER_MARKER, Bin2/binary>>} ->
+            Primary = extract_header(<<?ENCRYPTED_HEADER_MARKER, Bin1/binary>>),
+            Secondary = extract_header(<<?ENCRYPTED_HEADER_MARKER, Bin2/binary>>),
+            {_Fd, Filepath} = get(couch_file_fd),
+            case {Primary, Secondary} of
+                {{ok, Header}, {ok, Header}} ->
+                    % both copies valid and identical
+                    {ok, Header};
+                {{ok, Header1}, {ok, Header2}} ->
+                    couch_log:warning(
+                        "~s: Encryption header version differences.~nPrimary header: ~p~nSecondary header: ~p. Using primary header",
+                        [Filepath, Header1, Header2]
+                    ),
+                    {ok, Header1};
+                {{ok, Header}, {error, Reason}} ->
+                    couch_log:warning(
+                        "~s: Secondary encryption header corruption (error: ~p). Using primary header.",
+                        [Filepath, Reason]
+                    ),
+                    {ok, Header};
+                {{error, Reason}, {ok, Header}} ->
+                    couch_log:warning(
+                        "~s: Primary encryption header corruption (error: ~p). Using secondary header.",
+                        [Filepath, Reason]
+                    ),
+                    {ok, Header};
+                {{error, Reason1}, {error, Reason2}} ->
+                    couch_log:error("~s: Both encryption headers corrupted (errors: ~p, ~p).", [
+                        Filepath, Reason1, Reason2
+                    ]),
+                    {error, corrupted_encryption_header}
+            end;
+        {ok, _} ->
+            not_encrypted;
+        {error, Reason} ->
+            {error, Reason}
+    end.
+
+extract_header(
+    <<?ENCRYPTED_HEADER_MARKER, KeyIDLen:16, KeyID:(KeyIDLen)/binary, IV:16/binary,
+        WrappedKey:64/binary, _/binary>> = Bin
+) ->
+    case check_header(Bin) of
+        true ->
+            {ok, {KeyID, WrappedKey, IV}};
+        false ->
+            {error, corrupted_encryption_header}
+    end;
+extract_header(_) ->
+    {error, corrupted_encryption_header}.
+
+check_header(Bin) ->
+    Data = binary:part(Bin, 0, byte_size(Bin) - 32),
+    Digest = binary:part(Bin, byte_size(Bin), -32),
+    Digest == crypto:hash(sha256, Data).
+
+%% We can encrypt any section of the file but we must make
+%% sure we align with the key stream.
+encrypted_write(#file{dek = undefined} = File, Data) ->
+    file:write(File#file.fd, Data);
+encrypted_write(#file{} = File, Data) ->
+    CipherText = ?encrypt_ctr(File, File#file.eof, pad(File#file.eof, Data)),
+    file:write(File#file.fd, unpad(File#file.eof, CipherText)).
+
+%% Multi-range read: decrypt each returned binary when the file is encrypted.
+%% file:pread/2 reports a range at end of file as the atom eof; pass it through.
+encrypted_pread(#file{dek = undefined} = File, LocNums) ->
+    file:pread(File#file.fd, LocNums);
+encrypted_pread(#file{} = File, LocNums) ->
+    Decrypt = fun
+        (_LocNum, eof) ->
+            eof;
+        ({Pos, _Len}, CipherText) ->
+            unpad(Pos, ?decrypt_ctr(File, Pos, pad(Pos, CipherText)))
+    end,
+    case file:pread(File#file.fd, LocNums) of
+        {ok, DataL} ->
+            {ok, lists:zipwith(Decrypt, LocNums, DataL)};
+        Else ->
+            Else
+    end.
+
+%% Single-range read at Pos, decrypted when the file has a data encryption key.
+encrypted_pread(#file{fd = Fd, dek = undefined}, Pos, Len) ->
+    file:pread(Fd, Pos, Len);
+encrypted_pread(#file{fd = Fd} = File, Pos, Len) ->
+    case file:pread(Fd, Pos, Len) of
+        {ok, CipherText} ->
+            {ok, unpad(Pos, ?decrypt_ctr(File, Pos, pad(Pos, CipherText)))};
+        Else ->
+            Else
+    end.
+
+new_aes_iv() ->
+    crypto:strong_rand_bytes(16).
+
+aes_ctr(IV, Pos) ->
+    <<(IV + (Pos div 16)):128>>.
+
+pad(Pos, IOData) ->
+    [<<0:(Pos rem 16 * 8)>>, IOData].
+
+unpad(Pos, Bin) when is_binary(Bin) ->
+    Size = Pos rem 16 * 8,
+    <<_:Size, Result/binary>> = Bin,
+    Result.
+
 -ifdef(TEST).
 -include_lib("couch/include/couch_eunit.hrl").
 
diff --git a/src/couch/test/eunit/couch_encryption_manager_demo.erl b/src/couch/test/eunit/couch_encryption_manager_demo.erl
new file mode 100644
index 0000000..65e2fda
--- /dev/null
+++ b/src/couch/test/eunit/couch_encryption_manager_demo.erl
@@ -0,0 +1,71 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_encryption_manager_demo).
+-behaviour(couch_encryption_manager).
+
+-export([new_dek/1, unwrap_dek/2]).
+
+new_dek(_DbName) ->
+    Keys = keys(),
+    {KeyID, KEK} = random_key(Keys),
+    DEK = crypto:strong_rand_bytes(32),
+    {ok, KeyID, DEK, wrap_key(KeyID, KEK, DEK)}.
+
+unwrap_dek(KeyID, WEK) ->
+    Keys = keys(),
+    case lists:keyfind(KeyID, 1, Keys) of
+        {KeyID, KEK} ->
+            case rand:uniform(2) of
+                1 ->
+                    unwrap_key(KeyID, KEK, WEK);
+                2 ->
+                    case unwrap_key(KeyID, KEK, WEK) of
+                        {ok, DEK} ->
+                            {NewKeyID, NewKEK} = random_key(Keys),
+                            {ok, NewKeyID, DEK, wrap_key(NewKeyID, NewKEK, DEK)};
+                        {error, Reason} ->
+                            {error, Reason}
+                    end
+            end;
+        false ->
+            {error, invalid_key_id}
+    end.
+
+keys() ->
+    [
+        {<<"key0">>, <<0:256>>},
+        {<<"key1">>, <<1:256>>},
+        {<<"key2">>, <<2:256>>},
+        {<<"key3">>, <<3:256>>}
+    ].
+
+random_key(Keys) ->
+    lists:nth(rand:uniform(length(Keys)), Keys).
+
+wrap_key(KeyID, KEK, DEK) when is_binary(KEK), is_binary(DEK) ->
+    IV = crypto:strong_rand_bytes(16),
+    {<<_:32/binary>> = CipherText, <<_:16/binary>> = CipherTag} =
+        crypto:crypto_one_time_aead(aes_256_gcm, KEK, IV, DEK, KeyID, 16, true),
+    <<IV:16/binary, CipherText/binary, CipherTag/binary>>.
+
+unwrap_key(KeyID, KEK, <<IV:16/binary, CipherText:32/binary, CipherTag:16/binary>>) when
+    is_binary(KEK)
+->
+    case crypto:crypto_one_time_aead(aes_256_gcm, KEK, IV, CipherText, KeyID, CipherTag, false) of
+        error ->
+            {error, unwrap_failed};
+        DEK ->
+            {ok, DEK}
+    end;
+unwrap_key(_KeyID, _KEK, _) ->
+    {error, malformed_wrapped_key}.
diff --git a/src/couch/test/eunit/couch_file_tests.erl b/src/couch/test/eunit/couch_file_tests.erl
index 1b54cd7..30bd4f6 100644
--- a/src/couch/test/eunit/couch_file_tests.erl
+++ b/src/couch/test/eunit/couch_file_tests.erl
@@ -551,3 +551,165 @@
         {'$gen_call', From, sync} ->
             gen:reply(From, {error, eio})
     end.
+
+%% re-run above functions with encryption enabled
+
+encrypted_setup() ->
+    {_Path, Fd} = encrypted_setup_with_path(), % same open call; only the Fd is needed here
+    Fd.
+
+-define(encrypted_foreach(Fs), {foreach, fun encrypted_setup/0, fun teardown/1, Fs}).
+
+encryption_read_write_test_() ->
+    % Runs the shared should_* read/write cases against the encrypted_setup/0
+    % fixture while config:get("encryption", "manager") names the demo manager.
+    % NOTE(review): the catch-all clause answers undefined for every other
+    % config:get/2 lookup; confirm that shadowing is intended.
+    Setup = fun() ->
+        Ctx = test_util:start(?MODULE),
+        meck:expect(config, get, fun
+            ("encryption", "manager") -> "couch_encryption_manager_demo";
+            (_, _) -> undefined
+        end),
+        Ctx
+    end,
+    Cases = ?encrypted_foreach([
+        fun should_increase_file_size_on_write/1,
+        fun should_return_current_file_size_on_write/1,
+        fun should_write_and_read_term/1,
+        fun should_write_and_read_binary/1,
+        fun should_write_and_read_large_binary/1,
+        fun should_return_term_as_binary_for_reading_binary/1,
+        fun should_read_term_written_as_binary/1,
+        fun should_read_iolist/1,
+        fun should_fsync/1,
+        fun should_not_read_beyond_eof/1,
+        fun should_truncate/1
+    ]),
+    Title = "Common file read/write tests (encryption enabled)",
+    {Title, {setup, Setup, fun test_util:stop/1, Cases}}.
+
+encryption_pread_limit_test_() ->
+    % Read-limit cases on the encrypted_setup/0 fixture: max_pread_size is set
+    % to 50000 for the suite and deleted again before the context is stopped.
+    Setup = fun() ->
+        Ctx = test_util:start(?MODULE),
+        meck:expect(config, get, fun
+            ("encryption", "manager") -> "couch_encryption_manager_demo";
+            (_, _) -> undefined
+        end),
+        config:set("couchdb", "max_pread_size", "50000"),
+        Ctx
+    end,
+    Cleanup = fun(Ctx) ->
+        config:delete("couchdb", "max_pread_size"),
+        test_util:stop(Ctx)
+    end,
+    Cases = ?encrypted_foreach([
+        fun should_increase_file_size_on_write/1,
+        fun should_return_current_file_size_on_write/1,
+        fun should_write_and_read_term/1,
+        fun should_write_and_read_binary/1,
+        fun should_not_read_more_than_pread_limit/1
+    ]),
+    {"Read limit tests (encryption enabled)", {setup, Setup, Cleanup, Cases}}.
+
+
+encryption_header_test_() ->
+    % Header read/write cases with config:get("encryption", "manager") mocked
+    % to name the demo manager. The per-file cases use the encrypted_setup/0
+    % fixture; the recovery generators are appended after them.
+    Setup = fun() ->
+        Ctx = test_util:start(?MODULE),
+        meck:expect(config, get, fun
+            ("encryption", "manager") -> "couch_encryption_manager_demo";
+            (_, _) -> undefined
+        end),
+        Ctx
+    end,
+    PerFileCases = ?encrypted_foreach([
+        fun should_write_and_read_atom_header/1,
+        fun should_write_and_read_tuple_header/1,
+        fun should_write_and_read_second_header/1,
+        fun should_truncate_second_header/1,
+        fun should_produce_same_file_size_on_rewrite/1,
+        fun should_save_headers_larger_than_block_size/1
+    ]),
+    % Called here with no fixture argument, unlike the foreach cases above.
+    RecoveryCases = [
+        should_recover_header_marker_corruption(),
+        should_recover_header_size_corruption(),
+        should_recover_header_md5sig_corruption(),
+        should_recover_header_data_corruption()
+    ],
+    Title = "File header read/write tests  (encryption enabled)",
+    {Title, {setup, Setup, fun test_util:stop/1, [PerFileCases | RecoveryCases]}}.
+
+%% encryption-specific tests
+
+encrypted_setup_with_path() -> % foreach fixture returning {Path, Fd}
+    Path = ?tempfile(), % kept in the result so tests can reopen or modify the file directly
+    {ok, Fd} = couch_file:open(Path, [create, overwrite, {db_name, <<"foo">>}]),
+    {Path, Fd}.
+
+%% Close the fixture's couch_file process unless it is already gone.
+teardown_with_path({_Path, Fd}) ->
+    StillOpen = is_process_alive(Fd),
+    StillOpen andalso (ok = couch_file:close(Fd)),
+    ok.
+
+-define(encrypted_foreach_with_path(Fs), {foreach, fun encrypted_setup_with_path/0, fun teardown_with_path/1, Fs}).
+
+
+encryption_test_() ->
+    % Encryption-specific cases. Each one receives {Path, Fd} from
+    % encrypted_setup_with_path/0, so it can act on the file by path as well.
+    % NOTE(review): the mocked config:get/2 answers undefined for every lookup
+    % other than ("encryption", "manager"); confirm that shadowing is intended.
+    Setup = fun() ->
+        Ctx = test_util:start(?MODULE),
+        meck:expect(config, get, fun
+            ("encryption", "manager") -> "couch_encryption_manager_demo";
+            (_, _) -> undefined
+        end),
+        Ctx
+    end,
+    Cases = ?encrypted_foreach_with_path([
+        fun file_is_encrypted/1,
+        fun recover_from_corrupted_primary_header/1,
+        fun recover_from_corrupted_secondary_header/1,
+        fun cant_recover_if_both_headers_corrupted/1
+    ]),
+    Title = "couch_file encryption tests",
+    {Title, {setup, Setup, fun test_util:stop/1, Cases}}.
+
+file_is_encrypted({_Path, Fd}) -> % Fd comes straight from the fixture; this test writes nothing
+    ?_assertEqual({ok, 4096}, couch_file:bytes(Fd)). % NOTE(review): 4096 is presumably the encryption header size - confirm
+
+recover_from_corrupted_primary_header({Path, Fd0}) ->
+    ok = couch_file:close(Fd0),
+    % Overwrite 4 bytes at offset 1024 through a raw handle, bypassing couch_file.
+    {ok, RawFd} = file:open(Path, [read, write, raw, binary]),
+    ok = file:pwrite(RawFd, 1024, <<"bleh">>),
+    ok = file:close(RawFd),
+    % The file is expected to open successfully despite the damage at that offset.
+    ?_assertMatch({ok, _Fd1}, couch_file:open(Path, [])).
+
+recover_from_corrupted_secondary_header({Path, Fd0}) ->
+    ok = couch_file:close(Fd0),
+    % Overwrite 4 bytes at offset 3072 through a raw handle, bypassing couch_file.
+    {ok, RawFd} = file:open(Path, [read, write, raw, binary]),
+    ok = file:pwrite(RawFd, 3072, <<"bleh">>),
+    ok = file:close(RawFd),
+    % The file is expected to open successfully despite the damage at that offset.
+    ?_assertMatch({ok, _Fd1}, couch_file:open(Path, [])).
+
+cant_recover_if_both_headers_corrupted({Path, Fd0}) ->
+    ok = couch_file:close(Fd0),
+    % Damage both offsets (1024 and 3072) through a raw handle, bypassing couch_file.
+    {ok, RawFd} = file:open(Path, [read, write, raw, binary]),
+    ok = file:pwrite(RawFd, 1024, <<"bleh">>),
+    ok = file:pwrite(RawFd, 3072, <<"bleh">>),
+    ok = file:close(RawFd),
+    % With both offsets damaged, open is expected to report corrupted_encryption_header.
+    ?_assertMatch({error, corrupted_encryption_header}, couch_file:open(Path, [])).
diff --git a/src/couch_mrview/src/couch_mrview_compactor.erl b/src/couch_mrview/src/couch_mrview_compactor.erl
index 28e5a9b..5471b12 100644
--- a/src/couch_mrview/src/couch_mrview_compactor.erl
+++ b/src/couch_mrview/src/couch_mrview_compactor.erl
@@ -47,7 +47,7 @@
 
     {EmptyState, NumDocIds} = couch_util:with_db(DbName, fun(Db) ->
         CompactFName = couch_mrview_util:compaction_file(DbName, Sig),
-        {ok, Fd} = couch_mrview_util:open_file(CompactFName),
+        {ok, Fd} = couch_mrview_util:open_file(CompactFName, [{db_name, DbName}]),
         ESt = couch_mrview_util:reset_index(Db, Fd, State),
 
         {ok, Count} = couch_db:get_doc_count(Db),
diff --git a/src/couch_mrview/src/couch_mrview_index.erl b/src/couch_mrview/src/couch_mrview_index.erl
index 1bfdb28..b4528c2 100644
--- a/src/couch_mrview/src/couch_mrview_index.erl
+++ b/src/couch_mrview/src/couch_mrview_index.erl
@@ -120,7 +120,7 @@
 
     OldSig = couch_mrview_util:maybe_update_index_file(State),
 
-    case couch_mrview_util:open_file(IndexFName) of
+    case couch_mrview_util:open_file(IndexFName, [{db_name, DbName}]) of
         {ok, Fd} ->
             case couch_file:read_header(Fd) of
                 % upgrade code for <= 2.x
diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl
index 9e3d292..f9c5333 100644
--- a/src/couch_mrview/src/couch_mrview_util.erl
+++ b/src/couch_mrview/src/couch_mrview_util.erl
@@ -17,7 +17,7 @@
 -export([verify_view_filename/1, get_signature_from_filename/1]).
 -export([ddoc_to_mrst/2, init_state/4, reset_index/3]).
 -export([make_header/1]).
--export([index_file/2, compaction_file/2, open_file/1]).
+-export([index_file/2, compaction_file/2, open_file/2]).
 -export([delete_files/2, delete_index_file/2, delete_compaction_file/2]).
 -export([get_row_count/1, all_docs_reduce_to_count/1, reduce_to_count/1]).
 -export([all_docs_key_opts/1, all_docs_key_opts/2, key_opts/1, key_opts/2]).
@@ -791,10 +791,10 @@
     FileName = couch_index_util:hexsig(Sig) ++ ".compact.view",
     couch_index_util:index_file(mrview, DbName, FileName).
 
-open_file(FName) ->
-    case couch_file:open(FName, [nologifmissing]) of
+open_file(FName, Options) ->
+    case couch_file:open(FName, [nologifmissing | Options]) of
         {ok, Fd} -> {ok, Fd};
-        {error, enoent} -> couch_file:open(FName, [create]);
+        {error, enoent} -> couch_file:open(FName, [create | Options]);
         Error -> Error
     end.