blob: 23d715f4cb05ee69c1f78f976529718e6d6cc9ba [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.sql.engine.exec;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.configuration.schemas.store.UnknownDataStorageConfigurationSchema.UNKNOWN_DATA_STORAGE;
import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine.ENGINE_NAME;
import static org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfigurationSchema.DEFAULT_DATA_REGION_NAME;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.ignite.configuration.schemas.store.UnknownDataStorageConfigurationSchema;
import org.apache.ignite.configuration.schemas.table.ConstantValueDefaultConfigurationSchema;
import org.apache.ignite.configuration.schemas.table.FunctionCallDefaultConfigurationSchema;
import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
import org.apache.ignite.configuration.schemas.table.NullValueDefaultConfigurationSchema;
import org.apache.ignite.configuration.schemas.table.SortedIndexConfigurationSchema;
import org.apache.ignite.configuration.schemas.table.TableView;
import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
import org.apache.ignite.internal.baseline.BaselineManager;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
import org.apache.ignite.internal.configuration.notifications.ConfigurationStorageRevisionListenerHolder;
import org.apache.ignite.internal.configuration.schema.ExtendedTableConfigurationSchema;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.configuration.testframework.InjectRevisionListenerHolder;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.schema.SchemaUtils;
import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
import org.apache.ignite.internal.sql.engine.QueryContext;
import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
import org.apache.ignite.internal.sql.engine.property.PropertiesHolder;
import org.apache.ignite.internal.sql.engine.session.SessionId;
import org.apache.ignite.internal.storage.DataStorageManager;
import org.apache.ignite.internal.storage.DataStorageModules;
import org.apache.ignite.internal.storage.chm.TestConcurrentHashMapDataStorageModule;
import org.apache.ignite.internal.storage.chm.TestConcurrentHashMapStorageEngine;
import org.apache.ignite.internal.storage.chm.schema.TestConcurrentHashMapDataStorageConfigurationSchema;
import org.apache.ignite.internal.storage.chm.schema.TestConcurrentHashMapDataStorageView;
import org.apache.ignite.internal.storage.rocksdb.RocksDbDataStorageModule;
import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataStorageConfigurationSchema;
import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataStorageView;
import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.ColumnAlreadyExistsException;
import org.apache.ignite.lang.ColumnNotFoundException;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.IndexAlreadyExistsException;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.lang.TableAlreadyExistsException;
import org.apache.ignite.lang.TableNotFoundException;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessagingService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.TopologyService;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.sql.SqlException;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
/** Mock ddl usage. */
@ExtendWith({MockitoExtension.class, ConfigurationExtension.class})
@MockitoSettings(strictness = Strictness.LENIENT)
public class MockedStructuresTest extends IgniteAbstractTest {
/** Node name. */
private static final String NODE_NAME = "node1";
/** Schema manager. */
@Mock
private BaselineManager bm;
/** Topology service. */
@Mock
private TopologyService ts;
/** Cluster service. */
@Mock(lenient = true)
private ClusterService cs;
/** Raft manager. */
@Mock
private Loza rm;
/** TX manager. */
@Mock(lenient = true)
private TxManager tm;
/** Meta storage manager. */
@Mock
MetaStorageManager msm;
/**
* Revision listener holder. It uses for the test configurations:
* <ul>
* <li>{@link MockedStructuresTest#tblsCfg},</li>
* </ul>
*/
@InjectRevisionListenerHolder
private ConfigurationStorageRevisionListenerHolder fieldRevisionListenerHolder;
/** Revision updater. */
private Consumer<Function<Long, CompletableFuture<?>>> revisionUpdater;
/** Tables configuration. */
@InjectConfiguration(
internalExtensions = ExtendedTableConfigurationSchema.class,
polymorphicExtensions = {
HashIndexConfigurationSchema.class,
SortedIndexConfigurationSchema.class,
UnknownDataStorageConfigurationSchema.class,
RocksDbDataStorageConfigurationSchema.class,
TestConcurrentHashMapDataStorageConfigurationSchema.class,
ConstantValueDefaultConfigurationSchema.class,
FunctionCallDefaultConfigurationSchema.class,
NullValueDefaultConfigurationSchema.class
}
)
private TablesConfiguration tblsCfg;
TableManager tblManager;
SqlQueryProcessor queryProc;
/** Test node. */
private final ClusterNode node = new ClusterNode(
UUID.randomUUID().toString(),
NODE_NAME,
new NetworkAddress("127.0.0.1", 2245)
);
@InjectConfiguration
private RocksDbStorageEngineConfiguration rocksDbEngineConfig;
@Mock
private ConfigurationRegistry configRegistry;
DataStorageManager dataStorageManager;
SchemaManager schemaManager;
/** Returns current method name. */
private static String getCurrentMethodName() {
return StackWalker.getInstance()
.walk(s -> s.skip(1).findFirst())
.get()
.getMethodName();
}
/** Stop configuration manager. */
@AfterEach
void after() {
try {
Objects.requireNonNull(queryProc).stop();
} catch (Exception e) {
fail(e);
}
try {
Objects.requireNonNull(dataStorageManager).stop();
} catch (Exception e) {
fail(e);
}
Objects.requireNonNull(tblManager).stop();
}
/** Inner initialisation. */
@BeforeEach
void before() throws Exception {
revisionUpdater = (Function<Long, CompletableFuture<?>> function) -> {
function.apply(0L).join();
fieldRevisionListenerHolder.listenUpdateStorageRevision(newStorageRevision -> {
log.info("Notify about revision: {}", newStorageRevision);
return function.apply(newStorageRevision);
});
};
when(configRegistry.getConfiguration(RocksDbStorageEngineConfiguration.KEY)).thenReturn(rocksDbEngineConfig);
DataStorageModules dataStorageModules = new DataStorageModules(List.of(
new RocksDbDataStorageModule(),
new TestConcurrentHashMapDataStorageModule()
));
dataStorageManager = new DataStorageManager(
tblsCfg,
dataStorageModules.createStorageEngines(NODE_NAME, configRegistry, workDir, null)
);
dataStorageManager.start();
schemaManager = new SchemaManager(revisionUpdater, tblsCfg);
schemaManager.start();
tblManager = mockManagers();
queryProc = new SqlQueryProcessor(
revisionUpdater,
cs,
tblManager,
schemaManager,
dataStorageManager,
() -> dataStorageModules.collectSchemasFields(List.of(
RocksDbDataStorageConfigurationSchema.class,
TestConcurrentHashMapDataStorageConfigurationSchema.class
))
);
queryProc.start();
tblsCfg.defaultDataStorage().update(ENGINE_NAME).get(1, TimeUnit.SECONDS);
rocksDbEngineConfig.regions()
.change(c -> c.create("test_region", rocksDbDataRegionChange -> {}))
.get(1, TimeUnit.SECONDS);
}
/**
* Checks inner transactions are initialized correctly.
*/
@Test
public void testInnerTxInitiated() throws Exception {
SessionId sesId = queryProc.createSession(PropertiesHolder.holderFor(Map.of()));
InternalTransaction tx = mock(InternalTransaction.class);
when(tm.begin()).thenReturn(tx);
String sql = "CREATE TABLE TEST (c1 int PRIMARY KEY, c2 varbinary(255))";
CompletableFuture<AsyncSqlCursor<List<Object>>> f = queryProc.querySingleAsync(sesId, QueryContext.of(), sql);
AsyncSqlCursor<List<Object>> asyncRes = f.get();
asyncRes.closeAsync();
verify(tm, never()).begin();
}
/**
* Tests create a table through public API.
*/
@Test
public void testCreateTable() {
SqlQueryProcessor finalQueryProc = queryProc;
String curMethodName = getCurrentMethodName();
String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) "
+ "with partitions=1,replicas=1", curMethodName);
awaitFirst(queryProc.queryAsync("PUBLIC", newTblSql));
assertTrue(tblManager.tables().stream().anyMatch(t -> t.name()
.equalsIgnoreCase("PUBLIC." + curMethodName)));
String finalNewTblSql1 = newTblSql;
assertThrows(TableAlreadyExistsException.class, () -> awaitFirst(finalQueryProc.queryAsync("PUBLIC", finalNewTblSql1)));
String finalNewTblSql2 = String.format("CREATE TABLE \"PUBLIC\".%s (c1 int PRIMARY KEY, c2 varbinary(255)) "
+ "with partitions=1,replicas=1", curMethodName);
assertThrows(TableAlreadyExistsException.class, () -> awaitFirst(finalQueryProc.queryAsync("PUBLIC", finalNewTblSql2)));
// todo: correct exception need to be thrown https://issues.apache.org/jira/browse/IGNITE-16084
assertThrows(SqlException.class, () -> awaitFirst(finalQueryProc.queryAsync("PUBLIC",
"CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with partitions__wrong=1,replicas=1")));
assertThrows(SqlException.class, () -> awaitFirst(finalQueryProc.queryAsync("PUBLIC",
"CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with partitions=1,replicas__wrong=1")));
newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varchar(255))",
" IF NOT EXISTS " + curMethodName);
String finalNewTblSql3 = newTblSql;
assertDoesNotThrow(() -> await(finalQueryProc.queryAsync("PUBLIC", finalNewTblSql3).get(0)));
}
/**
* Tests create a table with multiple pk through public API.
*/
@Test
public void testCreateTableMultiplePk() {
String curMethodName = getCurrentMethodName();
String newTblSql = String.format("CREATE TABLE %s (c1 int, c2 int NOT NULL DEFAULT 1, c3 int, primary key(c1, c2))", curMethodName);
awaitFirst(queryProc.queryAsync("PUBLIC", newTblSql));
assertTrue(tblManager.tables().stream().anyMatch(t -> t.name()
.equalsIgnoreCase("PUBLIC." + curMethodName)));
}
/**
* Tests create and drop table through public API.
*/
@Test
public void testDropTable() {
String curMethodName = getCurrentMethodName();
String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varchar(255))", curMethodName);
awaitFirst(queryProc.queryAsync("PUBLIC", newTblSql));
awaitFirst(queryProc.queryAsync("PUBLIC", "DROP TABLE " + curMethodName));
SqlQueryProcessor finalQueryProc = queryProc;
assertThrows(TableNotFoundException.class, () -> awaitFirst(finalQueryProc.queryAsync("PUBLIC",
"DROP TABLE " + curMethodName + "_not_exist")));
assertThrows(TableNotFoundException.class, () -> awaitFirst(finalQueryProc.queryAsync("PUBLIC",
"DROP TABLE " + curMethodName)));
assertThrows(TableNotFoundException.class, () -> awaitFirst(finalQueryProc.queryAsync("PUBLIC",
"DROP TABLE PUBLIC." + curMethodName)));
awaitFirst(queryProc.queryAsync("PUBLIC", "DROP TABLE IF EXISTS PUBLIC." + curMethodName + "_not_exist"));
awaitFirst(queryProc.queryAsync("PUBLIC", "DROP TABLE IF EXISTS PUBLIC." + curMethodName));
assertTrue(tblManager.tables().stream().noneMatch(t -> t.name()
.equalsIgnoreCase("PUBLIC." + curMethodName)));
}
/**
* Tests alter and drop columns through public API.
*/
@Test
public void testAlterAndDropSimpleCase() {
SqlQueryProcessor finalQueryProc = queryProc;
String curMethodName = getCurrentMethodName();
String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varchar(255))", curMethodName);
awaitFirst(queryProc.queryAsync("PUBLIC", newTblSql));
String alterCmd = String.format("ALTER TABLE %s ADD COLUMN (c3 varchar, c4 int)", curMethodName);
awaitFirst(queryProc.queryAsync("PUBLIC", alterCmd));
String alterCmd1 = String.format("ALTER TABLE %s ADD COLUMN c5 int NOT NULL DEFAULT 1", curMethodName);
awaitFirst(queryProc.queryAsync("PUBLIC", alterCmd1));
assertThrows(ColumnAlreadyExistsException.class, () -> awaitFirst(finalQueryProc.queryAsync("PUBLIC", alterCmd)));
String alterCmdNoTbl = String.format("ALTER TABLE %s ADD COLUMN (c3 varchar, c4 int)", curMethodName + "_notExist");
assertThrows(TableNotFoundException.class, () -> awaitFirst(queryProc.queryAsync("PUBLIC", alterCmdNoTbl)));
String alterIfExistsCmd = String.format("ALTER TABLE IF EXISTS %s ADD COLUMN (c3 varchar, c4 int)", curMethodName + "NotExist");
awaitFirst(queryProc.queryAsync("PUBLIC", alterIfExistsCmd));
assertThrows(ColumnAlreadyExistsException.class, () -> awaitFirst(finalQueryProc.queryAsync("PUBLIC", alterCmd)));
awaitFirst(finalQueryProc.queryAsync("PUBLIC", String.format("ALTER TABLE %s DROP COLUMN c4", curMethodName)));
awaitFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE %s ADD COLUMN IF NOT EXISTS c3 varchar", curMethodName)));
awaitFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE %s DROP COLUMN c3", curMethodName)));
awaitFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE %s DROP COLUMN IF EXISTS c3", curMethodName)));
assertThrows(ColumnNotFoundException.class, () -> awaitFirst(finalQueryProc.queryAsync("PUBLIC",
String.format("ALTER TABLE %s DROP COLUMN (c3, c4)", curMethodName))));
assertThrows(IgniteException.class, () -> awaitFirst(finalQueryProc.queryAsync("PUBLIC",
String.format("ALTER TABLE %s DROP COLUMN c1", curMethodName))));
}
/**
* Tests alter add multiple columns through public API.
*/
@Test
public void testAlterColumnsAddBatch() {
String curMethodName = getCurrentMethodName();
awaitFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varchar(255))", curMethodName)));
awaitFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE %s ADD COLUMN (c3 varchar, c4 varchar)", curMethodName)));
awaitFirst(queryProc
.queryAsync("PUBLIC", String.format("ALTER TABLE %s ADD COLUMN IF NOT EXISTS (c3 varchar, c4 varchar)", curMethodName)));
awaitFirst(
queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE %s ADD COLUMN IF NOT EXISTS (c3 varchar, c4 varchar, c5 varchar)",
curMethodName)));
SqlQueryProcessor finalQueryProc = queryProc;
assertThrows(ColumnAlreadyExistsException.class, () -> awaitFirst(finalQueryProc.queryAsync("PUBLIC",
String.format("ALTER TABLE %s ADD COLUMN (c5 varchar)", curMethodName))));
}
/**
* Tests alter drop multiple columns through public API.
*/
@Test
public void testAlterColumnsDropBatch() {
String curMethodName = getCurrentMethodName();
awaitFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE TABLE %s "
+ "(c1 int PRIMARY KEY, c2 decimal(10), c3 varchar, c4 varchar, c5 varchar)", curMethodName)));
awaitFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE %s DROP COLUMN c4", curMethodName)));
awaitFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE %s DROP COLUMN IF EXISTS (c3, c4, c5)", curMethodName)));
SqlQueryProcessor finalQueryProc = queryProc;
assertThrows(ColumnNotFoundException.class, () -> awaitFirst(finalQueryProc.queryAsync("PUBLIC",
String.format("ALTER TABLE %s DROP COLUMN c4", curMethodName))));
}
/**
* Tests create a table through public API.
*/
@Disabled("https://issues.apache.org/jira/browse/IGNITE-16032")
@Test
public void testCreateDropIndex() {
SqlQueryProcessor finalQueryProc = queryProc;
String curMethodName = getCurrentMethodName();
String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with partitions=1", curMethodName);
awaitFirst(queryProc.queryAsync("PUBLIC", newTblSql));
assertTrue(tblManager.tables().stream().anyMatch(t -> t.name()
.equalsIgnoreCase("PUBLIC." + curMethodName)));
awaitFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE INDEX index1 ON %s (c1)", curMethodName)));
awaitFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE INDEX IF NOT EXISTS index1 ON %s (c1)", curMethodName)));
awaitFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE INDEX index2 ON %s (c1)", curMethodName)));
awaitFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE INDEX index3 ON %s (c2)", curMethodName)));
assertThrows(IndexAlreadyExistsException.class, () ->
awaitFirst(finalQueryProc.queryAsync("PUBLIC", String.format("CREATE INDEX index3 ON %s (c1)", curMethodName))));
assertThrows(IgniteException.class, () ->
awaitFirst(finalQueryProc
.queryAsync("PUBLIC", String.format("CREATE INDEX index_3 ON %s (c1)", curMethodName + "_nonExist"))));
awaitFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE INDEX index4 ON %s (c2 desc, c1 asc)", curMethodName)));
awaitFirst(queryProc.queryAsync("PUBLIC", String.format("DROP INDEX index4 ON %s", curMethodName)));
awaitFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE INDEX index4 ON %s (c2 desc, c1 asc)", curMethodName)));
awaitFirst(queryProc.queryAsync("PUBLIC", String.format("DROP INDEX index4 ON %s", curMethodName)));
awaitFirst(queryProc.queryAsync("PUBLIC", String.format("DROP INDEX IF EXISTS index4 ON %s", curMethodName)));
}
@Disabled("https://issues.apache.org/jira/browse/IGNITE-17197")
@Test
void createTableWithEngine() throws Exception {
String method = getCurrentMethodName();
// Without engine.
assertDoesNotThrow(() -> awaitFirst(queryProc.queryAsync(
"PUBLIC",
String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255))", method + 0)
)));
assertThat(tableView(method + 0).dataStorage(), instanceOf(RocksDbDataStorageView.class));
// With existing engine.
assertDoesNotThrow(() -> awaitFirst(queryProc.queryAsync(
"PUBLIC",
String.format(
"CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) engine %s",
method + 1,
TestConcurrentHashMapStorageEngine.ENGINE_NAME
)
)));
assertThat(tableView(method + 1).dataStorage(), instanceOf(TestConcurrentHashMapDataStorageView.class));
// With existing engine in mixed case
assertDoesNotThrow(() -> awaitFirst(queryProc.queryAsync(
"PUBLIC",
String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) engine %s", method + 2, "\"RocksDb\"")
)));
assertThat(tableView(method + 2).dataStorage(), instanceOf(RocksDbDataStorageView.class));
IgniteException exception = assertThrows(
IgniteException.class,
() -> awaitFirst(queryProc.queryAsync(
"PUBLIC",
String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) engine %s", method + 3, method)
))
);
assertThat(exception.getMessage(), startsWith("Unexpected data storage engine"));
tblsCfg.defaultDataStorage().update(UNKNOWN_DATA_STORAGE).get(1, TimeUnit.SECONDS);
exception = assertThrows(
IgniteException.class,
() -> awaitFirst(queryProc.queryAsync(
"PUBLIC",
String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255))", method + 4)
))
);
assertThat(exception.getMessage(), startsWith("Default data storage is not defined"));
}
@Test
void createTableWithTableOptions() {
String method = getCurrentMethodName();
assertDoesNotThrow(() -> awaitFirst(queryProc.queryAsync(
"PUBLIC",
String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with replicas=1", method + 0)
)));
assertDoesNotThrow(() -> awaitFirst(queryProc.queryAsync(
"PUBLIC",
String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with REPLICAS=1", method + 1)
)));
assertDoesNotThrow(() -> awaitFirst(queryProc.queryAsync(
"PUBLIC",
String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with \"replicas\"=1", method + 2)
)));
assertDoesNotThrow(() -> awaitFirst(queryProc.queryAsync(
"PUBLIC",
String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with \"replICAS\"=1", method + 3)
)));
assertDoesNotThrow(() -> awaitFirst(queryProc.queryAsync(
"PUBLIC",
String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with replicas=1, partitions=1", method + 4)
)));
IgniteException exception = assertThrows(
IgniteException.class,
() -> awaitFirst(queryProc.queryAsync(
"PUBLIC",
String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with replicas='%s'", method + 5, method)
))
);
assertThat(exception.getMessage(), containsString("Unsuspected table option type"));
exception = assertThrows(
IgniteException.class,
() -> awaitFirst(queryProc.queryAsync(
"PUBLIC",
String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with %s='%s'", method + 6, method, method)
))
);
assertThat(exception.getMessage(), containsString("Unexpected table option"));
exception = assertThrows(
IgniteException.class,
() -> awaitFirst(queryProc.queryAsync(
"PUBLIC",
String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with replicas=-1", method + 7)
))
);
assertThat(exception.getMessage(), containsString("Table option validation failed"));
}
@Test
void createTableWithDataStorageOptions() {
String method = getCurrentMethodName();
assertDoesNotThrow(() -> awaitFirst(queryProc.queryAsync(
"PUBLIC",
String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with dataRegion='default'", method + 0)
)));
assertThat(
((RocksDbDataStorageView) tableView(method + 0).dataStorage()).dataRegion(),
equalTo(DEFAULT_DATA_REGION_NAME)
);
assertDoesNotThrow(() -> awaitFirst(queryProc.queryAsync(
"PUBLIC",
String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with DATAREGION='test_region'", method + 1)
)));
assertThat(
((RocksDbDataStorageView) tableView(method + 1).dataStorage()).dataRegion(),
equalTo("test_region")
);
}
// todo copy-paste from TableManagerTest will be removed after https://issues.apache.org/jira/browse/IGNITE-16050
/**
* Instantiates a table and prepares Table manager.
*
* @return Table manager.
*/
private TableManager mockManagers() throws NodeStoppingException {
when(rm.prepareRaftGroup(any(), any(), any(), any())).thenAnswer(mock -> {
RaftGroupService raftGrpSrvcMock = mock(RaftGroupService.class);
when(raftGrpSrvcMock.leader()).thenReturn(new Peer(new NetworkAddress("localhost", 47500)));
return completedFuture(raftGrpSrvcMock);
});
when(rm.startRaftGroupService(any(), any())).thenAnswer(mock -> {
RaftGroupService raftGrpSrvcMock = mock(RaftGroupService.class);
when(raftGrpSrvcMock.leader()).thenReturn(new Peer(new NetworkAddress("localhost", 47500)));
return completedFuture(raftGrpSrvcMock);
});
when(ts.getByAddress(any(NetworkAddress.class))).thenReturn(new ClusterNode(
UUID.randomUUID().toString(),
"node0",
new NetworkAddress("localhost", 47500)
));
try (MockedStatic<SchemaUtils> schemaServiceMock = mockStatic(SchemaUtils.class)) {
schemaServiceMock.when(() -> SchemaUtils.prepareSchemaDescriptor(anyInt(), any()))
.thenReturn(mock(SchemaDescriptor.class));
}
when(cs.messagingService()).thenAnswer(invocation -> {
MessagingService ret = mock(MessagingService.class);
return ret;
});
when(cs.localConfiguration()).thenAnswer(invocation -> {
ClusterLocalConfiguration ret = mock(ClusterLocalConfiguration.class);
when(ret.getName()).thenReturn("node1");
return ret;
});
when(cs.topologyService()).thenAnswer(invocation -> {
TopologyService ret = mock(TopologyService.class);
when(ret.localMember()).thenReturn(new ClusterNode("1", "node1", null));
return ret;
});
when(msm.registerWatch(any(ByteArray.class), any())).thenReturn(CompletableFuture.completedFuture(1L));
TableManager tableManager = createTableManager();
return tableManager;
}
private TableManager createTableManager() {
TableManager tableManager = new TableManager(
revisionUpdater,
tblsCfg,
rm,
bm,
ts,
tm,
dataStorageManager,
msm,
schemaManager
);
tableManager.start();
return tableManager;
}
private <T> AsyncSqlCursor<T> awaitFirst(List<CompletableFuture<AsyncSqlCursor<T>>> cursors) {
return await(cursors.get(0));
}
private @Nullable TableView tableView(String tableName) {
return tblsCfg.tables().value().get("PUBLIC." + tableName.toUpperCase());
}
}