/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hcatalog.mapreduce;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hcatalog.common.ErrorType;
import org.apache.hcatalog.common.HCatException;
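
/**
 * OutputCommitter used for HCatalog "exim" (export/import) style writes. On job
 * cleanup it delegates to the wrapped base committer and then rewrites the
 * export metadata (_metadata) file at the table location so that it describes
 * the table and all partitions written so far.
 */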
public class HCatEximOutputCommitter extends HCatBaseOutputCommitter {
private static final Log LOG = LogFactory.getLog(HCatEximOutputCommitter.class);
public HCatEximOutputCommitter(JobContext context, OutputCommitter baseCommitter) {
super(context, baseCommitter);
}
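
/**
 * Delegates cleanup to the base committer (if any) and then refreshes the
 * export metadata file under the table location.
 */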
@Override
public void cleanupJob(JobContext jobContext) throws IOException {
LOG.info("HCatEximOutputCommitter.cleanup invoked; m.o.d : " +
jobContext.getConfiguration().get("mapred.output.dir"));
if (baseCommitter != null) {
LOG.info("baseCommitter.class = " + baseCommitter.getClass().getName());
baseCommitter.cleanupJob(jobContext);
}
OutputJobInfo jobInfo = HCatBaseOutputFormat.getJobInfo(jobContext);
Configuration conf = jobContext.getConfiguration();
FileSystem fs;
try {
fs = FileSystem.get(new URI(jobInfo.getTable().getSd().getLocation()), conf);
} catch (URISyntaxException e) {
throw new IOException(e);
}
doCleanup(jobInfo, fs);
}
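
/**
 * Reads any existing export metadata at <tableLocation>/_metadata, merges in the
 * partition written by this job (for partitioned tables), and writes the export
 * dump back out via EximUtil.
 */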
private static void doCleanup(OutputJobInfo jobInfo, FileSystem fs) throws IOException,
HCatException {
try {
Table ttable = jobInfo.getTable();
org.apache.hadoop.hive.ql.metadata.Table table = new org.apache.hadoop.hive.ql.metadata.Table(
ttable);
StorageDescriptor tblSD = ttable.getSd();
Path tblPath = new Path(tblSD.getLocation());
Path path = new Path(tblPath, "_metadata");
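// Read the existing export metadata, if present, so that partitions from
// earlier jobs are preserved in the rewritten dump.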
List<Partition> tpartitions = null;
try {
Map.Entry<org.apache.hadoop.hive.metastore.api.Table, List<Partition>> rv = EximUtil
.readMetaData(fs, path);
tpartitions = rv.getValue();
} catch (IOException e) {
// Ignore: the export metadata file may not exist yet; fall through with no
// pre-existing partitions.
}
List<org.apache.hadoop.hive.ql.metadata.Partition> partitions =
new ArrayList<org.apache.hadoop.hive.ql.metadata.Partition>();
if (tpartitions != null) {
for (Partition tpartition : tpartitions) {
partitions.add(new org.apache.hadoop.hive.ql.metadata.Partition(table, tpartition));
}
}
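// For a partitioned table, add the partition being written by this job,
// located under the standard warehouse partition path.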
if (!table.getPartitionKeys().isEmpty()) {
Map<String, String> partitionValues = jobInfo.getTableInfo().getPartitionValues();
org.apache.hadoop.hive.ql.metadata.Partition partition =
new org.apache.hadoop.hive.ql.metadata.Partition(table,
partitionValues,
new Path(tblPath, Warehouse.makePartPath(partitionValues)));
partition.getTPartition().setParameters(table.getParameters());
partitions.add(partition);
}
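// Rewrite the export metadata file with the table definition and the full
// partition list.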
EximUtil.createExportDump(fs, path, table, partitions);
} catch (SemanticException e) {
throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
} catch (HiveException e) {
throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
} catch (MetaException e) {
throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
}
}
}