Use ICU to compare POSTed view keys when necessary

It's possible for user-supplied view keys to compare equal under ICU
collation but not under Erlang's =:=. Since dict uses =:= for equality
this will result in runtime errors unless we fall back to ICU when
necessary and appropriate.

COUCHDB-2932
diff --git a/src/fabric_view_map.erl b/src/fabric_view_map.erl
index 3853d3e..7e27637 100644
--- a/src/fabric_view_map.erl
+++ b/src/fabric_view_map.erl
@@ -140,37 +140,97 @@
         query_args = #mrargs{direction = Dir},
         counters = Counters0,
         rows = Rows0,
-        keys = KeyDict,
+        keys = KeyDict0,
         collation = Collation
     } = State,
-    Rows = merge_row(
+    {Rows, KeyDict} = merge_row(
         Dir,
         Collation,
-        KeyDict,
+        KeyDict0,
         Row#view_row{worker={Worker, From}},
         Rows0
     ),
     Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
-    State1 = State#collector{rows=Rows, counters=Counters1},
+    State1 = State#collector{rows=Rows, counters=Counters1, keys=KeyDict},
     fabric_view:maybe_send_row(State1);
 
 handle_message(complete, Worker, State) ->
     Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters),
     fabric_view:maybe_send_row(State#collector{counters = Counters}).
 
-merge_row(Dir, Collation, undefined, Row, Rows) ->
-    lists:merge(fun(#view_row{key=KeyA, id=IdA}, #view_row{key=KeyB, id=IdB}) ->
-        compare(Dir, Collation, {KeyA, IdA}, {KeyB, IdB})
-    end, [Row], Rows);
-merge_row(_, _, KeyDict, Row, Rows) ->
-    lists:merge(fun(#view_row{key=A, id=IdA}, #view_row{key=B, id=IdB}) ->
-        if A =:= B -> IdA < IdB; true ->
-            dict:fetch(A, KeyDict) < dict:fetch(B, KeyDict)
-        end
-    end, [Row], Rows).
+merge_row(Dir, Collation, undefined, Row, Rows0) ->
+    Rows1 = lists:merge(
+        fun(#view_row{key=KeyA, id=IdA}, #view_row{key=KeyB, id=IdB}) ->
+            compare(Dir, Collation, {KeyA, IdA}, {KeyB, IdB})
+        end,
+        [Row],
+        Rows0
+    ),
+    {Rows1, undefined};
+merge_row(Dir, Collation, KeyDict0, Row, Rows0) ->
+    CmpFun = case Collation of
+        <<"raw">> ->
+            fun (A, A) -> 0;
+                (A, B) -> case A < B of
+                    true -> -1;
+                    false -> 1
+                end
+            end;
+        _ ->
+            fun couch_ejson_compare:less/2
+    end,
+    case maybe_update_keydict(Row#view_row.key, KeyDict0, CmpFun) of
+        undefined ->
+            {Rows0, KeyDict0};
+        KeyDict1 ->
+            Rows1 = lists:merge(
+                fun(#view_row{key=A, id=IdA}, #view_row{key=B, id=IdB}) ->
+                    case {Dir, CmpFun(A, B)} of
+                        {fwd, 0} ->
+                            IdA < IdB;
+                        {rev, 0} ->
+                            IdB < IdA;
+                        {fwd, _} ->
+                            dict:fetch(A, KeyDict1) < dict:fetch(B, KeyDict1);
+                        {rev, _} ->
+                            dict:fetch(B, KeyDict1) < dict:fetch(A, KeyDict1)
+                    end
+                end,
+                [Row],
+                Rows0
+            ),
+            {Rows1, KeyDict1}
+    end.
 
 compare(_, _, A, A) -> true;
 compare(fwd, <<"raw">>, A, B) -> A < B;
 compare(rev, <<"raw">>, A, B) -> B < A;
 compare(fwd, _, A, B) -> couch_ejson_compare:less_json_ids(A, B);
 compare(rev, _, A, B) -> couch_ejson_compare:less_json_ids(B, A).
+
+% KeyDict captures the user-supplied ordering of keys POSTed by the user by
+% mapping to integers (see fabric_view:keydict/1). It's possible that these keys
+% do not compare equal (i.e., =:=, used by dict) to those returned by the view
+% but are in fact equal under ICU. In this case (assuming the view uses ICU
+% collation) we must update KeyDict with a mapping from the ICU-equal key to its
+% appropriate value.
+maybe_update_keydict(Key, KeyDict, CmpFun) ->
+    case dict:find(Key, KeyDict) of
+        {ok, _} ->
+            KeyDict;
+        error ->
+            case key_index(Key, dict:to_list(KeyDict), CmpFun) of
+                undefined ->
+                    undefined;
+                Value ->
+                    dict:store(Key, Value, KeyDict)
+            end
+    end.
+
+key_index(_, [], _) ->
+    undefined;
+key_index(KeyA, [{KeyB, Value}|KVs], CmpFun) ->
+    case CmpFun(KeyA, KeyB) of
+        0 -> Value;
+        _ -> key_index(KeyA, KVs, CmpFun)
+    end.