/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.lens.cube.metadata;
import java.lang.reflect.Constructor;
import java.util.*;
import java.util.Map.Entry;
import org.apache.lens.server.api.error.LensException;
import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
import org.apache.hadoop.hive.ql.metadata.*;
import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.mapred.TextInputFormat;
import com.google.common.collect.Maps;
/**
* Storage is a named abstraction that represents the underlying physical storage of the data.
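* <p>
* Concrete storages are instantiated reflectively from the Hive table that backs them (see
* {@link #createInstance(Table)}), so every subclass must expose a constructor taking a Hive
* {@link Table}. A minimal sketch of a subclass (the name {@code MyStorage} is illustrative only):
* <pre>{@code
* public class MyStorage extends Storage {
*   public MyStorage(Table hiveTable) {
*     super(hiveTable);
*   }
*   // ...plus the PartitionMetahook callbacks (preAddPartitions, commitAddPartitions,
*   // rollbackAddPartitions, preDropPartition, commitDropPartition, ...), often simple no-ops
* }
* }</pre>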
*/
public abstract class Storage extends AbstractCubeTable implements PartitionMetahook {
private static final List<FieldSchema> COLUMNS = new ArrayList<FieldSchema>();
static {
COLUMNS.add(new FieldSchema("dummy", "string", "dummy column"));
}
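/**
* Create a storage with the given name and properties. The backing Hive table is given a single
* dummy column, since a storage itself carries only metadata.
*/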
protected Storage(String name, Map<String, String> properties) {
super(name, COLUMNS, properties, 0L);
addProperties();
}
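/**
* Create a storage from its backing Hive table, as done reflectively by {@link #createInstance(Table)}.
*/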
public Storage(Table hiveTable) {
super(hiveTable);
}
/**
* Get the name prefix of the storage
*
* @return Name followed by storage separator
*/
public String getPrefix() {
return getPrefix(getName());
}
@Override
public CubeTableType getTableType() {
return CubeTableType.STORAGE;
}
@Override
public Set<String> getStorages() {
throw new NotImplementedException();
}
@Override
protected void addProperties() {
super.addProperties();
getProperties().put(MetastoreUtil.getStorageClassKey(getName()), getClass().getCanonicalName());
}
/**
* Get the name prefix of the storage
*
* @param name Name of the storage
* @return Name followed by storage separator
*/
public static String getPrefix(String name) {
return name + StorageConstants.STORGAE_SEPARATOR;
}
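/**
* Latest-partition bookkeeping for one non-time partition spec: the new latest partition and,
* per time part column, the parameters of the partition it should point to.
*/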
public static final class LatestInfo {
Map<String, LatestPartColumnInfo> latestParts = new HashMap<String, LatestPartColumnInfo>();
Partition part = null;
void addLatestPartInfo(String partCol, LatestPartColumnInfo partInfo) {
latestParts.put(partCol, partInfo);
}
void setPart(Partition part) {
this.part = part;
}
}
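/**
* Partition parameters of the latest partition for a single part column, including the latest
* partition timestamp property.
*/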
public static final class LatestPartColumnInfo extends HashMap<String, String> {
public LatestPartColumnInfo(Map<String, String> partParams) {
putAll(partParams);
}
public Map<String, String> getPartParams(Map<String, String> parentParams) {
putAll(parentParams);
return this;
}
}
/**
* Get the storage table descriptor for the given parent table.
*
* @param storageTableNamePrefix Storage table prefix based on update period
* @param client The metastore client
* @param parent The parent table, either a fact or a dimension table
* @param crtTbl Create table info
* @return Table describing the storage table
* @throws HiveException
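* <p>
* A minimal usage sketch (the table name, the update-period prefix and the {@code desc}
* descriptor are illustrative and built by the caller):
* <pre>{@code
* Table parent = Hive.get().getTable("fact1");
* Table storageTable = Storage.getStorageTable("daily", Hive.get(), parent, desc);
* }</pre>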
*/
public static Table getStorageTable(String storageTableNamePrefix, Hive client, Table parent, StorageTableDesc crtTbl)
throws HiveException {
// Resolve the storage table name from the parent table name and the storage prefix.
String storageTableName = MetastoreUtil
.getStorageTableName(parent.getTableName(), Storage.getPrefix(storageTableNamePrefix));
Table tbl = client.getTable(storageTableName, false);
if (tbl == null) {
tbl = client.newTable(storageTableName);
}
tbl.getTTable().setSd(new StorageDescriptor(parent.getTTable().getSd()));
if (crtTbl.getTblProps() != null) {
tbl.getTTable().getParameters().putAll(crtTbl.getTblProps());
}
if (crtTbl.getPartCols() != null) {
tbl.setPartCols(crtTbl.getPartCols());
}
if (crtTbl.getNumBuckets() != -1) {
tbl.setNumBuckets(crtTbl.getNumBuckets());
}
if (!StringUtils.isBlank(crtTbl.getStorageHandler())) {
tbl.setProperty(org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE,
crtTbl.getStorageHandler());
}
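// Resolve the serde: prefer an explicitly configured serde, else the storage handler's default,
// else fall back to LazySimpleSerDe.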
HiveStorageHandler storageHandler = tbl.getStorageHandler();
if (crtTbl.getSerName() == null) {
if (storageHandler == null || storageHandler.getSerDeClass() == null) {
tbl.setSerializationLib(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName());
} else {
String serDeClassName = storageHandler.getSerDeClass().getName();
tbl.setSerializationLib(serDeClassName);
}
} else {
// use the serde explicitly configured in the storage table descriptor
tbl.setSerializationLib(crtTbl.getSerName());
}
if (crtTbl.getFieldDelim() != null) {
tbl.setSerdeParam(serdeConstants.FIELD_DELIM, crtTbl.getFieldDelim());
tbl.setSerdeParam(serdeConstants.SERIALIZATION_FORMAT, crtTbl.getFieldDelim());
}
if (crtTbl.getFieldEscape() != null) {
tbl.setSerdeParam(serdeConstants.ESCAPE_CHAR, crtTbl.getFieldEscape());
}
if (crtTbl.getCollItemDelim() != null) {
tbl.setSerdeParam(serdeConstants.COLLECTION_DELIM, crtTbl.getCollItemDelim());
}
if (crtTbl.getMapKeyDelim() != null) {
tbl.setSerdeParam(serdeConstants.MAPKEY_DELIM, crtTbl.getMapKeyDelim());
}
if (crtTbl.getLineDelim() != null) {
tbl.setSerdeParam(serdeConstants.LINE_DELIM, crtTbl.getLineDelim());
}
if (crtTbl.getSerdeProps() != null) {
for (Entry<String, String> m : crtTbl.getSerdeProps().entrySet()) {
tbl.setSerdeParam(m.getKey(), m.getValue());
}
}
if (crtTbl.getBucketCols() != null) {
tbl.setBucketCols(crtTbl.getBucketCols());
}
if (crtTbl.getSortCols() != null) {
tbl.setSortCols(crtTbl.getSortCols());
}
if (crtTbl.getComment() != null) {
tbl.setProperty("comment", crtTbl.getComment());
}
if (crtTbl.getLocation() != null) {
tbl.setDataLocation(new Path(crtTbl.getLocation()));
}
if (crtTbl.getSkewedColNames() != null) {
tbl.setSkewedColNames(crtTbl.getSkewedColNames());
}
if (crtTbl.getSkewedColValues() != null) {
tbl.setSkewedColValues(crtTbl.getSkewedColValues());
}
tbl.setStoredAsSubDirectories(crtTbl.isStoredAsSubDirectories());
if (crtTbl.getInputFormat() != null) {
tbl.setInputFormatClass(crtTbl.getInputFormat());
} else {
tbl.setInputFormatClass(TextInputFormat.class.getName());
}
if (crtTbl.getOutputFormat() != null) {
tbl.setOutputFormatClass(crtTbl.getOutputFormat());
} else {
tbl.setOutputFormatClass(IgnoreKeyTextOutputFormat.class.getName());
}
tbl.getTTable().getSd().setInputFormat(tbl.getInputFormatClass().getName());
tbl.getTTable().getSd().setOutputFormat(tbl.getOutputFormatClass().getName());
if (crtTbl.isExternal()) {
tbl.setProperty("EXTERNAL", "TRUE");
tbl.setTableType(TableType.EXTERNAL_TABLE);
}
return tbl;
}
/**
* Add given partitions in the underlying hive table and update latest partition links
*
* @param client hive client instance
* @param factOrDimTable fact or dimension table name
* @param updatePeriod update period of the partitions
* @param storagePartitionDescs all partitions to be added
* @param latestInfos new latest info. At least one partition for the latest value exists for each part
* column
* @param tableName storage table name to which the partitions are added
* @return the partitions created in the metastore
* @throws HiveException
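* <p>
* A minimal usage sketch (names are illustrative; {@code partDesc} is a StoragePartitionDesc built
* by the caller and {@code storage} is a concrete Storage instance):
* <pre>{@code
* storage.addPartitions(Hive.get(), "fact1", UpdatePeriod.DAILY,
*     Collections.singletonList(partDesc), null, "fact1_mystorage");
* }</pre>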
*/
public List<Partition> addPartitions(Hive client, String factOrDimTable, UpdatePeriod updatePeriod,
List<StoragePartitionDesc> storagePartitionDescs,
Map<Map<String, String>, LatestInfo> latestInfos, String tableName) throws HiveException {
preAddPartitions(storagePartitionDescs);
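// For each non-time partition spec, remember the index (within addParts) of the partition that
// becomes the new "latest" for each time part column.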
Map<Map<String, String>, Map<String, Integer>> latestPartIndexForPartCols = Maps.newHashMap();
boolean success = false;
try {
String dbName = SessionState.get().getCurrentDatabase();
AddPartitionDesc addParts = new AddPartitionDesc(dbName, tableName, true);
Table storageTbl = client.getTable(dbName, tableName);
for (StoragePartitionDesc addPartitionDesc : storagePartitionDescs) {
String location = null;
if (addPartitionDesc.getLocation() != null) {
Path partLocation = new Path(addPartitionDesc.getLocation());
if (partLocation.isAbsolute()) {
location = addPartitionDesc.getLocation();
} else {
location = new Path(storageTbl.getPath(), partLocation).toString();
}
}
Map<String, String> partParams = addPartitionDesc.getPartParams();
if (partParams == null) {
partParams = new HashMap<String, String>();
}
partParams.put(MetastoreConstants.PARTITION_UPDATE_PERIOD, addPartitionDesc.getUpdatePeriod().name());
addParts.addPartition(addPartitionDesc.getStoragePartSpec(), location);
int curIndex = addParts.getPartitionCount() - 1;
addParts.getPartition(curIndex).setPartParams(partParams);
addParts.getPartition(curIndex).setInputFormat(addPartitionDesc.getInputFormat());
addParts.getPartition(curIndex).setOutputFormat(addPartitionDesc.getOutputFormat());
addParts.getPartition(curIndex).setNumBuckets(addPartitionDesc.getNumBuckets());
addParts.getPartition(curIndex).setCols(addPartitionDesc.getCols());
addParts.getPartition(curIndex).setSerializationLib(addPartitionDesc.getSerializationLib());
addParts.getPartition(curIndex).setSerdeParams(addPartitionDesc.getSerdeParams());
addParts.getPartition(curIndex).setBucketCols(addPartitionDesc.getBucketCols());
addParts.getPartition(curIndex).setSortCols(addPartitionDesc.getSortCols());
if (latestInfos != null && latestInfos.get(addPartitionDesc.getNonTimePartSpec()) != null) {
for (Map.Entry<String, LatestPartColumnInfo> entry : latestInfos
.get(addPartitionDesc.getNonTimePartSpec()).latestParts.entrySet()) {
if (addPartitionDesc.getTimePartSpec().containsKey(entry.getKey())
&& entry.getValue().get(MetastoreUtil.getLatestPartTimestampKey(entry.getKey())).equals(
updatePeriod.format(addPartitionDesc.getTimePartSpec().get(entry.getKey())))) {
if (latestPartIndexForPartCols.get(addPartitionDesc.getNonTimePartSpec()) == null) {
latestPartIndexForPartCols.put(addPartitionDesc.getNonTimePartSpec(),
Maps.<String, Integer>newHashMap());
}
latestPartIndexForPartCols.get(addPartitionDesc.getNonTimePartSpec()).put(entry.getKey(), curIndex);
}
}
}
}
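// Re-point the "latest" links: drop the existing latest partition for each part column and add a
// new one mirroring the partition that now carries the newest timestamp.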
if (latestInfos != null) {
for (Map.Entry<Map<String, String>, LatestInfo> entry1 : latestInfos.entrySet()) {
Map<String, String> nonTimeParts = entry1.getKey();
LatestInfo latestInfo = entry1.getValue();
for (Map.Entry<String, LatestPartColumnInfo> entry : latestInfo.latestParts.entrySet()) {
// symlink this partition to latest
List<Partition> latest;
String latestPartCol = entry.getKey();
try {
latest = client
.getPartitionsByFilter(storageTbl, StorageConstants.getLatestPartFilter(latestPartCol, nonTimeParts));
} catch (Exception e) {
throw new HiveException("Could not get latest partition", e);
}
if (!latest.isEmpty()) {
client.dropPartition(storageTbl.getTableName(), latest.get(0).getValues(), false);
}
if (latestPartIndexForPartCols.get(nonTimeParts).containsKey(latestPartCol)) {
AddPartitionDesc.OnePartitionDesc latestPartWithFullTimestamp = addParts.getPartition(
latestPartIndexForPartCols.get(nonTimeParts).get(latestPartCol));
addParts.addPartition(
StorageConstants.getLatestPartSpec(latestPartWithFullTimestamp.getPartSpec(), latestPartCol),
latestPartWithFullTimestamp.getLocation());
int curIndex = addParts.getPartitionCount() - 1;
addParts.getPartition(curIndex).setPartParams(entry.getValue().getPartParams(
latestPartWithFullTimestamp.getPartParams()));
addParts.getPartition(curIndex).setInputFormat(latestPartWithFullTimestamp.getInputFormat());
addParts.getPartition(curIndex).setOutputFormat(latestPartWithFullTimestamp.getOutputFormat());
addParts.getPartition(curIndex).setNumBuckets(latestPartWithFullTimestamp.getNumBuckets());
addParts.getPartition(curIndex).setCols(latestPartWithFullTimestamp.getCols());
addParts.getPartition(curIndex).setSerializationLib(latestPartWithFullTimestamp.getSerializationLib());
addParts.getPartition(curIndex).setSerdeParams(latestPartWithFullTimestamp.getSerdeParams());
addParts.getPartition(curIndex).setBucketCols(latestPartWithFullTimestamp.getBucketCols());
addParts.getPartition(curIndex).setSortCols(latestPartWithFullTimestamp.getSortCols());
}
}
}
}
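// Re-acquire the current Hive client handle before issuing the actual metastore call.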
client = Hive.get();
List<Partition> partitionsAdded = client.createPartitions(addParts);
success = true;
return partitionsAdded;
} finally {
if (success) {
commitAddPartitions(storagePartitionDescs);
} else {
rollbackAddPartitions(storagePartitionDescs);
}
}
}
/**
* Update existing partition
* @param client hive client instance
* @param fact fact name
* @param partition partition to be updated
* @throws InvalidOperationException
* @throws HiveException
*/
public void updatePartition(Hive client, String fact, Partition partition)
throws InvalidOperationException, HiveException {
client.alterPartition(MetastoreUtil.getFactOrDimtableStorageTableName(fact, getName()), partition, null);
}
/**
* Update existing partitions
* @param storageTable storage table name whose partitions are to be altered
* @param client hive client instance
* @param fact fact table name
* @param partitions partitions to be updated
* @throws InvalidOperationException
* @throws HiveException
*/
public void updatePartitions(String storageTable, Hive client, String fact, List<Partition> partitions)
throws InvalidOperationException, HiveException {
boolean success = false;
try {
client.alterPartitions(storageTable, partitions, null);
success = true;
} finally {
if (success) {
commitUpdatePartition(partitions);
} else {
rollbackUpdatePartition(partitions);
}
}
}
/**
* Drop the partition in the underlying hive table and update latest partition link
*
* @param client The metastore client
* @param storageTableName storage table name
* @param partVals partition values of the partition being dropped, in partition column order
* @param updateLatestInfo The latest partition info if it needs update, null if latest should not be updated
* @param nonTimePartSpec non-time partition spec used to locate the latest partitions to update
* @throws HiveException
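* <p>
* A minimal usage sketch (values are illustrative) that drops a single time partition without
* updating any latest links:
* <pre>{@code
* storage.dropPartition(Hive.get(), "fact1_mystorage",
*     Collections.singletonList("2015-01-01"), null, null);
* }</pre>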
*/
public void dropPartition(Hive client, String storageTableName, List<String> partVals,
Map<String, LatestInfo> updateLatestInfo, Map<String, String> nonTimePartSpec) throws HiveException {
preDropPartition(storageTableName, partVals);
boolean success = false;
try {
client.dropPartition(storageTableName, partVals, false);
String dbName = SessionState.get().getCurrentDatabase();
Table storageTbl = client.getTable(storageTableName);
// update latest info
if (updateLatestInfo != null) {
for (Entry<String, LatestInfo> entry : updateLatestInfo.entrySet()) {
String latestPartCol = entry.getKey();
// symlink this partition to latest
List<Partition> latestParts;
try {
latestParts = client.getPartitionsByFilter(storageTbl,
StorageConstants.getLatestPartFilter(latestPartCol, nonTimePartSpec));
MetastoreUtil.filterPartitionsByNonTimeParts(latestParts, nonTimePartSpec, latestPartCol);
} catch (Exception e) {
throw new HiveException("Could not get latest partition", e);
}
if (!latestParts.isEmpty()) {
assert latestParts.size() == 1;
client.dropPartition(storageTbl.getTableName(), latestParts.get(0).getValues(), false);
}
LatestInfo latest = entry.getValue();
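// Re-create the latest link so that it points at the new latest partition, copying that
// partition's storage descriptor and parameters.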
if (latest != null && latest.part != null) {
AddPartitionDesc latestPart = new AddPartitionDesc(dbName, storageTableName, true);
latestPart.addPartition(StorageConstants.getLatestPartSpec(latest.part.getSpec(), latestPartCol),
latest.part.getLocation());
latestPart.getPartition(0).setPartParams(
latest.latestParts.get(latestPartCol).getPartParams(latest.part.getParameters()));
latestPart.getPartition(0).setInputFormat(latest.part.getInputFormatClass().getCanonicalName());
latestPart.getPartition(0).setOutputFormat(latest.part.getOutputFormatClass().getCanonicalName());
latestPart.getPartition(0).setNumBuckets(latest.part.getBucketCount());
latestPart.getPartition(0).setCols(latest.part.getCols());
latestPart.getPartition(0).setSerializationLib(
latest.part.getTPartition().getSd().getSerdeInfo().getSerializationLib());
latestPart.getPartition(0).setSerdeParams(
latest.part.getTPartition().getSd().getSerdeInfo().getParameters());
latestPart.getPartition(0).setBucketCols(latest.part.getBucketCols());
latestPart.getPartition(0).setSortCols(latest.part.getSortCols());
client.createPartitions(latestPart);
}
}
}
success = true;
} finally {
if (success) {
commitDropPartition(storageTableName, partVals);
} else {
rollbackDropPartition(storageTableName, partVals);
}
}
}
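/**
* Instantiate the concrete {@link Storage} subclass recorded against the given Hive table. The
* class name is read from the table property keyed by {@link MetastoreUtil#getStorageClassKey(String)}
* and the class must expose a constructor taking a Hive {@link Table}.
*/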
static Storage createInstance(Table tbl) throws LensException {
String storageName = tbl.getTableName();
String storageClassName = tbl.getParameters().get(MetastoreUtil.getStorageClassKey(storageName));
try {
Class<?> clazz = Class.forName(storageClassName);
Constructor<?> constructor = clazz.getConstructor(Table.class);
return (Storage) constructor.newInstance(tbl);
} catch (Exception e) {
throw new LensException("Could not create storage class " + storageClassName, e);
}
}
}