IGNITE-14177 Fixed NULLS LAST/FIRST sorting if query parallelism is set (#8875)
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/SortedReduceIndexAdapter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/SortedReduceIndexAdapter.java
index dd430a7..8248b8d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/SortedReduceIndexAdapter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/SortedReduceIndexAdapter.java
@@ -22,10 +22,13 @@
import org.h2.engine.Session;
import org.h2.index.Index;
import org.h2.index.IndexType;
+import org.h2.result.SearchRow;
import org.h2.result.SortOrder;
import org.h2.table.Column;
import org.h2.table.IndexColumn;
import org.h2.table.TableFilter;
+import org.h2.value.Value;
+import org.h2.value.ValueNull;
/**
* H2 {@link Index} adapter for {@link SortedReducer}.
@@ -64,4 +67,37 @@
HashSet<Column> allColumnsSet) {
return getCostRangeIndex(masks, getRowCountApproximation(), filters, filter, sortOrder, false, allColumnsSet);
}
+
+ /** {@inheritDoc} */
+ @Override public int compareRows(SearchRow rowData, SearchRow compare) {
+ if (rowData == compare)
+ return 0;
+
+ for (int i = 0, len = indexColumns.length; i < len; i++) {
+ int index = columnIds[i];
+ int sortType = indexColumns[i].sortType;
+ Value v1 = rowData.getValue(index);
+ Value v2 = compare.getValue(index);
+
+ if (v1 == null || v2 == null)
+ return 0;
+
+ else if (v1 == v2) continue;
+
+ else if (v1 == ValueNull.INSTANCE || v2 == ValueNull.INSTANCE) {
+ if ((sortType & SortOrder.NULLS_FIRST) != 0)
+ return v1 == ValueNull.INSTANCE ? -1 : 1;
+ else if ((sortType & SortOrder.NULLS_LAST) != 0)
+ return v1 == ValueNull.INSTANCE ? 1 : -1;
+ }
+
+ int comp = table.compareTypeSafe(v1, v2);
+ if ((sortType & SortOrder.DESCENDING) != 0)
+ comp = -comp;
+
+ if (comp != 0)
+ return comp;
+ }
+ return 0;
+ }
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheParallelismQuerySortOrderTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheParallelismQuerySortOrderTest.java
new file mode 100644
index 0000000..c4751dd
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheParallelismQuerySortOrderTest.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static java.util.Collections.reverseOrder;
+import static java.util.Comparator.naturalOrder;
+import static java.util.Comparator.nullsFirst;
+import static java.util.Comparator.nullsLast;
+
+/**
+ * Query parallelism sorting tests.
+ */
+public class IgniteCacheParallelismQuerySortOrderTest extends GridCommonAbstractTest {
+
+ /** Test data */
+ private static final Map<Long, Person> CACHE_DATA = new HashMap<>();
+
+ /** Sorted ID by city */
+ private static final List<Long> CITY_SORTED_ASC;
+
+ /** Sorted ID by city DESC */
+ private static final List<Long> CITY_SORTED_DESC;
+
+ /** ID where city is null */
+ private static final List<Long> NULLS_ID = Arrays.asList(3L, 5L, 6L);
+
+ /**
+ * data initialization block
+ */
+ static {
+ CACHE_DATA.put(1L, new Person(1L, "Andrew", "London"));
+ CACHE_DATA.put(2L, new Person(2L, "John", "Amsterdam"));
+ CACHE_DATA.put(3L, new Person(3L, "Tom", null));
+ CACHE_DATA.put(4L, new Person(4L, "Ben", "Washington"));
+ CACHE_DATA.put(5L, new Person(5L, "Stan", null));
+ CACHE_DATA.put(6L, new Person(6L, "Leonard", null));
+ CACHE_DATA.put(7L, new Person(7L, "Richard", "Ryazan"));
+ CACHE_DATA.put(8L, new Person(8L, "Tom", "Paris"));
+ CACHE_DATA.put(9L, new Person(9L, "Andrew", "Moscow"));
+
+ CITY_SORTED_ASC = CACHE_DATA.values().stream()
+ .filter(person -> person.city() != null)
+ .sorted(Comparator.comparing(Person::city))
+ .map(Person::id)
+ .collect(Collectors.toList());
+
+ CITY_SORTED_DESC = new ArrayList<>(CITY_SORTED_ASC);
+ Collections.reverse(CITY_SORTED_DESC);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+ startGrids(1);
+ IgniteCache<Long, Person> cache = jcache(Long.class, Person.class);
+ cache.putAll(CACHE_DATA);
+ }
+
+ /**
+ * @param clsK Key class.
+ * @param clsV Value class.
+ * @return cache instance
+ */
+ protected <K, V> IgniteCache<K, V> jcache(Class<K> clsK, Class<V> clsV) {
+ return jcache(grid(0), cacheConfiguration(), clsK, clsV);
+ }
+
+ /**
+ * @return cache configuration
+ */
+ private CacheConfiguration cacheConfiguration() {
+ CacheConfiguration cc = defaultCacheConfiguration();
+ cc.setQueryParallelism(4);
+ return cc;
+ }
+
+ /**
+ * ASC NULLS LAST sorting test
+ */
+ @Test
+ public void testAscNullsLast() {
+ IgniteCache<Long, Person> cache = jcache(Long.class, Person.class);
+
+ List<List<?>> res = cache.query(new SqlFieldsQuery("SELECT * FROM Person ORDER BY city ASC NULLS LAST")).getAll();
+
+ List<Long> resList = res.stream().map(val -> (Long)val.get(0)).collect(Collectors.toList());
+ Assert.assertTrue(CollectionUtils.isEqualCollection(NULLS_ID, resList.subList(resList.size() - NULLS_ID.size(), resList.size())));
+ Assert.assertEquals(CITY_SORTED_ASC, resList.subList(0, resList.size() - NULLS_ID.size()));
+ }
+
+ /**
+ * DESC NULLS LAST sorting test
+ */
+ @Test
+ public void testDescNullsLast() {
+ IgniteCache<Long, Person> cache = jcache(Long.class, Person.class);
+
+ List<List<?>> res = cache.query(new SqlFieldsQuery("SELECT * FROM Person ORDER BY city DESC NULLS LAST")).getAll();
+
+ List<Long> resList = res.stream().map(val -> (Long)val.get(0)).collect(Collectors.toList());
+ Assert.assertTrue(CollectionUtils.isEqualCollection(NULLS_ID, resList.subList(resList.size() - NULLS_ID.size(), resList.size())));
+ Assert.assertEquals(CITY_SORTED_DESC, resList.subList(0, resList.size() - NULLS_ID.size()));
+ }
+
+ /**
+ * ASC NULLS FIRST sorting test
+ */
+ @Test
+ public void testAscNullsFirst() {
+ IgniteCache<Long, Person> cache = jcache(Long.class, Person.class);
+
+ List<List<?>> res = cache.query(new SqlFieldsQuery("SELECT * FROM Person ORDER BY city ASC NULLS FIRST")).getAll();
+
+ List<Long> resList = res.stream().map(val -> (Long)val.get(0)).collect(Collectors.toList());
+ Assert.assertTrue(CollectionUtils.isEqualCollection(NULLS_ID, resList.subList(0, NULLS_ID.size())));
+ Assert.assertEquals(CITY_SORTED_ASC, resList.subList(NULLS_ID.size(), resList.size()));
+ }
+
+ /**
+ * DESC NULLS FIRST sorting test
+ */
+ @Test
+ public void testDescNullsFirst() {
+ IgniteCache<Long, Person> cache = jcache(Long.class, Person.class);
+
+ List<List<?>> res = cache.query(new SqlFieldsQuery("SELECT * FROM Person ORDER BY city DESC NULLS FIRST")).getAll();
+
+ List<Long> resList = res.stream().map(val -> (Long)val.get(0)).collect(Collectors.toList());
+ Assert.assertTrue(CollectionUtils.isEqualCollection(NULLS_ID, resList.subList(0, NULLS_ID.size())));
+ Assert.assertEquals(CITY_SORTED_DESC, resList.subList(NULLS_ID.size(), resList.size()));
+ }
+
+ /**
+ * Sorting by two fields (ASC, DESC NULLS FIRST)
+ */
+ @Test
+ public void fewFieldAscNullsFirstSortTest() {
+ IgniteCache<Long, Person> cache = jcache(Long.class, Person.class);
+
+ List<List<?>> res = cache.query(new SqlFieldsQuery("SELECT * FROM Person ORDER BY name, city DESC NULLS FIRST")).getAll();
+
+ List<Long> resList = res.stream().map(val -> (Long)val.get(0)).collect(Collectors.toList());
+ List<Long> sorted = CACHE_DATA.values().stream()
+ .sorted(Comparator.comparing(Person::name).thenComparing(Person::city, nullsFirst(reverseOrder())))
+ .map(Person::id)
+ .collect(Collectors.toList());
+ Assert.assertEquals(sorted, resList);
+
+ }
+
+ /**
+ * Sorting by two fields (ASC NULLS LAST, DESC)
+ */
+ @Test
+ public void fewFieldsNullsLastDescSortTest() {
+ IgniteCache<Long, Person> cache = jcache(Long.class, Person.class);
+
+ List<List<?>> res = cache.query(new SqlFieldsQuery("SELECT * FROM Person ORDER BY city ASC NULLS LAST, name DESC")).getAll();
+
+ List<Long> resList = res.stream().map(val -> (Long)val.get(0)).collect(Collectors.toList());
+ List<Long> sorted = CACHE_DATA.values().stream()
+ .sorted(Comparator.comparing(Person::city, nullsLast(naturalOrder())).thenComparing(Person::name, reverseOrder()))
+ .map(Person::id)
+ .collect(Collectors.toList());
+ Assert.assertEquals(sorted, resList);
+ }
+
+ /**
+ *
+ */
+ static class Person {
+
+ /** Id. */
+ @QuerySqlField(index = true)
+ private final Long id;
+
+ /** Name. */
+ @QuerySqlField
+ private final String name;
+
+ /** City. */
+ @QuerySqlField
+ private final String city;
+
+ /**
+ * Constructor.
+ *
+ * @param id Long value.
+ * @param name String value.
+ * @param city String value.
+ */
+ Person(Long id, String name, String city) {
+ this.id = id;
+ this.name = name;
+ this.city = city;
+ }
+
+ /**
+ * Gets id.
+ *
+ * @return id.
+ */
+ public Long id() {
+ return id;
+ }
+
+ /**
+ * Gets name.
+ *
+ * @return name.
+ */
+ public String name() {
+ return name;
+ }
+
+ /**
+ * Gets city.
+ *
+ * @return city.
+ */
+ public String city() {
+ return city;
+ }
+ }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
index 95800b8..766aeb4 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
@@ -82,6 +82,7 @@
import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapEvictQueryTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapIndexScanTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheP2pUnmarshallingQueryErrorTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheParallelismQuerySortOrderTest;
import org.apache.ignite.internal.processors.cache.IgniteCachePrimitiveFieldsQuerySelfTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheQueryH2IndexingLeakTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheQueryIndexSelfTest;
@@ -380,6 +381,7 @@
IgniteCacheAtomicNearEnabledQuerySelfTest.class,
IgniteCachePartitionedQueryP2PDisabledSelfTest.class,
IgniteCachePartitionedQueryEvtsDisabledSelfTest.class,
+ IgniteCacheParallelismQuerySortOrderTest.class,
IgniteCacheUnionDuplicatesTest.class,
IgniteCacheJoinPartitionedAndReplicatedCollocationTest.class,