blob: fc094742b6f8829bf2a9635ea5688fe291e0b58d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.table;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiPredicate;
import org.apache.ignite.configuration.RootKey;
import org.apache.ignite.configuration.internal.ConfigurationManager;
import org.apache.ignite.configuration.schemas.runner.ClusterConfiguration;
import org.apache.ignite.configuration.schemas.runner.NodeConfiguration;
import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
import org.apache.ignite.configuration.storage.ConfigurationType;
import org.apache.ignite.internal.affinity.AffinityManager;
import org.apache.ignite.internal.affinity.event.AffinityEvent;
import org.apache.ignite.internal.affinity.event.AffinityEventParameters;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter;
import org.apache.ignite.internal.schema.event.SchemaEvent;
import org.apache.ignite.internal.schema.event.SchemaEventParameters;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.internal.metastorage.client.Condition;
import org.apache.ignite.internal.metastorage.client.Operation;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.schema.ColumnType;
import org.apache.ignite.schema.SchemaBuilders;
import org.apache.ignite.schema.SchemaTable;
import org.apache.ignite.table.Table;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.platform.commons.util.ReflectionUtils;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Tests scenarios for table manager.
*/
public class TableManagerTest {
/** The logger. */
private static final IgniteLogger LOG = IgniteLogger.forClass(TableManagerTest.class);
/** Internal prefix for the metasorage. */
private static final String INTERNAL_PREFIX = "internal.tables.";
/** The name of the table which is statically configured. */
private static final String STATIC_TABLE_NAME = "t1";
/** The name of the table which will be configured dynamically. */
private static final String DYNAMIC_TABLE_NAME = "t2";
/** The name of table to drop it. */
private static final String DYNAMIC_TABLE_FOR_DROP_NAME = "t3";
/** Table partitions. */
public static final int PARTITIONS = 32;
/** Node port. */
public static final int PORT = 2245;
/** Node name. */
public static final String NODE_NAME = "node1";
/** Configuration manager. */
private ConfigurationManager cfrMgr;
/** Before all test scenarios. */
@BeforeEach
private void before() {
try {
cfrMgr = new ConfigurationManager(rootConfigurationKeys(), Arrays.asList(
new TestConfigurationStorage(ConfigurationType.LOCAL),
new TestConfigurationStorage(ConfigurationType.DISTRIBUTED)));
cfrMgr.bootstrap("{\n" +
" \"node\":{\n" +
" \"name\":\"node1\",\n" +
" \"metastorageNodes\":[\n" +
" \"" + NODE_NAME + "\"\n" +
" ]\n" +
" }\n" +
"}", ConfigurationType.LOCAL);
cfrMgr.bootstrap("{\n" +
" \"cluster\":{\n" +
" \"metastorageNodes\":[\n" +
" \"" + NODE_NAME + "\"\n" +
" ]\n" +
"},\n" +
" \"table\":{\n" +
" \"tables\":{\n" +
" \"" + STATIC_TABLE_NAME + "\":{\n" +
" \"name\":\"TestTable\",\n" +
" \"partitions\":16,\n" +
" \"replicas\":1,\n" +
" \"columns\":{\n" +
" \"id\":{\n" +
" \"name\":\"id\",\n" +
" \"type\":{\n" +
" \"type\":\"Int64\"\n" +
" },\n" +
" \"nullable\":false\n" +
" }\n" +
" },\n" +
" \"indices\":{\n" +
" \"pk\":{\n" +
" \"name\":\"pk\",\n" +
" \"type\":\"primary\",\n" +
" \"uniq\":true,\n" +
" \"columns\":{\n" +
" \"id\":{\n" +
" \"name\":\"id\",\n" +
" \"asc\":true\n" +
" }\n" +
" }\n" +
" }\n" +
" }\n" +
" }\n" +
" }\n" +
" }\n" +
"}", ConfigurationType.DISTRIBUTED);
}
catch (Exception e) {
LOG.error("Failed to bootstrap the test configuration manager.", e);
fail("Failed to configure manager [err=" + e.getMessage() + ']');
}
}
/**
* Tests a table which was defined before start through bootstrap configuration.
*/
@Disabled("https://issues.apache.org/jira/browse/IGNITE-14578")
@Test
public void testStaticTableConfigured() {
MetaStorageManager mm = mock(MetaStorageManager.class);
SchemaManager sm = mock(SchemaManager.class);
AffinityManager am = mock(AffinityManager.class);
Loza rm = mock(Loza.class);
VaultManager vm = mock(VaultManager.class);
TableManager tableManager = new TableManager(cfrMgr, mm, sm, am, rm, vm);
assertEquals(1, tableManager.tables().size());
assertNotNull(tableManager.table(STATIC_TABLE_NAME));
}
/**
* Tests create a table through public API.
*/
@Test
public void testCreateTable() {
MetaStorageManager mm = mock(MetaStorageManager.class);
SchemaManager sm = mock(SchemaManager.class);
AffinityManager am = mock(AffinityManager.class);
Loza rm = mock(Loza.class);
VaultManager vm = mock(VaultManager.class);
ClusterNode node = new ClusterNode(UUID.randomUUID().toString(), NODE_NAME, "127.0.0.1", PORT);
CompletableFuture<UUID> tblIdFut = new CompletableFuture<>();
TableManager tableManager = mockManagersAndCreateTable(DYNAMIC_TABLE_NAME, mm, sm, am, rm, vm, node, tblIdFut);
assertNotNull(tableManager.table("PUBLIC." + DYNAMIC_TABLE_NAME));
}
/**
* Tests drop a table through public API.
*/
@Test
public void testDropTable() {
MetaStorageManager mm = mock(MetaStorageManager.class);
SchemaManager sm = mock(SchemaManager.class);
AffinityManager am = mock(AffinityManager.class);
Loza rm = mock(Loza.class);
VaultManager vm = mock(VaultManager.class);
ClusterNode node = new ClusterNode(UUID.randomUUID().toString(), NODE_NAME, "127.0.0.1", PORT);
CompletableFuture<UUID> tblIdFut = new CompletableFuture<>();
TableManager tableManager = mockManagersAndCreateTable(DYNAMIC_TABLE_FOR_DROP_NAME, mm, sm, am, rm, vm, node, tblIdFut);
assertNotNull(tableManager.table("PUBLIC." + DYNAMIC_TABLE_FOR_DROP_NAME));
when(sm.unregisterSchemas(any())).thenReturn(CompletableFuture.completedFuture(true));
doAnswer(invokation -> {
BiPredicate<SchemaEventParameters, Exception> schemaInitialized = invokation.getArgument(1);
assertTrue(tblIdFut.isDone());
SchemaRegistry schemaRegistry = mock(SchemaRegistry.class);
CompletableFuture.supplyAsync(() -> schemaInitialized.test(
new SchemaEventParameters(tblIdFut.join(), schemaRegistry),
null));
return null;
}).when(sm).listen(same(SchemaEvent.DROPPED), any());
when(am.removeAssignment(any())).thenReturn(CompletableFuture.completedFuture(true));
doAnswer(invokation -> {
BiPredicate<AffinityEventParameters, Exception> affinityRemovedDelegate = (BiPredicate)invokation.getArgument(1);
ArrayList<List<ClusterNode>> assignment = new ArrayList<>(PARTITIONS);
for (int part = 0; part < PARTITIONS; part++) {
assignment.add(new ArrayList<ClusterNode>(Collections.singleton(node)));
}
assertTrue(tblIdFut.isDone());
CompletableFuture.supplyAsync(() -> affinityRemovedDelegate.test(
new AffinityEventParameters(tblIdFut.join(), assignment),
null));
return null;
}).when(am).listen(same(AffinityEvent.REMOVED), any());
tableManager.dropTable("PUBLIC." + DYNAMIC_TABLE_FOR_DROP_NAME);
assertNull(tableManager.table("PUBLIC." + DYNAMIC_TABLE_FOR_DROP_NAME));
}
/**
* Instantiates Table manager and creates a table in it.
*
* @param tableName Table name.
* @param mm Metastorage manager mock.
* @param sm Schema manager mock.
* @param am Affinity manager mock.
* @param rm Raft manager mock.
* @param vm Vault manager mock.
* @param node This cluster node.
* @param tblIdFut Future which will determine a table id.
* @return Table manager.
*/
private TableManager mockManagersAndCreateTable(
String tableName,
MetaStorageManager mm,
SchemaManager sm,
AffinityManager am,
Loza rm,
VaultManager vm,
ClusterNode node,
CompletableFuture<UUID> tblIdFut
) {
when(mm.invoke((Condition)any(), (Operation)any(), (Operation)any())).thenAnswer(invokation -> {
Condition condition = (Condition)invokation.getArgument(0);
Object internalCondition = ReflectionUtils.tryToReadFieldValue(Condition.class, "cond", condition).get();
Method getKeyMethod = ReflectionUtils.findMethod(internalCondition.getClass(), "key").get();
byte[] metastorageKeyBytes = (byte[])ReflectionUtils.invokeMethod(getKeyMethod, internalCondition);
tblIdFut.complete(UUID.fromString(new String(metastorageKeyBytes, StandardCharsets.UTF_8)
.substring(INTERNAL_PREFIX.length())));
return CompletableFuture.completedFuture(true);
});
when(sm.initSchemaForTable(any(), eq(tableName))).thenReturn(CompletableFuture.completedFuture(true));
doAnswer(invokation -> {
BiPredicate<SchemaEventParameters, Exception> schemaInitialized = invokation.getArgument(1);
assertTrue(tblIdFut.isDone());
SchemaRegistry schemaRegistry = mock(SchemaRegistry.class);
CompletableFuture.supplyAsync(() -> schemaInitialized.test(
new SchemaEventParameters(tblIdFut.join(), schemaRegistry),
null));
return null;
}).when(sm).listen(same(SchemaEvent.INITIALIZED), any());
when(am.calculateAssignments(any(), eq(tableName))).thenReturn(CompletableFuture.completedFuture(true));
doAnswer(invokation -> {
BiPredicate<AffinityEventParameters, Exception> affinityClaculatedDelegate = (BiPredicate)invokation.getArgument(1);
ArrayList<List<ClusterNode>> assignment = new ArrayList<>(PARTITIONS);
for (int part = 0; part < PARTITIONS; part++) {
assignment.add(new ArrayList<ClusterNode>(Collections.singleton(node)));
}
assertTrue(tblIdFut.isDone());
CompletableFuture.supplyAsync(() -> affinityClaculatedDelegate.test(
new AffinityEventParameters(tblIdFut.join(), assignment),
null));
return null;
}).when(am).listen(same(AffinityEvent.CALCULATED), any());
TableManager tableManager = new TableManager(cfrMgr, mm, sm, am, rm, vm);
int tablesBeforeCreation = tableManager.tables().size();
SchemaTable scmTbl2 = SchemaBuilders.tableBuilder("PUBLIC", tableName).columns(
SchemaBuilders.column("key", ColumnType.INT64).asNonNull().build(),
SchemaBuilders.column("val", ColumnType.INT64).asNullable().build()
).withPrimaryKey("key").build();
Table tbl2 = tableManager.createTable(scmTbl2.canonicalName(), tblCh -> SchemaConfigurationConverter.convert(scmTbl2, tblCh)
.changeReplicas(1)
.changePartitions(10)
);
assertNotNull(tbl2);
assertEquals(tablesBeforeCreation + 1, tableManager.tables().size());
return tableManager;
}
/**
* Gets a list of configuration keys to use in the test scenario.
*
* @return List of root configuration keys.
*/
private static List<RootKey<?, ?>> rootConfigurationKeys() {
return Arrays.asList(
NodeConfiguration.KEY,
ClusterConfiguration.KEY,
TablesConfiguration.KEY
);
}
}