/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hcatalog.mapreduce;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hcatalog.common.ErrorType;
import org.apache.hcatalog.common.HCatException;
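
/**
 * OutputCommitter for HCatalog export (Exim) jobs. Task-level setup, commit and
 * abort are delegated to the wrapped base committer; in addition, this committer
 * writes the export "_metadata" dump during job cleanup and deletes the export
 * output location when the job is aborted.
 */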
public class HCatEximOutputCommitter extends OutputCommitter {

  private static final Log LOG = LogFactory.getLog(HCatEximOutputCommitter.class);

  private final OutputCommitter baseCommitter;

  public HCatEximOutputCommitter(JobContext context, OutputCommitter baseCommitter) {
    this.baseCommitter = baseCommitter;
  }

  @Override
  public void abortTask(TaskAttemptContext context) throws IOException {
    baseCommitter.abortTask(context);
  }

  @Override
  public void commitTask(TaskAttemptContext context) throws IOException {
    baseCommitter.commitTask(context);
  }

  @Override
  public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
    return baseCommitter.needsTaskCommit(context);
  }

  @Override
  public void setupJob(JobContext context) throws IOException {
    if (baseCommitter != null) {
      baseCommitter.setupJob(context);
    }
  }

  @Override
  public void setupTask(TaskAttemptContext context) throws IOException {
    baseCommitter.setupTask(context);
  }

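  /**
   * Delegates to the base committer (if any) and then recursively deletes the job
   * output location, so an aborted export leaves no partial data behind.
   */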
  @Override
  public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
    if (baseCommitter != null) {
      baseCommitter.abortJob(jobContext, state);
    }
    OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
    Path src = new Path(jobInfo.getLocation());
    FileSystem fs = src.getFileSystem(jobContext.getConfiguration());
    fs.delete(src, true);
  }

  @Override
  public void commitJob(JobContext jobContext) throws IOException {
    if (baseCommitter != null) {
      baseCommitter.commitJob(jobContext);
    }
  }

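  /**
   * Runs the base committer's cleanup (if any) and then writes the export metadata
   * dump to the "_metadata" file under the table's storage location.
   */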
  @Override
  public void cleanupJob(JobContext jobContext) throws IOException {
    LOG.info("HCatEximOutputCommitter.cleanupJob invoked; mapred.output.dir = "
        + jobContext.getConfiguration().get("mapred.output.dir"));
    if (baseCommitter != null) {
      LOG.info("baseCommitter.class = " + baseCommitter.getClass().getName());
      baseCommitter.cleanupJob(jobContext);
    }
    OutputJobInfo jobInfo = HCatBaseOutputFormat.getJobInfo(jobContext);
    Configuration conf = jobContext.getConfiguration();
    FileSystem fs;
    try {
      fs = FileSystem.get(new URI(jobInfo.getTableInfo().getTable().getSd().getLocation()), conf);
    } catch (URISyntaxException e) {
      throw new IOException(e);
    }
    doCleanup(jobInfo, fs);
  }

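  /**
   * Writes the export dump for the table: any partitions already recorded in the
   * existing "_metadata" file are preserved, and for a partitioned table the
   * partition produced by this job is appended before the dump is rewritten.
   */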
  private static void doCleanup(OutputJobInfo jobInfo, FileSystem fs) throws IOException,
      HCatException {
    try {
      Table ttable = jobInfo.getTableInfo().getTable();
      org.apache.hadoop.hive.ql.metadata.Table table =
          new org.apache.hadoop.hive.ql.metadata.Table(ttable);
      StorageDescriptor tblSD = ttable.getSd();
      Path tblPath = new Path(tblSD.getLocation());
      Path path = new Path(tblPath, "_metadata");
      List<Partition> tpartitions = null;
      try {
        Map.Entry<org.apache.hadoop.hive.metastore.api.Table, List<Partition>> rv =
            EximUtil.readMetaData(fs, path);
        tpartitions = rv.getValue();
      } catch (IOException e) {
        // Ignore read failures (e.g., the _metadata file does not exist yet);
        // proceed as if there are no previously exported partitions.
      }
      List<org.apache.hadoop.hive.ql.metadata.Partition> partitions =
          new ArrayList<org.apache.hadoop.hive.ql.metadata.Partition>();
      if (tpartitions != null) {
        for (Partition tpartition : tpartitions) {
          partitions.add(new org.apache.hadoop.hive.ql.metadata.Partition(table, tpartition));
        }
      }
      if (!table.getPartitionKeys().isEmpty()) {
        Map<String, String> partitionValues = jobInfo.getPartitionValues();
        org.apache.hadoop.hive.ql.metadata.Partition partition =
            new org.apache.hadoop.hive.ql.metadata.Partition(table,
                partitionValues,
                new Path(tblPath, Warehouse.makePartPath(partitionValues)));
        partition.getTPartition().setParameters(table.getParameters());
        partitions.add(partition);
      }
      EximUtil.createExportDump(fs, path, table, partitions);
    } catch (SemanticException e) {
      throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
    } catch (HiveException e) {
      throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
    } catch (MetaException e) {
      throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
    }
  }
}