blob: 5a9b615750fd71f8e71424008f835dc2dc1a8f75 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal;
import static org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME;
import static org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
import static org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_ROCKSDB_PROFILE_NAME;
import static org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_TEST_PROFILE_NAME;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.AVAILABLE;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.nio.file.Path;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.ignite.Ignite;
import org.apache.ignite.InitParametersBuilder;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.testframework.TestIgnitionManager;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.sql.ResultSet;
import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.sql.Statement;
import org.apache.ignite.sql.Statement.StatementBuilder;
import org.apache.ignite.table.Table;
import org.apache.ignite.tx.Transaction;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.TestInstance;
/**
* Abstract basic integration test that starts a cluster once for all the tests it runs.
*/
@SuppressWarnings("resource")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public abstract class ClusterPerClassIntegrationTest extends IgniteIntegrationTest {
/** Test default table name. */
protected static final String DEFAULT_TABLE_NAME = "person";
/** Nodes bootstrap configuration pattern. */
private static final String NODE_BOOTSTRAP_CFG_TEMPLATE = "{\n"
+ " network: {\n"
+ " port: {},\n"
+ " nodeFinder: {\n"
+ " netClusterNodes: [ {} ]\n"
+ " }\n"
+ " },\n"
+ " storage.profiles: {"
+ " " + DEFAULT_TEST_PROFILE_NAME + ".engine: test, "
+ " " + DEFAULT_AIPERSIST_PROFILE_NAME + ".engine: aipersist, "
+ " " + DEFAULT_AIMEM_PROFILE_NAME + ".engine: aimem, "
+ " " + DEFAULT_ROCKSDB_PROFILE_NAME + ".engine: rocksDb"
+ " },\n"
+ " clientConnector: { port:{} },\n"
+ " rest.port: {},\n"
+ " compute.threadPoolSize: 1\n"
+ "}";
/** Cluster nodes. */
protected static Cluster CLUSTER;
/** Work directory. */
@WorkDirectory
protected static Path WORK_DIR;
/**
* Before all.
*
* @param testInfo Test information object.
*/
@BeforeAll
protected void beforeAll(TestInfo testInfo) {
CLUSTER = new Cluster(testInfo, WORK_DIR, getNodeBootstrapConfigTemplate());
if (initialNodes() > 0) {
CLUSTER.startAndInit(initialNodes(), cmgMetastoreNodes(), this::configureInitParameters);
}
}
/**
* Get a count of nodes in the Ignite cluster.
*
* @return Count of nodes.
*/
protected int initialNodes() {
return 3;
}
protected int[] cmgMetastoreNodes() {
return new int[] { 0 };
}
/**
* This method can be overridden to add custom init parameters during cluster initialization.
*/
protected void configureInitParameters(InitParametersBuilder builder) {
}
/**
* Returns node bootstrap config template.
*
* @return Node bootstrap config template.
*/
protected String getNodeBootstrapConfigTemplate() {
return NODE_BOOTSTRAP_CFG_TEMPLATE;
}
/**
* After all.
*/
@AfterAll
void afterAll() {
CLUSTER.shutdown();
}
/** Drops all visible tables. */
protected static void dropAllTables() {
for (Table t : CLUSTER.aliveNode().tables().tables()) {
sql("DROP TABLE " + t.name());
}
}
/** Drops all visible zones. */
protected static void dropAllZonesExceptDefaultOne() {
CatalogManager catalogManager = CLUSTER.aliveNode().catalogManager();
int latestCatalogVersion = catalogManager.latestCatalogVersion();
Catalog catalog = Objects.requireNonNull(catalogManager.catalog(latestCatalogVersion));
CatalogZoneDescriptor defaultZone = catalog.defaultZone();
for (CatalogZoneDescriptor z : catalogManager.zones(latestCatalogVersion)) {
String zoneName = z.name();
if (defaultZone != null && zoneName.equals(defaultZone.name())) {
continue;
}
sql("DROP ZONE " + zoneName);
}
}
/**
* Creates a table.
*
* @param name Table name.
* @param replicas Replica factor.
* @param partitions Partitions count.
*/
protected static Table createTable(String name, int replicas, int partitions) {
return createZoneAndTable(zoneName(name), name, replicas, partitions);
}
/**
* Creates a table.
*
* @param tableName Table name.
* @param zoneName Zone name.
*/
protected static Table createTableOnly(String tableName, String zoneName) {
sql(format(
"CREATE TABLE IF NOT EXISTS {} (id INT PRIMARY KEY, name VARCHAR, salary DOUBLE) WITH PRIMARY_ZONE='{}'",
tableName, zoneName
));
return CLUSTER.node(0).tables().table(tableName);
}
/**
* Creates a zone.
*
* @param zoneName Zone name.
* @param replicas Replica factor.
* @param partitions Partitions count.
* @param storageProfile Storage profile.
*/
protected static void createZoneOnlyIfNotExists(String zoneName, int replicas, int partitions, String storageProfile) {
sql(format(
"CREATE ZONE IF NOT EXISTS {} WITH REPLICAS={}, PARTITIONS={}, STORAGE_PROFILES='{}';",
zoneName, replicas, partitions, storageProfile
));
}
/**
* Creates zone and table.
*
* @param zoneName Zone name.
* @param tableName Table name.
* @param replicas Replica factor.
* @param partitions Partitions count.
*/
protected static Table createZoneAndTable(String zoneName, String tableName, int replicas, int partitions) {
createZoneOnlyIfNotExists(zoneName, replicas, partitions, DEFAULT_STORAGE_PROFILE);
return createTableOnly(tableName, zoneName);
}
/**
* Creates zone and table.
*
* @param zoneName Zone name.
* @param tableName Table name.
* @param replicas Replica factor.
* @param partitions Partitions count.
* @param storageProfile Storage profile.
*/
protected static Table createZoneAndTable(
String zoneName,
String tableName,
int replicas,
int partitions,
String storageProfile
) {
createZoneOnlyIfNotExists(zoneName, replicas, partitions, storageProfile);
return createTableOnly(tableName, zoneName);
}
/**
* Inserts data into the table created by {@link #createZoneAndTable(String, String, int, int)}.
*
* @param tx Transaction.
* @param tableName Table name.
* @param people People to insert into the table.
*/
protected static void insertPeople(Transaction tx, String tableName, Person... people) {
insertDataInTransaction(
tx,
tableName,
List.of("ID", "NAME", "SALARY"),
Stream.of(people).map(person -> new Object[]{person.id, person.name, person.salary}).toArray(Object[][]::new)
);
}
/**
* Inserts data into the table created by {@link #createZoneAndTable(String, String, int, int)}.
*
* @param tableName Table name.
* @param people People to insert into the table.
*/
protected static void insertPeople(String tableName, Person... people) {
insertData(
tableName,
List.of("ID", "NAME", "SALARY"),
Stream.of(people).map(person -> new Object[]{person.id, person.name, person.salary}).toArray(Object[][]::new)
);
}
/**
* Updates data in the table created by {@link #createZoneAndTable(String, String, int, int)}.
*
* @param tableName Table name.
* @param people People to update in the table.
*/
protected static void updatePeople(String tableName, Person... people) {
Transaction tx = CLUSTER.node(0).transactions().begin();
String sql = String.format("UPDATE %s SET NAME=?, SALARY=? WHERE ID=?", tableName);
for (Person person : people) {
sql(tx, sql, person.name, person.salary, person.id);
}
tx.commit();
}
/**
* Deletes data in the table created by {@link #createZoneAndTable(String, String, int, int)}.
*
* @param tableName Table name.
* @param personIds Person IDs to delete.
*/
protected static void deletePeople(String tableName, int... personIds) {
Transaction tx = CLUSTER.node(0).transactions().begin();
String sql = String.format("DELETE FROM %s WHERE ID=?", tableName);
for (int personId : personIds) {
sql(tx, sql, personId);
}
tx.commit();
}
/**
* Creates an index for the table created by {@link #createZoneAndTable(String, String, int, int)}..
*
* @param tableName Table name.
* @param indexName Index name.
* @param columnName Column name.
*/
protected static void createIndex(String tableName, String indexName, String columnName) {
sql(format("CREATE INDEX {} ON {} ({})", indexName, tableName, columnName));
}
/**
* Drops an index for the table created by {@link #createZoneAndTable(String, String, int, int)}.
*
* @param indexName Index name.
*/
protected static void dropIndex(String indexName) {
sql(format("DROP INDEX {}", indexName));
}
protected static void insertData(String tblName, List<String> columnNames, Object[]... tuples) {
Transaction tx = CLUSTER.node(0).transactions().begin();
insertDataInTransaction(tx, tblName, columnNames, tuples);
tx.commit();
}
protected static void insertDataInTransaction(Transaction tx, String tblName, List<String> columnNames, Object[]... tuples) {
String insertStmt = "INSERT INTO " + tblName + "(" + String.join(", ", columnNames) + ")"
+ " VALUES (" + ", ?".repeat(columnNames.size()).substring(2) + ")";
for (Object[] args : tuples) {
sql(tx, insertStmt, args);
}
}
protected static List<List<Object>> sql(String sql, Object... args) {
return sql(null, sql, args);
}
protected static List<List<Object>> sql(int nodeIndex, String sql, Object... args) {
return sql(nodeIndex, null, sql, args);
}
/**
* Run SQL on given Ignite instance with given transaction and parameters.
*
* @param node Ignite instance to run a query.
* @param tx Transaction to run a given query. Can be {@code null} to run within implicit transaction.
* @param zoneId Client time zone.
* @param query Query to be run.
* @param args Dynamic parameters for a given query.
* @return List of lists, where outer list represents a rows, internal lists represents a columns.
*/
public static List<List<Object>> sql(Ignite node, @Nullable Transaction tx, @Nullable ZoneId zoneId, String query, Object... args) {
IgniteSql sql = node.sql();
StatementBuilder builder = sql.statementBuilder()
.query(query);
if (zoneId != null) {
builder.timeZoneId(zoneId);
}
Statement statement = builder.build();
try (ResultSet<SqlRow> rs = sql.execute(tx, statement, args)) {
return getAllResultSet(rs);
}
}
protected static List<List<Object>> sql(@Nullable Transaction tx, String sql, Object... args) {
return sql(0, tx, sql, args);
}
protected static List<List<Object>> sql(int nodeIndex, @Nullable Transaction tx, String sql, Object[] args) {
return sql(nodeIndex, tx, null, sql, args);
}
protected static List<List<Object>> sql(int nodeIndex, @Nullable Transaction tx, @Nullable ZoneId zoneId, String sql, Object[] args) {
return sql(CLUSTER.node(nodeIndex), tx, zoneId, sql, args);
}
private static List<List<Object>> getAllResultSet(ResultSet<SqlRow> resultSet) {
List<List<Object>> res = new ArrayList<>();
while (resultSet.hasNext()) {
SqlRow sqlRow = resultSet.next();
ArrayList<Object> row = new ArrayList<>(sqlRow.columnCount());
for (int i = 0; i < sqlRow.columnCount(); i++) {
row.add(sqlRow.value(i));
}
res.add(row);
}
return res;
}
/**
* Looks up a node by a consistent ID, {@code null} if absent.
*
* @param consistentId Node consistent ID.
*/
protected static @Nullable IgniteImpl findByConsistentId(String consistentId) {
return CLUSTER.runningNodes()
.filter(Objects::nonNull)
.map(IgniteImpl.class::cast)
.filter(ignite -> consistentId.equals(ignite.name()))
.findFirst()
.orElse(null);
}
protected static String zoneName(String tableName) {
return "ZONE_" + tableName.toUpperCase();
}
/**
* Class for updating table in {@link #insertPeople(String, Person...)}, {@link #updatePeople(String, Person...)}. You can use
* {@link #deletePeople(String, int...)} to remove people.
*/
protected static class Person {
final int id;
final String name;
final double salary;
public Person(int id, String name, double salary) {
this.id = id;
this.name = name;
this.salary = salary;
}
}
/**
* Waits for some amount of time so that read-only transactions can observe the most recent version of the catalog.
*/
protected static void waitForReadTimestampThatObservesMostRecentCatalog() {
// See TxManagerImpl::currentReadTimestamp.
long delay = TestIgnitionManager.DEFAULT_MAX_CLOCK_SKEW_MS + TestIgnitionManager.DEFAULT_PARTITION_IDLE_SYNC_TIME_INTERVAL_MS;
try {
TimeUnit.MILLISECONDS.sleep(delay);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
/**
* Returns {@code true} if the index exists and is available in the latest catalog version.
*
* @param ignite Node.
* @param indexName Index name that is being checked.
*/
protected static boolean isIndexAvailable(IgniteImpl ignite, String indexName) {
CatalogManager catalogManager = ignite.catalogManager();
HybridClock clock = ignite.clock();
CatalogIndexDescriptor indexDescriptor = catalogManager.aliveIndex(indexName, clock.nowLong());
return indexDescriptor != null && indexDescriptor.status() == AVAILABLE;
}
/**
* Awaits for all requested indexes to become available in the latest catalog version.
*
* @param ignite Node.
* @param indexNames Names of indexes that are of interest.
*/
protected static void awaitIndexesBecomeAvailable(IgniteImpl ignite, String... indexNames) throws Exception {
assertTrue(waitForCondition(
() -> Arrays.stream(indexNames).allMatch(indexName -> isIndexAvailable(ignite, indexName)),
10_000L
));
}
/**
* Inserts data into the table created by {@link #createZoneAndTable(String, String, int, int)} in transaction.
*
* @param tableName Table name.
* @param people People to insert into the table.
*/
protected static void insertPeopleInTransaction(Transaction tx, String tableName, Person... people) {
insertDataInTransaction(
tx,
tableName,
List.of("ID", "NAME", "SALARY"),
Stream.of(people).map(person -> new Object[]{person.id, person.name, person.salary}).toArray(Object[][]::new)
);
}
}