Merge pull request #2690 from apache/fix-create-db-options

Fix create db options on secondary shard creation
diff --git a/src/couch/include/couch_eunit.hrl b/src/couch/include/couch_eunit.hrl
index d3611c8..1885248 100644
--- a/src/couch/include/couch_eunit.hrl
+++ b/src/couch/include/couch_eunit.hrl
@@ -49,6 +49,11 @@
         Suffix = couch_uuids:random(),
         iolist_to_binary(["eunit-test-db-", Suffix])
     end).
+%% fun returning a random-suffixed db name under shards/80000000-ffffffff/
+-define(tempshard,
+    fun() ->
+        iolist_to_binary(["shards/80000000-ffffffff/eunit-test-db-", couch_uuids:random()])
+    end).
 -define(docid,
     fun() ->
         integer_to_list(couch_util:unique_monotonic_integer())
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index a67dcd1..85da3ff 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -439,7 +439,7 @@
 
 
 get_or_create_db(DbName, Options) ->
-    couch_db:open_int(DbName, [{create_if_missing, true} | Options]).
+    mem3_util:get_or_create_db(DbName, Options).
 
 
 get_view_cb(#mrargs{extra = Options}) ->
diff --git a/src/fabric/test/eunit/fabric_rpc_tests.erl b/src/fabric/test/eunit/fabric_rpc_tests.erl
new file mode 100644
index 0000000..b94caf6
--- /dev/null
+++ b/src/fabric/test/eunit/fabric_rpc_tests.erl
@@ -0,0 +1,181 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_rpc_tests).
+
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+-define(TDEF(A), {A, fun A/1}).
+
+%% EUnit generator: setup_all/teardown_all wrap three foreach groups - a plain
+%% db name with teardown_db, a shard name with no cleanup, a shard name with teardown_db.
+main_test_() ->
+    {
+        setup,
+        spawn,
+        fun setup_all/0,
+        fun teardown_all/1,
+        [
+            {
+                foreach,
+                fun setup_no_db_or_config/0,
+                fun teardown_db/1,
+                lists:map(fun wrap/1, [
+                    ?TDEF(t_no_config_non_shard_db_create_succeeds)
+                ])
+            },
+            {
+                foreach,
+                fun setup_shard/0,
+                fun teardown_noop/1,
+                lists:map(fun wrap/1, [
+                    ?TDEF(t_no_db),
+                    ?TDEF(t_no_config_db_create_fails_for_shard),
+                    ?TDEF(t_no_config_db_create_fails_for_shard_rpc)
+                ])
+            },
+            {
+                foreach,
+                fun setup_shard/0,
+                fun teardown_db/1,
+                lists:map(fun wrap/1, [
+                    ?TDEF(t_db_create_with_config)
+                ])
+            }
+        ]
+    }.
+
+%% Calls test_util:start_couch/1 with rexi, mem3 and fabric; result goes to teardown_all/1.
+setup_all() ->
+    test_util:start_couch([rexi, mem3, fabric]).
+
+%% Hands the context returned by setup_all/0 to test_util:stop_couch/1.
+teardown_all(Ctx) ->
+    test_util:stop_couch(Ctx).
+
+%% Per-test fixture: a fresh name from the ?tempdb macro; nothing is created here.
+setup_no_db_or_config() ->
+    ?tempdb().
+
+%% Per-test fixture: a fresh name from the ?tempshard macro; nothing is created here.
+setup_shard() ->
+    ?tempshard().
+
+%% Cleanup for the foreach group whose tests do not call teardown_db/1.
+teardown_noop(_DbName) ->
+    ok.
+%% Deletes DbName via couch_server:delete/2; crashes unless that returns ok.
+teardown_db(DbName) ->
+    ok = couch_server:delete(DbName, []).
+
+%% Adapt a {Name, Fun} pair for foreach: given the fixture value, build a titled
+%% test with a 60 second timeout whose body traps exits before calling Fun.
+wrap({Name, Fun}) ->
+    fun(Arg) ->
+        Title = atom_to_list(Name),
+        Body = fun() -> process_flag(trap_exit, true), Fun(Arg) end,
+        {timeout, 60, {Title, Body}}
+    end.
+
+%% Opening the generated shard name must report that no db file exists.
+t_no_db(DbName) ->
+    ?assertEqual({not_found, no_db_file}, couch_db:open_int(DbName, [?ADMIN_CTX])).
+
+%% A name that mem3:dbname/1 returns unchanged is created without any shards db doc.
+t_no_config_non_shard_db_create_succeeds(DbName) ->
+    ?assertEqual({not_found, no_db_file}, couch_db:open_int(DbName, [?ADMIN_CTX])),
+    ?assertEqual(DbName, mem3:dbname(DbName)),
+    ?assertMatch({ok, _}, mem3_util:get_or_create_db(DbName, [?ADMIN_CTX])).
+
+%% With no shards db doc written, get_or_create_db must throw missing_target.
+t_no_config_db_create_fails_for_shard(DbName) ->
+    ?assertEqual({not_found, no_db_file}, couch_db:open_int(DbName, [?ADMIN_CTX])),
+    ?assertThrow({error, missing_target}, mem3_util:get_or_create_db(DbName, [?ADMIN_CTX])).
+
+%% The same failure must reach a rexi caller of fabric_rpc as a rexi_EXIT reply.
+t_no_config_db_create_fails_for_shard_rpc(DbName) ->
+    ?assertEqual({not_found, no_db_file}, couch_db:open_int(DbName, [?ADMIN_CTX])),
+    ?assertException(throw, {error, missing_target}, mem3_util:get_or_create_db(DbName, [?ADMIN_CTX])),
+    MFA = {fabric_rpc, get_db_info, [DbName]},
+    Ref = rexi:cast(node(), self(), MFA),
+    %% Take only the reply tagged with our Ref (wrap/1 enables trap_exit, so
+    %% 'EXIT' messages may be queued too) and never block on a lost reply.
+    Resp = receive {Ref, _} = Reply -> Reply after 5000 -> timeout end,
+    ?assertMatch({Ref, {'rexi_EXIT', {{error, missing_target}, _}}}, Resp).
+
+
+%% Once a shards db doc exists for the clustered name, creating the shard must
+%% yield a db whose props/engine equal mem3_util:get_shard_opts/1 of that doc.
+t_db_create_with_config(DbName) ->
+    ClusteredName = mem3:dbname(DbName),
+    MapDoc = #doc{id = ClusteredName, body = test_db_doc()},
+
+    ?assertEqual({not_found, no_db_file}, couch_db:open_int(DbName, [?ADMIN_CTX])),
+
+    %% Store the doc in the shards db; it must not be there yet
+    couch_util:with_db(mem3_sync:shards_db(), fun(Db) ->
+        ?assertEqual({not_found, missing}, couch_db:open_doc(Db, ClusteredName, [ejson_body])),
+        ?assertMatch({ok, _}, couch_db:update_docs(Db, [MapDoc]))
+    end),
+
+    %% Create the shard and compare what it stored with the doc's options
+    couch_util:with_db(mem3_sync:shards_db(), fun(Db) ->
+        ?assertMatch({ok, _}, couch_db:open_doc(Db, ClusteredName, [ejson_body])),
+        ?assertEqual({not_found, no_db_file}, couch_db:open_int(DbName, [?ADMIN_CTX])),
+        Created = mem3_util:get_or_create_db(DbName, [?ADMIN_CTX]),
+        ?assertMatch({ok, _}, Created),
+        {ok, ShardDb} = Created,
+
+        {DocFields} = test_db_doc(),
+        Expected = mem3_util:get_shard_opts(DocFields),
+        StoredProps = case couch_db_engine:get_props(ShardDb) of
+            undefined -> [];
+            Stored -> Stored
+        end,
+        %% We don't normally store the default engine name
+        EngineOpts = case couch_db_engine:get_engine(ShardDb) of
+            couch_bt_engine -> [];
+            Engine -> [{engine, Engine}]
+        end,
+        ?assertEqual([{props, StoredProps} | EngineOpts], Expected)
+    end).
+
+%% Body used for the shards db doc: two ranges, each on three nodes, plus props.
+test_db_doc() ->
+    {[
+        {<<"shard_suffix">>, ".1584997648"},
+        {<<"changelog">>, [
+            [<<"add">>, <<"00000000-7fffffff">>, <<"node1@127.0.0.1">>],
+            [<<"add">>, <<"00000000-7fffffff">>, <<"node2@127.0.0.1">>],
+            [<<"add">>, <<"00000000-7fffffff">>, <<"node3@127.0.0.1">>],
+            [<<"add">>, <<"80000000-ffffffff">>, <<"node1@127.0.0.1">>],
+            [<<"add">>, <<"80000000-ffffffff">>, <<"node2@127.0.0.1">>],
+            [<<"add">>, <<"80000000-ffffffff">>, <<"node3@127.0.0.1">>]
+        ]},
+        {<<"by_node">>, {[
+            {<<"node1@127.0.0.1">>, [<<"00000000-7fffffff">>, <<"80000000-ffffffff">>]},
+            {<<"node2@127.0.0.1">>, [<<"00000000-7fffffff">>, <<"80000000-ffffffff">>]},
+            {<<"node3@127.0.0.1">>, [<<"00000000-7fffffff">>, <<"80000000-ffffffff">>]}
+        ]}},
+        {<<"by_range">>, {[
+            {<<"00000000-7fffffff">>, [<<"node1@127.0.0.1">>, <<"node2@127.0.0.1">>, <<"node3@127.0.0.1">>]},
+            {<<"80000000-ffffffff">>, [<<"node1@127.0.0.1">>, <<"node2@127.0.0.1">>, <<"node3@127.0.0.1">>]}
+        ]}},
+        {<<"props">>, {[
+            {partitioned, true},
+            {hash, [couch_partition, hash, []]}
+        ]}}
+    ]}.
+
diff --git a/src/mem3/src/mem3_reshard_dbdoc.erl b/src/mem3/src/mem3_reshard_dbdoc.erl
index 7eb3e9f..4a0a35c 100644
--- a/src/mem3/src/mem3_reshard_dbdoc.erl
+++ b/src/mem3/src/mem3_reshard_dbdoc.erl
@@ -146,9 +146,8 @@
 
 
 write_shard_doc(#doc{id = Id} = Doc, Body) ->
-    DbName = ?l2b(config:get("mem3", "shards_db", "_dbs")),
     UpdatedDoc = Doc#doc{body = Body},
-    couch_util:with_db(DbName, fun(Db) ->
+    couch_util:with_db(mem3_sync:shards_db(), fun(Db) ->
         try
             {ok, _} = couch_db:update_doc(Db, UpdatedDoc, [])
         catch
diff --git a/src/mem3/src/mem3_rpc.erl b/src/mem3/src/mem3_rpc.erl
index 0991aa7..5d1c62c 100644
--- a/src/mem3/src/mem3_rpc.erl
+++ b/src/mem3/src/mem3_rpc.erl
@@ -401,7 +401,7 @@
 
 
 get_or_create_db(DbName, Options) ->
-    couch_db:open_int(DbName, [{create_if_missing, true} | Options]).
+    mem3_util:get_or_create_db(DbName, Options).
 
 
 -ifdef(TEST).
diff --git a/src/mem3/src/mem3_shards.erl b/src/mem3/src/mem3_shards.erl
index 110e227..4f33237 100644
--- a/src/mem3/src/mem3_shards.erl
+++ b/src/mem3/src/mem3_shards.erl
@@ -20,6 +20,7 @@
 -export([handle_config_change/5, handle_config_terminate/3]).
 
 -export([start_link/0]).
+-export([opts_for_db/1]).
 -export([for_db/1, for_db/2, for_docid/2, for_docid/3, get/3, local/1, fold/2]).
 -export([for_shard_range/1]).
 -export([set_max_size/1]).
@@ -45,6 +46,15 @@
 start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
+%% Look up the shard opts in the shards db doc for DbName; errors with
+%% database_does_not_exist when there is no doc. The db handle is always closed.
+opts_for_db(DbName) ->
+    {ok, Db} = mem3_util:ensure_exists(mem3_sync:shards_db()),
+    try couch_db:open_doc(Db, DbName, [ejson_body]) of
+        {ok, #doc{body = {Props}}} -> mem3_util:get_shard_opts(Props);
+        {not_found, _} -> erlang:error(database_does_not_exist, ?b2l(DbName))
+    after couch_db:close(Db) end.
+
 for_db(DbName) ->
     for_db(DbName, []).
 
@@ -144,8 +154,7 @@
     lists:filter(Pred, for_db(DbName)).
 
 fold(Fun, Acc) ->
-    DbName = config:get("mem3", "shards_db", "_dbs"),
-    {ok, Db} = mem3_util:ensure_exists(DbName),
+    {ok, Db} = mem3_util:ensure_exists(mem3_sync:shards_db()),
     FAcc = {Db, Fun, Acc},
     try
         {ok, LastAcc} = couch_db:fold_docs(Db, fun fold_fun/2, FAcc),
@@ -309,15 +318,13 @@
     end.
 
 get_update_seq() ->
-    DbName = config:get("mem3", "shards_db", "_dbs"),
-    {ok, Db} = mem3_util:ensure_exists(DbName),
+    {ok, Db} = mem3_util:ensure_exists(mem3_sync:shards_db()),
     Seq = couch_db:get_update_seq(Db),
     couch_db:close(Db),
     Seq.
 
 listen_for_changes(Since) ->
-    DbName = config:get("mem3", "shards_db", "_dbs"),
-    {ok, Db} = mem3_util:ensure_exists(DbName),
+    {ok, Db} = mem3_util:ensure_exists(mem3_sync:shards_db()),
     Args = #changes_args{
         feed = "continuous",
         since = Since,
@@ -362,8 +369,7 @@
 
 load_shards_from_disk(DbName) when is_binary(DbName) ->
     couch_stats:increment_counter([mem3, shard_cache, miss]),
-    X = ?l2b(config:get("mem3", "shards_db", "_dbs")),
-    {ok, Db} = mem3_util:ensure_exists(X),
+    {ok, Db} = mem3_util:ensure_exists(mem3_sync:shards_db()),
     try
         load_shards_from_db(Db, DbName)
     after
diff --git a/src/mem3/src/mem3_util.erl b/src/mem3/src/mem3_util.erl
index 3fc9b4f..28cb177 100644
--- a/src/mem3/src/mem3_util.erl
+++ b/src/mem3/src/mem3_util.erl
@@ -14,8 +14,10 @@
 
 -export([name_shard/2, create_partition_map/5, build_shards/2,
     n_val/2, q_val/1, to_atom/1, to_integer/1, write_db_doc/1, delete_db_doc/1,
-    shard_info/1, ensure_exists/1, open_db_doc/1]).
+    shard_info/1, ensure_exists/1, open_db_doc/1, get_or_create_db/2]).
 -export([is_deleted/1, rotate_list/2]).
+-export([get_shard_opts/1, get_engine_opt/1, get_props_opt/1]).
+-export([get_shard_props/1, find_dirty_shards/0]).
 -export([
     iso8601_timestamp/0,
     live_nodes/0,
@@ -87,13 +89,11 @@
     attach_nodes(Rest, [S#shard{node=Node} | Acc], Nodes, [Node | UsedNodes]).
 
 open_db_doc(DocId) ->
-    DbName = ?l2b(config:get("mem3", "shards_db", "_dbs")),
-    {ok, Db} = couch_db:open(DbName, [?ADMIN_CTX]),
+    {ok, Db} = couch_db:open(mem3_sync:shards_db(), [?ADMIN_CTX]),
     try couch_db:open_doc(Db, DocId, [ejson_body]) after couch_db:close(Db) end.
 
 write_db_doc(Doc) ->
-    DbName = ?l2b(config:get("mem3", "shards_db", "_dbs")),
-    write_db_doc(DbName, Doc, true).
+    write_db_doc(mem3_sync:shards_db(), Doc, true).
 
 write_db_doc(DbName, #doc{id=Id, body=Body} = Doc, ShouldMutate) ->
     {ok, Db} = couch_db:open(DbName, [?ADMIN_CTX]),
@@ -118,8 +118,7 @@
 
 delete_db_doc(DocId) ->
     gen_server:cast(mem3_shards, {cache_remove, DocId}),
-    DbName = ?l2b(config:get("mem3", "shards_db", "_dbs")),
-    delete_db_doc(DbName, DocId, true).
+    delete_db_doc(mem3_sync:shards_db(), DocId, true).
 
 delete_db_doc(DbName, DocId, ShouldMutate) ->
     {ok, Db} = couch_db:open(DbName, [?ADMIN_CTX]),
@@ -324,7 +323,7 @@
 % which could be a while.
 %
 replicate_dbs_to_all_nodes(Timeout) ->
-    DbName = ?l2b(config:get("mem3", "shards_db", "_dbs")),
+    DbName = mem3_sync:shards_db(),
     Targets= mem3_util:live_nodes() -- [node()],
     Res =  [start_replication(node(), T, DbName, Timeout) || T <- Targets],
     collect_replication_results(Res, Timeout).
@@ -335,7 +334,7 @@
 % them until they are all done.
 %
 replicate_dbs_from_all_nodes(Timeout) ->
-    DbName = ?l2b(config:get("mem3", "shards_db", "_dbs")),
+    DbName = mem3_sync:shards_db(),
     Sources = mem3_util:live_nodes() -- [node()],
     Res = [start_replication(S, node(), DbName, Timeout) || S <- Sources],
     collect_replication_results(Res, Timeout).
@@ -509,6 +508,75 @@
     B1 =< B2.
 
 
+%% Open DbName, creating it when couch_db:open_int/2 reports no_db_file. If
+%% mem3:dbname/1 maps DbName to another name, that name's opts_for_db/1 are laid
+%% over the options; database_does_not_exist becomes throw({error, missing_target}).
+get_or_create_db(DbName, Options) ->
+    case couch_db:open_int(DbName, Options) of
+        {not_found, no_db_file} ->
+            CreateOpts = [{create_if_missing, true} | Options],
+            try
+                StoredOpts = case mem3:dbname(DbName) of
+                    DbName -> [];
+                    ClusteredName -> mem3_shards:opts_for_db(ClusteredName)
+                end,
+                couch_db:open_int(DbName, merge_opts(StoredOpts, CreateOpts))
+            catch error:database_does_not_exist ->
+                throw({error, missing_target})
+            end;
+        Other ->
+            Other
+    end.
+
+
+%% overlay New on Old: each {Key, Val} in New replaces the first tuple in Old with
+%% that key or is appended; bare atom options are only valid in Old
+merge_opts(New, Old) ->
+    Overlay = fun({Key, _} = Opt, Acc) -> lists:keystore(Key, 1, Acc, Opt) end,
+    lists:foldl(Overlay, Old, New).
+
+
+%% Report the props and non-default engine of ShardName as [{props, Props} | EngineProps].
+%% Returns not_found on {not_found, _}; any other open result is returned unchanged.
+get_shard_props(ShardName) ->
+    case couch_db:open_int(ShardName, []) of
+        {ok, Db} ->
+            try
+                Props = case couch_db_engine:get_props(Db) of
+                    undefined -> [];
+                    Stored -> Stored
+                end,
+                %% We don't normally store the default engine name
+                EngineProps = case couch_db_engine:get_engine(Db) of
+                    couch_bt_engine -> [];
+                    EngineName -> [{engine, EngineName}]
+                end,
+                [{props, Props} | EngineProps]
+            after couch_db:close(Db) end; %% release the handle even on error
+        {not_found, _} -> not_found;
+        Else -> Else
+    end.
+
+
+%% Fold over mem3_shards and collect {Shard, Reported} for every shard whose
+%% opts differ from what get_shard_props/1 returns on the shard's node.
+%% Shards with no opts, or only empty props, are skipped without an rpc.
+find_dirty_shards() ->
+    mem3_shards:fold(fun
+        (#shard{opts = []}, Acc) ->
+            Acc;
+        (#shard{opts = [{props, []}]}, Acc) ->
+            Acc;
+        (#shard{node = Node, name = Name, opts = Opts} = Shard, Acc) ->
+            case rpc:call(Node, ?MODULE, get_shard_props, [Name]) of
+                Opts ->
+                    Acc;
+                Props ->
+                    [{Shard, Props} | Acc]
+            end
+    end, []).
+
+
 -ifdef(TEST).
 
 -include_lib("eunit/include/eunit.hrl").