/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hcatalog.pig.drivers;

import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.mapreduce.OutputJobInfo;
import org.apache.hcatalog.pig.PigHCatUtil;
import org.apache.pig.ResourceSchema;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.StoreMetadata;
import org.apache.pig.data.Tuple;
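
/**
 * An OutputFormat adapter that drives a Pig {@link StoreFuncInterface}
 * through the standard Hadoop MapReduce output path: output-spec checks,
 * record writing, and job/task commit are all delegated to the OutputFormat
 * supplied by the wrapped StoreFunc.
 */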
public class StoreFuncBasedOutputFormat extends
    OutputFormat<BytesWritable, Tuple> {

  private final StoreFuncInterface storeFunc;

  public StoreFuncBasedOutputFormat(StoreFuncInterface storeFunc) {
    this.storeFunc = storeFunc;
  }
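
  // Delegate output validation to the OutputFormat supplied by the StoreFunc.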
  @Override
  public void checkOutputSpecs(JobContext jobContext) throws IOException,
      InterruptedException {
    OutputFormat<BytesWritable, Tuple> outputFormat = storeFunc.getOutputFormat();
    outputFormat.checkOutputSpecs(jobContext);
  }
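
  // Recover the output schema and location from the OutputJobInfo that
  // HCatalog serialized into the job configuration, and wrap the underlying
  // committer so the schema can be stored when the job commits.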
  @Override
  public OutputCommitter getOutputCommitter(TaskAttemptContext ctx)
      throws IOException, InterruptedException {
    String serializedJobInfo = ctx.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
    OutputJobInfo outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(serializedJobInfo);
    ResourceSchema rs = PigHCatUtil.getResourceSchema(outputJobInfo.getOutputSchema());
    String location = outputJobInfo.getLocation();
    OutputFormat<BytesWritable, Tuple> outputFormat = storeFunc.getOutputFormat();
    return new StoreFuncBasedOutputCommitter(storeFunc, outputFormat.getOutputCommitter(ctx), location, rs);
  }
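
  // Wrap the StoreFunc's own RecordWriter so that every write is routed
  // through StoreFuncInterface.putNext() instead of the raw writer.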
  @Override
  public RecordWriter<BytesWritable, Tuple> getRecordWriter(
      TaskAttemptContext ctx) throws IOException, InterruptedException {
    RecordWriter<BytesWritable, Tuple> writer = storeFunc.getOutputFormat().getRecordWriter(ctx);
    String serializedJobInfo = ctx.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
    OutputJobInfo outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(serializedJobInfo);
    ResourceSchema rs = PigHCatUtil.getResourceSchema(outputJobInfo.getOutputSchema());
    String location = outputJobInfo.getLocation();
    return new StoreFuncBasedRecordWriter(writer, storeFunc, location, rs);
  }
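
  /**
   * RecordWriter that binds the wrapped writer to the StoreFunc via
   * prepareToWrite() and then forwards each Pig Tuple to putNext().
   */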
  static class StoreFuncBasedRecordWriter extends RecordWriter<BytesWritable, Tuple> {
    private final RecordWriter<BytesWritable, Tuple> writer;
    private final StoreFuncInterface storeFunc;
    private final ResourceSchema schema;
    private final String location;

    public StoreFuncBasedRecordWriter(RecordWriter<BytesWritable, Tuple> writer,
        StoreFuncInterface sf, String location, ResourceSchema rs) throws IOException {
      this.writer = writer;
      this.storeFunc = sf;
      this.schema = rs;
      this.location = location;
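      // Hand the underlying writer to the StoreFunc so that subsequent
      // putNext() calls have a writer to push records into.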
      storeFunc.prepareToWrite(writer);
    }

    @Override
    public void close(TaskAttemptContext ctx) throws IOException,
        InterruptedException {
      writer.close(ctx);
    }
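
    // The key is ignored: the StoreFunc serializes the tuple itself through
    // the writer it received in prepareToWrite().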
    @Override
    public void write(BytesWritable key, Tuple value) throws IOException,
        InterruptedException {
      storeFunc.putNext(value);
    }
  }
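
  /**
   * OutputCommitter that delegates every operation to the committer of the
   * underlying OutputFormat and, once the job has succeeded, asks the
   * StoreFunc to persist the output schema if it implements StoreMetadata.
   */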
  static class StoreFuncBasedOutputCommitter extends OutputCommitter {
    StoreFuncInterface sf;
    OutputCommitter wrappedOutputCommitter;
    String location;
    ResourceSchema rs;

    public StoreFuncBasedOutputCommitter(StoreFuncInterface sf, OutputCommitter outputCommitter,
        String location, ResourceSchema rs) {
      this.sf = sf;
      this.wrappedOutputCommitter = outputCommitter;
      this.location = location;
      this.rs = rs;
    }
    @Override
    public void abortTask(TaskAttemptContext context) throws IOException {
      wrappedOutputCommitter.abortTask(context);
    }

    @Override
    public void commitTask(TaskAttemptContext context) throws IOException {
      wrappedOutputCommitter.commitTask(context);
    }

    @Override
    public boolean needsTaskCommit(TaskAttemptContext context)
        throws IOException {
      return wrappedOutputCommitter.needsTaskCommit(context);
    }

    @Override
    public void setupJob(JobContext context) throws IOException {
      wrappedOutputCommitter.setupJob(context);
    }

    @Override
    public void setupTask(TaskAttemptContext context) throws IOException {
      wrappedOutputCommitter.setupTask(context);
    }
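
    // After the wrapped committer finishes, give a metadata-aware StoreFunc
    // the chance to record the output schema for the finished job.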
    @Override
    public void commitJob(JobContext context) throws IOException {
      wrappedOutputCommitter.commitJob(context);
      if (sf instanceof StoreMetadata && rs != null) {
        ((StoreMetadata) sf).storeSchema(
          rs, location, new Job(context.getConfiguration()));
      }
    }
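
    // cleanupJob() is the older (pre-commitJob) hook; the schema is stored
    // here as well so the behavior holds on Hadoop versions that only
    // invoke cleanupJob().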
    @Override
    public void cleanupJob(JobContext context) throws IOException {
      wrappedOutputCommitter.cleanupJob(context);
      if (sf instanceof StoreMetadata && rs != null) {
        ((StoreMetadata) sf).storeSchema(
          rs, location, new Job(context.getConfiguration()));
      }
    }
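
    // Abort is delegated unchanged; no schema is stored for failed jobs.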
    @Override
    public void abortJob(JobContext context, JobStatus.State state) throws IOException {
      wrappedOutputCommitter.abortJob(context, state);
    }
  }
}