// blob: 7e4dba441018406bc458a4d1bb6fb9c6083d3c07 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hcatalog.har;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.Constants;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.tools.HadoopArchives;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatException;
/**
 * Post-processing step that packs a partition directory into a Hadoop Archive (HAR),
 * removes the original directory and marks the partition as archived.
 */
public class HarOutputCommitterPostProcessor {

    /** Regular expression matching one or more trailing path separators. */
    private static final String TRAILING_SLASHES = "/+$";

    /** Flag exposed through {@link #isEnabled()} / {@link #setEnabled(boolean)}; off by default. */
    boolean isEnabled = false;

    /**
     * @return the current value of the enabled flag
     */
    public boolean isEnabled() {
        return isEnabled;
    }

    /**
     * @param enabled new value of the enabled flag
     */
    public void setEnabled(boolean enabled) {
        this.isEnabled = enabled;
    }

    /**
     * Archives the partition directory into a sibling {@code .har} file and flags the
     * partition with {@code Constants.IS_ARCHIVED = "true"}.
     *
     * @param context   job context, forwarded to {@link #makeHar(JobContext, String, String)}
     * @param partition partition whose parameters receive the archived marker
     * @param partPath  directory holding the partition data
     * @throws IOException if the archive cannot be created
     */
    public void exec(JobContext context, Partition partition, Path partPath) throws IOException {
        makeHar(context, partPath.toUri().toString(), harFile(partPath));
        // The Thrift-generated putToParameters creates the parameter map when it is
        // unset, so a partition without parameters no longer fails with a
        // NullPointerException after the archive has already been built.
        partition.putToParameters(Constants.IS_ARCHIVED, "true");
    }

    /**
     * Derives the archive file name for a partition directory: the path string with
     * trailing separators removed and {@code ".har"} appended.
     *
     * @param ptnPath partition directory
     * @return name of the HAR file for that directory
     * @throws IOException declared for interface compatibility; not thrown by this implementation
     */
    public String harFile(Path ptnPath) throws IOException {
        return stripTrailingSlashes(ptnPath.toString()) + ".har";
    }

    /**
     * Returns the path component of the partition URI (scheme and authority are not
     * included) with trailing separators removed.
     *
     * @param ptnPath partition directory
     * @return path portion of the partition location
     * @throws IOException declared for interface compatibility; not thrown by this implementation
     */
    public String getParentFSPath(Path ptnPath) throws IOException {
        return stripTrailingSlashes(ptnPath.toUri().getPath());
    }

    /**
     * Builds the {@code har://} location of the archived partition:
     * {@code "har://" + <path component> + ".har" + "/"}. Only the path component of
     * the partition URI is used.
     *
     * @param ptnPath partition directory
     * @return location string pointing inside the HAR file, ending with a separator
     * @throws IOException declared for interface compatibility; not thrown by this implementation
     */
    public String getProcessedLocation(Path ptnPath) throws IOException {
        String harRoot = stripTrailingSlashes("har://" + ptnPath.toUri().getPath());
        return harRoot + ".har" + Path.SEPARATOR;
    }

    /**
     * Creates a har file from the contents of a given directory, using that as root,
     * and then recursively deletes the source directory.
     *
     * <p>The archive is produced by running {@link HadoopArchives} with the arguments
     * {@code -archiveName <name> -p <dir> * <parent of harFile>}. If the environment
     * variable named by {@code HCatConstants.SYSENV_HADOOP_TOKEN_FILE_LOCATION} is set,
     * its value is passed on through
     * {@code HCatConstants.CONF_MAPREDUCE_JOB_CREDENTIALS_BINARY}.
     *
     * @param context job context; not read by the current implementation
     * @param dir     Directory to archive
     * @param harFile The HAR file to create; must contain a parent directory and a file name
     * @throws IOException an {@link HCatException} wrapping any failure, including an
     *                     invalid {@code harFile} and a non-zero exit code from the archive tool
     */
    public static void makeHar(JobContext context, String dir, String harFile) throws IOException {
        try {
            // Split inside the try block so that malformed names are reported through
            // HCatException instead of escaping as unchecked exceptions.
            int lastSep = harFile.lastIndexOf(Path.SEPARATOR_CHAR);
            if (lastSep < 0) {
                throw new IllegalArgumentException(
                    "Har file name has no parent directory: " + harFile);
            }
            String archiveName = harFile.substring(lastSep + 1);
            if (archiveName.isEmpty()) {
                throw new IllegalArgumentException(
                    "Har file name must not end with a separator: " + harFile);
            }
            // A separator at index 0 means the archive lives directly under the root;
            // an empty string is not a valid Path.
            Path archivePath = new Path(
                lastSep == 0 ? Path.SEPARATOR : harFile.substring(0, lastSep));

            final String[] args = {
                "-archiveName",
                archiveName,
                "-p",
                dir,
                "*",
                archivePath.toString()
            };

            Configuration newConf = new Configuration();
            FileSystem fs = archivePath.getFileSystem(newConf);

            String tokenFileLocation =
                System.getenv(HCatConstants.SYSENV_HADOOP_TOKEN_FILE_LOCATION);
            if (tokenFileLocation != null && !tokenFileLocation.isEmpty()) {
                newConf.set(HCatConstants.CONF_MAPREDUCE_JOB_CREDENTIALS_BINARY, tokenFileLocation);
            }

            final HadoopArchives har = new HadoopArchives(newConf);
            int rc = ToolRunner.run(har, args);
            if (rc != 0) {
                throw new IOException("Har returned error code " + rc);
            }

            // The data now lives in the archive; drop the original directory.
            // NOTE(review): the boolean result of delete() is ignored, so a failed
            // delete leaves the source directory behind without any signal.
            fs.delete(new Path(dir), true);
        } catch (Exception e) {
            throw new HCatException("Error creating Har [" + harFile + "] from [" + dir + "]", e);
        }
    }

    /** Removes every trailing {@code '/'} from the given string. */
    private static String stripTrailingSlashes(String value) {
        return value.replaceFirst(TRAILING_SLASHES, "");
    }
}