IGNITE-19460 Sql.  Implement missed DDL commands with using Catalog (#2085)

diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManager.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManager.java
index 2724d5d..2c9e0f5 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManager.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManager.java
@@ -20,7 +20,10 @@
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.catalog.commands.AlterTableAddColumnParams;
 import org.apache.ignite.internal.catalog.commands.AlterTableDropColumnParams;
+import org.apache.ignite.internal.catalog.commands.CreateHashIndexParams;
+import org.apache.ignite.internal.catalog.commands.CreateSortedIndexParams;
 import org.apache.ignite.internal.catalog.commands.CreateTableParams;
+import org.apache.ignite.internal.catalog.commands.DropIndexParams;
 import org.apache.ignite.internal.catalog.commands.DropTableParams;
 import org.apache.ignite.internal.manager.IgniteComponent;
 
@@ -59,4 +62,28 @@
      * @return Operation future.
      */
     CompletableFuture<Void> dropColumn(AlterTableDropColumnParams params);
+
+    /**
+     * Creates new sorted index.
+     *
+     * @param params Parameters.
+     * @return Operation future.
+     */
+    CompletableFuture<Void> createIndex(CreateSortedIndexParams params);
+
+    /**
+     * Creates new hash index.
+     *
+     * @param params Parameters.
+     * @return Operation future.
+     */
+    CompletableFuture<Void> createIndex(CreateHashIndexParams params);
+
+    /**
+     * Drops index.
+     *
+     * @param params Parameters.
+     * @return Operation future.
+     */
+    CompletableFuture<Void> dropIndex(DropIndexParams params);
 }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
index 70e70aa..629078c 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
@@ -41,6 +41,8 @@
 
     TableDescriptor table(int tableId, long timestamp);
 
+    IndexDescriptor index(String indexName, long timestamp);
+
     IndexDescriptor index(int indexId, long timestamp);
 
     Collection<IndexDescriptor> tableIndexes(int tableId, long timestamp);
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
index 1cadf55..bfbe1c8 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
@@ -23,6 +23,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.NavigableMap;
@@ -30,12 +31,16 @@
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.catalog.commands.AlterTableAddColumnParams;
 import org.apache.ignite.internal.catalog.commands.AlterTableDropColumnParams;
 import org.apache.ignite.internal.catalog.commands.CatalogUtils;
 import org.apache.ignite.internal.catalog.commands.ColumnParams;
+import org.apache.ignite.internal.catalog.commands.CreateHashIndexParams;
+import org.apache.ignite.internal.catalog.commands.CreateSortedIndexParams;
 import org.apache.ignite.internal.catalog.commands.CreateTableParams;
+import org.apache.ignite.internal.catalog.commands.DropIndexParams;
 import org.apache.ignite.internal.catalog.commands.DropTableParams;
 import org.apache.ignite.internal.catalog.descriptors.IndexDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.SchemaDescriptor;
@@ -44,12 +49,16 @@
 import org.apache.ignite.internal.catalog.events.AddColumnEventParameters;
 import org.apache.ignite.internal.catalog.events.CatalogEvent;
 import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
+import org.apache.ignite.internal.catalog.events.CreateIndexEventParameters;
 import org.apache.ignite.internal.catalog.events.CreateTableEventParameters;
 import org.apache.ignite.internal.catalog.events.DropColumnEventParameters;
+import org.apache.ignite.internal.catalog.events.DropIndexEventParameters;
 import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
 import org.apache.ignite.internal.catalog.storage.DropColumnsEntry;
+import org.apache.ignite.internal.catalog.storage.DropIndexEntry;
 import org.apache.ignite.internal.catalog.storage.DropTableEntry;
 import org.apache.ignite.internal.catalog.storage.NewColumnsEntry;
+import org.apache.ignite.internal.catalog.storage.NewIndexEntry;
 import org.apache.ignite.internal.catalog.storage.NewTableEntry;
 import org.apache.ignite.internal.catalog.storage.ObjectIdGenUpdateEntry;
 import org.apache.ignite.internal.catalog.storage.UpdateEntry;
@@ -64,9 +73,12 @@
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.lang.ColumnAlreadyExistsException;
 import org.apache.ignite.lang.ColumnNotFoundException;
+import org.apache.ignite.lang.ErrorGroups;
 import org.apache.ignite.lang.ErrorGroups.Common;
 import org.apache.ignite.lang.ErrorGroups.Sql;
 import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IndexAlreadyExistsException;
+import org.apache.ignite.lang.IndexNotFoundException;
 import org.apache.ignite.lang.TableAlreadyExistsException;
 import org.apache.ignite.lang.TableNotFoundException;
 import org.apache.ignite.sql.SqlException;
@@ -131,6 +143,12 @@
 
     /** {@inheritDoc} */
     @Override
+    public IndexDescriptor index(String indexName, long timestamp) {
+        return catalogAt(timestamp).schema(CatalogService.PUBLIC).index(indexName);
+    }
+
+    /** {@inheritDoc} */
+    @Override
     public IndexDescriptor index(int indexId, long timestamp) {
         return catalogAt(timestamp).index(indexId);
     }
@@ -208,9 +226,15 @@
                 throw new TableNotFoundException(schemaName, params.tableName());
             }
 
-            return List.of(
-                    new DropTableEntry(table.id())
-            );
+            List<UpdateEntry> updateEntries = new ArrayList<>();
+
+            Arrays.stream(schema.indexes())
+                    .filter(index -> index.tableId() == table.id())
+                    .forEach(index -> updateEntries.add(new DropIndexEntry(index.id())));
+
+            updateEntries.add(new DropTableEntry(table.id()));
+
+            return updateEntries;
         });
     }
 
@@ -297,6 +321,129 @@
         });
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> createIndex(CreateHashIndexParams params) {
+        return saveUpdate(catalog -> {
+            String schemaName = Objects.requireNonNullElse(params.schemaName(), CatalogService.PUBLIC);
+
+            SchemaDescriptor schema = Objects.requireNonNull(catalog.schema(schemaName), "No schema found: " + schemaName);
+
+            if (schema.index(params.indexName()) != null) {
+                throw new IndexAlreadyExistsException(schemaName, params.indexName());
+            }
+
+            TableDescriptor table = schema.table(params.tableName());
+
+            if (table == null) {
+                throw new TableNotFoundException(schemaName, params.tableName());
+            }
+
+            if (params.columns().isEmpty()) {
+                throw new IgniteInternalException(
+                        ErrorGroups.Index.INVALID_INDEX_DEFINITION_ERR,
+                        "No index columns was specified."
+                );
+            }
+
+            Predicate<String> duplicateValidator = Predicate.not(new HashSet<>()::add);
+
+            for (String columnName : params.columns()) {
+                TableColumnDescriptor columnDescriptor = table.columnDescriptor(columnName);
+
+                if (columnDescriptor == null) {
+                    throw new ColumnNotFoundException(columnName);
+                } else if (duplicateValidator.test(columnName)) {
+                    throw new IgniteInternalException(
+                            ErrorGroups.Index.INVALID_INDEX_DEFINITION_ERR,
+                            "Can't create index on duplicate columns: columnName=" + columnName
+                    );
+                }
+            }
+
+            IndexDescriptor index = CatalogUtils.fromParams(catalog.objectIdGenState(), table.id(), params);
+
+            return List.of(
+                    new NewIndexEntry(index),
+                    new ObjectIdGenUpdateEntry(1)
+            );
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> createIndex(CreateSortedIndexParams params) {
+        return saveUpdate(catalog -> {
+            String schemaName = Objects.requireNonNullElse(params.schemaName(), CatalogService.PUBLIC);
+
+            SchemaDescriptor schema = Objects.requireNonNull(catalog.schema(schemaName), "No schema found: " + schemaName);
+
+            if (schema.index(params.indexName()) != null) {
+                throw new IndexAlreadyExistsException(schemaName, params.indexName());
+            }
+
+            TableDescriptor table = schema.table(params.tableName());
+
+            if (table == null) {
+                throw new TableNotFoundException(schemaName, params.tableName());
+            }
+
+            if (params.columns().isEmpty()) {
+                throw new IgniteInternalException(
+                        ErrorGroups.Index.INVALID_INDEX_DEFINITION_ERR,
+                        "No index columns was specified."
+                );
+            } else if (params.collations().size() != params.columns().size()) {
+                throw new IgniteInternalException(
+                        ErrorGroups.Index.INVALID_INDEX_DEFINITION_ERR,
+                        "Columns collations doesn't match number of columns."
+                );
+            }
+
+            Predicate<String> duplicateValidator = Predicate.not(new HashSet<>()::add);
+
+            for (String columnName : params.columns()) {
+                TableColumnDescriptor columnDescriptor = table.columnDescriptor(columnName);
+
+                if (columnDescriptor == null) {
+                    throw new ColumnNotFoundException(columnName);
+                } else if (duplicateValidator.test(columnName)) {
+                    throw new IgniteInternalException(
+                            ErrorGroups.Index.INVALID_INDEX_DEFINITION_ERR,
+                            "Can't create index on duplicate columns: columnName=" + columnName
+                    );
+                }
+            }
+
+            IndexDescriptor index = CatalogUtils.fromParams(catalog.objectIdGenState(), table.id(), params);
+
+            return List.of(
+                    new NewIndexEntry(index),
+                    new ObjectIdGenUpdateEntry(1)
+            );
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> dropIndex(DropIndexParams params) {
+        return saveUpdate(catalog -> {
+            String schemaName = Objects.requireNonNullElse(params.schemaName(), CatalogService.PUBLIC);
+
+            SchemaDescriptor schema = Objects.requireNonNull(catalog.schema(schemaName), "No schema found: " + schemaName);
+
+            IndexDescriptor index = schema.index(params.indexName());
+
+            if (index == null) {
+                throw new IndexNotFoundException(schemaName, params.indexName());
+            }
+
+            return List.of(
+                    new DropIndexEntry(index.id())
+            );
+        });
+    }
+
     private void registerCatalog(Catalog newCatalog) {
         catalogByVer.put(newCatalog.version(), newCatalog);
         catalogByTs.put(newCatalog.time(), newCatalog);
@@ -453,6 +600,44 @@
                             CatalogEvent.TABLE_ALTER,
                             new DropColumnEventParameters(version, tableId, columns)
                     ));
+                } else if (entry instanceof NewIndexEntry) {
+                    catalog = new Catalog(
+                            version,
+                            System.currentTimeMillis(),
+                            catalog.objectIdGenState(),
+                            new SchemaDescriptor(
+                                    schema.id(),
+                                    schema.name(),
+                                    version,
+                                    schema.tables(),
+                                    ArrayUtils.concat(schema.indexes(), ((NewIndexEntry) entry).descriptor())
+                            )
+                    );
+
+                    eventFutures.add(fireEvent(
+                            CatalogEvent.INDEX_CREATE,
+                            new CreateIndexEventParameters(version, ((NewIndexEntry) entry).descriptor())
+                    ));
+                } else if (entry instanceof DropIndexEntry) {
+                    int indexId = ((DropIndexEntry) entry).indexId();
+
+                    catalog = new Catalog(
+                            version,
+                            System.currentTimeMillis(),
+                            catalog.objectIdGenState(),
+                            new SchemaDescriptor(
+                                    schema.id(),
+                                    schema.name(),
+                                    version,
+                                    schema.tables(),
+                                    Arrays.stream(schema.indexes()).filter(t -> t.id() != indexId).toArray(IndexDescriptor[]::new)
+                            )
+                    );
+
+                    eventFutures.add(fireEvent(
+                            CatalogEvent.INDEX_DROP,
+                            new DropIndexEventParameters(version, indexId)
+                    ));
                 } else if (entry instanceof ObjectIdGenUpdateEntry) {
                     catalog = new Catalog(
                             version,
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AbstractIndexCommandParams.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AbstractIndexCommandParams.java
new file mode 100644
index 0000000..5652661
--- /dev/null
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AbstractIndexCommandParams.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.commands;
+
+/**
+ * Abstract index ddl command.
+ */
+public abstract class AbstractIndexCommandParams implements DdlCommandParams {
+    /** Index name. */
+    protected String indexName;
+
+    /** Schema name where this new index will be created. */
+    protected String schema;
+
+    /**
+     * Returns index simple name.
+     */
+    public String indexName() {
+        return indexName;
+    }
+
+    /**
+     * Returns schema name.
+     */
+    public String schemaName() {
+        return schema;
+    }
+
+    /**
+     * Parameters builder.
+     */
+    protected abstract static class AbstractBuilder<ParamT extends AbstractIndexCommandParams, BuilderT> {
+        protected ParamT params;
+
+        AbstractBuilder(ParamT params) {
+            this.params = params;
+        }
+
+        /**
+         * Sets schema name.
+         *
+         * @param schemaName Schema name.
+         * @return {@code this}.
+         */
+        public BuilderT schemaName(String schemaName) {
+            params.schema = schemaName;
+            return (BuilderT) this;
+        }
+
+        /**
+         * Sets index simple name.
+         *
+         * @param indexName Index simple name.
+         * @return {@code this}.
+         */
+        public BuilderT indexName(String indexName) {
+            params.indexName = indexName;
+            return (BuilderT) this;
+        }
+
+        /**
+         * Builds parameters.
+         *
+         * @return Parameters.
+         */
+        public ParamT build() {
+            ParamT params0 = params;
+            params = null;
+            return params0;
+        }
+    }
+}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AbstractTableCommandParams.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AbstractTableCommandParams.java
index 6041418..67c7a58 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AbstractTableCommandParams.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AbstractTableCommandParams.java
@@ -27,10 +27,16 @@
     /** Schema name where this new table will be created. */
     protected String schema;
 
+    /**
+     * Returns table simple name.
+     */
     public String tableName() {
         return tableName;
     }
 
+    /**
+     * Returns schema name.
+     */
     public String schemaName() {
         return schema;
     }
@@ -57,9 +63,9 @@
         }
 
         /**
-         * Sets table schema.
+         * Sets table simple name.
          *
-         * @param tableName Table name.
+         * @param tableName Table simple name.
          * @return {@code this}.
          */
         public BuilderT tableName(String tableName) {
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CatalogUtils.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CatalogUtils.java
index d462be6..fe80cff 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CatalogUtils.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CatalogUtils.java
@@ -17,7 +17,14 @@
 
 package org.apache.ignite.internal.catalog.commands;
 
+import java.util.List;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.catalog.descriptors.ColumnCollation;
+import org.apache.ignite.internal.catalog.descriptors.HashIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.IndexColumnDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.IndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.SortedIndexDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.TableDescriptor;
 
@@ -42,6 +49,43 @@
     }
 
     /**
+     * Converts CreateIndex command params to hash index descriptor.
+     *
+     * @param id Index id.
+     * @param tableId Table id.
+     * @param params Parameters.
+     * @return Index descriptor.
+     */
+    public static IndexDescriptor fromParams(int id, int tableId, CreateHashIndexParams params) {
+        return new HashIndexDescriptor(id,
+                params.indexName(),
+                tableId,
+                false,
+                params.columns()
+        );
+    }
+
+    /**
+     * Converts CreateIndex command params to sorted index descriptor.
+     *
+     * @param id Index id.
+     * @param tableId Table id.
+     * @param params Parameters.
+     * @return Index descriptor.
+     */
+    public static IndexDescriptor fromParams(int id, int tableId, CreateSortedIndexParams params) {
+        List<ColumnCollation> collations = params.collations();
+
+        assert collations.size() == params.columns().size();
+
+        List<IndexColumnDescriptor> columnDescriptors = IntStream.range(0, collations.size())
+                .mapToObj(i -> new IndexColumnDescriptor(params.columns().get(i), collations.get(i)))
+                .collect(Collectors.toList());
+
+        return new SortedIndexDescriptor(id, params.indexName(), tableId, params.isUnique(), columnDescriptors);
+    }
+
+    /**
      * Converts AlterTableAdd command columns parameters to column descriptor.
      *
      * @param params Parameters.
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateHashIndexParams.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateHashIndexParams.java
new file mode 100644
index 0000000..bf260a3
--- /dev/null
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateHashIndexParams.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.commands;
+
+import java.util.List;
+
+/**
+ * CREATE INDEX statement.
+ */
+public class CreateHashIndexParams extends AbstractIndexCommandParams {
+    /** Creates parameters builder. */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /** Table name. */
+    private String tableName;
+
+    /** Indexed columns. */
+    private List<String> columns;
+
+    /**
+     * Gets table name.
+     */
+    public String tableName() {
+        return tableName;
+    }
+
+    /**
+     * Gets indexed columns.
+     */
+    public List<String> columns() {
+        return columns;
+    }
+
+    /**
+     * Parameters builder.
+     */
+    public static class Builder extends AbstractIndexCommandParams.AbstractBuilder<CreateHashIndexParams, CreateHashIndexParams.Builder> {
+        private Builder() {
+            super(new CreateHashIndexParams());
+        }
+
+        /**
+         * Set table name.
+         *
+         * @param tableName Table name.
+         * @return {@code this}.
+         */
+        public Builder tableName(String tableName) {
+            params.tableName = tableName;
+
+            return this;
+        }
+
+        /**
+         * Set columns names.
+         *
+         * @param columns Columns names.
+         * @return {@code this}.
+         */
+        public Builder columns(List<String> columns) {
+            params.columns = columns;
+
+            return this;
+        }
+    }
+}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateSortedIndexParams.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateSortedIndexParams.java
new file mode 100644
index 0000000..d512f96
--- /dev/null
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateSortedIndexParams.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.commands;
+
+import java.util.List;
+import org.apache.ignite.internal.catalog.descriptors.ColumnCollation;
+
+/**
+ * CREATE INDEX statement.
+ */
+public class CreateSortedIndexParams extends AbstractIndexCommandParams {
+    /** Creates parameters builder. */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /** Table name. */
+    private String tableName;
+
+    /** Indexed columns. */
+    private List<String> columns;
+
+    /** Columns collations. */
+    private List<ColumnCollation> collations;
+
+    /** Unique index flag. */
+    protected boolean unique;
+
+    /**
+     * Gets table name.
+     */
+    public String tableName() {
+        return tableName;
+    }
+
+    /**
+     * Gets indexed columns.
+     */
+    public List<String> columns() {
+        return columns;
+    }
+
+    /**
+     * Gets columns collations.
+     */
+    public List<ColumnCollation> collations() {
+        return collations;
+    }
+
+    /**
+     * Returns {@code true} if index is unique, {@code false} otherwise.
+     */
+    public boolean isUnique() {
+        return unique;
+    }
+
+    /**
+     * Parameters builder.
+     */
+    public static class Builder extends AbstractBuilder<CreateSortedIndexParams, CreateSortedIndexParams.Builder> {
+        private Builder() {
+            super(new CreateSortedIndexParams());
+        }
+
+        /**
+         * Set table name.
+         *
+         * @param tableName Table name.
+         * @return {@code this}.
+         */
+        public Builder tableName(String tableName) {
+            params.tableName = tableName;
+
+            return this;
+        }
+
+        /**
+         * Set columns names.
+         *
+         * @param columns Columns names.
+         * @return {@code this}.
+         */
+        public Builder columns(List<String> columns) {
+            params.columns = columns;
+
+            return this;
+        }
+
+        /**
+         * Set columns collations.
+         *
+         * @param collations Columns collations.
+         * @return {@code this}.
+         */
+        public Builder collations(List<ColumnCollation> collations) {
+            params.collations = collations;
+
+            return this;
+        }
+
+        /**
+         * Sets unique flag.
+         */
+        public Builder unique() {
+            params.unique = true;
+
+            return this;
+        }
+    }
+}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableParams.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableParams.java
index 3047de8..525219a 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableParams.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableParams.java
@@ -24,6 +24,7 @@
  * CREATE TABLE statement.
  */
 public class CreateTableParams extends AbstractTableCommandParams {
+    /** Creates parameters builder. */
     public static Builder builder() {
         return new Builder();
     }
@@ -39,33 +40,29 @@
     /** Columns. */
     private List<ColumnParams> cols;
 
+    /** Distribution zone name. */
     @Nullable
     private String zone;
 
     private CreateTableParams() {
-
     }
 
     /**
-     * Get table columns.
-     *
-     * @return Columns.
+     * Gets table columns.
      */
     public List<ColumnParams> columns() {
         return cols;
     }
 
     /**
-     * Get primary key columns.
+     * Gets primary key columns.
      */
     public List<String> primaryKeyColumns() {
         return pkCols;
     }
 
     /**
-     * Get colocation column names.
-     *
-     * @return Collocation column names.
+     * Gets colocation column names.
      */
     @Nullable
     public List<String> colocationColumns() {
@@ -73,7 +70,7 @@
     }
 
     /**
-     * Get zone name.
+     * Gets zone name.
      */
     @Nullable
     public String zone() {
@@ -89,7 +86,7 @@
         }
 
         /**
-         * Set table columns.
+         * Sets table columns.
          *
          * @param cols Columns.
          * @return {@code this}.
@@ -101,7 +98,7 @@
         }
 
         /**
-         * Set primary key columns.
+         * Sets primary key columns.
          *
          * @return {@code this}.
          */
@@ -124,7 +121,7 @@
         }
 
         /**
-         * Set zone name.
+         * Sets zone name.
          *
          * @param zoneName Zone name.
          * @return {@code this}.
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropIndexParams.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropIndexParams.java
new file mode 100644
index 0000000..0ccc866
--- /dev/null
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropIndexParams.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.commands;
+
+/**
+ * DROP INDEX statement.
+ */
+public class DropIndexParams implements DdlCommandParams {
+    /** Creates parameters builder. */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /** Index name. */
+    protected String indexName;
+
+    /** Schema name where this new index will be created. */
+    protected String schema;
+
+    /**
+     * Returns index simple name.
+     */
+    public String indexName() {
+        return indexName;
+    }
+
+    /**
+     * Returns schema name.
+     */
+    public String schemaName() {
+        return schema;
+    }
+
+    /**
+     * Parameters builder.
+     */
+    public static class Builder {
+        private DropIndexParams params;
+
+        Builder() {
+            this.params = new DropIndexParams();
+        }
+
+        /**
+         * Sets schema name.
+         *
+         * @param schemaName Schema name.
+         * @return {@code this}.
+         */
+        public Builder schemaName(String schemaName) {
+            params.schema = schemaName;
+            return this;
+        }
+
+        /**
+         * Sets index simple name.
+         *
+         * @param indexName Index simple name.
+         * @return {@code this}.
+         */
+        public Builder indexName(String indexName) {
+            params.indexName = indexName;
+            return this;
+        }
+
+        /**
+         * Builds parameters.
+         *
+         * @return Parameters.
+         */
+        public DropIndexParams build() {
+            DropIndexParams params0 = params;
+            params = null;
+            return params0;
+        }
+    }
+}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/HashIndexDescriptor.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/HashIndexDescriptor.java
index e741e10..14f6f8b 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/HashIndexDescriptor.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/HashIndexDescriptor.java
@@ -35,11 +35,12 @@
      * @param id Id of the index.
      * @param name Name of the index.
      * @param tableId Id of the table index belongs to.
+     * @param unique Unique flag.
      * @param columns A list of indexed columns. Must not contains duplicates.
      * @throws IllegalArgumentException If columns list contains duplicates.
      */
-    public HashIndexDescriptor(int id, String name, int tableId, List<String> columns) {
-        super(id, name, tableId, true);
+    public HashIndexDescriptor(int id, String name, int tableId, boolean unique, List<String> columns) {
+        super(id, name, tableId, unique);
 
         this.columns = List.copyOf(Objects.requireNonNull(columns, "columns"));
     }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/IndexDescriptor.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/IndexDescriptor.java
index 8b63d1d..55aa43d 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/IndexDescriptor.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/IndexDescriptor.java
@@ -29,7 +29,7 @@
     private final int tableId;
 
     /** Unique constraint flag. */
-    private boolean unique;
+    private final boolean unique;
 
     /** Write only flag. {@code True} when index is building. */
     private boolean writeOnly;
@@ -40,10 +40,12 @@
         this.unique = unique;
     }
 
+    /** Gets table id. */
     public int tableId() {
         return tableId;
     }
 
+    /** Gets index unique flag. */
     public boolean unique() {
         return unique;
     }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/SortedIndexDescriptor.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/SortedIndexDescriptor.java
index f99ec47..1060fd1 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/SortedIndexDescriptor.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/SortedIndexDescriptor.java
@@ -36,11 +36,12 @@
      * @param id Id of the index.
      * @param name Name of the index.
      * @param tableId Id of the table index belongs to.
+     * @param unique Unique flag.
      * @param columns A list of columns descriptors.
      * @throws IllegalArgumentException If columns list contains duplicates or columns size doesn't match the collations size.
      */
-    public SortedIndexDescriptor(int id, String name, int tableId, List<IndexColumnDescriptor> columns) {
-        super(id, name, tableId, false);
+    public SortedIndexDescriptor(int id, String name, int tableId, boolean unique, List<IndexColumnDescriptor> columns) {
+        super(id, name, tableId, unique);
 
         this.columns = Objects.requireNonNull(columns, "columns");
     }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/TableDescriptor.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/TableDescriptor.java
index 803397c..ab3026a 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/TableDescriptor.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/TableDescriptor.java
@@ -78,6 +78,13 @@
         assert Set.copyOf(primaryKeyColumns).containsAll(colocationColumns);
     }
 
+    /**
+     * Returns column descriptor for column with given name.
+     */
+    public TableColumnDescriptor columnDescriptor(String columnName) {
+        return columnsMap.get(columnName);
+    }
+
     public int zoneId() {
         return zoneId;
     }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEvent.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEvent.java
index 96691d5..cfdad99 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEvent.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEvent.java
@@ -31,4 +31,10 @@
 
     /** This event is fired, when a column was added to or dropped from a table. */
     TABLE_ALTER,
+
+    /** This event is fired, when an index was created in Catalog. */
+    INDEX_CREATE,
+
+    /** This event is fired, when an index was dropped in Catalog. */
+    INDEX_DROP
 }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CreateIndexEventParameters.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CreateIndexEventParameters.java
new file mode 100644
index 0000000..297a25e
--- /dev/null
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CreateIndexEventParameters.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.events;
+
+import org.apache.ignite.internal.catalog.descriptors.IndexDescriptor;
+
+/**
+ * Create index event parameters that contains a newly created index descriptor.
+ */
+public class CreateIndexEventParameters extends CatalogEventParameters {
+
+    private final IndexDescriptor indexDescriptor;
+
+    /**
+     * Constructor.
+     *
+     * @param causalityToken Causality token.
+     * @param indexDescriptor Newly created index descriptor.
+     */
+    public CreateIndexEventParameters(long causalityToken, IndexDescriptor indexDescriptor) {
+        super(causalityToken);
+
+        this.indexDescriptor = indexDescriptor;
+    }
+
+    /**
+     * Gets index descriptor for a newly created index.
+     */
+    public IndexDescriptor indexDescriptor() {
+        return indexDescriptor;
+    }
+}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropIndexEventParameters.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropIndexEventParameters.java
new file mode 100644
index 0000000..135d46a
--- /dev/null
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropIndexEventParameters.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.events;
+
+/**
+ * Drop index event parameters that contains an id of dropped index.
+ */
+public class DropIndexEventParameters extends CatalogEventParameters {
+
+    private final int indexId;
+
+    /**
+     * Constructor.
+     *
+     * @param causalityToken Causality token.
+     * @param indexId An id of dropped index.
+     */
+    public DropIndexEventParameters(long causalityToken, int indexId) {
+        super(causalityToken);
+
+        this.indexId = indexId;
+    }
+
+    /** Returns an id of dropped index. */
+    public int indexId() {
+        return indexId;
+    }
+}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropIndexEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropIndexEntry.java
new file mode 100644
index 0000000..2efef88
--- /dev/null
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropIndexEntry.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.storage;
+
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Describes deletion of an index.
+ */
+public class DropIndexEntry implements UpdateEntry {
+    private static final long serialVersionUID = -604729846502020728L;
+
+    private final int indexId;
+
+    /**
+     * Constructs the object.
+     *
+     * @param indexId An id of an index to drop.
+     */
+    public DropIndexEntry(int indexId) {
+        this.indexId = indexId;
+    }
+
+    /** Returns an id of an index to drop. */
+    public int indexId() {
+        return indexId;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String toString() {
+        return S.toString(this);
+    }
+}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewIndexEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewIndexEntry.java
new file mode 100644
index 0000000..a3b85ab
--- /dev/null
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewIndexEntry.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.storage;
+
+import org.apache.ignite.internal.catalog.descriptors.IndexDescriptor;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Describes addition of a new index.
+ */
+public class NewIndexEntry implements UpdateEntry {
+    private static final long serialVersionUID = 6717363577013237711L;
+
+    private final IndexDescriptor descriptor;
+
+    /**
+     * Constructs the object.
+     *
+     * @param descriptor A descriptor of an index to add.
+     */
+    public NewIndexEntry(IndexDescriptor descriptor) {
+        this.descriptor = descriptor;
+    }
+
+    /** Gets descriptor of an index to add. */
+    public IndexDescriptor descriptor() {
+        return descriptor;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String toString() {
+        return S.toString(this);
+    }
+}
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
index 08c1e3c..5ff386c 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
@@ -23,6 +23,7 @@
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNotSame;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -43,17 +44,25 @@
 import org.apache.ignite.internal.catalog.commands.AlterTableAddColumnParams;
 import org.apache.ignite.internal.catalog.commands.AlterTableDropColumnParams;
 import org.apache.ignite.internal.catalog.commands.ColumnParams;
+import org.apache.ignite.internal.catalog.commands.CreateHashIndexParams;
+import org.apache.ignite.internal.catalog.commands.CreateSortedIndexParams;
 import org.apache.ignite.internal.catalog.commands.CreateTableParams;
 import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import org.apache.ignite.internal.catalog.commands.DropIndexParams;
 import org.apache.ignite.internal.catalog.commands.DropTableParams;
+import org.apache.ignite.internal.catalog.descriptors.ColumnCollation;
+import org.apache.ignite.internal.catalog.descriptors.HashIndexDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.SchemaDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.SortedIndexDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.TableDescriptor;
 import org.apache.ignite.internal.catalog.events.AddColumnEventParameters;
 import org.apache.ignite.internal.catalog.events.CatalogEvent;
 import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
+import org.apache.ignite.internal.catalog.events.CreateIndexEventParameters;
 import org.apache.ignite.internal.catalog.events.CreateTableEventParameters;
 import org.apache.ignite.internal.catalog.events.DropColumnEventParameters;
+import org.apache.ignite.internal.catalog.events.DropIndexEventParameters;
 import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
 import org.apache.ignite.internal.catalog.storage.ObjectIdGenUpdateEntry;
 import org.apache.ignite.internal.catalog.storage.UpdateLog;
@@ -69,6 +78,8 @@
 import org.apache.ignite.lang.ColumnAlreadyExistsException;
 import org.apache.ignite.lang.ColumnNotFoundException;
 import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IndexAlreadyExistsException;
+import org.apache.ignite.lang.IndexNotFoundException;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.lang.TableAlreadyExistsException;
 import org.apache.ignite.lang.TableNotFoundException;
@@ -91,6 +102,7 @@
     private static final String TABLE_NAME_2 = "myTable2";
     private static final String NEW_COLUMN_NAME = "NEWCOL";
     private static final String NEW_COLUMN_NAME_2 = "NEWCOL2";
+    private static final String INDEX_NAME = "myIndex";
 
     private MetaStorageManager metastore;
 
@@ -380,6 +392,7 @@
     @Test
     public void testDropIndexedColumn() {
         assertThat(service.createTable(simpleTable(TABLE_NAME)), willBe((Object) null));
+        assertThat(service.createIndex(simpleIndex(INDEX_NAME, TABLE_NAME)), willBe((Object) null));
 
         // Try to drop indexed column
         AlterTableDropColumnParams params = AlterTableDropColumnParams.builder()
@@ -388,9 +401,7 @@
                 .columns(Set.of("VAL"))
                 .build();
 
-        //TODO: uncomment "https://issues.apache.org/jira/browse/IGNITE-19460"
-        // assertThat(service.createIndex("CREATE INDEX myIndex ON myTable (VAL)"), willBe((Object) null));
-        // assertThat(service.dropColumn(params), willThrow(IllegalArgumentException.class));
+        assertThat(service.dropColumn(params), willThrow(SqlException.class));
 
         // Try to drop PK column
         params = AlterTableDropColumnParams.builder()
@@ -405,7 +416,7 @@
         SchemaDescriptor schema = service.activeSchema(System.currentTimeMillis());
         assertNotNull(schema);
         assertNotNull(schema.table(TABLE_NAME));
-        assertEquals(1, schema.version());
+        assertEquals(2, schema.version());
 
         assertNotNull(schema.table(TABLE_NAME).column("ID"));
         assertNotNull(schema.table(TABLE_NAME).column("VAL"));
@@ -481,6 +492,155 @@
     }
 
     @Test
+    public void testDropTableWithIndex() throws InterruptedException {
+        CreateHashIndexParams params = CreateHashIndexParams.builder()
+                .indexName(INDEX_NAME)
+                .tableName(TABLE_NAME)
+                .columns(List.of("VAL"))
+                .build();
+
+        assertThat(service.createTable(simpleTable(TABLE_NAME)), willBe((Object) null));
+        assertThat(service.createIndex(params), willBe((Object) null));
+
+        long beforeDropTimestamp = System.currentTimeMillis();
+
+        Thread.sleep(5);
+
+        DropTableParams dropTableParams = DropTableParams.builder().schemaName("PUBLIC").tableName(TABLE_NAME).build();
+
+        assertThat(service.dropTable(dropTableParams), willBe((Object) null));
+
+        // Validate catalog version from the past.
+        SchemaDescriptor schema = service.schema(2);
+
+        assertNotNull(schema);
+        assertEquals(0, schema.id());
+        assertEquals(CatalogService.PUBLIC, schema.name());
+        assertEquals(2, schema.version());
+        assertSame(schema, service.activeSchema(beforeDropTimestamp));
+
+        assertSame(schema.table(TABLE_NAME), service.table(TABLE_NAME, beforeDropTimestamp));
+        assertSame(schema.table(TABLE_NAME), service.table(1, beforeDropTimestamp));
+
+        assertSame(schema.index(INDEX_NAME), service.index(INDEX_NAME, beforeDropTimestamp));
+        assertSame(schema.index(INDEX_NAME), service.index(2, beforeDropTimestamp));
+
+        // Validate actual catalog
+        schema = service.schema(3);
+
+        assertNotNull(schema);
+        assertEquals(0, schema.id());
+        assertEquals(CatalogService.PUBLIC, schema.name());
+        assertEquals(3, schema.version());
+        assertSame(schema, service.activeSchema(System.currentTimeMillis()));
+
+        assertNull(schema.table(TABLE_NAME));
+        assertNull(service.table(TABLE_NAME, System.currentTimeMillis()));
+        assertNull(service.table(1, System.currentTimeMillis()));
+
+        assertNull(schema.index(INDEX_NAME));
+        assertNull(service.index(INDEX_NAME, System.currentTimeMillis()));
+        assertNull(service.index(2, System.currentTimeMillis()));
+    }
+
+    @Test
+    public void testCreateHashIndex() {
+        assertThat(service.createTable(simpleTable(TABLE_NAME)), willBe((Object) null));
+
+        CreateHashIndexParams params = CreateHashIndexParams.builder()
+                .indexName(INDEX_NAME)
+                .tableName(TABLE_NAME)
+                .columns(List.of("VAL", "ID"))
+                .build();
+
+        assertThat(service.createIndex(params), willBe((Object) null));
+
+        // Validate catalog version from the past.
+        SchemaDescriptor schema = service.schema(1);
+
+        assertNotNull(schema);
+        assertNull(schema.index(INDEX_NAME));
+        assertNull(service.index(INDEX_NAME, 123L));
+        assertNull(service.index(2, 123L));
+
+        // Validate actual catalog
+        schema = service.schema(2);
+
+        assertNotNull(schema);
+        assertNull(service.index(1, System.currentTimeMillis()));
+        assertSame(schema.index(INDEX_NAME), service.index(INDEX_NAME, System.currentTimeMillis()));
+        assertSame(schema.index(INDEX_NAME), service.index(2, System.currentTimeMillis()));
+
+        // Validate newly created hash index
+        HashIndexDescriptor index = (HashIndexDescriptor) schema.index(INDEX_NAME);
+
+        assertEquals(2L, index.id());
+        assertEquals(INDEX_NAME, index.name());
+        assertEquals(schema.table(TABLE_NAME).id(), index.tableId());
+        assertEquals(List.of("VAL", "ID"), index.columns());
+        assertFalse(index.unique());
+        assertFalse(index.writeOnly());
+    }
+
+    @Test
+    public void testCreateSortedIndex() {
+        assertThat(service.createTable(simpleTable(TABLE_NAME)), willBe((Object) null));
+
+        CreateSortedIndexParams params = CreateSortedIndexParams.builder()
+                .indexName(INDEX_NAME)
+                .tableName(TABLE_NAME)
+                .unique()
+                .columns(List.of("VAL", "ID"))
+                .collations(List.of(ColumnCollation.DESC_NULLS_FIRST, ColumnCollation.ASC_NULLS_LAST))
+                .build();
+
+        assertThat(service.createIndex(params), willBe((Object) null));
+
+        // Validate catalog version from the past.
+        SchemaDescriptor schema = service.schema(1);
+
+        assertNotNull(schema);
+        assertNull(schema.index(INDEX_NAME));
+        assertNull(service.index(INDEX_NAME, 123L));
+        assertNull(service.index(2, 123L));
+
+        // Validate actual catalog
+        schema = service.schema(2);
+
+        assertNotNull(schema);
+        assertNull(service.index(1, System.currentTimeMillis()));
+        assertSame(schema.index(INDEX_NAME), service.index(INDEX_NAME, System.currentTimeMillis()));
+        assertSame(schema.index(INDEX_NAME), service.index(2, System.currentTimeMillis()));
+
+        // Validate newly created sorted index
+        SortedIndexDescriptor index = (SortedIndexDescriptor) schema.index(INDEX_NAME);
+
+        assertEquals(2L, index.id());
+        assertEquals(INDEX_NAME, index.name());
+        assertEquals(schema.table(TABLE_NAME).id(), index.tableId());
+        assertEquals("VAL", index.columns().get(0).name());
+        assertEquals("ID", index.columns().get(1).name());
+        assertEquals(ColumnCollation.DESC_NULLS_FIRST, index.columns().get(0).collation());
+        assertEquals(ColumnCollation.ASC_NULLS_LAST, index.columns().get(1).collation());
+        assertTrue(index.unique());
+        assertFalse(index.writeOnly());
+    }
+
+    @Test
+    public void testCreateIndexWithSameName() {
+        assertThat(service.createTable(simpleTable(TABLE_NAME)), willBe((Object) null));
+
+        CreateHashIndexParams params = CreateHashIndexParams.builder()
+                .indexName(INDEX_NAME)
+                .tableName(TABLE_NAME)
+                .columns(List.of("VAL"))
+                .build();
+
+        assertThat(service.createIndex(params), willBe((Object) null));
+        assertThat(service.createIndex(params), willThrow(IndexAlreadyExistsException.class));
+    }
+
+    @Test
     public void operationWillBeRetriedFiniteAmountOfTimes() {
         UpdateLog updateLogMock = Mockito.mock(UpdateLog.class);
 
@@ -532,7 +692,7 @@
 
     @Test
     public void testTableEvents() {
-        CreateTableParams params = CreateTableParams.builder()
+        CreateTableParams createTableParams = CreateTableParams.builder()
                 .schemaName(SCHEMA_NAME)
                 .tableName(TABLE_NAME)
                 .zone(ZONE_NAME)
@@ -545,25 +705,76 @@
                 .colocationColumns(List.of("key2"))
                 .build();
 
+        DropTableParams dropTableparams = DropTableParams.builder().tableName(TABLE_NAME).build();
+
         EventListener<CatalogEventParameters> eventListener = Mockito.mock(EventListener.class);
         when(eventListener.notify(any(), any())).thenReturn(completedFuture(false));
 
         service.listen(CatalogEvent.TABLE_CREATE, eventListener);
         service.listen(CatalogEvent.TABLE_DROP, eventListener);
 
-        CompletableFuture<Void> fut = service.createTable(params);
-
-        assertThat(fut, willBe((Object) null));
-
+        assertThat(service.createTable(createTableParams), willBe((Object) null));
         verify(eventListener).notify(any(CreateTableEventParameters.class), ArgumentMatchers.isNull());
 
+        assertThat(service.dropTable(dropTableparams), willBe((Object) null));
+        verify(eventListener).notify(any(DropTableEventParameters.class), ArgumentMatchers.isNull());
+
+        verifyNoMoreInteractions(eventListener);
+    }
+
+    @Test
+    public void testCreateIndexEvents() {
+        CreateTableParams createTableParams = CreateTableParams.builder()
+                .schemaName(CatalogService.PUBLIC)
+                .tableName(TABLE_NAME)
+                .zone("ZONE")
+                .columns(List.of(
+                        ColumnParams.builder().name("key1").type(ColumnType.INT32).build(),
+                        ColumnParams.builder().name("key2").type(ColumnType.INT32).build(),
+                        ColumnParams.builder().name("val").type(ColumnType.INT32).nullable(true).build()
+                ))
+                .primaryKeyColumns(List.of("key1", "key2"))
+                .colocationColumns(List.of("key2"))
+                .build();
+
         DropTableParams dropTableparams = DropTableParams.builder().tableName(TABLE_NAME).build();
 
-        fut = service.dropTable(dropTableparams);
+        CreateHashIndexParams createIndexParams = CreateHashIndexParams.builder()
+                .schemaName(CatalogService.PUBLIC)
+                .indexName(INDEX_NAME)
+                .tableName(TABLE_NAME)
+                .columns(List.of("key2"))
+                .build();
 
-        assertThat(fut, willBe((Object) null));
+        DropIndexParams dropIndexParams = DropIndexParams.builder().indexName(INDEX_NAME).build();
 
-        verify(eventListener).notify(any(DropTableEventParameters.class), ArgumentMatchers.isNull());
+        EventListener<CatalogEventParameters> eventListener = Mockito.mock(EventListener.class);
+        when(eventListener.notify(any(), any())).thenReturn(completedFuture(false));
+
+        service.listen(CatalogEvent.INDEX_CREATE, eventListener);
+        service.listen(CatalogEvent.INDEX_DROP, eventListener);
+
+        // Try to create index without table.
+        assertThat(service.createIndex(createIndexParams), willThrow(TableNotFoundException.class));
+        verifyNoInteractions(eventListener);
+
+        // Create table.
+        assertThat(service.createTable(createTableParams), willBe((Object) null));
+
+        // Create index.
+        assertThat(service.createIndex(createIndexParams), willBe((Object) null));
+        verify(eventListener).notify(any(CreateIndexEventParameters.class), ArgumentMatchers.isNull());
+
+        // Drop index.
+        assertThat(service.dropIndex(dropIndexParams), willBe((Object) null));
+        verify(eventListener).notify(any(DropIndexEventParameters.class), ArgumentMatchers.isNull());
+
+        // Drop table.
+        assertThat(service.dropTable(dropTableparams), willBe((Object) null));
+
+        // Try drop index once again.
+        assertThat(service.dropIndex(dropIndexParams), willThrow(IndexNotFoundException.class));
+
         verifyNoMoreInteractions(eventListener);
     }
 
@@ -625,4 +836,14 @@
                 .primaryKeyColumns(List.of("ID"))
                 .build();
     }
+
+    private static CreateSortedIndexParams simpleIndex(String indexName, String tableName) {
+        return CreateSortedIndexParams.builder()
+                .indexName(indexName)
+                .tableName(tableName)
+                .unique()
+                .columns(List.of("VAL"))
+                .collations(List.of(ColumnCollation.ASC_NULLS_LAST))
+                .build();
+    }
 }
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
index a3eac69..98a5800 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
@@ -471,12 +471,14 @@
         cols.add(SchemaBuilders.column("valInt", ColumnType.INT32).asNullable(true).build());
         cols.add(SchemaBuilders.column("valStr", ColumnType.string()).withDefaultValue("default").build());
 
-        return await(((TableManager) node.tables()).createTableAsync(
-                tableName,
-                DEFAULT_ZONE_NAME,
-                tblCh -> convert(SchemaBuilders.tableBuilder(SCHEMA, tableName).columns(
-                        cols).withPrimaryKey("key").build(), tblCh)
-        ));
+        var tmpl = "CREATE TABLE %s (key BIGINT PRIMARY KEY, valInt INT, valStr VARCHAR)";
+        var sql = String.format(tmpl, tableName);
+
+        try (Session ses = node.sql().createSession()) {
+            ses.execute(null, sql);
+        }
+
+        return node.tables().table(tableName);
     }
 
     /**
@@ -602,7 +604,7 @@
      */
     protected void addIndexIfNotExists(Ignite node, String tableName) {
         try (Session ses = node.sql().createSession()) {
-            ses.execute(null, String.format("CREATE INDEX IF NOT EXISTS testHI ON %s (CAT_ID)", tableName));
+            ses.execute(null, String.format("CREATE INDEX IF NOT EXISTS testHI ON %s (valInt)", tableName));
         }
     }
 }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandlerWrapper.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandlerWrapper.java
index 72cf3f2..06830dc 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandlerWrapper.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandlerWrapper.java
@@ -20,15 +20,22 @@
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.commands.AbstractIndexCommandParams;
+import org.apache.ignite.internal.catalog.commands.CreateHashIndexParams;
+import org.apache.ignite.internal.catalog.commands.CreateSortedIndexParams;
 import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
 import org.apache.ignite.internal.index.IndexManager;
 import org.apache.ignite.internal.sql.engine.prepare.ddl.AlterTableAddCommand;
 import org.apache.ignite.internal.sql.engine.prepare.ddl.AlterTableDropCommand;
+import org.apache.ignite.internal.sql.engine.prepare.ddl.CreateIndexCommand;
 import org.apache.ignite.internal.sql.engine.prepare.ddl.CreateTableCommand;
 import org.apache.ignite.internal.sql.engine.prepare.ddl.DdlCommand;
+import org.apache.ignite.internal.sql.engine.prepare.ddl.DropIndexCommand;
 import org.apache.ignite.internal.sql.engine.prepare.ddl.DropTableCommand;
 import org.apache.ignite.internal.storage.DataStorageManager;
 import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.lang.IndexAlreadyExistsException;
+import org.apache.ignite.lang.IndexNotFoundException;
 import org.apache.ignite.lang.TableAlreadyExistsException;
 import org.apache.ignite.lang.TableNotFoundException;
 
@@ -86,6 +93,21 @@
                     .thenCompose(res -> catalogManager.dropColumn(DdlToCatalogCommandConverter.convert(dropCommand))
                             .handle(handleModificationResult(dropCommand.ifTableExists(), TableNotFoundException.class))
                     );
+        } else if (cmd instanceof CreateIndexCommand) {
+            return ddlCommandFuture
+                    .thenCompose(res -> {
+                        AbstractIndexCommandParams params = DdlToCatalogCommandConverter.convert((CreateIndexCommand) cmd);
+                        if (params instanceof CreateSortedIndexParams) {
+                            return catalogManager.createIndex((CreateSortedIndexParams) params);
+                        } else {
+                            return catalogManager.createIndex((CreateHashIndexParams) params);
+                        }
+                    }).handle(handleModificationResult(((CreateIndexCommand) cmd).ifNotExists(), IndexAlreadyExistsException.class));
+        } else if (cmd instanceof DropIndexCommand) {
+            return ddlCommandFuture
+                    .thenCompose(res -> catalogManager.dropIndex(DdlToCatalogCommandConverter.convert((DropIndexCommand) cmd))
+                            .handle(handleModificationResult(((DropIndexCommand) cmd).ifNotExists(), IndexNotFoundException.class))
+                    );
         }
 
         return ddlCommandFuture;
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlToCatalogCommandConverter.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlToCatalogCommandConverter.java
index fadf9ec..f7c4d5f 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlToCatalogCommandConverter.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlToCatalogCommandConverter.java
@@ -19,18 +19,26 @@
 
 import java.util.List;
 import java.util.stream.Collectors;
+import org.apache.ignite.internal.catalog.commands.AbstractIndexCommandParams;
 import org.apache.ignite.internal.catalog.commands.AlterTableAddColumnParams;
 import org.apache.ignite.internal.catalog.commands.AlterTableDropColumnParams;
 import org.apache.ignite.internal.catalog.commands.ColumnParams;
+import org.apache.ignite.internal.catalog.commands.CreateHashIndexParams;
+import org.apache.ignite.internal.catalog.commands.CreateSortedIndexParams;
 import org.apache.ignite.internal.catalog.commands.CreateTableParams;
 import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import org.apache.ignite.internal.catalog.commands.DropIndexParams;
 import org.apache.ignite.internal.catalog.commands.DropTableParams;
+import org.apache.ignite.internal.catalog.descriptors.ColumnCollation;
 import org.apache.ignite.internal.sql.engine.prepare.ddl.AlterTableAddCommand;
 import org.apache.ignite.internal.sql.engine.prepare.ddl.AlterTableDropCommand;
 import org.apache.ignite.internal.sql.engine.prepare.ddl.ColumnDefinition;
+import org.apache.ignite.internal.sql.engine.prepare.ddl.CreateIndexCommand;
 import org.apache.ignite.internal.sql.engine.prepare.ddl.CreateTableCommand;
 import org.apache.ignite.internal.sql.engine.prepare.ddl.DefaultValueDefinition;
+import org.apache.ignite.internal.sql.engine.prepare.ddl.DropIndexCommand;
 import org.apache.ignite.internal.sql.engine.prepare.ddl.DropTableCommand;
+import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
 import org.apache.ignite.internal.sql.engine.util.TypeUtils;
 
 /**
@@ -83,6 +91,43 @@
     }
 
 
+    static AbstractIndexCommandParams convert(CreateIndexCommand cmd) {
+        switch (cmd.type()) {
+            case HASH:
+                return CreateHashIndexParams.builder()
+                        .schemaName(cmd.schemaName())
+                        .indexName(cmd.indexName())
+
+                        .tableName(cmd.tableName())
+                        .columns(cmd.columns())
+
+                        .build();
+            case SORTED:
+                List<ColumnCollation> collations = cmd.collations().stream()
+                        .map(DdlToCatalogCommandConverter::convert)
+                        .collect(Collectors.toList());
+
+                return CreateSortedIndexParams.builder()
+                        .schemaName(cmd.schemaName())
+                        .indexName(cmd.indexName())
+
+                        .tableName(cmd.tableName())
+                        .columns(cmd.columns())
+                        .collations(collations)
+
+                        .build();
+            default:
+                throw new IllegalArgumentException("Unsupported index type: " + cmd.type());
+        }
+    }
+
+    static DropIndexParams convert(DropIndexCommand cmd) {
+        return DropIndexParams.builder()
+                .schemaName(cmd.schemaName())
+                .indexName(cmd.indexName())
+                .build();
+    }
+
     private static ColumnParams convert(ColumnDefinition def) {
         return ColumnParams.builder()
                 .name(def.name())
@@ -104,4 +149,8 @@
                 throw new IllegalArgumentException("Default value definition: " + def.type());
         }
     }
+
+    private static ColumnCollation convert(IgniteIndex.Collation collation) {
+        return ColumnCollation.get(collation.asc, collation.nullsFirst);
+    }
 }
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchemaTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchemaTest.java
index fbe2ce2..ae1bd92 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchemaTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchemaTest.java
@@ -47,7 +47,7 @@
 
     @NotNull
     private static HashIndexDescriptor someIndex(int id, String name) {
-        return new HashIndexDescriptor(id, name, 1, List.of("a"));
+        return new HashIndexDescriptor(id, name, 1, true, List.of("a"));
     }
 
     @NotNull