| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with this |
| * work for additional information regarding copyright ownership. The ASF |
| * licenses this file to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| * License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.hcatalog.mapreduce; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.FsStatus; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hive.common.FileUtils; |
| import org.apache.hadoop.io.Writable; |
| import org.apache.hadoop.io.WritableComparable; |
| import org.apache.hadoop.mapreduce.JobContext; |
| import org.apache.hadoop.mapreduce.JobStatus.State; |
| import org.apache.hadoop.mapreduce.OutputCommitter; |
| import org.apache.hadoop.mapreduce.OutputFormat; |
| import org.apache.hadoop.mapreduce.TaskAttemptContext; |
| import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; |
| import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; |
| import org.apache.hadoop.security.AccessControlException; |
| import org.apache.hcatalog.data.HCatRecord; |
| import org.apache.hcatalog.data.schema.HCatSchema; |
| |
| |
| /** The abstract class to be implemented by underlying storage drivers to enable data access from HCat through |
| * HCatOutputFormat. |
| */ |
| public abstract class HCatOutputStorageDriver { |
| |
| |
| /** |
| * Initialize the storage driver with specified properties, default implementation does nothing. |
| * @param context the job context object |
| * @param hcatProperties the properties for the storage driver |
| * @throws IOException Signals that an I/O exception has occurred. |
| */ |
| public void initialize(JobContext context, Properties hcatProperties) throws IOException { |
| } |
| |
| /** |
| * Returns the OutputFormat to use with this Storage Driver. |
| * @return the OutputFormat instance |
| * @throws IOException Signals that an I/O exception has occurred. |
| */ |
| public abstract OutputFormat<? super WritableComparable<?>, ? super Writable> getOutputFormat() throws IOException; |
| |
| /** |
| * Set the data location for the output. |
| * @param jobContext the job context object |
| * @param location the data location |
| * @throws IOException Signals that an I/O exception has occurred. |
| */ |
| public abstract void setOutputPath(JobContext jobContext, String location) throws IOException; |
| |
| /** |
| * Set the schema for the data being written out. |
| * @param jobContext the job context object |
| * @param schema the data schema |
| * @throws IOException Signals that an I/O exception has occurred. |
| */ |
| public abstract void setSchema(JobContext jobContext, HCatSchema schema) throws IOException; |
| |
| /** |
| * Sets the partition key values for the partition being written. |
| * @param jobContext the job context object |
| * @param partitionValues the partition values |
| * @throws IOException Signals that an I/O exception has occurred. |
| */ |
| public abstract void setPartitionValues(JobContext jobContext, Map<String, String> partitionValues) throws IOException; |
| |
| /** |
| * Generate the key for the underlying outputformat. The value given to HCatOutputFormat is passed as the |
| * argument. The key given to HCatOutputFormat is ignored.. |
| * @param value the value given to HCatOutputFormat |
| * @return a key instance |
| * @throws IOException Signals that an I/O exception has occurred. |
| */ |
| public abstract WritableComparable<?> generateKey(HCatRecord value) throws IOException; |
| |
| /** |
| * Convert the given HCatRecord value to the actual value type. |
| * @param value the HCatRecord value to convert |
| * @return a value instance |
| * @throws IOException Signals that an I/O exception has occurred. |
| */ |
| public abstract Writable convertValue(HCatRecord value) throws IOException; |
| |
| /** |
| * Gets the location to use for the specified partition values. |
| * The storage driver can override as required. |
| * @param jobContext the job context object |
| * @param tableLocation the location of the table |
| * @param partitionValues the partition values |
| * @param dynHash A unique hash value that represents the dynamic partitioning job used |
| * @return the location String. |
| * @throws IOException Signals that an I/O exception has occurred. |
| */ |
| public String getOutputLocation(JobContext jobContext, |
| String tableLocation, List<String> partitionCols, Map<String, String> partitionValues, String dynHash) throws IOException { |
| |
| String parentPath = tableLocation; |
| // For dynamic partitioned writes without all keyvalues specified, |
| // we create a temp dir for the associated write job |
| if (dynHash != null){ |
| parentPath = new Path(tableLocation, HCatOutputFormat.DYNTEMP_DIR_NAME+dynHash).toString(); |
| } |
| |
| // For non-partitioned tables, we send them to the temp dir |
| if((dynHash == null) && ( partitionValues == null || partitionValues.size() == 0 )) { |
| return new Path(tableLocation, HCatOutputFormat.TEMP_DIR_NAME).toString(); |
| } |
| |
| List<String> values = new ArrayList<String>(); |
| for(String partitionCol : partitionCols) { |
| values.add(partitionValues.get(partitionCol)); |
| } |
| |
| String partitionLocation = FileUtils.makePartName(partitionCols, values); |
| |
| Path path = new Path(parentPath, partitionLocation); |
| return path.toString(); |
| } |
| |
| /** Default implementation assumes FileOutputFormat. Storage drivers wrapping |
| * other OutputFormats should override this method. |
| */ |
| public Path getWorkFilePath(TaskAttemptContext context, String outputLoc) throws IOException{ |
| return new Path(new FileOutputCommitter(new Path(outputLoc), context).getWorkPath(), FileOutputFormat.getUniqueFile(context, "part","")); |
| } |
| |
| /** |
| * Implementation that calls the underlying output committer's setupJob, |
| * used in lieu of underlying committer's setupJob when using dynamic partitioning |
| * The default implementation should be overriden by underlying implementations |
| * that do not use FileOutputCommitter. |
| * The reason this function exists is so as to allow a storage driver implementor to |
| * override underlying OutputCommitter's setupJob implementation to allow for |
| * being called multiple times in a job, to make it idempotent. |
| * This should be written in a manner that is callable multiple times |
| * from individual tasks without stepping on each others' toes |
| * |
| * @param context |
| * @throws InterruptedException |
| * @throws IOException |
| */ |
| public void setupOutputCommitterJob(TaskAttemptContext context) |
| throws IOException, InterruptedException{ |
| getOutputFormat().getOutputCommitter(context).setupJob(context); |
| } |
| |
| /** |
| * Implementation that calls the underlying output committer's cleanupJob, |
| * used in lieu of underlying committer's cleanupJob when using dynamic partitioning |
| * This should be written in a manner that is okay to call after having had |
| * multiple underlying outputcommitters write to task dirs inside it. |
| * While the base MR cleanupJob should have sufficed normally, this is provided |
| * in order to let people implementing setupOutputCommitterJob to cleanup properly |
| * |
| * @param context |
| * @throws IOException |
| */ |
| public void cleanupOutputCommitterJob(TaskAttemptContext context) |
| throws IOException, InterruptedException{ |
| getOutputFormat().getOutputCommitter(context).cleanupJob(context); |
| } |
| |
| /** |
| * Implementation that calls the underlying output committer's abortJob, |
| * used in lieu of underlying committer's abortJob when using dynamic partitioning |
| * This should be written in a manner that is okay to call after having had |
| * multiple underlying outputcommitters write to task dirs inside it. |
| * While the base MR cleanupJob should have sufficed normally, this is provided |
| * in order to let people implementing setupOutputCommitterJob to abort properly |
| * |
| * @param context |
| * @param state |
| * @throws IOException |
| */ |
| public void abortOutputCommitterJob(TaskAttemptContext context, State state) |
| throws IOException, InterruptedException{ |
| getOutputFormat().getOutputCommitter(context).abortJob(context,state); |
| } |
| |
| |
| } |