| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.storage; |
| |
| import java.nio.charset.StandardCharsets; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.List; |
| import java.util.stream.Collectors; |
| import java.util.stream.IntStream; |
| import java.util.stream.Stream; |
| import java.util.stream.StreamSupport; |
| import org.apache.ignite.internal.storage.basic.DeleteExactInvokeClosure; |
| import org.apache.ignite.internal.storage.basic.GetAndRemoveInvokeClosure; |
| import org.apache.ignite.internal.storage.basic.GetAndReplaceInvokeClosure; |
| import org.apache.ignite.internal.storage.basic.InsertInvokeClosure; |
| import org.apache.ignite.internal.storage.basic.ReplaceExactInvokeClosure; |
| import org.apache.ignite.internal.storage.basic.SimpleDataRow; |
| import org.apache.ignite.internal.storage.basic.SimpleReadInvokeClosure; |
| import org.apache.ignite.internal.storage.basic.SimpleRemoveInvokeClosure; |
| import org.apache.ignite.internal.storage.basic.SimpleWriteInvokeClosure; |
| import org.apache.ignite.internal.util.Cursor; |
| import org.jetbrains.annotations.NotNull; |
| import org.junit.jupiter.api.Test; |
| |
| import static java.util.Collections.emptyList; |
| import static org.hamcrest.MatcherAssert.assertThat; |
| import static org.hamcrest.collection.IsCollectionWithSize.hasSize; |
| import static org.junit.jupiter.api.Assertions.assertArrayEquals; |
| import static org.junit.jupiter.api.Assertions.assertEquals; |
| import static org.junit.jupiter.api.Assertions.assertFalse; |
| import static org.junit.jupiter.api.Assertions.assertNotNull; |
| import static org.junit.jupiter.api.Assertions.assertNull; |
| import static org.junit.jupiter.api.Assertions.assertTrue; |
| |
| /** |
| * Abstract test that covers basic scenarios of the storage API. |
| */ |
| public abstract class AbstractStorageTest { |
| /** Test key. */ |
| private static final String KEY = "key"; |
| |
| /** Test value. */ |
| private static final String VALUE = "value"; |
| |
| /** Storage instance. */ |
| protected Storage storage; |
| |
| /** |
| * Tests that read / write / remove work consistently on the same key. |
| */ |
| @Test |
| public void readWriteRemove() { |
| SearchRow searchRow = searchRow(KEY); |
| |
| assertNull(storage.read(searchRow).value()); |
| |
| DataRow dataRow = dataRow(KEY, VALUE); |
| |
| storage.write(dataRow); |
| |
| assertArrayEquals(dataRow.value().array(), storage.read(searchRow).value().array()); |
| |
| storage.remove(searchRow); |
| |
| assertNull(storage.read(searchRow).value()); |
| } |
| |
| /** |
| * Tests that invoke method works consistently with default read / write / remove closures implementations on the |
| * same key. |
| */ |
| @Test |
| public void invoke() { |
| SearchRow searchRow = searchRow(KEY); |
| |
| SimpleReadInvokeClosure readClosure = new SimpleReadInvokeClosure(); |
| |
| storage.invoke(searchRow, readClosure); |
| |
| assertNull(readClosure.row().value()); |
| |
| DataRow dataRow = dataRow(KEY, VALUE); |
| |
| storage.invoke(searchRow, new SimpleWriteInvokeClosure(dataRow)); |
| |
| storage.invoke(searchRow, readClosure); |
| |
| assertArrayEquals(dataRow.value().array(), readClosure.row().value().array()); |
| |
| storage.invoke(searchRow, new SimpleRemoveInvokeClosure()); |
| |
| storage.invoke(searchRow, readClosure); |
| |
| assertNull(readClosure.row().value()); |
| } |
| |
| /** |
| * Tests that scan operation works properly without filter. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void scanSimple() throws Exception { |
| List<DataRow> list = toList(storage.scan(key -> true)); |
| |
| assertEquals(emptyList(), list); |
| |
| DataRow dataRow1 = dataRow("key1", "value1"); |
| |
| storage.write(dataRow1); |
| |
| list = toList(storage.scan(key -> true)); |
| |
| assertThat(list, hasSize(1)); |
| |
| assertArrayEquals(dataRow1.value().array(), list.get(0).value().array()); |
| |
| DataRow dataRow2 = dataRow("key2", "value2"); |
| |
| storage.write(dataRow2); |
| |
| list = toList(storage.scan(key -> true)); |
| |
| assertThat(list, hasSize(2)); |
| |
| // "key1" and "key2" have the same order both by hash and lexicographically. |
| assertArrayEquals(dataRow1.value().array(), list.get(0).value().array()); |
| assertArrayEquals(dataRow2.value().array(), list.get(1).value().array()); |
| } |
| |
| /** |
| * Tests that scan operation works properly with passed filter. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void scanFiltered() throws Exception { |
| DataRow dataRow1 = dataRow("key1", "value1"); |
| DataRow dataRow2 = dataRow("key2", "value2"); |
| |
| storage.write(dataRow1); |
| storage.write(dataRow2); |
| |
| List<DataRow> list = toList(storage.scan(key -> key.keyBytes()[3] == '1')); |
| |
| assertThat(list, hasSize(1)); |
| |
| assertArrayEquals(dataRow1.value().array(), list.get(0).value().array()); |
| |
| list = toList(storage.scan(key -> key.keyBytes()[3] == '2')); |
| |
| assertThat(list, hasSize(1)); |
| |
| assertArrayEquals(dataRow2.value().array(), list.get(0).value().array()); |
| |
| list = toList(storage.scan(key -> false)); |
| |
| assertTrue(list.isEmpty()); |
| } |
| |
| /** |
| * Tests that {@link InsertInvokeClosure} inserts a data row if there is no existing data row with the same key. |
| */ |
| @Test |
| public void testInsertClosure() { |
| DataRow dataRow = dataRow(KEY, VALUE); |
| |
| var closure = new InsertInvokeClosure(dataRow); |
| |
| storage.invoke(dataRow, closure); |
| |
| assertTrue(closure.result()); |
| |
| checkHasSameEntry(dataRow); |
| } |
| |
| /** |
| * Tests that {@link InsertInvokeClosure} doesn't insert a data row if there is an existing data row with the same key. |
| */ |
| @Test |
| public void testInsertClosure_failureBranch() { |
| DataRow dataRow = dataRow(KEY, VALUE); |
| |
| var closure = new InsertInvokeClosure(dataRow); |
| |
| storage.invoke(dataRow, closure); |
| |
| assertTrue(closure.result()); |
| |
| DataRow sameKeyRow = dataRow(KEY, "test"); |
| |
| var sameClosure = new InsertInvokeClosure(sameKeyRow); |
| |
| storage.invoke(sameKeyRow, sameClosure); |
| |
| assertFalse(sameClosure.result()); |
| |
| checkHasSameEntry(dataRow); |
| } |
| |
| /** |
| * Tests that {@link DeleteExactInvokeClosure} deletes a data row if a key and a value matches the ones passed |
| * in the closure. |
| */ |
| @Test |
| public void testDeleteExactClosure() { |
| DataRow dataRow = dataRow(KEY, VALUE); |
| |
| storage.write(dataRow); |
| |
| checkHasSameEntry(dataRow); |
| |
| var closure = new DeleteExactInvokeClosure(dataRow); |
| |
| storage.invoke(dataRow, closure); |
| |
| assertTrue(closure.result()); |
| |
| checkHasNoEntry(dataRow); |
| } |
| |
| /** |
| * Tests that {@link DeleteExactInvokeClosure} doesn't delete a data row if a key and a value don't match |
| * the ones passed in the closure. |
| */ |
| @Test |
| public void testDeleteExactClosure_failureBranch() { |
| DataRow dataRow = dataRow(KEY, VALUE); |
| |
| storage.write(dataRow); |
| |
| checkHasSameEntry(dataRow); |
| |
| var closure = new DeleteExactInvokeClosure(dataRow(KEY, "test")); |
| |
| storage.invoke(dataRow, closure); |
| |
| assertFalse(closure.result()); |
| |
| checkHasSameEntry(dataRow); |
| } |
| |
| /** |
| * Tests that {@link GetAndRemoveInvokeClosure} successfully retrieves and removes a data row. |
| */ |
| @Test |
| public void testGetAndRemoveClosure() { |
| DataRow dataRow = dataRow(KEY, VALUE); |
| |
| storage.write(dataRow); |
| |
| checkHasSameEntry(dataRow); |
| |
| var closure = new GetAndRemoveInvokeClosure(); |
| |
| storage.invoke(dataRow, closure); |
| |
| assertTrue(closure.result()); |
| |
| checkRowsEqual(dataRow, closure.oldRow()); |
| |
| checkHasNoEntry(dataRow); |
| } |
| |
| /** |
| * Tests that {@link GetAndRemoveInvokeClosure} doesn't retrieve and remove a data row if it doesn't exist. |
| */ |
| @Test |
| public void testGetAndRemoveClosure_failureBranch() { |
| DataRow dataRow = dataRow(KEY, VALUE); |
| |
| storage.write(dataRow); |
| |
| checkHasSameEntry(dataRow); |
| |
| var closure = new GetAndRemoveInvokeClosure(); |
| |
| storage.invoke(searchRow("test"), closure); |
| |
| assertFalse(closure.result()); |
| |
| assertNull(closure.oldRow().valueBytes()); |
| |
| checkHasSameEntry(dataRow); |
| } |
| |
| /** |
| * Tests that {@link GetAndReplaceInvokeClosure} with the {@link GetAndReplaceInvokeClosure#onlyIfExists} set to |
| * {@code false} retrieves and replaces the existing entry in the storage. |
| */ |
| @Test |
| public void testGetAndReplaceClosureIfExistsFalse_entryExists() { |
| String newValue = "newValue"; |
| |
| DataRow dataRow = dataRow(KEY, VALUE); |
| |
| storage.write(dataRow); |
| |
| checkHasSameEntry(dataRow); |
| |
| DataRow newRow = dataRow(KEY, newValue); |
| |
| var closure = new GetAndReplaceInvokeClosure(newRow, false); |
| |
| storage.invoke(dataRow, closure); |
| |
| DataRow replaced = closure.oldRow(); |
| |
| assertNotNull(replaced); |
| |
| assertTrue(closure.result()); |
| |
| checkRowsEqual(dataRow, replaced); |
| |
| checkHasDifferentEntry(dataRow); |
| checkHasSameEntry(newRow); |
| } |
| |
| /** |
| * Tests that {@link GetAndReplaceInvokeClosure} with the {@link GetAndReplaceInvokeClosure#onlyIfExists} set to |
| * {@code false} successfully inserts a new data row and returns an empty row if a previous row with the same key |
| * doesn't exist . |
| */ |
| @Test |
| public void testGetAndReplaceClosureIfExistsFalse_entryNotExists() { |
| DataRow dataRow = dataRow(KEY, VALUE); |
| |
| var closure = new GetAndReplaceInvokeClosure(dataRow, false); |
| |
| storage.invoke(dataRow, closure); |
| |
| DataRow replaced = closure.oldRow(); |
| |
| assertNotNull(replaced); |
| |
| assertTrue(closure.result()); |
| |
| assertFalse(replaced.hasValueBytes()); |
| |
| checkHasSameEntry(dataRow); |
| } |
| |
| /** |
| * Tests that {@link GetAndReplaceInvokeClosure} with the {@link GetAndReplaceInvokeClosure#onlyIfExists} set to |
| * {@code true} retrieves and replaces the existing entry in the storage. |
| */ |
| @Test |
| public void testGetAndReplaceClosureIfExistsTrue_entryExists() { |
| DataRow dataRow = dataRow(KEY, VALUE); |
| |
| storage.write(dataRow); |
| |
| checkHasSameEntry(dataRow); |
| |
| DataRow newRow = dataRow(KEY, "test"); |
| |
| var closure = new GetAndReplaceInvokeClosure(newRow, true); |
| |
| storage.invoke(dataRow, closure); |
| |
| DataRow replaced = closure.oldRow(); |
| |
| assertNotNull(replaced); |
| |
| assertTrue(closure.result()); |
| |
| assertTrue(replaced.hasValueBytes()); |
| |
| checkHasDifferentEntry(dataRow); |
| checkHasSameEntry(newRow); |
| } |
| |
| /** |
| * Tests that {@link GetAndReplaceInvokeClosure} with the {@link GetAndReplaceInvokeClosure#onlyIfExists} set to |
| * {@code true} doesn't insert a new entry if a previous one doesn't exist. |
| */ |
| @Test |
| public void testGetAndReplaceClosureIfExistsTrue_entryNotExists() { |
| DataRow dataRow = dataRow(KEY, VALUE); |
| |
| var closure = new GetAndReplaceInvokeClosure(dataRow, true); |
| |
| storage.invoke(dataRow, closure); |
| |
| DataRow replaced = closure.oldRow(); |
| |
| assertNotNull(replaced); |
| |
| assertFalse(closure.result()); |
| |
| assertFalse(replaced.hasValueBytes()); |
| |
| checkHasNoEntry(dataRow); |
| } |
| |
| /** |
| * Tests that {@link ReplaceExactInvokeClosure} replaces the data row with a given key and a given value |
| * in the storage. |
| */ |
| @Test |
| public void testReplaceExactClosure() { |
| String newValue = "newValue"; |
| |
| DataRow dataRow = dataRow(KEY, VALUE); |
| |
| storage.write(dataRow); |
| |
| checkHasSameEntry(dataRow); |
| |
| DataRow newRow = dataRow(KEY, newValue); |
| |
| var closure = new ReplaceExactInvokeClosure(dataRow, newRow); |
| |
| storage.invoke(dataRow, closure); |
| |
| assertTrue(closure.result()); |
| |
| checkHasDifferentEntry(dataRow); |
| checkHasSameEntry(newRow); |
| } |
| |
| /** |
| * Tests that {@link ReplaceExactInvokeClosure} doesn't replace the data row with a given key and a given value |
| * if the value in the storage doesn't match the value passed via the closure. |
| */ |
| @Test |
| public void testReplaceExactClosure_failureBranch() { |
| String newValue = "newValue"; |
| |
| DataRow dataRow = dataRow(KEY, VALUE); |
| |
| storage.write(dataRow); |
| |
| checkHasSameEntry(dataRow); |
| |
| DataRow newRow = dataRow(KEY, newValue); |
| |
| var closure = new ReplaceExactInvokeClosure(newRow, dataRow); |
| |
| storage.invoke(dataRow, closure); |
| |
| assertFalse(closure.result()); |
| |
| checkHasSameEntry(dataRow); |
| } |
| |
| /** |
| * Tests the {@link Storage#readAll(Collection)} operation successfully reads data rows from the storage. |
| */ |
| @Test |
| public void testReadAll() { |
| List<DataRow> rows = insertBulk(100); |
| |
| List<DataRow> rowsFromStorage = new ArrayList<>(storage.readAll(rows)); |
| |
| Comparator<DataRow> comparator = Comparator.comparing(DataRow::keyBytes, Arrays::compare) |
| .thenComparing(DataRow::valueBytes, Arrays::compare); |
| |
| rows.sort(comparator); |
| rowsFromStorage.sort(comparator); |
| |
| assertEquals(rows, rowsFromStorage); |
| } |
| |
| /** |
| * Tests that {@link Storage#writeAll(Collection)} operation successfully writes a collection of data rows into the |
| * storage. |
| */ |
| @Test |
| public void testWriteAll() { |
| List<DataRow> rows = IntStream.range(0, 100) |
| .mapToObj(i -> dataRow(KEY + i, VALUE + i)) |
| .collect(Collectors.toList()); |
| |
| storage.writeAll(rows); |
| rows.forEach(this::checkHasSameEntry); |
| } |
| |
| /** |
| * Tests that {@link Storage#insertAll(Collection)} operation doesn't insert data rows which keys |
| * are already present in the storage. This operation must also return the list of such data rows. |
| */ |
| @Test |
| public void testInsertAll() { |
| List<DataRow> rows = insertBulk(100); |
| |
| List<DataRow> oldRows = rows.subList(0, 50); |
| |
| List<DataRow> newInsertion = Stream.concat( |
| oldRows.stream(), |
| IntStream.range(100, 150).mapToObj(i -> dataRow(KEY + "_" + i, VALUE + "_" + i)) |
| ).collect(Collectors.toList()); |
| |
| Collection<DataRow> cantInsert = storage.insertAll(newInsertion); |
| |
| assertEquals(oldRows, cantInsert); |
| } |
| |
| /** |
| * Tests that {@link Storage#removeAll(Collection)} operation successfully retrieves and removes a collection of |
| * {@link SearchRow}s. |
| */ |
| @Test |
| public void testRemoveAll() throws Exception { |
| List<DataRow> rows = insertBulk(100); |
| |
| Collection<DataRow> removed = storage.removeAll( |
| rows.stream().map(r -> new SimpleDataRow(r.keyBytes(), null)).collect(Collectors.toList()) |
| ); |
| |
| assertEquals(rows, removed); |
| |
| Cursor<DataRow> scan = storage.scan(row -> true); |
| |
| assertFalse(scan.hasNext()); |
| |
| scan.close(); |
| } |
| |
| @Test |
| public void testRemoveAllKeyNotExists() throws Exception { |
| Collection<DataRow> removed = storage.removeAll(Collections.singleton(searchRow(KEY))); |
| |
| assertNotNull(removed); |
| |
| assertTrue(removed.isEmpty()); |
| } |
| |
| /** |
| * Tests that {@link Storage#removeAllExact(Collection)} operation successfully removes and retrieves a collection |
| * of data rows with the given exact keys and values from the storage. |
| */ |
| @Test |
| public void testRemoveAllExact() throws Exception { |
| List<DataRow> rows = insertBulk(100); |
| |
| Collection<DataRow> removed = storage.removeAllExact(rows); |
| |
| assertEquals(rows, removed); |
| |
| Cursor<DataRow> scan = storage.scan(row -> true); |
| |
| assertFalse(scan.hasNext()); |
| |
| scan.close(); |
| } |
| |
| /** |
| * Tests that {@link Storage#removeAllExact(Collection)} operation doesn't remove and retrieve a collection |
| * of data rows with the given exact keys and values from the storage if the value in the storage doesn't match |
| * the given value. |
| */ |
| @Test |
| public void testRemoveAllExact_failureBranch() throws Exception { |
| List<DataRow> rows = insertBulk(100); |
| |
| List<DataRow> notExactRows = IntStream.range(0, 100) |
| .mapToObj(i -> dataRow(KEY + i, VALUE + (i + 1))) |
| .collect(Collectors.toList()); |
| |
| Collection<DataRow> removed = storage.removeAllExact(notExactRows); |
| |
| assertEquals(0, removed.size()); |
| |
| rows.forEach(this::checkHasSameEntry); |
| } |
| |
| /** |
| * Inserts and returns a given amount of data rows with {@link #KEY}_i as a key and {@link #VALUE}_i as a value |
| * where i is an index of the data row. |
| * |
| * @param numberOfEntries Amount of entries to insert. |
| * @return List of inserted rows. |
| */ |
| private List<DataRow> insertBulk(int numberOfEntries) { |
| List<DataRow> rows = IntStream.range(0, numberOfEntries) |
| .mapToObj(i -> dataRow(KEY + "_" + i, VALUE + "_" + i)) |
| .collect(Collectors.toList()); |
| |
| storage.insertAll(rows); |
| rows.forEach(this::checkHasSameEntry); |
| |
| // Clone key and value byte arrays so that returned rows have new references. |
| // This way we check that the ConcurrentHashMapStorage performs an actual array comparison. |
| return rows.stream().map(row -> { |
| byte[] valueBytes = row.valueBytes(); |
| |
| assert valueBytes != null; |
| |
| return new SimpleDataRow(row.keyBytes().clone(), valueBytes.clone()); |
| }).collect(Collectors.toList()); |
| } |
| |
| /** |
| * Checks that the storage has no value for the given search row. |
| * |
| * @param row Search row. |
| */ |
| private void checkHasNoEntry(SearchRow row) { |
| DataRow read = storage.read(row); |
| |
| assertFalse(read.hasValueBytes()); |
| } |
| |
| /** |
| * Checks that the storage contains a row with a given key but with a different value. |
| * |
| * @param row Data row. |
| */ |
| private void checkHasDifferentEntry(DataRow row) { |
| DataRow read = storage.read(row); |
| |
| assertFalse(Arrays.equals(row.valueBytes(), read.valueBytes())); |
| } |
| |
| /** |
| * Checks that the storage contains a specific data row. |
| * |
| * @param row Expected data row. |
| */ |
| private void checkHasSameEntry(DataRow row) { |
| DataRow read = storage.read(row); |
| |
| checkRowsEqual(row, read); |
| } |
| |
| /** |
| * Checks that two rows are equal. |
| * |
| * @param expected Expected data row. |
| * @param actual Actual data row. |
| */ |
| private void checkRowsEqual(DataRow expected, DataRow actual) { |
| assertArrayEquals(expected.keyBytes(), actual.keyBytes()); |
| assertArrayEquals(expected.valueBytes(), actual.valueBytes()); |
| } |
| |
| /** |
| * Wraps string key into a search row. |
| * |
| * @param key String key. |
| * @return Search row. |
| */ |
| private SearchRow searchRow(String key) { |
| return new SimpleDataRow( |
| key.getBytes(StandardCharsets.UTF_8), |
| null |
| ); |
| } |
| |
| /** |
| * Wraps string key/value pair into a data row. |
| * |
| * @param key String key. |
| * @param value String value. |
| * @return Data row. |
| */ |
| private DataRow dataRow(String key, String value) { |
| return new SimpleDataRow( |
| key.getBytes(StandardCharsets.UTF_8), |
| value.getBytes(StandardCharsets.UTF_8) |
| ); |
| } |
| |
| /** |
| * Converts cursor to list. |
| * |
| * @param cursor Cursor. |
| * @param <T> Type of cursor content. |
| * @return List. |
| * @throws Exception If error occurred during iteration or while closing the cursor. |
| */ |
| @NotNull |
| private <T> List<T> toList(Cursor<T> cursor) throws Exception { |
| try (cursor) { |
| return StreamSupport.stream(cursor.spliterator(), false).collect(Collectors.toList()); |
| } |
| } |
| } |