IGNITE-19082: Catalog. Cleanup dead code (#3669)
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
index 14e90a1..070c7a8 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
@@ -47,6 +47,7 @@
import java.util.concurrent.Flow.Publisher;
import java.util.function.LongSupplier;
import org.apache.ignite.internal.catalog.commands.AlterZoneSetDefaultCommand;
+import org.apache.ignite.internal.catalog.commands.CreateSchemaCommand;
import org.apache.ignite.internal.catalog.commands.CreateZoneCommand;
import org.apache.ignite.internal.catalog.commands.StorageProfileParams;
import org.apache.ignite.internal.catalog.descriptors.CatalogHashIndexDescriptor;
@@ -119,6 +120,9 @@
/** Versioned catalog descriptors sorted in chronological order. */
private final NavigableMap<Long, Catalog> catalogByTs = new ConcurrentSkipListMap<>();
+ /** A future that completes when an empty catalog is initialised. If catalog is not empty this future when this completes starts. */
+ private final CompletableFuture<Void> catalogInitializationFuture = new CompletableFuture<>();
+
private final UpdateLog updateLog;
private final PendingComparableValuesTracker<Integer, Void> versionTracker = new PendingComparableValuesTracker<>(0);
@@ -158,27 +162,7 @@
public CompletableFuture<Void> startAsync() {
int objectIdGen = 0;
- // TODO: IGNITE-19082 Move default schema objects initialization to cluster init procedure.
- CatalogSchemaDescriptor publicSchema = new CatalogSchemaDescriptor(
- objectIdGen++,
- DEFAULT_SCHEMA_NAME,
- new CatalogTableDescriptor[0],
- new CatalogIndexDescriptor[0],
- new CatalogSystemViewDescriptor[0],
- INITIAL_CAUSALITY_TOKEN
- );
-
- // TODO: IGNITE-19082 Move system schema objects initialization to cluster init procedure.
- CatalogSchemaDescriptor systemSchema = new CatalogSchemaDescriptor(
- objectIdGen++,
- SYSTEM_SCHEMA_NAME,
- new CatalogTableDescriptor[0],
- new CatalogIndexDescriptor[0],
- new CatalogSystemViewDescriptor[0],
- INITIAL_CAUSALITY_TOKEN
- );
-
- Catalog emptyCatalog = new Catalog(0, 0L, objectIdGen, List.of(), List.of(publicSchema, systemSchema), null);
+ Catalog emptyCatalog = new Catalog(0, 0L, objectIdGen, List.of(), List.of(), null);
registerCatalog(emptyCatalog);
@@ -187,12 +171,17 @@
return updateLog.startAsync()
.thenCompose(none -> {
if (latestCatalogVersion() == emptyCatalog.version()) {
- // node has not seen any updates yet, let's try to initialise
- // catalog with default zone
- return createDefaultZone(emptyCatalog);
- }
+ int initializedCatalogVersion = emptyCatalog.version() + 1;
- return nullCompletedFuture();
+ this.catalogReadyFuture(initializedCatalogVersion)
+ .thenCompose(ignored -> awaitVersionActivation(initializedCatalogVersion))
+ .handle((r, e) -> catalogInitializationFuture.complete(null));
+
+ return initCatalog(emptyCatalog);
+ } else {
+ catalogInitializationFuture.complete(null);
+ return nullCompletedFuture();
+ }
});
}
@@ -205,7 +194,11 @@
@Override
public @Nullable CatalogTableDescriptor table(String tableName, long timestamp) {
- return catalogAt(timestamp).schema(DEFAULT_SCHEMA_NAME).table(tableName);
+ CatalogSchemaDescriptor schema = catalogAt(timestamp).schema(DEFAULT_SCHEMA_NAME);
+ if (schema == null) {
+ return null;
+ }
+ return schema.table(tableName);
}
@Override
@@ -225,7 +218,11 @@
@Override
public @Nullable CatalogIndexDescriptor aliveIndex(String indexName, long timestamp) {
- return catalogAt(timestamp).schema(DEFAULT_SCHEMA_NAME).aliveIndex(indexName);
+ CatalogSchemaDescriptor schema = catalogAt(timestamp).schema(DEFAULT_SCHEMA_NAME);
+ if (schema == null) {
+ return null;
+ }
+ return schema.aliveIndex(indexName);
}
@Override
@@ -322,6 +319,11 @@
}
@Override
+ public CompletableFuture<Void> catalogInitializationFuture() {
+ return catalogInitializationFuture;
+ }
+
+ @Override
public @Nullable Catalog catalog(int catalogVersion) {
return catalogByVer.get(catalogVersion);
}
@@ -363,8 +365,9 @@
return updateLog.saveSnapshot(new SnapshotEntry(catalog));
}
- private CompletableFuture<Void> createDefaultZone(Catalog emptyCatalog) {
- List<UpdateEntry> createZoneEntries = new BulkUpdateProducer(List.of(
+ private CompletableFuture<Void> initCatalog(Catalog emptyCatalog) {
+ List<CatalogCommand> initCommands = List.of(
+ // Init default zone
CreateZoneCommand.builder()
.zoneName(DEFAULT_ZONE_NAME)
.partitions(DEFAULT_PARTITION_COUNT)
@@ -378,10 +381,15 @@
.build(),
AlterZoneSetDefaultCommand.builder()
.zoneName(DEFAULT_ZONE_NAME)
- .build()
- )).get(emptyCatalog);
+ .build(),
+ // Add schemas
+ CreateSchemaCommand.builder().name(DEFAULT_SCHEMA_NAME).build(),
+ CreateSchemaCommand.builder().name(SYSTEM_SCHEMA_NAME).build()
+ );
- return updateLog.append(new VersionedUpdate(emptyCatalog.version() + 1, 0L, createZoneEntries))
+ List<UpdateEntry> entries = new BulkUpdateProducer(initCommands).get(emptyCatalog);
+
+ return updateLog.append(new VersionedUpdate(emptyCatalog.version() + 1, 0L, entries))
.handle((result, error) -> {
if (error != null) {
LOG.warn("Unable to create default zone.", error);
@@ -407,13 +415,7 @@
CompletableFuture<Integer> resultFuture = new CompletableFuture<>();
saveUpdate(updateProducer, 0)
- .thenCompose(newVersion -> {
- Catalog catalog = catalogByVer.get(newVersion);
-
- HybridTimestamp tsSafeForRoReadingInPastOptimization = calcClusterWideEnsureActivationTime(catalog);
-
- return clockService.waitFor(tsSafeForRoReadingInPastOptimization).thenApply(unused -> newVersion);
- })
+ .thenCompose(this::awaitVersionActivation)
.whenComplete((newVersion, err) -> {
if (err != null) {
Throwable errUnwrapped = ExceptionUtils.unwrapCause(err);
@@ -448,6 +450,14 @@
return resultFuture;
}
+ private CompletableFuture<Integer> awaitVersionActivation(int version) {
+ Catalog catalog = catalogByVer.get(version);
+
+ HybridTimestamp tsSafeForRoReadingInPastOptimization = calcClusterWideEnsureActivationTime(catalog);
+
+ return clockService.waitFor(tsSafeForRoReadingInPastOptimization).thenApply(unused -> version);
+ }
+
private HybridTimestamp calcClusterWideEnsureActivationTime(Catalog catalog) {
return clusterWideEnsuredActivationTsSafeForRoReads(
catalog,
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
index eefbaa0..cc1c65d 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
@@ -45,8 +45,10 @@
* <p>TBD: events
*/
public interface CatalogService extends EventProducer<CatalogEvent, CatalogEventParameters> {
+ /** Default schema name. */
String DEFAULT_SCHEMA_NAME = "PUBLIC";
+ /** System schema name. */
String SYSTEM_SCHEMA_NAME = "SYSTEM";
/** Default storage profile. */
@@ -110,4 +112,9 @@
* @param version Catalog version to wait for.
*/
CompletableFuture<Void> catalogReadyFuture(int version);
+
+ /**
+ * Returns a future, which completes when empty catalog is initialised. Otherwise this future completes upon startup.
+ */
+ CompletableFuture<Void> catalogInitializationFuture();
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AbstractCreateIndexCommand.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AbstractCreateIndexCommand.java
index 08bebca..12bf302 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AbstractCreateIndexCommand.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AbstractCreateIndexCommand.java
@@ -91,7 +91,7 @@
}
return List.of(
- new NewIndexEntry(createDescriptor(catalog.objectIdGenState(), table.id(), catalog.version() + 1), schemaName),
+ new NewIndexEntry(createDescriptor(catalog.objectIdGenState(), table.id(), catalog.version() + 1)),
new ObjectIdGenUpdateEntry(1)
);
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterTableAddColumnCommand.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterTableAddColumnCommand.java
index 57a2d1a..d1cc894 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterTableAddColumnCommand.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterTableAddColumnCommand.java
@@ -89,7 +89,7 @@
}
return List.of(
- new NewColumnsEntry(table.id(), columnDescriptors, schemaName)
+ new NewColumnsEntry(table.id(), columnDescriptors)
);
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterTableAlterColumnCommand.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterTableAlterColumnCommand.java
index 1a06cc1..1fda646 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterTableAlterColumnCommand.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterTableAlterColumnCommand.java
@@ -116,7 +116,7 @@
}
return List.of(
- new AlterColumnEntry(table.id(), target, schemaName)
+ new AlterColumnEntry(table.id(), target)
);
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterTableDropColumnCommand.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterTableDropColumnCommand.java
index 9e6ec15..7190859 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterTableDropColumnCommand.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterTableDropColumnCommand.java
@@ -107,7 +107,7 @@
});
return List.of(
- new DropColumnsEntry(table.id(), columns, schemaName)
+ new DropColumnsEntry(table.id(), columns)
);
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateSchemaCommand.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateSchemaCommand.java
new file mode 100644
index 0000000..38f15e0
--- /dev/null
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateSchemaCommand.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.commands;
+
+import static org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN;
+import static org.apache.ignite.internal.catalog.CatalogParamsValidationUtils.validateIdentifier;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+
+import java.util.List;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.CatalogCommand;
+import org.apache.ignite.internal.catalog.CatalogValidationException;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogSystemViewDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.storage.NewSchemaEntry;
+import org.apache.ignite.internal.catalog.storage.ObjectIdGenUpdateEntry;
+import org.apache.ignite.internal.catalog.storage.UpdateEntry;
+
+/**
+ * Command to create a new schema.
+ */
+public class CreateSchemaCommand implements CatalogCommand {
+
+ private final String schemaName;
+
+ private CreateSchemaCommand(String schemaName) {
+ validateIdentifier(schemaName, "Name of the schema");
+
+ this.schemaName = schemaName;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public List<UpdateEntry> get(Catalog catalog) {
+ int id = catalog.objectIdGenState();
+
+ if (catalog.schema(schemaName) != null) {
+ throw new CatalogValidationException(format("Schema with name '{}' already exists", schemaName));
+ }
+
+ CatalogSchemaDescriptor schema = new CatalogSchemaDescriptor(
+ id,
+ schemaName,
+ new CatalogTableDescriptor[0],
+ new CatalogIndexDescriptor[0],
+ new CatalogSystemViewDescriptor[0],
+ INITIAL_CAUSALITY_TOKEN
+ );
+
+ return List.of(
+ new NewSchemaEntry(schema),
+ new ObjectIdGenUpdateEntry(1)
+ );
+ }
+
+ /** Returns builder to create a command to create a new schema. */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /** Implementation of {@link CreateSchemaCommandBuilder}. */
+ public static class Builder implements CreateSchemaCommandBuilder {
+
+ private String name;
+
+ /** {@inheritDoc} */
+ @Override
+ public CreateSchemaCommandBuilder name(String name) {
+ this.name = name;
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CatalogCommand build() {
+ return new CreateSchemaCommand(name);
+ }
+ }
+}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateSchemaCommandBuilder.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateSchemaCommandBuilder.java
new file mode 100644
index 0000000..b3e690f
--- /dev/null
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateSchemaCommandBuilder.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.commands;
+
+import org.apache.ignite.internal.catalog.CatalogCommand;
+
+/**
+ * Builder for a {@link CreateSchemaCommand}.
+ */
+public interface CreateSchemaCommandBuilder {
+
+ /** Sets schema name. Should not be null or blank. */
+ CreateSchemaCommandBuilder name(String name);
+
+ /** Creates new schema command. */
+ CatalogCommand build();
+}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateSystemViewCommand.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateSystemViewCommand.java
index efe6e63..baf358d 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateSystemViewCommand.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateSystemViewCommand.java
@@ -108,7 +108,7 @@
CatalogSchemaDescriptor systemSchema = schemaOrThrow(catalog, CatalogManager.SYSTEM_SCHEMA_NAME);
List<CatalogTableColumnDescriptor> viewColumns = columns.stream().map(CatalogUtils::fromParams).collect(toList());
- CatalogSystemViewDescriptor descriptor = new CatalogSystemViewDescriptor(id, name, viewColumns, systemViewType);
+ CatalogSystemViewDescriptor descriptor = new CatalogSystemViewDescriptor(id, systemSchema.id(), name, viewColumns, systemViewType);
CatalogSystemViewDescriptor existingSystemView = systemSchema.systemView(name);
@@ -121,7 +121,7 @@
}
return List.of(
- new NewSystemViewEntry(descriptor, systemSchema.name()),
+ new NewSystemViewEntry(descriptor),
new ObjectIdGenUpdateEntry(1)
);
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableCommand.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableCommand.java
index d8defb6..7a05f44 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableCommand.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableCommand.java
@@ -151,8 +151,8 @@
CatalogIndexDescriptor pkIndex = createIndexDescriptor(txWaitCatalogVersion, indexName, pkIndexId, tableId);
return List.of(
- new NewTableEntry(table, schemaName),
- new NewIndexEntry(pkIndex, schemaName),
+ new NewTableEntry(table),
+ new NewIndexEntry(pkIndex),
new MakeIndexAvailableEntry(pkIndexId),
new ObjectIdGenUpdateEntry(id - catalog.objectIdGenState())
);
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropIndexCommand.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropIndexCommand.java
index 52cfbaf..2efb9e4 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropIndexCommand.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropIndexCommand.java
@@ -90,7 +90,7 @@
case BUILDING:
return List.of(new RemoveIndexEntry(index.id()));
case AVAILABLE:
- return List.of(new DropIndexEntry(index.id(), index.tableId()));
+ return List.of(new DropIndexEntry(index.id()));
default:
throw new IllegalStateException("Unknown index status: " + index.status());
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropTableCommand.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropTableCommand.java
index ed2f713..399b2d9 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropTableCommand.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropTableCommand.java
@@ -62,7 +62,7 @@
updateEntries.add(new RemoveIndexEntry(index.id()));
});
- updateEntries.add(new DropTableEntry(table.id(), schemaName));
+ updateEntries.add(new DropTableEntry(table.id()));
return updateEntries;
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogColumnCollation.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogColumnCollation.java
index f494106..efaf92a 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogColumnCollation.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogColumnCollation.java
@@ -20,7 +20,7 @@
/**
* Enumeration of all supported collations.
*/
-// TODO: IGNITE-19082 drop similar classes in index and sql-engine modules.
+// TODO: https://issues.apache.org/jira/browse/IGNITE-22179 drop similar classes in index and sql-engine modules.
public enum CatalogColumnCollation {
ASC_NULLS_FIRST(true, true),
ASC_NULLS_LAST(true, false),
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogIndexDescriptor.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogIndexDescriptor.java
index 93a1aff..a14c846 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogIndexDescriptor.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogIndexDescriptor.java
@@ -99,12 +99,13 @@
/** Returns catalog index descriptor type by identifier. */
public static CatalogIndexDescriptorType forId(int id) {
- assert id == HASH.typeId || id == SORTED.typeId : "Unknown index descriptor type ID: " + id;
-
- if (id == HASH.typeId) {
- return HASH;
- } else {
- return SORTED;
+ switch (id) {
+ case 0:
+ return HASH;
+ case 1:
+ return SORTED;
+ default:
+ throw new IllegalArgumentException("Unknown index descriptor type id: " + id);
}
}
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogSystemViewDescriptor.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogSystemViewDescriptor.java
index 6efa736..edbb46b 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogSystemViewDescriptor.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogSystemViewDescriptor.java
@@ -35,6 +35,8 @@
public class CatalogSystemViewDescriptor extends CatalogObjectDescriptor {
public static final CatalogObjectSerializer<CatalogSystemViewDescriptor> SERIALIZER = new SystemViewDescriptorSerializer();
+ private final int schemaId;
+
private final List<CatalogTableColumnDescriptor> columns;
private final SystemViewType systemViewType;
@@ -43,18 +45,26 @@
* Constructor.
*
* @param id View id.
+ * @param schemaId Schema id.
* @param name View name.
* @param columns View columns.
* @param systemViewType View type.
*/
- public CatalogSystemViewDescriptor(int id, String name, List<CatalogTableColumnDescriptor> columns, SystemViewType systemViewType) {
- this(id, name, columns, systemViewType, INITIAL_CAUSALITY_TOKEN);
+ public CatalogSystemViewDescriptor(
+ int id,
+ int schemaId,
+ String name,
+ List<CatalogTableColumnDescriptor> columns,
+ SystemViewType systemViewType
+ ) {
+ this(id, schemaId, name, columns, systemViewType, INITIAL_CAUSALITY_TOKEN);
}
/**
* Constructor.
*
* @param id View id.
+ * @param schemaId Schema id.
* @param name View name.
* @param columns View columns.
* @param systemViewType View type.
@@ -62,6 +72,7 @@
*/
public CatalogSystemViewDescriptor(
int id,
+ int schemaId,
String name,
List<CatalogTableColumnDescriptor> columns,
SystemViewType systemViewType,
@@ -69,11 +80,21 @@
) {
super(id, Type.SYSTEM_VIEW, name, causalityToken);
+ this.schemaId = schemaId;
this.columns = Objects.requireNonNull(columns, "columns");
this.systemViewType = Objects.requireNonNull(systemViewType, "viewType");
}
/**
+ * Returns a schema id of this view.
+ *
+ * @return A schema id.
+ */
+ public int schemaId() {
+ return schemaId;
+ }
+
+ /**
* Returns a list of columns of this view.
*
* @return A list of columns.
@@ -101,13 +122,13 @@
return false;
}
CatalogSystemViewDescriptor that = (CatalogSystemViewDescriptor) o;
- return Objects.equals(columns, that.columns) && systemViewType == that.systemViewType;
+ return schemaId == that.schemaId && Objects.equals(columns, that.columns) && systemViewType == that.systemViewType;
}
/** {@inheritDoc} */
@Override
public int hashCode() {
- return Objects.hash(columns, systemViewType);
+ return Objects.hash(schemaId, columns, systemViewType);
}
/** {@inheritDoc} */
@@ -116,6 +137,7 @@
return S.toString(
CatalogSystemViewDescriptor.class, this,
"id", id(),
+ "schemaId", schemaId,
"name", name(),
"columns", columns,
"systemViewType", systemViewType()
@@ -147,12 +169,13 @@
/** Returns system view type by identifier. */
private static SystemViewType forId(int id) {
- if (id == 0) {
- return NODE;
- } else {
- assert id == 1;
-
- return CLUSTER;
+ switch (id) {
+ case 0:
+ return NODE;
+ case 1:
+ return CLUSTER;
+ default:
+ throw new IllegalArgumentException("Unknown system view type id: " + id);
}
}
}
@@ -164,6 +187,7 @@
@Override
public CatalogSystemViewDescriptor readFrom(IgniteDataInput input) throws IOException {
int id = input.readInt();
+ int schemaId = input.readInt();
String name = input.readUTF();
long updateToken = input.readLong();
List<CatalogTableColumnDescriptor> columns = readList(CatalogTableColumnDescriptor.SERIALIZER, input);
@@ -171,12 +195,13 @@
byte sysViewTypeId = input.readByte();
SystemViewType sysViewType = SystemViewType.forId(sysViewTypeId);
- return new CatalogSystemViewDescriptor(id, name, columns, sysViewType, updateToken);
+ return new CatalogSystemViewDescriptor(id, schemaId, name, columns, sysViewType, updateToken);
}
@Override
public void writeTo(CatalogSystemViewDescriptor descriptor, IgniteDataOutput output) throws IOException {
output.writeInt(descriptor.id());
+ output.writeInt(descriptor.schemaId);
output.writeUTF(descriptor.name());
output.writeLong(descriptor.updateToken());
writeList(descriptor.columns(), CatalogTableColumnDescriptor.SERIALIZER, output);
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableDescriptor.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableDescriptor.java
index e8f7b1d..ab14b29 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableDescriptor.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableDescriptor.java
@@ -26,12 +26,12 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableSchemaVersions.TableVersion;
import org.apache.ignite.internal.catalog.storage.serialization.CatalogObjectSerializer;
import org.apache.ignite.internal.tostring.IgniteToStringExclude;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.util.io.IgniteDataInput;
import org.apache.ignite.internal.util.io.IgniteDataOutput;
@@ -55,7 +55,9 @@
private final CatalogTableSchemaVersions schemaVersions;
private final List<CatalogTableColumnDescriptor> columns;
+ @IgniteToStringInclude
private final List<String> primaryKeyColumns;
+ @IgniteToStringInclude
private final List<String> colocationColumns;
@IgniteToStringExclude
@@ -125,22 +127,12 @@
this.pkIndexId = pkIndexId;
this.zoneId = zoneId;
this.columns = Objects.requireNonNull(columns, "No columns defined.");
- primaryKeyColumns = Objects.requireNonNull(pkCols, "No primary key columns.");
- colocationColumns = colocationCols == null || colocationCols.isEmpty() ? pkCols : colocationCols;
-
+ this.primaryKeyColumns = Objects.requireNonNull(pkCols, "No primary key columns.");
this.columnsMap = columns.stream().collect(Collectors.toMap(CatalogTableColumnDescriptor::name, Function.identity()));
-
- this.schemaVersions = schemaVersions;
-
+ this.colocationColumns = Objects.requireNonNullElse(colocationCols, pkCols);
+ this.schemaVersions = Objects.requireNonNull(schemaVersions, "No catalog schema versions.");
+ this.storageProfile = Objects.requireNonNull(storageProfile, "No storage profile.");
this.creationToken = creationToken;
-
- this.storageProfile = storageProfile;
-
- // TODO: IGNITE-19082 Throw proper exceptions.
- assert !columnsMap.isEmpty() : "No columns.";
-
- assert primaryKeyColumns.stream().noneMatch(c -> Objects.requireNonNull(columnsMap.get(c), c).nullable());
- assert Set.copyOf(primaryKeyColumns).containsAll(colocationColumns);
}
/**
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/StoppingIndexEventParameters.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/StoppingIndexEventParameters.java
index ac8864b..6a27103 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/StoppingIndexEventParameters.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/StoppingIndexEventParameters.java
@@ -25,7 +25,6 @@
* @see CatalogEvent#INDEX_STOPPING
*/
public class StoppingIndexEventParameters extends IndexEventParameters {
- private final int tableId;
/**
* Constructor.
@@ -33,16 +32,8 @@
* @param causalityToken Causality token.
* @param catalogVersion Catalog version.
* @param indexId An id of dropped index.
- * @param tableId Table ID for which the index was removed.
*/
- public StoppingIndexEventParameters(long causalityToken, int catalogVersion, int indexId, int tableId) {
+ public StoppingIndexEventParameters(long causalityToken, int catalogVersion, int indexId) {
super(causalityToken, catalogVersion, indexId);
-
- this.tableId = tableId;
- }
-
- /** Returns table ID for which the index was removed. */
- public int tableId() {
- return tableId;
}
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AbstractChangeIndexStatusEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AbstractChangeIndexStatusEntry.java
index 6c6baac..e86cf15 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AbstractChangeIndexStatusEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AbstractChangeIndexStatusEntry.java
@@ -21,6 +21,8 @@
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.indexOrThrow;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.replaceIndex;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.replaceSchema;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.schemaOrThrow;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.tableOrThrow;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import org.apache.ignite.internal.catalog.Catalog;
@@ -61,19 +63,9 @@
}
static CatalogSchemaDescriptor schemaByIndexId(Catalog catalog, int indexId) {
- CatalogIndexDescriptor index = catalog.index(indexId);
-
- assert index != null : indexId;
-
- CatalogTableDescriptor table = catalog.table(index.tableId());
-
- assert table != null : index.tableId();
-
- CatalogSchemaDescriptor schema = catalog.schema(table.schemaId());
-
- assert schema != null : table.schemaId();
-
- return schema;
+ CatalogIndexDescriptor index = indexOrThrow(catalog, indexId);
+ CatalogTableDescriptor table = tableOrThrow(catalog, index.tableId());
+ return schemaOrThrow(catalog, table.schemaId());
}
private CatalogIndexDescriptor updateIndexStatus(
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java
index f7cefa5..0d08df2 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java
@@ -17,12 +17,12 @@
package org.apache.ignite.internal.catalog.storage;
-import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.replaceSchema;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.replaceTable;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.schemaOrThrow;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.tableOrThrow;
import java.io.IOException;
import org.apache.ignite.internal.catalog.Catalog;
@@ -48,19 +48,15 @@
private final CatalogTableColumnDescriptor column;
- private final String schemaName;
-
/**
* Constructs the object.
*
* @param tableId An id the table to be modified.
* @param column A modified descriptor of the column to be replaced.
- * @param schemaName Schema name.
*/
- public AlterColumnEntry(int tableId, CatalogTableColumnDescriptor column, String schemaName) {
+ public AlterColumnEntry(int tableId, CatalogTableColumnDescriptor column) {
this.tableId = tableId;
this.column = column;
- this.schemaName = schemaName;
}
/** Returns an id the table to be modified. */
@@ -90,18 +86,17 @@
@Override
public Catalog applyUpdate(Catalog catalog, long causalityToken) {
- CatalogSchemaDescriptor schema = schemaOrThrow(catalog, schemaName);
+ CatalogTableDescriptor table = tableOrThrow(catalog, tableId);
+ CatalogSchemaDescriptor schema = schemaOrThrow(catalog, table.schemaId());
- CatalogTableDescriptor currentTableDescriptor = requireNonNull(catalog.table(tableId));
-
- CatalogTableDescriptor newTableDescriptor = currentTableDescriptor.newDescriptor(
- currentTableDescriptor.name(),
- currentTableDescriptor.tableVersion() + 1,
- currentTableDescriptor.columns().stream()
+ CatalogTableDescriptor newTable = table.newDescriptor(
+ table.name(),
+ table.tableVersion() + 1,
+ table.columns().stream()
.map(source -> source.name().equals(column.name()) ? column : source)
.collect(toList()),
causalityToken,
- currentTableDescriptor.storageProfile()
+ table.storageProfile()
);
return new Catalog(
@@ -109,7 +104,7 @@
catalog.time(),
catalog.objectIdGenState(),
catalog.zones(),
- replaceSchema(replaceTable(schema, newTableDescriptor), catalog.schemas()),
+ replaceSchema(replaceTable(schema, newTable), catalog.schemas()),
defaultZoneIdOpt(catalog)
);
}
@@ -126,18 +121,15 @@
@Override
public AlterColumnEntry readFrom(IgniteDataInput input) throws IOException {
CatalogTableColumnDescriptor descriptor = CatalogTableColumnDescriptor.SERIALIZER.readFrom(input);
-
- String schemaName = input.readUTF();
int tableId = input.readInt();
- return new AlterColumnEntry(tableId, descriptor, schemaName);
+ return new AlterColumnEntry(tableId, descriptor);
}
@Override
public void writeTo(AlterColumnEntry value, IgniteDataOutput output) throws IOException {
CatalogTableColumnDescriptor.SERIALIZER.writeTo(value.descriptor(), output);
- output.writeUTF(value.schemaName);
output.writeInt(value.tableId);
}
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java
index c3cd9f8..c31f5fc 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java
@@ -17,12 +17,12 @@
package org.apache.ignite.internal.catalog.storage;
-import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.replaceSchema;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.replaceTable;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.schemaOrThrow;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.tableOrThrow;
import static org.apache.ignite.internal.catalog.storage.serialization.CatalogSerializationUtils.writeStringCollection;
import static org.apache.ignite.internal.util.IgniteUtils.capacity;
@@ -50,19 +50,16 @@
private final int tableId;
private final Set<String> columns;
- private final String schemaName;
/**
* Constructs the object.
*
* @param tableId Table id.
* @param columns Names of columns to drop.
- * @param schemaName Schema name.
*/
- public DropColumnsEntry(int tableId, Set<String> columns, String schemaName) {
+ public DropColumnsEntry(int tableId, Set<String> columns) {
this.tableId = tableId;
this.columns = columns;
- this.schemaName = schemaName;
}
/** Returns table id. */
@@ -92,18 +89,17 @@
@Override
public Catalog applyUpdate(Catalog catalog, long causalityToken) {
- CatalogSchemaDescriptor schema = schemaOrThrow(catalog, schemaName);
+ CatalogTableDescriptor table = tableOrThrow(catalog, tableId);
+ CatalogSchemaDescriptor schema = schemaOrThrow(catalog, table.schemaId());
- CatalogTableDescriptor currentTableDescriptor = requireNonNull(catalog.table(tableId));
-
- CatalogTableDescriptor newTableDescriptor = currentTableDescriptor.newDescriptor(
- currentTableDescriptor.name(),
- currentTableDescriptor.tableVersion() + 1,
- currentTableDescriptor.columns().stream()
+ CatalogTableDescriptor newTable = table.newDescriptor(
+ table.name(),
+ table.tableVersion() + 1,
+ table.columns().stream()
.filter(col -> !columns.contains(col.name()))
.collect(toList()),
causalityToken,
- currentTableDescriptor.storageProfile()
+ table.storageProfile()
);
return new Catalog(
@@ -111,7 +107,7 @@
catalog.time(),
catalog.objectIdGenState(),
catalog.zones(),
- replaceSchema(replaceTable(schema, newTableDescriptor), catalog.schemas()),
+ replaceSchema(replaceTable(schema, newTable), catalog.schemas()),
defaultZoneIdOpt(catalog)
);
}
@@ -127,16 +123,14 @@
private static class DropColumnEntrySerializer implements CatalogObjectSerializer<DropColumnsEntry> {
@Override
public DropColumnsEntry readFrom(IgniteDataInput input) throws IOException {
- String schemaName = input.readUTF();
int tableId = input.readInt();
Set<String> columns = CatalogSerializationUtils.readStringCollection(input, size -> new HashSet<>(capacity(size)));
- return new DropColumnsEntry(tableId, columns, schemaName);
+ return new DropColumnsEntry(tableId, columns);
}
@Override
public void writeTo(DropColumnsEntry object, IgniteDataOutput output) throws IOException {
- output.writeUTF(object.schemaName);
output.writeInt(object.tableId());
writeStringCollection(object.columns(), output);
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropIndexEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropIndexEntry.java
index 134455e..4241dee 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropIndexEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropIndexEntry.java
@@ -33,20 +33,15 @@
* the {@link CatalogIndexStatus#STOPPING} state.
*/
public class DropIndexEntry extends AbstractChangeIndexStatusEntry implements Fireable {
- public static final DropIndexEntrySerializer SERIALIZER = new DropIndexEntrySerializer();
-
- private final int tableId;
+ public static final CatalogObjectSerializer<DropIndexEntry> SERIALIZER = new DropIndexEntrySerializer();
/**
* Constructs the object.
*
* @param indexId An id of an index to drop.
- * @param tableId Table ID for which the index was removed.
*/
- public DropIndexEntry(int indexId, int tableId) {
+ public DropIndexEntry(int indexId) {
super(indexId, CatalogIndexStatus.STOPPING);
-
- this.tableId = tableId;
}
/** Returns an id of an index to drop. */
@@ -54,11 +49,6 @@
return indexId;
}
- /** Returns table ID for which the index was removed. */
- public int tableId() {
- return tableId;
- }
-
@Override
public int typeId() {
return MarshallableEntryType.DROP_INDEX.id();
@@ -71,7 +61,7 @@
@Override
public CatalogEventParameters createEventParameters(long causalityToken, int catalogVersion) {
- return new StoppingIndexEventParameters(causalityToken, catalogVersion, indexId, tableId);
+ return new StoppingIndexEventParameters(causalityToken, catalogVersion, indexId);
}
@Override
@@ -86,15 +76,13 @@
@Override
public DropIndexEntry readFrom(IgniteDataInput input) throws IOException {
int indexId = input.readInt();
- int tableId = input.readInt();
- return new DropIndexEntry(indexId, tableId);
+ return new DropIndexEntry(indexId);
}
@Override
public void writeTo(DropIndexEntry entry, IgniteDataOutput out) throws IOException {
out.writeInt(entry.indexId());
- out.writeInt(entry.tableId());
}
}
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropTableEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropTableEntry.java
index 509d284..06ccd5a 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropTableEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropTableEntry.java
@@ -18,10 +18,11 @@
package org.apache.ignite.internal.catalog.storage;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.schemaOrThrow;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.tableOrThrow;
import java.io.IOException;
import java.util.Arrays;
-import java.util.Objects;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.commands.CatalogUtils;
import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
@@ -43,17 +44,13 @@
private final int tableId;
- private final String schemaName;
-
/**
* Constructs the object.
*
* @param tableId An id of a table to drop.
- * @param schemaName A schema name.
*/
- public DropTableEntry(int tableId, String schemaName) {
+ public DropTableEntry(int tableId) {
this.tableId = tableId;
- this.schemaName = schemaName;
}
/** Returns an id of a table to drop. */
@@ -78,7 +75,8 @@
@Override
public Catalog applyUpdate(Catalog catalog, long causalityToken) {
- CatalogSchemaDescriptor schema = Objects.requireNonNull(catalog.schema(schemaName));
+ CatalogTableDescriptor table = tableOrThrow(catalog, tableId);
+ CatalogSchemaDescriptor schema = schemaOrThrow(catalog, table.schemaId());
return new Catalog(
catalog.version(),
@@ -109,15 +107,13 @@
@Override
public DropTableEntry readFrom(IgniteDataInput input) throws IOException {
int tableId = input.readInt();
- String schemaName = input.readUTF();
- return new DropTableEntry(tableId, schemaName);
+ return new DropTableEntry(tableId);
}
@Override
public void writeTo(DropTableEntry entry, IgniteDataOutput out) throws IOException {
out.writeInt(entry.tableId());
- out.writeUTF(entry.schemaName);
}
}
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java
index de24c10..9e29a5f 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java
@@ -17,11 +17,11 @@
package org.apache.ignite.internal.catalog.storage;
-import static java.util.Objects.requireNonNull;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.replaceSchema;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.replaceTable;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.schemaOrThrow;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.tableOrThrow;
import static org.apache.ignite.internal.catalog.storage.serialization.CatalogSerializationUtils.readList;
import static org.apache.ignite.internal.catalog.storage.serialization.CatalogSerializationUtils.writeList;
@@ -49,7 +49,6 @@
private final int tableId;
private final List<CatalogTableColumnDescriptor> descriptors;
- private final String schemaName;
/**
* Constructs the object.
@@ -57,10 +56,9 @@
* @param tableId Table id.
* @param descriptors Descriptors of columns to add.
*/
- public NewColumnsEntry(int tableId, List<CatalogTableColumnDescriptor> descriptors, String schemaName) {
+ public NewColumnsEntry(int tableId, List<CatalogTableColumnDescriptor> descriptors) {
this.tableId = tableId;
this.descriptors = descriptors;
- this.schemaName = schemaName;
}
/** Returns table id. */
@@ -90,16 +88,15 @@
@Override
public Catalog applyUpdate(Catalog catalog, long causalityToken) {
- CatalogSchemaDescriptor schema = schemaOrThrow(catalog, schemaName);
+ CatalogTableDescriptor table = tableOrThrow(catalog, tableId);
+ CatalogSchemaDescriptor schema = schemaOrThrow(catalog, table.schemaId());
- CatalogTableDescriptor currentTableDescriptor = requireNonNull(catalog.table(tableId));
-
- CatalogTableDescriptor newTableDescriptor = currentTableDescriptor.newDescriptor(
- currentTableDescriptor.name(),
- currentTableDescriptor.tableVersion() + 1,
- CollectionUtils.concat(currentTableDescriptor.columns(), descriptors),
+ CatalogTableDescriptor newTable = table.newDescriptor(
+ table.name(),
+ table.tableVersion() + 1,
+ CollectionUtils.concat(table.columns(), descriptors),
causalityToken,
- currentTableDescriptor.storageProfile()
+ table.storageProfile()
);
return new Catalog(
@@ -107,7 +104,7 @@
catalog.time(),
catalog.objectIdGenState(),
catalog.zones(),
- replaceSchema(replaceTable(schema, newTableDescriptor), catalog.schemas()),
+ replaceSchema(replaceTable(schema, newTable), catalog.schemas()),
defaultZoneIdOpt(catalog)
);
}
@@ -125,16 +122,14 @@
public NewColumnsEntry readFrom(IgniteDataInput in) throws IOException {
List<CatalogTableColumnDescriptor> columns = readList(CatalogTableColumnDescriptor.SERIALIZER, in);
int tableId = in.readInt();
- String schemaName = in.readUTF();
- return new NewColumnsEntry(tableId, columns, schemaName);
+ return new NewColumnsEntry(tableId, columns);
}
@Override
public void writeTo(NewColumnsEntry entry, IgniteDataOutput out) throws IOException {
writeList(entry.descriptors(), CatalogTableColumnDescriptor.SERIALIZER, out);
out.writeInt(entry.tableId());
- out.writeUTF(entry.schemaName);
}
}
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewIndexEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewIndexEntry.java
index 59f294c..b312d99 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewIndexEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewIndexEntry.java
@@ -18,13 +18,15 @@
package org.apache.ignite.internal.catalog.storage;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.schemaOrThrow;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.tableOrThrow;
import java.io.IOException;
-import java.util.Objects;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.commands.CatalogUtils;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.events.CatalogEvent;
import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
import org.apache.ignite.internal.catalog.events.CreateIndexEventParameters;
@@ -44,17 +46,13 @@
private final CatalogIndexDescriptor descriptor;
- private final String schemaName;
-
/**
* Constructs the object.
*
* @param descriptor A descriptor of an index to add.
- * @param schemaName Schema name.
*/
- public NewIndexEntry(CatalogIndexDescriptor descriptor, String schemaName) {
+ public NewIndexEntry(CatalogIndexDescriptor descriptor) {
this.descriptor = descriptor;
- this.schemaName = schemaName;
}
/** Gets descriptor of an index to add. */
@@ -79,7 +77,8 @@
@Override
public Catalog applyUpdate(Catalog catalog, long causalityToken) {
- CatalogSchemaDescriptor schema = Objects.requireNonNull(catalog.schema(schemaName));
+ CatalogTableDescriptor table = tableOrThrow(catalog, descriptor.tableId());
+ CatalogSchemaDescriptor schema = schemaOrThrow(catalog, table.schemaId());
descriptor.updateToken(causalityToken);
@@ -111,15 +110,13 @@
private static class NewIndexEntrySerializer implements CatalogObjectSerializer<NewIndexEntry> {
@Override
public NewIndexEntry readFrom(IgniteDataInput input) throws IOException {
- String schemaName = input.readUTF();
CatalogIndexDescriptor descriptor = CatalogSerializationUtils.IDX_SERIALIZER.readFrom(input);
- return new NewIndexEntry(descriptor, schemaName);
+ return new NewIndexEntry(descriptor);
}
@Override
public void writeTo(NewIndexEntry entry, IgniteDataOutput output) throws IOException {
- output.writeUTF(entry.schemaName);
CatalogSerializationUtils.IDX_SERIALIZER.writeTo(entry.descriptor(), output);
}
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewSchemaEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewSchemaEntry.java
new file mode 100644
index 0000000..ec809dd
--- /dev/null
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewSchemaEntry.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.storage;
+
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.CatalogValidationException;
+import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
+import org.apache.ignite.internal.catalog.storage.serialization.CatalogObjectSerializer;
+import org.apache.ignite.internal.catalog.storage.serialization.MarshallableEntryType;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+
+/**
+ * New schema entry.
+ */
+public class NewSchemaEntry implements UpdateEntry {
+ public static final CatalogObjectSerializer<NewSchemaEntry> SERIALIZER = new Serializer();
+
+ private final CatalogSchemaDescriptor descriptor;
+
+ public NewSchemaEntry(CatalogSchemaDescriptor descriptor) {
+ this.descriptor = descriptor;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Catalog applyUpdate(Catalog catalog, long causalityToken) {
+ CatalogSchemaDescriptor schema = catalog.schema(descriptor.name());
+
+ if (schema != null) {
+ throw new CatalogValidationException(format("Schema with name '{}' already exists", schema.name()));
+ }
+
+ descriptor.updateToken(causalityToken);
+
+ List<CatalogSchemaDescriptor> schemas = new ArrayList<>(catalog.schemas().size() + 1);
+ schemas.addAll(catalog.schemas());
+ schemas.add(descriptor);
+
+ return new Catalog(
+ catalog.version(),
+ catalog.time(),
+ catalog.objectIdGenState(),
+ catalog.zones(),
+ schemas,
+ catalog.defaultZone().id()
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int typeId() {
+ return MarshallableEntryType.NEW_SCHEMA.id();
+ }
+
+ private static class Serializer implements CatalogObjectSerializer<NewSchemaEntry> {
+ @Override
+ public NewSchemaEntry readFrom(IgniteDataInput input) throws IOException {
+ CatalogSchemaDescriptor schemaDescriptor = CatalogSchemaDescriptor.SERIALIZER.readFrom(input);
+ return new NewSchemaEntry(schemaDescriptor);
+ }
+
+ @Override
+ public void writeTo(NewSchemaEntry value, IgniteDataOutput output) throws IOException {
+ CatalogSchemaDescriptor.SERIALIZER.writeTo(value.descriptor, output);
+ }
+ }
+}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewSystemViewEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewSystemViewEntry.java
index 03d130a..924749f 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewSystemViewEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewSystemViewEntry.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.catalog.storage;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.schemaOrThrow;
import java.io.IOException;
import java.util.Arrays;
@@ -45,17 +46,13 @@
private final CatalogSystemViewDescriptor descriptor;
- private final String schemaName;
-
/**
* Constructor.
*
* @param descriptor System view descriptor.
- * @param schemaName A schema name.
*/
- public NewSystemViewEntry(CatalogSystemViewDescriptor descriptor, String schemaName) {
+ public NewSystemViewEntry(CatalogSystemViewDescriptor descriptor) {
this.descriptor = descriptor;
- this.schemaName = schemaName;
}
@Override
@@ -78,7 +75,7 @@
/** {@inheritDoc} */
@Override
public Catalog applyUpdate(Catalog catalog, long causalityToken) {
- CatalogSchemaDescriptor systemSchema = catalog.schema(schemaName);
+ CatalogSchemaDescriptor systemSchema = schemaOrThrow(catalog, descriptor.schemaId());
descriptor.updateToken(causalityToken);
@@ -119,15 +116,13 @@
@Override
public NewSystemViewEntry readFrom(IgniteDataInput input) throws IOException {
CatalogSystemViewDescriptor descriptor = CatalogSystemViewDescriptor.SERIALIZER.readFrom(input);
- String schema = input.readUTF();
- return new NewSystemViewEntry(descriptor, schema);
+ return new NewSystemViewEntry(descriptor);
}
@Override
public void writeTo(NewSystemViewEntry entry, IgniteDataOutput output) throws IOException {
CatalogSystemViewDescriptor.SERIALIZER.writeTo(entry.descriptor, output);
- output.writeUTF(entry.schemaName);
}
}
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewTableEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewTableEntry.java
index 563bd5e..42b4e5b 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewTableEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewTableEntry.java
@@ -18,10 +18,10 @@
package org.apache.ignite.internal.catalog.storage;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.schemaOrThrow;
import java.io.IOException;
import java.util.List;
-import java.util.Objects;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.commands.CatalogUtils;
import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
@@ -44,17 +44,13 @@
private final CatalogTableDescriptor descriptor;
- private final String schemaName;
-
/**
* Constructs the object.
*
* @param descriptor A descriptor of a table to add.
- * @param schemaName A schema name.
*/
- public NewTableEntry(CatalogTableDescriptor descriptor, String schemaName) {
+ public NewTableEntry(CatalogTableDescriptor descriptor) {
this.descriptor = descriptor;
- this.schemaName = schemaName;
}
/** Returns descriptor of a table to add. */
@@ -79,7 +75,7 @@
@Override
public Catalog applyUpdate(Catalog catalog, long causalityToken) {
- CatalogSchemaDescriptor schema = Objects.requireNonNull(catalog.schema(schemaName));
+ CatalogSchemaDescriptor schema = schemaOrThrow(catalog, descriptor.schemaId());
descriptor.updateToken(causalityToken);
@@ -114,15 +110,13 @@
@Override
public NewTableEntry readFrom(IgniteDataInput input) throws IOException {
CatalogTableDescriptor descriptor = CatalogTableDescriptor.SERIALIZER.readFrom(input);
- String schemaName = input.readUTF();
- return new NewTableEntry(descriptor, schemaName);
+ return new NewTableEntry(descriptor);
}
@Override
public void writeTo(NewTableEntry entry, IgniteDataOutput output) throws IOException {
CatalogTableDescriptor.SERIALIZER.writeTo(entry.descriptor(), output);
- output.writeUTF(entry.schemaName);
}
}
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/RenameTableEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/RenameTableEntry.java
index 420fc4a..69c3cdb 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/RenameTableEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/RenameTableEntry.java
@@ -17,10 +17,11 @@
package org.apache.ignite.internal.catalog.storage;
-import static java.util.Objects.requireNonNull;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.replaceSchema;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.replaceTable;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.schemaOrThrow;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.tableOrThrow;
import java.io.IOException;
import org.apache.ignite.internal.catalog.Catalog;
@@ -64,16 +65,15 @@
@Override
public Catalog applyUpdate(Catalog catalog, long causalityToken) {
- CatalogTableDescriptor tableDescriptor = requireNonNull(catalog.table(tableId));
+ CatalogTableDescriptor table = tableOrThrow(catalog, tableId);
+ CatalogSchemaDescriptor schema = schemaOrThrow(catalog, table.schemaId());
- CatalogSchemaDescriptor schemaDescriptor = requireNonNull(catalog.schema(tableDescriptor.schemaId()));
-
- CatalogTableDescriptor newTableDescriptor = tableDescriptor.newDescriptor(
+ CatalogTableDescriptor newTable = table.newDescriptor(
newTableName,
- tableDescriptor.tableVersion() + 1,
- tableDescriptor.columns(),
+ table.tableVersion() + 1,
+ table.columns(),
causalityToken,
- tableDescriptor.storageProfile()
+ table.storageProfile()
);
return new Catalog(
@@ -81,7 +81,7 @@
catalog.time(),
catalog.objectIdGenState(),
catalog.zones(),
- replaceSchema(replaceTable(schemaDescriptor, newTableDescriptor), catalog.schemas()),
+ replaceSchema(replaceTable(schema, newTable), catalog.schemas()),
defaultZoneIdOpt(catalog)
);
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/serialization/CatalogEntrySerializerProvider.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/serialization/CatalogEntrySerializerProvider.java
index ed7e523..86167ad 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/serialization/CatalogEntrySerializerProvider.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/serialization/CatalogEntrySerializerProvider.java
@@ -28,6 +28,7 @@
import org.apache.ignite.internal.catalog.storage.MakeIndexAvailableEntry;
import org.apache.ignite.internal.catalog.storage.NewColumnsEntry;
import org.apache.ignite.internal.catalog.storage.NewIndexEntry;
+import org.apache.ignite.internal.catalog.storage.NewSchemaEntry;
import org.apache.ignite.internal.catalog.storage.NewSystemViewEntry;
import org.apache.ignite.internal.catalog.storage.NewTableEntry;
import org.apache.ignite.internal.catalog.storage.NewZoneEntry;
@@ -79,6 +80,7 @@
serializers[MarshallableEntryType.SNAPSHOT.id()] = SnapshotEntry.SERIALIZER;
serializers[MarshallableEntryType.RENAME_INDEX.id()] = RenameIndexEntry.SERIALIZER;
serializers[MarshallableEntryType.SET_DEFAULT_ZONE.id()] = SetDefaultZoneEntry.SERIALIZER;
+ serializers[MarshallableEntryType.NEW_SCHEMA.id()] = NewSchemaEntry.SERIALIZER;
//noinspection ThisEscapedInObjectConstruction
serializers[MarshallableEntryType.VERSIONED_UPDATE.id()] = new VersionedUpdateSerializer(this);
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/serialization/MarshallableEntryType.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/serialization/MarshallableEntryType.java
index 5306fc7..8cdb723 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/serialization/MarshallableEntryType.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/serialization/MarshallableEntryType.java
@@ -40,7 +40,8 @@
SNAPSHOT(16),
VERSIONED_UPDATE(17),
RENAME_INDEX(18),
- SET_DEFAULT_ZONE(19);
+ SET_DEFAULT_ZONE(19),
+ NEW_SCHEMA(20);
/** Type ID. */
private final int id;
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerDescriptorCausalityTokenTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerDescriptorCausalityTokenTest.java
index 5d0eb96..dbc0185 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerDescriptorCausalityTokenTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerDescriptorCausalityTokenTest.java
@@ -66,12 +66,12 @@
@Test
public void testEmptyCatalog() {
- CatalogSchemaDescriptor defaultSchema = manager.schema(DEFAULT_SCHEMA_NAME, 0);
+ CatalogSchemaDescriptor defaultSchema = manager.schema(DEFAULT_SCHEMA_NAME, 1);
assertNotNull(defaultSchema);
assertNull(manager.catalog(0).defaultZone());
assertSame(defaultSchema, manager.activeSchema(DEFAULT_SCHEMA_NAME, clock.nowLong()));
- assertSame(defaultSchema, manager.schema(0));
+ assertSame(defaultSchema, manager.schema(1));
assertSame(defaultSchema, manager.activeSchema(clock.nowLong()));
Catalog catalogWithDefaultZone = manager.catalog(1);
@@ -89,7 +89,7 @@
assertThrows(IllegalStateException.class, () -> manager.activeSchema(-1L));
// Validate default schema.
- assertEquals(INITIAL_CAUSALITY_TOKEN, defaultSchema.updateToken());
+ assertEquals(1, defaultSchema.updateToken());
}
@Test
@@ -108,8 +108,7 @@
assertNotNull(schema);
assertEquals(SCHEMA_NAME, schema.name());
- assertSame(schema, manager.activeSchema(123L));
- assertEquals(INITIAL_CAUSALITY_TOKEN, schema.updateToken());
+ assertEquals(1, schema.updateToken());
assertNull(schema.table(TABLE_NAME));
assertNull(manager.table(TABLE_NAME, 123L));
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
index e0a34fb..d15c5ff 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
@@ -108,6 +108,7 @@
import org.apache.ignite.internal.catalog.commands.CatalogUtils;
import org.apache.ignite.internal.catalog.commands.ColumnParams;
import org.apache.ignite.internal.catalog.commands.ColumnParams.Builder;
+import org.apache.ignite.internal.catalog.commands.CreateSchemaCommand;
import org.apache.ignite.internal.catalog.commands.CreateZoneCommand;
import org.apache.ignite.internal.catalog.commands.DefaultValue;
import org.apache.ignite.internal.catalog.commands.DropIndexCommand;
@@ -173,12 +174,12 @@
@Test
public void testEmptyCatalog() {
- CatalogSchemaDescriptor defaultSchema = manager.schema(DEFAULT_SCHEMA_NAME, 0);
+ CatalogSchemaDescriptor defaultSchema = manager.schema(DEFAULT_SCHEMA_NAME, 1);
assertNotNull(defaultSchema);
assertSame(defaultSchema, manager.activeSchema(DEFAULT_SCHEMA_NAME, clock.nowLong()));
- assertSame(defaultSchema, manager.schema(0));
- assertSame(defaultSchema, manager.schema(defaultSchema.id(), 0));
+ assertSame(defaultSchema, manager.schema(1));
+ assertSame(defaultSchema, manager.schema(defaultSchema.id(), 1));
assertSame(defaultSchema, manager.activeSchema(clock.nowLong()));
int nonExistingVersion = manager.latestCatalogVersion() + 1;
@@ -189,7 +190,7 @@
// Validate default schema.
assertEquals(DEFAULT_SCHEMA_NAME, defaultSchema.name());
- assertEquals(0, defaultSchema.id());
+ assertEquals(1, defaultSchema.id());
assertEquals(0, defaultSchema.tables().length);
assertEquals(0, defaultSchema.indexes().length);
@@ -206,15 +207,15 @@
// System schema should exist.
- CatalogSchemaDescriptor systemSchema = manager.schema(SYSTEM_SCHEMA_NAME, 0);
+ CatalogSchemaDescriptor systemSchema = manager.schema(SYSTEM_SCHEMA_NAME, 1);
assertNotNull(systemSchema, "system schema");
assertSame(systemSchema, manager.activeSchema(SYSTEM_SCHEMA_NAME, clock.nowLong()));
- assertSame(systemSchema, manager.schema(SYSTEM_SCHEMA_NAME, 0));
- assertSame(systemSchema, manager.schema(systemSchema.id(), 0));
+ assertSame(systemSchema, manager.schema(SYSTEM_SCHEMA_NAME, 1));
+ assertSame(systemSchema, manager.schema(systemSchema.id(), 1));
// Validate system schema.
assertEquals(SYSTEM_SCHEMA_NAME, systemSchema.name());
- assertEquals(1, systemSchema.id());
+ assertEquals(2, systemSchema.id());
assertEquals(0, systemSchema.tables().length);
assertEquals(0, systemSchema.indexes().length);
@@ -260,6 +261,8 @@
@Test
public void testCreateTable() {
+ long timePriorToTableCreation = clock.nowLong();
+
int tableCreationVersion = await(
manager.execute(createTableCommand(
TABLE_NAME,
@@ -274,8 +277,7 @@
assertNotNull(schema);
assertEquals(SCHEMA_NAME, schema.name());
- assertSame(schema, manager.activeSchema(0L));
- assertSame(schema, manager.activeSchema(123L));
+ assertSame(schema, manager.activeSchema(timePriorToTableCreation));
assertNull(schema.table(TABLE_NAME));
assertNull(manager.table(TABLE_NAME, 123L));
@@ -1136,18 +1138,12 @@
return falseCompletedFuture();
});
- CompletableFuture<?> createTableFut = manager.execute(List.of(
- CreateZoneCommand.builder()
- .zoneName("TEST_ZONE")
- .storageProfilesParams(List.of(StorageProfileParams.builder().storageProfile(DEFAULT_STORAGE_PROFILE).build()))
- .build(),
- AlterZoneSetDefaultCommand.builder()
- .zoneName("TEST_ZONE")
- .build(),
- simpleTable("T")
- ));
+ // It should not matter what a command does
+ CatalogCommand catalogCommand = catalog -> List.of(new ObjectIdGenUpdateEntry(1));
- assertThat(createTableFut, willThrow(IgniteInternalException.class, "Max retry limit exceeded"));
+ CompletableFuture<?> fut = manager.execute(List.of(catalogCommand));
+
+ assertThat(fut, willThrow(IgniteInternalException.class, "Max retry limit exceeded"));
// retry limit is hardcoded at org.apache.ignite.internal.catalog.CatalogServiceImpl.MAX_RETRY_COUNT
verify(updateLogMock, times(10)).append(any());
@@ -1203,7 +1199,7 @@
assertFalse(createTableFuture2.isDone());
- verify(clockWaiter, timeout(10_000).times(2)).waitFor(any());
+ verify(clockWaiter, timeout(10_000).times(3)).waitFor(any());
Catalog catalog0 = manager.catalog(manager.latestCatalogVersion());
@@ -1832,7 +1828,6 @@
StoppingIndexEventParameters stoppingEventParameters = stoppingCaptor.getValue();
assertEquals(indexId, stoppingEventParameters.indexId());
- assertEquals(tableId, stoppingEventParameters.tableId());
// Let's drop the table.
assertThat(manager.execute(dropTableCommand(TABLE_NAME)), willCompleteSuccessfully());
@@ -2704,6 +2699,24 @@
}
@Test
+ public void testCreateSchema() {
+ String schemaName = "S1";
+
+ assertThat(manager.execute(CreateSchemaCommand.builder().name(schemaName).build()), willCompleteSuccessfully());
+
+ Catalog latestCatalog = manager.catalog(manager.activeCatalogVersion(clock.nowLong()));
+
+ assertNotNull(latestCatalog);
+ assertNotNull(latestCatalog.schema(schemaName));
+ assertNotNull(latestCatalog.schema(DEFAULT_SCHEMA_NAME));
+
+ assertThat(
+ manager.execute(CreateSchemaCommand.builder().name(schemaName).build()),
+ willThrowFast(CatalogValidationException.class, "Schema with name 'S1' already exists")
+ );
+ }
+
+ @Test
public void testCatalogCompaction() throws Exception {
assertThat(manager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully());
assertThat(manager.execute(simpleTable(TABLE_NAME_2)), willCompleteSuccessfully());
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogSystemViewTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogSystemViewTest.java
index 4d06b4a..0474a24 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogSystemViewTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogSystemViewTest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.catalog;
import static java.util.stream.Collectors.toList;
-import static org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN;
import static org.apache.ignite.internal.catalog.CatalogService.SYSTEM_SCHEMA_NAME;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
@@ -119,7 +118,7 @@
CatalogSchemaDescriptor schema = manager.activeSchema(clock.nowLong());
assertNotNull(schema);
- assertEquals(INITIAL_CAUSALITY_TOKEN, schema.updateToken());
+ assertEquals(1, schema.updateToken());
assertThat(manager.execute(command), willCompleteSuccessfully());
@@ -131,7 +130,7 @@
schema = manager.activeSchema(clock.nowLong());
assertNotNull(schema);
long schemaCausalityToken = schema.updateToken();
- assertEquals(INITIAL_CAUSALITY_TOKEN, schemaCausalityToken);
+ assertEquals(1, schemaCausalityToken);
// Assert that creation of the system view updates token for the descriptor.
assertTrue(systemSchema.updateToken() > schemaCausalityToken);
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/CatalogUtilsTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/CatalogUtilsTest.java
index a1e24c4..751b537 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/CatalogUtilsTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/CatalogUtilsTest.java
@@ -389,7 +389,7 @@
Exception e = assertThrows(CatalogValidationException.class, () -> replaceIndex(schema, index));
- assertThat(e.getMessage(), is(String.format("Index with ID %d has not been found in schema with ID %d", index.id(), 0)));
+ assertThat(e.getMessage(), is(String.format("Index with ID %d has not been found in schema with ID %d", index.id(), 1)));
}
@Test
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/CreateSchemaCommandValidationTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/CreateSchemaCommandValidationTest.java
new file mode 100644
index 0000000..ff85120
--- /dev/null
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/CreateSchemaCommandValidationTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.commands;
+
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
+
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.CatalogValidationException;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Tests to verify validation of {@link CreateSchemaCommand}.
+ */
+public class CreateSchemaCommandValidationTest extends AbstractCommandValidationTest {
+
+ @ParameterizedTest(name = "[{index}] ''{argumentsWithNames}''")
+ @MethodSource("nullAndBlankStrings")
+ void schemaNameMustNotBeNullOrBlank(String name) {
+ CreateSchemaCommandBuilder builder = CreateSchemaCommand.builder().name(name);
+
+ assertThrows(
+ CatalogValidationException.class,
+ builder::build,
+ "Name of the schema can't be null or blank"
+ );
+ }
+
+ @Test
+ void commandFailsWhenSchemaAlreadyExists() {
+ String schemaName = "TEST";
+
+ CreateSchemaCommandBuilder builder = CreateSchemaCommand.builder().name(schemaName);
+
+ Catalog catalog = catalogWithSchema(schemaName);
+
+ assertThrows(
+ CatalogValidationException.class,
+ () -> builder.build().get(catalog),
+ "Schema with name 'TEST' already exists"
+ );
+ }
+
+ private static Catalog catalogWithSchema(String schemaName) {
+ return catalog(CreateSchemaCommand.builder().name(schemaName).build());
+ }
+}
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/descriptors/CatalogSystemViewDescriptorTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/descriptors/CatalogSystemViewDescriptorTest.java
index 41ed56c..92879a1 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/descriptors/CatalogSystemViewDescriptorTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/descriptors/CatalogSystemViewDescriptorTest.java
@@ -30,6 +30,7 @@
void toStringContainsTypeAndFields() {
var descriptor = new CatalogSystemViewDescriptor(
1,
+ 2,
"view1",
List.of(),
SystemViewType.NODE
@@ -39,6 +40,7 @@
assertThat(toString, startsWith("CatalogSystemViewDescriptor ["));
assertThat(toString, containsString("id=1"));
+ assertThat(toString, containsString("schemaId=2"));
assertThat(toString, containsString("name=view1"));
assertThat(toString, containsString("systemViewType=NODE"));
}
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/CatalogEntrySerializationTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/CatalogEntrySerializationTest.java
index ac0e23b..4480404 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/CatalogEntrySerializationTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/CatalogEntrySerializationTest.java
@@ -106,15 +106,15 @@
break;
case DROP_COLUMN:
- checkSerialization(new DropColumnsEntry(1, Set.of("C1", "C2"), "PUBLIC"));
+ checkSerialization(new DropColumnsEntry(1, Set.of("C1", "C2")));
break;
case DROP_INDEX:
- checkSerialization(new DropIndexEntry(231, 23), new DropIndexEntry(231, 1));
+ checkSerialization(new DropIndexEntry(231), new DropIndexEntry(231));
break;
case DROP_TABLE:
- checkSerialization(new DropTableEntry(23, "PUBLIC"), new DropTableEntry(3, "SYSTEM"));
+ checkSerialization(new DropTableEntry(23), new DropTableEntry(3));
break;
case DROP_ZONE:
@@ -169,6 +169,11 @@
checkSerialization(new SetDefaultZoneEntry(1), new SetDefaultZoneEntry(Integer.MAX_VALUE));
break;
+ case NEW_SCHEMA:
+ checkSerialization(new NewSchemaEntry(new CatalogSchemaDescriptor(
+ 0, "S", new CatalogTableDescriptor[0], new CatalogIndexDescriptor[0], new CatalogSystemViewDescriptor[0], 0)));
+ break;
+
default:
throw new UnsupportedOperationException("Test not implemented " + type);
}
@@ -282,10 +287,10 @@
newCatalogTableColumnDescriptor("c2", DefaultValue.functionCall("function"));
CatalogTableColumnDescriptor desc4 = newCatalogTableColumnDescriptor("c3", DefaultValue.constant(null));
- UpdateEntry entry1 = new AlterColumnEntry(1, desc1, "public");
- UpdateEntry entry2 = new AlterColumnEntry(1, desc2, "public");
- UpdateEntry entry3 = new AlterColumnEntry(1, desc3, "public");
- UpdateEntry entry4 = new AlterColumnEntry(1, desc4, "public");
+ UpdateEntry entry1 = new AlterColumnEntry(1, desc1);
+ UpdateEntry entry2 = new AlterColumnEntry(1, desc2);
+ UpdateEntry entry3 = new AlterColumnEntry(1, desc3);
+ UpdateEntry entry4 = new AlterColumnEntry(1, desc4);
VersionedUpdate update = newVersionedUpdate(entry1, entry2, entry3, entry4);
@@ -296,7 +301,7 @@
CatalogTableColumnDescriptor columnDescriptor1 = newCatalogTableColumnDescriptor("c1", DefaultValue.constant(null));
CatalogTableColumnDescriptor columnDescriptor2 = newCatalogTableColumnDescriptor("c2", DefaultValue.functionCall("func"));
- NewColumnsEntry entry = new NewColumnsEntry(11, List.of(columnDescriptor1, columnDescriptor2), "PUBLIC");
+ NewColumnsEntry entry = new NewColumnsEntry(11, List.of(columnDescriptor1, columnDescriptor2));
VersionedUpdate update = newVersionedUpdate(entry);
@@ -307,8 +312,8 @@
CatalogSortedIndexDescriptor sortedIndexDescriptor = newSortedIndexDescriptor("idx1");
CatalogHashIndexDescriptor hashIndexDescriptor = newHashIndexDescriptor("idx2");
- NewIndexEntry sortedIdxEntry = new NewIndexEntry(sortedIndexDescriptor, "PUBLIC");
- NewIndexEntry hashIdxEntry = new NewIndexEntry(hashIndexDescriptor, "PUBLIC");
+ NewIndexEntry sortedIdxEntry = new NewIndexEntry(sortedIndexDescriptor);
+ NewIndexEntry hashIdxEntry = new NewIndexEntry(hashIndexDescriptor);
VersionedUpdate update = newVersionedUpdate(sortedIdxEntry, hashIdxEntry);
@@ -323,10 +328,10 @@
List<CatalogTableColumnDescriptor> columns = List.of(col1, col2, col3, col4);
- NewTableEntry entry1 = new NewTableEntry(newTableDescriptor("Table1", columns, List.of("c1", "c2"), null), "PUBLIC");
- NewTableEntry entry2 = new NewTableEntry(newTableDescriptor("Table1", columns, List.of("c1", "c2"), List.of()), "PUBLIC");
- NewTableEntry entry3 = new NewTableEntry(newTableDescriptor("Table1", columns, List.of("c1", "c2"), List.of("c2")), "PUBLIC");
- NewTableEntry entry4 = new NewTableEntry(newTableDescriptor("Table1", columns, List.of("c1", "c2"), List.of("c1")), "PUBLIC");
+ NewTableEntry entry1 = new NewTableEntry(newTableDescriptor("Table1", columns, List.of("c1", "c2"), null));
+ NewTableEntry entry2 = new NewTableEntry(newTableDescriptor("Table1", columns, List.of("c1", "c2"), List.of()));
+ NewTableEntry entry3 = new NewTableEntry(newTableDescriptor("Table1", columns, List.of("c1", "c2"), List.of("c2")));
+ NewTableEntry entry4 = new NewTableEntry(newTableDescriptor("Table1", columns, List.of("c1", "c2"), List.of("c1")));
VersionedUpdate update = newVersionedUpdate(entry1, entry2, entry3, entry4);
VersionedUpdate deserialized = serialize(update);
@@ -342,12 +347,12 @@
CatalogTableColumnDescriptor col2 = newCatalogTableColumnDescriptor("c2", null);
CatalogSystemViewDescriptor nodeDesc =
- new CatalogSystemViewDescriptor(1, "view1", List.of(col1, col2), SystemViewType.NODE);
+ new CatalogSystemViewDescriptor(1, 2, "view1", List.of(col1, col2), SystemViewType.NODE);
CatalogSystemViewDescriptor clusterDesc =
- new CatalogSystemViewDescriptor(1, "view1", List.of(col1, col2), SystemViewType.CLUSTER);
+ new CatalogSystemViewDescriptor(1, 2, "view1", List.of(col1, col2), SystemViewType.CLUSTER);
- NewSystemViewEntry nodeEntry = new NewSystemViewEntry(nodeDesc, "PUBLIC");
- NewSystemViewEntry clusterEntry = new NewSystemViewEntry(clusterDesc, "PUBLIC");
+ NewSystemViewEntry nodeEntry = new NewSystemViewEntry(nodeDesc);
+ NewSystemViewEntry clusterEntry = new NewSystemViewEntry(clusterDesc);
VersionedUpdate update = newVersionedUpdate(nodeEntry, clusterEntry);
@@ -371,8 +376,8 @@
};
CatalogSystemViewDescriptor[] views = {
- new CatalogSystemViewDescriptor(1, "view1", columns, SystemViewType.NODE),
- new CatalogSystemViewDescriptor(1, "view2", columns, SystemViewType.CLUSTER)
+ new CatalogSystemViewDescriptor(1, 2, "view1", columns, SystemViewType.NODE),
+ new CatalogSystemViewDescriptor(1, 2, "view2", columns, SystemViewType.CLUSTER)
};
CatalogStorageProfilesDescriptor profiles =
diff --git a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java
index d6491b6..6e9722f 100644
--- a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java
+++ b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java
@@ -158,7 +158,12 @@
@Override
public CompletableFuture<Void> catalogReadyFuture(int version) {
- return null;
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Void> catalogInitializationFuture() {
+ return CompletableFuture.completedFuture(null);
}
@Override
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index 5a4bfc0..47be872 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -69,7 +69,6 @@
* <p>To avoid errors when using indexes while applying replication log during node recovery, the registration of indexes was moved to the
* start of the tables.</p>
*/
-// TODO: IGNITE-19082 Delete this class
public class IndexManager implements IgniteComponent {
private static final IgniteLogger LOG = Loggers.forClass(IndexManager.class);
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index c90b014..8503e0f 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -1114,6 +1114,7 @@
return cmgMgr.onJoinReady();
}, startupExecutor)
.thenComposeAsync(ignored -> awaitSelfInLocalLogicalTopology(), startupExecutor)
+ .thenCompose(ignored -> catalogManager.catalogInitializationFuture())
.thenRunAsync(() -> {
try {
// Enable watermark events.
diff --git a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
index ec8875f1..9f044df 100644
--- a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
+++ b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
@@ -266,6 +266,7 @@
return TestIgnitionManager.start(nodeName, config, workDir.resolve(nodeName))
.thenApply(IgniteImpl.class::cast)
+ .thenCompose(ignite -> ignite.catalogManager().catalogInitializationFuture().thenApply(ignored -> ignite))
.thenApply(ignite -> {
synchronized (nodes) {
while (nodes.size() < nodeIndex) {
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
index c30388f..58101d6 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.schema.registry;
-import static java.util.Collections.unmodifiableMap;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
@@ -38,7 +37,6 @@
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.jetbrains.annotations.Nullable;
-import org.jetbrains.annotations.TestOnly;
/**
* Caching registry of actual schema descriptors for a table.
@@ -245,7 +243,7 @@
* @param rowSchema Row schema.
* @return Column mapper for target schema.
*/
- ColumnMapper resolveMapping(SchemaDescriptor curSchema, SchemaDescriptor rowSchema) {
+ private ColumnMapper resolveMapping(SchemaDescriptor curSchema, SchemaDescriptor rowSchema) {
assert curSchema.version() > rowSchema.version();
if (curSchema.version() == rowSchema.version() + 1) {
@@ -254,9 +252,9 @@
long mappingKey = (((long) curSchema.version()) << 32) | (rowSchema.version());
- ColumnMapper mapping;
+ ColumnMapper mapping = mappingCache.get(mappingKey);
- if ((mapping = mappingCache.get(mappingKey)) != null) {
+ if (mapping != null) {
return mapping;
}
@@ -292,34 +290,6 @@
makeSchemaVersionAvailable(desc);
}
- /**
- * Cleanup given schema version from history.
- *
- * @param ver Schema version to remove.
- * @throws SchemaRegistryException If incorrect schema version provided.
- */
- public void onSchemaDropped(int ver) {
- int lastVer = schemaCache.lastKey();
-
- if (ver <= 0 || ver >= lastVer || ver > schemaCache.keySet().first()) {
- throw new SchemaRegistryException("Incorrect schema version to clean up to: " + ver);
- }
-
- if (schemaCache.remove(ver) != null) {
- mappingCache.keySet().removeIf(k -> (k & 0xFFFF_FFFFL) == ver);
- }
- }
-
- /**
- * For test purposes only.
- *
- * @return ColumnMapping cache.
- */
- @TestOnly
- Map<Long, ColumnMapper> mappingCache() {
- return unmodifiableMap(mappingCache);
- }
-
private CompletableFuture<SchemaDescriptor> tableSchemaAsync(int schemaVer) {
if (schemaVer < lastKnownSchemaVersion()) {
return completedFuture(loadStoredSchemaByVersion(schemaVer));
diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java
index 25e7a7b..cecab95 100644
--- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java
+++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java
@@ -21,7 +21,6 @@
import static java.util.stream.Collectors.toMap;
import static org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor.INITIAL_TABLE_VERSION;
import static org.apache.ignite.internal.schema.mapping.ColumnMapping.createMapper;
-import static org.apache.ignite.internal.testframework.asserts.CompletableFutureAssert.assertWillThrowFast;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureCompletedMatcher.completedFuture;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn;
@@ -30,7 +29,6 @@
import static org.apache.ignite.internal.type.NativeTypes.INT64;
import static org.apache.ignite.internal.type.NativeTypes.STRING;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -45,7 +43,6 @@
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaUtils;
-import org.apache.ignite.internal.schema.mapping.ColumnMapper;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -206,109 +203,6 @@
}
/**
- * Check schema cleanup.
- */
- @Test
- public void testSchemaCleanup() {
- final SchemaDescriptor schemaV3 = new SchemaDescriptor(3,
- new Column[]{new Column("keyLongCol", INT64, false)},
- new Column[]{
- new Column("valStringCol", STRING, true)
- });
-
- final SchemaDescriptor schemaV4 = new SchemaDescriptor(4,
- new Column[]{new Column("keyLongCol", INT64, false)},
- new Column[]{
- new Column("valBytesCol", BYTES, true),
- new Column("valStringCol", STRING, true)
- });
-
- final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, schemaV1);
-
- assertEquals(INITIAL_TABLE_VERSION, reg.lastKnownSchemaVersion());
-
- // Fail to cleanup initial schema
- assertThrows(SchemaRegistryException.class, () -> reg.onSchemaDropped(INITIAL_TABLE_VERSION));
- assertThrows(SchemaRegistryException.class, () -> reg.onSchemaDropped(0));
-
- // Register schema with very first version.
- assertThrows(SchemaRegistrationConflictException.class, () -> reg.onSchemaRegistered(schemaV1));
-
- assertEquals(1, reg.lastKnownSchemaVersion());
- assertNotNull(reg.lastKnownSchema());
- assertNotNull(reg.schema(1));
-
- // Try to remove latest schema.
- assertThrows(SchemaRegistryException.class, () -> reg.onSchemaDropped(1));
-
- assertEquals(1, reg.lastKnownSchemaVersion());
- assertNotNull(reg.lastKnownSchema());
- assertNotNull(reg.schema(1));
-
- // Register new schema with next version.
- reg.onSchemaRegistered(schemaV2);
- reg.onSchemaRegistered(schemaV3);
-
- assertEquals(3, reg.lastKnownSchemaVersion());
- assertNotNull(reg.schema(1));
- assertNotNull(reg.schema(2));
- assertNotNull(reg.schema(3));
-
- // Remove outdated schema 1.
- reg.onSchemaDropped(1);
-
- assertEquals(3, reg.lastKnownSchemaVersion());
- assertThrows(SchemaRegistryException.class, () -> reg.schema(1));
- assertNotNull(reg.schema(2));
- assertNotNull(reg.schema(3));
-
- // Remove non-existed schemas.
- reg.onSchemaDropped(1);
-
- assertEquals(3, reg.lastKnownSchemaVersion());
- assertThrows(SchemaRegistryException.class, () -> reg.schema(1));
- assertNotNull(reg.schema(2));
- assertNotNull(reg.schema(3));
-
- // Register new schema with next version.
- reg.onSchemaRegistered(schemaV4);
-
- assertEquals(4, reg.lastKnownSchemaVersion());
- assertNotNull(reg.schema(2));
- assertNotNull(reg.schema(3));
- assertNotNull(reg.schema(4));
-
- // Remove non-existed schemas.
- reg.onSchemaDropped(1);
-
- assertEquals(4, reg.lastKnownSchemaVersion());
- assertSameSchema(schemaV4, reg.lastKnownSchema());
- assertSameSchema(schemaV2, reg.schema(2));
- assertSameSchema(schemaV3, reg.schema(3));
- assertSameSchema(schemaV4, reg.schema(4));
-
- // Out of order remove.
- assertThrows(SchemaRegistryException.class, () -> reg.onSchemaDropped(3));
-
- // Correct removal order.
- reg.onSchemaDropped(2);
- reg.onSchemaDropped(3);
-
- assertEquals(4, reg.lastKnownSchemaVersion());
- assertThrows(SchemaRegistryException.class, () -> reg.schema(1));
- assertThrows(SchemaRegistryException.class, () -> reg.schema(2));
- assertThrows(SchemaRegistryException.class, () -> reg.schema(3));
- assertSameSchema(schemaV4, reg.lastKnownSchema());
- assertSameSchema(schemaV4, reg.schema(4));
-
- // Try to remove latest schema.
- assertThrows(SchemaRegistryException.class, () -> reg.onSchemaDropped(4));
-
- assertEquals(4, reg.lastKnownSchemaVersion());
- assertSameSchema(schemaV4, reg.schema(4));
- }
-
- /**
* Check schema registration with full history.
*/
@Test
@@ -420,105 +314,6 @@
assertSameSchema(schemaV4, reg.schema(4));
}
- /**
- * Check schema cleanup.
- */
- @Test
- public void testSchemaWithHistoryCleanup() {
- final SchemaDescriptor schemaV3 = new SchemaDescriptor(3,
- new Column[]{new Column("keyLongCol", INT64, false)},
- new Column[]{
- new Column("valStringCol", STRING, true)
- });
-
- final SchemaDescriptor schemaV4 = new SchemaDescriptor(4,
- new Column[]{new Column("keyLongCol", INT64, false)},
- new Column[]{
- new Column("valBytesCol", BYTES, true),
- new Column("valStringCol", STRING, true)
- });
-
- Map<Integer, SchemaDescriptor> history = schemaHistory(schemaV2, schemaV3, schemaV4);
-
- final SchemaRegistryImpl reg = new SchemaRegistryImpl(history::get, schemaV4);
-
- assertEquals(4, reg.lastKnownSchemaVersion());
- assertSameSchema(schemaV4, reg.lastKnownSchema());
- assertThrows(SchemaRegistryException.class, () -> reg.schema(1));
- assertSameSchema(schemaV2, reg.schema(2));
- assertSameSchema(schemaV3, reg.schema(3));
- assertSameSchema(schemaV4, reg.schema(4));
-
- history.remove(1);
- reg.onSchemaDropped(1);
-
- assertEquals(4, reg.lastKnownSchemaVersion());
- assertNotNull(reg.schema(2));
- assertNotNull(reg.schema(3));
- assertNotNull(reg.schema(4));
-
- history.remove(2);
- history.remove(3);
- reg.onSchemaDropped(2);
- reg.onSchemaDropped(3);
-
- assertEquals(4, reg.lastKnownSchemaVersion());
- assertThrows(SchemaRegistryException.class, () -> reg.schema(2));
- assertThrows(SchemaRegistryException.class, () -> reg.schema(3));
- assertNotNull(reg.schema(4));
- }
-
- /**
- * Check schema cache cleanup.
- */
- @Test
- public void testSchemaCacheCleanup() {
- final SchemaDescriptor schemaV3 = new SchemaDescriptor(3,
- new Column[]{new Column("keyLongCol", INT64, false)},
- new Column[]{
- new Column("valStringCol", STRING, true)
- });
-
- schemaV3.columnMapping(createMapper(schemaV3).add(
- schemaV3.column("valStringCol").positionInRow(),
- schemaV2.column("valStringCol").positionInRow())
- );
-
- final SchemaDescriptor schemaV4 = new SchemaDescriptor(4,
- new Column[]{new Column("keyLongCol", INT64, false)},
- new Column[]{
- new Column("valBytesCol", BYTES, true),
- new Column("valStringCol", STRING, true)
- });
-
- schemaV4.columnMapping(createMapper(schemaV4).add(schemaV4.column("valBytesCol")));
-
- final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, schemaV1);
-
- final Map<Long, ColumnMapper> cache = reg.mappingCache();
-
- assertThrows(SchemaRegistrationConflictException.class, () -> reg.onSchemaRegistered(schemaV1));
- reg.onSchemaRegistered(schemaV2);
- reg.onSchemaRegistered(schemaV3);
- reg.onSchemaRegistered(schemaV4);
-
- assertEquals(0, cache.size());
-
- reg.resolveMapping(schemaV4, schemaV1);
- reg.resolveMapping(schemaV3, schemaV1);
- reg.resolveMapping(schemaV4, schemaV2);
-
- assertEquals(3, cache.size());
-
- reg.onSchemaDropped(schemaV1.version());
-
- assertEquals(1, cache.size());
-
- reg.onSchemaDropped(schemaV2.version());
-
- assertEquals(0, cache.size());
- }
-
@Test
void schemaAsyncReturnsExpectedResults() {
Map<Integer, SchemaDescriptor> history = schemaHistory(schemaV1);
@@ -536,19 +331,6 @@
assertThat(schema2Future, willBe(schemaV2));
}
- @Test
- void schemaAsyncReturnsExceptionForCompactedAwayVersion() {
- Map<Integer, SchemaDescriptor> history = schemaHistory(schemaV1, schemaV2);
-
- SchemaRegistryImpl reg = new SchemaRegistryImpl(history::get, schemaV2);
-
- history.remove(1);
- reg.onSchemaDropped(1);
-
- SchemaRegistryException ex = assertWillThrowFast(reg.schemaAsync(1), SchemaRegistryException.class);
- assertThat(ex.getMessage(), is("Failed to find schema (was it compacted away?) [version=1]"));
- }
-
/**
* SchemaHistory.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java
index 8733e38..a2aaff1 100644
--- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java
+++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java
@@ -22,7 +22,6 @@
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.SYSTEM_SCHEMAS;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
-import static org.apache.ignite.internal.table.TableTestUtils.getTableStrict;
import static org.apache.ignite.lang.ErrorGroups.Sql.STMT_PARSE_ERR;
import static org.apache.ignite.lang.ErrorGroups.Sql.STMT_VALIDATION_ERR;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -301,25 +300,6 @@
}
/**
- * Checks that schema version is updated even if column names are intersected.
- */
- // Need to be removed after https://issues.apache.org/jira/browse/IGNITE-19082
- @Test
- public void checkSchemaUpdatedWithEqAlterColumn() {
- sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
- IgniteImpl node = CLUSTER.aliveNode();
-
- int tableVersionBefore = getTableStrict(node.catalogManager(), "TEST", node.clock().nowLong()).tableVersion();
-
- sql("ALTER TABLE TEST ADD COLUMN (VAL1 INT)");
-
- int tableVersionAfter = getTableStrict(node.catalogManager(), "TEST", node.clock().nowLong()).tableVersion();
-
- assertEquals(tableVersionBefore + 1, tableVersionAfter);
- }
-
- /**
* Check explicit colocation columns configuration.
*/
@Test
diff --git a/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManagerImpl.java b/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManagerImpl.java
index 75667d9..1a37242 100644
--- a/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManagerImpl.java
+++ b/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManagerImpl.java
@@ -117,8 +117,7 @@
.map(SystemViewUtils::toSystemViewCreateCommand)
.collect(Collectors.toList());
- catalogManager.execute(commands).whenComplete(
- (r, t) -> {
+ catalogManager.catalogReadyFuture(1).thenCompose((x) -> catalogManager.execute(commands)).whenComplete((r, t) -> {
viewsRegistrationFuture.complete(null);
if (t != null) {
diff --git a/modules/system-view/src/test/java/org/apache/ignite/internal/systemview/SystemViewManagerTest.java b/modules/system-view/src/test/java/org/apache/ignite/internal/systemview/SystemViewManagerTest.java
index 2f8c9cf..1ee56d5 100644
--- a/modules/system-view/src/test/java/org/apache/ignite/internal/systemview/SystemViewManagerTest.java
+++ b/modules/system-view/src/test/java/org/apache/ignite/internal/systemview/SystemViewManagerTest.java
@@ -39,10 +39,12 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyList;
-import static org.mockito.Mockito.only;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.BitSet;
@@ -75,7 +77,6 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
-import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
@@ -86,8 +87,7 @@
public class SystemViewManagerTest extends BaseIgniteAbstractTest {
private static final String LOCAL_NODE_NAME = "LOCAL_NODE_NAME";
- @Mock
- private CatalogManager catalog;
+ private final CatalogManager catalog = Mockito.mock(CatalogManager.class);
private SystemViewManagerImpl viewMgr;
@@ -119,13 +119,16 @@
@Test
public void startAfterStartFails() {
- Mockito.when(catalog.execute(anyList())).thenReturn(nullCompletedFuture());
+ when(catalog.catalogInitializationFuture()).thenReturn(nullCompletedFuture());
+ when(catalog.catalogReadyFuture(1)).thenReturn(nullCompletedFuture());
+ when(catalog.execute(anyList())).thenReturn(nullCompletedFuture());
viewMgr.register(() -> List.of(dummyView("test")));
assertThat(viewMgr.startAsync(), willCompleteSuccessfully());
- verify(catalog, only()).execute(anyList());
+ verify(catalog, times(1)).execute(anyList());
+ reset(catalog);
assertThrows(IllegalStateException.class, viewMgr::startAsync);
@@ -148,12 +151,14 @@
public void registerAllColumnTypes(NativeTypeSpec typeSpec) {
NativeType type = SchemaTestUtils.specToType(typeSpec);
- Mockito.when(catalog.execute(anyList())).thenReturn(nullCompletedFuture());
+ when(catalog.catalogInitializationFuture()).thenReturn(nullCompletedFuture());
+ when(catalog.catalogReadyFuture(1)).thenReturn(nullCompletedFuture());
+ when(catalog.execute(anyList())).thenReturn(nullCompletedFuture());
viewMgr.register(() -> List.of(dummyView("test", type)));
assertThat(viewMgr.startAsync(), willCompleteSuccessfully());
- verify(catalog, only()).execute(anyList());
+ verify(catalog, times(1)).execute(anyList());
assertTrue(viewMgr.completeRegistration().isDone());
}
@@ -161,20 +166,24 @@
public void managerStartsSuccessfullyEvenIfCatalogRespondsWithError() {
CatalogValidationException expected = new CatalogValidationException("Expected exception.");
- Mockito.when(catalog.execute(anyList())).thenReturn(failedFuture(expected));
+ when(catalog.catalogInitializationFuture()).thenReturn(nullCompletedFuture());
+ when(catalog.catalogReadyFuture(1)).thenReturn(nullCompletedFuture());
+ when(catalog.execute(anyList())).thenReturn(failedFuture(expected));
viewMgr.register(() -> List.of(dummyView("test")));
assertThat(viewMgr.startAsync(), willCompleteSuccessfully());
- verify(catalog, only()).execute(anyList());
+ verify(catalog, times(1)).execute(anyList());
assertThat(viewMgr.completeRegistration(), willBe(nullValue()));
}
@Test
public void nodeAttributesUpdatedAfterStart() {
- Mockito.when(catalog.execute(anyList())).thenReturn(nullCompletedFuture());
+ when(catalog.catalogInitializationFuture()).thenReturn(nullCompletedFuture());
+ when(catalog.catalogReadyFuture(1)).thenReturn(nullCompletedFuture());
+ when(catalog.execute(anyList())).thenReturn(nullCompletedFuture());
String name1 = "view1";
String name2 = "view2";
@@ -185,9 +194,6 @@
assertThat(viewMgr.startAsync(), willCompleteSuccessfully());
- verify(catalog, only()).execute(anyList());
- verifyNoMoreInteractions(catalog);
-
assertThat(viewMgr.nodeAttributes(), is(Map.of(NODE_ATTRIBUTES_KEY, String.join(NODE_ATTRIBUTES_LIST_SEPARATOR, name1.toUpperCase(
Locale.ROOT), name2.toUpperCase(Locale.ROOT)))));
}
@@ -251,7 +257,9 @@
@Test
void viewScanTest() {
- Mockito.when(catalog.execute(anyList())).thenReturn(nullCompletedFuture());
+ when(catalog.catalogInitializationFuture()).thenReturn(nullCompletedFuture());
+ when(catalog.catalogReadyFuture(1)).thenReturn(nullCompletedFuture());
+ when(catalog.execute(anyList())).thenReturn(nullCompletedFuture());
String nodeView = "NODE_VIEW";
String clusterView = "CLUSTER_VIEW";