IGNITE-14893 Fixed IndexOutOfBoundsException in GridCacheWriteBehindStore flushers lookup by key. Fixes #9177

Signed-off-by: Slava Koptilin <slava.koptilin@gmail.com>
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
index d3da3a7..d9120ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
@@ -222,7 +222,7 @@
      */
     public void setFlushThreadCount(int flushThreadCnt) {
         this.flushThreadCnt = flushThreadCnt;
-        this.flushThreadCntIsPowerOfTwo = (flushThreadCnt & (flushThreadCnt - 1)) == 0;
+        this.flushThreadCntIsPowerOfTwo = U.isPow2(flushThreadCnt);
     }
 
     /**
@@ -660,20 +660,29 @@
     }
 
     /**
-     * Return flusher by by key.
+     * Return flusher by key.
      *
      * @param key Key for search.
      * @return flusher.
      */
     private Flusher flusher(K key) {
-        int h, idx;
+        return flushThreads[resolveFlusherByKeyHash(key.hashCode())];
+    }
 
-        if (flushThreadCntIsPowerOfTwo)
-            idx = ((h = key.hashCode()) ^ (h >>> 16)) & (flushThreadCnt - 1);
-        else
-            idx = ((h = key.hashCode()) ^ (h >>> 16)) % flushThreadCnt;
+    /**
+     * Lookup flusher index by provided key hash using
+     * approach similar to {@link HashMap#hash(Object)}. In case
+     * <code>size</code> is not a power of 2 we fallback to modulo operation.
+     *
+     * @param hash Object hash.
+     * @return Calculated flucher index [0..flushThreadCnt).
+     */
+    int resolveFlusherByKeyHash(int hash) {
+        int h = (hash ^ (hash >>> 16));
 
-        return flushThreads[idx];
+        return flushThreadCntIsPowerOfTwo
+            ? h & (flushThreadCnt - 1)
+            : U.hashToIndex(h, flushThreadCnt);
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 069ca37..deb83061 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -12233,4 +12233,16 @@
             return size;
         }
     }
+
+    /**
+     * Maps object hash to some index between 0 and specified size via modulo operation.
+     *
+     * @param hash Object hash.
+     * @param size Size greater than 0.
+     * @return Calculated index in range [0..size).
+     */
+    public static int hashToIndex(int hash, int size) {
+        return safeAbs(hash % size);
+    }
+
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java
index 5a00327..77d91b6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java
@@ -36,6 +36,21 @@
  * This class provides basic tests for {@link org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore}.
  */
 public class GridCacheWriteBehindStoreSelfTest extends GridCacheWriteBehindStoreAbstractSelfTest {
+    /** Sizes set for {@link #testResolveFlusherByKeyHash()}. */
+    private static final int[] DISTRIBUTION_TESTING_SIZES = new int[] {
+        1, 2, 4, 8, 16, 32, 64, 128, 256, 0x10000, 0x80000,
+
+        3, 5, 7, 9, 10, 12, 15, 17, 19, 23, 29, 31, 37, 66, 146, 100500
+    };
+
+    /** Hashes set for {@link #testResolveFlusherByKeyHash()}. */
+    private static final int[] DISTRIBUTION_TESTING_HASHES = new int[] {
+        0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 13, 17, 19, 23, 29, 31, 37, 123, 100500,
+        0xdeadbeef,
+        "abc".hashCode(),
+        "accb2e8ea33e4a89b4189463cacc3c4e".hashCode(),
+    };
+
     /**
      * Tests correct store (with write coalescing) shutdown when underlying store fails.
      *
@@ -308,9 +323,8 @@
             log().info(">>> [putCnt = " + actualPutCnt.get() + ", delegatePutCnt=" + delegatePutCnt + "]");
 
             assertTrue("No puts were made to the underlying store", delegatePutCnt > 0);
-            if (store.getWriteCoalescing()) {
+            if (store.getWriteCoalescing())
                 assertTrue("Too many puts were made to the underlying store", delegatePutCnt < actualPutCnt.get() / 10);
-            }
             else {
                 assertTrue(
                     "Too few puts cnt=" + actualPutCnt.get() + " << storePutCnt=" + delegatePutCnt,
@@ -453,4 +467,28 @@
 
         assertTrue("Store map key set: " + underlyingMap.keySet(), F.eqOrdered(underlyingMap.keySet(), intList));
     }
+
+    /**
+     * Test to verify the {@link GridCacheWriteBehindStore#resolveFlusherByKeyHash(int)}.
+     */
+    @Test
+    public void testResolveFlusherByKeyHash() {
+        store = new GridCacheWriteBehindStore<>(null, "", "", log, delegate);
+
+        Arrays.stream(DISTRIBUTION_TESTING_SIZES).forEach(size -> {
+            store.setFlushThreadCount(size);
+
+            Arrays.stream(DISTRIBUTION_TESTING_HASHES).forEach(hash -> {
+                hashToIndexAdvancedDistributionAssertion(hash, size);
+                hashToIndexAdvancedDistributionAssertion((-1) * hash, size);
+            });
+        });
+    }
+
+    private void hashToIndexAdvancedDistributionAssertion(int hash, int size) {
+        int idx = store.resolveFlusherByKeyHash(hash);
+
+        assertTrue("index=" + idx + " is negative, when hash=" + hash + ", size=" + size, idx >= 0);
+        assertTrue("index=" + idx + " is bigger than " + size + " bound, when hash=" + hash + ", size=" + size, idx < size);
+    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
index 3f1cf07..2ab8f97 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
@@ -1527,4 +1527,32 @@
          */
         void accept(@Nullable T t) throws Exception;
     }
+
+    /**
+     * Test to verify the {@link U#hashToIndex(int, int)}.
+     */
+    @Test
+    public void testHashToIndexDistribution() {
+        assertEquals(0, U.hashToIndex(1, 1));
+        assertEquals(0, U.hashToIndex(2, 1));
+        assertEquals(1, U.hashToIndex(1, 2));
+        assertEquals(0, U.hashToIndex(2, 2));
+
+        assertEquals(1, U.hashToIndex(1, 4));
+        assertEquals(2, U.hashToIndex(2, 4));
+        assertEquals(3, U.hashToIndex(3, 4));
+        assertEquals(0, U.hashToIndex(4, 4));
+        assertEquals(1, U.hashToIndex(5, 4));
+        assertEquals(0, U.hashToIndex(8, 4));
+        assertEquals(3, U.hashToIndex(15, 4));
+
+        assertEquals(1, U.hashToIndex(-1, 4));
+        assertEquals(2, U.hashToIndex(-2, 4));
+        assertEquals(3, U.hashToIndex(-3, 4));
+        assertEquals(0, U.hashToIndex(-4, 4));
+        assertEquals(1, U.hashToIndex(-5, 4));
+        assertEquals(0, U.hashToIndex(-8, 4));
+        assertEquals(3, U.hashToIndex(-15, 4));
+    }
+
 }