blob: 418d009a3df3bcf88e24edd241c8d941154ae320 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.catalog.local;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.impala.analysis.ColumnDef;
import org.apache.impala.analysis.KuduPartitionParam;
import org.apache.impala.catalog.Column;
import org.apache.impala.catalog.FeCatalogUtils;
import org.apache.impala.catalog.FeKuduTable;
import org.apache.impala.catalog.KuduColumn;
import org.apache.impala.catalog.KuduTable;
import org.apache.impala.catalog.TableLoadingException;
import org.apache.impala.catalog.local.MetaProvider.TableMetaRef;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.thrift.TKuduTable;
import org.apache.impala.thrift.TTableDescriptor;
import org.apache.impala.thrift.TTableType;
import org.apache.impala.util.KuduUtil;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.errorprone.annotations.Immutable;
public class LocalKuduTable extends LocalTable implements FeKuduTable {
private final TableParams tableParams_;
private final List<KuduPartitionParam> partitionBy_;
private final ImmutableList<String> primaryKeyColumnNames_;
/**
* Create a new instance based on the table metadata 'msTable' stored
* in the metastore.
*/
static LocalTable loadFromKudu(LocalDb db, Table msTable, TableMetaRef ref)
throws TableLoadingException {
Preconditions.checkNotNull(db);
Preconditions.checkNotNull(msTable);
String fullTableName = msTable.getDbName() + "." + msTable.getTableName();
TableParams params = new TableParams(msTable);
org.apache.kudu.client.KuduTable kuduTable = params.openTable();
List<Column> cols = new ArrayList<>();
List<FieldSchema> fieldSchemas = new ArrayList<>();
convertColsFromKudu(kuduTable.getSchema(), cols, fieldSchemas);
// TODO(todd): update the schema in HMS if it doesn't match? This will
// no longer be necessary after the Kudu-HMS integration is complete, so
// maybe not worth implementing here for the LocalCatalog implementation.
// Use the schema derived from Kudu, rather than the one stored in the HMS.
msTable.getSd().setCols(fieldSchemas);
List<String> pkNames = new ArrayList<>();
for (ColumnSchema c: kuduTable.getSchema().getPrimaryKeyColumns()) {
pkNames.add(c.getName().toLowerCase());
}
List<KuduPartitionParam> partitionBy = Utils.loadPartitionByParams(kuduTable);
ColumnMap cmap = new ColumnMap(cols, /*numClusteringCols=*/0, fullTableName);
return new LocalKuduTable(db, msTable, ref, cmap, pkNames, partitionBy);
}
public static FeKuduTable createCtasTarget(LocalDb db, Table msTable,
List<ColumnDef> columnDefs, List<ColumnDef> primaryKeyColumnDefs,
List<KuduPartitionParam> kuduPartitionParams) {
String fullTableName = msTable.getDbName() + "." + msTable.getTableName();
List<Column> columns = new ArrayList<>();
List<String> pkNames = new ArrayList<>();
int pos = 0;
for (ColumnDef colDef: columnDefs) {
// TODO(todd): it seems odd that for CTAS targets, the columns are of type
// 'Column' instead of 'KuduColumn'.
columns.add(new Column(colDef.getColName(), colDef.getType(), pos++));
}
for (ColumnDef pkColDef: primaryKeyColumnDefs) {
pkNames.add(pkColDef.getColName());
}
ColumnMap cmap = new ColumnMap(columns, /*numClusteringCols=*/0, fullTableName);
return new LocalKuduTable(db, msTable, /*ref=*/null, cmap, pkNames,
kuduPartitionParams);
}
private static void convertColsFromKudu(Schema schema, List<Column> cols,
List<FieldSchema> fieldSchemas) {
Preconditions.checkArgument(cols.isEmpty());;
Preconditions.checkArgument(fieldSchemas.isEmpty());;
int pos = 0;
for (ColumnSchema colSchema: schema.getColumns()) {
KuduColumn kuduCol;
try {
kuduCol = KuduColumn.fromColumnSchema(colSchema, pos++);
} catch (ImpalaRuntimeException e) {
throw new LocalCatalogException(e);
}
Preconditions.checkNotNull(kuduCol);
// Add the HMS column
fieldSchemas.add(new FieldSchema(kuduCol.getName(),
kuduCol.getType().toSql().toLowerCase(), /*comment=*/null));
cols.add(kuduCol);
}
}
private LocalKuduTable(LocalDb db, Table msTable, TableMetaRef ref, ColumnMap cmap,
List<String> primaryKeyColumnNames,
List<KuduPartitionParam> partitionBy) {
super(db, msTable, ref, cmap);
tableParams_ = new TableParams(msTable);
partitionBy_ = ImmutableList.copyOf(partitionBy);
primaryKeyColumnNames_ = ImmutableList.copyOf(primaryKeyColumnNames);
}
@Override
public String getKuduMasterHosts() {
return tableParams_.masters_;
}
@Override
public String getKuduTableName() {
return tableParams_.kuduTableName_;
}
@Override
public List<String> getPrimaryKeyColumnNames() {
return primaryKeyColumnNames_;
}
@Override
public List<KuduPartitionParam> getPartitionBy() {
return partitionBy_;
}
@Override
public TTableDescriptor toThriftDescriptor(int tableId,
Set<Long> referencedPartitions) {
// TODO(todd): the old implementation passes kuduTableName_ instead of name below.
TTableDescriptor desc = new TTableDescriptor(tableId, TTableType.KUDU_TABLE,
FeCatalogUtils.getTColumnDescriptors(this),
getNumClusteringCols(),
name_, db_.getName());
TKuduTable tbl = new TKuduTable();
tbl.setKey_columns(Preconditions.checkNotNull(primaryKeyColumnNames_));
tbl.setMaster_addresses(tableParams_.getMastersAsList());
tbl.setTable_name(tableParams_.kuduTableName_);
Preconditions.checkNotNull(partitionBy_);
// IMPALA-5154: partitionBy_ may be empty if Kudu table created outside Impala,
// partition_by must be explicitly created because the field is required.
tbl.partition_by = new ArrayList<>();
for (KuduPartitionParam partitionParam: partitionBy_) {
tbl.addToPartition_by(partitionParam.toThrift());
}
desc.setKuduTable(tbl);
return desc;
}
/**
* Parsed parameters from the HMS indicating the cluster and table name for
* a Kudu table.
*/
@Immutable
private static class TableParams {
private final String kuduTableName_;
private final String masters_;
TableParams(Table msTable) {
String fullTableName = msTable.getDbName() + "." + msTable.getTableName();
Map<String, String> params = msTable.getParameters();
kuduTableName_ = params.get(KuduTable.KEY_TABLE_NAME);
if (kuduTableName_ == null) {
throw new LocalCatalogException("No " + KuduTable.KEY_TABLE_NAME +
" property found for table " + fullTableName);
}
masters_ = params.get(KuduTable.KEY_MASTER_HOSTS);
if (masters_ == null) {
throw new LocalCatalogException("No " + KuduTable.KEY_MASTER_HOSTS +
" property found for table " + fullTableName);
}
}
public List<String> getMastersAsList() {
return Lists.newArrayList(masters_.split(","));
}
public org.apache.kudu.client.KuduTable openTable() throws TableLoadingException {
KuduClient kuduClient = KuduUtil.getKuduClient(masters_);
org.apache.kudu.client.KuduTable kuduTable;
try {
kuduTable = kuduClient.openTable(kuduTableName_);
} catch (KuduException e) {
throw new TableLoadingException(
String.format("Error opening Kudu table '%s', Kudu error: %s",
kuduTableName_, e.getMessage()));
}
return kuduTable;
}
}
}