/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hcatalog.pig;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatException;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.mapreduce.HCatEximOutputCommitter;
import org.apache.hcatalog.mapreduce.HCatEximOutputFormat;
import org.apache.pig.ResourceSchema;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;

/**
 * HCatEximStorer. A Pig store function that writes data to a filesystem
 * location in HCatalog export format, via {@link HCatEximOutputFormat},
 * so that it can later be imported into an HCatalog-managed table.
 */
public class HCatEximStorer extends HCatBaseStorer {

  private static final Log LOG = LogFactory.getLog(HCatEximStorer.class);

  private final String outputLocation;

  public HCatEximStorer(String outputLocation) throws Exception {
    this(outputLocation, null, null);
  }

  public HCatEximStorer(String outputLocation, String partitionSpec) throws Exception {
    this(outputLocation, partitionSpec, null);
  }

  public HCatEximStorer(String outputLocation, String partitionSpec, String schema)
      throws Exception {
    super(partitionSpec, schema);
    this.outputLocation = outputLocation;
    LOG.debug("HCatEximStorer called");
  }
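
  /*
   * Illustrative Pig Latin usage (a sketch only; the relation, table, path,
   * and partition names below are hypothetical, not taken from this file):
   *
   *   A = LOAD 'employee_data' AS (emp_id:int, emp_name:chararray);
   *   STORE A INTO 'exim.employee'
   *       USING org.apache.hcatalog.pig.HCatEximStorer(
   *           '/warehouse/export/employee', 'emp_country=us,emp_state=ca');
   *
   * The STORE location supplies "dbname.tablename" (see setStoreLocation),
   * the first constructor argument the export directory, and the optional
   * second argument the partition specification.
   */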

  @Override
  public OutputFormat getOutputFormat() throws IOException {
    LOG.debug("getOutputFormat called");
    return new HCatEximOutputFormat();
  }

  @Override
  public void setStoreLocation(String location, Job job) throws IOException {
    LOG.debug("setStoreLocation called with: " + location);
    String[] userStr = location.split("\\.");
    String dbname = MetaStoreUtils.DEFAULT_DATABASE_NAME;
    String tablename = null;
    if (userStr.length == 2) {
      dbname = userStr[0];
      tablename = userStr[1];
    } else {
      tablename = userStr[0];
    }
    Properties p = UDFContext.getUDFContext()
        .getUDFProperties(this.getClass(), new String[] {sign});
    Configuration config = job.getConfiguration();
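    // Frontend (client side): compute and validate the output schema, then
    // stash the job information in the UDFContext properties. Backend (task
    // side): restore that information into the job configuration.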
    if (!HCatUtil.checkJobContextIfRunningFromBackend(job)) {
      Schema schema = (Schema) ObjectSerializer.deserialize(p.getProperty(PIG_SCHEMA));
      if (schema != null) {
        pigSchema = schema;
      }
      if (pigSchema == null) {
        throw new FrontendException("Schema for data cannot be determined.",
            PigHCatUtil.PIG_EXCEPTION_CODE);
      }
      HCatSchema hcatTblSchema = new HCatSchema(new ArrayList<HCatFieldSchema>());
      try {
        doSchemaValidations(pigSchema, hcatTblSchema);
      } catch (HCatException he) {
        throw new FrontendException(he.getMessage(), PigHCatUtil.PIG_EXCEPTION_CODE, he);
      }
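      // Partition keys become string-typed fields; their values are collected
      // in the same order and passed to HCatEximOutputFormat.setOutput below.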
      List<HCatFieldSchema> hcatFields = new ArrayList<HCatFieldSchema>();
      List<String> partVals = new ArrayList<String>();
      for (String key : partitionKeys) {
        hcatFields.add(new HCatFieldSchema(key, HCatFieldSchema.Type.STRING, ""));
        partVals.add(partitions.get(key));
      }
      HCatSchema outputSchema = convertPigSchemaToHCatSchema(pigSchema,
          hcatTblSchema);
      LOG.debug("Pig Schema '" + pigSchema.toString() + "' was converted to HCatSchema '"
          + outputSchema + "'");
      HCatEximOutputFormat.setOutput(job,
          dbname, tablename,
          outputLocation,
          new HCatSchema(hcatFields),
          partVals,
          outputSchema);
      p.setProperty(COMPUTED_OUTPUT_SCHEMA, ObjectSerializer.serialize(outputSchema));
      p.setProperty(HCatConstants.HCAT_KEY_OUTPUT_INFO,
          config.get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
      if (config.get(HCatConstants.HCAT_KEY_HIVE_CONF) != null) {
        p.setProperty(HCatConstants.HCAT_KEY_HIVE_CONF,
            config.get(HCatConstants.HCAT_KEY_HIVE_CONF));
      }
    } else {
      config.set(HCatConstants.HCAT_KEY_OUTPUT_INFO,
          p.getProperty(HCatConstants.HCAT_KEY_OUTPUT_INFO));
      if (p.getProperty(HCatConstants.HCAT_KEY_HIVE_CONF) != null) {
        config.set(HCatConstants.HCAT_KEY_HIVE_CONF,
            p.getProperty(HCatConstants.HCAT_KEY_HIVE_CONF));
      }
    }
  }

  @Override
  public void storeSchema(ResourceSchema schema, String arg1, Job job) throws IOException {
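    // Nothing to write here: the partition publish happens in the output
    // committer's cleanupJob, which MapReduce normally invokes itself.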
    if (job.getConfiguration().get("mapred.job.tracker", "").equalsIgnoreCase("local")) {
      // In local mode, mapreduce will not call OutputCommitter.cleanupJob.
      // Calling it from here so that the partition publish happens.
      // This call needs to be removed after MAPREDUCE-1447 is fixed.
      new HCatEximOutputCommitter(job, null).cleanupJob(job);
    }
  }
}