/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.catalog.store;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.*;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.Pair;
import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.exception.CatalogException;
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.common.exception.NotImplementedException;
import org.apache.tajo.exception.InternalException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import static org.apache.tajo.catalog.proto.CatalogProtos.PartitionType;
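/**
 * CatalogStore implementation that delegates Tajo catalog operations to the
 * Hive metastore via HCatalog. Table lookup, creation, and deletion are
 * supported for CSV and RCFile tables; partition, function, and index
 * operations are not implemented yet.
 *
 * <p>A minimal usage sketch (the table name is hypothetical; a reachable
 * Hive metastore is assumed):
 * <pre>
 *   CatalogStore store = new HCatalogStore(conf);
 *   if (store.existTable("default.table1")) {
 *     CatalogProtos.TableDescProto proto = store.getTable("default.table1");
 *   }
 *   store.close();
 * </pre>
 */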
public class HCatalogStore extends CatalogConstants implements CatalogStore {
protected final Log LOG = LogFactory.getLog(getClass());
protected Configuration conf;
private static final int CLIENT_POOL_SIZE = 2;
private final HCatalogStoreClientPool clientPool;
public HCatalogStore(final Configuration conf)
throws InternalException {
this(conf, new HCatalogStoreClientPool(CLIENT_POOL_SIZE, conf));
}
public HCatalogStore(final Configuration conf, HCatalogStoreClientPool pool)
throws InternalException {
this.conf = conf;
this.clientPool = pool;
}
@Override
public boolean existTable(final String name) throws CatalogException {
boolean exist = false;
String dbName = null, tableName = null;
Pair<String, String> tablePair = null;
org.apache.hadoop.hive.ql.metadata.Table table = null;
HCatalogStoreClientPool.HCatalogStoreClient client = null;
// get db name and table name.
try {
tablePair = HCatUtil.getDbAndTableName(name);
dbName = tablePair.first;
tableName = tablePair.second;
} catch (Exception ioe) {
throw new CatalogException("Table name is wrong.", ioe);
}
// get table
try {
try {
client = clientPool.getClient();
table = HCatUtil.getTable(client.getHiveClient(), dbName, tableName);
if (table != null) {
exist = true;
}
} catch (NoSuchObjectException nsoe) {
exist = false;
} catch (Exception e) {
throw new CatalogException(e);
}
} finally {
if (client != null) {
client.release();
}
}
return exist;
}
@Override
public final CatalogProtos.TableDescProto getTable(final String name) throws CatalogException {
String dbName = null, tableName = null;
Pair<String, String> tablePair = null;
org.apache.hadoop.hive.ql.metadata.Table table = null;
HCatalogStoreClientPool.HCatalogStoreClient client = null;
Path path = null;
CatalogProtos.StoreType storeType = null;
org.apache.tajo.catalog.Schema schema = null;
Options options = null;
TableStats stats = null;
PartitionMethodDesc partitions = null;
// get db name and table name.
try {
tablePair = HCatUtil.getDbAndTableName(name);
dbName = tablePair.first;
tableName = tablePair.second;
} catch (Exception ioe) {
throw new CatalogException("Table name is wrong.", ioe);
}
//////////////////////////////////
// set tajo table schema.
//////////////////////////////////
try {
// get hive table schema
try {
client = clientPool.getClient();
table = HCatUtil.getTable(client.getHiveClient(), dbName, tableName);
path = table.getPath();
} catch (NoSuchObjectException nsoe) {
throw new CatalogException("Table not found. - tableName:" + name, nsoe);
} catch (Exception e) {
throw new CatalogException(e);
}
// convert hcatalog field schema into tajo field schema.
schema = new org.apache.tajo.catalog.Schema();
HCatSchema tableSchema = null;
try {
tableSchema = HCatUtil.getTableSchemaWithPtnCols(table);
} catch (IOException ioe) {
throw new CatalogException("Fail to get table schema. - tableName:" + name, ioe);
}
List<HCatFieldSchema> fieldSchemaList = tableSchema.getFields();
boolean isPartitionKey = false;
for (HCatFieldSchema eachField : fieldSchemaList) {
isPartitionKey = false;
if (table.getPartitionKeys() != null) {
for(FieldSchema partitionKey: table.getPartitionKeys()) {
if (partitionKey.getName().equals(eachField.getName())) {
isPartitionKey = true;
}
}
}
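// Non-partition columns are registered under fully-qualified names,
// e.g. "default.table1.id" (assuming '.' as the identifier delimiter).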
if (!isPartitionKey) {
String fieldName = dbName + CatalogUtil.IDENTIFIER_DELIMITER + tableName +
CatalogUtil.IDENTIFIER_DELIMITER + eachField.getName();
TajoDataTypes.Type dataType = HCatalogUtil.getTajoFieldType(eachField.getType().toString());
schema.addColumn(fieldName, dataType);
}
}
// validate field schema.
try {
HCatalogUtil.validateHCatTableAndTajoSchema(tableSchema);
} catch (Exception e) {
throw new CatalogException("HCatalog cannot support schema. - schema:" + tableSchema.toString(), e);
}
stats = new TableStats();
options = Options.create();
Properties properties = table.getMetadata();
if (properties != null) {
// set field delimiter
String fieldDelimiter = "", nullFormat = "";
if (properties.getProperty(serdeConstants.FIELD_DELIM) != null) {
fieldDelimiter = properties.getProperty(serdeConstants.FIELD_DELIM);
} else {
// If the Hive table uses the default row format delimiter, the property
// is absent, so Tajo must set it explicitly:
fieldDelimiter = "\u0001";
}
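// Note: \u0001 (Ctrl-A) is Hive's default field delimiter for text tables.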
// set null format
if (properties.getProperty(serdeConstants.SERIALIZATION_NULL_FORMAT) != null) {
nullFormat = properties.getProperty(serdeConstants.SERIALIZATION_NULL_FORMAT);
} else {
nullFormat = "\\N";
}
// set file output format
String fileOutputformat = properties.getProperty(hive_metastoreConstants.FILE_OUTPUT_FORMAT);
storeType = CatalogUtil.getStoreType(HCatalogUtil.getStoreType(fileOutputformat));
if (storeType.equals(CatalogProtos.StoreType.CSV)) {
options.put(CSVFILE_DELIMITER, StringEscapeUtils.escapeJava(fieldDelimiter));
options.put(CSVFILE_NULL, StringEscapeUtils.escapeJava(nullFormat));
} else if (storeType.equals(CatalogProtos.StoreType.RCFILE)) {
options.put(RCFILE_NULL, StringEscapeUtils.escapeJava(nullFormat));
String serde = properties.getProperty(serdeConstants.SERIALIZATION_LIB);
if (LazyBinaryColumnarSerDe.class.getName().equals(serde)) {
options.put(RCFILE_SERDE, RCFILE_BINARY_SERDE);
} else if (ColumnarSerDe.class.getName().equals(serde)) {
options.put(RCFILE_SERDE, RCFILE_TEXT_SERDE);
}
}
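// For example, a Hive text table with default settings ends up with
// CSVFILE_DELIMITER = "\\u0001" and CSVFILE_NULL = "\\N" here, since
// escapeJava() rewrites the raw Ctrl-A character as its unicode escape.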
// set data size
long totalSize = 0;
if(properties.getProperty("totalSize") != null) {
totalSize = Long.parseLong(properties.getProperty("totalSize"));
} else {
try {
FileSystem fs = path.getFileSystem(conf);
if (fs.exists(path)) {
totalSize = fs.getContentSummary(path).getLength();
}
} catch (IOException ioe) {
throw new CatalogException("Fail to get path. - path:" + path.toString(), ioe);
}
}
stats.setNumBytes(totalSize);
}
// set partition keys
if (table.getPartitionKeys() != null) {
Schema expressionSchema = new Schema();
StringBuilder sb = new StringBuilder();
if (table.getPartitionKeys().size() > 0) {
List<FieldSchema> partitionKeys = table.getPartitionKeys();
for(int i = 0; i < partitionKeys.size(); i++) {
FieldSchema fieldSchema = partitionKeys.get(i);
TajoDataTypes.Type dataType = HCatalogUtil.getTajoFieldType(fieldSchema.getType().toString());
String fieldName = dbName + CatalogUtil.IDENTIFIER_DELIMITER + tableName +
CatalogUtil.IDENTIFIER_DELIMITER + fieldSchema.getName();
expressionSchema.addColumn(new Column(fieldName, dataType));
if (i > 0) {
sb.append(",");
}
sb.append(fieldSchema.getName());
}
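// e.g. partition keys (ds, country) yield the expression string "ds,country"
// and fully-qualified expression schema columns (the key names here are hypothetical).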
partitions = new PartitionMethodDesc(
dbName + "." + tableName,
PartitionType.COLUMN,
sb.toString(),
expressionSchema);
}
}
} finally {
if (client != null) {
client.release();
}
}
TableMeta meta = new TableMeta(storeType, options);
TableDesc tableDesc = new TableDesc(dbName + "." + tableName, schema, meta, path);
if (stats != null) {
tableDesc.setStats(stats);
}
if (partitions != null) {
tableDesc.setPartitionMethod(partitions);
}
return tableDesc.getProto();
}
private TajoDataTypes.Type getDataType(final String typeStr) {
try {
return Enum.valueOf(TajoDataTypes.Type.class, typeStr);
} catch (IllegalArgumentException iae) {
LOG.error("Cannot find a matched type aginst from '" + typeStr + "'");
return null;
}
}
@Override
public final List<String> getAllTableNames() throws CatalogException {
List<String> dbs = null;
List<String> tables = null;
List<String> allTables = new ArrayList<String>();
HCatalogStoreClientPool.HCatalogStoreClient client = null;
try {
client = clientPool.getClient();
dbs = client.getHiveClient().getAllDatabases();
for(String eachDB: dbs) {
tables = client.getHiveClient().getAllTables(eachDB);
for(String eachTable: tables) {
allTables.add(eachDB + "." + eachTable);
}
}
} catch (MetaException e) {
throw new CatalogException(e);
} finally {
if (client != null) {
client.release();
}
}
return allTables;
}
@Override
public final void addTable(final CatalogProtos.TableDescProto tableDescProto) throws CatalogException {
String dbName = null, tableName = null;
Pair<String, String> tablePair = null;
HCatalogStoreClientPool.HCatalogStoreClient client = null;
TableDesc tableDesc = new TableDesc(tableDescProto);
// get db name and table name.
try {
tablePair = HCatUtil.getDbAndTableName(tableDesc.getName());
dbName = tablePair.first;
tableName = tablePair.second;
} catch (Exception ioe) {
throw new CatalogException("Table name is wrong.", ioe);
}
try {
client = clientPool.getClient();
org.apache.hadoop.hive.metastore.api.Table table = new org.apache.hadoop.hive.metastore.api.Table();
table.setDbName(dbName);
table.setTableName(tableName);
// TODO: set owner
//table.setOwner();
StorageDescriptor sd = new StorageDescriptor();
sd.setParameters(new HashMap<String, String>());
sd.setSerdeInfo(new SerDeInfo());
sd.getSerdeInfo().setParameters(new HashMap<String, String>());
sd.getSerdeInfo().setName(table.getTableName());
// If Tajo sets the location here, the Thrift client throws an exception:
// Caused by: MetaException(message:java.lang.NullPointerException)
// To modify the table path, use the Hive CLI instead.
// sd.setLocation(tableDesc.getPath().toString());
// set column information
List<Column> columns = tableDesc.getSchema().getColumns();
ArrayList<FieldSchema> cols = new ArrayList<FieldSchema>(columns.size());
for (Column eachField : columns) {
cols.add(new FieldSchema(eachField.getSimpleName(),
HCatalogUtil.getHiveFieldType(eachField.getDataType().getType().name()), ""));
}
sd.setCols(cols);
// set partition keys
if (tableDesc.hasPartition() && tableDesc.getPartitionMethod().getPartitionType().equals(PartitionType.COLUMN)) {
List<FieldSchema> partitionKeys = new ArrayList<FieldSchema>();
for(Column eachPartitionKey: tableDesc.getPartitionMethod().getExpressionSchema().getColumns()) {
partitionKeys.add(new FieldSchema( eachPartitionKey.getSimpleName(),
HCatalogUtil.getHiveFieldType(eachPartitionKey.getDataType().getType().name()), ""));
}
table.setPartitionKeys(partitionKeys);
}
if (tableDesc.getMeta().getStoreType().equals(CatalogProtos.StoreType.RCFILE)) {
String serde = tableDesc.getMeta().getOption(RCFILE_SERDE);
sd.setInputFormat(org.apache.hadoop.hive.ql.io.RCFileInputFormat.class.getName());
sd.setOutputFormat(org.apache.hadoop.hive.ql.io.RCFileOutputFormat.class.getName());
if (RCFILE_TEXT_SERDE.equals(serde)) {
sd.getSerdeInfo().setSerializationLib(org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe.class.getName());
} else {
sd.getSerdeInfo().setSerializationLib(
org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe.class.getName());
}
if (tableDesc.getMeta().getOption(RCFILE_NULL) != null) {
sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_NULL_FORMAT,
StringEscapeUtils.unescapeJava(tableDesc.getMeta().getOption(RCFILE_NULL)));
}
} else if (tableDesc.getMeta().getStoreType().equals(CatalogProtos.StoreType.CSV)) {
sd.getSerdeInfo().setSerializationLib(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName());
sd.setInputFormat(org.apache.hadoop.mapred.TextInputFormat.class.getName());
sd.setOutputFormat(org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat.class.getName());
String fieldDelimiter = tableDesc.getMeta().getOption(CSVFILE_DELIMITER, CSVFILE_DELIMITER_DEFAULT);
// A user can give the field delimiter as a unicode escape such as \u0001 or \001.
// In that case, the Java console escapes the value (e.g. to "\\u0001"),
// and Hive will un-escape it again, so the correct delimiter is ultimately used.
// Therefore, we must un-escape the value here.
sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_FORMAT,
StringEscapeUtils.unescapeJava(fieldDelimiter));
sd.getSerdeInfo().getParameters().put(serdeConstants.FIELD_DELIM,
StringEscapeUtils.unescapeJava(fieldDelimiter));
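// e.g. a delimiter stored in the catalog as "\\u0001" is restored to the raw
// Ctrl-A character by unescapeJava(), which is the form Hive expects.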
if (tableDesc.getMeta().getOption(CSVFILE_NULL) != null) {
sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_NULL_FORMAT,
StringEscapeUtils.unescapeJava(tableDesc.getMeta().getOption(CSVFILE_NULL)));
}
} else {
throw new CatalogException(new NotImplementedException(tableDesc.getMeta().getStoreType().name()));
}
sd.setSortCols(new ArrayList<Order>());
table.setSd(sd);
client.getHiveClient().createTable(table);
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new CatalogException(e);
} finally {
if (client != null) {
client.release();
}
}
}
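// For example, adding a Tajo CSV table whose CSVFILE_DELIMITER option is "|"
// creates a Hive table using LazySimpleSerDe with TextInputFormat,
// HiveIgnoreKeyTextOutputFormat, and field.delim = "|" in the SerDe parameters
// (a sketch of the mapping above; the delimiter value is hypothetical).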
@Override
public final void deleteTable(final String name) throws CatalogException {
String dbName = null, tableName = null;
Pair<String, String> tablePair = null;
HCatalogStoreClientPool.HCatalogStoreClient client = null;
// get db name and table name.
try {
tablePair = HCatUtil.getDbAndTableName(name);
dbName = tablePair.first;
tableName = tablePair.second;
} catch (Exception ioe) {
throw new CatalogException("Table name is wrong.", ioe);
}
try {
client = clientPool.getClient();
client.getHiveClient().dropTable(dbName, tableName, false, false);
} catch (NoSuchObjectException nsoe) {
// Ignore: the table is already absent, so there is nothing to drop.
} catch (Exception e) {
throw new CatalogException(e);
} finally {
if (client != null) {
client.release();
}
}
}
@Override
public void addPartitionMethod(CatalogProtos.PartitionMethodProto partitionMethodProto) throws CatalogException {
// TODO - not implemented yet
}
@Override
public CatalogProtos.PartitionMethodProto getPartitionMethod(String tableName) throws CatalogException {
return null; // TODO - not implemented yet
}
@Override
public boolean existPartitionMethod(String tableName) throws CatalogException {
return false; // TODO - not implemented yet
}
@Override
public void delPartitionMethod(String tableName) throws CatalogException {
// TODO - not implemented yet
}
@Override
public void addPartitions(CatalogProtos.PartitionsProto partitionsProto) throws CatalogException {
// TODO - not implemented yet
}
@Override
public void addPartition(CatalogProtos.PartitionDescProto partitionDescProto) throws CatalogException {
// TODO - not implemented yet
}
@Override
public CatalogProtos.PartitionsProto getPartitions(String tableName) throws CatalogException {
return null; // TODO - not implemented yet
}
@Override
public CatalogProtos.PartitionDescProto getPartition(String partitionName) throws CatalogException {
return null; // TODO - not implemented yet
}
@Override
public void delPartition(String partitionName) throws CatalogException {
// TODO - not implemented yet
}
@Override
public void delPartitions(String tableName) throws CatalogException {
// TODO - not implemented yet
}
@Override
public final void addFunction(final FunctionDesc func) throws CatalogException {
// TODO - not implemented yet
}
@Override
public final void deleteFunction(final FunctionDesc func) throws CatalogException {
// TODO - not implemented yet
}
@Override
public final void existFunction(final FunctionDesc func) throws CatalogException {
// TODO - not implemented yet
}
@Override
public final List<String> getAllFunctionNames() throws CatalogException {
// TODO - not implemented yet
return null;
}
@Override
public void delIndex(String indexName) throws CatalogException {
// TODO - not implemented yet
}
@Override
public boolean existIndex(String indexName) throws CatalogException {
// TODO - not implemented yet
return false;
}
@Override
public CatalogProtos.IndexDescProto[] getIndexes(String tableName) throws CatalogException {
// TODO - not implemented yet
return null;
}
@Override
public void addIndex(CatalogProtos.IndexDescProto proto) throws CatalogException {
// TODO - not implemented yet
}
@Override
public CatalogProtos.IndexDescProto getIndex(String indexName) throws CatalogException {
// TODO - not implemented yet
return null;
}
@Override
public CatalogProtos.IndexDescProto getIndex(String tableName, String columnName) throws CatalogException {
// TODO - not implemented yet
return null;
}
@Override
public boolean existIndex(String tableName, String columnName) throws CatalogException{
// TODO - not implemented yet
return false;
}
@Override
public final void close() {
clientPool.close();
}
}