IGNITE-19082: Catalog. Cleanup dead code (#3669)

diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
index 14e90a1..070c7a8 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
@@ -47,6 +47,7 @@
 import java.util.concurrent.Flow.Publisher;
 import java.util.function.LongSupplier;
 import org.apache.ignite.internal.catalog.commands.AlterZoneSetDefaultCommand;
+import org.apache.ignite.internal.catalog.commands.CreateSchemaCommand;
 import org.apache.ignite.internal.catalog.commands.CreateZoneCommand;
 import org.apache.ignite.internal.catalog.commands.StorageProfileParams;
 import org.apache.ignite.internal.catalog.descriptors.CatalogHashIndexDescriptor;
@@ -119,6 +120,9 @@
     /** Versioned catalog descriptors sorted in chronological order. */
     private final NavigableMap<Long, Catalog> catalogByTs = new ConcurrentSkipListMap<>();
 
+    /** A future that completes when an empty catalog is initialised. If catalog is not empty this future when this completes starts. */
+    private final CompletableFuture<Void> catalogInitializationFuture = new CompletableFuture<>();
+
     private final UpdateLog updateLog;
 
     private final PendingComparableValuesTracker<Integer, Void> versionTracker = new PendingComparableValuesTracker<>(0);
@@ -158,27 +162,7 @@
     public CompletableFuture<Void> startAsync() {
         int objectIdGen = 0;
 
-        // TODO: IGNITE-19082 Move default schema objects initialization to cluster init procedure.
-        CatalogSchemaDescriptor publicSchema = new CatalogSchemaDescriptor(
-                objectIdGen++,
-                DEFAULT_SCHEMA_NAME,
-                new CatalogTableDescriptor[0],
-                new CatalogIndexDescriptor[0],
-                new CatalogSystemViewDescriptor[0],
-                INITIAL_CAUSALITY_TOKEN
-        );
-
-        // TODO: IGNITE-19082 Move system schema objects initialization to cluster init procedure.
-        CatalogSchemaDescriptor systemSchema = new CatalogSchemaDescriptor(
-                objectIdGen++,
-                SYSTEM_SCHEMA_NAME,
-                new CatalogTableDescriptor[0],
-                new CatalogIndexDescriptor[0],
-                new CatalogSystemViewDescriptor[0],
-                INITIAL_CAUSALITY_TOKEN
-        );
-
-        Catalog emptyCatalog = new Catalog(0, 0L, objectIdGen, List.of(), List.of(publicSchema, systemSchema), null);
+        Catalog emptyCatalog = new Catalog(0, 0L, objectIdGen, List.of(), List.of(), null);
 
         registerCatalog(emptyCatalog);
 
@@ -187,12 +171,17 @@
         return updateLog.startAsync()
                 .thenCompose(none -> {
                     if (latestCatalogVersion() == emptyCatalog.version()) {
-                        // node has not seen any updates yet, let's try to initialise
-                        // catalog with default zone
-                        return createDefaultZone(emptyCatalog);
-                    }
+                        int initializedCatalogVersion = emptyCatalog.version() + 1;
 
-                    return nullCompletedFuture();
+                        this.catalogReadyFuture(initializedCatalogVersion)
+                                .thenCompose(ignored -> awaitVersionActivation(initializedCatalogVersion))
+                                .handle((r, e) -> catalogInitializationFuture.complete(null));
+
+                        return initCatalog(emptyCatalog);
+                    } else {
+                        catalogInitializationFuture.complete(null);
+                        return nullCompletedFuture();
+                    }
                 });
     }
 
@@ -205,7 +194,11 @@
 
     @Override
     public @Nullable CatalogTableDescriptor table(String tableName, long timestamp) {
-        return catalogAt(timestamp).schema(DEFAULT_SCHEMA_NAME).table(tableName);
+        CatalogSchemaDescriptor schema = catalogAt(timestamp).schema(DEFAULT_SCHEMA_NAME);
+        if (schema == null) {
+            return null;
+        }
+        return schema.table(tableName);
     }
 
     @Override
@@ -225,7 +218,11 @@
 
     @Override
     public @Nullable CatalogIndexDescriptor aliveIndex(String indexName, long timestamp) {
-        return catalogAt(timestamp).schema(DEFAULT_SCHEMA_NAME).aliveIndex(indexName);
+        CatalogSchemaDescriptor schema = catalogAt(timestamp).schema(DEFAULT_SCHEMA_NAME);
+        if (schema == null) {
+            return null;
+        }
+        return schema.aliveIndex(indexName);
     }
 
     @Override
@@ -322,6 +319,11 @@
     }
 
     @Override
+    public CompletableFuture<Void> catalogInitializationFuture() {
+        return catalogInitializationFuture;
+    }
+
+    @Override
     public @Nullable Catalog catalog(int catalogVersion) {
         return catalogByVer.get(catalogVersion);
     }
@@ -363,8 +365,9 @@
         return updateLog.saveSnapshot(new SnapshotEntry(catalog));
     }
 
-    private CompletableFuture<Void> createDefaultZone(Catalog emptyCatalog) {
-        List<UpdateEntry> createZoneEntries = new BulkUpdateProducer(List.of(
+    private CompletableFuture<Void> initCatalog(Catalog emptyCatalog) {
+        List<CatalogCommand> initCommands = List.of(
+                // Init default zone
                 CreateZoneCommand.builder()
                         .zoneName(DEFAULT_ZONE_NAME)
                         .partitions(DEFAULT_PARTITION_COUNT)
@@ -378,10 +381,15 @@
                         .build(),
                 AlterZoneSetDefaultCommand.builder()
                         .zoneName(DEFAULT_ZONE_NAME)
-                        .build()
-        )).get(emptyCatalog);
+                        .build(),
+                // Add schemas
+                CreateSchemaCommand.builder().name(DEFAULT_SCHEMA_NAME).build(),
+                CreateSchemaCommand.builder().name(SYSTEM_SCHEMA_NAME).build()
+        );
 
-        return updateLog.append(new VersionedUpdate(emptyCatalog.version() + 1, 0L, createZoneEntries))
+        List<UpdateEntry> entries = new BulkUpdateProducer(initCommands).get(emptyCatalog);
+
+        return updateLog.append(new VersionedUpdate(emptyCatalog.version() + 1, 0L, entries))
                 .handle((result, error) -> {
                     if (error != null) {
                         LOG.warn("Unable to create default zone.", error);
@@ -407,13 +415,7 @@
         CompletableFuture<Integer> resultFuture = new CompletableFuture<>();
 
         saveUpdate(updateProducer, 0)
-                .thenCompose(newVersion -> {
-                    Catalog catalog = catalogByVer.get(newVersion);
-
-                    HybridTimestamp tsSafeForRoReadingInPastOptimization = calcClusterWideEnsureActivationTime(catalog);
-
-                    return clockService.waitFor(tsSafeForRoReadingInPastOptimization).thenApply(unused -> newVersion);
-                })
+                .thenCompose(this::awaitVersionActivation)
                 .whenComplete((newVersion, err) -> {
                     if (err != null) {
                         Throwable errUnwrapped = ExceptionUtils.unwrapCause(err);
@@ -448,6 +450,14 @@
         return resultFuture;
     }
 
+    private CompletableFuture<Integer> awaitVersionActivation(int version) {
+        Catalog catalog = catalogByVer.get(version);
+
+        HybridTimestamp tsSafeForRoReadingInPastOptimization = calcClusterWideEnsureActivationTime(catalog);
+
+        return clockService.waitFor(tsSafeForRoReadingInPastOptimization).thenApply(unused -> version);
+    }
+
     private HybridTimestamp calcClusterWideEnsureActivationTime(Catalog catalog) {
         return clusterWideEnsuredActivationTsSafeForRoReads(
                 catalog,
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
index eefbaa0..cc1c65d 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
@@ -45,8 +45,10 @@
  * <p>TBD: events
  */
 public interface CatalogService extends EventProducer<CatalogEvent, CatalogEventParameters> {
+    /** Default schema name. */
     String DEFAULT_SCHEMA_NAME = "PUBLIC";
 
+    /** System schema name. */
     String SYSTEM_SCHEMA_NAME = "SYSTEM";
 
     /** Default storage profile. */
@@ -110,4 +112,9 @@
      * @param version Catalog version to wait for.
      */
     CompletableFuture<Void> catalogReadyFuture(int version);
+
+    /**
+     * Returns a future, which completes when empty catalog is initialised. Otherwise this future completes upon startup.
+     */
+    CompletableFuture<Void> catalogInitializationFuture();
 }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AbstractCreateIndexCommand.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AbstractCreateIndexCommand.java
index 08bebca..12bf302 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AbstractCreateIndexCommand.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AbstractCreateIndexCommand.java
@@ -91,7 +91,7 @@
         }
 
         return List.of(
-                new NewIndexEntry(createDescriptor(catalog.objectIdGenState(), table.id(), catalog.version() + 1), schemaName),
+                new NewIndexEntry(createDescriptor(catalog.objectIdGenState(), table.id(), catalog.version() + 1)),
                 new ObjectIdGenUpdateEntry(1)
         );
     }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterTableAddColumnCommand.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterTableAddColumnCommand.java
index 57a2d1a..d1cc894 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterTableAddColumnCommand.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterTableAddColumnCommand.java
@@ -89,7 +89,7 @@
         }
 
         return List.of(
-                new NewColumnsEntry(table.id(), columnDescriptors, schemaName)
+                new NewColumnsEntry(table.id(), columnDescriptors)
         );
     }
 
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterTableAlterColumnCommand.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterTableAlterColumnCommand.java
index 1a06cc1..1fda646 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterTableAlterColumnCommand.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterTableAlterColumnCommand.java
@@ -116,7 +116,7 @@
         }
 
         return List.of(
-                new AlterColumnEntry(table.id(), target, schemaName)
+                new AlterColumnEntry(table.id(), target)
         );
     }
 
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterTableDropColumnCommand.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterTableDropColumnCommand.java
index 9e6ec15..7190859 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterTableDropColumnCommand.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterTableDropColumnCommand.java
@@ -107,7 +107,7 @@
         });
 
         return List.of(
-                new DropColumnsEntry(table.id(), columns, schemaName)
+                new DropColumnsEntry(table.id(), columns)
         );
     }
 
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateSchemaCommand.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateSchemaCommand.java
new file mode 100644
index 0000000..38f15e0
--- /dev/null
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateSchemaCommand.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.commands;
+
+import static org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN;
+import static org.apache.ignite.internal.catalog.CatalogParamsValidationUtils.validateIdentifier;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+
+import java.util.List;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.CatalogCommand;
+import org.apache.ignite.internal.catalog.CatalogValidationException;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogSystemViewDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.storage.NewSchemaEntry;
+import org.apache.ignite.internal.catalog.storage.ObjectIdGenUpdateEntry;
+import org.apache.ignite.internal.catalog.storage.UpdateEntry;
+
+/**
+ * Command to create a new schema.
+ */
+public class CreateSchemaCommand implements CatalogCommand {
+
+    private final String schemaName;
+
+    private CreateSchemaCommand(String schemaName) {
+        validateIdentifier(schemaName, "Name of the schema");
+
+        this.schemaName = schemaName;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public List<UpdateEntry> get(Catalog catalog) {
+        int id = catalog.objectIdGenState();
+
+        if (catalog.schema(schemaName) != null) {
+            throw new CatalogValidationException(format("Schema with name '{}' already exists", schemaName));
+        }
+
+        CatalogSchemaDescriptor schema = new CatalogSchemaDescriptor(
+                id,
+                schemaName,
+                new CatalogTableDescriptor[0],
+                new CatalogIndexDescriptor[0],
+                new CatalogSystemViewDescriptor[0],
+                INITIAL_CAUSALITY_TOKEN
+        );
+
+        return List.of(
+                new NewSchemaEntry(schema),
+                new ObjectIdGenUpdateEntry(1)
+        );
+    }
+
+    /** Returns builder to create a command to create a new schema. */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /** Implementation of {@link CreateSchemaCommandBuilder}. */
+    public static class Builder implements CreateSchemaCommandBuilder {
+
+        private String name;
+
+        /** {@inheritDoc} */
+        @Override
+        public CreateSchemaCommandBuilder name(String name) {
+            this.name = name;
+            return this;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public CatalogCommand build() {
+            return new CreateSchemaCommand(name);
+        }
+    }
+}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateSchemaCommandBuilder.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateSchemaCommandBuilder.java
new file mode 100644
index 0000000..b3e690f
--- /dev/null
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateSchemaCommandBuilder.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.commands;
+
+import org.apache.ignite.internal.catalog.CatalogCommand;
+
+/**
+ * Builder for a {@link CreateSchemaCommand}.
+ */
+public interface CreateSchemaCommandBuilder {
+
+    /** Sets schema name. Should not be null or blank. */
+    CreateSchemaCommandBuilder name(String name);
+
+    /** Creates new schema command. */
+    CatalogCommand build();
+}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateSystemViewCommand.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateSystemViewCommand.java
index efe6e63..baf358d 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateSystemViewCommand.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateSystemViewCommand.java
@@ -108,7 +108,7 @@
         CatalogSchemaDescriptor systemSchema = schemaOrThrow(catalog, CatalogManager.SYSTEM_SCHEMA_NAME);
 
         List<CatalogTableColumnDescriptor> viewColumns = columns.stream().map(CatalogUtils::fromParams).collect(toList());
-        CatalogSystemViewDescriptor descriptor = new CatalogSystemViewDescriptor(id, name, viewColumns, systemViewType);
+        CatalogSystemViewDescriptor descriptor = new CatalogSystemViewDescriptor(id, systemSchema.id(), name, viewColumns, systemViewType);
 
         CatalogSystemViewDescriptor existingSystemView = systemSchema.systemView(name);
 
@@ -121,7 +121,7 @@
         }
 
         return List.of(
-                new NewSystemViewEntry(descriptor, systemSchema.name()),
+                new NewSystemViewEntry(descriptor),
                 new ObjectIdGenUpdateEntry(1)
         );
     }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableCommand.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableCommand.java
index d8defb6..7a05f44 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableCommand.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableCommand.java
@@ -151,8 +151,8 @@
         CatalogIndexDescriptor pkIndex = createIndexDescriptor(txWaitCatalogVersion, indexName, pkIndexId, tableId);
 
         return List.of(
-                new NewTableEntry(table, schemaName),
-                new NewIndexEntry(pkIndex, schemaName),
+                new NewTableEntry(table),
+                new NewIndexEntry(pkIndex),
                 new MakeIndexAvailableEntry(pkIndexId),
                 new ObjectIdGenUpdateEntry(id - catalog.objectIdGenState())
         );
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropIndexCommand.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropIndexCommand.java
index 52cfbaf..2efb9e4 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropIndexCommand.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropIndexCommand.java
@@ -90,7 +90,7 @@
             case BUILDING:
                 return List.of(new RemoveIndexEntry(index.id()));
             case AVAILABLE:
-                return List.of(new DropIndexEntry(index.id(), index.tableId()));
+                return List.of(new DropIndexEntry(index.id()));
             default:
                 throw new IllegalStateException("Unknown index status: " + index.status());
         }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropTableCommand.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropTableCommand.java
index ed2f713..399b2d9 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropTableCommand.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropTableCommand.java
@@ -62,7 +62,7 @@
                     updateEntries.add(new RemoveIndexEntry(index.id()));
                 });
 
-        updateEntries.add(new DropTableEntry(table.id(), schemaName));
+        updateEntries.add(new DropTableEntry(table.id()));
 
         return updateEntries;
     }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogColumnCollation.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogColumnCollation.java
index f494106..efaf92a 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogColumnCollation.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogColumnCollation.java
@@ -20,7 +20,7 @@
 /**
  * Enumeration of all supported collations.
  */
-// TODO: IGNITE-19082 drop similar classes in index and sql-engine modules.
+// TODO: https://issues.apache.org/jira/browse/IGNITE-22179 drop similar classes in index and sql-engine modules.
 public enum CatalogColumnCollation {
     ASC_NULLS_FIRST(true, true),
     ASC_NULLS_LAST(true, false),
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogIndexDescriptor.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogIndexDescriptor.java
index 93a1aff..a14c846 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogIndexDescriptor.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogIndexDescriptor.java
@@ -99,12 +99,13 @@
 
         /** Returns catalog index descriptor type by identifier. */
         public static CatalogIndexDescriptorType forId(int id) {
-            assert id == HASH.typeId || id == SORTED.typeId : "Unknown index descriptor type ID: " + id;
-
-            if (id == HASH.typeId) {
-                return HASH;
-            } else {
-                return SORTED;
+            switch (id) {
+                case 0:
+                    return HASH;
+                case 1:
+                    return SORTED;
+                default:
+                    throw new IllegalArgumentException("Unknown index descriptor type id: " + id);
             }
         }
     }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogSystemViewDescriptor.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogSystemViewDescriptor.java
index 6efa736..edbb46b 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogSystemViewDescriptor.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogSystemViewDescriptor.java
@@ -35,6 +35,8 @@
 public class CatalogSystemViewDescriptor extends CatalogObjectDescriptor {
     public static final CatalogObjectSerializer<CatalogSystemViewDescriptor> SERIALIZER = new SystemViewDescriptorSerializer();
 
+    private final int schemaId;
+
     private final List<CatalogTableColumnDescriptor> columns;
 
     private final SystemViewType systemViewType;
@@ -43,18 +45,26 @@
      * Constructor.
      *
      * @param id View id.
+     * @param schemaId Schema id.
      * @param name View name.
      * @param columns View columns.
      * @param systemViewType View type.
      */
-    public CatalogSystemViewDescriptor(int id, String name, List<CatalogTableColumnDescriptor> columns, SystemViewType systemViewType) {
-        this(id, name, columns, systemViewType, INITIAL_CAUSALITY_TOKEN);
+    public CatalogSystemViewDescriptor(
+            int id,
+            int schemaId,
+            String name,
+            List<CatalogTableColumnDescriptor> columns,
+            SystemViewType systemViewType
+    ) {
+        this(id, schemaId, name, columns, systemViewType, INITIAL_CAUSALITY_TOKEN);
     }
 
     /**
      * Constructor.
      *
      * @param id View id.
+     * @param schemaId Schema id.
      * @param name View name.
      * @param columns View columns.
      * @param systemViewType View type.
@@ -62,6 +72,7 @@
      */
     public CatalogSystemViewDescriptor(
             int id,
+            int schemaId,
             String name,
             List<CatalogTableColumnDescriptor> columns,
             SystemViewType systemViewType,
@@ -69,11 +80,21 @@
     ) {
         super(id, Type.SYSTEM_VIEW, name, causalityToken);
 
+        this.schemaId = schemaId;
         this.columns = Objects.requireNonNull(columns, "columns");
         this.systemViewType = Objects.requireNonNull(systemViewType, "viewType");
     }
 
     /**
+     * Returns a schema id of this view.
+     *
+     * @return A schema id.
+     */
+    public int schemaId() {
+        return schemaId;
+    }
+
+    /**
      * Returns a list of columns of this view.
      *
      * @return A list of columns.
@@ -101,13 +122,13 @@
             return false;
         }
         CatalogSystemViewDescriptor that = (CatalogSystemViewDescriptor) o;
-        return Objects.equals(columns, that.columns) && systemViewType == that.systemViewType;
+        return schemaId == that.schemaId && Objects.equals(columns, that.columns) && systemViewType == that.systemViewType;
     }
 
     /** {@inheritDoc} */
     @Override
     public int hashCode() {
-        return Objects.hash(columns, systemViewType);
+        return Objects.hash(schemaId, columns, systemViewType);
     }
 
     /** {@inheritDoc} */
@@ -116,6 +137,7 @@
         return S.toString(
                 CatalogSystemViewDescriptor.class, this,
                 "id", id(),
+                "schemaId", schemaId,
                 "name", name(),
                 "columns", columns,
                 "systemViewType", systemViewType()
@@ -147,12 +169,13 @@
 
         /** Returns system view type by identifier. */
         private static SystemViewType forId(int id) {
-            if (id == 0) {
-                return NODE;
-            } else {
-                assert id == 1;
-
-                return CLUSTER;
+            switch (id) {
+                case 0:
+                    return NODE;
+                case 1:
+                    return CLUSTER;
+                default:
+                    throw new IllegalArgumentException("Unknown system view type id: " + id);
             }
         }
     }
@@ -164,6 +187,7 @@
         @Override
         public CatalogSystemViewDescriptor readFrom(IgniteDataInput input) throws IOException {
             int id = input.readInt();
+            int schemaId = input.readInt();
             String name = input.readUTF();
             long updateToken = input.readLong();
             List<CatalogTableColumnDescriptor> columns = readList(CatalogTableColumnDescriptor.SERIALIZER, input);
@@ -171,12 +195,13 @@
             byte sysViewTypeId = input.readByte();
             SystemViewType sysViewType = SystemViewType.forId(sysViewTypeId);
 
-            return new CatalogSystemViewDescriptor(id, name, columns, sysViewType, updateToken);
+            return new CatalogSystemViewDescriptor(id, schemaId, name, columns, sysViewType, updateToken);
         }
 
         @Override
         public void writeTo(CatalogSystemViewDescriptor descriptor, IgniteDataOutput output) throws IOException {
             output.writeInt(descriptor.id());
+            output.writeInt(descriptor.schemaId);
             output.writeUTF(descriptor.name());
             output.writeLong(descriptor.updateToken());
             writeList(descriptor.columns(), CatalogTableColumnDescriptor.SERIALIZER, output);
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableDescriptor.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableDescriptor.java
index e8f7b1d..ab14b29 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableDescriptor.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableDescriptor.java
@@ -26,12 +26,12 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableSchemaVersions.TableVersion;
 import org.apache.ignite.internal.catalog.storage.serialization.CatalogObjectSerializer;
 import org.apache.ignite.internal.tostring.IgniteToStringExclude;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
 import org.apache.ignite.internal.tostring.S;
 import org.apache.ignite.internal.util.io.IgniteDataInput;
 import org.apache.ignite.internal.util.io.IgniteDataOutput;
@@ -55,7 +55,9 @@
     private final CatalogTableSchemaVersions schemaVersions;
 
     private final List<CatalogTableColumnDescriptor> columns;
+    @IgniteToStringInclude
     private final List<String> primaryKeyColumns;
+    @IgniteToStringInclude
     private final List<String> colocationColumns;
 
     @IgniteToStringExclude
@@ -125,22 +127,12 @@
         this.pkIndexId = pkIndexId;
         this.zoneId = zoneId;
         this.columns = Objects.requireNonNull(columns, "No columns defined.");
-        primaryKeyColumns = Objects.requireNonNull(pkCols, "No primary key columns.");
-        colocationColumns = colocationCols == null || colocationCols.isEmpty() ? pkCols : colocationCols;
-
+        this.primaryKeyColumns = Objects.requireNonNull(pkCols, "No primary key columns.");
         this.columnsMap = columns.stream().collect(Collectors.toMap(CatalogTableColumnDescriptor::name, Function.identity()));
-
-        this.schemaVersions = schemaVersions;
-
+        this.colocationColumns = Objects.requireNonNullElse(colocationCols, pkCols);
+        this.schemaVersions =  Objects.requireNonNull(schemaVersions, "No catalog schema versions.");
+        this.storageProfile = Objects.requireNonNull(storageProfile, "No storage profile.");
         this.creationToken = creationToken;
-
-        this.storageProfile = storageProfile;
-
-        // TODO: IGNITE-19082 Throw proper exceptions.
-        assert !columnsMap.isEmpty() : "No columns.";
-
-        assert primaryKeyColumns.stream().noneMatch(c -> Objects.requireNonNull(columnsMap.get(c), c).nullable());
-        assert Set.copyOf(primaryKeyColumns).containsAll(colocationColumns);
     }
 
     /**
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/StoppingIndexEventParameters.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/StoppingIndexEventParameters.java
index ac8864b..6a27103 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/StoppingIndexEventParameters.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/StoppingIndexEventParameters.java
@@ -25,7 +25,6 @@
  * @see CatalogEvent#INDEX_STOPPING
  */
 public class StoppingIndexEventParameters extends IndexEventParameters {
-    private final int tableId;
 
     /**
      * Constructor.
@@ -33,16 +32,8 @@
      * @param causalityToken Causality token.
      * @param catalogVersion Catalog version.
      * @param indexId An id of dropped index.
-     * @param tableId Table ID for which the index was removed.
      */
-    public StoppingIndexEventParameters(long causalityToken, int catalogVersion, int indexId, int tableId) {
+    public StoppingIndexEventParameters(long causalityToken, int catalogVersion, int indexId) {
         super(causalityToken, catalogVersion, indexId);
-
-        this.tableId = tableId;
-    }
-
-    /** Returns table ID for which the index was removed. */
-    public int tableId() {
-        return tableId;
     }
 }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AbstractChangeIndexStatusEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AbstractChangeIndexStatusEntry.java
index 6c6baac..e86cf15 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AbstractChangeIndexStatusEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AbstractChangeIndexStatusEntry.java
@@ -21,6 +21,8 @@
 import static org.apache.ignite.internal.catalog.commands.CatalogUtils.indexOrThrow;
 import static org.apache.ignite.internal.catalog.commands.CatalogUtils.replaceIndex;
 import static org.apache.ignite.internal.catalog.commands.CatalogUtils.replaceSchema;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.schemaOrThrow;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.tableOrThrow;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 
 import org.apache.ignite.internal.catalog.Catalog;
@@ -61,19 +63,9 @@
     }
 
     static CatalogSchemaDescriptor schemaByIndexId(Catalog catalog, int indexId) {
-        CatalogIndexDescriptor index = catalog.index(indexId);
-
-        assert index != null : indexId;
-
-        CatalogTableDescriptor table = catalog.table(index.tableId());
-
-        assert table != null : index.tableId();
-
-        CatalogSchemaDescriptor schema = catalog.schema(table.schemaId());
-
-        assert schema != null : table.schemaId();
-
-        return schema;
+        CatalogIndexDescriptor index = indexOrThrow(catalog, indexId);
+        CatalogTableDescriptor table = tableOrThrow(catalog, index.tableId());
+        return schemaOrThrow(catalog, table.schemaId());
     }
 
     private CatalogIndexDescriptor updateIndexStatus(
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java
index f7cefa5..0d08df2 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java
@@ -17,12 +17,12 @@
 
 package org.apache.ignite.internal.catalog.storage;
 
-import static java.util.Objects.requireNonNull;
 import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
 import static org.apache.ignite.internal.catalog.commands.CatalogUtils.replaceSchema;
 import static org.apache.ignite.internal.catalog.commands.CatalogUtils.replaceTable;
 import static org.apache.ignite.internal.catalog.commands.CatalogUtils.schemaOrThrow;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.tableOrThrow;
 
 import java.io.IOException;
 import org.apache.ignite.internal.catalog.Catalog;
@@ -48,19 +48,15 @@
 
     private final CatalogTableColumnDescriptor column;
 
-    private final String schemaName;
-
     /**
      * Constructs the object.
      *
      * @param tableId An id the table to be modified.
      * @param column A modified descriptor of the column to be replaced.
-     * @param schemaName Schema name.
      */
-    public AlterColumnEntry(int tableId, CatalogTableColumnDescriptor column, String schemaName) {
+    public AlterColumnEntry(int tableId, CatalogTableColumnDescriptor column) {
         this.tableId = tableId;
         this.column = column;
-        this.schemaName = schemaName;
     }
 
     /** Returns an id the table to be modified. */
@@ -90,18 +86,17 @@
 
     @Override
     public Catalog applyUpdate(Catalog catalog, long causalityToken) {
-        CatalogSchemaDescriptor schema = schemaOrThrow(catalog, schemaName);
+        CatalogTableDescriptor table = tableOrThrow(catalog, tableId);
+        CatalogSchemaDescriptor schema = schemaOrThrow(catalog, table.schemaId());
 
-        CatalogTableDescriptor currentTableDescriptor = requireNonNull(catalog.table(tableId));
-
-        CatalogTableDescriptor newTableDescriptor = currentTableDescriptor.newDescriptor(
-                currentTableDescriptor.name(),
-                currentTableDescriptor.tableVersion() + 1,
-                currentTableDescriptor.columns().stream()
+        CatalogTableDescriptor newTable = table.newDescriptor(
+                table.name(),
+                table.tableVersion() + 1,
+                table.columns().stream()
                         .map(source -> source.name().equals(column.name()) ? column : source)
                         .collect(toList()),
                 causalityToken,
-                currentTableDescriptor.storageProfile()
+                table.storageProfile()
         );
 
         return new Catalog(
@@ -109,7 +104,7 @@
                 catalog.time(),
                 catalog.objectIdGenState(),
                 catalog.zones(),
-                replaceSchema(replaceTable(schema, newTableDescriptor), catalog.schemas()),
+                replaceSchema(replaceTable(schema, newTable), catalog.schemas()),
                 defaultZoneIdOpt(catalog)
         );
     }
@@ -126,18 +121,15 @@
         @Override
         public AlterColumnEntry readFrom(IgniteDataInput input) throws IOException {
             CatalogTableColumnDescriptor descriptor = CatalogTableColumnDescriptor.SERIALIZER.readFrom(input);
-
-            String schemaName = input.readUTF();
             int tableId = input.readInt();
 
-            return new AlterColumnEntry(tableId, descriptor, schemaName);
+            return new AlterColumnEntry(tableId, descriptor);
         }
 
         @Override
         public void writeTo(AlterColumnEntry value, IgniteDataOutput output) throws IOException {
             CatalogTableColumnDescriptor.SERIALIZER.writeTo(value.descriptor(), output);
 
-            output.writeUTF(value.schemaName);
             output.writeInt(value.tableId);
         }
     }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java
index c3cd9f8..c31f5fc 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java
@@ -17,12 +17,12 @@
 
 package org.apache.ignite.internal.catalog.storage;
 
-import static java.util.Objects.requireNonNull;
 import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
 import static org.apache.ignite.internal.catalog.commands.CatalogUtils.replaceSchema;
 import static org.apache.ignite.internal.catalog.commands.CatalogUtils.replaceTable;
 import static org.apache.ignite.internal.catalog.commands.CatalogUtils.schemaOrThrow;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.tableOrThrow;
 import static org.apache.ignite.internal.catalog.storage.serialization.CatalogSerializationUtils.writeStringCollection;
 import static org.apache.ignite.internal.util.IgniteUtils.capacity;
 
@@ -50,19 +50,16 @@
 
     private final int tableId;
     private final Set<String> columns;
-    private final String schemaName;
 
     /**
      * Constructs the object.
      *
      * @param tableId Table id.
      * @param columns Names of columns to drop.
-     * @param schemaName Schema name.
      */
-    public DropColumnsEntry(int tableId, Set<String> columns, String schemaName) {
+    public DropColumnsEntry(int tableId, Set<String> columns) {
         this.tableId = tableId;
         this.columns = columns;
-        this.schemaName = schemaName;
     }
 
     /** Returns table id. */
@@ -92,18 +89,17 @@
 
     @Override
     public Catalog applyUpdate(Catalog catalog, long causalityToken) {
-        CatalogSchemaDescriptor schema = schemaOrThrow(catalog, schemaName);
+        CatalogTableDescriptor table = tableOrThrow(catalog, tableId);
+        CatalogSchemaDescriptor schema = schemaOrThrow(catalog, table.schemaId());
 
-        CatalogTableDescriptor currentTableDescriptor = requireNonNull(catalog.table(tableId));
-
-        CatalogTableDescriptor newTableDescriptor = currentTableDescriptor.newDescriptor(
-                currentTableDescriptor.name(),
-                currentTableDescriptor.tableVersion() + 1,
-                currentTableDescriptor.columns().stream()
+        CatalogTableDescriptor newTable = table.newDescriptor(
+                table.name(),
+                table.tableVersion() + 1,
+                table.columns().stream()
                         .filter(col -> !columns.contains(col.name()))
                         .collect(toList()),
                 causalityToken,
-                currentTableDescriptor.storageProfile()
+                table.storageProfile()
         );
 
         return new Catalog(
@@ -111,7 +107,7 @@
                 catalog.time(),
                 catalog.objectIdGenState(),
                 catalog.zones(),
-                replaceSchema(replaceTable(schema, newTableDescriptor), catalog.schemas()),
+                replaceSchema(replaceTable(schema, newTable), catalog.schemas()),
                 defaultZoneIdOpt(catalog)
         );
     }
@@ -127,16 +123,14 @@
     private static class DropColumnEntrySerializer implements CatalogObjectSerializer<DropColumnsEntry> {
         @Override
         public DropColumnsEntry readFrom(IgniteDataInput input) throws IOException {
-            String schemaName = input.readUTF();
             int tableId = input.readInt();
             Set<String> columns = CatalogSerializationUtils.readStringCollection(input, size -> new HashSet<>(capacity(size)));
 
-            return new DropColumnsEntry(tableId, columns, schemaName);
+            return new DropColumnsEntry(tableId, columns);
         }
 
         @Override
         public void writeTo(DropColumnsEntry object, IgniteDataOutput output) throws IOException {
-            output.writeUTF(object.schemaName);
             output.writeInt(object.tableId());
             writeStringCollection(object.columns(), output);
         }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropIndexEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropIndexEntry.java
index 134455e..4241dee 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropIndexEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropIndexEntry.java
@@ -33,20 +33,15 @@
  * the {@link CatalogIndexStatus#STOPPING} state.
  */
 public class DropIndexEntry extends AbstractChangeIndexStatusEntry implements Fireable {
-    public static final DropIndexEntrySerializer SERIALIZER = new DropIndexEntrySerializer();
-
-    private final int tableId;
+    public static final CatalogObjectSerializer<DropIndexEntry> SERIALIZER = new DropIndexEntrySerializer();
 
     /**
      * Constructs the object.
      *
      * @param indexId An id of an index to drop.
-     * @param tableId Table ID for which the index was removed.
      */
-    public DropIndexEntry(int indexId, int tableId) {
+    public DropIndexEntry(int indexId) {
         super(indexId, CatalogIndexStatus.STOPPING);
-
-        this.tableId = tableId;
     }
 
     /** Returns an id of an index to drop. */
@@ -54,11 +49,6 @@
         return indexId;
     }
 
-    /** Returns table ID for which the index was removed. */
-    public int tableId() {
-        return tableId;
-    }
-
     @Override
     public int typeId() {
         return MarshallableEntryType.DROP_INDEX.id();
@@ -71,7 +61,7 @@
 
     @Override
     public CatalogEventParameters createEventParameters(long causalityToken, int catalogVersion) {
-        return new StoppingIndexEventParameters(causalityToken, catalogVersion, indexId, tableId);
+        return new StoppingIndexEventParameters(causalityToken, catalogVersion, indexId);
     }
 
     @Override
@@ -86,15 +76,13 @@
         @Override
         public DropIndexEntry readFrom(IgniteDataInput input) throws IOException {
             int indexId = input.readInt();
-            int tableId = input.readInt();
 
-            return new DropIndexEntry(indexId, tableId);
+            return new DropIndexEntry(indexId);
         }
 
         @Override
         public void writeTo(DropIndexEntry entry, IgniteDataOutput out) throws IOException {
             out.writeInt(entry.indexId());
-            out.writeInt(entry.tableId());
         }
     }
 }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropTableEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropTableEntry.java
index 509d284..06ccd5a 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropTableEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropTableEntry.java
@@ -18,10 +18,11 @@
 package org.apache.ignite.internal.catalog.storage;
 
 import static org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.schemaOrThrow;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.tableOrThrow;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.Objects;
 import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.commands.CatalogUtils;
 import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
@@ -43,17 +44,13 @@
 
     private final int tableId;
 
-    private final String schemaName;
-
     /**
      * Constructs the object.
      *
      * @param tableId An id of a table to drop.
-     * @param schemaName A schema name.
      */
-    public DropTableEntry(int tableId, String schemaName) {
+    public DropTableEntry(int tableId) {
         this.tableId = tableId;
-        this.schemaName = schemaName;
     }
 
     /** Returns an id of a table to drop. */
@@ -78,7 +75,8 @@
 
     @Override
     public Catalog applyUpdate(Catalog catalog, long causalityToken) {
-        CatalogSchemaDescriptor schema = Objects.requireNonNull(catalog.schema(schemaName));
+        CatalogTableDescriptor table = tableOrThrow(catalog, tableId);
+        CatalogSchemaDescriptor schema = schemaOrThrow(catalog, table.schemaId());
 
         return new Catalog(
                 catalog.version(),
@@ -109,15 +107,13 @@
         @Override
         public DropTableEntry readFrom(IgniteDataInput input) throws IOException {
             int tableId = input.readInt();
-            String schemaName = input.readUTF();
 
-            return new DropTableEntry(tableId, schemaName);
+            return new DropTableEntry(tableId);
         }
 
         @Override
         public void writeTo(DropTableEntry entry, IgniteDataOutput out) throws IOException {
             out.writeInt(entry.tableId());
-            out.writeUTF(entry.schemaName);
         }
     }
 }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java
index de24c10..9e29a5f 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java
@@ -17,11 +17,11 @@
 
 package org.apache.ignite.internal.catalog.storage;
 
-import static java.util.Objects.requireNonNull;
 import static org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
 import static org.apache.ignite.internal.catalog.commands.CatalogUtils.replaceSchema;
 import static org.apache.ignite.internal.catalog.commands.CatalogUtils.replaceTable;
 import static org.apache.ignite.internal.catalog.commands.CatalogUtils.schemaOrThrow;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.tableOrThrow;
 import static org.apache.ignite.internal.catalog.storage.serialization.CatalogSerializationUtils.readList;
 import static org.apache.ignite.internal.catalog.storage.serialization.CatalogSerializationUtils.writeList;
 
@@ -49,7 +49,6 @@
 
     private final int tableId;
     private final List<CatalogTableColumnDescriptor> descriptors;
-    private final String schemaName;
 
     /**
      * Constructs the object.
@@ -57,10 +56,9 @@
      * @param tableId Table id.
      * @param descriptors Descriptors of columns to add.
      */
-    public NewColumnsEntry(int tableId, List<CatalogTableColumnDescriptor> descriptors, String schemaName) {
+    public NewColumnsEntry(int tableId, List<CatalogTableColumnDescriptor> descriptors) {
         this.tableId = tableId;
         this.descriptors = descriptors;
-        this.schemaName = schemaName;
     }
 
     /** Returns table id. */
@@ -90,16 +88,15 @@
 
     @Override
     public Catalog applyUpdate(Catalog catalog, long causalityToken) {
-        CatalogSchemaDescriptor schema = schemaOrThrow(catalog, schemaName);
+        CatalogTableDescriptor table = tableOrThrow(catalog, tableId);
+        CatalogSchemaDescriptor schema = schemaOrThrow(catalog, table.schemaId());
 
-        CatalogTableDescriptor currentTableDescriptor = requireNonNull(catalog.table(tableId));
-
-        CatalogTableDescriptor newTableDescriptor = currentTableDescriptor.newDescriptor(
-                currentTableDescriptor.name(),
-                currentTableDescriptor.tableVersion() + 1,
-                CollectionUtils.concat(currentTableDescriptor.columns(), descriptors),
+        CatalogTableDescriptor newTable = table.newDescriptor(
+                table.name(),
+                table.tableVersion() + 1,
+                CollectionUtils.concat(table.columns(), descriptors),
                 causalityToken,
-                currentTableDescriptor.storageProfile()
+                table.storageProfile()
         );
 
         return new Catalog(
@@ -107,7 +104,7 @@
                 catalog.time(),
                 catalog.objectIdGenState(),
                 catalog.zones(),
-                replaceSchema(replaceTable(schema, newTableDescriptor), catalog.schemas()),
+                replaceSchema(replaceTable(schema, newTable), catalog.schemas()),
                 defaultZoneIdOpt(catalog)
         );
     }
@@ -125,16 +122,14 @@
         public NewColumnsEntry readFrom(IgniteDataInput in) throws IOException {
             List<CatalogTableColumnDescriptor> columns = readList(CatalogTableColumnDescriptor.SERIALIZER, in);
             int tableId = in.readInt();
-            String schemaName = in.readUTF();
 
-            return new NewColumnsEntry(tableId, columns, schemaName);
+            return new NewColumnsEntry(tableId, columns);
         }
 
         @Override
         public void writeTo(NewColumnsEntry entry, IgniteDataOutput out) throws IOException {
             writeList(entry.descriptors(), CatalogTableColumnDescriptor.SERIALIZER, out);
             out.writeInt(entry.tableId());
-            out.writeUTF(entry.schemaName);
         }
     }
 }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewIndexEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewIndexEntry.java
index 59f294c..b312d99 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewIndexEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewIndexEntry.java
@@ -18,13 +18,15 @@
 package org.apache.ignite.internal.catalog.storage;
 
 import static org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.schemaOrThrow;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.tableOrThrow;
 
 import java.io.IOException;
-import java.util.Objects;
 import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.commands.CatalogUtils;
 import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.catalog.events.CatalogEvent;
 import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
 import org.apache.ignite.internal.catalog.events.CreateIndexEventParameters;
@@ -44,17 +46,13 @@
 
     private final CatalogIndexDescriptor descriptor;
 
-    private final String schemaName;
-
     /**
      * Constructs the object.
      *
      * @param descriptor A descriptor of an index to add.
-     * @param schemaName Schema name.
      */
-    public NewIndexEntry(CatalogIndexDescriptor descriptor, String schemaName) {
+    public NewIndexEntry(CatalogIndexDescriptor descriptor) {
         this.descriptor = descriptor;
-        this.schemaName = schemaName;
     }
 
     /** Gets descriptor of an index to add. */
@@ -79,7 +77,8 @@
 
     @Override
     public Catalog applyUpdate(Catalog catalog, long causalityToken) {
-        CatalogSchemaDescriptor schema = Objects.requireNonNull(catalog.schema(schemaName));
+        CatalogTableDescriptor table = tableOrThrow(catalog, descriptor.tableId());
+        CatalogSchemaDescriptor schema = schemaOrThrow(catalog, table.schemaId());
 
         descriptor.updateToken(causalityToken);
 
@@ -111,15 +110,13 @@
     private static class NewIndexEntrySerializer implements CatalogObjectSerializer<NewIndexEntry> {
         @Override
         public NewIndexEntry readFrom(IgniteDataInput input) throws IOException {
-            String schemaName = input.readUTF();
             CatalogIndexDescriptor descriptor = CatalogSerializationUtils.IDX_SERIALIZER.readFrom(input);
 
-            return new NewIndexEntry(descriptor, schemaName);
+            return new NewIndexEntry(descriptor);
         }
 
         @Override
         public void writeTo(NewIndexEntry entry, IgniteDataOutput output) throws IOException {
-            output.writeUTF(entry.schemaName);
             CatalogSerializationUtils.IDX_SERIALIZER.writeTo(entry.descriptor(), output);
         }
     }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewSchemaEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewSchemaEntry.java
new file mode 100644
index 0000000..ec809dd
--- /dev/null
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewSchemaEntry.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.storage;
+
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.CatalogValidationException;
+import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
+import org.apache.ignite.internal.catalog.storage.serialization.CatalogObjectSerializer;
+import org.apache.ignite.internal.catalog.storage.serialization.MarshallableEntryType;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+
+/**
+ * New schema entry.
+ */
+public class NewSchemaEntry implements UpdateEntry {
+    public static final CatalogObjectSerializer<NewSchemaEntry> SERIALIZER = new Serializer();
+
+    private final CatalogSchemaDescriptor descriptor;
+
+    public NewSchemaEntry(CatalogSchemaDescriptor descriptor) {
+        this.descriptor = descriptor;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Catalog applyUpdate(Catalog catalog, long causalityToken) {
+        CatalogSchemaDescriptor schema = catalog.schema(descriptor.name());
+
+        if (schema != null) {
+            throw new CatalogValidationException(format("Schema with name '{}' already exists", schema.name()));
+        }
+
+        descriptor.updateToken(causalityToken);
+
+        List<CatalogSchemaDescriptor> schemas = new ArrayList<>(catalog.schemas().size() + 1);
+        schemas.addAll(catalog.schemas());
+        schemas.add(descriptor);
+
+        return new Catalog(
+                catalog.version(),
+                catalog.time(),
+                catalog.objectIdGenState(),
+                catalog.zones(),
+                schemas,
+                catalog.defaultZone().id()
+        );
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int typeId() {
+        return MarshallableEntryType.NEW_SCHEMA.id();
+    }
+
+    private static class Serializer implements CatalogObjectSerializer<NewSchemaEntry> {
+        @Override
+        public NewSchemaEntry readFrom(IgniteDataInput input) throws IOException {
+            CatalogSchemaDescriptor schemaDescriptor = CatalogSchemaDescriptor.SERIALIZER.readFrom(input);
+            return new NewSchemaEntry(schemaDescriptor);
+        }
+
+        @Override
+        public void writeTo(NewSchemaEntry value, IgniteDataOutput output) throws IOException {
+            CatalogSchemaDescriptor.SERIALIZER.writeTo(value.descriptor, output);
+        }
+    }
+}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewSystemViewEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewSystemViewEntry.java
index 03d130a..924749f 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewSystemViewEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewSystemViewEntry.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.catalog.storage;
 
 import static org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.schemaOrThrow;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -45,17 +46,13 @@
 
     private final CatalogSystemViewDescriptor descriptor;
 
-    private final String schemaName;
-
     /**
      * Constructor.
      *
      * @param descriptor System view descriptor.
-     * @param schemaName A schema name.
      */
-    public NewSystemViewEntry(CatalogSystemViewDescriptor descriptor, String schemaName) {
+    public NewSystemViewEntry(CatalogSystemViewDescriptor descriptor) {
         this.descriptor = descriptor;
-        this.schemaName = schemaName;
     }
 
     @Override
@@ -78,7 +75,7 @@
     /** {@inheritDoc} */
     @Override
     public Catalog applyUpdate(Catalog catalog, long causalityToken) {
-        CatalogSchemaDescriptor systemSchema = catalog.schema(schemaName);
+        CatalogSchemaDescriptor systemSchema = schemaOrThrow(catalog, descriptor.schemaId());
 
         descriptor.updateToken(causalityToken);
 
@@ -119,15 +116,13 @@
         @Override
         public NewSystemViewEntry readFrom(IgniteDataInput input) throws IOException {
             CatalogSystemViewDescriptor descriptor = CatalogSystemViewDescriptor.SERIALIZER.readFrom(input);
-            String schema = input.readUTF();
 
-            return new NewSystemViewEntry(descriptor, schema);
+            return new NewSystemViewEntry(descriptor);
         }
 
         @Override
         public void writeTo(NewSystemViewEntry entry, IgniteDataOutput output) throws IOException {
             CatalogSystemViewDescriptor.SERIALIZER.writeTo(entry.descriptor, output);
-            output.writeUTF(entry.schemaName);
         }
     }
 }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewTableEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewTableEntry.java
index 563bd5e..42b4e5b 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewTableEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewTableEntry.java
@@ -18,10 +18,10 @@
 package org.apache.ignite.internal.catalog.storage;
 
 import static org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.schemaOrThrow;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Objects;
 import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.commands.CatalogUtils;
 import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
@@ -44,17 +44,13 @@
 
     private final CatalogTableDescriptor descriptor;
 
-    private final String schemaName;
-
     /**
      * Constructs the object.
      *
      * @param descriptor A descriptor of a table to add.
-     * @param schemaName A schema name.
      */
-    public NewTableEntry(CatalogTableDescriptor descriptor, String schemaName) {
+    public NewTableEntry(CatalogTableDescriptor descriptor) {
         this.descriptor = descriptor;
-        this.schemaName = schemaName;
     }
 
     /** Returns descriptor of a table to add. */
@@ -79,7 +75,7 @@
 
     @Override
     public Catalog applyUpdate(Catalog catalog, long causalityToken) {
-        CatalogSchemaDescriptor schema = Objects.requireNonNull(catalog.schema(schemaName));
+        CatalogSchemaDescriptor schema = schemaOrThrow(catalog, descriptor.schemaId());
 
         descriptor.updateToken(causalityToken);
 
@@ -114,15 +110,13 @@
         @Override
         public NewTableEntry readFrom(IgniteDataInput input) throws IOException {
             CatalogTableDescriptor descriptor = CatalogTableDescriptor.SERIALIZER.readFrom(input);
-            String schemaName = input.readUTF();
 
-            return new NewTableEntry(descriptor, schemaName);
+            return new NewTableEntry(descriptor);
         }
 
         @Override
         public void writeTo(NewTableEntry entry, IgniteDataOutput output) throws IOException {
             CatalogTableDescriptor.SERIALIZER.writeTo(entry.descriptor(), output);
-            output.writeUTF(entry.schemaName);
         }
     }
 }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/RenameTableEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/RenameTableEntry.java
index 420fc4a..69c3cdb 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/RenameTableEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/RenameTableEntry.java
@@ -17,10 +17,11 @@
 
 package org.apache.ignite.internal.catalog.storage;
 
-import static java.util.Objects.requireNonNull;
 import static org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
 import static org.apache.ignite.internal.catalog.commands.CatalogUtils.replaceSchema;
 import static org.apache.ignite.internal.catalog.commands.CatalogUtils.replaceTable;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.schemaOrThrow;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.tableOrThrow;
 
 import java.io.IOException;
 import org.apache.ignite.internal.catalog.Catalog;
@@ -64,16 +65,15 @@
 
     @Override
     public Catalog applyUpdate(Catalog catalog, long causalityToken) {
-        CatalogTableDescriptor tableDescriptor = requireNonNull(catalog.table(tableId));
+        CatalogTableDescriptor table = tableOrThrow(catalog, tableId);
+        CatalogSchemaDescriptor schema = schemaOrThrow(catalog, table.schemaId());
 
-        CatalogSchemaDescriptor schemaDescriptor = requireNonNull(catalog.schema(tableDescriptor.schemaId()));
-
-        CatalogTableDescriptor newTableDescriptor = tableDescriptor.newDescriptor(
+        CatalogTableDescriptor newTable = table.newDescriptor(
                 newTableName,
-                tableDescriptor.tableVersion() + 1,
-                tableDescriptor.columns(),
+                table.tableVersion() + 1,
+                table.columns(),
                 causalityToken,
-                tableDescriptor.storageProfile()
+                table.storageProfile()
         );
 
         return new Catalog(
@@ -81,7 +81,7 @@
                 catalog.time(),
                 catalog.objectIdGenState(),
                 catalog.zones(),
-                replaceSchema(replaceTable(schemaDescriptor, newTableDescriptor), catalog.schemas()),
+                replaceSchema(replaceTable(schema, newTable), catalog.schemas()),
                 defaultZoneIdOpt(catalog)
         );
     }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/serialization/CatalogEntrySerializerProvider.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/serialization/CatalogEntrySerializerProvider.java
index ed7e523..86167ad 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/serialization/CatalogEntrySerializerProvider.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/serialization/CatalogEntrySerializerProvider.java
@@ -28,6 +28,7 @@
 import org.apache.ignite.internal.catalog.storage.MakeIndexAvailableEntry;
 import org.apache.ignite.internal.catalog.storage.NewColumnsEntry;
 import org.apache.ignite.internal.catalog.storage.NewIndexEntry;
+import org.apache.ignite.internal.catalog.storage.NewSchemaEntry;
 import org.apache.ignite.internal.catalog.storage.NewSystemViewEntry;
 import org.apache.ignite.internal.catalog.storage.NewTableEntry;
 import org.apache.ignite.internal.catalog.storage.NewZoneEntry;
@@ -79,6 +80,7 @@
             serializers[MarshallableEntryType.SNAPSHOT.id()] = SnapshotEntry.SERIALIZER;
             serializers[MarshallableEntryType.RENAME_INDEX.id()] = RenameIndexEntry.SERIALIZER;
             serializers[MarshallableEntryType.SET_DEFAULT_ZONE.id()] = SetDefaultZoneEntry.SERIALIZER;
+            serializers[MarshallableEntryType.NEW_SCHEMA.id()] = NewSchemaEntry.SERIALIZER;
             //noinspection ThisEscapedInObjectConstruction
             serializers[MarshallableEntryType.VERSIONED_UPDATE.id()] = new VersionedUpdateSerializer(this);
 
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/serialization/MarshallableEntryType.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/serialization/MarshallableEntryType.java
index 5306fc7..8cdb723 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/serialization/MarshallableEntryType.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/serialization/MarshallableEntryType.java
@@ -40,7 +40,8 @@
     SNAPSHOT(16),
     VERSIONED_UPDATE(17),
     RENAME_INDEX(18),
-    SET_DEFAULT_ZONE(19);
+    SET_DEFAULT_ZONE(19),
+    NEW_SCHEMA(20);
 
     /** Type ID. */
     private final int id;
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerDescriptorCausalityTokenTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerDescriptorCausalityTokenTest.java
index 5d0eb96..dbc0185 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerDescriptorCausalityTokenTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerDescriptorCausalityTokenTest.java
@@ -66,12 +66,12 @@
 
     @Test
     public void testEmptyCatalog() {
-        CatalogSchemaDescriptor defaultSchema = manager.schema(DEFAULT_SCHEMA_NAME, 0);
+        CatalogSchemaDescriptor defaultSchema = manager.schema(DEFAULT_SCHEMA_NAME, 1);
 
         assertNotNull(defaultSchema);
         assertNull(manager.catalog(0).defaultZone());
         assertSame(defaultSchema, manager.activeSchema(DEFAULT_SCHEMA_NAME, clock.nowLong()));
-        assertSame(defaultSchema, manager.schema(0));
+        assertSame(defaultSchema, manager.schema(1));
         assertSame(defaultSchema, manager.activeSchema(clock.nowLong()));
 
         Catalog catalogWithDefaultZone = manager.catalog(1);
@@ -89,7 +89,7 @@
         assertThrows(IllegalStateException.class, () -> manager.activeSchema(-1L));
 
         // Validate default schema.
-        assertEquals(INITIAL_CAUSALITY_TOKEN, defaultSchema.updateToken());
+        assertEquals(1, defaultSchema.updateToken());
     }
 
     @Test
@@ -108,8 +108,7 @@
 
         assertNotNull(schema);
         assertEquals(SCHEMA_NAME, schema.name());
-        assertSame(schema, manager.activeSchema(123L));
-        assertEquals(INITIAL_CAUSALITY_TOKEN, schema.updateToken());
+        assertEquals(1, schema.updateToken());
 
         assertNull(schema.table(TABLE_NAME));
         assertNull(manager.table(TABLE_NAME, 123L));
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
index e0a34fb..d15c5ff 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
@@ -108,6 +108,7 @@
 import org.apache.ignite.internal.catalog.commands.CatalogUtils;
 import org.apache.ignite.internal.catalog.commands.ColumnParams;
 import org.apache.ignite.internal.catalog.commands.ColumnParams.Builder;
+import org.apache.ignite.internal.catalog.commands.CreateSchemaCommand;
 import org.apache.ignite.internal.catalog.commands.CreateZoneCommand;
 import org.apache.ignite.internal.catalog.commands.DefaultValue;
 import org.apache.ignite.internal.catalog.commands.DropIndexCommand;
@@ -173,12 +174,12 @@
 
     @Test
     public void testEmptyCatalog() {
-        CatalogSchemaDescriptor defaultSchema = manager.schema(DEFAULT_SCHEMA_NAME, 0);
+        CatalogSchemaDescriptor defaultSchema = manager.schema(DEFAULT_SCHEMA_NAME, 1);
 
         assertNotNull(defaultSchema);
         assertSame(defaultSchema, manager.activeSchema(DEFAULT_SCHEMA_NAME, clock.nowLong()));
-        assertSame(defaultSchema, manager.schema(0));
-        assertSame(defaultSchema, manager.schema(defaultSchema.id(), 0));
+        assertSame(defaultSchema, manager.schema(1));
+        assertSame(defaultSchema, manager.schema(defaultSchema.id(), 1));
         assertSame(defaultSchema, manager.activeSchema(clock.nowLong()));
 
         int nonExistingVersion = manager.latestCatalogVersion() + 1;
@@ -189,7 +190,7 @@
 
         // Validate default schema.
         assertEquals(DEFAULT_SCHEMA_NAME, defaultSchema.name());
-        assertEquals(0, defaultSchema.id());
+        assertEquals(1, defaultSchema.id());
         assertEquals(0, defaultSchema.tables().length);
         assertEquals(0, defaultSchema.indexes().length);
 
@@ -206,15 +207,15 @@
 
         // System schema should exist.
 
-        CatalogSchemaDescriptor systemSchema = manager.schema(SYSTEM_SCHEMA_NAME, 0);
+        CatalogSchemaDescriptor systemSchema = manager.schema(SYSTEM_SCHEMA_NAME, 1);
         assertNotNull(systemSchema, "system schema");
         assertSame(systemSchema, manager.activeSchema(SYSTEM_SCHEMA_NAME, clock.nowLong()));
-        assertSame(systemSchema, manager.schema(SYSTEM_SCHEMA_NAME, 0));
-        assertSame(systemSchema, manager.schema(systemSchema.id(), 0));
+        assertSame(systemSchema, manager.schema(SYSTEM_SCHEMA_NAME, 1));
+        assertSame(systemSchema, manager.schema(systemSchema.id(), 1));
 
         // Validate system schema.
         assertEquals(SYSTEM_SCHEMA_NAME, systemSchema.name());
-        assertEquals(1, systemSchema.id());
+        assertEquals(2, systemSchema.id());
         assertEquals(0, systemSchema.tables().length);
         assertEquals(0, systemSchema.indexes().length);
 
@@ -260,6 +261,8 @@
 
     @Test
     public void testCreateTable() {
+        long timePriorToTableCreation = clock.nowLong();
+
         int tableCreationVersion = await(
                 manager.execute(createTableCommand(
                         TABLE_NAME,
@@ -274,8 +277,7 @@
 
         assertNotNull(schema);
         assertEquals(SCHEMA_NAME, schema.name());
-        assertSame(schema, manager.activeSchema(0L));
-        assertSame(schema, manager.activeSchema(123L));
+        assertSame(schema, manager.activeSchema(timePriorToTableCreation));
 
         assertNull(schema.table(TABLE_NAME));
         assertNull(manager.table(TABLE_NAME, 123L));
@@ -1136,18 +1138,12 @@
             return falseCompletedFuture();
         });
 
-        CompletableFuture<?> createTableFut = manager.execute(List.of(
-                CreateZoneCommand.builder()
-                        .zoneName("TEST_ZONE")
-                        .storageProfilesParams(List.of(StorageProfileParams.builder().storageProfile(DEFAULT_STORAGE_PROFILE).build()))
-                        .build(),
-                AlterZoneSetDefaultCommand.builder()
-                        .zoneName("TEST_ZONE")
-                        .build(),
-                simpleTable("T")
-        ));
+        // It should not matter what a command does
+        CatalogCommand catalogCommand = catalog -> List.of(new ObjectIdGenUpdateEntry(1));
 
-        assertThat(createTableFut, willThrow(IgniteInternalException.class, "Max retry limit exceeded"));
+        CompletableFuture<?> fut = manager.execute(List.of(catalogCommand));
+
+        assertThat(fut, willThrow(IgniteInternalException.class, "Max retry limit exceeded"));
 
         // retry limit is hardcoded at org.apache.ignite.internal.catalog.CatalogServiceImpl.MAX_RETRY_COUNT
         verify(updateLogMock, times(10)).append(any());
@@ -1203,7 +1199,7 @@
 
         assertFalse(createTableFuture2.isDone());
 
-        verify(clockWaiter, timeout(10_000).times(2)).waitFor(any());
+        verify(clockWaiter, timeout(10_000).times(3)).waitFor(any());
 
         Catalog catalog0 = manager.catalog(manager.latestCatalogVersion());
 
@@ -1832,7 +1828,6 @@
         StoppingIndexEventParameters stoppingEventParameters = stoppingCaptor.getValue();
 
         assertEquals(indexId, stoppingEventParameters.indexId());
-        assertEquals(tableId, stoppingEventParameters.tableId());
 
         // Let's drop the table.
         assertThat(manager.execute(dropTableCommand(TABLE_NAME)), willCompleteSuccessfully());
@@ -2704,6 +2699,24 @@
     }
 
     @Test
+    public void testCreateSchema() {
+        String schemaName = "S1";
+
+        assertThat(manager.execute(CreateSchemaCommand.builder().name(schemaName).build()), willCompleteSuccessfully());
+
+        Catalog latestCatalog = manager.catalog(manager.activeCatalogVersion(clock.nowLong()));
+
+        assertNotNull(latestCatalog);
+        assertNotNull(latestCatalog.schema(schemaName));
+        assertNotNull(latestCatalog.schema(DEFAULT_SCHEMA_NAME));
+
+        assertThat(
+                manager.execute(CreateSchemaCommand.builder().name(schemaName).build()),
+                willThrowFast(CatalogValidationException.class, "Schema with name 'S1' already exists")
+        );
+    }
+
+    @Test
     public void testCatalogCompaction() throws Exception {
         assertThat(manager.execute(simpleTable(TABLE_NAME)), willCompleteSuccessfully());
         assertThat(manager.execute(simpleTable(TABLE_NAME_2)), willCompleteSuccessfully());
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogSystemViewTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogSystemViewTest.java
index 4d06b4a..0474a24 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogSystemViewTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogSystemViewTest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.catalog;
 
 import static java.util.stream.Collectors.toList;
-import static org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN;
 import static org.apache.ignite.internal.catalog.CatalogService.SYSTEM_SCHEMA_NAME;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
@@ -119,7 +118,7 @@
 
         CatalogSchemaDescriptor schema = manager.activeSchema(clock.nowLong());
         assertNotNull(schema);
-        assertEquals(INITIAL_CAUSALITY_TOKEN, schema.updateToken());
+        assertEquals(1, schema.updateToken());
 
         assertThat(manager.execute(command), willCompleteSuccessfully());
 
@@ -131,7 +130,7 @@
         schema = manager.activeSchema(clock.nowLong());
         assertNotNull(schema);
         long schemaCausalityToken = schema.updateToken();
-        assertEquals(INITIAL_CAUSALITY_TOKEN, schemaCausalityToken);
+        assertEquals(1, schemaCausalityToken);
 
         // Assert that creation of the system view updates token for the descriptor.
         assertTrue(systemSchema.updateToken() > schemaCausalityToken);
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/CatalogUtilsTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/CatalogUtilsTest.java
index a1e24c4..751b537 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/CatalogUtilsTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/CatalogUtilsTest.java
@@ -389,7 +389,7 @@
 
         Exception e = assertThrows(CatalogValidationException.class, () -> replaceIndex(schema, index));
 
-        assertThat(e.getMessage(), is(String.format("Index with ID %d has not been found in schema with ID %d", index.id(), 0)));
+        assertThat(e.getMessage(), is(String.format("Index with ID %d has not been found in schema with ID %d", index.id(), 1)));
     }
 
     @Test
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/CreateSchemaCommandValidationTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/CreateSchemaCommandValidationTest.java
new file mode 100644
index 0000000..ff85120
--- /dev/null
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/CreateSchemaCommandValidationTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.commands;
+
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
+
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.CatalogValidationException;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Tests to verify validation of {@link CreateSchemaCommand}.
+ */
+public class CreateSchemaCommandValidationTest extends AbstractCommandValidationTest {
+
+    @ParameterizedTest(name = "[{index}] ''{argumentsWithNames}''")
+    @MethodSource("nullAndBlankStrings")
+    void schemaNameMustNotBeNullOrBlank(String name) {
+        CreateSchemaCommandBuilder builder = CreateSchemaCommand.builder().name(name);
+
+        assertThrows(
+                CatalogValidationException.class,
+                builder::build,
+                "Name of the schema can't be null or blank"
+        );
+    }
+
+    @Test
+    void commandFailsWhenSchemaAlreadyExists() {
+        String schemaName = "TEST";
+
+        CreateSchemaCommandBuilder builder = CreateSchemaCommand.builder().name(schemaName);
+
+        Catalog catalog = catalogWithSchema(schemaName);
+
+        assertThrows(
+                CatalogValidationException.class,
+                () -> builder.build().get(catalog),
+                "Schema with name 'TEST' already exists"
+        );
+    }
+
+    private static Catalog catalogWithSchema(String schemaName) {
+        return catalog(CreateSchemaCommand.builder().name(schemaName).build());
+    }
+}
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/descriptors/CatalogSystemViewDescriptorTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/descriptors/CatalogSystemViewDescriptorTest.java
index 41ed56c..92879a1 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/descriptors/CatalogSystemViewDescriptorTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/descriptors/CatalogSystemViewDescriptorTest.java
@@ -30,6 +30,7 @@
     void toStringContainsTypeAndFields() {
         var descriptor = new CatalogSystemViewDescriptor(
                 1,
+                2,
                 "view1",
                 List.of(),
                 SystemViewType.NODE
@@ -39,6 +40,7 @@
 
         assertThat(toString, startsWith("CatalogSystemViewDescriptor ["));
         assertThat(toString, containsString("id=1"));
+        assertThat(toString, containsString("schemaId=2"));
         assertThat(toString, containsString("name=view1"));
         assertThat(toString, containsString("systemViewType=NODE"));
     }
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/CatalogEntrySerializationTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/CatalogEntrySerializationTest.java
index ac0e23b..4480404 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/CatalogEntrySerializationTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/CatalogEntrySerializationTest.java
@@ -106,15 +106,15 @@
                 break;
 
             case DROP_COLUMN:
-                checkSerialization(new DropColumnsEntry(1, Set.of("C1", "C2"), "PUBLIC"));
+                checkSerialization(new DropColumnsEntry(1, Set.of("C1", "C2")));
                 break;
 
             case DROP_INDEX:
-                checkSerialization(new DropIndexEntry(231, 23), new DropIndexEntry(231, 1));
+                checkSerialization(new DropIndexEntry(231), new DropIndexEntry(231));
                 break;
 
             case DROP_TABLE:
-                checkSerialization(new DropTableEntry(23, "PUBLIC"), new DropTableEntry(3, "SYSTEM"));
+                checkSerialization(new DropTableEntry(23), new DropTableEntry(3));
                 break;
 
             case DROP_ZONE:
@@ -169,6 +169,11 @@
                 checkSerialization(new SetDefaultZoneEntry(1), new SetDefaultZoneEntry(Integer.MAX_VALUE));
                 break;
 
+            case NEW_SCHEMA:
+                checkSerialization(new NewSchemaEntry(new CatalogSchemaDescriptor(
+                        0, "S", new CatalogTableDescriptor[0], new CatalogIndexDescriptor[0], new CatalogSystemViewDescriptor[0], 0)));
+                break;
+
             default:
                 throw new UnsupportedOperationException("Test not implemented " + type);
         }
@@ -282,10 +287,10 @@
                 newCatalogTableColumnDescriptor("c2", DefaultValue.functionCall("function"));
         CatalogTableColumnDescriptor desc4 = newCatalogTableColumnDescriptor("c3", DefaultValue.constant(null));
 
-        UpdateEntry entry1 = new AlterColumnEntry(1, desc1, "public");
-        UpdateEntry entry2 = new AlterColumnEntry(1, desc2, "public");
-        UpdateEntry entry3 = new AlterColumnEntry(1, desc3, "public");
-        UpdateEntry entry4 = new AlterColumnEntry(1, desc4, "public");
+        UpdateEntry entry1 = new AlterColumnEntry(1, desc1);
+        UpdateEntry entry2 = new AlterColumnEntry(1, desc2);
+        UpdateEntry entry3 = new AlterColumnEntry(1, desc3);
+        UpdateEntry entry4 = new AlterColumnEntry(1, desc4);
 
         VersionedUpdate update = newVersionedUpdate(entry1, entry2, entry3, entry4);
 
@@ -296,7 +301,7 @@
         CatalogTableColumnDescriptor columnDescriptor1 = newCatalogTableColumnDescriptor("c1", DefaultValue.constant(null));
         CatalogTableColumnDescriptor columnDescriptor2 = newCatalogTableColumnDescriptor("c2", DefaultValue.functionCall("func"));
 
-        NewColumnsEntry entry = new NewColumnsEntry(11, List.of(columnDescriptor1, columnDescriptor2), "PUBLIC");
+        NewColumnsEntry entry = new NewColumnsEntry(11, List.of(columnDescriptor1, columnDescriptor2));
 
         VersionedUpdate update = newVersionedUpdate(entry);
 
@@ -307,8 +312,8 @@
         CatalogSortedIndexDescriptor sortedIndexDescriptor = newSortedIndexDescriptor("idx1");
         CatalogHashIndexDescriptor hashIndexDescriptor = newHashIndexDescriptor("idx2");
 
-        NewIndexEntry sortedIdxEntry = new NewIndexEntry(sortedIndexDescriptor, "PUBLIC");
-        NewIndexEntry hashIdxEntry = new NewIndexEntry(hashIndexDescriptor, "PUBLIC");
+        NewIndexEntry sortedIdxEntry = new NewIndexEntry(sortedIndexDescriptor);
+        NewIndexEntry hashIdxEntry = new NewIndexEntry(hashIndexDescriptor);
 
         VersionedUpdate update = newVersionedUpdate(sortedIdxEntry, hashIdxEntry);
 
@@ -323,10 +328,10 @@
 
         List<CatalogTableColumnDescriptor> columns = List.of(col1, col2, col3, col4);
 
-        NewTableEntry entry1 = new NewTableEntry(newTableDescriptor("Table1", columns, List.of("c1", "c2"), null), "PUBLIC");
-        NewTableEntry entry2 = new NewTableEntry(newTableDescriptor("Table1", columns, List.of("c1", "c2"), List.of()), "PUBLIC");
-        NewTableEntry entry3 = new NewTableEntry(newTableDescriptor("Table1", columns, List.of("c1", "c2"), List.of("c2")), "PUBLIC");
-        NewTableEntry entry4 = new NewTableEntry(newTableDescriptor("Table1", columns, List.of("c1", "c2"), List.of("c1")), "PUBLIC");
+        NewTableEntry entry1 = new NewTableEntry(newTableDescriptor("Table1", columns, List.of("c1", "c2"), null));
+        NewTableEntry entry2 = new NewTableEntry(newTableDescriptor("Table1", columns, List.of("c1", "c2"), List.of()));
+        NewTableEntry entry3 = new NewTableEntry(newTableDescriptor("Table1", columns, List.of("c1", "c2"), List.of("c2")));
+        NewTableEntry entry4 = new NewTableEntry(newTableDescriptor("Table1", columns, List.of("c1", "c2"), List.of("c1")));
 
         VersionedUpdate update = newVersionedUpdate(entry1, entry2, entry3, entry4);
         VersionedUpdate deserialized = serialize(update);
@@ -342,12 +347,12 @@
         CatalogTableColumnDescriptor col2 = newCatalogTableColumnDescriptor("c2", null);
 
         CatalogSystemViewDescriptor nodeDesc =
-                new CatalogSystemViewDescriptor(1, "view1", List.of(col1, col2), SystemViewType.NODE);
+                new CatalogSystemViewDescriptor(1, 2, "view1", List.of(col1, col2), SystemViewType.NODE);
         CatalogSystemViewDescriptor clusterDesc =
-                new CatalogSystemViewDescriptor(1, "view1", List.of(col1, col2), SystemViewType.CLUSTER);
+                new CatalogSystemViewDescriptor(1, 2, "view1", List.of(col1, col2), SystemViewType.CLUSTER);
 
-        NewSystemViewEntry nodeEntry = new NewSystemViewEntry(nodeDesc, "PUBLIC");
-        NewSystemViewEntry clusterEntry = new NewSystemViewEntry(clusterDesc, "PUBLIC");
+        NewSystemViewEntry nodeEntry = new NewSystemViewEntry(nodeDesc);
+        NewSystemViewEntry clusterEntry = new NewSystemViewEntry(clusterDesc);
 
         VersionedUpdate update = newVersionedUpdate(nodeEntry, clusterEntry);
 
@@ -371,8 +376,8 @@
         };
 
         CatalogSystemViewDescriptor[] views = {
-                new CatalogSystemViewDescriptor(1, "view1", columns, SystemViewType.NODE),
-                new CatalogSystemViewDescriptor(1, "view2", columns, SystemViewType.CLUSTER)
+                new CatalogSystemViewDescriptor(1, 2, "view1", columns, SystemViewType.NODE),
+                new CatalogSystemViewDescriptor(1, 2, "view2", columns, SystemViewType.CLUSTER)
         };
 
         CatalogStorageProfilesDescriptor profiles =
diff --git a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java
index d6491b6..6e9722f 100644
--- a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java
+++ b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java
@@ -158,7 +158,12 @@
 
     @Override
     public CompletableFuture<Void> catalogReadyFuture(int version) {
-        return null;
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public CompletableFuture<Void> catalogInitializationFuture() {
+        return CompletableFuture.completedFuture(null);
     }
 
     @Override
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index 5a4bfc0..47be872 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -69,7 +69,6 @@
  * <p>To avoid errors when using indexes while applying replication log during node recovery, the registration of indexes was moved to the
  * start of the tables.</p>
  */
-// TODO: IGNITE-19082 Delete this class
 public class IndexManager implements IgniteComponent {
     private static final IgniteLogger LOG = Loggers.forClass(IndexManager.class);
 
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index c90b014..8503e0f 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -1114,6 +1114,7 @@
                         return cmgMgr.onJoinReady();
                     }, startupExecutor)
                     .thenComposeAsync(ignored -> awaitSelfInLocalLogicalTopology(), startupExecutor)
+                    .thenCompose(ignored -> catalogManager.catalogInitializationFuture())
                     .thenRunAsync(() -> {
                         try {
                             // Enable watermark events.
diff --git a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
index ec8875f1..9f044df 100644
--- a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
+++ b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
@@ -266,6 +266,7 @@
 
         return TestIgnitionManager.start(nodeName, config, workDir.resolve(nodeName))
                 .thenApply(IgniteImpl.class::cast)
+                .thenCompose(ignite -> ignite.catalogManager().catalogInitializationFuture().thenApply(ignored -> ignite))
                 .thenApply(ignite -> {
                     synchronized (nodes) {
                         while (nodes.size() < nodeIndex) {
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
index c30388f..58101d6 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.schema.registry;
 
-import static java.util.Collections.unmodifiableMap;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 
@@ -38,7 +37,6 @@
 import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.jetbrains.annotations.Nullable;
-import org.jetbrains.annotations.TestOnly;
 
 /**
  * Caching registry of actual schema descriptors for a table.
@@ -245,7 +243,7 @@
      * @param rowSchema Row schema.
      * @return Column mapper for target schema.
      */
-    ColumnMapper resolveMapping(SchemaDescriptor curSchema, SchemaDescriptor rowSchema) {
+    private ColumnMapper resolveMapping(SchemaDescriptor curSchema, SchemaDescriptor rowSchema) {
         assert curSchema.version() > rowSchema.version();
 
         if (curSchema.version() == rowSchema.version() + 1) {
@@ -254,9 +252,9 @@
 
         long mappingKey = (((long) curSchema.version()) << 32) | (rowSchema.version());
 
-        ColumnMapper mapping;
+        ColumnMapper mapping = mappingCache.get(mappingKey);
 
-        if ((mapping = mappingCache.get(mappingKey)) != null) {
+        if (mapping != null) {
             return mapping;
         }
 
@@ -292,34 +290,6 @@
         makeSchemaVersionAvailable(desc);
     }
 
-    /**
-     * Cleanup given schema version from history.
-     *
-     * @param ver Schema version to remove.
-     * @throws SchemaRegistryException If incorrect schema version provided.
-     */
-    public void onSchemaDropped(int ver) {
-        int lastVer = schemaCache.lastKey();
-
-        if (ver <= 0 || ver >= lastVer || ver > schemaCache.keySet().first()) {
-            throw new SchemaRegistryException("Incorrect schema version to clean up to: " + ver);
-        }
-
-        if (schemaCache.remove(ver) != null) {
-            mappingCache.keySet().removeIf(k -> (k & 0xFFFF_FFFFL) == ver);
-        }
-    }
-
-    /**
-     * For test purposes only.
-     *
-     * @return ColumnMapping cache.
-     */
-    @TestOnly
-    Map<Long, ColumnMapper> mappingCache() {
-        return unmodifiableMap(mappingCache);
-    }
-
     private CompletableFuture<SchemaDescriptor> tableSchemaAsync(int schemaVer) {
         if (schemaVer < lastKnownSchemaVersion()) {
             return completedFuture(loadStoredSchemaByVersion(schemaVer));
diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java
index 25e7a7b..cecab95 100644
--- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java
+++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java
@@ -21,7 +21,6 @@
 import static java.util.stream.Collectors.toMap;
 import static org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor.INITIAL_TABLE_VERSION;
 import static org.apache.ignite.internal.schema.mapping.ColumnMapping.createMapper;
-import static org.apache.ignite.internal.testframework.asserts.CompletableFutureAssert.assertWillThrowFast;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureCompletedMatcher.completedFuture;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn;
@@ -30,7 +29,6 @@
 import static org.apache.ignite.internal.type.NativeTypes.INT64;
 import static org.apache.ignite.internal.type.NativeTypes.STRING;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -45,7 +43,6 @@
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaUtils;
-import org.apache.ignite.internal.schema.mapping.ColumnMapper;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -206,109 +203,6 @@
     }
 
     /**
-     * Check schema cleanup.
-     */
-    @Test
-    public void testSchemaCleanup() {
-        final SchemaDescriptor schemaV3 = new SchemaDescriptor(3,
-                new Column[]{new Column("keyLongCol", INT64, false)},
-                new Column[]{
-                        new Column("valStringCol", STRING, true)
-                });
-
-        final SchemaDescriptor schemaV4 = new SchemaDescriptor(4,
-                new Column[]{new Column("keyLongCol", INT64, false)},
-                new Column[]{
-                        new Column("valBytesCol", BYTES, true),
-                        new Column("valStringCol", STRING, true)
-                });
-
-        final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, schemaV1);
-
-        assertEquals(INITIAL_TABLE_VERSION, reg.lastKnownSchemaVersion());
-
-        // Fail to cleanup initial schema
-        assertThrows(SchemaRegistryException.class, () -> reg.onSchemaDropped(INITIAL_TABLE_VERSION));
-        assertThrows(SchemaRegistryException.class, () -> reg.onSchemaDropped(0));
-
-        // Register schema with very first version.
-        assertThrows(SchemaRegistrationConflictException.class, () -> reg.onSchemaRegistered(schemaV1));
-
-        assertEquals(1, reg.lastKnownSchemaVersion());
-        assertNotNull(reg.lastKnownSchema());
-        assertNotNull(reg.schema(1));
-
-        // Try to remove latest schema.
-        assertThrows(SchemaRegistryException.class, () -> reg.onSchemaDropped(1));
-
-        assertEquals(1, reg.lastKnownSchemaVersion());
-        assertNotNull(reg.lastKnownSchema());
-        assertNotNull(reg.schema(1));
-
-        // Register new schema with next version.
-        reg.onSchemaRegistered(schemaV2);
-        reg.onSchemaRegistered(schemaV3);
-
-        assertEquals(3, reg.lastKnownSchemaVersion());
-        assertNotNull(reg.schema(1));
-        assertNotNull(reg.schema(2));
-        assertNotNull(reg.schema(3));
-
-        // Remove outdated schema 1.
-        reg.onSchemaDropped(1);
-
-        assertEquals(3, reg.lastKnownSchemaVersion());
-        assertThrows(SchemaRegistryException.class, () -> reg.schema(1));
-        assertNotNull(reg.schema(2));
-        assertNotNull(reg.schema(3));
-
-        // Remove non-existed schemas.
-        reg.onSchemaDropped(1);
-
-        assertEquals(3, reg.lastKnownSchemaVersion());
-        assertThrows(SchemaRegistryException.class, () -> reg.schema(1));
-        assertNotNull(reg.schema(2));
-        assertNotNull(reg.schema(3));
-
-        // Register new schema with next version.
-        reg.onSchemaRegistered(schemaV4);
-
-        assertEquals(4, reg.lastKnownSchemaVersion());
-        assertNotNull(reg.schema(2));
-        assertNotNull(reg.schema(3));
-        assertNotNull(reg.schema(4));
-
-        // Remove non-existed schemas.
-        reg.onSchemaDropped(1);
-
-        assertEquals(4, reg.lastKnownSchemaVersion());
-        assertSameSchema(schemaV4, reg.lastKnownSchema());
-        assertSameSchema(schemaV2, reg.schema(2));
-        assertSameSchema(schemaV3, reg.schema(3));
-        assertSameSchema(schemaV4, reg.schema(4));
-
-        // Out of order remove.
-        assertThrows(SchemaRegistryException.class, () -> reg.onSchemaDropped(3));
-
-        // Correct removal order.
-        reg.onSchemaDropped(2);
-        reg.onSchemaDropped(3);
-
-        assertEquals(4, reg.lastKnownSchemaVersion());
-        assertThrows(SchemaRegistryException.class, () -> reg.schema(1));
-        assertThrows(SchemaRegistryException.class, () -> reg.schema(2));
-        assertThrows(SchemaRegistryException.class, () -> reg.schema(3));
-        assertSameSchema(schemaV4, reg.lastKnownSchema());
-        assertSameSchema(schemaV4, reg.schema(4));
-
-        // Try to remove latest schema.
-        assertThrows(SchemaRegistryException.class, () -> reg.onSchemaDropped(4));
-
-        assertEquals(4, reg.lastKnownSchemaVersion());
-        assertSameSchema(schemaV4, reg.schema(4));
-    }
-
-    /**
      * Check schema registration with full history.
      */
     @Test
@@ -420,105 +314,6 @@
         assertSameSchema(schemaV4, reg.schema(4));
     }
 
-    /**
-     * Check schema cleanup.
-     */
-    @Test
-    public void testSchemaWithHistoryCleanup() {
-        final SchemaDescriptor schemaV3 = new SchemaDescriptor(3,
-                new Column[]{new Column("keyLongCol", INT64, false)},
-                new Column[]{
-                        new Column("valStringCol", STRING, true)
-                });
-
-        final SchemaDescriptor schemaV4 = new SchemaDescriptor(4,
-                new Column[]{new Column("keyLongCol", INT64, false)},
-                new Column[]{
-                        new Column("valBytesCol", BYTES, true),
-                        new Column("valStringCol", STRING, true)
-                });
-
-        Map<Integer, SchemaDescriptor> history = schemaHistory(schemaV2, schemaV3, schemaV4);
-
-        final SchemaRegistryImpl reg = new SchemaRegistryImpl(history::get, schemaV4);
-
-        assertEquals(4, reg.lastKnownSchemaVersion());
-        assertSameSchema(schemaV4, reg.lastKnownSchema());
-        assertThrows(SchemaRegistryException.class, () -> reg.schema(1));
-        assertSameSchema(schemaV2, reg.schema(2));
-        assertSameSchema(schemaV3, reg.schema(3));
-        assertSameSchema(schemaV4, reg.schema(4));
-
-        history.remove(1);
-        reg.onSchemaDropped(1);
-
-        assertEquals(4, reg.lastKnownSchemaVersion());
-        assertNotNull(reg.schema(2));
-        assertNotNull(reg.schema(3));
-        assertNotNull(reg.schema(4));
-
-        history.remove(2);
-        history.remove(3);
-        reg.onSchemaDropped(2);
-        reg.onSchemaDropped(3);
-
-        assertEquals(4, reg.lastKnownSchemaVersion());
-        assertThrows(SchemaRegistryException.class, () -> reg.schema(2));
-        assertThrows(SchemaRegistryException.class, () -> reg.schema(3));
-        assertNotNull(reg.schema(4));
-    }
-
-    /**
-     * Check schema cache cleanup.
-     */
-    @Test
-    public void testSchemaCacheCleanup() {
-        final SchemaDescriptor schemaV3 = new SchemaDescriptor(3,
-                new Column[]{new Column("keyLongCol", INT64, false)},
-                new Column[]{
-                        new Column("valStringCol", STRING, true)
-                });
-
-        schemaV3.columnMapping(createMapper(schemaV3).add(
-                schemaV3.column("valStringCol").positionInRow(),
-                schemaV2.column("valStringCol").positionInRow())
-        );
-
-        final SchemaDescriptor schemaV4 = new SchemaDescriptor(4,
-                new Column[]{new Column("keyLongCol", INT64, false)},
-                new Column[]{
-                        new Column("valBytesCol", BYTES, true),
-                        new Column("valStringCol", STRING, true)
-                });
-
-        schemaV4.columnMapping(createMapper(schemaV4).add(schemaV4.column("valBytesCol")));
-
-        final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, schemaV1);
-
-        final Map<Long, ColumnMapper> cache = reg.mappingCache();
-
-        assertThrows(SchemaRegistrationConflictException.class, () -> reg.onSchemaRegistered(schemaV1));
-        reg.onSchemaRegistered(schemaV2);
-        reg.onSchemaRegistered(schemaV3);
-        reg.onSchemaRegistered(schemaV4);
-
-        assertEquals(0, cache.size());
-
-        reg.resolveMapping(schemaV4, schemaV1);
-        reg.resolveMapping(schemaV3, schemaV1);
-        reg.resolveMapping(schemaV4, schemaV2);
-
-        assertEquals(3, cache.size());
-
-        reg.onSchemaDropped(schemaV1.version());
-
-        assertEquals(1, cache.size());
-
-        reg.onSchemaDropped(schemaV2.version());
-
-        assertEquals(0, cache.size());
-    }
-
     @Test
     void schemaAsyncReturnsExpectedResults() {
         Map<Integer, SchemaDescriptor> history = schemaHistory(schemaV1);
@@ -536,19 +331,6 @@
         assertThat(schema2Future, willBe(schemaV2));
     }
 
-    @Test
-    void schemaAsyncReturnsExceptionForCompactedAwayVersion() {
-        Map<Integer, SchemaDescriptor> history = schemaHistory(schemaV1, schemaV2);
-
-        SchemaRegistryImpl reg = new SchemaRegistryImpl(history::get, schemaV2);
-
-        history.remove(1);
-        reg.onSchemaDropped(1);
-
-        SchemaRegistryException ex = assertWillThrowFast(reg.schemaAsync(1), SchemaRegistryException.class);
-        assertThat(ex.getMessage(), is("Failed to find schema (was it compacted away?) [version=1]"));
-    }
-
     /**
      * SchemaHistory.
      * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java
index 8733e38..a2aaff1 100644
--- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java
+++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java
@@ -22,7 +22,6 @@
 import static org.apache.ignite.internal.catalog.commands.CatalogUtils.SYSTEM_SCHEMAS;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
-import static org.apache.ignite.internal.table.TableTestUtils.getTableStrict;
 import static org.apache.ignite.lang.ErrorGroups.Sql.STMT_PARSE_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Sql.STMT_VALIDATION_ERR;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -301,25 +300,6 @@
     }
 
     /**
-     * Checks that schema version is updated even if column names are intersected.
-     */
-    // Need to be removed after https://issues.apache.org/jira/browse/IGNITE-19082
-    @Test
-    public void checkSchemaUpdatedWithEqAlterColumn() {
-        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
-        IgniteImpl node = CLUSTER.aliveNode();
-
-        int tableVersionBefore = getTableStrict(node.catalogManager(), "TEST", node.clock().nowLong()).tableVersion();
-
-        sql("ALTER TABLE TEST ADD COLUMN (VAL1 INT)");
-
-        int tableVersionAfter = getTableStrict(node.catalogManager(), "TEST", node.clock().nowLong()).tableVersion();
-
-        assertEquals(tableVersionBefore + 1, tableVersionAfter);
-    }
-
-    /**
      * Check explicit colocation columns configuration.
      */
     @Test
diff --git a/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManagerImpl.java b/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManagerImpl.java
index 75667d9..1a37242 100644
--- a/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManagerImpl.java
+++ b/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManagerImpl.java
@@ -117,8 +117,7 @@
                     .map(SystemViewUtils::toSystemViewCreateCommand)
                     .collect(Collectors.toList());
 
-            catalogManager.execute(commands).whenComplete(
-                    (r, t) -> {
+            catalogManager.catalogReadyFuture(1).thenCompose((x) -> catalogManager.execute(commands)).whenComplete((r, t) -> {
                         viewsRegistrationFuture.complete(null);
 
                         if (t != null) {
diff --git a/modules/system-view/src/test/java/org/apache/ignite/internal/systemview/SystemViewManagerTest.java b/modules/system-view/src/test/java/org/apache/ignite/internal/systemview/SystemViewManagerTest.java
index 2f8c9cf..1ee56d5 100644
--- a/modules/system-view/src/test/java/org/apache/ignite/internal/systemview/SystemViewManagerTest.java
+++ b/modules/system-view/src/test/java/org/apache/ignite/internal/systemview/SystemViewManagerTest.java
@@ -39,10 +39,12 @@
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.anyList;
-import static org.mockito.Mockito.only;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoInteractions;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.BitSet;
@@ -75,7 +77,6 @@
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
-import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
 
@@ -86,8 +87,7 @@
 public class SystemViewManagerTest extends BaseIgniteAbstractTest {
     private static final String LOCAL_NODE_NAME = "LOCAL_NODE_NAME";
 
-    @Mock
-    private CatalogManager catalog;
+    private final CatalogManager catalog = Mockito.mock(CatalogManager.class);
 
     private SystemViewManagerImpl viewMgr;
 
@@ -119,13 +119,16 @@
 
     @Test
     public void startAfterStartFails() {
-        Mockito.when(catalog.execute(anyList())).thenReturn(nullCompletedFuture());
+        when(catalog.catalogInitializationFuture()).thenReturn(nullCompletedFuture());
+        when(catalog.catalogReadyFuture(1)).thenReturn(nullCompletedFuture());
+        when(catalog.execute(anyList())).thenReturn(nullCompletedFuture());
 
         viewMgr.register(() -> List.of(dummyView("test")));
 
         assertThat(viewMgr.startAsync(), willCompleteSuccessfully());
 
-        verify(catalog, only()).execute(anyList());
+        verify(catalog, times(1)).execute(anyList());
+        reset(catalog);
 
         assertThrows(IllegalStateException.class, viewMgr::startAsync);
 
@@ -148,12 +151,14 @@
     public void registerAllColumnTypes(NativeTypeSpec typeSpec) {
         NativeType type = SchemaTestUtils.specToType(typeSpec);
 
-        Mockito.when(catalog.execute(anyList())).thenReturn(nullCompletedFuture());
+        when(catalog.catalogInitializationFuture()).thenReturn(nullCompletedFuture());
+        when(catalog.catalogReadyFuture(1)).thenReturn(nullCompletedFuture());
+        when(catalog.execute(anyList())).thenReturn(nullCompletedFuture());
 
         viewMgr.register(() -> List.of(dummyView("test", type)));
         assertThat(viewMgr.startAsync(), willCompleteSuccessfully());
 
-        verify(catalog, only()).execute(anyList());
+        verify(catalog, times(1)).execute(anyList());
         assertTrue(viewMgr.completeRegistration().isDone());
     }
 
@@ -161,20 +166,24 @@
     public void managerStartsSuccessfullyEvenIfCatalogRespondsWithError() {
         CatalogValidationException expected = new CatalogValidationException("Expected exception.");
 
-        Mockito.when(catalog.execute(anyList())).thenReturn(failedFuture(expected));
+        when(catalog.catalogInitializationFuture()).thenReturn(nullCompletedFuture());
+        when(catalog.catalogReadyFuture(1)).thenReturn(nullCompletedFuture());
+        when(catalog.execute(anyList())).thenReturn(failedFuture(expected));
 
         viewMgr.register(() -> List.of(dummyView("test")));
 
         assertThat(viewMgr.startAsync(), willCompleteSuccessfully());
 
-        verify(catalog, only()).execute(anyList());
+        verify(catalog, times(1)).execute(anyList());
 
         assertThat(viewMgr.completeRegistration(), willBe(nullValue()));
     }
 
     @Test
     public void nodeAttributesUpdatedAfterStart() {
-        Mockito.when(catalog.execute(anyList())).thenReturn(nullCompletedFuture());
+        when(catalog.catalogInitializationFuture()).thenReturn(nullCompletedFuture());
+        when(catalog.catalogReadyFuture(1)).thenReturn(nullCompletedFuture());
+        when(catalog.execute(anyList())).thenReturn(nullCompletedFuture());
 
         String name1 = "view1";
         String name2 = "view2";
@@ -185,9 +194,6 @@
 
         assertThat(viewMgr.startAsync(), willCompleteSuccessfully());
 
-        verify(catalog, only()).execute(anyList());
-        verifyNoMoreInteractions(catalog);
-
         assertThat(viewMgr.nodeAttributes(), is(Map.of(NODE_ATTRIBUTES_KEY, String.join(NODE_ATTRIBUTES_LIST_SEPARATOR, name1.toUpperCase(
                 Locale.ROOT), name2.toUpperCase(Locale.ROOT)))));
     }
@@ -251,7 +257,9 @@
 
     @Test
     void viewScanTest() {
-        Mockito.when(catalog.execute(anyList())).thenReturn(nullCompletedFuture());
+        when(catalog.catalogInitializationFuture()).thenReturn(nullCompletedFuture());
+        when(catalog.catalogReadyFuture(1)).thenReturn(nullCompletedFuture());
+        when(catalog.execute(anyList())).thenReturn(nullCompletedFuture());
 
         String nodeView = "NODE_VIEW";
         String clusterView = "CLUSTER_VIEW";