blob: c2e79b67b8bdbe70a875258521b6fff6d3d125c9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.table;
import java.lang.reflect.Method;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.configuration.schemas.runner.ClusterConfiguration;
import org.apache.ignite.configuration.schemas.runner.NodeConfiguration;
import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
import org.apache.ignite.internal.affinity.AffinityManager;
import org.apache.ignite.internal.affinity.event.AffinityEvent;
import org.apache.ignite.internal.affinity.event.AffinityEventParameters;
import org.apache.ignite.internal.configuration.ConfigurationManager;
import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
import org.apache.ignite.internal.configuration.tree.NamedListNode;
import org.apache.ignite.internal.configuration.util.ConfigurationUtil;
import org.apache.ignite.internal.manager.EventListener;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.client.Condition;
import org.apache.ignite.internal.metastorage.client.Entry;
import org.apache.ignite.internal.metastorage.client.Operation;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter;
import org.apache.ignite.internal.schema.event.SchemaEvent;
import org.apache.ignite.internal.schema.event.SchemaEventParameters;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.schema.ColumnType;
import org.apache.ignite.schema.SchemaBuilders;
import org.apache.ignite.schema.SchemaTable;
import org.apache.ignite.table.Table;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.platform.commons.util.ReflectionUtils;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import static org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
import static org.apache.ignite.configuration.annotation.ConfigurationType.LOCAL;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Tests scenarios for table manager.
*/
@ExtendWith({MockitoExtension.class, WorkDirectoryExtension.class})
@MockitoSettings(strictness = Strictness.LENIENT)
public class TableManagerTest {
/** The logger. */
private static final IgniteLogger LOG = IgniteLogger.forClass(TableManagerTest.class);
/** Internal prefix for the metasorage. */
private static final String INTERNAL_PREFIX = "internal.tables.";
/** Public prefix for metastorage. */
private static final String PUBLIC_PREFIX = "dst-cfg.table.tables.";
/** The name of the table which is statically configured. */
private static final String STATIC_TABLE_NAME = "t1";
/** The name of the table which will be configured dynamically. */
private static final String DYNAMIC_TABLE_NAME = "t2";
/** The name of table to drop it. */
private static final String DYNAMIC_TABLE_FOR_DROP_NAME = "t3";
/** Table partitions. */
private static final int PARTITIONS = 32;
/** Node name. */
private static final String NODE_NAME = "node1";
/** Node configuration manager. */
private ConfigurationManager nodeCfgMgr;
/** Cluster configuration manager. */
private ConfigurationManager clusterCfgMgr;
/** MetaStorage manager. */
@Mock(lenient = true)
private MetaStorageManager mm;
/** Schema manager. */
@Mock(lenient = true)
private SchemaManager sm;
/** Affinity manager. */
@Mock(lenient = true)
private AffinityManager am;
/** Raft manager. */
@Mock(lenient = true)
private Loza rm;
@WorkDirectory
private Path workDir;
/** Test node. */
private final ClusterNode node = new ClusterNode(
UUID.randomUUID().toString(),
NODE_NAME,
new NetworkAddress("127.0.0.1", 2245)
);
/** Before all test scenarios. */
@BeforeEach
void setUp() {
try {
nodeCfgMgr = new ConfigurationManager(
List.of(NodeConfiguration.KEY),
Map.of(),
new TestConfigurationStorage(LOCAL),
List.of()
);
clusterCfgMgr = new ConfigurationManager(
List.of(ClusterConfiguration.KEY, TablesConfiguration.KEY),
Map.of(),
new TestConfigurationStorage(DISTRIBUTED),
List.of()
);
nodeCfgMgr.start();
clusterCfgMgr.start();
nodeCfgMgr.bootstrap("{\n" +
" \"node\":{\n" +
" \"metastorageNodes\":[\n" +
" \"" + NODE_NAME + "\"\n" +
" ]\n" +
" }\n" +
"}");
clusterCfgMgr.bootstrap("{\n" +
" \"cluster\":{\n" +
" \"metastorageNodes\":[\n" +
" \"" + NODE_NAME + "\"\n" +
" ]\n" +
"},\n" +
" \"table\":{\n" +
" \"tables\":{\n" +
" \"" + STATIC_TABLE_NAME + "\":{\n" +
" \"name\":\"TestTable\",\n" +
" \"partitions\":16,\n" +
" \"replicas\":1,\n" +
" \"columns\":{\n" +
" \"id\":{\n" +
" \"name\":\"id\",\n" +
" \"type\":{\n" +
" \"type\":\"Int64\"\n" +
" },\n" +
" \"nullable\":false\n" +
" }\n" +
" },\n" +
" \"indices\":{\n" +
" \"pk\":{\n" +
" \"name\":\"pk\",\n" +
" \"type\":\"primary\",\n" +
" \"uniq\":true,\n" +
" \"columns\":{\n" +
" \"id\":{\n" +
" \"name\":\"id\",\n" +
" \"asc\":true\n" +
" }\n" +
" }\n" +
" }\n" +
" }\n" +
" }\n" +
" }\n" +
" }\n" +
"}");
}
catch (Exception e) {
LOG.error("Failed to bootstrap the test configuration manager.", e);
fail("Failed to configure manager [err=" + e.getMessage() + ']');
}
}
/** Stop configuration manager. */
@AfterEach
void tearDown() {
nodeCfgMgr.stop();
clusterCfgMgr.stop();
}
/**
* Tests a table which was defined before start through bootstrap configuration.
*/
@Disabled("https://issues.apache.org/jira/browse/IGNITE-15255")
@Test
public void testStaticTableConfigured() {
TableManager tableManager = new TableManager(nodeCfgMgr, clusterCfgMgr, mm, sm, am, rm, workDir);
assertEquals(1, tableManager.tables().size());
assertNotNull(tableManager.table(STATIC_TABLE_NAME));
}
/**
* Tests create a table through public API.
*/
@Test
public void testCreateTable() {
CompletableFuture<TableManager> tblManagerFut = new CompletableFuture<>();
SchemaTable scmTbl = SchemaBuilders.tableBuilder("PUBLIC", DYNAMIC_TABLE_NAME).columns(
SchemaBuilders.column("key", ColumnType.INT64).asNonNull().build(),
SchemaBuilders.column("val", ColumnType.INT64).asNullable().build()
).withPrimaryKey("key").build();
Table table = mockManagersAndCreateTable(scmTbl, tblManagerFut);
assertNotNull(table);
assertSame(table, tblManagerFut.join().table(scmTbl.canonicalName()));
}
/**
* Tests drop a table through public API.
*/
@Test
public void testDropTable() {
CompletableFuture<TableManager> tblManagerFut = new CompletableFuture<>();
SchemaTable scmTbl = SchemaBuilders.tableBuilder("PUBLIC", DYNAMIC_TABLE_FOR_DROP_NAME).columns(
SchemaBuilders.column("key", ColumnType.INT64).asNonNull().build(),
SchemaBuilders.column("val", ColumnType.INT64).asNullable().build()
).withPrimaryKey("key").build();
TableImpl table = mockManagersAndCreateTable(scmTbl, tblManagerFut);
TableManager tableManager = tblManagerFut.join();
when(sm.unregisterSchemas(any())).thenReturn(CompletableFuture.completedFuture(true));
doAnswer(invocation -> {
EventListener<SchemaEventParameters> schemaInitialized = invocation.getArgument(1);
SchemaRegistry schemaRegistry = mock(SchemaRegistry.class);
CompletableFuture.supplyAsync(() -> schemaInitialized.notify(
new SchemaEventParameters(table.tableId(), schemaRegistry),
null));
return null;
}).when(sm).listen(same(SchemaEvent.DROPPED), any());
when(am.removeAssignment(any())).thenReturn(CompletableFuture.completedFuture(true));
doAnswer(invocation -> {
EventListener<AffinityEventParameters> affinityRemovedDelegate = invocation.getArgument(1);
ArrayList<List<ClusterNode>> assignment = new ArrayList<>(PARTITIONS);
for (int part = 0; part < PARTITIONS; part++)
assignment.add(new ArrayList<ClusterNode>(Collections.singleton(node)));
CompletableFuture.supplyAsync(() -> affinityRemovedDelegate.notify(
new AffinityEventParameters(table.tableId(), assignment),
null));
return null;
}).when(am).listen(same(AffinityEvent.REMOVED), any());
tableManager.dropTable(scmTbl.canonicalName());
assertNull(tableManager.table(scmTbl.canonicalName()));
}
/**
* Instantiates a table and prepares Table manager.
*/
@Test
public void testGetTableDuringCreation() throws Exception {
CompletableFuture<TableManager> tblManagerFut = new CompletableFuture<>();
SchemaTable scmTbl = SchemaBuilders.tableBuilder("PUBLIC", DYNAMIC_TABLE_FOR_DROP_NAME).columns(
SchemaBuilders.column("key", ColumnType.INT64).asNonNull().build(),
SchemaBuilders.column("val", ColumnType.INT64).asNullable().build()
).withPrimaryKey("key").build();
Phaser phaser = new Phaser(2);
CompletableFuture<Table> createFut = CompletableFuture.supplyAsync(() ->
mockManagersAndCreateTableWithDelay(scmTbl, tblManagerFut, phaser)
);
CompletableFuture<Table> getFut = CompletableFuture.supplyAsync(() -> {
phaser.awaitAdvance(0);
return tblManagerFut.join().table(scmTbl.canonicalName());
});
CompletableFuture<Collection<Table>> getAllTablesFut = CompletableFuture.supplyAsync(() -> {
phaser.awaitAdvance(0);
return tblManagerFut.join().tables();
});
assertFalse(createFut.isDone());
assertFalse(getFut.isDone());
assertFalse(getAllTablesFut.isDone());
phaser.arrive();
assertSame(createFut.join(), getFut.join());
assertEquals(1, getAllTablesFut.join().size());
}
/**
* Tries to create a table that already exists.
*/
@Test
public void testDoubledCreateTable() {
CompletableFuture<TableManager> tblManagerFut = new CompletableFuture<>();
SchemaTable scmTbl = SchemaBuilders.tableBuilder("PUBLIC", DYNAMIC_TABLE_NAME)
.columns(
SchemaBuilders.column("key", ColumnType.INT64).asNonNull().build(),
SchemaBuilders.column("val", ColumnType.INT64).asNullable().build())
.withPrimaryKey("key")
.build();
Table table = mockManagersAndCreateTable(scmTbl, tblManagerFut);
assertNotNull(table);
assertThrows(RuntimeException.class, () -> tblManagerFut.join().createTable(scmTbl.canonicalName(), tblCh -> SchemaConfigurationConverter.convert(scmTbl, tblCh)
.changeReplicas(1)
.changePartitions(10)));
assertSame(table, tblManagerFut.join().getOrCreateTable(scmTbl.canonicalName(), tblCh -> SchemaConfigurationConverter.convert(scmTbl, tblCh)
.changeReplicas(1)
.changePartitions(10)));
}
/**
* Instantiates Table manager and creates a table in it.
*
* @param schemaTable Configuration schema for a table.
* @param tblManagerFut Future for table manager.
* @return Table.
*/
private TableImpl mockManagersAndCreateTable(
SchemaTable schemaTable,
CompletableFuture<TableManager> tblManagerFut
) {
return mockManagersAndCreateTableWithDelay(schemaTable, tblManagerFut, null);
}
/**
* Instantiates a table and prepares Table manager. When the latch would open, the method completes.
*
* @param schemaTable Configuration schema for a table.
* @param tblManagerFut Future for table manager.
* @param phaser Phaser for the wait.
* @return Table manager.
*/
@NotNull private TableImpl mockManagersAndCreateTableWithDelay(
SchemaTable schemaTable,
CompletableFuture<TableManager> tblManagerFut,
Phaser phaser
) {
when(rm.prepareRaftGroup(any(), any(), any())).thenReturn(CompletableFuture.completedFuture(null));
when(mm.hasMetastorageLocally(any())).thenReturn(true);
CompletableFuture<UUID> tblIdFut = new CompletableFuture<>();
String keyForCheck = PUBLIC_PREFIX + ConfigurationUtil.escape(schemaTable.canonicalName()) + ".name";
AtomicBoolean tableCreatedFlag = new AtomicBoolean();
when(mm.invoke(any(Condition.class), any(Operation.class), any(Operation.class))).thenAnswer(invocation -> {
Condition condition = invocation.getArgument(0);
Object internalCondition = ReflectionUtils.tryToReadFieldValue(Condition.class, "cond", condition).get();
Method getKeyMethod = ReflectionUtils.findMethod(internalCondition.getClass(), "key").get();
String metastorageKey = new String((byte[])ReflectionUtils.invokeMethod(getKeyMethod, internalCondition));
if (keyForCheck.equals(metastorageKey))
return CompletableFuture.completedFuture(tableCreatedFlag.get());
tblIdFut.complete(UUID.fromString(metastorageKey.substring(INTERNAL_PREFIX.length())));
return CompletableFuture.completedFuture(true);
});
when(sm.initSchemaForTable(any(), eq(schemaTable.canonicalName()))).thenReturn(CompletableFuture.completedFuture(true));
doAnswer(invocation -> {
EventListener<SchemaEventParameters> schemaInitialized = invocation.getArgument(1);
assertTrue(tblIdFut.isDone());
SchemaRegistry schemaRegistry = mock(SchemaRegistry.class);
CompletableFuture.supplyAsync(() -> schemaInitialized.notify(
new SchemaEventParameters(tblIdFut.join(), schemaRegistry),
null));
return null;
}).when(sm).listen(same(SchemaEvent.INITIALIZED), any());
when(am.calculateAssignments(any(), eq(schemaTable.canonicalName()))).thenReturn(CompletableFuture.completedFuture(true));
doAnswer(invocation -> {
EventListener<AffinityEventParameters> affinityCalculatedDelegate = invocation.getArgument(1);
ArrayList<List<ClusterNode>> assignment = new ArrayList<>(PARTITIONS);
for (int part = 0; part < PARTITIONS; part++)
assignment.add(new ArrayList<ClusterNode>(Collections.singleton(node)));
assertTrue(tblIdFut.isDone());
CompletableFuture.supplyAsync(() -> affinityCalculatedDelegate.notify(
new AffinityEventParameters(tblIdFut.join(), assignment),
null));
return null;
}).when(am).listen(same(AffinityEvent.CALCULATED), any());
TableManager tableManager = new TableManager(nodeCfgMgr, clusterCfgMgr, mm, sm, am, rm, workDir);
TableImpl tbl2;
try {
tableManager.start();
tblManagerFut.complete(tableManager);
when(mm.range(eq(new ByteArray(PUBLIC_PREFIX)), any())).thenAnswer(invocation -> {
Cursor<Entry> cursor = mock(Cursor.class);
when(cursor.hasNext()).thenReturn(false);
return cursor;
});
int tablesBeforeCreation = tableManager.tables().size();
clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY).tables().listen(ctx -> {
boolean createTbl = ctx.newValue().get(schemaTable.canonicalName()) != null &&
ctx.oldValue().get(schemaTable.canonicalName()) == null;
boolean dropTbl = ctx.oldValue().get(schemaTable.canonicalName()) != null &&
ctx.newValue().get(schemaTable.canonicalName()) == null;
if (!createTbl && !dropTbl)
return CompletableFuture.completedFuture(null);
tableCreatedFlag.set(createTbl);
when(mm.range(eq(new ByteArray(PUBLIC_PREFIX)), any())).thenAnswer(invocation -> {
AtomicBoolean firstRecord = new AtomicBoolean(createTbl);
Cursor<Entry> cursor = mock(Cursor.class);
when(cursor.hasNext()).thenAnswer(hasNextInvocation ->
firstRecord.compareAndSet(true, false));
Entry mockEntry = mock(Entry.class);
when(mockEntry.key()).thenReturn(new ByteArray(PUBLIC_PREFIX + "uuid." + NamedListNode.NAME));
when(mockEntry.value()).thenReturn(ByteUtils.toBytes(schemaTable.canonicalName()));
when(cursor.next()).thenReturn(mockEntry);
return cursor;
});
if (phaser != null)
phaser.arriveAndAwaitAdvance();
return CompletableFuture.completedFuture(null);
});
tbl2 = (TableImpl)tableManager.createTable(schemaTable.canonicalName(), tblCh -> SchemaConfigurationConverter.convert(schemaTable, tblCh)
.changeReplicas(1)
.changePartitions(10)
);
assertNotNull(tbl2);
assertEquals(tablesBeforeCreation + 1, tableManager.tables().size());
}
finally {
tableManager.stop();
}
return tbl2;
}
}