Improve couch_proc_manager

The main improvement is speeding up process lookup. This should result in
improved latency for concurrent requests which quickly acquire and
release couchjs processes. Testing with concurrent vdu and map/reduce calls
showed a 1.6 -> 6x performance speedup [1].

Previously, couch_proc_manager linearly searched through all the processes and
executed a custom callback function for each to match design doc IDs. Instead,
use a separate ets table index for idle processes to avoid scanning assigned
processes.

Use a db tag in addition to a ddoc id to quickly find idle processes. This could
improve performance, but if that's not the case, allow configuring the tagging
scheme to use a db prefix only, or disable the scheme altogether.

Use the new `map_get` ets select guard [2] to perform ddoc id lookups during
the ets select traversal without a custom matcher callback.

In ordered ets tables use the partially bound key trick [3]. This helps skip
scanning processes using a different query language altogether.

Waiting clients used `os:timestamp/0` as a unique client identifier. It turns
out, `os:timestamp/0` is not guaranteed to be unique and could result in some
clients never getting a response. This bug was mostly likely the reason the
"fifo client order" test had to be commented out. Fix the issue by using a
newer monotonic timestamp function, and for uniqueness add the client's
gen_server return tag at the end. Uncomment the previously commented out test
so it can hopefully run again.

When clients tag a previously untagged process, asynchronously replace the
untagged process with a new process. This happens in the background and the
client doesn't have to wait for it.

When a ddoc tagged process cannot be found, before giving up, stop the oldest
unused ddoc processes to allow spawning new fresh ones. To avoid doing a linear
scan here, keep a separate `?IDLE_ACCESS` index with an ordered list of idle
ddoc proceses sorted by their last usage time.

When processes are returned to the pool, quickly respond to the client with an
early return, instead of forcing them to wait until we re-insert the process
back into the idle ets table. This should improve client latency.

If the waiting client list gets long enough, where it waits longer than the
gen_server get_proc timeout, do not waste time assigning or spawning a new
process for that client, since it already timed-out.

When gathering stats, avoid making gen_server calls, at least for the total
number of processes spawned metric. Table sizes can be easily computed with
`ets:info(Table, size)` from outside the main process.

In addition to peformance improvements clean up the couch_proc_manager API by
forcing all the calls to go through properly exported functions instead of
doing direct gen_server calls.

Remove `#proc_int{}` and use only `#proc{}`. The cast to a list/tuple between
`#proc_int{}` and `#proc{}` was dangerous and it avoided the compiler checking
that we're using the proper fields. Adding an extra field to the record
resulted in mis-matched fields being assigned.

To simplify the code a bit, keep the per-language count in an ets table. This
helps not having to thread the old and updated state everywhere. Everything
else was mostly kept in ets tables anyway, so we're staying consistent with
that general pattern.

Improve test coverage and convert the tests to use the `?TDEF_FE` macro so
there is no need for the awkward `?_test(begin ... end)` construct.

[1] https://gist.github.com/nickva/f088accc958f993235e465b9591e5fac
[2] https://www.erlang.org/doc/apps/erts/match_spec.html
[3] https://www.erlang.org/doc/man/ets.html#table-traversal
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index 93aa1ca..9c91679 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -382,6 +382,13 @@
 ;query_limit = 268435456
 ;partition_query_limit = 268435456
 
+; Configure what to use as the db tag when selecting design doc couchjs
+; processes. The choices are:
+;  - name (default) : Use the entire db name
+;  - prefix : Use only db prefix before the first "/" character
+;  - none : Do not use a db tag at all
+;db_tag = name
+
 [mango]
 ; Set to true to disable the "index all fields" text index, which can lead
 ; to out of memory issues when users have documents with nested array fields.
diff --git a/src/chttpd/src/chttpd_show.erl b/src/chttpd/src/chttpd_show.erl
index c2c37c6..9391fa4 100644
--- a/src/chttpd/src/chttpd_show.erl
+++ b/src/chttpd/src/chttpd_show.erl
@@ -79,7 +79,7 @@
         JsonReq = chttpd_external:json_req_obj(Req, Db, DocId),
         JsonDoc = couch_query_servers:json_doc(Doc),
         [<<"resp">>, ExternalResp] =
-            couch_query_servers:ddoc_prompt(DDoc, [<<"shows">>, ShowName],
+            couch_query_servers:ddoc_prompt(Db, DDoc, [<<"shows">>, ShowName],
                 [JsonDoc, JsonReq]),
         JsonResp = apply_etag(ExternalResp, CurrentEtag),
         chttpd_external:send_external_response(Req, JsonResp)
@@ -124,7 +124,7 @@
     JsonDoc = couch_query_servers:json_doc(Doc),
     Cmd = [<<"updates">>, UpdateName],
     W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))),
-    UpdateResp = couch_query_servers:ddoc_prompt(DDoc, Cmd, [JsonDoc, JsonReq]),
+    UpdateResp = couch_query_servers:ddoc_prompt(Db, DDoc, Cmd, [JsonDoc, JsonReq]),
     JsonResp = case UpdateResp of
         [<<"up">>, {NewJsonDoc}, {JsonResp0}] ->
             case chttpd:header_value(Req, "X-Couch-Full-Commit", "false") of
@@ -204,7 +204,7 @@
     CB = fun list_cb/2,
     QueryArgs = couch_mrview_http:parse_body_and_query(Req, Keys),
     Options = [{user_ctx, Req#httpd.user_ctx}],
-    couch_query_servers:with_ddoc_proc(DDoc, fun(QServer) ->
+    couch_query_servers:with_ddoc_proc(Db, DDoc, fun(QServer) ->
         Acc = #lacc{
             lname = LName,
             req = Req,
diff --git a/src/couch/include/couch_db.hrl b/src/couch/include/couch_db.hrl
index 019c205..a15c650 100644
--- a/src/couch/include/couch_db.hrl
+++ b/src/couch/include/couch_db.hrl
@@ -189,11 +189,14 @@
 -record(proc, {
     pid,
     lang,
-    client = nil,
-    ddoc_keys = [],
+    client,
+    db_key,
+    ddoc_keys = #{},
     prompt_fun,
     set_timeout_fun,
-    stop_fun
+    stop_fun,
+    threshold_ts,
+    last_use_ts
 }).
 
 -record(leaf,  {
diff --git a/src/couch/src/couch_changes.erl b/src/couch/src/couch_changes.erl
index 2078fed..2f0c545 100644
--- a/src/couch/src/couch_changes.erl
+++ b/src/couch/src/couch_changes.erl
@@ -233,7 +233,7 @@
     end;
 filter(Db, DocInfo, {view, Style, DDoc, VName}) ->
     Docs = open_revs(Db, DocInfo, Style),
-    {ok, Passes} = couch_query_servers:filter_view(DDoc, VName, Docs),
+    {ok, Passes} = couch_query_servers:filter_view(Db, DDoc, VName, Docs),
     filter_revs(Passes, Docs);
 filter(Db, DocInfo, {custom, Style, Req0, DDoc, FName}) ->
     Req = case Req0 of
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index fdcf23e..333ed36 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -950,7 +950,7 @@
     end,
     DDocs = lists:map(OpenDocs, DDocInfos),
     Funs = lists:flatmap(fun(DDoc) ->
-        case couch_doc:get_validate_doc_fun(DDoc) of
+        case couch_doc:get_validate_doc_fun(Db, DDoc) of
             nil -> [];
             Fun -> [Fun]
         end
diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index 535acfa..169fbc1 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -330,7 +330,7 @@
         fun(DesignDocInfo) ->
             {ok, DesignDoc} = couch_db:open_doc_int(
                 Db, DesignDocInfo, [ejson_body]),
-            case couch_doc:get_validate_doc_fun(DesignDoc) of
+            case couch_doc:get_validate_doc_fun(Db, DesignDoc) of
             nil -> [];
             Fun -> [Fun]
             end
diff --git a/src/couch/src/couch_doc.erl b/src/couch/src/couch_doc.erl
index ec16d21..e9265e7 100644
--- a/src/couch/src/couch_doc.erl
+++ b/src/couch/src/couch_doc.erl
@@ -16,7 +16,7 @@
 -export([from_json_obj/1, from_json_obj_validate/1]).
 -export([from_json_obj/2, from_json_obj_validate/2]).
 -export([to_json_obj/2, has_stubs/1, merge_stubs/2]).
--export([validate_docid/1, validate_docid/2, get_validate_doc_fun/1]).
+-export([validate_docid/1, validate_docid/2, get_validate_doc_fun/2]).
 -export([doc_from_multi_part_stream/2, doc_from_multi_part_stream/3]).
 -export([doc_from_multi_part_stream/4]).
 -export([doc_to_multi_part_stream/5, len_doc_to_multi_part_stream/4]).
@@ -400,15 +400,15 @@
     end.
 
 
-get_validate_doc_fun({Props}) ->
-    get_validate_doc_fun(couch_doc:from_json_obj({Props}));
-get_validate_doc_fun(#doc{body={Props}}=DDoc) ->
+get_validate_doc_fun(Db, {Props}) ->
+    get_validate_doc_fun(Db, couch_doc:from_json_obj({Props}));
+get_validate_doc_fun(Db, #doc{body={Props}}=DDoc) ->
     case couch_util:get_value(<<"validate_doc_update">>, Props) of
     undefined ->
         nil;
     _Else ->
         fun(EditDoc, DiskDoc, Ctx, SecObj) ->
-            couch_query_servers:validate_doc_update(DDoc, EditDoc, DiskDoc, Ctx, SecObj)
+            couch_query_servers:validate_doc_update(Db, DDoc, EditDoc, DiskDoc, Ctx, SecObj)
         end
     end.
 
diff --git a/src/couch/src/couch_native_process.erl b/src/couch/src/couch_native_process.erl
index eee8b28..20378bf 100644
--- a/src/couch/src/couch_native_process.erl
+++ b/src/couch/src/couch_native_process.erl
@@ -115,7 +115,7 @@
     {noreply, State, State#evstate.idle}.
 
 handle_info(timeout, State) ->
-    gen_server:cast(couch_proc_manager, {os_proc_idle, self()}),
+    couch_proc_manager:os_proc_idle(self()),
     erlang:garbage_collect(),
     {noreply, State, State#evstate.idle};
 handle_info({'EXIT',_,normal}, State) ->
diff --git a/src/couch/src/couch_os_process.erl b/src/couch/src/couch_os_process.erl
index 63a2414..675f105 100644
--- a/src/couch/src/couch_os_process.erl
+++ b/src/couch/src/couch_os_process.erl
@@ -229,7 +229,7 @@
     {noreply, OsProc, Idle}.
 
 handle_info(timeout, #os_proc{idle=Idle}=OsProc) ->
-    gen_server:cast(couch_proc_manager, {os_proc_idle, self()}),
+    couch_proc_manager:os_proc_idle(self()),
     erlang:garbage_collect(),
     {noreply, OsProc, Idle};
 handle_info({Port, {exit_status, 0}}, #os_proc{port=Port}=OsProc) ->
diff --git a/src/couch/src/couch_proc_manager.erl b/src/couch/src/couch_proc_manager.erl
index e7a25a6..42c8958 100644
--- a/src/couch/src/couch_proc_manager.erl
+++ b/src/couch/src/couch_proc_manager.erl
@@ -17,6 +17,10 @@
 
 -export([
     start_link/0,
+    get_proc/3,
+    get_proc/1,
+    ret_proc/1,
+    os_proc_idle/1,
     get_proc_count/0,
     get_stale_proc_count/0,
     new_proc/1,
@@ -45,11 +49,13 @@
 -define(WAITERS, couch_proc_manager_waiters).
 -define(OPENING, couch_proc_manager_opening).
 -define(SERVERS, couch_proc_manager_servers).
+-define(COUNTERS, couch_proc_manager_counters).
+-define(IDLE_BY_DB, couch_proc_manager_idle_by_db).
+-define(IDLE_ACCESS, couch_proc_manager_idle_access).
 -define(RELISTEN_DELAY, 5000).
 
 -record(state, {
     config,
-    counts,
     threshold_ts,
     hard_limit,
     soft_limit
@@ -59,31 +65,44 @@
 -type revision() :: {integer(), binary()}.
 
 -record(client, {
-    timestamp :: os:timestamp() | '_',
-    from :: undefined | {pid(), reference()}  | '_',
+    wait_key :: {binary(), integer(), gen_server:reply_tag()} | '_',
+    from :: undefined | {pid(), gen_server:reply_tag()} | '_',
     lang :: binary() | '_',
-    ddoc :: #doc{} | '_',
+    ddoc :: undefined | #doc{} | '_',
+    db_key :: undefined | binary(),
     ddoc_key :: undefined | {DDocId :: docid(), Rev :: revision()} | '_'
 }).
 
--record(proc_int, {
-    pid,
-    lang,
-    client,
-    ddoc_keys = [],
-    prompt_fun,
-    set_timeout_fun,
-    stop_fun,
-    t0 = os:timestamp()
-}).
-
 
 start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
+get_proc(#doc{body = {Props}} = DDoc, DbKey, {_DDocId, _Rev} = DDocKey) ->
+    LangStr = couch_util:get_value(<<"language">>, Props, <<"javascript">>),
+    Lang = couch_util:to_binary(LangStr),
+    Client = #client{lang = Lang, ddoc = DDoc, db_key = DbKey, ddoc_key = DDocKey},
+    Timeout = get_os_process_timeout(),
+    gen_server:call(?MODULE, {get_proc, Client}, Timeout).
+
+get_proc(LangStr) ->
+    Lang = couch_util:to_binary(LangStr),
+    Client = #client{lang = Lang},
+    Timeout = get_os_process_timeout(),
+    gen_server:call(?MODULE, {get_proc, Client}, Timeout).
+
+ret_proc(#proc{} = Proc) ->
+    gen_server:call(?MODULE, {ret_proc, Proc}, infinity).
+
+os_proc_idle(Proc) when is_pid(Proc) ->
+    gen_server:cast(?MODULE, {os_proc_idle, Proc}).
 
 get_proc_count() ->
-    gen_server:call(?MODULE, get_proc_count).
+    try
+        ets:info(?PROCS, size) + ets:info(?OPENING, size)
+    catch
+        error:badarg ->
+            0
+    end.
 
 
 get_stale_proc_count() ->
@@ -102,11 +121,27 @@
     process_flag(trap_exit, true),
     ok = config:listen_for_changes(?MODULE, undefined),
 
-    TableOpts = [public, named_table, ordered_set],
-    ets:new(?PROCS, TableOpts ++ [{keypos, #proc_int.pid}]),
-    ets:new(?WAITERS, TableOpts ++ [{keypos, #client.timestamp}]),
-    ets:new(?OPENING, [public, named_table, set]),
-    ets:new(?SERVERS, [public, named_table, set]),
+    % Main process table. Pid -> #proc{}
+    ets:new(?PROCS, [named_table, {read_concurrency, true}, {keypos, #proc.pid}]),
+
+    % #client{} waiters ordered by {Lang, timestamp(), Ref}
+    ets:new(?WAITERS, [named_table, ordered_set, {keypos, #client.wait_key}]),
+
+    % Async process openers. Pid -> #client{}
+    ets:new(?OPENING, [named_table]),
+
+    % Configured language servers Lang -> Start MFA | Command
+    ets:new(?SERVERS, [named_table]),
+
+    % Idle Pids. Ordered to allow partial key lookups {Lang, DbKey, Pid} -> DDocs
+    ets:new(?IDLE_BY_DB, [named_table, ordered_set]),
+
+    % Idle Db tagged pids ordered by last use. {Lang, timestamp(), Pid} -> true
+    ets:new(?IDLE_ACCESS, [named_table, ordered_set]),
+
+    % Lang -> number of procs spawn for that lang
+    ets:new(?COUNTERS, [named_table]),
+
     ets:insert(?SERVERS, get_servers_from_env("COUCHDB_QUERY_SERVER_")),
     ets:insert(?SERVERS, get_servers_from_env("COUCHDB_NATIVE_QUERY_SERVER_")),
     ets:insert(?SERVERS, [{"QUERY", {mango_native_proc, start_link, []}}]),
@@ -114,99 +149,82 @@
 
     {ok, #state{
         config = get_proc_config(),
-        counts = dict:new(),
-        threshold_ts = os:timestamp(),
+        threshold_ts = timestamp(),
         hard_limit = get_hard_limit(),
         soft_limit = get_soft_limit()
     }}.
 
 
 terminate(_Reason, _State) ->
-    ets:foldl(fun(#proc_int{pid=P}, _) ->
-        couch_util:shutdown_sync(P)
-    end, 0, ?PROCS),
-    ok.
+    foreach_proc(fun(#proc{pid = P}) -> couch_util:shutdown_sync(P) end).
 
 
-handle_call(get_proc_count, _From, State) ->
-    NumProcs = ets:info(?PROCS, size),
-    NumOpening = ets:info(?OPENING, size),
-    {reply, NumProcs + NumOpening, State};
 
 handle_call(get_stale_proc_count, _From, State) ->
     #state{threshold_ts = T0} = State,
-    MatchSpec = [{#proc_int{t0='$1', _='_'}, [{'<', '$1', {T0}}], [true]}],
+    MatchSpec = [{#proc{threshold_ts = '$1', _ = '_'}, [{'<', '$1', T0}], [true]}],
     {reply, ets:select_count(?PROCS, MatchSpec), State};
+handle_call({get_proc, #client{} = Client}, From, State) ->
+    add_waiting_client(Client#client{from = From}),
+    ok = flush_waiters(State, Client#client.lang),
+    {noreply, State};
+handle_call({ret_proc, #proc{} = Proc}, From, State) ->
+    #proc{client = Ref, pid = Pid} = Proc,
 
-handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, From, State) ->
-    LangStr = couch_util:get_value(<<"language">>, Props, <<"javascript">>),
-    Lang = couch_util:to_binary(LangStr),
-    Client = #client{from=From, lang=Lang, ddoc=DDoc, ddoc_key=DDocKey},
-    add_waiting_client(Client),
-    {noreply, flush_waiters(State, Lang)};
 
-handle_call({get_proc, LangStr}, From, State) ->
-    Lang = couch_util:to_binary(LangStr),
-    Client = #client{from=From, lang=Lang},
-    add_waiting_client(Client),
-    {noreply, flush_waiters(State, Lang)};
-
-handle_call({ret_proc, #proc{client=Ref} = Proc}, _From, State) ->
     erlang:demonitor(Ref, [flush]),
-    NewState = case ets:lookup(?PROCS, Proc#proc.pid) of
-        [#proc_int{}=ProcInt] ->
-            return_proc(State, ProcInt);
+    gen_server:reply(From, true),
+    case ets:lookup(?PROCS, Pid) of
+        [#proc{} = ProcInt] ->
+            ok = return_proc(State, ProcInt);
         [] ->
             % Proc must've died and we already
             % cleared it out of the table in
             % the handle_info clause.
-            State
+            ok
     end,
-    {reply, true, NewState};
+    {noreply, State};
 
 handle_call(set_threshold_ts, _From, State) ->
-    FoldFun = fun
-        (#proc_int{client = undefined} = Proc, StateAcc) ->
-            remove_proc(StateAcc, Proc);
-        (_, StateAcc) ->
-            StateAcc
+    Fun = fun
+        (#proc{client = undefined} = Proc) -> ok = remove_proc(Proc);
+        (_) -> ok
     end,
-    NewState = ets:foldl(FoldFun, State, ?PROCS),
-    {reply, ok, NewState#state{threshold_ts = os:timestamp()}};
+    ok = foreach_proc(Fun),
+    {reply, ok, State#state{threshold_ts = timestamp()}};
 
 handle_call(terminate_stale_procs, _From, #state{threshold_ts = Ts1} = State) ->
-    FoldFun = fun
-        (#proc_int{client = undefined, t0 = Ts2} = Proc, StateAcc) ->
+    Fun = fun
+        (#proc{client = undefined, threshold_ts = Ts2} = Proc) ->
             case Ts1 > Ts2 of
-                true ->
-                    remove_proc(StateAcc, Proc);
-                false ->
-                    StateAcc
+                true -> ok = remove_proc(Proc);
+                false -> ok
             end;
-        (_, StateAcc) ->
-            StateAcc
+        (_) ->
+            ok
     end,
-    NewState = ets:foldl(FoldFun, State, ?PROCS),
-    {reply, ok, NewState};
+    foreach_proc(Fun),
+    {reply, ok, State};
 
 handle_call(_Call, _From, State) ->
     {reply, ignored, State}.
 
-
-handle_cast({os_proc_idle, Pid}, #state{counts=Counts}=State) ->
-    NewState = case ets:lookup(?PROCS, Pid) of
-        [#proc_int{client=undefined, lang=Lang}=Proc] ->
-            case dict:find(Lang, Counts) of
-                {ok, Count} when Count >= State#state.soft_limit ->
-                    couch_log:info("Closing idle OS Process: ~p", [Pid]),
-                    remove_proc(State, Proc);
-                {ok, _} ->
-                    State
+handle_cast({os_proc_idle, Pid}, #state{soft_limit = SoftLimit} = State) ->
+    case ets:lookup(?PROCS, Pid) of
+        [#proc{client = undefined, db_key = DbKey, lang = Lang} = Proc] ->
+            IsOverSoftLimit = get_count(Lang) >= SoftLimit,
+            IsTagged = DbKey =/= undefined,
+            case IsOverSoftLimit orelse IsTagged of
+                true ->
+                    couch_log:debug("Closing tagged or idle OS Process: ~p", [Pid]),
+                    ok = remove_proc(Proc);
+                false ->
+                    ok
             end;
         _ ->
             State
     end,
-    {noreply, NewState};
+    {noreply, State};
 
 handle_cast(reload_config, State) ->
     NewState = State#state{
@@ -215,47 +233,59 @@
         soft_limit = get_soft_limit()
     },
     maybe_configure_erlang_native_servers(),
-    {noreply, flush_waiters(NewState)};
-
+    lists:foreach(
+        fun({Lang, _}) ->
+            ok = flush_waiters(NewState, Lang)
+        end,
+        ets:tab2list(?COUNTERS)
+    ),
+    {noreply, NewState};
 handle_cast(_Msg, State) ->
     {noreply, State}.
 
 
 handle_info(shutdown, State) ->
     {stop, shutdown, State};
-
+handle_info({'EXIT', Pid, {spawn_ok, Proc0, undefined = _From}}, State) ->
+    % Use ets:take/2 to assert that opener existed before removing. Also assert that
+    % the pid matches and the the client was a bogus client
+    [{Pid, #client{from = undefined}}] = ets:take(?OPENING, Pid),
+    Proc = Proc0#proc{client = undefined},
+    link(Proc#proc.pid),
+    ets:insert(?PROCS, Proc),
+    insert_idle_by_db(Proc),
+    {noreply, State};
 handle_info({'EXIT', Pid, {spawn_ok, Proc0, {ClientPid,_} = From}}, State) ->
-    ets:delete(?OPENING, Pid),
-    link(Proc0#proc_int.pid),
+    % Use ets:take/2 to assert that opener existed before removing
+    [{Pid, #client{}}] = ets:take(?OPENING, Pid),
+    link(Proc0#proc.pid),
     Proc = assign_proc(ClientPid, Proc0),
     gen_server:reply(From, {ok, Proc, State#state.config}),
     {noreply, State};
 
 handle_info({'EXIT', Pid, spawn_error}, State) ->
-    [{Pid, #client{lang=Lang}}] = ets:lookup(?OPENING, Pid),
-    ets:delete(?OPENING, Pid),
-    NewState = State#state{
-        counts = dict:update_counter(Lang, -1, State#state.counts)
-    },
-    {noreply, flush_waiters(NewState, Lang)};
+    % Assert when removing that we always expect the opener to have been there
+    [{Pid, #client{lang = Lang}}] = ets:take(?OPENING, Pid),
+    dec_count(Lang),
+    ok = flush_waiters(State, Lang),
+    {noreply, State};
 
 handle_info({'EXIT', Pid, Reason}, State) ->
     couch_log:info("~p ~p died ~p", [?MODULE, Pid, Reason]),
     case ets:lookup(?PROCS, Pid) of
-        [#proc_int{} = Proc] ->
-            NewState = remove_proc(State, Proc),
-            {noreply, flush_waiters(NewState, Proc#proc_int.lang)};
+        [#proc{} = Proc] ->
+            ok = remove_proc(Proc),
+            ok = flush_waiters(State, Proc#proc.lang);
         [] ->
-            {noreply, State}
-    end;
-
-handle_info({'DOWN', Ref, _, _, _Reason}, State0) ->
-    case ets:match_object(?PROCS, #proc_int{client=Ref, _='_'}) of
-        [#proc_int{} = Proc] ->
-            {noreply, return_proc(State0, Proc)};
-        [] ->
-            {noreply, State0}
-    end;
+            ok
+    end,
+    {noreply, State};
+handle_info({'DOWN', Ref, _, _, _Reason}, #state{} = State) ->
+    case ets:match_object(?PROCS, #proc{client = Ref, _ = '_'}) of
+        [#proc{} = Proc] -> ok = return_proc(State, Proc);
+        [] -> ok
+    end,
+    {noreply, State};
 
 
 handle_info(restart_config_listener, State) ->
@@ -284,73 +314,97 @@
 handle_config_change(_, _, _, _, _) ->
     {ok, undefined}.
 
-
-find_proc(#client{lang = Lang, ddoc_key = undefined}) ->
-    Pred = fun(_) ->
-        true
-    end,
-    find_proc(Lang, Pred);
-find_proc(#client{lang = Lang, ddoc = DDoc, ddoc_key = DDocKey} = Client) ->
-    Pred = fun(#proc_int{ddoc_keys = DDocKeys}) ->
-        lists:member(DDocKey, DDocKeys)
-    end,
-    case find_proc(Lang, Pred) of
+find_proc(#client{ddoc_key = undefined} = Client, _CanSpawn) ->
+    #client{lang = Lang} = Client,
+    % Find an unowned process first, if that fails find an owned one
+    case find_proc(Lang, undefined, '_') of
+        {ok, Proc} ->
+            {ok, Proc};
         not_found ->
-            case find_proc(Client#client{ddoc_key=undefined}) of
+            case find_proc(Lang, '_', '_') of
                 {ok, Proc} ->
-                    teach_ddoc(DDoc, DDocKey, Proc);
+                    {ok, Proc};
                 Else ->
                     Else
             end;
         Else ->
             Else
-    end.
-
-find_proc(Lang, Fun) ->
-    try iter_procs(Lang, Fun)
-    catch ?STACKTRACE(error, Reason, StackTrace)
-        couch_log:error("~p ~p ~p", [?MODULE, Reason, StackTrace]),
-        {error, Reason}
-    end.
-
-
-iter_procs(Lang, Fun) when is_binary(Lang) ->
-    Pattern = #proc_int{lang=Lang, client=undefined, _='_'},
-    MSpec = [{Pattern, [], ['$_']}],
-    case ets:select_reverse(?PROCS, MSpec, 25) of
-        '$end_of_table' ->
-            not_found;
-        Continuation ->
-            iter_procs_int(Continuation, Fun)
-    end.
-
-
-iter_procs_int({[], Continuation0}, Fun) ->
-    case ets:select_reverse(Continuation0) of
-        '$end_of_table' ->
-            not_found;
-        Continuation1 ->
-            iter_procs_int(Continuation1, Fun)
     end;
-iter_procs_int({[Proc | Rest], Continuation}, Fun) ->
-    case Fun(Proc) of
-        true ->
+find_proc(#client{} = Client, CanSpawn) ->
+    #client{
+        lang = Lang,
+        ddoc = DDoc,
+        db_key = DbKey,
+        ddoc_key = DDocKey
+    } = Client,
+    case find_proc(Lang, DbKey, DDocKey) of
+        not_found ->
+            % Find a ddoc process used by the same db at least
+            case find_proc(Lang, DbKey, '_') of
+                {ok, Proc} ->
+                    teach_ddoc(DDoc, DbKey, DDocKey, Proc);
+                not_found ->
+                    % Pick a process not used by any ddoc
+                    case find_proc(Lang, undefined, '_') of
+                        {ok, Proc} ->
+                            replenish_untagged_pool(Lang, CanSpawn),
+                            teach_ddoc(DDoc, DbKey, DDocKey, Proc);
+                        Else ->
+                            Else
+                    end;
+                Else ->
+                    Else
+            end;
+        {ok, Proc} ->
             {ok, Proc};
-        false ->
-            iter_procs_int({Rest, Continuation}, Fun)
+        Else ->
+            Else
     end.
 
+find_proc(Lang, DbPat, DDocKey) when
+    DbPat =:= '_' orelse DbPat =:= undefined orelse is_binary(DbPat),
+    DDocKey =:= '_' orelse is_tuple(DDocKey)
+->
+    Pattern = {{Lang, DbPat, '$1'}, '$2'},
+    Guards =
+        case DDocKey of
+            '_' -> [];
+            {_, _} -> [{map_get, {const, DDocKey}, '$2'}]
+        end,
+    MSpec = [{Pattern, Guards, ['$1']}],
+    case ets:select_reverse(?IDLE_BY_DB, MSpec, 1) of
 
-spawn_proc(State, Client) ->
+        '$end_of_table' ->
+            not_found;
+        {[Pid], _Continuation} when is_pid(Pid) ->
+            [#proc{client = undefined} = Proc] = ets:lookup(?PROCS, Pid),
+            % Once it's found it's not idle any longer and it might be
+            % "tought" a new ddoc, so its db_key might change
+            remove_idle_by_db(Proc),
+            remove_idle_access(Proc),
+            {ok, Proc}
+    end.
+
+spawn_proc(#client{} = Client) ->
     Pid = spawn_link(?MODULE, new_proc, [Client]),
     ets:insert(?OPENING, {Pid, Client}),
-    Counts = State#state.counts,
-    Lang = Client#client.lang,
-    State#state{
-        counts = dict:update_counter(Lang, 1, Counts)
-    }.
+    inc_count(Client#client.lang).
 
-
+% This instance was spawned without a client to replenish
+% the untagged pool asynchronously
+new_proc(#client{from = undefined} = Client) ->
+    #client{lang = Lang} = Client,
+    Resp = try
+            case new_proc_int(undefined, Lang) of
+                {ok, Proc} ->
+                    {spawn_ok, Proc, undefined};
+                _Error ->
+                    spawn_error
+            end
+        catch _:_ ->
+            spawn_error
+        end,
+    exit(Resp);
 new_proc(#client{ddoc=undefined, ddoc_key=undefined}=Client) ->
     #client{from=From, lang=Lang} = Client,
     Resp = try
@@ -365,13 +419,17 @@
         spawn_error
     end,
     exit(Resp);
-
-new_proc(Client) ->
-    #client{from=From, lang=Lang, ddoc=DDoc, ddoc_key=DDocKey} = Client,
-    Resp = try
-        case new_proc_int(From, Lang) of
+new_proc(#client{} = Client) ->
+    #client{
+        from = From,
+        lang = Lang,
+        ddoc = DDoc,
+        db_key = DbKey,
+        ddoc_key = DDocKey
+    } = Client,
+    Resp = try case new_proc_int(From, Lang) of
         {ok, NewProc} ->
-            {ok, Proc} = teach_ddoc(DDoc, DDocKey, NewProc),
+                    {ok, Proc} = teach_ddoc(DDoc, DbKey, DDocKey, NewProc),
             {spawn_ok, Proc, From};
         Error ->
             gen_server:reply(From, {error, Error}),
@@ -382,6 +440,64 @@
     end,
     exit(Resp).
 
+replenish_untagged_pool(Lang, _CanSpawn = true) ->
+    % After an untagged instance is tagged, we try to replenish
+    % the untagged pool asynchronously. Here we are using a "bogus"
+    % #client{} with an undefined from field.
+    ok = spawn_proc(#client{lang = Lang, from = undefined});
+replenish_untagged_pool(_Lang, _CanSpawn = false) ->
+    ok.
+
+reap_idle(Num, <<_/binary>> = Lang) when is_integer(Num), Num >= 1 ->
+    case ets:match_object(?IDLE_ACCESS, {{Lang, '_', '_'}, '_'}, Num) of
+        '$end_of_table' ->
+            0;
+        {Objects = [_ | _], _} ->
+            ok = reap_idle(Objects),
+            length(Objects)
+    end.
+
+reap_idle([]) ->
+    ok;
+reap_idle([{{_Lang, _Ts, Pid}, true} | Rest]) ->
+    case ets:lookup(?PROCS, Pid) of
+        % Do an extra assert that client is undefined
+        [#proc{client = undefined} = Proc] ->
+            ok = remove_proc(Proc);
+        [] ->
+            ok
+    end,
+    reap_idle(Rest).
+
+insert_idle_access(#proc{db_key = undefined}, _Ts) ->
+    % Only tagged proc are index in ?IDLE_ACCESS
+    ok;
+insert_idle_access(#proc{db_key = <<_/binary>>} = Proc, Ts) ->
+    #proc{lang = Lang, pid = Pid} = Proc,
+    % Lang is used for partially bound key access
+    % Pid is for uniqueness as time is not strictly monotonic
+    true = ets:insert_new(?IDLE_ACCESS, {{Lang, Ts, Pid}, true}),
+    ok.
+
+remove_idle_access(#proc{db_key = undefined}) ->
+    % Only tagged procs are indexed in ?IDLE_ACCESS
+    ok;
+remove_idle_access(#proc{db_key = <<_/binary>>} = Proc) ->
+    #proc{last_use_ts = Ts, lang = Lang, pid = Pid} = Proc,
+    true = ets:delete(?IDLE_ACCESS, {Lang, Ts, Pid}),
+    ok.
+
+insert_idle_by_db(#proc{} = Proc) ->
+    #proc{lang = Lang, pid = Pid, db_key = Db, ddoc_keys = #{} = DDocs} = Proc,
+    % An extra assert that only expect to insert a new object
+    true = ets:insert_new(?IDLE_BY_DB, {{Lang, Db, Pid}, DDocs}),
+    ok.
+
+remove_idle_by_db(#proc{} = Proc) ->
+    #proc{lang = Lang, pid = Pid, db_key = Db} = Proc,
+    true = ets:delete(?IDLE_BY_DB, {Lang, Db, Pid}),
+    ok.
+
 split_string_if_longer(String, Pos) ->
     case length(String) > Pos of
         true -> lists:split(Pos, String);
@@ -435,7 +551,10 @@
     LangStr = binary_to_list(Lang),
     case get_query_server(LangStr) of
     undefined ->
-        gen_server:reply(From, {unknown_query_language, Lang});
+            case From of
+                undefined -> ok;
+                {_, _} -> gen_server:reply(From, {unknown_query_language, Lang})
+            end;
     {M, F, A} ->
         {ok, Pid} = apply(M, F, A),
         make_proc(Pid, Lang, M);
@@ -444,102 +563,98 @@
         make_proc(Pid, Lang, couch_os_process)
     end.
 
-
-teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, #proc_int{ddoc_keys=Keys}=Proc) ->
+teach_ddoc(DDoc, DbKey, {DDocId, _Rev} = DDocKey, #proc{ddoc_keys = #{} = Keys} = Proc) ->
     % send ddoc over the wire
     % we only share the rev with the client we know to update code
     % but it only keeps the latest copy, per each ddoc, around.
-    true = couch_query_servers:proc_prompt(
-        export_proc(Proc),
-        [<<"ddoc">>, <<"new">>, DDocId, couch_doc:to_json_obj(DDoc, [])]),
+    JsonDoc = couch_doc:to_json_obj(DDoc, []),
+    Prompt = [<<"ddoc">>, <<"new">>, DDocId, JsonDoc],
+    true = couch_query_servers:proc_prompt(Proc, Prompt),
     % we should remove any other ddocs keys for this docid
     % because the query server overwrites without the rev
-    Keys2 = [{D,R} || {D,R} <- Keys, D /= DDocId],
+    Keys2 = maps:filter(fun({Id, _}, true) -> Id =/= DDocId end, Keys),
+
     % add ddoc to the proc
-    {ok, Proc#proc_int{ddoc_keys=[DDocKey|Keys2]}}.
+    {ok, Proc#proc{db_key = DbKey, ddoc_keys = Keys2#{DDocKey => true}}}.
 
 
 make_proc(Pid, Lang, Mod) when is_binary(Lang) ->
-    Proc = #proc_int{
+    Proc = #proc{
         lang = Lang,
         pid = Pid,
         prompt_fun = {Mod, prompt},
         set_timeout_fun = {Mod, set_timeout},
-        stop_fun = {Mod, stop}
+        stop_fun = {Mod, stop},
+        threshold_ts = timestamp(),
+        last_use_ts = timestamp()
     },
     unlink(Pid),
     {ok, Proc}.
 
-
-assign_proc(Pid, #proc_int{client=undefined}=Proc0) when is_pid(Pid) ->
-    Proc = Proc0#proc_int{client = erlang:monitor(process, Pid)},
+assign_proc(Pid, #proc{client = undefined} = Proc0) when is_pid(Pid) ->
+    Proc = Proc0#proc{client = erlang:monitor(process, Pid)},
+    % It's important to insert the proc here instead of doing an update_element
+    % as we might have updated the db_key or ddoc_keys in teach_ddoc/4
     ets:insert(?PROCS, Proc),
-    export_proc(Proc);
-assign_proc(#client{}=Client, #proc_int{client=undefined}=Proc) ->
+    Proc;
+assign_proc(#client{} = Client, #proc{client = undefined} = Proc) ->
     {Pid, _} = Client#client.from,
     assign_proc(Pid, Proc).
 
-
-return_proc(#state{} = State, #proc_int{} = ProcInt) ->
-    #proc_int{pid = Pid, lang = Lang} = ProcInt,
-    NewState = case is_process_alive(Pid) of true ->
-        case ProcInt#proc_int.t0 < State#state.threshold_ts of
-            true ->
-                remove_proc(State, ProcInt);
-            false ->
-                gen_server:cast(Pid, garbage_collect),
-                true = ets:update_element(?PROCS, Pid, [
-                    {#proc_int.client, undefined}
-                ]),
-                State
-        end;
-    false ->
-        remove_proc(State, ProcInt)
+return_proc(#state{} = State, #proc{} = Proc) ->
+    #proc{pid = Pid, lang = Lang} = Proc,
+    case is_process_alive(Pid) of
+        true ->
+            case Proc#proc.threshold_ts < State#state.threshold_ts of
+                true ->
+                    ok = remove_proc(Proc);
+                false ->
+                    gen_server:cast(Pid, garbage_collect),
+                    Ts = timestamp(),
+                    true = ets:update_element(?PROCS, Pid, [
+                        {#proc.client, undefined},
+                        {#proc.last_use_ts, Ts}
+                    ]),
+                    Proc1 = Proc#proc{client = undefined, last_use_ts = Ts},
+                    insert_idle_access(Proc1, Ts),
+                    insert_idle_by_db(Proc1)
+            end;
+        false ->
+            ok = remove_proc(Proc)
     end,
-    flush_waiters(NewState, Lang).
+    ok = flush_waiters(State, Lang).
 
-
-remove_proc(State, #proc_int{}=Proc) ->
-    ets:delete(?PROCS, Proc#proc_int.pid),
-    case is_process_alive(Proc#proc_int.pid) of true ->
-        unlink(Proc#proc_int.pid),
-        gen_server:cast(Proc#proc_int.pid, stop);
-    false ->
-        ok
+remove_proc(#proc{pid = Pid} = Proc) ->
+    remove_idle_access(Proc),
+    remove_idle_by_db(Proc),
+    ets:delete(?PROCS, Pid),
+    case is_process_alive(Pid) of
+        true->
+            unlink(Pid),
+            gen_server:cast(Pid, stop);
+        false ->
+            ok
     end,
-    Counts = State#state.counts,
-    Lang = Proc#proc_int.lang,
-    State#state{
-        counts = dict:update_counter(Lang, -1, Counts)
-    }.
+    dec_count(Proc#proc.lang).
 
 
--spec export_proc(#proc_int{}) -> #proc{}.
-export_proc(#proc_int{} = ProcInt) ->
-    ProcIntList = tuple_to_list(ProcInt),
-    ProcLen = record_info(size, proc),
-    [_ | Data] = lists:sublist(ProcIntList, ProcLen),
-    list_to_tuple([proc | Data]).
 
-
-flush_waiters(State) ->
-    dict:fold(fun(Lang, Count, StateAcc) ->
-        case Count < State#state.hard_limit of
-            true ->
-                flush_waiters(StateAcc, Lang);
-            false ->
-                StateAcc
-        end
-    end, State, State#state.counts).
-
-
-flush_waiters(State, Lang) ->
-    CanSpawn = can_spawn(State, Lang),
+flush_waiters(#state{} = State, Lang) ->
+    #state{hard_limit = HardLimit, config = {[_ | _] = Cfg}} = State,
+    TimeoutMSec = couch_util:get_value(<<"timeout">>, Cfg),
+    Timeout = erlang:convert_time_unit(TimeoutMSec, millisecond, native),
+    StaleLimit = timestamp() - Timeout,
     case get_waiting_client(Lang) of
+        #client{wait_key = {_, T, _}} = Client when is_integer(T), T < StaleLimit ->
+            % Client waited too long and the gen_server call timeout
+            % likey fired already, don't bother allocating a process for it
+            remove_waiting_client(Client),
+            flush_waiters(State, Lang);
         #client{from = From} = Client ->
-            case find_proc(Client) of
-                {ok, ProcInt} ->
-                    Proc = assign_proc(Client, ProcInt),
+            CanSpawn = get_count(Lang) < HardLimit,
+            case find_proc(Client, CanSpawn) of
+                {ok, Proc0} ->
+                    Proc = assign_proc(Client, Proc0),
                     gen_server:reply(From, {ok, Proc, State#state.config}),
                     remove_waiting_client(Client),
                     flush_waiters(State, Lang);
@@ -548,44 +663,56 @@
                     remove_waiting_client(Client),
                     flush_waiters(State, Lang);
                 not_found when CanSpawn ->
-                    NewState = spawn_proc(State, Client),
+                    ok = spawn_proc(Client),
                     remove_waiting_client(Client),
-                    flush_waiters(NewState, Lang);
+                    flush_waiters(State, Lang);
                 not_found ->
-                    State
+                    % 10% of limit
+                    ReapBatch = round(HardLimit * 0.1 + 1),
+                    case reap_idle(ReapBatch, Lang) of
+                        N when is_integer(N), N > 0 ->
+                            % We may have room available to spawn
+                            case get_count(Lang) < HardLimit of
+                                true ->
+                                    ok = spawn_proc(Client),
+                                    remove_waiting_client(Client),
+                                    flush_waiters(State, Lang);
+                                false ->
+                                    ok
+                            end;
+                        0 ->
+                            ok
+                    end
             end;
         undefined ->
-            State
+            ok
     end.
 
-
-add_waiting_client(Client) ->
-    ets:insert(?WAITERS, Client#client{timestamp=os:timestamp()}).
+add_waiting_client(#client{from = {_Pid, Tag}, lang = Lang} = Client) ->
+    % Use Lang in the key first since we can look it up using a partially bound
+    % in get_waiting_client/2. Use the reply tag to provide uniqueness.
+    Key = {Lang, timestamp(), Tag},
+    true = ets:insert_new(?WAITERS, Client#client{wait_key = Key}).
 
 -spec get_waiting_client(Lang :: binary()) -> undefined | #client{}.
 get_waiting_client(Lang) ->
-    case ets:match_object(?WAITERS, #client{lang=Lang, _='_'}, 1) of
+    % Use a partially bound key (Lang) to avoid scanning unrelated procs
+    Key = {Lang, '_', '_'},
+    case ets:match_object(?WAITERS, #client{wait_key = Key, _ = '_'}, 1) of
         '$end_of_table' ->
             undefined;
         {[#client{}=Client], _} ->
             Client
     end.
 
+remove_waiting_client(#client{wait_key = Key}) ->
+    ets:delete(?WAITERS, Key).
 
-remove_waiting_client(#client{timestamp = Timestamp}) ->
-    ets:delete(?WAITERS, Timestamp).
-
-
-can_spawn(#state{hard_limit = HardLimit, counts = Counts}, Lang) ->
-    case dict:find(Lang, Counts) of
-        {ok, Count} -> Count < HardLimit;
-        error -> true
-    end.
 
 
 get_proc_config() ->
     Limit = config:get_boolean("query_server_config", "reduce_limit", true),
-    Timeout = config:get_integer("couchdb", "os_process_timeout", 5000),
+    Timeout = get_os_process_timeout(),
     {[
         {<<"reduce_limit">>, Limit},
         {<<"timeout">>, Timeout}
@@ -593,9 +720,37 @@
 
 
 get_hard_limit() ->
-    LimStr = config:get("query_server_config", "os_process_limit", "100"),
-    list_to_integer(LimStr).
+    config:get_integer("query_server_config", "os_process_limit", 100).
 
 
 get_soft_limit() ->
     config:get_integer("query_server_config", "os_process_soft_limit", 100).
+
+get_os_process_timeout() ->
+    config:get_integer("couchdb", "os_process_timeout", 5000).
+
+timestamp() ->
+    erlang:monotonic_time().
+
+foreach_proc(Fun) when is_function(Fun, 1) ->
+    FoldFun = fun(#proc{} = Proc, ok) ->
+        Fun(Proc),
+        ok
+    end,
+    ok = ets:foldl(FoldFun, ok, ?PROCS).
+
+inc_count(Lang) ->
+    ets:update_counter(?COUNTERS, Lang, 1, {Lang, 0}),
+    ok.
+
+dec_count(Lang) ->
+    ets:update_counter(?COUNTERS, Lang, -1, {Lang, 0}),
+    ok.
+
+get_count(Lang) ->
+    case ets:lookup(?COUNTERS, Lang) of
+        [{_, Count}] when is_integer(Count) ->
+            Count;
+        [] ->
+            0
+    end.
diff --git a/src/couch/src/couch_query_servers.erl b/src/couch/src/couch_query_servers.erl
index 10b8048..43a1b7f 100644
--- a/src/couch/src/couch_query_servers.erl
+++ b/src/couch/src/couch_query_servers.erl
@@ -14,16 +14,16 @@
 
 -export([try_compile/4]).
 -export([start_doc_map/3, map_doc_raw/2, stop_doc_map/1, raw_to_ejson/1]).
--export([reduce/3, rereduce/3,validate_doc_update/5]).
+-export([reduce/3, rereduce/3, validate_doc_update/6]).
 -export([filter_docs/5]).
--export([filter_view/3]).
+-export([filter_view/4]).
 -export([finalize/2]).
 -export([rewrite/3]).
 
--export([with_ddoc_proc/2, proc_prompt/2, ddoc_prompt/3, ddoc_proc_prompt/3, json_doc/1]).
+-export([with_ddoc_proc/3, proc_prompt/2, ddoc_prompt/4, ddoc_proc_prompt/3, json_doc/1]).
 
 % For 210-os-proc-pool.t
--export([get_os_process/1, get_ddoc_process/2, ret_os_process/1]).
+-export([get_os_process/1, get_ddoc_process/3, ret_os_process/1]).
 
 -include_lib("couch/include/couch_db.hrl").
 
@@ -355,17 +355,19 @@
     hyper:union([Filter || [_, Filter] <- Reds]).
 
 % use the function stored in ddoc.validate_doc_update to test an update.
--spec validate_doc_update(DDoc, EditDoc, DiskDoc, Ctx, SecObj) -> ok when
+-spec validate_doc_update(Db, DDoc, EditDoc, DiskDoc, Ctx, SecObj) -> ok when
+    Db :: term(),
     DDoc    :: ddoc(),
     EditDoc :: doc(),
     DiskDoc :: doc() | nil,
     Ctx     :: user_ctx(),
     SecObj  :: sec_obj().
 
-validate_doc_update(DDoc, EditDoc, DiskDoc, Ctx, SecObj) ->
+validate_doc_update(Db, DDoc, EditDoc, DiskDoc, Ctx, SecObj) ->
     JsonEditDoc = couch_doc:to_json_obj(EditDoc, [revs]),
     JsonDiskDoc = json_doc(DiskDoc),
     Resp = ddoc_prompt(
+        Db,
         DDoc,
         [<<"validate_doc_update">>],
         [JsonEditDoc, JsonDiskDoc, Ctx, SecObj]
@@ -392,7 +394,7 @@
               F =/= <<"info">>, F =/= <<"form">>,
               F =/= <<"uuid">>, F =/= <<"id">>],
     JsonReq = chttpd_external:json_req_obj(Req, Db, null, Fields),
-    case couch_query_servers:ddoc_prompt(DDoc, [<<"rewrites">>], [JsonReq]) of
+    case ddoc_prompt(Db, DDoc, [<<"rewrites">>], [JsonReq]) of
         {[{<<"forbidden">>, Message}]} ->
             throw({forbidden, Message});
         {[{<<"unauthorized">>, Message}]} ->
@@ -480,10 +482,10 @@
 json_doc(Doc, Options) ->
     couch_doc:to_json_obj(Doc, Options).
 
-filter_view(DDoc, VName, Docs) ->
+filter_view(Db, DDoc, VName, Docs) ->
     Options = json_doc_options(),
     JsonDocs = [json_doc(Doc, Options) || Doc <- Docs],
-    [true, Passes] = ddoc_prompt(DDoc, [<<"views">>, VName, <<"map">>], [JsonDocs]),
+    [true, Passes] = ddoc_prompt(Db, DDoc, [<<"views">>, VName, <<"map">>], [JsonDocs]),
     {ok, Passes}.
 
 filter_docs(Req, Db, DDoc, FName, Docs) ->
@@ -496,33 +498,34 @@
     Options = json_doc_options(),
     JsonDocs = [json_doc(Doc, Options) || Doc <- Docs],
     try
-        {ok, filter_docs_int(DDoc, FName, JsonReq, JsonDocs)}
+        {ok, filter_docs_int(Db, DDoc, FName, JsonReq, JsonDocs)}
     catch
         throw:{os_process_error,{exit_status,1}} ->
             %% batch used too much memory, retry sequentially.
             Fun = fun(JsonDoc) ->
-                filter_docs_int(DDoc, FName, JsonReq, [JsonDoc])
+                filter_docs_int(Db, DDoc, FName, JsonReq, [JsonDoc])
             end,
             {ok, lists:flatmap(Fun, JsonDocs)}
     end.
 
-filter_docs_int(DDoc, FName, JsonReq, JsonDocs) ->
-    [true, Passes] = ddoc_prompt(DDoc, [<<"filters">>, FName],
+filter_docs_int(Db, DDoc, FName, JsonReq, JsonDocs) ->
+    [true, Passes] = ddoc_prompt(Db, DDoc, [<<"filters">>, FName],
         [JsonDocs, JsonReq]),
     Passes.
 
 ddoc_proc_prompt({Proc, DDocId}, FunPath, Args) ->
     proc_prompt(Proc, [<<"ddoc">>, DDocId, FunPath, Args]).
 
-ddoc_prompt(DDoc, FunPath, Args) ->
-    with_ddoc_proc(DDoc, fun({Proc, DDocId}) ->
+ddoc_prompt(Db, DDoc, FunPath, Args) ->
+    with_ddoc_proc(Db, DDoc, fun({Proc, DDocId}) ->
         proc_prompt(Proc, [<<"ddoc">>, DDocId, FunPath, Args])
     end).
 
-with_ddoc_proc(#doc{id=DDocId,revs={Start, [DiskRev|_]}}=DDoc, Fun) ->
+with_ddoc_proc(Db, #doc{id = DDocId, revs = {Start, [DiskRev | _]}} = DDoc, Fun) ->
     Rev = couch_doc:rev_to_str({Start, DiskRev}),
+    DbKey = db_key(Db),
     DDocKey = {DDocId, Rev},
-    Proc = get_ddoc_process(DDoc, DDocKey),
+    Proc = get_ddoc_process(DDoc, DbKey, DDocKey),
     try Fun({Proc, DDocId}) of
         Resp ->
             ok = ret_os_process(Proc),
@@ -532,6 +535,22 @@
         erlang:raise(Tag, Err, Stack)
     end.
 
+db_key(DbName) when is_binary(DbName) ->
+    Name = mem3:dbname(DbName),
+    case config:get("query_server_config", "db_tag", "name") of
+        "prefix" ->
+            case binary:split(Name, <<"/">>) of
+                [Prefix, _] when byte_size(Prefix) > 0 -> Prefix;
+                _ -> Name
+            end;
+        "none" ->
+            undefined;
+        _ ->
+            Name
+    end;
+db_key(Db) ->
+    db_key(couch_db:name(Db)).
+
 proc_prompt(Proc, Args) ->
      case proc_prompt_raw(Proc, Args) of
      {json, Json} ->
@@ -622,12 +641,9 @@
     {Mod, Func} = Proc#proc.set_timeout_fun,
     apply(Mod, Func, [Proc#proc.pid, Timeout]).
 
-get_os_process_timeout() ->
-    config:get_integer("couchdb", "os_process_timeout", 5000).
-
-get_ddoc_process(#doc{} = DDoc, DDocKey) ->
+get_ddoc_process(#doc{} = DDoc, DbKey, DDocKey) ->
     % remove this case statement
-    case gen_server:call(couch_proc_manager, {get_proc, DDoc, DDocKey}, get_os_process_timeout()) of
+    case couch_proc_manager:get_proc(DDoc, DbKey, DDocKey) of
     {ok, Proc, {QueryConfig}} ->
         % process knows the ddoc
         case (catch proc_prompt(Proc, [<<"reset">>, {QueryConfig}])) of
@@ -636,14 +652,14 @@
             Proc;
         _ ->
             catch proc_stop(Proc),
-            get_ddoc_process(DDoc, DDocKey)
+                    get_ddoc_process(DDoc, DbKey, DDocKey)
         end;
     Error ->
         throw(Error)
     end.
 
 get_os_process(Lang) ->
-    case gen_server:call(couch_proc_manager, {get_proc, Lang}, get_os_process_timeout()) of
+    case couch_proc_manager:get_proc(Lang) of
     {ok, Proc, {QueryConfig}} ->
         case (catch proc_prompt(Proc, [<<"reset">>, {QueryConfig}])) of
         true ->
@@ -658,7 +674,7 @@
     end.
 
 ret_os_process(Proc) ->
-    true = gen_server:call(couch_proc_manager, {ret_proc, Proc}, infinity),
+    true = couch_proc_manager:ret_proc(Proc),
     catch unlink(Proc#proc.pid),
     ok.
 
@@ -670,7 +686,10 @@
 
 
 -ifdef(TEST).
--include_lib("eunit/include/eunit.hrl").
+
+-include_lib("couch/include/couch_eunit.hrl").
+
+-define(TDEF_FE(Name), fun(Arg) -> {atom_to_list(Name), ?_test(Name(Arg))} end).
 
 builtin_sum_rows_negative_test() ->
     A = [{[{<<"a">>, 1}]}, {[{<<"a">>, 2}]}, {[{<<"a">>, 3}]}],
@@ -799,4 +818,42 @@
         end
     end, NotOk).
 
+db_key_test_() ->
+    {
+        foreach,
+        fun setup/0,
+        fun teardown/1,
+        [
+            ?TDEF_FE(t_db_key_default),
+            ?TDEF_FE(t_db_key_prefix),
+            ?TDEF_FE(t_db_key_none)
+        ]
+    }.
+
+setup() ->
+    meck:new(config, [passthrough]),
+    meck:expect(config, get, fun(_, _, Default) -> Default end),
+    ok.
+
+teardown(_) ->
+    meck:unload().
+
+t_db_key_default(_) ->
+    ?assertEqual(<<"foo">>, db_key(<<"foo">>)),
+    ?assertEqual(<<"foo/bar">>, db_key(<<"foo/bar">>)),
+    ?assertEqual(<<"foo/bar">>, db_key(<<"shards/00000000-3fffffff/foo/bar.1415960794">>)).
+
+t_db_key_prefix(_) ->
+    meck:expect(config, get, fun(_, "db_tag", _) -> "prefix" end),
+    ?assertEqual(<<"foo">>, db_key(<<"foo">>)),
+    ?assertEqual(<<"foo">>, db_key(<<"foo/bar">>)),
+    ?assertEqual(<<"foo">>, db_key(<<"shards/00000000-3fffffff/foo/bar.1415960794">>)),
+    ?assertEqual(<<"/foo">>, db_key(<<"/foo">>)).
+
+t_db_key_none(_) ->
+    meck:expect(config, get, fun(_, "db_tag", _) -> "none" end),
+    ?assertEqual(undefined, db_key(<<"foo">>)),
+    ?assertEqual(undefined, db_key(<<"foo/bar">>)),
+    ?assertEqual(undefined, db_key(<<"shards/00000000-3fffffff/foo/bar.1415960794">>)).
+
 -endif.
diff --git a/src/couch/test/eunit/couch_auth_cache_tests.erl b/src/couch/test/eunit/couch_auth_cache_tests.erl
index 71faf77..ceed681 100644
--- a/src/couch/test/eunit/couch_auth_cache_tests.erl
+++ b/src/couch/test/eunit/couch_auth_cache_tests.erl
@@ -345,5 +345,6 @@
 
 validate(DiskDoc, NewDoc, JSONCtx) ->
     {ok, DDoc0} = couch_auth_cache:auth_design_doc(<<"_design/anything">>),
+    Db = <<"validate_couch_auth_cache_tests">>,
     DDoc = DDoc0#doc{revs = {1, [<<>>]}},
-    couch_query_servers:validate_doc_update(DDoc, NewDoc, DiskDoc, JSONCtx, []).
+    couch_query_servers:validate_doc_update(Db, DDoc, NewDoc, DiskDoc, JSONCtx, []).
diff --git a/src/couch/test/eunit/couch_query_servers_tests.erl b/src/couch/test/eunit/couch_query_servers_tests.erl
index 440fc8e..d142498 100644
--- a/src/couch/test/eunit/couch_query_servers_tests.erl
+++ b/src/couch/test/eunit/couch_query_servers_tests.erl
@@ -110,7 +110,7 @@
 
 should_split_large_batches() ->
     Req = {json_req, {[]}},
-    Db = undefined,
+    Db = <<"somedb">>,
     DDoc = #doc{
         id = <<"_design/foo">>,
         revs = {0, [<<"bork bork bork">>]},
diff --git a/src/couch/test/eunit/couchdb_os_proc_pool.erl b/src/couch/test/eunit/couchdb_os_proc_pool.erl
index b552e11..6453853 100644
--- a/src/couch/test/eunit/couchdb_os_proc_pool.erl
+++ b/src/couch/test/eunit/couchdb_os_proc_pool.erl
@@ -15,240 +15,492 @@
 -include_lib("couch/include/couch_eunit.hrl").
 -include_lib("couch/include/couch_db.hrl").
 
+-define(TDEF_FE(Name), fun(Arg) -> {atom_to_list(Name), ?_test(Name(Arg))} end).
+
 -define(TIMEOUT, 1000).
 
 
 setup() ->
-    ok = couch_proc_manager:reload(),
+    Ctx = test_util:start_couch(),
     meck:new(couch_os_process, [passthrough]),
-    ok = setup_config().
+    meck:new(couch_proc_manager, [passthrough]),
+    ok = setup_config(),
+    Ctx.
 
-teardown(_) ->
+teardown(Ctx) ->
+    ok = teardown_config(),
     meck:unload(),
+    test_util:stop_couch(Ctx),
     ok.
 
 os_proc_pool_test_() ->
     {
         "OS processes pool tests",
         {
-            setup,
-            fun test_util:start_couch/0, fun test_util:stop_couch/1,
-            {
-                foreach,
-                fun setup/0, fun teardown/1,
-                [
-                    should_block_new_proc_on_full_pool(),
-                    should_free_slot_on_proc_unexpected_exit(),
-                    should_reuse_known_proc(),
-%                    should_process_waiting_queue_as_fifo(),
-                    should_reduce_pool_on_idle_os_procs(),
-                    should_not_return_broken_process_to_the_pool()
-                ]
-            }
+            foreach,
+            fun setup/0,
+            fun teardown/1,
+            [
+                ?TDEF_FE(should_block_new_proc_on_full_pool),
+                ?TDEF_FE(should_free_slot_on_proc_unexpected_exit),
+                ?TDEF_FE(should_reuse_known_proc),
+                ?TDEF_FE(should_process_waiting_queue_as_fifo),
+                ?TDEF_FE(should_reduce_pool_on_idle_os_procs),
+                ?TDEF_FE(should_reduce_pool_of_tagged_processes_on_idle),
+                ?TDEF_FE(should_not_return_broken_process_to_the_pool),
+                ?TDEF_FE(oldest_tagged_process_is_reaped),
+                ?TDEF_FE(untagged_process_is_replenished),
+                ?TDEF_FE(exact_ddoc_tagged_process_is_picked_first),
+                ?TDEF_FE(db_tagged_process_is_second_choice),
+                ?TDEF_FE(if_no_tagged_process_found_new_must_be_spawned),
+                ?TDEF_FE(db_tag_none_works),
+                ?TDEF_FE(stale_procs_are_cleaned),
+                ?TDEF_FE(bad_query_language)
+            ]
         }
     }.
 
+should_block_new_proc_on_full_pool(_) ->
+    Client1 = spawn_client(),
+    Client2 = spawn_client(),
+    Client3 = spawn_client(),
 
-should_block_new_proc_on_full_pool() ->
-    ?_test(begin
-        Client1 = spawn_client(),
-        Client2 = spawn_client(),
-        Client3 = spawn_client(),
+    ?assertEqual(ok, ping_client(Client1)),
+    ?assertEqual(ok, ping_client(Client2)),
+    ?assertEqual(ok, ping_client(Client3)),
 
-        ?assertEqual(ok, ping_client(Client1)),
-        ?assertEqual(ok, ping_client(Client2)),
-        ?assertEqual(ok, ping_client(Client3)),
+    Proc1 = get_client_proc(Client1, "1"),
+    Proc2 = get_client_proc(Client2, "2"),
+    Proc3 = get_client_proc(Client3, "3"),
 
-        Proc1 = get_client_proc(Client1, "1"),
-        Proc2 = get_client_proc(Client2, "2"),
-        Proc3 = get_client_proc(Client3, "3"),
+    ?assertNotEqual(Proc1, Proc2),
+    ?assertNotEqual(Proc2, Proc3),
+    ?assertNotEqual(Proc3, Proc1),
 
-        ?assertNotEqual(Proc1, Proc2),
-        ?assertNotEqual(Proc2, Proc3),
-        ?assertNotEqual(Proc3, Proc1),
+    Client4 = spawn_client(),
+    ?assertEqual(timeout, ping_client(Client4)),
 
-        Client4 = spawn_client(),
-        ?assertEqual(timeout, ping_client(Client4)),
+    ?assertEqual(ok, stop_client(Client1)),
+    ?assertEqual(ok, ping_client(Client4)),
 
-        ?assertEqual(ok, stop_client(Client1)),
-        ?assertEqual(ok, ping_client(Client4)),
+    Proc4 = get_client_proc(Client4, "4"),
 
-        Proc4 = get_client_proc(Client4, "4"),
+    ?assertEqual(Proc1#proc.pid, Proc4#proc.pid),
+    ?assertNotEqual(Proc1#proc.client, Proc4#proc.client),
 
-        ?assertEqual(Proc1#proc.pid, Proc4#proc.pid),
-        ?assertNotEqual(Proc1#proc.client, Proc4#proc.client),
+    stop_clients([Client2, Client3, Client4]).
 
-        lists:map(fun(C) ->
-            ?assertEqual(ok, stop_client(C))
-        end, [Client2, Client3, Client4])
-    end).
+should_free_slot_on_proc_unexpected_exit(_) ->
+    Client1 = spawn_client(),
+    Client2 = spawn_client(),
+    Client3 = spawn_client(),
 
+    ?assertEqual(ok, ping_client(Client1)),
+    ?assertEqual(ok, ping_client(Client2)),
+    ?assertEqual(ok, ping_client(Client3)),
 
-should_free_slot_on_proc_unexpected_exit() ->
-    ?_test(begin
-        Client1 = spawn_client(),
-        Client2 = spawn_client(),
-        Client3 = spawn_client(),
+    Proc1 = get_client_proc(Client1, "1"),
+    Proc2 = get_client_proc(Client2, "2"),
+    Proc3 = get_client_proc(Client3, "3"),
 
-        ?assertEqual(ok, ping_client(Client1)),
-        ?assertEqual(ok, ping_client(Client2)),
-        ?assertEqual(ok, ping_client(Client3)),
+    ?assertNotEqual(Proc1#proc.pid, Proc2#proc.pid),
+    ?assertNotEqual(Proc1#proc.client, Proc2#proc.client),
+    ?assertNotEqual(Proc2#proc.pid, Proc3#proc.pid),
+    ?assertNotEqual(Proc2#proc.client, Proc3#proc.client),
+    ?assertNotEqual(Proc3#proc.pid, Proc1#proc.pid),
+    ?assertNotEqual(Proc3#proc.client, Proc1#proc.client),
 
-        Proc1 = get_client_proc(Client1, "1"),
-        Proc2 = get_client_proc(Client2, "2"),
-        Proc3 = get_client_proc(Client3, "3"),
+    ?assertEqual(ok, kill_client(Client1)),
 
-        ?assertNotEqual(Proc1#proc.pid, Proc2#proc.pid),
-        ?assertNotEqual(Proc1#proc.client, Proc2#proc.client),
-        ?assertNotEqual(Proc2#proc.pid, Proc3#proc.pid),
-        ?assertNotEqual(Proc2#proc.client, Proc3#proc.client),
-        ?assertNotEqual(Proc3#proc.pid, Proc1#proc.pid),
-        ?assertNotEqual(Proc3#proc.client, Proc1#proc.client),
+    Client4 = spawn_client(),
+    ?assertEqual(ok, ping_client(Client4)),
 
-        ?assertEqual(ok, kill_client(Client1)),
+    Proc4 = get_client_proc(Client4, "4"),
 
-        Client4 = spawn_client(),
-        ?assertEqual(ok, ping_client(Client4)),
+    ?assertEqual(Proc4#proc.pid, Proc1#proc.pid),
+    ?assertNotEqual(Proc4#proc.client, Proc1#proc.client),
+    ?assertNotEqual(Proc2#proc.pid, Proc4#proc.pid),
+    ?assertNotEqual(Proc2#proc.client, Proc4#proc.client),
+    ?assertNotEqual(Proc3#proc.pid, Proc4#proc.pid),
+    ?assertNotEqual(Proc3#proc.client, Proc4#proc.client),
 
-        Proc4 = get_client_proc(Client4, "4"),
+    stop_clients([Client2, Client3, Client4]).
 
-        ?assertEqual(Proc4#proc.pid, Proc1#proc.pid),
-        ?assertNotEqual(Proc4#proc.client, Proc1#proc.client),
-        ?assertNotEqual(Proc2#proc.pid, Proc4#proc.pid),
-        ?assertNotEqual(Proc2#proc.client, Proc4#proc.client),
-        ?assertNotEqual(Proc3#proc.pid, Proc4#proc.pid),
-        ?assertNotEqual(Proc3#proc.client, Proc4#proc.client),
+should_reuse_known_proc(_) ->
+    Db = <<"db">>,
+    Client1 = spawn_client(Db, <<"ddoc1">>),
+    Client2 = spawn_client(Db, <<"ddoc2">>),
 
-        lists:map(fun(C) ->
-            ?assertEqual(ok, stop_client(C))
-        end, [Client2, Client3, Client4])
-    end).
+    ?assertEqual(ok, ping_client(Client1)),
+    ?assertEqual(ok, ping_client(Client2)),
 
+    Proc1 = get_client_proc(Client1, "1"),
+    Proc2 = get_client_proc(Client2, "2"),
+    ?assertNotEqual(Proc1#proc.pid, Proc2#proc.pid),
 
-should_reuse_known_proc() ->
-    ?_test(begin
-        Client1 = spawn_client(<<"ddoc1">>),
-        Client2 = spawn_client(<<"ddoc2">>),
+    ?assertEqual(ok, stop_client(Client1)),
+    ?assertEqual(ok, stop_client(Client2)),
+    ?assert(is_process_alive(Proc1#proc.pid)),
+    ?assert(is_process_alive(Proc2#proc.pid)),
 
-        ?assertEqual(ok, ping_client(Client1)),
-        ?assertEqual(ok, ping_client(Client2)),
+    Client1Again = spawn_client(Db, <<"ddoc1">>),
+    ?assertEqual(ok, ping_client(Client1Again)),
+    Proc1Again = get_client_proc(Client1Again, "1-again"),
+    ?assertEqual(Proc1#proc.pid, Proc1Again#proc.pid),
+    ?assertNotEqual(Proc1#proc.client, Proc1Again#proc.client),
+    ?assertEqual(ok, stop_client(Client1Again)).
 
-        Proc1 = get_client_proc(Client1, "1"),
-        Proc2 = get_client_proc(Client2, "2"),
-        ?assertNotEqual(Proc1#proc.pid, Proc2#proc.pid),
+should_process_waiting_queue_as_fifo(_) ->
+    Db = <<"db">>,
+    meck:reset(couch_proc_manager),
+    Client1 = spawn_client(Db, <<"ddoc1">>),
+    meck:wait(1, couch_proc_manager, handle_call, [{get_proc, '_'}, '_', '_'], 1000),
+    Client2 = spawn_client(Db, <<"ddoc2">>),
+    meck:wait(2, couch_proc_manager, handle_call, [{get_proc, '_'}, '_', '_'], 1000),
+    Client3 = spawn_client(Db, <<"ddoc3">>),
+    meck:wait(3, couch_proc_manager, handle_call, [{get_proc, '_'}, '_', '_'], 1000),
+    Client4 = spawn_client(Db, <<"ddoc4">>),
+    meck:wait(4, couch_proc_manager, handle_call, [{get_proc, '_'}, '_', '_'], 1000),
+    Client5 = spawn_client(Db, <<"ddoc5">>),
+    meck:wait(5, couch_proc_manager, handle_call, [{get_proc, '_'}, '_', '_'], 1000),
 
-        ?assertEqual(ok, stop_client(Client1)),
-        ?assertEqual(ok, stop_client(Client2)),
-        ?assert(is_process_alive(Proc1#proc.pid)),
-        ?assert(is_process_alive(Proc2#proc.pid)),
+    ?assertEqual(ok, ping_client(Client1)),
+    ?assertEqual(ok, ping_client(Client2)),
+    ?assertEqual(ok, ping_client(Client3)),
+    ?assertEqual(timeout, ping_client(Client4)),
+    ?assertEqual(timeout, ping_client(Client5)),
 
-        Client1Again = spawn_client(<<"ddoc1">>),
-        ?assertEqual(ok, ping_client(Client1Again)),
-        Proc1Again = get_client_proc(Client1Again, "1-again"),
-        ?assertEqual(Proc1#proc.pid, Proc1Again#proc.pid),
-        ?assertNotEqual(Proc1#proc.client, Proc1Again#proc.client),
-        ?assertEqual(ok, stop_client(Client1Again))
-    end).
+    Proc1 = get_client_proc(Client1, "1"),
+    ?assertEqual(ok, stop_client(Client1)),
+    ?assertEqual(ok, ping_client(Client4)),
+    Proc4 = get_client_proc(Client4, "4"),
 
+    ?assertNotEqual(Proc4#proc.client, Proc1#proc.client),
+    ?assertEqual(Proc1#proc.pid, Proc4#proc.pid),
+    ?assertEqual(timeout, ping_client(Client5)),
 
-%should_process_waiting_queue_as_fifo() ->
-%    ?_test(begin
-%        Client1 = spawn_client(<<"ddoc1">>),
-%        Client2 = spawn_client(<<"ddoc2">>),
-%        Client3 = spawn_client(<<"ddoc3">>),
-%        Client4 = spawn_client(<<"ddoc4">>),
-%        timer:sleep(100),
-%        Client5 = spawn_client(<<"ddoc5">>),
-%
-%        ?assertEqual(ok, ping_client(Client1)),
-%        ?assertEqual(ok, ping_client(Client2)),
-%        ?assertEqual(ok, ping_client(Client3)),
-%        ?assertEqual(timeout, ping_client(Client4)),
-%        ?assertEqual(timeout, ping_client(Client5)),
-%
-%        Proc1 = get_client_proc(Client1, "1"),
-%        ?assertEqual(ok, stop_client(Client1)),
-%        ?assertEqual(ok, ping_client(Client4)),
-%        Proc4 = get_client_proc(Client4, "4"),
-%
-%        ?assertNotEqual(Proc4#proc.client, Proc1#proc.client),
-%        ?assertEqual(Proc1#proc.pid, Proc4#proc.pid),
-%        ?assertEqual(timeout, ping_client(Client5)),
-%
-%        ?assertEqual(ok, stop_client(Client2)),
-%        ?assertEqual(ok, stop_client(Client3)),
-%        ?assertEqual(ok, stop_client(Client4)),
-%        ?assertEqual(ok, stop_client(Client5))
-%    end).
+    ?assertEqual(ok, stop_client(Client2)),
+    ?assertEqual(ok, stop_client(Client3)),
+    ?assertEqual(ok, stop_client(Client4)),
+    ?assertEqual(ok, stop_client(Client5)).
 
+should_reduce_pool_on_idle_os_procs(_) ->
+    %% os_process_idle_limit is in sec
+    cfg_set("os_process_idle_limit", "1"),
 
-should_reduce_pool_on_idle_os_procs() ->
-    ?_test(begin
-        %% os_process_idle_limit is in sec
-        config:set("query_server_config",
-            "os_process_idle_limit", "1", false),
-        ok = confirm_config("os_process_idle_limit", "1"),
+    Db = undefined,
+    Client1 = spawn_client(Db, <<"ddoc1">>),
+    Client2 = spawn_client(Db, <<"ddoc2">>),
+    Client3 = spawn_client(Db, <<"ddoc3">>),
 
-        Client1 = spawn_client(<<"ddoc1">>),
-        Client2 = spawn_client(<<"ddoc2">>),
-        Client3 = spawn_client(<<"ddoc3">>),
+    ?assertEqual(ok, ping_client(Client1)),
+    ?assertEqual(ok, ping_client(Client2)),
+    ?assertEqual(ok, ping_client(Client3)),
 
-        ?assertEqual(ok, ping_client(Client1)),
-        ?assertEqual(ok, ping_client(Client2)),
-        ?assertEqual(ok, ping_client(Client3)),
+    ?assertEqual(3, couch_proc_manager:get_proc_count()),
 
-        ?assertEqual(3, couch_proc_manager:get_proc_count()),
+    ?assertEqual(ok, stop_client(Client1)),
+    ?assertEqual(ok, stop_client(Client2)),
+    ?assertEqual(ok, stop_client(Client3)),
 
-        ?assertEqual(ok, stop_client(Client1)),
-        ?assertEqual(ok, stop_client(Client2)),
-        ?assertEqual(ok, stop_client(Client3)),
+    % granularity of idle limit is in seconds
+    timer:sleep(1000),
+    wait_process_count(1).
 
-        timer:sleep(1200),
-        ?assertEqual(1, couch_proc_manager:get_proc_count())
-    end).
+should_reduce_pool_of_tagged_processes_on_idle(_) ->
+    %% os_process_idle_limit is in sec
+    cfg_set("os_process_idle_limit", "1"),
 
+    Db = <<"reduce_pool_on_idle_db">>,
+    Client1 = spawn_client(Db, <<"ddoc1">>),
+    Client2 = spawn_client(Db, <<"ddoc2">>),
+    Client3 = spawn_client(Db, <<"ddoc3">>),
 
-should_not_return_broken_process_to_the_pool() ->
-    ?_test(begin
-        config:set("query_server_config",
-            "os_process_soft_limit", "1", false),
-        ok = confirm_config("os_process_soft_limit", "1"),
+    ?assertEqual(ok, ping_client(Client1)),
+    ?assertEqual(ok, ping_client(Client2)),
+    ?assertEqual(ok, ping_client(Client3)),
 
-        config:set("query_server_config",
-            "os_process_limit", "1", false),
-        ok = confirm_config("os_process_limit", "1"),
+    ?assertEqual(3, couch_proc_manager:get_proc_count()),
 
-        DDoc1 = ddoc(<<"_design/ddoc1">>),
+    stop_clients([Client1, Client2, Client3]),
 
-        meck:reset(couch_os_process),
+    timer:sleep(1000),
+    wait_process_count(0).
 
-        ?assertEqual(0, couch_proc_manager:get_proc_count()),
-        ok = couch_query_servers:with_ddoc_proc(DDoc1, fun(_) -> ok end),
-        ?assertEqual(0, meck:num_calls(couch_os_process, stop, 1)),
-        ?assertEqual(1, couch_proc_manager:get_proc_count()),
+oldest_tagged_process_is_reaped(_) ->
+    Client1 = spawn_client(<<"db1">>, <<"ddoc1">>),
+    Client2 = spawn_client(<<"db2">>, <<"ddoc1">>),
+    Client3 = spawn_client(<<"db3">>, <<"ddoc1">>),
 
-        ?assertError(bad, couch_query_servers:with_ddoc_proc(DDoc1, fun(_) ->
+    ?assertEqual(ok, ping_client(Client1)),
+    ?assertEqual(ok, ping_client(Client2)),
+    ?assertEqual(ok, ping_client(Client3)),
+
+    Proc1 = get_client_proc(Client1, "1"),
+    Proc2 = get_client_proc(Client2, "2"),
+    Proc3 = get_client_proc(Client3, "3"),
+
+    ?assert(all_alive_all_different([Proc1, Proc2, Proc3])),
+
+    stop_clients([Client1, Client2, Client3]),
+
+    % All procs should be released back into the pool
+    wait_tagged_idle_count(3),
+
+    % Processes should be alive
+    ?assert(all_alive([Proc1, Proc2, Proc3])),
+
+    % Spawning a new tagged proc with a different tag should kill
+    % the oldest unused proc and spawn a new one
+    Client4 = spawn_client(<<"db4">>, <<"ddoc1">>),
+    ?assertEqual(ok, ping_client(Client4)),
+    Proc4 = get_client_proc(Client4, "4"),
+
+    ?assert(all_alive_all_different([Proc2, Proc3, Proc4])),
+    ?assertNot(is_process_alive(Proc1#proc.pid)),
+
+    ?assertEqual(ok, stop_client(Client4)).
+
+untagged_process_is_replenished(_) ->
+    Client1 = spawn_client(),
+    Client2 = spawn_client(),
+
+    ?assertEqual(ok, ping_client(Client1)),
+    ?assertEqual(ok, ping_client(Client2)),
+
+    Proc1 = get_client_proc(Client1, "1"),
+    Proc2 = get_client_proc(Client2, "2"),
+
+    ?assert(all_alive_all_different([Proc1, Proc2])),
+
+    stop_clients([Client1, Client2]),
+
+    % All procs should be released back into the pool
+    % and they are now all untagged idle
+    wait_idle_count(2),
+    ?assertEqual(0, tagged_idle_count()),
+
+    % Processes should still be alive
+    ?assert(all_alive([Proc1, Proc2])),
+
+    % Spawning a new tagged proc should tag one of the procs
+    % and also asynchronously replenish the untagged pool
+    Client3 = spawn_client(<<"db">>, <<"ddoc1">>),
+    ?assertEqual(ok, ping_client(Client3)),
+
+    % The process is one of the previously untagged ones
+    Proc3 = get_client_proc(Client3, "3"),
+    Pid3 = Proc3#proc.pid,
+    ?assert(lists:member(Pid3, proc_pids([Proc1, Proc2]))),
+
+    % wait for replinishment
+    wait_idle_count(2),
+
+    ?assertEqual(ok, stop_client(Client3)).
+
+exact_ddoc_tagged_process_is_picked_first(_) ->
+    Client1 = spawn_client(<<"db">>, <<"ddoc1">>),
+    Client2 = spawn_client(<<"db">>, <<"ddoc2">>),
+
+    ?assertEqual(ok, ping_client(Client1)),
+    ?assertEqual(ok, ping_client(Client2)),
+
+    Proc1 = get_client_proc(Client1, "1"),
+    Proc2 = get_client_proc(Client2, "2"),
+
+    ?assert(all_alive_all_different([Proc1, Proc2])),
+
+    stop_clients([Client1, Client2]),
+
+    % All procs should be released back into the pool
+    % and they now tagged and idle
+    wait_tagged_idle_count(2),
+    wait_idle_count(2),
+
+    % Processes should still be alive
+    ?assert(all_alive([Proc1, Proc2])),
+
+    % Spawning a new tagged proc should pick the one with
+    % matching ddoc
+    Client3 = spawn_client(<<"db">>, <<"ddoc1">>),
+    ?assertEqual(ok, ping_client(Client3)),
+    Proc3 = get_client_proc(Client3, "3"),
+    ?assertEqual(Proc1#proc.pid, Proc3#proc.pid),
+
+    ?assertEqual(ok, stop_client(Client3)).
+
+db_tagged_process_is_second_choice(_) ->
+    Client1 = spawn_client(<<"db1">>, <<"ddoc1">>),
+    Client2 = spawn_client(<<"db2">>, <<"ddoc2">>),
+
+    ?assertEqual(ok, ping_client(Client1)),
+    ?assertEqual(ok, ping_client(Client2)),
+
+    Proc1 = get_client_proc(Client1, "1"),
+    Proc2 = get_client_proc(Client2, "2"),
+
+    ?assert(all_alive_all_different([Proc1, Proc2])),
+
+    stop_clients([Client1, Client2]),
+
+    % All procs should be released back into the pool
+    % and they now tagged and idle
+    wait_tagged_idle_count(2),
+    wait_idle_count(2),
+
+    % Processes should still be alive
+    ?assert(all_alive([Proc1, Proc2])),
+
+    % Spawning a new tagged proc should pick the one with
+    % the matching ddoc
+    Client3 = spawn_client(<<"db1">>, <<"ddoc3">>),
+    ?assertEqual(ok, ping_client(Client3)),
+    Proc3 = get_client_proc(Client3, "3"),
+    ?assertEqual(Proc1#proc.pid, Proc3#proc.pid),
+
+    ?assertEqual(ok, stop_client(Client3)).
+
+if_no_tagged_process_found_new_must_be_spawned(_) ->
+    Client1 = spawn_client(<<"db1">>, <<"ddoc">>),
+    Client2 = spawn_client(<<"db2">>, <<"ddoc">>),
+
+    ?assertEqual(ok, ping_client(Client1)),
+    ?assertEqual(ok, ping_client(Client2)),
+
+    Proc1 = get_client_proc(Client1, "1"),
+    Proc2 = get_client_proc(Client2, "2"),
+
+    ?assert(all_alive_all_different([Proc1, Proc2])),
+
+    stop_clients([Client1, Client2]),
+
+    % All procs should be released back into the pool
+    % and they now tagged and idle
+    wait_tagged_idle_count(2),
+    wait_idle_count(2),
+
+    % Processes should still be alive
+    ?assert(all_alive([Proc1, Proc2])),
+
+    % If new tagged process with new db should spawn
+    % new process never pick up an existing one
+    Client3 = spawn_client(<<"db3">>, <<"ddoc">>),
+    ?assertEqual(ok, ping_client(Client3)),
+    Proc3 = get_client_proc(Client3, "3"),
+    ?assertNotEqual(Proc1#proc.pid, Proc3#proc.pid),
+    ?assertNotEqual(Proc2#proc.pid, Proc3#proc.pid),
+
+    % db1 and db2 procs should still be sitting idle
+    ?assertEqual(2, tagged_idle_count()),
+
+    % After 3rd proc returns to the pool there should
+    % be 3 tagged idle processes
+    ?assertEqual(ok, stop_client(Client3)),
+    wait_tagged_idle_count(3),
+    ?assertEqual(3, idle_count()).
+
+db_tag_none_works(_) ->
+    cfg_set("db_tag", "none"),
+    Client1 = spawn_client(undefined, <<"ddoc1">>),
+    Client2 = spawn_client(undefined, <<"ddoc2">>),
+
+    ?assertEqual(ok, ping_client(Client1)),
+    ?assertEqual(ok, ping_client(Client2)),
+
+    Proc1 = get_client_proc(Client1, "1"),
+    Proc2 = get_client_proc(Client2, "2"),
+
+    ?assert(all_alive_all_different([Proc1, Proc2])),
+
+    stop_clients([Client1, Client2]),
+
+    % All procs should be released back into the pool
+    % they should be untagged effectively
+    wait_idle_count(2),
+    ?assertEqual(0, tagged_idle_count()),
+
+    % Processes should still be alive
+    ?assert(all_alive([Proc1, Proc2])),
+
+    % If new tagged process with new db should spawn
+    % new process and pick based on ddoc id matching
+    Client3 = spawn_client(undefined, <<"ddoc1">>),
+    ?assertEqual(ok, ping_client(Client3)),
+    Proc3 = get_client_proc(Client3, "3"),
+    ?assertEqual(Proc1#proc.pid, Proc3#proc.pid),
+    ?assertNotEqual(Proc2#proc.pid, Proc3#proc.pid),
+
+    wait_idle_count(1),
+
+    % After 3rd client stop there should be 2 idle
+    % untagged procs
+    ?assertEqual(ok, stop_client(Client3)),
+    wait_idle_count(2),
+    ?assertEqual(0, tagged_idle_count()).
+
+stale_procs_are_cleaned(_) ->
+    Client1 = spawn_client(),
+    Client2 = spawn_client(),
+
+    ?assertEqual(ok, ping_client(Client1)),
+    ?assertEqual(ok, ping_client(Client2)),
+
+    Proc1 = get_client_proc(Client1, "1"),
+    Proc2 = get_client_proc(Client2, "2"),
+
+    ?assert(all_alive_all_different([Proc1, Proc2])),
+
+    ?assertEqual(0, couch_proc_manager:get_stale_proc_count()),
+    ?assertEqual(ok, couch_proc_manager:reload()),
+    ?assertEqual(2, couch_proc_manager:get_stale_proc_count()),
+
+    stop_clients([Client1, Client2]),
+    ?assertEqual(ok, couch_proc_manager:terminate_stale_procs()),
+    wait_idle_count(0),
+    ?assertEqual(0, couch_proc_manager:get_proc_count()).
+
+bad_query_language(_) ->
+    Expect = {unknown_query_language, <<"bad">>},
+    ?assertThrow(Expect, couch_query_servers:get_os_process(<<"bad">>)).
+
+should_not_return_broken_process_to_the_pool(_) ->
+    cfg_set("os_process_soft_limit", "1"),
+    cfg_set("os_process_limit", "1"),
+
+    Db = <<"thedb">>,
+    DDoc1 = ddoc(<<"_design/ddoc1">>),
+
+    meck:reset(couch_os_process),
+
+    ?assertEqual(0, couch_proc_manager:get_proc_count()),
+    ok = couch_query_servers:with_ddoc_proc(Db, DDoc1, fun(_) -> ok end),
+    ?assertEqual(0, meck:num_calls(couch_os_process, stop, 1)),
+    ?assertEqual(1, couch_proc_manager:get_proc_count()),
+
+    ?assertError(
+        bad,
+        couch_query_servers:with_ddoc_proc(Db, DDoc1, fun(_) ->
             error(bad)
-        end)),
-        ?assertEqual(1, meck:num_calls(couch_os_process, stop, 1)),
+        end)
+    ),
+    ?assertEqual(1, meck:num_calls(couch_os_process, stop, 1)),
 
-        WaitFun = fun() ->
-            case couch_proc_manager:get_proc_count() of
-                0 -> ok;
-                N when is_integer(N), N > 0 -> wait
-            end
-        end,
-        case test_util:wait(WaitFun, 5000) of
-            timeout -> error(timeout);
-            _ -> ok
-        end,
-        ?assertEqual(0, couch_proc_manager:get_proc_count()),
+    WaitFun = fun() ->
+        case couch_proc_manager:get_proc_count() of
+            0 -> ok;
+            N when is_integer(N), N > 0 -> wait
+        end
+    end,
+    case test_util:wait(WaitFun, 5000) of
+        timeout -> error(timeout);
+        _ -> ok
+    end,
+    ?assertEqual(0, couch_proc_manager:get_proc_count()),
 
-        DDoc2 = ddoc(<<"_design/ddoc2">>),
-        ok = couch_query_servers:with_ddoc_proc(DDoc2, fun(_) -> ok end),
-        ?assertEqual(1, meck:num_calls(couch_os_process, stop, 1)),
-        ?assertEqual(1, couch_proc_manager:get_proc_count())
-    end).
+    DDoc2 = ddoc(<<"_design/ddoc2">>),
+    ok = couch_query_servers:with_ddoc_proc(Db, DDoc2, fun(_) -> ok end),
+    ?assertEqual(1, meck:num_calls(couch_os_process, stop, 1)),
+    ?assertEqual(1, couch_proc_manager:get_proc_count()).
 
 
 ddoc(DDocId) ->
@@ -269,26 +521,14 @@
     config:set("native_query_servers", "enable_erlang_query_server", "true", false),
     config:set("query_server_config", "os_process_limit", "3", false),
     config:set("query_server_config", "os_process_soft_limit", "2", false),
-    ok = confirm_config("os_process_soft_limit", "2").
+    ok.
 
-confirm_config(Key, Value) ->
-    confirm_config(Key, Value, 0).
-
-confirm_config(Key, Value, Count) ->
-    case config:get("query_server_config", Key) of
-        Value ->
-            ok;
-        _ when Count > 10 ->
-            erlang:error({config_setup, [
-                {module, ?MODULE},
-                {line, ?LINE},
-                {value, timeout}
-            ]});
-        _ ->
-            %% we need to wait to let gen_server:cast finish
-            timer:sleep(10),
-            confirm_config(Key, Value, Count + 1)
-    end.
+teardown_config() ->
+    config:delete("native_query_servers", "enable_erlang_query_server", false),
+    config:delete("query_server_config", "os_process_limit", false),
+    config:delete("query_server_config", "os_process_soft_limit", false),
+    config:delete("query_server_config", "db_tag", false),
+    ok.
 
 spawn_client() ->
     Parent = self(),
@@ -299,13 +539,13 @@
     end),
     {Pid, Ref}.
 
-spawn_client(DDocId) ->
+spawn_client(Db, DDocId) ->
     Parent = self(),
     Ref = make_ref(),
     Pid = spawn(fun() ->
         DDocKey = {DDocId, <<"1-abcdefgh">>},
         DDoc = #doc{body={[{<<"language">>, <<"erlang">>}]}},
-        Proc = couch_query_servers:get_ddoc_process(DDoc, DDocKey),
+        Proc = couch_query_servers:get_ddoc_process(DDoc, Db, DDocKey),
         loop(Parent, Ref, Proc)
     end),
     {Pid, Ref}.
@@ -332,20 +572,30 @@
     end.
 
 stop_client({Pid, Ref}) ->
+    MRef = erlang:monitor(process, Pid),
     Pid ! stop,
     receive
         {stop, Ref} ->
+            receive
+                {'DOWN', MRef, process, Pid, _} -> ok
+            end,
             ok
     after ?TIMEOUT ->
+        erlang:demonitor(MRef, [flush]),
         timeout
     end.
 
 kill_client({Pid, Ref}) ->
+    MRef = erlang:monitor(process, Pid),
     Pid ! die,
     receive
         {die, Ref} ->
+            receive
+                {'DOWN', MRef, process, Pid, _} -> ok
+            end,
             ok
     after ?TIMEOUT ->
+        erlang:demonitor(MRef, [flush]),
         timeout
     end.
 
@@ -364,3 +614,68 @@
             Parent ! {die, Ref},
             exit(some_error)
     end.
+
+proc_pids(Procs) ->
+    [P#proc.pid || P <- Procs].
+
+all_alive(Procs) ->
+    lists:all(fun is_process_alive/1, proc_pids(Procs)).
+
+all_different(Procs) ->
+    lists:usort(proc_pids(Procs)) =:= lists:sort(proc_pids(Procs)).
+
+all_alive_all_different(Procs) ->
+    all_alive(Procs) andalso all_different(Procs).
+
+idle_count() ->
+    ets:info(couch_proc_manager_idle_by_db, size).
+
+tagged_idle_count() ->
+    ets:info(couch_proc_manager_idle_access, size).
+
+stop_clients(Clients) ->
+    Fun = fun(C) -> ?assertEqual(ok, stop_client(C)) end,
+    lists:map(Fun, Clients).
+
+wait_tagged_idle_count(N) ->
+    WaitFun = fun() ->
+        case tagged_idle_count() == N of
+            true -> ok;
+            false -> wait
+        end
+    end,
+    case test_util:wait(WaitFun, 5000) of
+        timeout -> error(timeout);
+        _ -> ok
+    end,
+    ?assertEqual(N, tagged_idle_count()).
+
+wait_idle_count(N) ->
+    WaitFun = fun() ->
+        case idle_count() == N of
+            true -> ok;
+            false -> wait
+        end
+    end,
+    case test_util:wait(WaitFun, 5000) of
+        timeout -> error(timeout);
+        _ -> ok
+    end,
+    ?assertEqual(N, idle_count()).
+
+wait_process_count(N) ->
+    WaitFun = fun() ->
+        case couch_proc_manager:get_proc_count() == N of
+            true -> ok;
+            false -> wait
+        end
+    end,
+    case test_util:wait(WaitFun, 5000) of
+        timeout -> error(timeout);
+        _ -> ok
+    end,
+    ?assertEqual(N, couch_proc_manager:get_proc_count()).
+
+cfg_set(K, V) ->
+    config:set("query_server_config", K, V, false),
+    ok.
diff --git a/src/couch_mrview/src/couch_mrview_show.erl b/src/couch_mrview/src/couch_mrview_show.erl
index 0268b70..16a7597 100644
--- a/src/couch_mrview/src/couch_mrview_show.erl
+++ b/src/couch_mrview/src/couch_mrview_show.erl
@@ -77,7 +77,7 @@
         JsonReq = chttpd_external:json_req_obj(Req, Db, DocId),
         JsonDoc = couch_query_servers:json_doc(Doc),
         [<<"resp">>, ExternalResp] =
-            couch_query_servers:ddoc_prompt(DDoc, [<<"shows">>, ShowName],
+            couch_query_servers:ddoc_prompt(Db, DDoc, [<<"shows">>, ShowName],
                 [JsonDoc, JsonReq]),
         JsonResp = apply_etag(ExternalResp, CurrentEtag),
         chttpd_external:send_external_response(Req, JsonResp)
@@ -122,7 +122,7 @@
     JsonReq = chttpd_external:json_req_obj(Req, Db, DocId),
     JsonDoc = couch_query_servers:json_doc(Doc),
     Cmd = [<<"updates">>, UpdateName],
-    UpdateResp = couch_query_servers:ddoc_prompt(DDoc, Cmd, [JsonDoc, JsonReq]),
+    UpdateResp = couch_query_servers:ddoc_prompt(Db, DDoc, Cmd, [JsonDoc, JsonReq]),
     JsonResp = case UpdateResp of
         [<<"up">>, {NewJsonDoc}, {JsonResp0}] ->
             case chttpd:header_value(
@@ -196,7 +196,7 @@
     end,
     Args = Args0#mrargs{preflight_fun=ETagFun},
     couch_httpd:etag_maybe(Req, fun() ->
-        couch_query_servers:with_ddoc_proc(DDoc, fun(QServer) ->
+        couch_query_servers:with_ddoc_proc(Db, DDoc, fun(QServer) ->
             Acc = #lacc{db=Db, req=Req, qserver=QServer, lname=LName},
             case VName of
               <<"_all_docs">> ->
diff --git a/src/ddoc_cache/src/ddoc_cache_entry_validation_funs.erl b/src/ddoc_cache/src/ddoc_cache_entry_validation_funs.erl
index 2182dea..4cbbd38 100644
--- a/src/ddoc_cache/src/ddoc_cache_entry_validation_funs.erl
+++ b/src/ddoc_cache/src/ddoc_cache_entry_validation_funs.erl
@@ -32,7 +32,7 @@
 recover(DbName) ->
     {ok, DDocs} = fabric:design_docs(mem3:dbname(DbName)),
     Funs = lists:flatmap(fun(DDoc) ->
-        case couch_doc:get_validate_doc_fun(DDoc) of
+        case couch_doc:get_validate_doc_fun(DbName, DDoc) of
             nil -> [];
             Fun -> [Fun]
         end