/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hcatalog.pig;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.mapreduce.InputJobInfo;
import org.apache.hcatalog.mapreduce.PartInfo;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadMetadata;
import org.apache.pig.LoadPushDown;
import org.apache.pig.PigException;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.UDFContext;

/**
 * Base class for HCatLoader and HCatEximLoader.
 */
public abstract class HCatBaseLoader extends LoadFunc implements LoadMetadata, LoadPushDown {
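
  // Illustrative usage only (not part of this class): a concrete subclass
  // such as HCatLoader is typically invoked from a Pig script; the table
  // name below is hypothetical.
  //
  //   A = LOAD 'default.my_table' USING org.apache.hcatalog.pig.HCatLoader();
  //   DUMP A;
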
  protected static final String PRUNE_PROJECTION_INFO = "prune.projection.info";

  private RecordReader<?, ?> reader;
  protected String signature;
  HCatSchema outputSchema = null;

  @Override
  public Tuple getNext() throws IOException {
    try {
      // When the reader is exhausted, hr is null; transformToTuple maps a
      // null record to a null Tuple, which is how a LoadFunc signals end of
      // input to Pig.
      HCatRecord hr = (HCatRecord) (reader.nextKeyValue() ? reader.getCurrentValue() : null);
      Tuple t = PigHCatUtil.transformToTuple(hr, outputSchema);
      // TODO: we were discussing an iterator interface and a LazyTuple;
      // revisit this once those plans solidify.
      return t;
    } catch (ExecException e) {
      int errCode = 6018;
      String errMsg = "Error while reading input";
      throw new ExecException(errMsg, errCode,
          PigException.REMOTE_ENVIRONMENT, e);
    } catch (Exception eOther) {
      int errCode = 6018;
      String errMsg = "Error converting read value to tuple";
      throw new ExecException(errMsg, errCode,
          PigException.REMOTE_ENVIRONMENT, eOther);
    }
  }

  @Override
  public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
    this.reader = reader;
  }

  @Override
  public ResourceStatistics getStatistics(String location, Job job) throws IOException {
    // Statistics are not implemented yet; returning null tells Pig that none
    // are available.
    return null;
  }

  @Override
  public List<OperatorSet> getFeatures() {
    return Arrays.asList(LoadPushDown.OperatorSet.PROJECTION);
  }

  @Override
  public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldsInfo) throws FrontendException {
    // Store the required-fields information in the UDFContext so that it can
    // be retrieved later.
    storeInUDFContext(signature, PRUNE_PROJECTION_INFO, requiredFieldsInfo);
    // HCat always prunes columns to exactly what is asked of it, so the
    // response is true.
    return new RequiredFieldResponse(true);
  }
  @Override
  public void setUDFContextSignature(String signature) {
    this.signature = signature;
  }

  // helper methods

  protected void storeInUDFContext(String signature, String key, Object value) {
    UDFContext udfContext = UDFContext.getUDFContext();
    Properties props = udfContext.getUDFProperties(
        this.getClass(), new String[]{signature});
    props.put(key, value);
  }

  /**
   * A utility method to get the total size of the inputs, computed by summing
   * the size of all input paths on supported FileSystems. Locations whose size
   * cannot be determined are ignored. Note that non-FileSystem and
   * unpartitioned locations will not report their input size by default.
   */
  protected static long getSizeInBytes(InputJobInfo inputJobInfo) throws IOException {
    Configuration conf = new Configuration();
    long sizeInBytes = 0;

    for (PartInfo partInfo : inputJobInfo.getPartitions()) {
      try {
        Path p = new Path(partInfo.getLocation());
        FileSystem fs = p.getFileSystem(conf);
        if (fs.isFile(p)) {
          // A single file: count its length directly.
          sizeInBytes += fs.getFileStatus(p).getLen();
        } else {
          // A directory: sum the lengths of its immediate children.
          for (FileStatus child : fs.listStatus(p)) {
            sizeInBytes += child.getLen();
          }
        }
      } catch (IOException e) {
        // Report size to the extent possible; skip locations we cannot stat.
      }
    }
    return sizeInBytes;
  }
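
  // Sketch (assumption, not part of this class): a subclass could surface the
  // computed size through Pig's statistics hook roughly as follows; obtaining
  // the InputJobInfo is elided here.
  //
  //   ResourceStatistics stats = new ResourceStatistics();
  //   stats.setmBytes(getSizeInBytes(inputJobInfo) / 1024 / 1024);
  //   return stats;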
}