blob: 61378a4f3f46b59657a96173a622cb3ec11463d7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.runner.app;
import static java.util.concurrent.CompletableFuture.runAsync;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import static org.apache.ignite.internal.IndexTestUtils.waitForIndexToAppearInAnyState;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteTablesInternal;
import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal;
import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
import static org.apache.ignite.internal.test.WatchListenerInhibitor.metastorageEventsInhibitor;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowWithCauseOrSuppressed;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgnitionManager;
import org.apache.ignite.InitParameters;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.catalog.CatalogValidationException;
import org.apache.ignite.internal.catalog.IndexExistsValidationException;
import org.apache.ignite.internal.catalog.TableExistsValidationException;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.test.WatchListenerInhibitor;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.testframework.TestIgnitionManager;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.ErrorGroups.Sql;
import org.apache.ignite.lang.TableNotFoundException;
import org.apache.ignite.table.RecordView;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
/**
* Integration tests that check the consistency of the Java API across different nodes.
*/
@SuppressWarnings("ThrowableNotThrown")
public class ItTablesApiTest extends IgniteAbstractTest {
/** Name of the table that the tests create, alter and drop. */
public static final String TABLE_NAME = "TBL1";
/** Name of the index used in the CREATE INDEX statements; the literal is upper-cased with the root locale. */
private static final String INDEX_NAME = "testHI".toUpperCase(Locale.ROOT);
/**
 * Nodes bootstrap configuration: one entry per node. Each entry assigns its own network, client connector and REST ports,
 * and all entries list the same three {@code localhost} addresses in the node finder.
 */
private final List<String> nodesBootstrapCfg = List.of(
"{\n"
+ " network.port :3344, clientConnector.port: 10800,\n"
+ " network.nodeFinder.netClusterNodes:[ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ],\n"
+ " rest.port: 10300\n"
+ "}",
"{\n"
+ " network.port :3345, clientConnector.port: 10801,\n"
+ " network.nodeFinder.netClusterNodes:[ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ],\n"
+ " rest.port: 10301\n"
+ "}",
"{\n"
+ " network.port :3346, clientConnector.port: 10802,\n"
+ " network.nodeFinder.netClusterNodes:[ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ],\n"
+ " rest.port: 10302\n"
+ "}"
);
/** Cluster nodes, filled in {@code beforeEach} in the same order as {@link #nodesBootstrapCfg}. */
private final List<Ignite> clusterNodes = new ArrayList<>();
/**
 * Starts one node per bootstrap configuration, initializes the cluster and collects the started nodes.
 *
 * @param testInfo Test information used to derive the node names.
 */
@BeforeEach
void beforeEach(TestInfo testInfo) {
    int nodeCount = nodesBootstrapCfg.size();
    List<CompletableFuture<Ignite>> startFuts = new ArrayList<>();

    // Request the start of every node first; the resulting futures are awaited only after the cluster init below.
    for (int idx = 0; idx < nodeCount; idx++) {
        String name = testNodeName(testInfo, idx);
        String cfg = nodesBootstrapCfg.get(idx);

        startFuts.add(TestIgnitionManager.start(name, cfg, workDir.resolve(name)));
    }

    // The first node is both the init destination and the only metastorage node.
    String firstNodeName = testNodeName(testInfo, 0);

    TestIgnitionManager.init(
            InitParameters.builder()
                    .destinationNodeName(firstNodeName)
                    .metaStorageNodeNames(List.of(firstNodeName))
                    .clusterName("cluster")
                    .build()
    );

    startFuts.forEach(startFut -> {
        assertThat(startFut, willCompleteSuccessfully());
        clusterNodes.add(startFut.join());
    });
}
/**
 * Stops every node that {@code beforeEach} requested to start.
 *
 * @param testInfo Test information used to derive the node names.
 * @throws Exception If stopping the nodes failed.
 */
@AfterEach
void afterEach(TestInfo testInfo) throws Exception {
    List<AutoCloseable> nodeStoppers = new ArrayList<>(nodesBootstrapCfg.size());

    for (int idx = 0; idx < nodesBootstrapCfg.size(); idx++) {
        String nodeName = testNodeName(testInfo, idx);

        nodeStoppers.add(() -> IgnitionManager.stop(nodeName));
    }

    IgniteUtils.closeAll(nodeStoppers);
}
/**
 * Tries to create a table that already exists: a plain CREATE TABLE must fail, while CREATE TABLE IF NOT EXISTS
 * must return the table that was created first.
 */
@Test
public void testTableAlreadyCreated() {
    for (Ignite node : clusterNodes) {
        assertNull(node.tables().table(TABLE_NAME));
    }

    Ignite creator = clusterNodes.get(0);

    Table created = createTable(creator, TABLE_NAME);

    assertThrowsSqlException(
            Sql.STMT_VALIDATION_ERR,
            "Table with name 'PUBLIC.TBL1' already exists",
            () -> createTable(creator, TABLE_NAME));

    Table existing = createTableIfNotExists(creator, TABLE_NAME);

    assertEquals(unwrapTableImpl(created), unwrapTableImpl(existing));
}
/**
 * Tries to create a table which is already created, issuing the statements from a node that lags behind on metastorage events.
 */
@Test
public void testTableAlreadyCreatedFromLaggedNode() {
    clusterNodes.forEach(ign -> assertNull(ign.tables().table(TABLE_NAME)));

    Ignite ignite0 = clusterNodes.get(0);
    Ignite ignite1 = clusterNodes.get(1);

    CompletableFuture<Void> createTblFut;
    CompletableFuture<Table> createTblIfNotExistsFut;

    WatchListenerInhibitor ignite1Inhibitor = metastorageEventsInhibitor(ignite1);

    ignite1Inhibitor.startInhibit();

    // Release the inhibitor even if an assertion below fails, the same way testAddIndexFromLaggedNode does.
    try {
        createTable(ignite0, TABLE_NAME);

        createTblFut = runAsync(() -> createTable(ignite1, TABLE_NAME));
        createTblIfNotExistsFut = supplyAsync(() -> createTableIfNotExists(ignite1, TABLE_NAME));

        for (Ignite ignite : clusterNodes) {
            if (ignite != ignite1) {
                assertThrowsSqlException(
                        Sql.STMT_VALIDATION_ERR,
                        "Table with name 'PUBLIC.TBL1' already exists",
                        () -> createTable(ignite, TABLE_NAME));

                assertNotNull(createTableIfNotExists(ignite, TABLE_NAME));
            }
        }

        // Both operations issued on the lagged node must still be pending while its events are inhibited.
        assertFalse(createTblFut.isDone());
        assertFalse(createTblIfNotExistsFut.isDone());
    } finally {
        ignite1Inhibitor.stopInhibit();
    }

    assertThat(createTblFut, willThrowWithCauseOrSuppressed(TableExistsValidationException.class));
    assertThat(createTblIfNotExistsFut, willCompleteSuccessfully());
}
/**
 * Test scenario with a lagged node: a value is overwritten through one node while metastorage events are inhibited on another
 * node, and the new value must be readable through that other node once the inhibition is lifted.
 */
@Test
public void testGetTableFromLaggedNode() {
    clusterNodes.forEach(ign -> assertNull(ign.tables().table(TABLE_NAME)));

    Ignite ignite0 = clusterNodes.get(0);
    Ignite ignite1 = clusterNodes.get(1);

    Table tbl = createTable(ignite0, TABLE_NAME);

    Tuple tableKey = Tuple.create()
            .set("key", 123L);

    Tuple value = Tuple.create()
            .set("valInt", 1234)
            .set("valStr", "some string row");

    tbl.keyValueView().put(null, tableKey, value);

    assertEquals(value, tbl.keyValueView().get(null, tableKey));
    assertEquals(value, ignite1.tables().table(TABLE_NAME).keyValueView().get(null, tableKey));

    Tuple otherValue = Tuple.create()
            .set("valInt", 12345)
            .set("valStr", "some other string row");

    WatchListenerInhibitor ignite1Inhibitor = metastorageEventsInhibitor(ignite1);

    ignite1Inhibitor.startInhibit();

    // Release the inhibitor even if the put or the assertion fails, so that the node is not left inhibited.
    try {
        tbl.keyValueView().put(null, tableKey, otherValue);

        assertEquals(otherValue, tbl.keyValueView().get(null, tableKey));
    } finally {
        ignite1Inhibitor.stopInhibit();
    }

    assertEquals(otherValue, ignite1.tables().table(TABLE_NAME).keyValueView().get(null, tableKey));
}
/**
 * Tries to create an index that already exists: a plain CREATE INDEX must fail with {@link IndexExistsValidationException}
 * in the cause chain, while the IF NOT EXISTS variant is issued afterwards without expecting an exception.
 */
@Test
public void testAddIndex() {
    for (Ignite node : clusterNodes) {
        assertNull(node.tables().table(TABLE_NAME));
    }

    Ignite node0 = clusterNodes.get(0);

    createTable(node0, TABLE_NAME);

    // First creation: no IF NOT EXISTS clause.
    tryToCreateIndex(node0, TABLE_NAME, true);

    // Second creation without IF NOT EXISTS must be rejected.
    assertThrowsWithCause(
            () -> tryToCreateIndex(node0, TABLE_NAME, true),
            IndexExistsValidationException.class
    );

    // With IF NOT EXISTS the statement is issued without expecting a failure.
    tryToCreateIndex(node0, TABLE_NAME, false);
}
/**
 * Tries to create an index which is already created, issuing the statements from a node whose metastorage events are inhibited.
 *
 * <p>The operations started on the inhibited node are expected to stay incomplete while the inhibitor is active. After the
 * inhibitor is stopped, the plain CREATE INDEX must fail with {@link IndexExistsValidationException} and the IF NOT EXISTS
 * variant must finish within 10 seconds.
 *
 * @throws Exception If failed.
 */
@Test
public void testAddIndexFromLaggedNode() throws Exception {
clusterNodes.forEach(ign -> assertNull(ign.tables().table(TABLE_NAME)));
IgniteImpl ignite0 = (IgniteImpl) clusterNodes.get(0);
createTable(ignite0, TABLE_NAME);
Ignite ignite1 = clusterNodes.get(1);
// Declared outside the try block because they are checked after the inhibitor has been stopped.
CompletableFuture<Void> addIndexFut;
CompletableFuture<Void> addIndexIfNotExistsFut;
WatchListenerInhibitor ignite1Inhibitor = metastorageEventsInhibitor(ignite1);
ignite1Inhibitor.startInhibit();
try {
// The future of this asynchronous creation is not kept; the test only waits for the index to appear on ignite0.
runAsync(() -> tryToCreateIndex(ignite0, TABLE_NAME, true));
waitForIndexToAppearInAnyState(INDEX_NAME, ignite0);
// Both statements are issued on the inhibited node.
addIndexFut = runAsync(() -> tryToCreateIndex(ignite1, TABLE_NAME, true));
addIndexIfNotExistsFut = runAsync(() -> addIndexIfNotExists(ignite1, TABLE_NAME));
// Every node except the inhibited one must reject the duplicate and accept the IF NOT EXISTS variant.
for (Ignite ignite : clusterNodes) {
if (ignite != ignite1) {
assertThrowsWithCause(() -> tryToCreateIndex(ignite, TABLE_NAME, true), IndexExistsValidationException.class);
addIndexIfNotExists(ignite, TABLE_NAME);
}
}
// The operations issued on the inhibited node must still be pending at this point.
assertFalse(addIndexFut.isDone());
assertFalse(addIndexIfNotExistsFut.isDone());
} finally {
// Always release the inhibitor, even when an assertion above fails.
ignite1Inhibitor.stopInhibit();
}
assertThat(addIndexFut, willThrowWithCauseOrSuppressed(IndexExistsValidationException.class));
addIndexIfNotExistsFut.get(10, TimeUnit.SECONDS);
}
/**
 * Tries to add a column that already exists: the second ALTER TABLE must fail with
 * {@link CatalogValidationException} in the cause chain.
 */
@Test
public void testAddColumn() {
    for (Ignite node : clusterNodes) {
        assertNull(node.tables().table(TABLE_NAME));
    }

    Ignite node0 = clusterNodes.get(0);

    createTable(node0, TABLE_NAME);

    // The first addition is issued without expecting a failure.
    addColumn(node0, TABLE_NAME);

    // Adding the same column again must be rejected.
    assertThrowsWithCause(
            () -> addColumn(node0, TABLE_NAME),
            CatalogValidationException.class
    );
}
/** Tries to add a column which is already added, issuing the statement from a node that lags behind on metastorage events. */
@Test
public void testAddColumnFromLaggedNode() {
    clusterNodes.forEach(ign -> assertNull(ign.tables().table(TABLE_NAME)));

    Ignite ignite0 = clusterNodes.get(0);

    createTable(ignite0, TABLE_NAME);

    Ignite ignite1 = clusterNodes.get(1);

    CompletableFuture<Void> addColFut;

    WatchListenerInhibitor ignite1Inhibitor = metastorageEventsInhibitor(ignite1);

    ignite1Inhibitor.startInhibit();

    // Release the inhibitor even if an assertion below fails, the same way testAddIndexFromLaggedNode does.
    try {
        addColumn(ignite0, TABLE_NAME);

        addColFut = runAsync(() -> addColumn(ignite1, TABLE_NAME));

        for (Ignite ignite : clusterNodes) {
            if (ignite != ignite1) {
                assertThrowsSqlException(
                        Sql.STMT_VALIDATION_ERR,
                        "Failed to validate query. Column with name 'VALINT3' already exists",
                        () -> addColumn(ignite, TABLE_NAME));
            }
        }

        // The operation issued on the lagged node must still be pending while its events are inhibited.
        assertFalse(addColFut.isDone());
    } finally {
        ignite1Inhibitor.stopInhibit();
    }

    assertThat(addColFut, willThrowWithCauseOrSuppressed(CatalogValidationException.class));
}
/**
 * Checks that if a table is created or dropped on any cluster node, the action is reflected on all the others. Table management
 * operations should be applied in a linearizable order: if an action completed on one node, its result has to be visible on
 * another one.
 *
 * @throws Exception If failed.
 */
@Test
public void testCreateDropTable() throws Exception {
    clusterNodes.forEach(ign -> assertNull(ign.tables().table(TABLE_NAME)));

    Ignite ignite1 = clusterNodes.get(1);

    WatchListenerInhibitor ignite1Inhibitor = metastorageEventsInhibitor(ignite1);

    int tblId;
    CompletableFuture<Table> tableByNameFut;
    CompletableFuture<Table> tableByIdFut;

    ignite1Inhibitor.startInhibit();

    // Release the inhibitor even if an assertion below fails, so that the node is not left inhibited.
    try {
        Table table = createTable(clusterNodes.get(0), TABLE_NAME);

        tblId = unwrapTableViewInternal(table).tableId();

        tableByNameFut = supplyAsync(() -> ignite1.tables().table(TABLE_NAME));

        tableByIdFut = supplyAsync(() -> {
            try {
                return unwrapIgniteTablesInternal(ignite1.tables()).table(tblId);
            } catch (NodeStoppingException e) {
                // Keep the original exception as the cause so that the failure can be diagnosed.
                throw new AssertionError(e.getMessage(), e);
            }
        });

        // Because the event inhibitor was started, the last metastorage updates do not reach one of the nodes.
        // Therefore the table does not exist there locally yet, but the API does not return null and waits for the events.
        for (Ignite ignite : clusterNodes) {
            if (ignite != ignite1) {
                assertNotNull(ignite.tables().table(TABLE_NAME));

                assertNotNull(unwrapIgniteTablesInternal(ignite.tables()).table(tblId));
            }
        }

        assertFalse(tableByNameFut.isDone());
        assertFalse(tableByIdFut.isDone());
    } finally {
        ignite1Inhibitor.stopInhibit();
    }

    assertNotNull(tableByNameFut.get(10, TimeUnit.SECONDS));
    assertNotNull(tableByIdFut.get(10, TimeUnit.SECONDS));

    ignite1Inhibitor.startInhibit();

    try {
        dropTable(clusterNodes.get(0), TABLE_NAME);

        // Because the event inhibitor was started, the last metastorage updates do not reach one of the nodes.
        // Therefore the table still exists there locally, but the API prevents getting it.
        for (Ignite ignite : clusterNodes) {
            assertNull(ignite.tables().table(TABLE_NAME));

            assertNull(unwrapIgniteTablesInternal(ignite.tables()).table(tblId));

            assertThrowsSqlException(
                    Sql.STMT_VALIDATION_ERR,
                    "Table with name 'PUBLIC.TBL1' not found",
                    () -> dropTable(ignite, TABLE_NAME));

            dropTableIfExists(ignite, TABLE_NAME);
        }
    } finally {
        ignite1Inhibitor.stopInhibit();
    }
}
/**
 * Checks that a record view obtained before its table was dropped fails with {@link TableNotFoundException} when used.
 */
@Test
public void usingTableAfterDrop() {
    Ignite node0 = clusterNodes.get(0);

    // Obtain the view while the table still exists.
    RecordView<Tuple> recordView = createTable(node0, TABLE_NAME).recordView();

    dropTable(node0, TABLE_NAME);

    Tuple row = Tuple.create()
            .set("key", 1L)
            .set("valInt", 1)
            .set("valStr", "1");

    assertThrows(
            TableNotFoundException.class,
            () -> recordView.insert(null, row),
            "Table does not exist or was dropped concurrently"
    );
}
/**
 * Creates a table with a BIGINT primary key column {@code key} and two value columns, {@code valInt} and {@code valStr}.
 *
 * @param node Cluster node on which the statement is executed.
 * @param tableName Table name.
 * @return The table looked up by name on the same node after the statement was executed.
 */
protected static Table createTable(Ignite node, String tableName) {
    String ddl = String.format("CREATE TABLE %s (key BIGINT PRIMARY KEY, valInt INT, valStr VARCHAR)", tableName);

    sql(node, ddl);

    return node.tables().table(tableName);
}
/**
 * Creates a table if it does not exist, using the same column set as {@code createTable}.
 *
 * @param node Cluster node on which the statement is executed.
 * @param tableName Table name.
 * @return The table looked up by name on the same node after the statement was executed.
 */
private static Table createTableIfNotExists(Ignite node, String tableName) {
    String ddl = String.format("CREATE TABLE IF NOT EXISTS %s (key BIGINT PRIMARY KEY, valInt INT, valStr VARCHAR)", tableName);

    sql(node, ddl);

    return node.tables().table(tableName);
}
/**
 * Drops the table with the specified name using a plain DROP TABLE statement, i.e. without an IF EXISTS clause.
 *
 * @param node Cluster node on which the statement is executed.
 * @param tableName Table name.
 */
private static void dropTable(Ignite node, String tableName) {
    String ddl = String.format("DROP TABLE %s", tableName);

    sql(node, ddl);
}
/**
 * Drops the table with the specified name using DROP TABLE IF EXISTS.
 *
 * @param node Cluster node on which the statement is executed.
 * @param tableName Table name.
 */
private static void dropTableIfExists(Ignite node, String tableName) {
    String ddl = String.format("DROP TABLE IF EXISTS %s", tableName);

    sql(node, ddl);
}
/**
 * Adds an INT column named {@code valint3} to the table.
 *
 * @param node Cluster node on which the statement is executed.
 * @param tableName Table name.
 */
private static void addColumn(Ignite node, String tableName) {
    String ddl = String.format("ALTER TABLE %s ADD COLUMN valint3 INT", tableName);

    sql(node, ddl);
}
/**
 * Tries to create the test index on the {@code valInt} and {@code valStr} columns of the table.
 *
 * @param node Cluster node on which the statement is executed.
 * @param tableName Table name.
 * @param failIfNotExist Despite its name, this flag controls the IF NOT EXISTS clause: when {@code true} the clause is
 *      omitted (the tests expect such a statement to fail if the index already exists), when {@code false} the clause is added.
 */
protected void tryToCreateIndex(Ignite node, String tableName, boolean failIfNotExist) {
    String existenceClause = failIfNotExist ? "" : "IF NOT EXISTS";

    String ddl = String.format("CREATE INDEX %s %s ON %s (valInt, valStr)", existenceClause, INDEX_NAME, tableName);

    sql(node, ddl);
}
/**
 * Creates the test index on the {@code valInt} column of the table if an index with that name does not exist.
 *
 * @param node Cluster node on which the statement is executed.
 * @param tableName Table name.
 */
protected void addIndexIfNotExists(Ignite node, String tableName) {
    String ddl = String.format("CREATE INDEX IF NOT EXISTS %s ON %s (valInt)", INDEX_NAME, tableName);

    sql(node, ddl);
}
/**
 * Executes the SQL statement on the given node, passing {@code null} as the first argument of {@code execute};
 * the value returned by {@code execute} is not used.
 *
 * @param node Cluster node on which the statement is executed.
 * @param sql SQL statement text.
 */
private static void sql(Ignite node, String sql) {
node.sql().execute(null, sql);
}
}