// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.impala.service;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.impala.catalog.FeKuduTable;
import org.apache.impala.catalog.KuduColumn;
import org.apache.impala.catalog.KuduTable;
import org.apache.impala.catalog.Table;
import org.apache.impala.catalog.TableNotFoundException;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.common.Pair;
import org.apache.impala.common.PrintUtils;
import org.apache.impala.thrift.TAlterTableAddDropRangePartitionParams;
import org.apache.impala.thrift.TColumn;
import org.apache.impala.thrift.TCreateTableParams;
import org.apache.impala.thrift.TKuduPartitionParam;
import org.apache.impala.thrift.TRangePartition;
import org.apache.impala.thrift.TRangePartitionOperationType;
import org.apache.impala.util.KuduUtil;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder;
import org.apache.kudu.Schema;
import org.apache.kudu.client.AlterTableOptions;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.PartialRow;
import org.apache.kudu.client.RangePartitionBound;
import org.apache.kudu.util.DecimalUtil;
import org.apache.log4j.Logger;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

/**
 * This is a helper for the CatalogOpExecutor to provide Kudu related DDL functionality
 * such as creating and dropping tables from Kudu.
 */
public class KuduCatalogOpExecutor {
  public static final Logger LOG = Logger.getLogger(KuduCatalogOpExecutor.class);

  private static final Object kuduDdlLock_ = new Object();

  /**
   * Create a table in Kudu with a schema equivalent to the schema stored in 'msTbl'.
   * Throws an exception if 'msTbl' represents an external table or if the table couldn't
   * be created in Kudu.
   */
  static void createSynchronizedTable(org.apache.hadoop.hive.metastore.api.Table msTbl,
      TCreateTableParams params) throws ImpalaRuntimeException {
    Preconditions.checkState(KuduTable.isSynchronizedTable(msTbl));
    Preconditions.checkState(
        msTbl.getParameters().get(KuduTable.KEY_TABLE_ID) == null);
    String kuduTableName = msTbl.getParameters().get(KuduTable.KEY_TABLE_NAME);
    String masterHosts = msTbl.getParameters().get(KuduTable.KEY_MASTER_HOSTS);
    if (LOG.isTraceEnabled()) {
      LOG.trace(String.format("Creating table '%s' in master '%s'", kuduTableName,
          masterHosts));
    }
    KuduClient kudu = KuduUtil.getKuduClient(masterHosts);
    try {
      // Acquire lock to protect table existence check and table creation, see IMPALA-8984
      synchronized (kuduDdlLock_) {
        // TODO: The IF NOT EXISTS case should be handled by Kudu to ensure atomicity.
        // (see KUDU-1710).
        boolean tableExists = kudu.tableExists(kuduTableName);
        if (tableExists && params.if_not_exists) return;

        // if table is managed or external with external.purge.table = true in
        // tblproperties we should create the Kudu table if it does not exist
        if (tableExists) {
          throw new ImpalaRuntimeException(String.format(
              "Table '%s' already exists in Kudu.", kuduTableName));
        }
        Preconditions.checkState(!Strings.isNullOrEmpty(kuduTableName));
        Schema schema = createTableSchema(params);
        CreateTableOptions tableOpts = buildTableOptions(msTbl, params, schema);
        org.apache.kudu.client.KuduTable table =
            kudu.createTable(kuduTableName, schema, tableOpts);
        // Populate table ID from Kudu table if Kudu's integration with the Hive
        // Metastore is enabled.
        if (KuduTable.isHMSIntegrationEnabled(masterHosts)) {
          String tableId = table.getTableId();
          Preconditions.checkNotNull(tableId);
          msTbl.getParameters().put(KuduTable.KEY_TABLE_ID, tableId);
        }
      }
    } catch (Exception e) {
      throw new ImpalaRuntimeException(String.format("Error creating Kudu table '%s'",
          kuduTableName), e);
    }
  }

  private static ColumnSchema createColumnSchema(TColumn column, boolean isKey)
      throws ImpalaRuntimeException {
    Type type = Type.fromThrift(column.getColumnType());
    Preconditions.checkState(type != null);
    org.apache.kudu.Type kuduType = KuduUtil.fromImpalaType(type);

    ColumnSchemaBuilder csb = new ColumnSchemaBuilder(column.getColumnName(), kuduType);
    csb.key(isKey);
    if (column.isSetIs_nullable()) {
      // If nullability is explicitly set and the column is a key, it must have been
      // set as NOT NULL. This is the default, but it is also valid to specify it.
      Preconditions.checkArgument(!isKey || !column.isIs_nullable());
      csb.nullable(column.isIs_nullable());
    } else {
      // Non-key columns are by default nullable unless the user explicitly sets its
      // nullability. Key columns are not nullable.
      csb.nullable(!isKey);
    }
    if (column.isSetDefault_value()) {
      csb.defaultValue(KuduUtil.getKuduDefaultValue(
          column.getDefault_value(), type, column.getColumnName()));
    }
    if (column.isSetBlock_size()) csb.desiredBlockSize(column.getBlock_size());
    if (column.isSetEncoding()) {
      csb.encoding(KuduUtil.fromThrift(column.getEncoding()));
    }
    if (column.isSetCompression()) {
      csb.compressionAlgorithm(KuduUtil.fromThrift(column.getCompression()));
    }
    if (type.isDecimal()) {
      csb.typeAttributes(
          DecimalUtil.typeAttributes(type.getPrecision(), type.getDecimalDigits()));
    }
    if (column.isSetComment() && !column.getComment().isEmpty()) {
      csb.comment(column.getComment());
    }
    return csb.build();
  }

  /**
   * Creates the schema of a new Kudu table.
   */
  private static Schema createTableSchema(TCreateTableParams params)
      throws ImpalaRuntimeException {
    List<String> keyColNames = params.getPrimary_key_column_names();
    Preconditions.checkState(!keyColNames.isEmpty());

    // Check that the key columns are listed first in the Kudu schema and in the
    // same order as in the PRIMARY KEY definition.
    List<String> colNames = ImmutableList.copyOf(Iterables.transform(params.getColumns(),
        TColumn::getColumnName));
    List<String> leadingColNames = colNames.subList(0, keyColNames.size());

    if (!leadingColNames.equals(keyColNames)) {
      throw new ImpalaRuntimeException(String.format(
          "Kudu PRIMARY KEY columns must be specified as the first columns " +
          "in the table (expected leading columns (%s) but found (%s))",
          PrintUtils.joinQuoted(keyColNames),
          PrintUtils.joinQuoted(leadingColNames)));
    }

    List<ColumnSchema> colSchemas = new ArrayList<>(params.getColumnsSize());
    for (TColumn column: params.getColumns()) {
      boolean isKey = colSchemas.size() < keyColNames.size();
      colSchemas.add(createColumnSchema(column, isKey));
    }
    return new Schema(colSchemas);
  }

  /**
   * Builds the table options of a new Kudu table.
   */
  private static CreateTableOptions buildTableOptions(
      org.apache.hadoop.hive.metastore.api.Table msTbl,
      TCreateTableParams params, Schema schema) throws ImpalaRuntimeException {
    CreateTableOptions tableOpts = new CreateTableOptions();
    // Set the partitioning schemes
    List<TKuduPartitionParam> partitionParams = params.getPartition_by();
    if (partitionParams != null) {
      boolean hasRangePartitioning = false;
      for (TKuduPartitionParam partParam: partitionParams) {
        if (partParam.isSetBy_hash_param()) {
          Preconditions.checkState(!partParam.isSetBy_range_param());
          tableOpts.addHashPartitions(partParam.getBy_hash_param().getColumns(),
              partParam.getBy_hash_param().getNum_partitions());
        } else {
          Preconditions.checkState(partParam.isSetBy_range_param());
          hasRangePartitioning = true;
          List<String> rangePartitionColumns = partParam.getBy_range_param().getColumns();
          tableOpts.setRangePartitionColumns(rangePartitionColumns);
          for (TRangePartition rangePartition:
               partParam.getBy_range_param().getRange_partitions()) {
            List<Pair<PartialRow, RangePartitionBound>> rangeBounds =
                getRangePartitionBounds(rangePartition, schema, rangePartitionColumns);
            Preconditions.checkState(rangeBounds.size() == 2);
            Pair<PartialRow, RangePartitionBound> lowerBound = rangeBounds.get(0);
            Pair<PartialRow, RangePartitionBound> upperBound = rangeBounds.get(1);
            tableOpts.addRangePartition(lowerBound.first, upperBound.first,
                lowerBound.second, upperBound.second);
          }
        }
      }
      // If no range-based partitioning is specified in a CREATE TABLE statement, Kudu
      // generates one by default that includes all the primary key columns. To prevent
      // this from happening, explicitly set the range partition columns to be
      // an empty list.
      if (!hasRangePartitioning) {
        tableOpts.setRangePartitionColumns(Collections.<String>emptyList());
      }
    } else {
      // This table is unpartitioned, which Kudu represents as a table range partitioned
      // on no columns.
      tableOpts.setRangePartitionColumns(Collections.<String>emptyList());
    }

    // Set the number of table replicas, if specified.
    String replication = msTbl.getParameters().get(KuduTable.KEY_TABLET_REPLICAS);
    if (!Strings.isNullOrEmpty(replication)) {
      int parsedReplicas = -1;
      try {
        parsedReplicas = Integer.parseInt(replication);
        Preconditions.checkState(parsedReplicas > 0,
            "Invalid number of replicas table property:" + replication);
      } catch (Exception e) {
        throw new ImpalaRuntimeException(String.format("Invalid number of table " +
            "replicas specified: '%s'", replication));
      }
      tableOpts.setNumReplicas(parsedReplicas);
    }

    // Set the table's owner.
    tableOpts.setOwner(msTbl.getOwner());
    return tableOpts;
  }

  /**
   * Drops the table in Kudu. If the table does not exist and 'ifExists' is false, a
   * TableNotFoundException is thrown. If the table exists and could not be dropped,
   * an ImpalaRuntimeException is thrown.
   */
  static void dropTable(org.apache.hadoop.hive.metastore.api.Table msTbl,
      boolean ifExists) throws ImpalaRuntimeException, TableNotFoundException {
    Preconditions.checkState(KuduTable.isSynchronizedTable(msTbl));
    String tableName = msTbl.getParameters().get(KuduTable.KEY_TABLE_NAME);
    String masterHosts = msTbl.getParameters().get(KuduTable.KEY_MASTER_HOSTS);
    if (LOG.isTraceEnabled()) {
      LOG.trace(String.format("Dropping table '%s' from master '%s'", tableName,
          masterHosts));
    }
    KuduClient kudu = KuduUtil.getKuduClient(masterHosts);
    try {
      Preconditions.checkState(!Strings.isNullOrEmpty(tableName));
      // TODO: The IF EXISTS case should be handled by Kudu to ensure atomicity.
      // (see KUDU-1710).
      if (kudu.tableExists(tableName)) {
        kudu.deleteTable(tableName);
      } else if (!ifExists) {
        throw new TableNotFoundException(String.format(
            "Table '%s' does not exist in Kudu master(s) '%s'.", tableName, masterHosts));
      }
    } catch (Exception e) {
      throw new ImpalaRuntimeException(String.format("Error dropping table '%s'",
          tableName), e);
    }
  }

  /**
   * Reads the column definitions from a Kudu table and populates 'msTbl' with
   * an equivalent schema for external tables. Throws an exception if any errors
   * are encountered.
   */
  public static void populateExternalTableColsFromKudu(
      org.apache.hadoop.hive.metastore.api.Table msTbl) throws ImpalaRuntimeException {
    org.apache.hadoop.hive.metastore.api.Table msTblCopy = msTbl.deepCopy();
    List<FieldSchema> cols = msTblCopy.getSd().getCols();
    // External table should not have table ID.
    Preconditions.checkState(Table.isExternalTable(msTbl));
    Preconditions.checkState(
        msTblCopy.getParameters().get(KuduTable.KEY_TABLE_ID) == null);
    String kuduTableName = msTblCopy.getParameters().get(KuduTable.KEY_TABLE_NAME);
    Preconditions.checkState(!Strings.isNullOrEmpty(kuduTableName));
    String masterHosts = msTblCopy.getParameters().get(KuduTable.KEY_MASTER_HOSTS);
    if (LOG.isTraceEnabled()) {
      LOG.trace(String.format("Loading schema of table '%s' from master '%s'",
          kuduTableName, masterHosts));
    }
    KuduClient kudu = KuduUtil.getKuduClient(masterHosts);
    try {
      if (!kudu.tableExists(kuduTableName)) {
        throw new ImpalaRuntimeException(String.format("Table does not exist in Kudu: " +
            "'%s'", kuduTableName));
      }
      org.apache.kudu.client.KuduTable kuduTable = kudu.openTable(kuduTableName);
      // Replace the columns in the Metastore table with the columns from the recently
      // accessed Kudu schema.
      cols.clear();
      Set<String> lowerCaseColNames = Sets.newHashSet();
      for (ColumnSchema colSchema : kuduTable.getSchema().getColumns()) {
        if (!lowerCaseColNames.add(colSchema.getName().toLowerCase())) {
          throw new ImpalaRuntimeException(String.format(
              "Error loading Kudu table: Impala does not support column names that " +
              "differ only in casing '%s'", colSchema.getName()));
        }
        Type type =
            KuduUtil.toImpalaType(colSchema.getType(), colSchema.getTypeAttributes());
        String comment =
            !colSchema.getComment().isEmpty() ? colSchema.getComment() : null;
        cols.add(new FieldSchema(colSchema.getName(), type.toSql().toLowerCase(),
            comment));
      }
    } catch (Exception e) {
      throw new ImpalaRuntimeException(String.format("Error loading schema of table " +
          "'%s'", kuduTableName), e);
    }
    List<FieldSchema> newCols = msTbl.getSd().getCols();
    newCols.clear();
    newCols.addAll(cols);
  }

  /**
   * Validates the table properties of a Kudu table. It checks that the master
   * addresses point to valid Kudu masters and that the table exists.
   * Throws an ImpalaRuntimeException if this is not the case.
   */
  public static void validateKuduTblExists(
      org.apache.hadoop.hive.metastore.api.Table msTbl) throws ImpalaRuntimeException {
    Preconditions.checkArgument(KuduTable.isKuduTable(msTbl));

    Map<String, String> properties = msTbl.getParameters();
    String masterHosts = properties.get(KuduTable.KEY_MASTER_HOSTS);
    Preconditions.checkState(!Strings.isNullOrEmpty(masterHosts));
    String kuduTableName = properties.get(KuduTable.KEY_TABLE_NAME);
    Preconditions.checkState(!Strings.isNullOrEmpty(kuduTableName));
    KuduClient kudu = KuduUtil.getKuduClient(masterHosts);
    try {
      kudu.tableExists(kuduTableName);
    } catch (Exception e) {
      // TODO: This is misleading when there are other errors, e.g. timeouts.
      throw new ImpalaRuntimeException(String.format("Kudu table '%s' does not exist " +
          "on master '%s'", kuduTableName, masterHosts), e);
    }
  }

  /**
   * Renames a Kudu table.
   */
  public static void renameTable(KuduTable tbl, String newName)
      throws ImpalaRuntimeException {
    Preconditions.checkState(!Strings.isNullOrEmpty(newName));
    AlterTableOptions alterTableOptions = new AlterTableOptions();
    alterTableOptions.renameTable(newName);
    String errMsg = String.format("Error renaming Kudu table " +
        "%s to %s", tbl.getKuduTableName(), newName);
    KuduClient client = KuduUtil.getKuduClient(tbl.getKuduMasterHosts());
    try {
      client.alterTable(tbl.getKuduTableName(), alterTableOptions);
      if (!client.isAlterTableDone(newName)) {
        throw new ImpalaRuntimeException(errMsg + ": Kudu operation timed out");
      }
    } catch (KuduException e) {
      throw new ImpalaRuntimeException(errMsg, e);
    }
  }

  /**
   * Adds/drops a range partition.
   */
  public static void addDropRangePartition(KuduTable tbl,
      TAlterTableAddDropRangePartitionParams params) throws ImpalaRuntimeException {
    TRangePartition rangePartition = params.getRange_partition_spec();
    List<Pair<PartialRow, RangePartitionBound>> rangeBounds =
        getRangePartitionBounds(rangePartition, tbl);
    Preconditions.checkState(rangeBounds.size() == 2);
    Pair<PartialRow, RangePartitionBound> lowerBound = rangeBounds.get(0);
    Pair<PartialRow, RangePartitionBound> upperBound = rangeBounds.get(1);
    AlterTableOptions alterTableOptions = new AlterTableOptions();
    TRangePartitionOperationType type = params.getType();
    if (type == TRangePartitionOperationType.ADD) {
      alterTableOptions.addRangePartition(lowerBound.first, upperBound.first,
          lowerBound.second, upperBound.second);
    } else {
      alterTableOptions.dropRangePartition(lowerBound.first, upperBound.first,
          lowerBound.second, upperBound.second);
    }
    String errMsg = String.format("Error %s range partition in " +
        "table %s", (type == TRangePartitionOperationType.ADD ? "adding" : "dropping"),
        tbl.getName());
    try {
      alterKuduTable(tbl, alterTableOptions, errMsg);
    } catch (ImpalaRuntimeException e) {
      if (!params.isIgnore_errors()) throw e;
    }
  }

  private static List<Pair<PartialRow, RangePartitionBound>> getRangePartitionBounds(
      TRangePartition rangePartition, KuduTable tbl) throws ImpalaRuntimeException {
    List<String> rangePartitioningColNames =
        FeKuduTable.Utils.getRangePartitioningColNames(tbl);
    List<String> rangePartitioningKuduColNames =
      Lists.newArrayListWithCapacity(rangePartitioningColNames.size());
    for (String colName : rangePartitioningColNames) {
      rangePartitioningKuduColNames.add(((KuduColumn)tbl.getColumn(colName)).getKuduName());
    }
    return getRangePartitionBounds(rangePartition, tbl.getKuduSchema(),
        rangePartitioningKuduColNames);
  }

  /**
   * Returns the bounds of a range partition in two <PartialRow, RangePartitionBound>
   * pairs to be used in Kudu API calls for ALTER and CREATE TABLE statements.
   * 'rangePartitioningColNames' must be specified in Kudu case.
   */
  private static List<Pair<PartialRow, RangePartitionBound>> getRangePartitionBounds(
      TRangePartition rangePartition, Schema schema,
      List<String> rangePartitioningColNames) throws ImpalaRuntimeException {
    Preconditions.checkNotNull(schema);
    Preconditions.checkState(!rangePartitioningColNames.isEmpty());
    Preconditions.checkState(rangePartition.isSetLower_bound_values()
        || rangePartition.isSetUpper_bound_values());
    List<Pair<PartialRow, RangePartitionBound>> rangeBounds =
        Lists.newArrayListWithCapacity(2);
    Pair<PartialRow, RangePartitionBound> lowerBound =
        KuduUtil.buildRangePartitionBound(schema, rangePartitioningColNames,
        rangePartition.getLower_bound_values(),
        rangePartition.isIs_lower_bound_inclusive());
    rangeBounds.add(lowerBound);
    Pair<PartialRow, RangePartitionBound> upperBound =
        KuduUtil.buildRangePartitionBound(schema, rangePartitioningColNames,
        rangePartition.getUpper_bound_values(),
        rangePartition.isIs_upper_bound_inclusive());
    rangeBounds.add(upperBound);
    return rangeBounds;
  }

  /**
   * Adds a column to an existing Kudu table.
   */
  public static void addColumn(KuduTable tbl, List<TColumn> columns)
      throws ImpalaRuntimeException {
    AlterTableOptions alterTableOptions = new AlterTableOptions();
    for (TColumn column: columns) {
      alterTableOptions.addColumn(createColumnSchema(column, false));
    }
    String errMsg = "Error adding columns to Kudu table " + tbl.getName();
    alterKuduTable(tbl, alterTableOptions, errMsg);
  }

  /**
   * Drops a column from a Kudu table.
   */
  public static void dropColumn(KuduTable tbl, String colName)
      throws ImpalaRuntimeException {
    Preconditions.checkState(!Strings.isNullOrEmpty(colName));
    KuduColumn col = (KuduColumn) tbl.getColumn(colName);
    AlterTableOptions alterTableOptions = new AlterTableOptions();
    alterTableOptions.dropColumn(col.getKuduName());
    String errMsg = String.format("Error dropping column %s from " +
        "Kudu table %s", colName, tbl.getName());
    alterKuduTable(tbl, alterTableOptions, errMsg);
  }

  /**
   * Updates the column matching 'colName' to have the name and options specified in
   * 'newCol'. Setting comments or updating the type, primary key status, or nullability
   * are not currently supported by Kudu.
   *
   * For the storage attrbiutes - encoding, compression, and block size - Kudu does not
   * rewrite old rowsets to have these attributes during the alter. They are applied to
   * new rowsets as they are written out, and possibly to old rowsets if they are
   * compacted into new rowsets depending on cost based decisions Kudu makes.
   */
  public static void alterColumn(KuduTable tbl, String colName, TColumn newCol)
      throws ImpalaRuntimeException {
    Preconditions.checkState(!Strings.isNullOrEmpty(colName));
    Preconditions.checkNotNull(newCol);
    Preconditions.checkState(!newCol.isIs_key());
    Preconditions.checkState(!newCol.isSetIs_nullable());
    if (LOG.isTraceEnabled()) {
      LOG.trace(
          String.format("Altering column '%s' to '%s'", colName, newCol.toString()));
    }
    KuduColumn col = (KuduColumn) tbl.getColumn(colName);
    String kuduColName = col.getKuduName();
    AlterTableOptions alterTableOptions = new AlterTableOptions();

    if (newCol.isSetDefault_value()) {
      Type type = Type.fromThrift(newCol.getColumnType());
      Object defaultValue = KuduUtil.getKuduDefaultValue(
          newCol.getDefault_value(), type, newCol.getColumnName());
      if (defaultValue == null) {
        alterTableOptions.removeDefault(kuduColName);
      } else {
        alterTableOptions.changeDefault(kuduColName, defaultValue);
      }
    }
    if (newCol.isSetBlock_size()) {
      alterTableOptions.changeDesiredBlockSize(kuduColName, newCol.getBlock_size());
    }
    if (newCol.isSetEncoding()) {
      alterTableOptions.changeEncoding(
          kuduColName, KuduUtil.fromThrift(newCol.getEncoding()));
    }
    if (newCol.isSetCompression()) {
      alterTableOptions.changeCompressionAlgorithm(
          kuduColName, KuduUtil.fromThrift(newCol.getCompression()));
    }
    String newColName = newCol.getColumnName();
    if (!newColName.toLowerCase().equals(colName.toLowerCase())) {
      alterTableOptions.renameColumn(kuduColName, newColName);
    }
    if (newCol.isSetComment()) {
      alterTableOptions.changeComment(kuduColName, newCol.getComment());
    }

    String errMsg = String.format(
        "Error altering column %s in Kudu table %s", colName, tbl.getName());
    alterKuduTable(tbl, alterTableOptions, errMsg);
  }

  /**
   * Alters a Kudu table based on the specified AlterTableOptions params. Blocks until
   * the alter table operation is finished or until the operation timeout is reached.
   * Throws an ImpalaRuntimeException if the operation cannot be completed successfully.
   */
  public static void alterKuduTable(KuduTable tbl, AlterTableOptions ato, String errMsg)
      throws ImpalaRuntimeException {
    KuduClient client = KuduUtil.getKuduClient(tbl.getKuduMasterHosts());
    try {
      client.alterTable(tbl.getKuduTableName(), ato);
      if (!client.isAlterTableDone(tbl.getKuduTableName())) {
        throw new ImpalaRuntimeException(errMsg + ": Kudu operation timed out");
      }
    } catch (KuduException e) {
      throw new ImpalaRuntimeException(errMsg, e);
    }
  }
}
