| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hcatalog.mapreduce; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hive.common.FileUtils; |
| import org.apache.hadoop.hive.metastore.HiveMetaHook; |
| import org.apache.hadoop.hive.ql.io.RCFile; |
| import org.apache.hadoop.hive.ql.metadata.HiveException; |
| import org.apache.hadoop.hive.ql.plan.TableDesc; |
| import org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider; |
| import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider; |
| import org.apache.hadoop.hive.serde2.SerDe; |
| import org.apache.hadoop.mapred.InputFormat; |
| import org.apache.hadoop.mapred.OutputFormat; |
| import org.apache.hcatalog.common.HCatConstants; |
| import org.apache.hcatalog.common.HCatUtil; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Map; |
| |
| /** |
| * This class is used to encapsulate the InputFormat, OutputFormat and SerDe |
| * artifacts of tables which don't define a SerDe. This StorageHandler assumes |
| * the supplied storage artifacts are for a file-based storage system. |
| */ |
| public class FosterStorageHandler extends HCatStorageHandler { |
| |
| public Configuration conf; |
| /** The directory under which data is initially written for a partitioned table */ |
| protected static final String DYNTEMP_DIR_NAME = "_DYN"; |
| |
| /** The directory under which data is initially written for a non partitioned table */ |
| protected static final String TEMP_DIR_NAME = "_TEMP"; |
| |
| private Class<? extends InputFormat> ifClass; |
| private Class<? extends OutputFormat> ofClass; |
| private Class<? extends SerDe> serDeClass; |
| |
| public FosterStorageHandler(String ifName, String ofName, String serdeName) throws ClassNotFoundException { |
| this((Class<? extends InputFormat>) Class.forName(ifName), |
| (Class<? extends OutputFormat>) Class.forName(ofName), |
| (Class<? extends SerDe>) Class.forName(serdeName)); |
| } |
| public FosterStorageHandler(Class<? extends InputFormat> ifClass, |
| Class<? extends OutputFormat> ofClass, |
| Class<? extends SerDe> serDeClass) { |
| this.ifClass = ifClass; |
| this.ofClass = ofClass; |
| this.serDeClass = serDeClass; |
| } |
| |
| @Override |
| public Class<? extends InputFormat> getInputFormatClass() { |
| return ifClass; //To change body of overridden methods use File | Settings | File Templates. |
| } |
| |
| @Override |
| public Class<? extends OutputFormat> getOutputFormatClass() { |
| return ofClass; //To change body of overridden methods use File | Settings | File Templates. |
| } |
| |
| @Override |
| public Class<? extends SerDe> getSerDeClass() { |
| return serDeClass; //To change body of implemented methods use File | Settings | File Templates. |
| } |
| |
| @Override |
| public HiveMetaHook getMetaHook() { |
| return null; |
| } |
| |
| @Override |
| public void configureInputJobProperties(TableDesc tableDesc, |
| Map<String, String> jobProperties) { |
| |
| } |
| |
| @Override |
| public void configureOutputJobProperties(TableDesc tableDesc, |
| Map<String, String> jobProperties) { |
| try { |
| OutputJobInfo jobInfo = (OutputJobInfo) |
| HCatUtil.deserialize(tableDesc.getJobProperties().get( |
| HCatConstants.HCAT_KEY_OUTPUT_INFO)); |
| String parentPath = jobInfo.getTableInfo().getTableLocation(); |
| String dynHash = tableDesc.getJobProperties().get( |
| HCatConstants.HCAT_DYNAMIC_PTN_JOBID); |
| |
| // For dynamic partitioned writes without all keyvalues specified, |
| // we create a temp dir for the associated write job |
| if (dynHash != null){ |
| parentPath = new Path(parentPath, |
| DYNTEMP_DIR_NAME+dynHash).toString(); |
| } |
| |
| String outputLocation; |
| |
| // For non-partitioned tables, we send them to the temp dir |
| if(dynHash == null && jobInfo.getPartitionValues().size() == 0) { |
| outputLocation = TEMP_DIR_NAME; |
| } |
| else { |
| List<String> cols = new ArrayList<String>(); |
| List<String> values = new ArrayList<String>(); |
| |
| //Get the output location in the order partition keys are defined for the table. |
| for(String name: |
| jobInfo.getTableInfo(). |
| getPartitionColumns().getFieldNames()) { |
| String value = jobInfo.getPartitionValues().get(name); |
| cols.add(name); |
| values.add(value); |
| } |
| outputLocation = FileUtils.makePartName(cols, values); |
| } |
| |
| jobInfo.setLocation(new Path(parentPath,outputLocation).toString()); |
| |
| //only set output dir if partition is fully materialized |
| if(jobInfo.getPartitionValues().size() |
| == jobInfo.getTableInfo().getPartitionColumns().size()) { |
| jobProperties.put("mapred.output.dir", jobInfo.getLocation()); |
| } |
| |
| //TODO find a better home for this, RCFile specifc |
| jobProperties.put(RCFile.COLUMN_NUMBER_CONF_STR, |
| Integer.toOctalString( |
| jobInfo.getOutputSchema().getFields().size())); |
| jobProperties.put(HCatConstants.HCAT_KEY_OUTPUT_INFO, |
| HCatUtil.serialize(jobInfo)); |
| } catch (IOException e) { |
| throw new IllegalStateException("Failed to set output path",e); |
| } |
| |
| } |
| |
| @Override |
| OutputFormatContainer getOutputFormatContainer( |
| org.apache.hadoop.mapred.OutputFormat outputFormat) { |
| return new FileOutputFormatContainer(outputFormat); |
| } |
| |
| @Override |
| public Configuration getConf() { |
| return conf; |
| } |
| |
| @Override |
| public void setConf(Configuration conf) { |
| this.conf = conf; |
| } |
| |
| @Override |
| public HiveAuthorizationProvider getAuthorizationProvider() |
| throws HiveException { |
| return new DefaultHiveAuthorizationProvider(); |
| } |
| |
| } |