| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.catalog; |
| |
| import static java.util.stream.Collectors.toList; |
| import static org.apache.ignite.internal.catalog.CatalogManagerImpl.DEFAULT_ZONE_NAME; |
| import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME; |
| import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE; |
| import static org.apache.ignite.internal.catalog.CatalogService.SYSTEM_SCHEMA_NAME; |
| import static org.apache.ignite.internal.catalog.CatalogTestUtils.addColumnParams; |
| import static org.apache.ignite.internal.catalog.CatalogTestUtils.applyNecessaryLength; |
| import static org.apache.ignite.internal.catalog.CatalogTestUtils.applyNecessaryPrecision; |
| import static org.apache.ignite.internal.catalog.CatalogTestUtils.columnParams; |
| import static org.apache.ignite.internal.catalog.CatalogTestUtils.columnParamsBuilder; |
| import static org.apache.ignite.internal.catalog.CatalogTestUtils.dropColumnParams; |
| import static org.apache.ignite.internal.catalog.CatalogTestUtils.initializeColumnWithDefaults; |
| import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_FILTER; |
| import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT; |
| import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PRECISION; |
| import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_REPLICA_COUNT; |
| import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_SCALE; |
| import static org.apache.ignite.internal.catalog.commands.CatalogUtils.IMMEDIATE_TIMER_VALUE; |
| import static org.apache.ignite.internal.catalog.commands.CatalogUtils.INFINITE_TIMER_VALUE; |
| import static org.apache.ignite.internal.catalog.commands.CatalogUtils.pkIndexName; |
| import static org.apache.ignite.internal.catalog.commands.DefaultValue.constant; |
| import static org.apache.ignite.internal.catalog.descriptors.CatalogColumnCollation.ASC_NULLS_LAST; |
| import static org.apache.ignite.internal.catalog.descriptors.CatalogColumnCollation.DESC_NULLS_FIRST; |
| import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.AVAILABLE; |
| import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.BUILDING; |
| import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.REGISTERED; |
| import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.STOPPING; |
| import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; |
| import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause; |
| import static org.apache.ignite.internal.testframework.IgniteTestUtils.await; |
| import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; |
| import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow; |
| import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast; |
| import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; |
| import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; |
| import static org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture; |
| import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; |
| import static org.apache.ignite.sql.ColumnType.DECIMAL; |
| import static org.apache.ignite.sql.ColumnType.INT32; |
| import static org.apache.ignite.sql.ColumnType.INT64; |
| import static org.apache.ignite.sql.ColumnType.NULL; |
| import static org.apache.ignite.sql.ColumnType.STRING; |
| import static org.hamcrest.MatcherAssert.assertThat; |
| import static org.hamcrest.Matchers.empty; |
| import static org.hamcrest.Matchers.equalTo; |
| import static org.hamcrest.Matchers.greaterThan; |
| import static org.hamcrest.Matchers.greaterThanOrEqualTo; |
| import static org.hamcrest.Matchers.hasItems; |
| import static org.hamcrest.Matchers.is; |
| import static org.hamcrest.Matchers.notNullValue; |
| import static org.hamcrest.Matchers.nullValue; |
| import static org.junit.jupiter.api.Assertions.assertEquals; |
| import static org.junit.jupiter.api.Assertions.assertFalse; |
| import static org.junit.jupiter.api.Assertions.assertNotEquals; |
| import static org.junit.jupiter.api.Assertions.assertNotNull; |
| import static org.junit.jupiter.api.Assertions.assertNotSame; |
| import static org.junit.jupiter.api.Assertions.assertNull; |
| import static org.junit.jupiter.api.Assertions.assertSame; |
| import static org.junit.jupiter.api.Assertions.assertThrows; |
| import static org.junit.jupiter.api.Assertions.assertTrue; |
| import static org.junit.jupiter.api.Assertions.fail; |
| import static org.mockito.ArgumentMatchers.any; |
| import static org.mockito.Mockito.clearInvocations; |
| import static org.mockito.Mockito.doNothing; |
| import static org.mockito.Mockito.doReturn; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.reset; |
| import static org.mockito.Mockito.spy; |
| import static org.mockito.Mockito.timeout; |
| import static org.mockito.Mockito.times; |
| import static org.mockito.Mockito.verify; |
| import static org.mockito.Mockito.verifyNoInteractions; |
| import static org.mockito.Mockito.verifyNoMoreInteractions; |
| import static org.mockito.Mockito.when; |
| |
| import java.util.ArrayList; |
| import java.util.EnumSet; |
| import java.util.List; |
| import java.util.Objects; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.TimeUnit; |
| import java.util.function.Consumer; |
| import java.util.function.Supplier; |
| import java.util.stream.Stream; |
| import org.apache.ignite.internal.catalog.commands.AlterTableAlterColumnCommand; |
| import org.apache.ignite.internal.catalog.commands.AlterTableAlterColumnCommandBuilder; |
| import org.apache.ignite.internal.catalog.commands.AlterZoneCommand; |
| import org.apache.ignite.internal.catalog.commands.AlterZoneSetDefaultCommand; |
| import org.apache.ignite.internal.catalog.commands.CatalogUtils; |
| import org.apache.ignite.internal.catalog.commands.ColumnParams; |
| import org.apache.ignite.internal.catalog.commands.ColumnParams.Builder; |
| import org.apache.ignite.internal.catalog.commands.CreateSchemaCommand; |
| import org.apache.ignite.internal.catalog.commands.CreateZoneCommand; |
| import org.apache.ignite.internal.catalog.commands.DefaultValue; |
| import org.apache.ignite.internal.catalog.commands.DropIndexCommand; |
| import org.apache.ignite.internal.catalog.commands.DropZoneCommand; |
| import org.apache.ignite.internal.catalog.commands.MakeIndexAvailableCommand; |
| import org.apache.ignite.internal.catalog.commands.RemoveIndexCommand; |
| import org.apache.ignite.internal.catalog.commands.RenameTableCommand; |
| import org.apache.ignite.internal.catalog.commands.RenameZoneCommand; |
| import org.apache.ignite.internal.catalog.commands.StartBuildingIndexCommand; |
| import org.apache.ignite.internal.catalog.commands.StorageProfileParams; |
| import org.apache.ignite.internal.catalog.descriptors.CatalogHashIndexDescriptor; |
| import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor; |
| import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor.CatalogIndexDescriptorType; |
| import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus; |
| import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor; |
| import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor; |
| import org.apache.ignite.internal.catalog.descriptors.CatalogSortedIndexDescriptor; |
| import org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor; |
| import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; |
| import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; |
| import org.apache.ignite.internal.catalog.events.AddColumnEventParameters; |
| import org.apache.ignite.internal.catalog.events.CatalogEvent; |
| import org.apache.ignite.internal.catalog.events.CatalogEventParameters; |
| import org.apache.ignite.internal.catalog.events.CreateIndexEventParameters; |
| import org.apache.ignite.internal.catalog.events.CreateTableEventParameters; |
| import org.apache.ignite.internal.catalog.events.CreateZoneEventParameters; |
| import org.apache.ignite.internal.catalog.events.DropColumnEventParameters; |
| import org.apache.ignite.internal.catalog.events.DropTableEventParameters; |
| import org.apache.ignite.internal.catalog.events.DropZoneEventParameters; |
| import org.apache.ignite.internal.catalog.events.MakeIndexAvailableEventParameters; |
| import org.apache.ignite.internal.catalog.events.RemoveIndexEventParameters; |
| import org.apache.ignite.internal.catalog.events.RenameTableEventParameters; |
| import org.apache.ignite.internal.catalog.events.StartBuildingIndexEventParameters; |
| import org.apache.ignite.internal.catalog.events.StoppingIndexEventParameters; |
| import org.apache.ignite.internal.catalog.storage.ObjectIdGenUpdateEntry; |
| import org.apache.ignite.internal.catalog.storage.UpdateLog; |
| import org.apache.ignite.internal.catalog.storage.UpdateLog.OnUpdateHandler; |
| import org.apache.ignite.internal.catalog.storage.VersionedUpdate; |
| import org.apache.ignite.internal.event.EventListener; |
| import org.apache.ignite.internal.hlc.HybridTimestamp; |
| import org.apache.ignite.internal.lang.IgniteInternalException; |
| import org.apache.ignite.sql.ColumnType; |
| import org.hamcrest.TypeSafeMatcher; |
| import org.jetbrains.annotations.Nullable; |
| import org.junit.jupiter.api.Test; |
| import org.junit.jupiter.params.ParameterizedTest; |
| import org.junit.jupiter.params.provider.Arguments; |
| import org.junit.jupiter.params.provider.EnumSource; |
| import org.junit.jupiter.params.provider.EnumSource.Mode; |
| import org.junit.jupiter.params.provider.MethodSource; |
| import org.junit.jupiter.params.provider.ValueSource; |
| import org.mockito.ArgumentCaptor; |
| |
| /** |
| * Catalog manager self test. |
| */ |
| public class CatalogManagerSelfTest extends BaseCatalogManagerTest { |
// Schema used by the assertions in this class; simply an alias for the catalog's default schema name.
| private static final String SCHEMA_NAME = DEFAULT_SCHEMA_NAME; |
// Zone name for tests; not referenced in the reviewed part of the class - presumably used by zone tests further down.
| private static final String TEST_ZONE_NAME = "TEST_ZONE_NAME"; |
// Names of the columns that the add/drop column tests add to (and remove from) the test table.
| private static final String NEW_COLUMN_NAME = "NEWCOL"; |
| private static final String NEW_COLUMN_NAME_2 = "NEWCOL2"; |
// Baseline precision for the DECIMAL columns created by the alter-column precision test.
| private static final int DFLT_TEST_PRECISION = 11; |
| |
| @Test |
| public void testEmptyCatalog() { |
| CatalogSchemaDescriptor defaultSchema = manager.schema(DEFAULT_SCHEMA_NAME, 1); |
| |
| assertNotNull(defaultSchema); |
| assertSame(defaultSchema, manager.activeSchema(DEFAULT_SCHEMA_NAME, clock.nowLong())); |
| assertSame(defaultSchema, manager.schema(1)); |
| assertSame(defaultSchema, manager.schema(defaultSchema.id(), 1)); |
| assertSame(defaultSchema, manager.activeSchema(clock.nowLong())); |
| |
| int nonExistingVersion = manager.latestCatalogVersion() + 1; |
| |
| assertNull(manager.schema(nonExistingVersion)); |
| assertNull(manager.schema(defaultSchema.id(), nonExistingVersion)); |
| assertThrows(IllegalStateException.class, () -> manager.activeSchema(-1L)); |
| |
| // Validate default schema. |
| assertEquals(DEFAULT_SCHEMA_NAME, defaultSchema.name()); |
| assertEquals(1, defaultSchema.id()); |
| assertEquals(0, defaultSchema.tables().length); |
| assertEquals(0, defaultSchema.indexes().length); |
| |
| // Default distribution zone must exists. |
| CatalogZoneDescriptor zone = latestActiveCatalog().defaultZone(); |
| |
| assertEquals(DEFAULT_ZONE_NAME, zone.name()); |
| assertEquals(DEFAULT_PARTITION_COUNT, zone.partitions()); |
| assertEquals(DEFAULT_REPLICA_COUNT, zone.replicas()); |
| assertEquals(DEFAULT_FILTER, zone.filter()); |
| assertEquals(INFINITE_TIMER_VALUE, zone.dataNodesAutoAdjust()); |
| assertEquals(IMMEDIATE_TIMER_VALUE, zone.dataNodesAutoAdjustScaleUp()); |
| assertEquals(INFINITE_TIMER_VALUE, zone.dataNodesAutoAdjustScaleDown()); |
| |
| // System schema should exist. |
| |
| CatalogSchemaDescriptor systemSchema = manager.schema(SYSTEM_SCHEMA_NAME, 1); |
| assertNotNull(systemSchema, "system schema"); |
| assertSame(systemSchema, manager.activeSchema(SYSTEM_SCHEMA_NAME, clock.nowLong())); |
| assertSame(systemSchema, manager.schema(SYSTEM_SCHEMA_NAME, 1)); |
| assertSame(systemSchema, manager.schema(systemSchema.id(), 1)); |
| |
| // Validate system schema. |
| assertEquals(SYSTEM_SCHEMA_NAME, systemSchema.name()); |
| assertEquals(2, systemSchema.id()); |
| assertEquals(0, systemSchema.tables().length); |
| assertEquals(0, systemSchema.indexes().length); |
| |
| assertThat(manager.latestCatalogVersion(), is(1)); |
| } |
| |
| @Test |
| public void assignsSuccessiveCatalogVersions() { |
| CompletableFuture<Integer> version1Future = manager.execute(simpleTable(TABLE_NAME)); |
| assertThat(version1Future, willCompleteSuccessfully()); |
| |
| CompletableFuture<Integer> version2Future = manager.execute(simpleIndex()); |
| assertThat(version2Future, willCompleteSuccessfully()); |
| |
| CompletableFuture<Integer> version3Future = manager.execute(simpleTable(TABLE_NAME_2)); |
| assertThat(version3Future, willCompleteSuccessfully()); |
| |
| int firstVersion = version1Future.join(); |
| assertThat(version2Future.join(), is(firstVersion + 1)); |
| assertThat(version3Future.join(), is(firstVersion + 2)); |
| } |
| |
| @Test |
| public void testNoInteractionsAfterStop() { |
| clearInvocations(updateLog); |
| |
| int futureVersion = manager.latestCatalogVersion() + 1; |
| |
| CompletableFuture<Void> readyFuture = manager.catalogReadyFuture(futureVersion); |
| assertFalse(readyFuture.isDone()); |
| |
| assertThat(manager.stopAsync(), willCompleteSuccessfully()); |
| |
| verify(updateLog).stopAsync(); |
| |
| assertTrue(readyFuture.isDone()); |
| |
| manager.execute(catalog -> null); |
| manager.execute(List.of(catalog -> null)); |
| |
| verifyNoMoreInteractions(updateLog); |
| } |
| |
| @Test |
| public void testCreateTable() { |
| long timePriorToTableCreation = clock.nowLong(); |
| |
| int tableCreationVersion = await( |
| manager.execute(createTableCommand( |
| TABLE_NAME, |
| List.of(columnParams("key1", INT32), columnParams("key2", INT32), columnParams("val", INT32, true)), |
| List.of("key1", "key2"), |
| List.of("key2") |
| )) |
| ); |
| |
| // Validate catalog version from the past. |
| CatalogSchemaDescriptor schema = manager.schema(tableCreationVersion - 1); |
| |
| assertNotNull(schema); |
| assertEquals(SCHEMA_NAME, schema.name()); |
| assertSame(schema, manager.activeSchema(timePriorToTableCreation)); |
| |
| assertNull(schema.table(TABLE_NAME)); |
| assertNull(manager.table(TABLE_NAME, 123L)); |
| assertNull(manager.aliveIndex(pkIndexName(TABLE_NAME), 123L)); |
| |
| // Validate actual catalog |
| schema = manager.schema(SCHEMA_NAME, tableCreationVersion); |
| CatalogTableDescriptor table = schema.table(TABLE_NAME); |
| CatalogHashIndexDescriptor pkIndex = (CatalogHashIndexDescriptor) schema.aliveIndex(pkIndexName(TABLE_NAME)); |
| |
| assertNotNull(schema); |
| assertEquals(SCHEMA_NAME, schema.name()); |
| assertSame(schema, manager.activeSchema(clock.nowLong())); |
| |
| assertSame(table, manager.table(TABLE_NAME, clock.nowLong())); |
| assertSame(table, manager.table(table.id(), clock.nowLong())); |
| |
| assertSame(pkIndex, manager.aliveIndex(pkIndexName(TABLE_NAME), clock.nowLong())); |
| assertSame(pkIndex, manager.index(pkIndex.id(), clock.nowLong())); |
| |
| // Validate newly created table |
| assertEquals(TABLE_NAME, table.name()); |
| |
| CatalogZoneDescriptor defaultZone = latestActiveCatalog().defaultZone(); |
| |
| assertEquals(defaultZone.id(), table.zoneId()); |
| |
| // Validate newly created pk index |
| assertEquals(pkIndexName(TABLE_NAME), pkIndex.name()); |
| assertEquals(table.id(), pkIndex.tableId()); |
| assertEquals(table.primaryKeyColumns(), pkIndex.columns()); |
| assertTrue(pkIndex.unique()); |
| assertEquals(AVAILABLE, pkIndex.status()); |
| assertEquals(manager.latestCatalogVersion(), pkIndex.txWaitCatalogVersion()); |
| |
| CatalogTableColumnDescriptor desc = table.columnDescriptor("key1"); |
| assertNotNull(desc); |
| // INT32 key |
| assertThat(desc.precision(), is(DEFAULT_PRECISION)); |
| |
| // Validate another table creation. |
| int secondTableCreationVersion = await(manager.execute(simpleTable(TABLE_NAME_2))); |
| |
| // Validate actual catalog has both tables. |
| schema = manager.schema(secondTableCreationVersion); |
| table = schema.table(TABLE_NAME); |
| pkIndex = (CatalogHashIndexDescriptor) schema.aliveIndex(pkIndexName(TABLE_NAME)); |
| CatalogTableDescriptor table2 = schema.table(TABLE_NAME_2); |
| CatalogHashIndexDescriptor pkIndex2 = (CatalogHashIndexDescriptor) schema.aliveIndex(pkIndexName(TABLE_NAME_2)); |
| |
| assertNotNull(schema); |
| assertEquals(SCHEMA_NAME, schema.name()); |
| assertSame(schema, manager.activeSchema(clock.nowLong())); |
| |
| assertSame(table, manager.table(TABLE_NAME, clock.nowLong())); |
| assertSame(table, manager.table(table.id(), clock.nowLong())); |
| |
| assertSame(pkIndex, manager.aliveIndex(pkIndexName(TABLE_NAME), clock.nowLong())); |
| assertSame(pkIndex, manager.index(pkIndex.id(), clock.nowLong())); |
| |
| assertSame(table2, manager.table(TABLE_NAME_2, clock.nowLong())); |
| assertSame(table2, manager.table(table2.id(), clock.nowLong())); |
| |
| assertSame(pkIndex2, manager.aliveIndex(pkIndexName(TABLE_NAME_2), clock.nowLong())); |
| assertSame(pkIndex2, manager.index(pkIndex2.id(), clock.nowLong())); |
| |
| assertNotSame(table, table2); |
| assertNotSame(pkIndex, pkIndex2); |
| |
| // Try to create another table with same name. |
| assertThat( |
| manager.execute(simpleTable(TABLE_NAME_2)), |
| willThrowFast(CatalogValidationException.class) |
| ); |
| |
| // Validate schema wasn't changed. |
| assertSame(schema, manager.activeSchema(clock.nowLong())); |
| } |
| |
| @Test |
| public void testDropTable() { |
| assertThat(manager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully()); |
| assertThat(manager.execute(simpleTable(TABLE_NAME_2)), willCompleteSuccessfully()); |
| |
| long beforeDropTimestamp = clock.nowLong(); |
| |
| int tableDropVersion = await(manager.execute(dropTableCommand(TABLE_NAME))); |
| |
| // Validate catalog version from the past. |
| CatalogSchemaDescriptor schema = manager.schema(tableDropVersion - 1); |
| CatalogTableDescriptor table1 = schema.table(TABLE_NAME); |
| CatalogTableDescriptor table2 = schema.table(TABLE_NAME_2); |
| CatalogIndexDescriptor pkIndex1 = schema.aliveIndex(pkIndexName(TABLE_NAME)); |
| CatalogIndexDescriptor pkIndex2 = schema.aliveIndex(pkIndexName(TABLE_NAME_2)); |
| |
| assertNotEquals(table1.id(), table2.id()); |
| assertNotEquals(pkIndex1.id(), pkIndex2.id()); |
| |
| assertNotNull(schema); |
| assertEquals(SCHEMA_NAME, schema.name()); |
| assertSame(schema, manager.activeSchema(beforeDropTimestamp)); |
| |
| assertSame(table1, manager.table(TABLE_NAME, beforeDropTimestamp)); |
| assertSame(table1, manager.table(table1.id(), beforeDropTimestamp)); |
| |
| assertSame(pkIndex1, manager.aliveIndex(pkIndexName(TABLE_NAME), beforeDropTimestamp)); |
| assertSame(pkIndex1, manager.index(pkIndex1.id(), beforeDropTimestamp)); |
| |
| assertSame(table2, manager.table(TABLE_NAME_2, beforeDropTimestamp)); |
| assertSame(table2, manager.table(table2.id(), beforeDropTimestamp)); |
| |
| assertSame(pkIndex2, manager.aliveIndex(pkIndexName(TABLE_NAME_2), beforeDropTimestamp)); |
| assertSame(pkIndex2, manager.index(pkIndex2.id(), beforeDropTimestamp)); |
| |
| // Validate actual catalog |
| schema = manager.schema(tableDropVersion); |
| |
| assertNotNull(schema); |
| assertEquals(SCHEMA_NAME, schema.name()); |
| assertSame(schema, manager.activeSchema(clock.nowLong())); |
| |
| assertNull(schema.table(TABLE_NAME)); |
| assertNull(manager.table(TABLE_NAME, clock.nowLong())); |
| assertNull(manager.table(table1.id(), clock.nowLong())); |
| |
| assertThat(schema.aliveIndex(pkIndexName(TABLE_NAME)), is(nullValue())); |
| assertThat(manager.aliveIndex(pkIndexName(TABLE_NAME), clock.nowLong()), is(nullValue())); |
| assertThat(manager.index(pkIndex1.id(), clock.nowLong()), is(nullValue())); |
| |
| assertSame(table2, manager.table(TABLE_NAME_2, clock.nowLong())); |
| assertSame(table2, manager.table(table2.id(), clock.nowLong())); |
| |
| assertSame(pkIndex2, manager.aliveIndex(pkIndexName(TABLE_NAME_2), clock.nowLong())); |
| assertSame(pkIndex2, manager.index(pkIndex2.id(), clock.nowLong())); |
| |
| // Validate schema wasn't changed. |
| assertSame(schema, manager.activeSchema(clock.nowLong())); |
| } |
| |
| @Test |
| void testReCreateTableWithSameName() { |
| assertThat(manager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully()); |
| |
| int catalogVersion = manager.latestCatalogVersion(); |
| CatalogTableDescriptor table1 = manager.table(TABLE_NAME, clock.nowLong()); |
| assertNotNull(table1); |
| |
| // Drop table. |
| assertThat(manager.execute(dropTableCommand(TABLE_NAME)), willCompleteSuccessfully()); |
| assertNull(manager.table(TABLE_NAME, clock.nowLong())); |
| |
| // Re-create table with same name. |
| assertThat(manager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully()); |
| |
| CatalogTableDescriptor table2 = manager.table(TABLE_NAME, clock.nowLong()); |
| assertNotNull(table2); |
| |
| // Ensure these are different tables. |
| assertNotEquals(table1.id(), table2.id()); |
| |
| // Ensure table is available for historical queries. |
| assertNotNull(manager.table(table1.id(), catalogVersion)); |
| } |
| |
| @Test |
| public void testAddColumn() { |
| assertThat(manager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully()); |
| |
| long beforeAddedTimestamp = clock.nowLong(); |
| |
| assertThat( |
| manager.execute(addColumnParams(TABLE_NAME, |
| columnParamsBuilder(NEW_COLUMN_NAME, STRING, 11, true).defaultValue(constant("Ignite!")).build() |
| )), |
| willCompleteSuccessfully() |
| ); |
| |
| // Validate catalog version from the past. |
| CatalogSchemaDescriptor schema = manager.activeSchema(beforeAddedTimestamp); |
| assertNotNull(schema); |
| assertNotNull(schema.table(TABLE_NAME)); |
| |
| assertNull(schema.table(TABLE_NAME).column(NEW_COLUMN_NAME)); |
| |
| // Validate actual catalog |
| schema = manager.activeSchema(clock.nowLong()); |
| assertNotNull(schema); |
| assertNotNull(schema.table(TABLE_NAME)); |
| |
| // Validate column descriptor. |
| CatalogTableColumnDescriptor column = schema.table(TABLE_NAME).column(NEW_COLUMN_NAME); |
| |
| assertEquals(NEW_COLUMN_NAME, column.name()); |
| assertEquals(STRING, column.type()); |
| assertTrue(column.nullable()); |
| |
| assertEquals(DefaultValue.Type.CONSTANT, column.defaultValue().type()); |
| assertEquals("Ignite!", ((DefaultValue.ConstantValue) column.defaultValue()).value()); |
| |
| assertEquals(11, column.length()); |
| assertEquals(DEFAULT_PRECISION, column.precision()); |
| assertEquals(DEFAULT_SCALE, column.scale()); |
| } |
| |
| @Test |
| public void testDropColumn() { |
| assertThat(manager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully()); |
| |
| long beforeAddedTimestamp = clock.nowLong(); |
| |
| assertThat(manager.execute(dropColumnParams(TABLE_NAME, "VAL")), willCompleteSuccessfully()); |
| |
| // Validate catalog version from the past. |
| CatalogSchemaDescriptor schema = manager.activeSchema(beforeAddedTimestamp); |
| assertNotNull(schema); |
| assertNotNull(schema.table(TABLE_NAME)); |
| |
| assertNotNull(schema.table(TABLE_NAME).column("VAL")); |
| |
| // Validate actual catalog |
| schema = manager.activeSchema(clock.nowLong()); |
| assertNotNull(schema); |
| assertNotNull(schema.table(TABLE_NAME)); |
| |
| assertNull(schema.table(TABLE_NAME).column("VAL")); |
| } |
| |
| @Test |
| public void testAddDropMultipleColumns() { |
| assertThat(manager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully()); |
| |
| // Add duplicate column. |
| assertThat( |
| manager.execute(addColumnParams(TABLE_NAME, columnParams(NEW_COLUMN_NAME, INT32, true), columnParams("VAL", INT32, true))), |
| willThrow(CatalogValidationException.class) |
| ); |
| |
| // Validate no column added. |
| CatalogSchemaDescriptor schema = manager.activeSchema(clock.nowLong()); |
| |
| assertNull(schema.table(TABLE_NAME).column(NEW_COLUMN_NAME)); |
| |
| // Add multiple columns. |
| assertThat( |
| manager.execute(addColumnParams(TABLE_NAME, |
| columnParams(NEW_COLUMN_NAME, INT32, true), columnParams(NEW_COLUMN_NAME_2, INT32, true) |
| )), |
| willCompleteSuccessfully() |
| ); |
| |
| // Validate both columns added. |
| schema = manager.activeSchema(clock.nowLong()); |
| |
| assertNotNull(schema.table(TABLE_NAME).column(NEW_COLUMN_NAME)); |
| assertNotNull(schema.table(TABLE_NAME).column(NEW_COLUMN_NAME_2)); |
| |
| // Drop multiple columns. |
| assertThat(manager.execute(dropColumnParams(TABLE_NAME, NEW_COLUMN_NAME, NEW_COLUMN_NAME_2)), willCompleteSuccessfully()); |
| |
| // Validate both columns dropped. |
| schema = manager.activeSchema(clock.nowLong()); |
| |
| assertNull(schema.table(TABLE_NAME).column(NEW_COLUMN_NAME)); |
| assertNull(schema.table(TABLE_NAME).column(NEW_COLUMN_NAME_2)); |
| } |
| |
| /** |
| * Checks for possible changes to the default value of a column descriptor. |
| * |
| * <p>Set/drop default value allowed for any column. |
| */ |
| @Test |
| public void testAlterColumnDefault() { |
| assertThat(manager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully()); |
| |
| int schemaVer = manager.latestCatalogVersion(); |
| assertNotNull(manager.schema(schemaVer)); |
| assertNull(manager.schema(schemaVer + 1)); |
| |
| // NULL-> NULL : No-op. |
| assertThat(changeColumn(TABLE_NAME, "VAL", null, null, () -> constant(null)), |
| willCompleteSuccessfully()); |
| assertNull(manager.schema(schemaVer + 1)); |
| |
| // NULL -> 1 : Ok. |
| assertThat(changeColumn(TABLE_NAME, "VAL", null, null, () -> constant(1)), |
| willCompleteSuccessfully()); |
| assertNotNull(manager.schema(++schemaVer)); |
| |
| // 1 -> 1 : No-op. |
| assertThat(changeColumn(TABLE_NAME, "VAL", null, null, () -> constant(1)), |
| willCompleteSuccessfully()); |
| assertNull(manager.schema(schemaVer + 1)); |
| |
| // 1 -> 2 : Ok. |
| assertThat(changeColumn(TABLE_NAME, "VAL", null, null, () -> constant(2)), |
| willCompleteSuccessfully()); |
| assertNotNull(manager.schema(++schemaVer)); |
| |
| // 2 -> NULL : Ok. |
| assertThat(changeColumn(TABLE_NAME, "VAL", null, null, () -> constant(null)), |
| willCompleteSuccessfully()); |
| assertNotNull(manager.schema(++schemaVer)); |
| } |
| |
| /** |
| * Checks for possible changes of the nullable flag of a column descriptor. |
| * |
| * <ul> |
| * <li>{@code DROP NOT NULL} is allowed on any non-PK column. |
| * <li>{@code SET NOT NULL} is forbidden. |
| * </ul> |
| */ |
| @Test |
| public void testAlterColumnNotNull() { |
| assertThat(manager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully()); |
| |
| int schemaVer = manager.latestCatalogVersion(); |
| assertNotNull(manager.schema(schemaVer)); |
| assertNull(manager.schema(schemaVer + 1)); |
| |
| // NULLABLE -> NULLABLE : No-op. |
| // NOT NULL -> NOT NULL : No-op. |
| assertThat(changeColumn(TABLE_NAME, "VAL", null, false, null), willCompleteSuccessfully()); |
| assertThat(changeColumn(TABLE_NAME, "VAL_NOT_NULL", null, true, null), willCompleteSuccessfully()); |
| assertNull(manager.schema(schemaVer + 1)); |
| |
| // NOT NULL -> NULlABLE : Ok. |
| assertThat(changeColumn(TABLE_NAME, "VAL_NOT_NULL", null, false, null), willCompleteSuccessfully()); |
| assertNotNull(manager.schema(++schemaVer)); |
| |
| // DROP NOT NULL for PK : PK column can't be `null`. |
| assertThat(changeColumn(TABLE_NAME, "ID", null, false, null), |
| willThrowFast(CatalogValidationException.class, "Dropping NOT NULL constraint on key column is not allowed")); |
| |
| // NULlABLE -> NOT NULL : Forbidden because this change lead to incompatible schemas. |
| assertThat(changeColumn(TABLE_NAME, "VAL", null, true, null), |
| willThrowFast(CatalogValidationException.class, "Adding NOT NULL constraint is not allowed")); |
| assertThat(changeColumn(TABLE_NAME, "VAL_NOT_NULL", null, true, null), |
| willThrowFast(CatalogValidationException.class, "Adding NOT NULL constraint is not allowed")); |
| |
| assertNull(manager.schema(schemaVer + 1)); |
| } |
| |
| /** |
| * Checks for possible changes of the precision of a column descriptor. |
| * |
| * <ul> |
| * <li>Increasing precision is allowed for non-PK {@link ColumnType#DECIMAL} column.</li> |
| * <li>Decreasing precision is forbidden.</li> |
| * </ul> |
| */ |
| @Test |
| public void testAlterColumnTypePrecision() { |
| ColumnParams pkCol = columnParams("ID", INT32); |
| ColumnParams col1 = columnParamsBuilder("COL_DECIMAL1", DECIMAL).precision(DFLT_TEST_PRECISION - 1).scale(1).build(); |
| ColumnParams col2 = columnParamsBuilder("COL_DECIMAL2", DECIMAL).precision(DFLT_TEST_PRECISION).scale(1).build(); |
| |
| assertThat(manager.execute(simpleTable(TABLE_NAME, List.of(pkCol, col1, col2))), willCompleteSuccessfully()); |
| |
| int schemaVer = manager.latestCatalogVersion(); |
| assertNotNull(manager.schema(schemaVer)); |
| assertNull(manager.schema(schemaVer + 1)); |
| |
| // precision increment : Ok. |
| assertThat( |
| changeColumn(TABLE_NAME, col1.name(), new TestColumnTypeParams(col1.type(), DFLT_TEST_PRECISION, null, null), null, null), |
| willCompleteSuccessfully() |
| ); |
| assertNotNull(manager.schema(++schemaVer)); |
| |
| assertThat( |
| changeColumn(TABLE_NAME, col2.name(), new TestColumnTypeParams(col2.type(), DFLT_TEST_PRECISION, null, null), null, null), |
| willCompleteSuccessfully() |
| ); |
| assertNull(manager.schema(schemaVer + 1)); |
| |
| // No change. |
| assertThat( |
| changeColumn(TABLE_NAME, col1.name(), new TestColumnTypeParams(col1.type(), DFLT_TEST_PRECISION, null, null), null, null), |
| willCompleteSuccessfully() |
| ); |
| assertNull(manager.schema(schemaVer + 1)); |
| |
| // precision decrement : Forbidden because this change lead to incompatible schemas. |
| assertThat( |
| changeColumn(TABLE_NAME, col1.name(), |
| new TestColumnTypeParams(col1.type(), DFLT_TEST_PRECISION - 1, null, null), null, null), |
| willThrowFast(CatalogValidationException.class, "Decreasing the precision for column of type '" |
| + col1.type() + "' is not allowed") |
| ); |
| assertNull(manager.schema(schemaVer + 1)); |
| |
| assertThat( |
| changeColumn(TABLE_NAME, col2.name(), |
| new TestColumnTypeParams(col2.type(), DFLT_TEST_PRECISION - 1, null, null), null, null), |
| willThrowFast(CatalogValidationException.class, "Decreasing the precision for column of type '" |
| + col1.type() + "' is not allowed") |
| ); |
| assertNull(manager.schema(schemaVer + 1)); |
| } |
| |
| /** |
| * Changing precision is not supported for all types other than DECIMAL. |
| */ |
| @ParameterizedTest |
| @EnumSource(value = ColumnType.class, names = {"NULL", "DECIMAL"}, mode = Mode.EXCLUDE) |
| public void testAlterColumnTypeAnyPrecisionChangeIsRejected(ColumnType type) { |
| ColumnParams pkCol = columnParams("ID", INT32); |
| ColumnParams colWithPrecision; |
| Builder colWithPrecisionBuilder = columnParamsBuilder("COL_PRECISION", type).precision(3); |
| applyNecessaryLength(type, colWithPrecisionBuilder); |
| |
| if (type.scaleAllowed()) { |
| colWithPrecisionBuilder.scale(0); |
| } |
| |
| if (!type.precisionAllowed() && !type.scaleAllowed()) { |
| assertThrowsWithCause(colWithPrecisionBuilder::build, CatalogValidationException.class); |
| return; |
| } |
| |
| colWithPrecision = colWithPrecisionBuilder.build(); |
| |
| assertThat(manager.execute( |
| simpleTable(TABLE_NAME, List.of(pkCol, colWithPrecision))), willCompleteSuccessfully() |
| ); |
| |
| int schemaVer = manager.latestCatalogVersion(); |
| assertNotNull(manager.schema(schemaVer)); |
| assertNull(manager.schema(schemaVer + 1)); |
| |
| int origPrecision = colWithPrecision.precision() == null ? 11 : colWithPrecision.precision(); |
| |
| // change precision different from default |
| assertThat(changeColumn(TABLE_NAME, colWithPrecision.name(), |
| new TestColumnTypeParams(type, origPrecision - 1, null, null), null, null), |
| willThrowFast(CatalogValidationException.class, |
| "Changing the precision for column of type '" + colWithPrecision.type() + "' is not allowed")); |
| |
| assertThat(changeColumn(TABLE_NAME, colWithPrecision.name(), |
| new TestColumnTypeParams(type, origPrecision + 1, null, null), null, null), |
| willThrowFast(CatalogValidationException.class, |
| "Changing the precision for column of type '" + colWithPrecision.type() + "' is not allowed")); |
| |
| assertNull(manager.schema(schemaVer + 1)); |
| } |
| |
| /** |
| * Checks for possible changes of the length of a column descriptor. |
| * |
| * <ul> |
| * <li>Increasing length is allowed for non-PK {@link ColumnType#STRING} and {@link ColumnType#BYTE_ARRAY} column.</li> |
| * <li>Decreasing length is forbidden.</li> |
| * </ul> |
| */ |
| @ParameterizedTest |
| @EnumSource(value = ColumnType.class, names = {"STRING", "BYTE_ARRAY"}, mode = Mode.INCLUDE) |
| public void testAlterColumnTypeLength(ColumnType type) { |
| ColumnParams pkCol = columnParams("ID", INT32); |
| ColumnParams col = columnParamsBuilder("COL_" + type, type).length(10).build(); |
| |
| assertThat(manager.execute(simpleTable(TABLE_NAME, List.of(pkCol, col))), willCompleteSuccessfully()); |
| |
| int schemaVer = manager.latestCatalogVersion(); |
| assertNotNull(manager.schema(schemaVer)); |
| assertNull(manager.schema(schemaVer + 1)); |
| |
| // 10 -> 11 : Ok. |
| assertThat( |
| changeColumn(TABLE_NAME, col.name(), new TestColumnTypeParams(col.type(), null, 11, null), null, null), |
| willCompleteSuccessfully() |
| ); |
| |
| CatalogSchemaDescriptor schema = manager.schema(++schemaVer); |
| assertNotNull(schema); |
| |
| // 11 -> 10 : Error. |
| assertThat( |
| changeColumn(TABLE_NAME, col.name(), new TestColumnTypeParams(col.type(), null, 10, null), null, null), |
| willThrowFast(CatalogValidationException.class, "Decreasing the length for column of type '" |
| + col.type() + "' is not allowed") |
| ); |
| assertNull(manager.schema(schemaVer + 1)); |
| |
| // 11 -> 11 : No-op. |
| assertThat( |
| changeColumn(TABLE_NAME, col.name(), new TestColumnTypeParams(col.type(), null, 11, null), null, null), |
| willCompleteSuccessfully() |
| ); |
| assertNull(manager.schema(schemaVer + 1)); |
| |
| // No change. |
| assertThat(changeColumn(TABLE_NAME, col.name(), new TestColumnTypeParams(col.type()), null, null), |
| willCompleteSuccessfully()); |
| assertNull(manager.schema(schemaVer + 1)); |
| |
| // 11 -> 10 : failed. |
| assertThat( |
| changeColumn(TABLE_NAME, col.name(), new TestColumnTypeParams(col.type(), null, 10, null), null, null), |
| willThrowFast(CatalogValidationException.class) |
| ); |
| assertNull(manager.schema(schemaVer + 1)); |
| } |
| |
| /** |
| * Changing length is forbidden for all types other than STRING and BYTE_ARRAY. |
| */ |
| @ParameterizedTest |
| @EnumSource(value = ColumnType.class, names = {"STRING", "BYTE_ARRAY", "NULL"}, mode = Mode.EXCLUDE) |
| public void testAlterColumnTypeAnyLengthChangeIsRejected(ColumnType type) { |
| ColumnParams pkCol = columnParams("ID", INT32); |
| Builder colBuilder = columnParamsBuilder("COL", type); |
| Builder colWithLengthBuilder = columnParamsBuilder("COL_PRECISION", type).length(10); |
| |
| applyNecessaryLength(type, colWithLengthBuilder); |
| |
| initializeColumnWithDefaults(type, colBuilder); |
| initializeColumnWithDefaults(type, colWithLengthBuilder); |
| |
| ColumnParams col = colBuilder.build(); |
| |
| if (!type.lengthAllowed()) { |
| assertThrowsWithCause(colWithLengthBuilder::build, CatalogValidationException.class); |
| return; |
| } |
| |
| ColumnParams colWithLength = colWithLengthBuilder.build(); |
| |
| assertThat( |
| manager.execute(simpleTable(TABLE_NAME, List.of(pkCol, col, colWithLength))), |
| willCompleteSuccessfully() |
| ); |
| |
| int schemaVer = manager.latestCatalogVersion(); |
| assertNotNull(manager.schema(schemaVer)); |
| assertNull(manager.schema(schemaVer + 1)); |
| |
| assertThat(changeColumn(TABLE_NAME, col.name(), new TestColumnTypeParams(type, null, 1 << 6, null), null, null), |
| willThrowFast(CatalogValidationException.class, |
| "Changing the length for column of type '" + col.type() + "' is not allowed")); |
| |
| assertThat(changeColumn(TABLE_NAME, colWithLength.name(), new TestColumnTypeParams(type, null, 1 << 5, null), null, null), |
| willCompleteSuccessfully()); |
| |
| assertThat(changeColumn(TABLE_NAME, colWithLength.name(), new TestColumnTypeParams(type, null, 9, null), null, null), |
| willThrowFast(CatalogValidationException.class, |
| "Changing the length for column of type '" + colWithLength.type() + "' is not allowed")); |
| |
| assertThat(changeColumn(TABLE_NAME, colWithLength.name(), new TestColumnTypeParams(type, null, 11, null), null, null), |
| willThrowFast(CatalogValidationException.class, |
| "Changing the length for column of type '" + colWithLength.type() + "' is not allowed")); |
| |
| assertNull(manager.schema(schemaVer + 1)); |
| } |
| |
| /** |
| * Changing scale is incompatible change, thus it's forbidden for all types. |
| */ |
| @ParameterizedTest |
| @EnumSource(ColumnType.class) |
| public void testAlterColumnTypeScaleIsRejected(ColumnType type) { |
| ColumnParams pkCol = columnParams("ID", INT32); |
| Builder colWithPrecisionBuilder = columnParamsBuilder("COL_" + type, type).scale(3); |
| ColumnParams col; |
| |
| applyNecessaryPrecision(type, colWithPrecisionBuilder); |
| applyNecessaryLength(type, colWithPrecisionBuilder); |
| |
| if (!type.scaleAllowed()) { |
| assertThrowsWithCause(colWithPrecisionBuilder::build, CatalogValidationException.class); |
| return; |
| } |
| |
| col = colWithPrecisionBuilder.build(); |
| |
| assertThat(manager.execute(simpleTable(TABLE_NAME, List.of(pkCol, col))), willCompleteSuccessfully()); |
| |
| int schemaVer = manager.latestCatalogVersion(); |
| assertNotNull(manager.schema(schemaVer)); |
| assertNull(manager.schema(schemaVer + 1)); |
| |
| // ANY-> UNDEFINED SCALE : No-op. |
| assertThat(changeColumn(TABLE_NAME, col.name(), new TestColumnTypeParams(col.type()), null, null), |
| willCompleteSuccessfully()); |
| assertNull(manager.schema(schemaVer + 1)); |
| |
| // 3 -> 3 : No-op. |
| assertThat(changeColumn(TABLE_NAME, col.name(), new TestColumnTypeParams(col.type(), null, null, 3), null, null), |
| willCompleteSuccessfully()); |
| assertNull(manager.schema(schemaVer + 1)); |
| |
| // 3 -> 4 : Error. |
| assertThat(changeColumn(TABLE_NAME, col.name(), new TestColumnTypeParams(col.type(), null, null, 4), null, null), |
| willThrowFast(CatalogValidationException.class, "Changing the scale for column of type")); |
| assertNull(manager.schema(schemaVer + 1)); |
| |
| // 3 -> 2 : Error. |
| assertThat(changeColumn(TABLE_NAME, col.name(), new TestColumnTypeParams(col.type(), null, null, 2), null, null), |
| willThrowFast(CatalogValidationException.class, "Changing the scale for column of type")); |
| assertNull(manager.schema(schemaVer + 1)); |
| } |
| |
| /** |
| * Checks for possible changes of the type of a column descriptor. |
| * |
| * <p>The following transitions are allowed for non-PK columns: |
| * <ul> |
| * <li>INT8 -> INT16 -> INT32 -> INT64</li> |
| * <li>FLOAT -> DOUBLE</li> |
| * </ul> |
| * All other transitions are forbidden because they lead to incompatible schemas. |
| */ |
| @ParameterizedTest(name = "set data type {0}") |
| @EnumSource(value = ColumnType.class, names = "NULL", mode = Mode.EXCLUDE) |
| public void testAlterColumnType(ColumnType target) { |
| EnumSet<ColumnType> types = EnumSet.allOf(ColumnType.class); |
| types.remove(NULL); |
| |
| List<ColumnParams> testColumns = types.stream() |
| .map(t -> initializeColumnWithDefaults(t, columnParamsBuilder("COL_" + t, t))) |
| .map(Builder::build) |
| .collect(toList()); |
| |
| List<ColumnParams> tableColumns = new ArrayList<>(List.of(columnParams("ID", INT32))); |
| tableColumns.addAll(testColumns); |
| |
| assertThat(manager.execute(simpleTable(TABLE_NAME, tableColumns)), willCompleteSuccessfully()); |
| |
| int schemaVer = manager.latestCatalogVersion(); |
| assertNotNull(manager.schema(schemaVer)); |
| assertNull(manager.schema(schemaVer + 1)); |
| |
| for (ColumnParams col : testColumns) { |
| TypeSafeMatcher<CompletableFuture<?>> matcher; |
| boolean sameType = col.type() == target; |
| |
| if (sameType || CatalogUtils.isSupportedColumnTypeChange(col.type(), target)) { |
| matcher = willCompleteSuccessfully(); |
| schemaVer += sameType ? 0 : 1; |
| } else { |
| matcher = willThrowFast(CatalogValidationException.class, |
| format("Changing the type from {} to {} is not allowed", col.type(), target)); |
| } |
| |
| TestColumnTypeParams typeParams = new TestColumnTypeParams(target); |
| |
| assertThat(col.type() + " -> " + target, changeColumn(TABLE_NAME, col.name(), typeParams, null, null), matcher); |
| assertNotNull(manager.schema(schemaVer)); |
| assertNull(manager.schema(schemaVer + 1)); |
| } |
| } |
| |
| @Test |
| public void testAlterColumnTypeRejectedForPrimaryKey() { |
| assertThat(manager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully()); |
| |
| assertThat(changeColumn(TABLE_NAME, "ID", new TestColumnTypeParams(INT64), null, null), |
| willThrowFast(CatalogValidationException.class, "Changing the type of key column is not allowed")); |
| } |
| |
| /** |
| * Ensures that the compound change command {@code SET DATA TYPE BIGINT NULL DEFAULT NULL} will change the type, drop NOT NULL and the |
| * default value at the same time. |
| */ |
| @Test |
| public void testAlterColumnMultipleChanges() { |
| assertThat(manager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully()); |
| |
| int schemaVer = manager.latestCatalogVersion(); |
| assertNotNull(manager.schema(schemaVer)); |
| assertNull(manager.schema(schemaVer + 1)); |
| |
| Supplier<DefaultValue> dflt = () -> constant(null); |
| boolean notNull = false; |
| TestColumnTypeParams typeParams = new TestColumnTypeParams(INT64); |
| |
| // Ensures that 3 different actions applied. |
| assertThat(changeColumn(TABLE_NAME, "VAL_NOT_NULL", typeParams, notNull, dflt), willCompleteSuccessfully()); |
| |
| CatalogSchemaDescriptor schema = manager.schema(++schemaVer); |
| assertNotNull(schema); |
| |
| CatalogTableColumnDescriptor desc = schema.table(TABLE_NAME).column("VAL_NOT_NULL"); |
| assertEquals(constant(null), desc.defaultValue()); |
| assertTrue(desc.nullable()); |
| assertEquals(INT64, desc.type()); |
| |
| // Ensures that only one of three actions applied. |
| dflt = () -> constant(2); |
| assertThat(changeColumn(TABLE_NAME, "VAL_NOT_NULL", typeParams, notNull, dflt), willCompleteSuccessfully()); |
| |
| schema = manager.schema(++schemaVer); |
| assertNotNull(schema); |
| assertEquals(constant(2), schema.table(TABLE_NAME).column("VAL_NOT_NULL").defaultValue()); |
| |
| // Ensures that no action will be applied. |
| assertThat(changeColumn(TABLE_NAME, "VAL_NOT_NULL", typeParams, notNull, dflt), willCompleteSuccessfully()); |
| assertNull(manager.schema(schemaVer + 1)); |
| } |
| |
| @Test |
| public void testAlterColumnForNonExistingTableRejected() { |
| int versionBefore = manager.latestCatalogVersion(); |
| |
| assertThat(changeColumn(TABLE_NAME, "ID", null, null, null), willThrowFast(TableNotFoundValidationException.class)); |
| |
| int versionAfter = manager.latestCatalogVersion(); |
| |
| assertEquals(versionBefore, versionAfter); |
| } |
| |
| @Test |
| public void testDropTableWithIndex() { |
| assertThat(manager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully()); |
| assertThat(manager.execute(simpleIndex(TABLE_NAME, INDEX_NAME)), willCompleteSuccessfully()); |
| startBuildingIndex(indexId(INDEX_NAME)); |
| makeIndexAvailable(indexId(INDEX_NAME)); |
| |
| long beforeDropTimestamp = clock.nowLong(); |
| int beforeDropVersion = manager.latestCatalogVersion(); |
| |
| assertThat(manager.execute(dropTableCommand(TABLE_NAME)), willCompleteSuccessfully()); |
| |
| // Validate catalog version from the past. |
| CatalogSchemaDescriptor schema = manager.schema(beforeDropVersion); |
| CatalogTableDescriptor table = schema.table(TABLE_NAME); |
| CatalogIndexDescriptor index = schema.aliveIndex(INDEX_NAME); |
| |
| assertNotNull(schema); |
| assertEquals(SCHEMA_NAME, schema.name()); |
| assertSame(schema, manager.activeSchema(beforeDropTimestamp)); |
| |
| assertSame(table, manager.table(TABLE_NAME, beforeDropTimestamp)); |
| assertSame(table, manager.table(table.id(), beforeDropTimestamp)); |
| |
| assertSame(index, manager.aliveIndex(INDEX_NAME, beforeDropTimestamp)); |
| assertSame(index, manager.index(index.id(), beforeDropTimestamp)); |
| |
| // Validate actual catalog |
| schema = manager.schema(manager.latestCatalogVersion()); |
| |
| assertNotNull(schema); |
| assertEquals(SCHEMA_NAME, schema.name()); |
| assertSame(schema, manager.activeSchema(clock.nowLong())); |
| |
| assertNull(schema.table(TABLE_NAME)); |
| assertNull(manager.table(TABLE_NAME, clock.nowLong())); |
| assertNull(manager.table(table.id(), clock.nowLong())); |
| |
| assertThat(schema.aliveIndex(INDEX_NAME), is(nullValue())); |
| assertThat(manager.aliveIndex(INDEX_NAME, clock.nowLong()), is(nullValue())); |
| assertThat(manager.index(index.id(), clock.nowLong()), is(nullValue())); |
| } |
| |
| @Test |
| public void testCreateHashIndex() { |
| int tableCreationVersion = await(manager.execute(simpleTable(TABLE_NAME))); |
| |
| int indexCreationVersion = await(manager.execute(createHashIndexCommand(INDEX_NAME, List.of("VAL", "ID")))); |
| |
| // Validate catalog version from the past. |
| CatalogSchemaDescriptor schema = manager.schema(tableCreationVersion); |
| |
| assertNotNull(schema); |
| assertNull(schema.aliveIndex(INDEX_NAME)); |
| assertNull(manager.aliveIndex(INDEX_NAME, 123L)); |
| |
| // Validate actual catalog |
| schema = manager.schema(indexCreationVersion); |
| |
| CatalogHashIndexDescriptor index = (CatalogHashIndexDescriptor) schema.aliveIndex(INDEX_NAME); |
| |
| assertNotNull(schema); |
| assertSame(index, manager.aliveIndex(INDEX_NAME, clock.nowLong())); |
| assertSame(index, manager.index(index.id(), clock.nowLong())); |
| |
| // Validate newly created hash index |
| assertEquals(INDEX_NAME, index.name()); |
| assertEquals(schema.table(TABLE_NAME).id(), index.tableId()); |
| assertEquals(List.of("VAL", "ID"), index.columns()); |
| assertFalse(index.unique()); |
| assertEquals(REGISTERED, index.status()); |
| assertEquals(manager.latestCatalogVersion(), index.txWaitCatalogVersion()); |
| } |
| |
| @Test |
| public void testCreateSortedIndex() { |
| assertThat(manager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully()); |
| |
| CatalogCommand command = createSortedIndexCommand( |
| INDEX_NAME, |
| true, |
| List.of("VAL", "ID"), |
| List.of(DESC_NULLS_FIRST, ASC_NULLS_LAST) |
| ); |
| |
| int indexCreationVersion = await(manager.execute(command)); |
| |
| // Validate catalog version from the past. |
| CatalogSchemaDescriptor schema = manager.schema(indexCreationVersion - 1); |
| |
| assertNotNull(schema); |
| assertNull(schema.aliveIndex(INDEX_NAME)); |
| assertNull(manager.aliveIndex(INDEX_NAME, 123L)); |
| assertNull(manager.index(4, 123L)); |
| |
| // Validate actual catalog |
| schema = manager.schema(indexCreationVersion); |
| |
| CatalogSortedIndexDescriptor index = (CatalogSortedIndexDescriptor) schema.aliveIndex(INDEX_NAME); |
| |
| assertNotNull(schema); |
| assertSame(index, manager.aliveIndex(INDEX_NAME, clock.nowLong())); |
| assertSame(index, manager.index(index.id(), clock.nowLong())); |
| |
| // Validate newly created sorted index |
| assertEquals(INDEX_NAME, index.name()); |
| assertEquals(schema.table(TABLE_NAME).id(), index.tableId()); |
| assertEquals("VAL", index.columns().get(0).name()); |
| assertEquals("ID", index.columns().get(1).name()); |
| assertEquals(DESC_NULLS_FIRST, index.columns().get(0).collation()); |
| assertEquals(ASC_NULLS_LAST, index.columns().get(1).collation()); |
| assertTrue(index.unique()); |
| assertEquals(REGISTERED, index.status()); |
| assertEquals(manager.latestCatalogVersion(), index.txWaitCatalogVersion()); |
| } |
| |
| @Test |
| public void operationWillBeRetriedFiniteAmountOfTimes() { |
| UpdateLog updateLogMock = mock(UpdateLog.class); |
| |
| ArgumentCaptor<OnUpdateHandler> updateHandlerCapture = ArgumentCaptor.forClass(OnUpdateHandler.class); |
| |
| doNothing().when(updateLogMock).registerUpdateHandler(updateHandlerCapture.capture()); |
| when(updateLogMock.startAsync()).thenReturn(nullCompletedFuture()); |
| when(updateLogMock.append(any())).thenReturn(CompletableFuture.completedFuture(true)); |
| |
| CatalogManagerImpl manager = new CatalogManagerImpl(updateLogMock, clockService); |
| assertThat(manager.startAsync(), willCompleteSuccessfully()); |
| |
| reset(updateLogMock); |
| |
| when(updateLogMock.append(any())).thenAnswer(invocation -> { |
| // here we emulate concurrent updates. First of all, we return a future completed with "false" |
| // as if someone has concurrently appended an update. Besides, in order to unblock manager and allow to |
| // make another attempt, we must notify manager with the same version as in current attempt. |
| VersionedUpdate updateFromInvocation = invocation.getArgument(0, VersionedUpdate.class); |
| |
| VersionedUpdate update = new VersionedUpdate( |
| updateFromInvocation.version(), |
| updateFromInvocation.delayDurationMs(), |
| List.of(new ObjectIdGenUpdateEntry(1)) |
| ); |
| |
| updateHandlerCapture.getValue().handle(update, clock.now(), 0); |
| |
| return falseCompletedFuture(); |
| }); |
| |
| // It should not matter what a command does |
| CatalogCommand catalogCommand = catalog -> List.of(new ObjectIdGenUpdateEntry(1)); |
| |
| CompletableFuture<?> fut = manager.execute(List.of(catalogCommand)); |
| |
| assertThat(fut, willThrow(IgniteInternalException.class, "Max retry limit exceeded")); |
| |
| // retry limit is hardcoded at org.apache.ignite.internal.catalog.CatalogServiceImpl.MAX_RETRY_COUNT |
| verify(updateLogMock, times(10)).append(any()); |
| } |
| |
| @Test |
| public void catalogActivationTime() { |
| delayDuration.set(TimeUnit.DAYS.toMillis(365)); |
| reset(updateLog, clockWaiter); |
| |
| CompletableFuture<Integer> createTableFuture = manager.execute(simpleTable(TABLE_NAME)); |
| |
| assertFalse(createTableFuture.isDone()); |
| |
| verify(updateLog).append(any()); |
| // TODO IGNITE-19400: recheck createTable future completion guarantees |
| |
| // This waits till the new Catalog version lands in the internal structures. |
| verify(clockWaiter, timeout(10_000)).waitFor(any()); |
| |
| int latestVersion = manager.latestCatalogVersion(); |
| |
| assertSame(manager.schema(latestVersion - 1), manager.activeSchema(clock.nowLong())); |
| assertNull(manager.table(TABLE_NAME, clock.nowLong())); |
| |
| clock.update(clock.now().addPhysicalTime(delayDuration.get())); |
| |
| assertSame(manager.schema(latestVersion), manager.activeSchema(clock.nowLong())); |
| assertNotNull(manager.table(TABLE_NAME, clock.nowLong())); |
| } |
| |
| @Test |
| public void createTableIfNotExistWaitsActivationEvenIfTableExists() throws Exception { |
| delayDuration.set(TimeUnit.DAYS.toMillis(365)); |
| partitionIdleSafeTimePropagationPeriod.set(0); |
| reset(updateLog); |
| |
| CatalogCommand createTableCommand = spy(simpleTable(TABLE_NAME)); |
| |
| CompletableFuture<Integer> createTableFuture1 = manager.execute(createTableCommand); |
| |
| assertFalse(createTableFuture1.isDone()); |
| |
| ArgumentCaptor<VersionedUpdate> appendCapture = ArgumentCaptor.forClass(VersionedUpdate.class); |
| |
| verify(updateLog).append(appendCapture.capture()); |
| |
| int catalogVerAfterTableCreate = appendCapture.getValue().version(); |
| |
| CompletableFuture<Integer> createTableFuture2 = manager.execute(createTableCommand); |
| |
| verify(createTableCommand, times(2)).get(any()); |
| |
| assertFalse(createTableFuture2.isDone()); |
| |
| verify(clockWaiter, timeout(10_000).times(3)).waitFor(any()); |
| |
| Catalog catalog0 = manager.catalog(manager.latestCatalogVersion()); |
| |
| assertNotNull(catalog0); |
| |
| HybridTimestamp activationSkew = CatalogUtils.clusterWideEnsuredActivationTsSafeForRoReads( |
| catalog0, |
| () -> partitionIdleSafeTimePropagationPeriod.get(), clockService.maxClockSkewMillis()); |
| |
| clock.update(activationSkew); |
| |
| assertTrue(waitForCondition(createTableFuture1::isDone, 2_000)); |
| assertTrue(waitForCondition(createTableFuture2::isDone, 2_000)); |
| |
| assertSame(manager.schema(catalogVerAfterTableCreate), manager.activeSchema(clock.nowLong())); |
| } |
| |
| @Test |
| public void catalogServiceManagesUpdateLogLifecycle() { |
| UpdateLog updateLogMock = mock(UpdateLog.class); |
| when(updateLogMock.startAsync()).thenReturn(nullCompletedFuture()); |
| when(updateLogMock.stopAsync()).thenReturn(nullCompletedFuture()); |
| when(updateLogMock.append(any())).thenReturn(CompletableFuture.completedFuture(true)); |
| |
| CatalogManagerImpl manager = new CatalogManagerImpl(updateLogMock, clockService); |
| |
| assertThat(manager.startAsync(), willCompleteSuccessfully()); |
| |
| verify(updateLogMock).startAsync(); |
| |
| assertThat(manager.stopAsync(), willCompleteSuccessfully()); |
| |
| verify(updateLogMock).stopAsync(); |
| } |
| |
| @Test |
| public void testTableEvents() { |
| EventListener<CatalogEventParameters> eventListener = mock(EventListener.class); |
| when(eventListener.notify(any())).thenReturn(falseCompletedFuture()); |
| |
| manager.listen(CatalogEvent.TABLE_CREATE, eventListener); |
| manager.listen(CatalogEvent.TABLE_DROP, eventListener); |
| |
| assertThat(manager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully()); |
| assertThat(manager.execute(simpleTable(TABLE_NAME_2)), willCompleteSuccessfully()); |
| assertThat(manager.execute(simpleTable(TABLE_NAME_3)), willCompleteSuccessfully()); |
| verify(eventListener, times(3)).notify(any(CreateTableEventParameters.class)); |
| |
| assertThat(manager.execute(dropTableCommand(TABLE_NAME)), willCompleteSuccessfully()); |
| assertThat(manager.execute(dropTableCommand(TABLE_NAME_2)), willCompleteSuccessfully()); |
| verify(eventListener, times(2)).notify(any(DropTableEventParameters.class)); |
| |
| verifyNoMoreInteractions(eventListener); |
| clearInvocations(eventListener); |
| } |
| |
| @Test |
| public void testIndexEvents() { |
| CatalogCommand createIndexCmd = createHashIndexCommand(INDEX_NAME, List.of("ID")); |
| |
| CatalogCommand dropIndexCmd = DropIndexCommand.builder().schemaName(SCHEMA_NAME).indexName(INDEX_NAME).build(); |
| |
| EventListener<CatalogEventParameters> eventListener = mock(EventListener.class); |
| when(eventListener.notify(any())).thenReturn(falseCompletedFuture()); |
| |
| manager.listen(CatalogEvent.INDEX_CREATE, eventListener); |
| manager.listen(CatalogEvent.INDEX_BUILDING, eventListener); |
| manager.listen(CatalogEvent.INDEX_AVAILABLE, eventListener); |
| manager.listen(CatalogEvent.INDEX_STOPPING, eventListener); |
| manager.listen(CatalogEvent.INDEX_REMOVED, eventListener); |
| |
| // Try to create index without table. |
| assertThat(manager.execute(createIndexCmd), willThrow(TableNotFoundValidationException.class)); |
| verifyNoInteractions(eventListener); |
| |
| // Create table with PK index. |
| assertThat(manager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully()); |
| verify(eventListener).notify(any(CreateIndexEventParameters.class)); |
| verify(eventListener).notify(any(MakeIndexAvailableEventParameters.class)); |
| |
| verifyNoMoreInteractions(eventListener); |
| clearInvocations(eventListener); |
| |
| // Create index. |
| assertThat(manager.execute(createIndexCmd), willCompleteSuccessfully()); |
| verify(eventListener).notify(any(CreateIndexEventParameters.class)); |
| |
| int indexId = indexId(INDEX_NAME); |
| |
| startBuildingIndex(indexId); |
| verify(eventListener).notify(any(StartBuildingIndexEventParameters.class)); |
| |
| makeIndexAvailable(indexId); |
| verify(eventListener).notify(any(MakeIndexAvailableEventParameters.class)); |
| |
| verifyNoMoreInteractions(eventListener); |
| clearInvocations(eventListener); |
| |
| // Drop index. |
| assertThat(manager.execute(dropIndexCmd), willCompleteSuccessfully()); |
| verify(eventListener).notify(any(StoppingIndexEventParameters.class)); |
| |
| // Remove index. |
| removeIndex(indexId); |
| verify(eventListener).notify(any(RemoveIndexEventParameters.class)); |
| |
| verifyNoMoreInteractions(eventListener); |
| clearInvocations(eventListener); |
| |
| // Drop table with pk index. |
| assertThat(manager.execute(dropTableCommand(TABLE_NAME)), willCompleteSuccessfully()); |
| |
| // Try drop index once again. |
| assertThat(manager.execute(dropIndexCmd), willThrow(IndexNotFoundValidationException.class)); |
| |
| verify(eventListener).notify(any(RemoveIndexEventParameters.class)); |
| verifyNoMoreInteractions(eventListener); |
| clearInvocations(eventListener); |
| } |
| |
| @Test |
| public void testCreateZone() { |
| String zoneName = TEST_ZONE_NAME; |
| |
| CatalogCommand cmd = CreateZoneCommand.builder() |
| .zoneName(zoneName) |
| .partitions(42) |
| .replicas(15) |
| .dataNodesAutoAdjust(73) |
| .filter("expression") |
| .storageProfilesParams(List.of(StorageProfileParams.builder().storageProfile("test_profile").build())) |
| .build(); |
| |
| assertThat(manager.execute(cmd), willCompleteSuccessfully()); |
| |
| // Validate catalog version from the past. |
| assertNull(manager.zone(zoneName, 0)); |
| assertNull(manager.zone(zoneName, 123L)); |
| |
| // Validate actual catalog |
| CatalogZoneDescriptor zone = manager.zone(zoneName, clock.nowLong()); |
| |
| assertNotNull(zone); |
| assertSame(zone, manager.zone(zone.id(), clock.nowLong())); |
| |
| // Validate that catalog returns null for previous timestamps. |
| assertNull(manager.zone(zone.id(), 0)); |
| assertNull(manager.zone(zone.id(), 123L)); |
| |
| // Validate newly created zone |
| assertEquals(zoneName, zone.name()); |
| assertEquals(42, zone.partitions()); |
| assertEquals(15, zone.replicas()); |
| assertEquals(73, zone.dataNodesAutoAdjust()); |
| assertEquals(INFINITE_TIMER_VALUE, zone.dataNodesAutoAdjustScaleUp()); |
| assertEquals(INFINITE_TIMER_VALUE, zone.dataNodesAutoAdjustScaleDown()); |
| assertEquals("expression", zone.filter()); |
| assertEquals("test_profile", zone.storageProfiles().profiles().get(0).storageProfile()); |
| } |
| |
| @Test |
| public void testSetDefaultZone() { |
| CatalogZoneDescriptor initialDefaultZone = latestActiveCatalog().defaultZone(); |
| |
| // Create new zone |
| { |
| StorageProfileParams storageProfile = StorageProfileParams.builder() |
| .storageProfile("test_profile") |
| .build(); |
| |
| CatalogCommand createZoneCmd = CreateZoneCommand.builder() |
| .zoneName(TEST_ZONE_NAME) |
| .storageProfilesParams(List.of(storageProfile)) |
| .build(); |
| |
| assertThat(manager.execute(createZoneCmd), willCompleteSuccessfully()); |
| |
| assertNotEquals(TEST_ZONE_NAME, latestActiveCatalog().defaultZone().name()); |
| } |
| |
| // Set new zone as default. |
| { |
| CatalogCommand setDefaultCmd = AlterZoneSetDefaultCommand.builder() |
| .zoneName(TEST_ZONE_NAME) |
| .build(); |
| |
| int prevVer = latestActiveCatalog().version(); |
| |
| assertThat(manager.execute(setDefaultCmd), willCompleteSuccessfully()); |
| assertEquals(TEST_ZONE_NAME, latestActiveCatalog().defaultZone().name()); |
| |
| // Make sure history has not been affected. |
| Catalog prevCatalog = Objects.requireNonNull(manager.catalog(prevVer)); |
| assertNotEquals(TEST_ZONE_NAME, prevCatalog.defaultZone().name()); |
| assertNotEquals(latestActiveCatalog().defaultZone().id(), prevCatalog.defaultZone().id()); |
| } |
| |
| // Create table in the new zone. |
| { |
| assertThat(manager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully()); |
| |
| Catalog catalog = latestActiveCatalog(); |
| CatalogTableDescriptor tab = Objects.requireNonNull(manager.table(TABLE_NAME, catalog.time())); |
| |
| assertEquals(catalog.defaultZone().id(), tab.zoneId()); |
| } |
| |
| // Setting default zone that is already the default changes nothing. |
| { |
| int lastVer = manager.latestCatalogVersion(); |
| |
| CatalogCommand setDefaultCmd = AlterZoneSetDefaultCommand.builder() |
| .zoneName(TEST_ZONE_NAME) |
| .build(); |
| |
| assertThat(manager.execute(setDefaultCmd), willCompleteSuccessfully()); |
| assertEquals(lastVer, manager.latestCatalogVersion()); |
| } |
| |
| // Drop old default zone. |
| { |
| CatalogCommand dropCommand = DropZoneCommand.builder() |
| .zoneName(initialDefaultZone.name()) |
| .build(); |
| |
| assertThat(manager.execute(dropCommand), willCompleteSuccessfully()); |
| } |
| } |
| |
| @Test |
| public void testDropZone() { |
| String zoneName = TEST_ZONE_NAME; |
| |
| CatalogCommand cmd = CreateZoneCommand.builder() |
| .zoneName(zoneName) |
| .storageProfilesParams(List.of(StorageProfileParams.builder().storageProfile(DEFAULT_STORAGE_PROFILE).build())) |
| .build(); |
| |
| assertThat(manager.execute(cmd), willCompleteSuccessfully()); |
| |
| long beforeDropTimestamp = clock.nowLong(); |
| |
| CatalogCommand dropCommand = DropZoneCommand.builder() |
| .zoneName(zoneName) |
| .build(); |
| |
| CompletableFuture<?> fut = manager.execute(dropCommand); |
| |
| assertThat(fut, willCompleteSuccessfully()); |
| |
| // Validate catalog version from the past. |
| CatalogZoneDescriptor zone = manager.zone(zoneName, beforeDropTimestamp); |
| |
| assertNotNull(zone); |
| assertEquals(zoneName, zone.name()); |
| |
| assertSame(zone, manager.zone(zone.id(), beforeDropTimestamp)); |
| |
| // Validate actual catalog |
| assertNull(manager.zone(zoneName, clock.nowLong())); |
| assertNull(manager.zone(zone.id(), clock.nowLong())); |
| |
| // Try to drop non-existing zone. |
| assertThat(manager.execute(dropCommand), willThrow(DistributionZoneNotFoundValidationException.class)); |
| } |
| |
| @Test |
| public void testDropDefaultZoneIsRejected() { |
| // Drop default zone is rejected. |
| { |
| Catalog catalog = latestActiveCatalog(); |
| CatalogCommand dropCommand = DropZoneCommand.builder() |
| .zoneName(catalog.defaultZone().name()) |
| .build(); |
| |
| int ver = catalog.version(); |
| |
| assertThat(manager.execute(dropCommand), willThrow(DistributionZoneCantBeDroppedValidationException.class)); |
| |
| assertEquals(ver, manager.latestCatalogVersion()); |
| } |
| |
| // Renamed zone deletion is also rejected. |
| { |
| CatalogCommand renameCommand = RenameZoneCommand.builder() |
| .zoneName(latestActiveCatalog().defaultZone().name()) |
| .newZoneName(TEST_ZONE_NAME) |
| .build(); |
| |
| int ver = manager.latestCatalogVersion(); |
| |
| assertThat(manager.execute(renameCommand), willCompleteSuccessfully()); |
| |
| assertSame(ver + 1, manager.latestCatalogVersion()); |
| |
| ver = manager.latestCatalogVersion(); |
| |
| CatalogCommand dropCommand = DropZoneCommand.builder() |
| .zoneName(TEST_ZONE_NAME) |
| .build(); |
| |
| assertThat(manager.execute(dropCommand), willThrow(DistributionZoneCantBeDroppedValidationException.class)); |
| assertSame(ver, manager.latestCatalogVersion()); |
| } |
| } |
| |
| @Test |
| public void testRenameZone() throws InterruptedException { |
| String zoneName = TEST_ZONE_NAME; |
| |
| CatalogCommand cmd = CreateZoneCommand.builder() |
| .zoneName(zoneName) |
| .partitions(42) |
| .replicas(15) |
| .storageProfilesParams(List.of(StorageProfileParams.builder().storageProfile(DEFAULT_STORAGE_PROFILE).build())) |
| .build(); |
| |
| assertThat(manager.execute(cmd), willCompleteSuccessfully()); |
| |
| long beforeDropTimestamp = clock.nowLong(); |
| |
| Thread.sleep(5); |
| |
| String newZoneName = "RenamedZone"; |
| |
| CatalogCommand renameZoneCmd = RenameZoneCommand.builder() |
| .zoneName(zoneName) |
| .newZoneName(newZoneName) |
| .build(); |
| |
| assertThat(manager.execute(renameZoneCmd), willCompleteSuccessfully()); |
| |
| // Validate catalog version from the past. |
| CatalogZoneDescriptor zone = manager.zone(zoneName, beforeDropTimestamp); |
| |
| assertNotNull(zone); |
| assertEquals(zoneName, zone.name()); |
| |
| assertSame(zone, manager.zone(zone.id(), beforeDropTimestamp)); |
| |
| // Validate actual catalog |
| zone = manager.zone(newZoneName, clock.nowLong()); |
| |
| assertNotNull(zone); |
| assertNull(manager.zone(zoneName, clock.nowLong())); |
| assertEquals(newZoneName, zone.name()); |
| |
| assertSame(zone, manager.zone(zone.id(), clock.nowLong())); |
| } |
| |
| @Test |
| public void testRenameDefaultZone() { |
| CatalogZoneDescriptor defaultZone = latestActiveCatalog().defaultZone(); |
| |
| assertNotEquals(TEST_ZONE_NAME, defaultZone.name()); |
| |
| CatalogCommand renameZoneCmd = RenameZoneCommand.builder() |
| .zoneName(defaultZone.name()) |
| .newZoneName(TEST_ZONE_NAME) |
| .build(); |
| |
| int ver = manager.latestCatalogVersion(); |
| assertThat(manager.execute(renameZoneCmd), willCompleteSuccessfully()); |
| |
| assertEquals(ver + 1, manager.latestCatalogVersion()); |
| assertEquals(TEST_ZONE_NAME, latestActiveCatalog().defaultZone().name()); |
| assertEquals(defaultZone.id(), latestActiveCatalog().defaultZone().id()); |
| } |
| |
| @Test |
| public void testDefaultZone() { |
| CatalogZoneDescriptor defaultZone = latestActiveCatalog().defaultZone(); |
| |
| // Try to create zone with default zone name. |
| CatalogCommand cmd = CreateZoneCommand.builder() |
| .zoneName(defaultZone.name()) |
| .partitions(42) |
| .replicas(15) |
| .storageProfilesParams(List.of(StorageProfileParams.builder().storageProfile(DEFAULT_STORAGE_PROFILE).build())) |
| .build(); |
| assertThat(manager.execute(cmd), willThrow(DistributionZoneExistsValidationException.class)); |
| |
| // Validate default zone wasn't changed. |
| assertSame(defaultZone, manager.zone(defaultZone.name(), clock.nowLong())); |
| } |
| |
| @Test |
| public void testAlterZone() { |
| String zoneName = TEST_ZONE_NAME; |
| |
| CatalogCommand cmd = CreateZoneCommand.builder() |
| .zoneName(zoneName) |
| .partitions(42) |
| .replicas(15) |
| .dataNodesAutoAdjust(73) |
| .filter("expression") |
| .storageProfilesParams(List.of(StorageProfileParams.builder().storageProfile(DEFAULT_STORAGE_PROFILE).build())) |
| .build(); |
| |
| CatalogCommand alterCmd = AlterZoneCommand.builder() |
| .zoneName(zoneName) |
| .partitions(10) |
| .replicas(2) |
| .dataNodesAutoAdjustScaleUp(3) |
| .dataNodesAutoAdjustScaleDown(4) |
| .filter("newExpression") |
| .storageProfilesParams(List.of(StorageProfileParams.builder().storageProfile("test_profile").build())) |
| .build(); |
| |
| assertThat(manager.execute(cmd), willCompleteSuccessfully()); |
| assertThat(manager.execute(alterCmd), willCompleteSuccessfully()); |
| |
| // Validate actual catalog |
| CatalogZoneDescriptor zone = manager.zone(zoneName, clock.nowLong()); |
| assertNotNull(zone); |
| assertSame(zone, manager.zone(zone.id(), clock.nowLong())); |
| |
| assertEquals(zoneName, zone.name()); |
| assertEquals(10, zone.partitions()); |
| assertEquals(2, zone.replicas()); |
| assertEquals(INFINITE_TIMER_VALUE, zone.dataNodesAutoAdjust()); |
| assertEquals(3, zone.dataNodesAutoAdjustScaleUp()); |
| assertEquals(4, zone.dataNodesAutoAdjustScaleDown()); |
| assertEquals("newExpression", zone.filter()); |
| assertEquals("test_profile", zone.storageProfiles().profiles().get(0).storageProfile()); |
| } |
| |
| @Test |
| public void testCreateZoneWithSameName() { |
| String zoneName = TEST_ZONE_NAME; |
| |
| CatalogCommand cmd = CreateZoneCommand.builder() |
| .zoneName(zoneName) |
| .partitions(42) |
| .replicas(15) |
| .storageProfilesParams(List.of(StorageProfileParams.builder().storageProfile(DEFAULT_STORAGE_PROFILE).build())) |
| .build(); |
| |
| assertThat(manager.execute(cmd), willCompleteSuccessfully()); |
| |
| // Try to create zone with same name. |
| cmd = CreateZoneCommand.builder() |
| .zoneName(zoneName) |
| .partitions(8) |
| .replicas(1) |
| .storageProfilesParams(List.of(StorageProfileParams.builder().storageProfile(DEFAULT_STORAGE_PROFILE).build())) |
| .build(); |
| |
| assertThat(manager.execute(cmd), willThrowFast(DistributionZoneExistsValidationException.class)); |
| |
| // Validate zone was NOT changed |
| CatalogZoneDescriptor zone = manager.zone(zoneName, clock.nowLong()); |
| |
| assertNotNull(zone); |
| assertSame(zone, manager.zone(zoneName, clock.nowLong())); |
| assertSame(zone, manager.zone(zone.id(), clock.nowLong())); |
| |
| assertEquals(zoneName, zone.name()); |
| assertEquals(42, zone.partitions()); |
| assertEquals(15, zone.replicas()); |
| } |
| |
| @Test |
| public void testCreateZoneEvents() { |
| String zoneName = TEST_ZONE_NAME; |
| |
| CatalogCommand cmd = CreateZoneCommand.builder() |
| .zoneName(zoneName) |
| .storageProfilesParams(List.of(StorageProfileParams.builder().storageProfile(DEFAULT_STORAGE_PROFILE).build())) |
| .build(); |
| |
| EventListener<CatalogEventParameters> eventListener = mock(EventListener.class); |
| when(eventListener.notify(any())).thenReturn(falseCompletedFuture()); |
| |
| manager.listen(CatalogEvent.ZONE_CREATE, eventListener); |
| manager.listen(CatalogEvent.ZONE_DROP, eventListener); |
| |
| CompletableFuture<?> fut = manager.execute(cmd); |
| |
| assertThat(fut, willCompleteSuccessfully()); |
| |
| verify(eventListener).notify(any(CreateZoneEventParameters.class)); |
| |
| CatalogCommand dropCommand = DropZoneCommand.builder() |
| .zoneName(zoneName) |
| .build(); |
| |
| fut = manager.execute(dropCommand); |
| |
| assertThat(fut, willCompleteSuccessfully()); |
| |
| verify(eventListener).notify(any(DropZoneEventParameters.class)); |
| verifyNoMoreInteractions(eventListener); |
| } |
| |
| @Test |
| public void testColumnEvents() { |
| EventListener<CatalogEventParameters> eventListener = mock(EventListener.class); |
| when(eventListener.notify(any())).thenReturn(falseCompletedFuture()); |
| |
| manager.listen(CatalogEvent.TABLE_ALTER, eventListener); |
| |
| // Try to add column without table. |
| assertThat(manager.execute(addColumnParams(TABLE_NAME, columnParams(NEW_COLUMN_NAME, INT32))), |
| willThrow(TableNotFoundValidationException.class)); |
| verifyNoInteractions(eventListener); |
| |
| // Create table. |
| assertThat(manager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully()); |
| |
| // Add column. |
| assertThat(manager.execute(addColumnParams(TABLE_NAME, columnParams(NEW_COLUMN_NAME, INT32))), willCompleteSuccessfully()); |
| verify(eventListener).notify(any(AddColumnEventParameters.class)); |
| |
| // Drop column. |
| assertThat(manager.execute(dropColumnParams(TABLE_NAME, NEW_COLUMN_NAME)), willCompleteSuccessfully()); |
| verify(eventListener).notify(any(DropColumnEventParameters.class)); |
| |
| verifyNoMoreInteractions(eventListener); |
| } |
| |
| @Test |
| public void userFutureCompletesAfterClusterWideActivationHappens() { |
| delayDuration.set(TimeUnit.DAYS.toMillis(365)); |
| |
| reset(clockWaiter); |
| HybridTimestamp startTs = clock.now(); |
| |
| CompletableFuture<?> createTableFuture = manager.execute(simpleTable(TABLE_NAME)); |
| |
| assertFalse(createTableFuture.isDone()); |
| |
| ArgumentCaptor<HybridTimestamp> tsCaptor = ArgumentCaptor.forClass(HybridTimestamp.class); |
| |
| verify(clockWaiter, timeout(10_000)).waitFor(tsCaptor.capture()); |
| HybridTimestamp userWaitTs = tsCaptor.getValue(); |
| assertThat( |
| userWaitTs.getPhysical() - startTs.getPhysical(), |
| greaterThanOrEqualTo(delayDuration.get() + clockService.maxClockSkewMillis()) |
| ); |
| } |
| |
| // TODO: remove after IGNITE-20378 is implemented. |
| @Test |
| public void userFutureCompletesAfterClusterWideActivationWithAdditionalIdleSafeTimePeriodHappens() { |
| delayDuration.set(TimeUnit.DAYS.toMillis(365)); |
| partitionIdleSafeTimePropagationPeriod.set(TimeUnit.DAYS.toDays(365)); |
| |
| reset(clockWaiter); |
| |
| HybridTimestamp startTs = clock.now(); |
| |
| CompletableFuture<?> createTableFuture = manager.execute(simpleTable(TABLE_NAME)); |
| |
| assertFalse(createTableFuture.isDone()); |
| |
| ArgumentCaptor<HybridTimestamp> tsCaptor = ArgumentCaptor.forClass(HybridTimestamp.class); |
| |
| verify(clockWaiter, timeout(10_000)).waitFor(tsCaptor.capture()); |
| HybridTimestamp userWaitTs = tsCaptor.getValue(); |
| assertThat( |
| userWaitTs.getPhysical() - startTs.getPhysical(), |
| greaterThanOrEqualTo( |
| delayDuration.get() + clockService.maxClockSkewMillis() |
| + partitionIdleSafeTimePropagationPeriod.get() + clockService.maxClockSkewMillis() |
| ) |
| ); |
| } |
| |
| @Test |
| void testGetCatalogEntityInCatalogEvent() { |
| var fireEventFuture = new CompletableFuture<Void>(); |
| |
| manager.listen(CatalogEvent.TABLE_CREATE, fromConsumer(fireEventFuture, parameters -> { |
| assertNotNull(manager.schema(parameters.catalogVersion())); |
| })); |
| |
| assertThat(manager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully()); |
| assertThat(fireEventFuture, willCompleteSuccessfully()); |
| } |
| |
| @Test |
| void testGetTableByIdAndCatalogVersion() { |
| int tableCreationVersion = await(manager.execute(simpleTable(TABLE_NAME))); |
| |
| CatalogTableDescriptor table = manager.table(TABLE_NAME, clock.nowLong()); |
| |
| assertNull(manager.table(table.id(), tableCreationVersion - 1)); |
| assertSame(table, manager.table(table.id(), tableCreationVersion)); |
| } |
| |
| @Test |
| void testGetTableIdOnDropIndexEvent() { |
| assertThat(manager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully()); |
| |
| assertThat(manager.execute(createHashIndexCommand(INDEX_NAME, List.of("VAL"))), willCompleteSuccessfully()); |
| |
| int indexId = manager.aliveIndex(INDEX_NAME, clock.nowLong()).id(); |
| |
| startBuildingIndex(indexId); |
| makeIndexAvailable(indexId); |
| |
| int tableId = manager.table(TABLE_NAME, clock.nowLong()).id(); |
| int pkIndexId = manager.aliveIndex(pkIndexName(TABLE_NAME), clock.nowLong()).id(); |
| |
| assertNotEquals(tableId, indexId); |
| |
| EventListener<StoppingIndexEventParameters> stoppingListener = mock(EventListener.class); |
| EventListener<RemoveIndexEventParameters> removedListener = mock(EventListener.class); |
| |
| ArgumentCaptor<StoppingIndexEventParameters> stoppingCaptor = ArgumentCaptor.forClass(StoppingIndexEventParameters.class); |
| ArgumentCaptor<RemoveIndexEventParameters> removingCaptor = ArgumentCaptor.forClass(RemoveIndexEventParameters.class); |
| |
| doReturn(falseCompletedFuture()).when(stoppingListener).notify(stoppingCaptor.capture()); |
| doReturn(falseCompletedFuture()).when(removedListener).notify(removingCaptor.capture()); |
| |
| manager.listen(CatalogEvent.INDEX_STOPPING, stoppingListener); |
| manager.listen(CatalogEvent.INDEX_REMOVED, removedListener); |
| |
| // Let's drop the index. |
| assertThat( |
| manager.execute(DropIndexCommand.builder().schemaName(SCHEMA_NAME).indexName(INDEX_NAME).build()), |
| willCompleteSuccessfully() |
| ); |
| |
| StoppingIndexEventParameters stoppingEventParameters = stoppingCaptor.getValue(); |
| |
| assertEquals(indexId, stoppingEventParameters.indexId()); |
| |
| // Let's drop the table. |
| assertThat(manager.execute(dropTableCommand(TABLE_NAME)), willCompleteSuccessfully()); |
| |
| // Let's make sure that the PK index has been removed. |
| RemoveIndexEventParameters pkRemovedEventParameters = removingCaptor.getAllValues().get(0); |
| |
| assertEquals(pkIndexId, pkRemovedEventParameters.indexId()); |
| } |
| |
| @Test |
| void testReCreateIndexWithSameName() { |
| createSomeTable(TABLE_NAME); |
| createSomeIndex(TABLE_NAME, INDEX_NAME); |
| |
| int catalogVersion = manager.latestCatalogVersion(); |
| CatalogIndexDescriptor index1 = manager.aliveIndex(INDEX_NAME, clock.nowLong()); |
| assertNotNull(index1); |
| |
| int indexId1 = index1.id(); |
| startBuildingIndex(indexId1); |
| makeIndexAvailable(indexId1); |
| |
| // Drop index. |
| dropIndex(INDEX_NAME); |
| removeIndex(indexId1); |
| assertNull(manager.aliveIndex(INDEX_NAME, clock.nowLong())); |
| |
| // Re-create index with same name. |
| createSomeSortedIndex(TABLE_NAME, INDEX_NAME); |
| |
| CatalogIndexDescriptor index2 = manager.aliveIndex(INDEX_NAME, clock.nowLong()); |
| assertNotNull(index2); |
| assertThat(index2.indexType(), equalTo(CatalogIndexDescriptorType.SORTED)); |
| |
| // Ensure these are different indexes. |
| int indexId2 = index2.id(); |
| assertNotEquals(indexId1, indexId2); |
| |
| // Ensure dropped index is available for historical queries. |
| assertNotNull(manager.index(indexId1, catalogVersion)); |
| assertThat(manager.index(indexId1, catalogVersion).indexType(), equalTo(CatalogIndexDescriptorType.HASH)); |
| assertNull(manager.index(indexId2, catalogVersion)); |
| } |
| |
| @Test |
| void testLatestCatalogVersion() { |
| assertEquals(1, manager.latestCatalogVersion()); |
| |
| assertThat(manager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully()); |
| assertEquals(2, manager.latestCatalogVersion()); |
| |
| assertThat(manager.execute(simpleIndex()), willCompleteSuccessfully()); |
| assertEquals(3, manager.latestCatalogVersion()); |
| } |
| |
| @Test |
| void testTables() { |
| int initialVersion = manager.latestCatalogVersion(); |
| |
| assertThat(manager.execute(simpleTable(TABLE_NAME + 0)), willCompleteSuccessfully()); |
| assertThat(manager.execute(simpleTable(TABLE_NAME + 1)), willCompleteSuccessfully()); |
| |
| assertThat(manager.tables(initialVersion), empty()); |
| assertThat( |
| manager.tables(initialVersion + 1), |
| hasItems(table(initialVersion + 1, TABLE_NAME + 0)) |
| ); |
| assertThat( |
| manager.tables(initialVersion + 2), |
| hasItems(table(initialVersion + 2, TABLE_NAME + 0), table(initialVersion + 2, TABLE_NAME + 1)) |
| ); |
| } |
| |
| @Test |
| void testIndexes() { |
| int initialVersion = manager.latestCatalogVersion(); |
| |
| assertThat(manager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully()); |
| assertThat(manager.execute(simpleIndex()), willCompleteSuccessfully()); |
| |
| assertThat(manager.indexes(initialVersion), empty()); |
| assertThat( |
| manager.indexes(initialVersion + 1), |
| hasItems(index(initialVersion + 1, pkIndexName(TABLE_NAME))) |
| ); |
| assertThat( |
| manager.indexes(initialVersion + 2), |
| hasItems(index(initialVersion + 2, pkIndexName(TABLE_NAME)), index(initialVersion + 2, INDEX_NAME)) |
| ); |
| } |
| |
| @Test |
| public void createTableProducesTableVersion1() { |
| createSomeTable(TABLE_NAME); |
| |
| CatalogTableDescriptor table = manager.table(TABLE_NAME, Long.MAX_VALUE); |
| |
| assertThat(table.tableVersion(), is(1)); |
| } |
| |
| @Test |
| public void addColumnIncrementsTableVersion() { |
| createSomeTable(TABLE_NAME); |
| |
| addSomeColumn(); |
| |
| CatalogTableDescriptor table = manager.table(TABLE_NAME, Long.MAX_VALUE); |
| |
| assertThat(table.tableVersion(), is(2)); |
| } |
| |
| private void addSomeColumn() { |
| assertThat(manager.execute(addColumnParams(TABLE_NAME, columnParams("val2", INT32))), willCompleteSuccessfully()); |
| } |
| |
| @Test |
| public void dropColumnIncrementsTableVersion() { |
| createSomeTable(TABLE_NAME); |
| |
| assertThat(manager.execute(dropColumnParams(TABLE_NAME, "val1")), willCompleteSuccessfully()); |
| |
| CatalogTableDescriptor table = manager.table(TABLE_NAME, Long.MAX_VALUE); |
| |
| assertThat(table.tableVersion(), is(2)); |
| } |
| |
| @Test |
| public void alterColumnIncrementsTableVersion() { |
| createSomeTable(TABLE_NAME); |
| |
| CompletableFuture<?> future = manager.execute( |
| AlterTableAlterColumnCommand.builder() |
| .schemaName(SCHEMA_NAME) |
| .tableName(TABLE_NAME) |
| .columnName("val1") |
| .type(INT64) |
| .build() |
| ); |
| assertThat(future, willCompleteSuccessfully()); |
| |
| CatalogTableDescriptor table = manager.table(TABLE_NAME, Long.MAX_VALUE); |
| |
| assertThat(table.tableVersion(), is(2)); |
| } |
| |
| @Test |
| public void testTableCreationToken() { |
| createSomeTable(TABLE_NAME); |
| |
| CatalogTableDescriptor table = manager.table(TABLE_NAME, Long.MAX_VALUE); |
| |
| long expectedCreationToken = table.updateToken(); |
| |
| assertEquals(expectedCreationToken, table.creationToken()); |
| |
| int tableCreationVersion = await(manager.execute( |
| AlterTableAlterColumnCommand.builder() |
| .schemaName(SCHEMA_NAME) |
| .tableName(TABLE_NAME) |
| .columnName("val1") |
| .type(INT64) |
| .build() |
| )); |
| |
| table = manager.table(TABLE_NAME, Long.MAX_VALUE); |
| |
| assertThat(table.tableVersion(), is(2)); |
| |
| assertEquals(expectedCreationToken, table.creationToken()); |
| |
| table = manager.table(tableId(TABLE_NAME), tableCreationVersion); |
| |
| assertEquals(expectedCreationToken, table.creationToken()); |
| } |
| |
| @Test |
| void testCreateZoneWithDefaults() { |
| assertThat( |
| manager.execute( |
| CreateZoneCommand.builder() |
| .zoneName(TEST_ZONE_NAME) |
| .storageProfilesParams( |
| List.of(StorageProfileParams.builder().storageProfile(DEFAULT_STORAGE_PROFILE).build()) |
| ).build() |
| ), |
| willCompleteSuccessfully() |
| ); |
| |
| CatalogZoneDescriptor zone = manager.zone(TEST_ZONE_NAME, clock.nowLong()); |
| |
| assertEquals(DEFAULT_PARTITION_COUNT, zone.partitions()); |
| assertEquals(DEFAULT_REPLICA_COUNT, zone.replicas()); |
| assertEquals(INFINITE_TIMER_VALUE, zone.dataNodesAutoAdjust()); |
| assertEquals(IMMEDIATE_TIMER_VALUE, zone.dataNodesAutoAdjustScaleUp()); |
| assertEquals(INFINITE_TIMER_VALUE, zone.dataNodesAutoAdjustScaleDown()); |
| assertEquals(DEFAULT_FILTER, zone.filter()); |
| assertEquals(DEFAULT_STORAGE_PROFILE, zone.storageProfiles().defaultProfile().storageProfile()); |
| } |
| |
| @Test |
| void testCreateIndexWithAlreadyExistingName() { |
| assertThat(manager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully()); |
| assertThat(manager.execute(simpleIndex()), willCompleteSuccessfully()); |
| |
| assertThat( |
| manager.execute(createHashIndexCommand(INDEX_NAME, List.of("VAL"))), |
| willThrowFast(IndexExistsValidationException.class) |
| ); |
| |
| assertThat( |
| manager.execute(createSortedIndexCommand(INDEX_NAME, List.of("VAL"), List.of(ASC_NULLS_LAST))), |
| willThrowFast(IndexExistsValidationException.class) |
| ); |
| } |
| |
| @Test |
| void testCreateIndexWithSameNameAsExistingTable() { |
| assertThat(manager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully()); |
| |
| assertThat( |
| manager.execute(createHashIndexCommand(TABLE_NAME, List.of("VAL"))), |
| willThrowFast(TableExistsValidationException.class) |
| ); |
| |
| assertThat( |
| manager.execute(createSortedIndexCommand(TABLE_NAME, List.of("VAL"), List.of(ASC_NULLS_LAST))), |
| willThrowFast(TableExistsValidationException.class) |
| ); |
| } |
| |
| @Test |
| void testCreateIndexWithNotExistingTable() { |
| assertThat( |
| manager.execute(createHashIndexCommand(TABLE_NAME, List.of("VAL"))), |
| willThrowFast(TableNotFoundValidationException.class) |
| ); |
| |
| assertThat( |
| manager.execute(createSortedIndexCommand(TABLE_NAME, List.of("VAL"), List.of(ASC_NULLS_LAST))), |
| willThrowFast(TableNotFoundValidationException.class) |
| ); |
| } |
| |
| @Test |
| void testCreateIndexWithMissingTableColumns() { |
| assertThat(manager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully()); |
| |
| assertThat( |
| manager.execute(createHashIndexCommand(INDEX_NAME, List.of("fake"))), |
| willThrowFast(CatalogValidationException.class) |
| ); |
| |
| assertThat( |
| manager.execute(createSortedIndexCommand(INDEX_NAME, List.of("fake"), List.of(ASC_NULLS_LAST))), |
| willThrowFast(CatalogValidationException.class) |
| ); |
| } |
| |
| @Test |
| void testCreateUniqIndexWithMissingTableColocationColumns() { |
| assertThat(manager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully()); |
| |
| assertThat( |
| manager.execute(createHashIndexCommand(INDEX_NAME, true, List.of("VAL"))), |
| willThrowFast(CatalogValidationException.class, "Unique index must include all colocation columns") |
| ); |
| |
| assertThat( |
| manager.execute(createSortedIndexCommand(INDEX_NAME, true, List.of("VAL"), List.of(ASC_NULLS_LAST))), |
| willThrowFast(CatalogValidationException.class, "Unique index must include all colocation columns") |
| ); |
| } |
| |
| @Test |
| void droppingAnAvailableIndexMovesItToStoppingState() { |
| createSomeTable(TABLE_NAME); |
| createSomeIndex(TABLE_NAME, INDEX_NAME); |
| |
| int indexId = indexId(INDEX_NAME); |
| |
| startBuildingIndex(indexId); |
| makeIndexAvailable(indexId); |
| |
| dropIndex(INDEX_NAME); |
| |
| CatalogIndexDescriptor index = manager.index(indexId, manager.latestCatalogVersion()); |
| |
| assertThat(index, is(notNullValue())); |
| assertThat(index.status(), is(STOPPING)); |
| } |
| |
| @ParameterizedTest |
| @MethodSource("notYetAvailableIndexStatuses") |
| void droppingNotAvailableIndexRemovesIt(CatalogIndexStatus status) { |
| createSomeTable(TABLE_NAME); |
| createSomeIndex(TABLE_NAME, INDEX_NAME); |
| |
| rollIndexStatusTo(status, indexId(INDEX_NAME)); |
| |
| dropIndex(INDEX_NAME); |
| |
| CatalogIndexDescriptor index = index(manager.latestCatalogVersion(), INDEX_NAME); |
| |
| assertThat(index, is(nullValue())); |
| } |
| |
| private void startBuildingIndex(int indexId) { |
| assertThat(manager.execute(StartBuildingIndexCommand.builder().indexId(indexId).build()), willCompleteSuccessfully()); |
| } |
| |
| private static Stream<Arguments> notYetAvailableIndexStatuses() { |
| return Stream.of(REGISTERED, BUILDING).map(Arguments::of); |
| } |
| |
| @Test |
| void removingStoppedIndexRemovesItFromCatalog() { |
| createSomeTable(TABLE_NAME); |
| createSomeIndex(TABLE_NAME, INDEX_NAME); |
| |
| int indexId = indexId(INDEX_NAME); |
| |
| rollIndexStatusTo(STOPPING, indexId); |
| |
| assertThat(manager.index(indexId, manager.latestCatalogVersion()).status(), is(STOPPING)); |
| |
| removeIndex(indexId); |
| |
| CatalogIndexDescriptor index = manager.index(indexId, manager.latestCatalogVersion()); |
| |
| assertThat(index, is(nullValue())); |
| } |
| |
| private void rollIndexStatusTo(CatalogIndexStatus status, int indexId) { |
| for (CatalogIndexStatus currentStatus : List.of(REGISTERED, BUILDING, AVAILABLE, STOPPING)) { |
| if (currentStatus == status) { |
| break; |
| } |
| |
| switch (currentStatus) { |
| case REGISTERED: |
| startBuildingIndex(indexId); |
| break; |
| case BUILDING: |
| makeIndexAvailable(indexId); |
| break; |
| case AVAILABLE: |
| dropIndex(indexId); |
| break; |
| case STOPPING: |
| removeIndex(indexId); |
| break; |
| default: |
| fail("Unsupported state: " + currentStatus); |
| break; |
| } |
| } |
| } |
| |
| private void removeIndex(int indexId) { |
| assertThat( |
| manager.execute(RemoveIndexCommand.builder().indexId(indexId).build()), |
| willCompleteSuccessfully() |
| ); |
| } |
| |
| private void dropIndex(String indexName) { |
| assertThat( |
| manager.execute(DropIndexCommand.builder().indexName(indexName).schemaName(DEFAULT_SCHEMA_NAME).build()), |
| willCompleteSuccessfully() |
| ); |
| } |
| |
| private void dropIndex(int indexId) { |
| CatalogIndexDescriptor index = manager.index(indexId, Long.MAX_VALUE); |
| assertThat(index, is(notNullValue())); |
| |
| dropIndex(index.name()); |
| } |
| |
| @Test |
| void testDropNotExistingIndex() { |
| assertThat( |
| manager.execute(DropIndexCommand.builder().schemaName(SCHEMA_NAME).indexName(INDEX_NAME).build()), |
| willThrowFast(IndexNotFoundValidationException.class) |
| ); |
| } |
| |
| @Test |
| void testDropNotExistingTable() { |
| assertThat(manager.execute(dropTableCommand(TABLE_NAME)), willThrowFast(CatalogValidationException.class)); |
| } |
| |
| @Test |
| void testDropColumnWithNotExistingTable() { |
| assertThat(manager.execute(dropColumnParams(TABLE_NAME, "key")), willThrowFast(TableNotFoundValidationException.class)); |
| } |
| |
| @Test |
| void testDropColumnWithMissingTableColumns() { |
| assertThat(manager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully()); |
| |
| assertThat(manager.execute(dropColumnParams(TABLE_NAME, "fake")), willThrowFast(CatalogValidationException.class)); |
| } |
| |
| @Test |
| void testDropColumnWithPrimaryKeyColumns() { |
| assertThat(manager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully()); |
| |
| assertThat( |
| manager.execute(dropColumnParams(TABLE_NAME, "ID")), |
| willThrowFast(CatalogValidationException.class, "Deleting column `ID` belonging to primary key is not allowed") |
| ); |
| } |
| |
| @Test |
| void testDropColumnWithIndexColumns() { |
| assertThat(manager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully()); |
| assertThat(manager.execute(simpleIndex()), willCompleteSuccessfully()); |
| |
| assertThat( |
| manager.execute(dropColumnParams(TABLE_NAME, "VAL")), |
| willThrowFast( |
| CatalogValidationException.class, |
| "Deleting column 'VAL' used by index(es) [myIndex], it is not allowed" |
| ) |
| ); |
| } |
| |
| @Test |
| void testAddColumnWithNotExistingTable() { |
| assertThat(manager.execute(addColumnParams(TABLE_NAME, columnParams("key", INT32))), |
| willThrowFast(TableNotFoundValidationException.class)); |
| } |
| |
| @Test |
| void testAddColumnWithExistingName() { |
| assertThat(manager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully()); |
| |
| assertThat(manager.execute(addColumnParams(TABLE_NAME, columnParams("ID", INT32))), |
| willThrowFast(CatalogValidationException.class)); |
| } |
| |
| @Test |
| void bulkCommandEitherAppliedAtomicallyOrDoesntAppliedAtAll() { |
| String tableName1 = "TEST1"; |
| String tableName2 = "TEST2"; |
| String tableName3 = "TEST1"; // intentional name conflict with table1 |
| |
| List<CatalogCommand> bulkUpdate = List.of( |
| simpleTable(tableName1), |
| simpleTable(tableName2), |
| simpleTable(tableName3) |
| ); |
| |
| assertThat(manager.table(tableName1, Long.MAX_VALUE), nullValue()); |
| assertThat(manager.table(tableName2, Long.MAX_VALUE), nullValue()); |
| assertThat(manager.table(tableName3, Long.MAX_VALUE), nullValue()); |
| |
| assertThat(manager.execute(bulkUpdate), willThrowFast(TableExistsValidationException.class)); |
| |
| // now let's truncate problematic table and retry |
| assertThat(manager.execute(bulkUpdate.subList(0, bulkUpdate.size() - 1)), willCompleteSuccessfully()); |
| |
| assertThat(manager.table(tableName1, Long.MAX_VALUE), notNullValue()); |
| assertThat(manager.table(tableName2, Long.MAX_VALUE), notNullValue()); |
| } |
| |
| @Test |
| void bulkUpdateIncrementsVersionByOne() { |
| String tableName1 = "T1"; |
| String tableName2 = "T2"; |
| String tableName3 = "T3"; |
| |
| int versionBefore = manager.latestCatalogVersion(); |
| |
| assertThat(manager.table(tableName1, Long.MAX_VALUE), nullValue()); |
| assertThat(manager.table(tableName2, Long.MAX_VALUE), nullValue()); |
| assertThat(manager.table(tableName3, Long.MAX_VALUE), nullValue()); |
| |
| assertThat( |
| manager.execute(List.of(simpleTable(tableName1), simpleTable(tableName2), simpleTable(tableName3))), |
| willCompleteSuccessfully() |
| ); |
| |
| int versionAfter = manager.latestCatalogVersion(); |
| |
| assertThat(manager.table(tableName1, Long.MAX_VALUE), notNullValue()); |
| assertThat(manager.table(tableName2, Long.MAX_VALUE), notNullValue()); |
| assertThat(manager.table(tableName3, Long.MAX_VALUE), notNullValue()); |
| |
| assertThat(versionAfter - versionBefore, is(1)); |
| } |
| |
| @Test |
| void bulkUpdateDoesntIncrementVersionInCaseOfError() { |
| String tableName1 = "T1"; |
| |
| int versionBefore = manager.latestCatalogVersion(); |
| |
| assertThat(manager.table(tableName1, Long.MAX_VALUE), nullValue()); |
| |
| assertThat( |
| manager.execute(List.of(simpleTable(tableName1), simpleTable(tableName1))), |
| willThrow(CatalogValidationException.class) |
| ); |
| |
| int versionAfter = manager.latestCatalogVersion(); |
| |
| assertThat(manager.table(tableName1, Long.MAX_VALUE), nullValue()); |
| |
| assertThat(versionAfter, is(versionBefore)); |
| } |
| |
| @Test |
| void testMakeHashIndexAvailable() { |
| createSomeTable(TABLE_NAME); |
| |
| assertThat( |
| manager.execute(createHashIndexCommand(INDEX_NAME, List.of("key1"))), |
| willCompleteSuccessfully() |
| ); |
| |
| int indexId = indexId(INDEX_NAME); |
| |
| startBuildingIndex(indexId); |
| makeIndexAvailable(indexId); |
| |
| CatalogHashIndexDescriptor index = (CatalogHashIndexDescriptor) index(manager.latestCatalogVersion(), INDEX_NAME); |
| |
| assertEquals(AVAILABLE, index.status()); |
| } |
| |
| private void makeIndexAvailable(int indexId) { |
| assertThat( |
| manager.execute(MakeIndexAvailableCommand.builder().indexId(indexId).build()), |
| willCompleteSuccessfully() |
| ); |
| } |
| |
| @Test |
| void testMakeSortedIndexAvailable() { |
| createSomeTable(TABLE_NAME); |
| |
| assertThat( |
| manager.execute(createSortedIndexCommand(INDEX_NAME, List.of("key1"), List.of(ASC_NULLS_LAST))), |
| willCompleteSuccessfully() |
| ); |
| |
| int indexId = indexId(INDEX_NAME); |
| |
| assertThat( |
| manager.execute(startBuildingIndexCommand(indexId)), |
| willCompleteSuccessfully() |
| ); |
| |
| makeIndexAvailable(indexId); |
| |
| CatalogSortedIndexDescriptor index = (CatalogSortedIndexDescriptor) index(manager.latestCatalogVersion(), INDEX_NAME); |
| |
| assertEquals(AVAILABLE, index.status()); |
| } |
| |
| @Test |
| void testAvailableIndexEvent() { |
| createSomeTable(TABLE_NAME); |
| |
| assertThat( |
| manager.execute(createHashIndexCommand(INDEX_NAME, List.of("key1"))), |
| willCompleteSuccessfully() |
| ); |
| |
| int indexId = index(manager.latestCatalogVersion(), INDEX_NAME).id(); |
| |
| var fireEventFuture = new CompletableFuture<Void>(); |
| |
| manager.listen(CatalogEvent.INDEX_AVAILABLE, fromConsumer(fireEventFuture, (MakeIndexAvailableEventParameters parameters) -> { |
| assertEquals(indexId, parameters.indexId()); |
| })); |
| |
| assertThat( |
| manager.execute(startBuildingIndexCommand(indexId)), |
| willCompleteSuccessfully() |
| ); |
| |
| makeIndexAvailable(indexId); |
| |
| assertThat(fireEventFuture, willCompleteSuccessfully()); |
| } |
| |
| @Test |
| void testPkAvailableIndexEvent() { |
| String tableName = TABLE_NAME + "_new"; |
| |
| var fireEventFuture = new CompletableFuture<Void>(); |
| |
| manager.listen(CatalogEvent.INDEX_AVAILABLE, fromConsumer(fireEventFuture, (MakeIndexAvailableEventParameters parameters) -> { |
| CatalogIndexDescriptor catalogIndexDescriptor = manager.index(parameters.indexId(), parameters.catalogVersion()); |
| |
| assertNotNull(catalogIndexDescriptor); |
| assertEquals(pkIndexName(tableName), catalogIndexDescriptor.name()); |
| })); |
| |
| createSomeTable(tableName); |
| |
| assertThat(fireEventFuture, willCompleteSuccessfully()); |
| } |
| |
| @Test |
| void testPkAvailableOnCreateIndexEvent() { |
| var fireEventFuture = new CompletableFuture<Void>(); |
| |
| manager.listen(CatalogEvent.INDEX_CREATE, fromConsumer(fireEventFuture, (CreateIndexEventParameters parameters) -> { |
| assertEquals(AVAILABLE, parameters.indexDescriptor().status()); |
| })); |
| |
| createSomeTable(TABLE_NAME); |
| |
| assertThat(fireEventFuture, willCompleteSuccessfully()); |
| } |
| |
| @Test |
| void testGetIndexesForTables() { |
| String tableName0 = TABLE_NAME + 0; |
| String tableName1 = TABLE_NAME + 1; |
| |
| createSomeTable(tableName0); |
| createSomeTable(tableName1); |
| |
| createSomeIndex(tableName1, INDEX_NAME); |
| |
| int catalogVersion = manager.latestCatalogVersion(); |
| |
| // Let's check for a non-existent table. |
| assertThat(tableIndexIds(catalogVersion, Integer.MAX_VALUE), empty()); |
| |
| // Let's check for an existing tables. |
| int tableId0 = tableId(tableName0); |
| int tableId1 = tableId(tableName1); |
| |
| assertThat(tableIndexIds(catalogVersion, tableId0), hasItems(indexId(pkIndexName(tableName0)))); |
| assertThat(tableIndexIds(catalogVersion, tableId1), hasItems(indexId(pkIndexName(tableName1)), indexId(INDEX_NAME))); |
| } |
| |
| @Test |
| void testGetIndexesForTableInSortedOrderById() { |
| createSomeTable(TABLE_NAME); |
| |
| String indexName0 = INDEX_NAME + 0; |
| String indexName1 = INDEX_NAME + 1; |
| |
| createSomeIndex(TABLE_NAME, indexName0); |
| createSomeIndex(TABLE_NAME, indexName1); |
| |
| int indexId0 = indexId(pkIndexName(TABLE_NAME)); |
| int indexId1 = indexId(indexName0); |
| int indexId2 = indexId(indexName1); |
| |
| int catalogVersion = manager.latestCatalogVersion(); |
| |
| assertThat(tableIndexIds(catalogVersion, tableId(TABLE_NAME)), equalTo(List.of(indexId0, indexId1, indexId2))); |
| } |
| |
| @Test |
| void testTableRename() { |
| createSomeTable(TABLE_NAME); |
| |
| int prevVersion = manager.latestCatalogVersion(); |
| |
| CatalogCommand command = RenameTableCommand.builder() |
| .schemaName(SCHEMA_NAME) |
| .tableName(TABLE_NAME) |
| .newTableName(TABLE_NAME_2) |
| .build(); |
| |
| assertThat(manager.execute(command), willCompleteSuccessfully()); |
| |
| int curVersion = manager.latestCatalogVersion(); |
| |
| CatalogTableDescriptor prevDescriptor = table(prevVersion, TABLE_NAME); |
| CatalogTableDescriptor curDescriptor = table(curVersion, TABLE_NAME_2); |
| |
| assertThat(prevDescriptor, is(notNullValue())); |
| assertThat(prevDescriptor.name(), is(TABLE_NAME)); |
| |
| assertThat(curDescriptor, is(notNullValue())); |
| assertThat(curDescriptor.name(), is(TABLE_NAME_2)); |
| |
| assertThat(table(prevVersion, TABLE_NAME_2), is(nullValue())); |
| assertThat(table(curVersion, TABLE_NAME), is(nullValue())); |
| |
| assertThat(curDescriptor.tableVersion(), is(prevDescriptor.tableVersion() + 1)); |
| |
| // Assert that all other properties have been left intact. |
| assertThat(curDescriptor.id(), is(prevDescriptor.id())); |
| assertThat(curDescriptor.columns(), is(prevDescriptor.columns())); |
| assertThat(curDescriptor.colocationColumns(), is(prevDescriptor.colocationColumns())); |
| assertThat(curDescriptor.creationToken(), is(prevDescriptor.creationToken())); |
| assertThat(curDescriptor.primaryKeyColumns(), is(prevDescriptor.primaryKeyColumns())); |
| assertThat(curDescriptor.primaryKeyIndexId(), is(prevDescriptor.primaryKeyIndexId())); |
| assertThat(curDescriptor.schemaId(), is(prevDescriptor.schemaId())); |
| } |
| |
| @Test |
| void testTableRenameAndCreateTableWithSameName() { |
| createSomeTable(TABLE_NAME); |
| |
| CatalogCommand command = RenameTableCommand.builder() |
| .schemaName(SCHEMA_NAME) |
| .tableName(TABLE_NAME) |
| .newTableName(TABLE_NAME_2) |
| .build(); |
| |
| assertThat(manager.execute(command), willCompleteSuccessfully()); |
| |
| createSomeTable(TABLE_NAME); |
| |
| int catalogVersion = manager.latestCatalogVersion(); |
| |
| assertThat(table(catalogVersion, TABLE_NAME), is(notNullValue())); |
| assertThat(table(catalogVersion, TABLE_NAME_2), is(notNullValue())); |
| } |
| |
| @Test |
| void testTableRenameFiresEvent() { |
| createSomeTable(TABLE_NAME); |
| |
| var fireEventFuture = new CompletableFuture<Void>(); |
| |
| manager.listen(CatalogEvent.TABLE_ALTER, fromConsumer(fireEventFuture, (RenameTableEventParameters parameters) -> { |
| CatalogTableDescriptor tableDescriptor = table(manager.latestCatalogVersion(), TABLE_NAME_2); |
| |
| assertThat(parameters.tableId(), is(tableDescriptor.id())); |
| assertThat(parameters.newTableName(), is(tableDescriptor.name())); |
| })); |
| |
| CatalogCommand command = RenameTableCommand.builder() |
| .schemaName(SCHEMA_NAME) |
| .tableName(TABLE_NAME) |
| .newTableName(TABLE_NAME_2) |
| .build(); |
| |
| assertThat(manager.execute(command), willCompleteSuccessfully()); |
| assertThat(fireEventFuture, willCompleteSuccessfully()); |
| } |
| |
| @Test |
| void testStartHashIndexBuilding() { |
| createSomeTable(TABLE_NAME); |
| |
| assertThat( |
| manager.execute(createHashIndexCommand(INDEX_NAME, List.of("key1"))), |
| willCompleteSuccessfully() |
| ); |
| |
| assertThat( |
| manager.execute(StartBuildingIndexCommand.builder().indexId(indexId(INDEX_NAME)).build()), |
| willCompleteSuccessfully() |
| ); |
| |
| CatalogHashIndexDescriptor index = (CatalogHashIndexDescriptor) index(manager.latestCatalogVersion(), INDEX_NAME); |
| |
| assertEquals(BUILDING, index.status()); |
| } |
| |
| @Test |
| void testStartSortedIndexBuilding() { |
| createSomeTable(TABLE_NAME); |
| |
| assertThat( |
| manager.execute(createSortedIndexCommand(INDEX_NAME, List.of("key1"), List.of(ASC_NULLS_LAST))), |
| willCompleteSuccessfully() |
| ); |
| |
| assertThat( |
| manager.execute(StartBuildingIndexCommand.builder().indexId(indexId(INDEX_NAME)).build()), |
| willCompleteSuccessfully() |
| ); |
| |
| CatalogSortedIndexDescriptor index = (CatalogSortedIndexDescriptor) index(manager.latestCatalogVersion(), INDEX_NAME); |
| |
| assertEquals(BUILDING, index.status()); |
| } |
| |
| @Test |
| void testStartBuildingIndexEvent() { |
| createSomeTable(TABLE_NAME); |
| |
| assertThat( |
| manager.execute(createHashIndexCommand(INDEX_NAME, List.of("key1"))), |
| willCompleteSuccessfully() |
| ); |
| |
| int indexId = index(manager.latestCatalogVersion(), INDEX_NAME).id(); |
| |
| var fireEventFuture = new CompletableFuture<Void>(); |
| |
| manager.listen(CatalogEvent.INDEX_BUILDING, fromConsumer(fireEventFuture, (StartBuildingIndexEventParameters parameters) -> { |
| assertEquals(indexId, parameters.indexId()); |
| })); |
| |
| assertThat( |
| manager.execute(startBuildingIndexCommand(indexId)), |
| willCompleteSuccessfully() |
| ); |
| |
| assertThat(fireEventFuture, willCompleteSuccessfully()); |
| } |
| |
| @ParameterizedTest |
| @ValueSource(booleans = {true, false}) |
| void testIndexCreationCatalogVersionAfterUpdateIndex(boolean hashIndex) { |
| createSomeTable(TABLE_NAME); |
| |
| if (hashIndex) { |
| createSomeIndex(TABLE_NAME, INDEX_NAME); |
| } else { |
| createSomeSortedIndex(TABLE_NAME, INDEX_NAME); |
| } |
| |
| int expCreationVersion = manager.latestCatalogVersion(); |
| |
| int indexId = indexId(INDEX_NAME); |
| |
| assertThat(manager.execute(startBuildingIndexCommand(indexId)), willCompleteSuccessfully()); |
| |
| Catalog latestCatalog = manager.catalog(manager.activeCatalogVersion(clock.nowLong())); |
| |
| assertThat(latestCatalog.version(), greaterThan(expCreationVersion)); |
| |
| assertEquals(expCreationVersion, latestCatalog.index(indexId).txWaitCatalogVersion()); |
| } |
| |
| @ParameterizedTest(name = "hashIndex={0}, updateIndex={1}") |
| @MethodSource("argumentsForCheckIndexCreationCatalogVersion") |
| void testIndexCreationCatalogVersionAfterUpdateCatalog(boolean hashIndex, boolean updateIndex) { |
| createSomeTable(TABLE_NAME); |
| |
| if (hashIndex) { |
| createSomeIndex(TABLE_NAME, INDEX_NAME); |
| } else { |
| createSomeSortedIndex(TABLE_NAME, INDEX_NAME); |
| } |
| |
| int expCreationVersion = manager.latestCatalogVersion(); |
| |
| int indexId = indexId(INDEX_NAME); |
| |
| if (updateIndex) { |
| assertThat(manager.execute(startBuildingIndexCommand(indexId)), willCompleteSuccessfully()); |
| } else { |
| createSomeTable(TABLE_NAME + 1); |
| } |
| |
| Catalog latestCatalog = manager.catalog(manager.activeCatalogVersion(clock.nowLong())); |
| |
| assertThat(latestCatalog.version(), greaterThan(expCreationVersion)); |
| |
| assertEquals(expCreationVersion, latestCatalog.index(indexId).txWaitCatalogVersion()); |
| } |
| |
| private static Stream<Arguments> argumentsForCheckIndexCreationCatalogVersion() { |
| return Stream.of( |
| Arguments.of(true, true), // Create hash index and update index status.. |
| Arguments.of(true, false), // Create hash index and update catalog (create table). |
| Arguments.of(false, true), // Create sorted index and update index status.. |
| Arguments.of(false, false) // Create sorted index and update catalog (create table). |
| ); |
| } |
| |
| @Test |
| public void testCreateSchema() { |
| String schemaName = "S1"; |
| |
| assertThat(manager.execute(CreateSchemaCommand.builder().name(schemaName).build()), willCompleteSuccessfully()); |
| |
| Catalog latestCatalog = manager.catalog(manager.activeCatalogVersion(clock.nowLong())); |
| |
| assertNotNull(latestCatalog); |
| assertNotNull(latestCatalog.schema(schemaName)); |
| assertNotNull(latestCatalog.schema(DEFAULT_SCHEMA_NAME)); |
| |
| assertThat( |
| manager.execute(CreateSchemaCommand.builder().name(schemaName).build()), |
| willThrowFast(CatalogValidationException.class, "Schema with name 'S1' already exists") |
| ); |
| } |
| |
| @Test |
| public void testCatalogCompaction() throws Exception { |
| assertThat(manager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully()); |
| assertThat(manager.execute(simpleTable(TABLE_NAME_2)), willCompleteSuccessfully()); |
| |
| long timestamp = clock.nowLong(); |
| Catalog catalog = manager.catalog(manager.activeCatalogVersion(clock.nowLong())); |
| |
| // Add more updates |
| assertThat(manager.execute(simpleIndex(TABLE_NAME, INDEX_NAME)), willCompleteSuccessfully()); |
| assertThat(manager.execute(simpleIndex(TABLE_NAME, INDEX_NAME_2)), willCompleteSuccessfully()); |
| |
| assertThat(manager.compactCatalog(timestamp), willBe(Boolean.TRUE)); |
| assertTrue(waitForCondition(() -> catalog.version() == manager.earliestCatalogVersion(), 3_000)); |
| |
| assertNull(manager.catalog(0)); |
| assertNull(manager.catalog(catalog.version() - 1)); |
| assertNotNull(manager.catalog(catalog.version())); |
| |
| assertThrows(IllegalStateException.class, () -> manager.activeCatalogVersion(0)); |
| assertThrows(IllegalStateException.class, () -> manager.activeCatalogVersion(catalog.time() - 1)); |
| assertSame(catalog.version(), manager.activeCatalogVersion(catalog.time())); |
| assertSame(catalog.version(), manager.activeCatalogVersion(timestamp)); |
| |
| assertThat(manager.compactCatalog(timestamp), willBe(false)); |
| assertEquals(catalog.version(), manager.earliestCatalogVersion()); |
| } |
| |
| @Test |
| public void testEmptyCatalogCompaction() { |
| assertEquals(1, manager.latestCatalogVersion()); |
| |
| long timestamp = clock.nowLong(); |
| |
| assertThat(manager.compactCatalog(timestamp), willBe(false)); |
| |
| assertEquals(0, manager.earliestCatalogVersion()); |
| assertEquals(1, manager.latestCatalogVersion()); |
| |
| assertNotNull(manager.catalog(1)); |
| |
| assertEquals(0, manager.activeCatalogVersion(0)); |
| assertEquals(1, manager.activeCatalogVersion(timestamp)); |
| } |
| |
| private CompletableFuture<?> changeColumn( |
| String tab, |
| String col, |
| @Nullable TestColumnTypeParams typeParams, |
| @Nullable Boolean notNull, |
| @Nullable Supplier<DefaultValue> dflt |
| ) { |
| AlterTableAlterColumnCommandBuilder builder = AlterTableAlterColumnCommand.builder() |
| .schemaName(SCHEMA_NAME) |
| .tableName(tab) |
| .columnName(col); |
| |
| if (notNull != null) { |
| builder.nullable(!notNull); |
| } |
| |
| if (dflt != null) { |
| builder.deferredDefaultValue(ignore -> dflt.get()); |
| } |
| |
| if (typeParams != null) { |
| builder.type(typeParams.type); |
| |
| if (typeParams.precision != null) { |
| builder.precision(typeParams.precision); |
| } |
| |
| if (typeParams.length != null) { |
| builder.length(typeParams.length); |
| } |
| |
| if (typeParams.scale != null) { |
| builder.scale(typeParams.scale); |
| } |
| } |
| |
| return manager.execute(builder.build()); |
| } |
| |
| private static CatalogCommand simpleIndex() { |
| return createSortedIndexCommand(INDEX_NAME, List.of("VAL"), List.of(ASC_NULLS_LAST)); |
| } |
| |
| private static class TestColumnTypeParams { |
| private final ColumnType type; |
| private final Integer precision; |
| private final Integer length; |
| private final Integer scale; |
| |
| private TestColumnTypeParams(ColumnType type) { |
| this(type, null, null, null); |
| } |
| |
| private TestColumnTypeParams(ColumnType type, @Nullable Integer precision, @Nullable Integer length, @Nullable Integer scale) { |
| this.type = type; |
| this.precision = precision; |
| this.length = length; |
| this.scale = scale; |
| } |
| } |
| |
| private @Nullable CatalogTableDescriptor table(int catalogVersion, String tableName) { |
| return manager.schema(catalogVersion).table(tableName); |
| } |
| |
| private @Nullable CatalogIndexDescriptor index(int catalogVersion, String indexName) { |
| return manager.schema(catalogVersion).aliveIndex(indexName); |
| } |
| |
| private int tableId(String tableName) { |
| CatalogTableDescriptor table = manager.table(tableName, clock.nowLong()); |
| |
| assertNotNull(table, tableName); |
| |
| return table.id(); |
| } |
| |
| private int indexId(String indexName) { |
| CatalogIndexDescriptor index = manager.aliveIndex(indexName, clock.nowLong()); |
| |
| assertNotNull(index, indexName); |
| |
| return index.id(); |
| } |
| |
| private void createSomeTable(String tableName) { |
| assertThat( |
| manager.execute(createTableCommand( |
| tableName, |
| List.of(columnParams("key1", INT32), columnParams("val1", INT32)), |
| List.of("key1"), |
| List.of("key1") |
| )), |
| willCompleteSuccessfully() |
| ); |
| } |
| |
| private void createSomeIndex(String tableName, String indexName) { |
| assertThat( |
| manager.execute(createHashIndexCommand(tableName, indexName, false, List.of("key1"))), |
| willCompleteSuccessfully() |
| ); |
| } |
| |
| private void createSomeSortedIndex(String tableName, String indexName) { |
| assertThat( |
| manager.execute(createSortedIndexCommand(tableName, indexName, false, List.of("key1"), List.of(ASC_NULLS_LAST))), |
| willCompleteSuccessfully() |
| ); |
| } |
| |
| private List<Integer> tableIndexIds(int catalogVersion, int tableId) { |
| return manager.indexes(catalogVersion, tableId).stream().map(CatalogObjectDescriptor::id).collect(toList()); |
| } |
| |
| private static <T extends CatalogEventParameters> EventListener<T> fromConsumer( |
| CompletableFuture<Void> fireEventFuture, |
| Consumer<T> consumer |
| ) { |
| return parameters -> { |
| try { |
| consumer.accept(parameters); |
| |
| fireEventFuture.complete(null); |
| } catch (Throwable t) { |
| fireEventFuture.completeExceptionally(t); |
| } |
| |
| return falseCompletedFuture(); |
| }; |
| } |
| |
| private Catalog latestActiveCatalog() { |
| Catalog catalog = manager.catalog(manager.activeCatalogVersion(clock.nowLong())); |
| |
| return Objects.requireNonNull(catalog); |
| } |
| } |