// blob: 1552103cd568ea3983e1d6f49932351d8b5131dc [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.backend.hadoop.executionengine.tez.plan.operator;
import java.io.IOException;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.NullablePartitionWritable;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.library.api.KeyValueWriter;
/**
 * POLocalRearrangeTez is used to write to a Tez OrderedPartitionedKVOutput
 * (shuffle) or UnorderedKVOutput (broadcast).
 *
 * <p>Each tuple produced by the parent {@link POLocalRearrange} is split into
 * an index, a key and a value, and the key/value pair is handed to the
 * {@link KeyValueWriter} of the logical output registered under
 * {@code outputKey}.
 */
public class POLocalRearrangeTez extends POLocalRearrange implements TezOutput {

    private static final long serialVersionUID = 1L;
    private static final Log LOG = LogFactory.getLog(POLocalRearrangeTez.class);

    /**
     * Name used to look up this operator's output in {@link #attachOutputs};
     * the log and error messages refer to it as a vertex.
     */
    protected String outputKey;

    /** Flag exposed through {@link #isConnectedToPackage()}; defaults to true. */
    protected boolean connectedToPackage = true;

    /**
     * When true, every key is wrapped in a {@link NullablePartitionWritable}
     * with partition -1 before it is written.
     */
    protected boolean isSkewedJoin = false;

    /**
     * Writer obtained in {@link #attachOutputs}. It is transient and remains
     * null until an output has been attached.
     */
    protected transient KeyValueWriter writer;

    /**
     * Creates an operator with the given key.
     *
     * @param k operator key passed to the superclass
     */
    public POLocalRearrangeTez(OperatorKey k) {
        super(k);
    }

    /**
     * Creates an operator with the given key and a second integer argument.
     *
     * @param k operator key passed to the superclass
     * @param rp passed to the superclass unchanged (presumably the requested
     *        parallelism -- confirm against POLocalRearrange)
     */
    public POLocalRearrangeTez(OperatorKey k, int rp) {
        super(k, rp);
    }

    /**
     * Copy constructor. The Tez specific state (skewed join flag, package
     * connection flag and output key) is copied only when the source is
     * itself a POLocalRearrangeTez.
     *
     * @param copy operator to copy from
     */
    public POLocalRearrangeTez(POLocalRearrange copy) {
        super(copy);
        if (copy instanceof POLocalRearrangeTez) {
            POLocalRearrangeTez copyTez = (POLocalRearrangeTez) copy;
            this.isSkewedJoin = copyTez.isSkewedJoin;
            this.connectedToPackage = copyTez.connectedToPackage;
            this.outputKey = copyTez.outputKey;
        }
    }

    /**
     * Tells whether this operator writes to the given output key.
     *
     * @param key output key to test
     * @return true if {@code key} equals the configured output key; false
     *         otherwise, including when no output key has been set yet
     */
    public boolean containsOutputKey(String key) {
        // outputKey stays null until setOutputKey or the copy constructor
        // assigns it, so guard against dereferencing it.
        return outputKey != null && outputKey.equals(key);
    }

    /**
     * Sets the output key.
     *
     * @param outputKey the new output key
     */
    public void setOutputKey(String outputKey) {
        this.outputKey = outputKey;
    }

    /**
     * @return the current value of the connectedToPackage flag
     */
    public boolean isConnectedToPackage() {
        return connectedToPackage;
    }

    /**
     * @param connectedToPackage new value of the connectedToPackage flag
     */
    public void setConnectedToPackage(boolean connectedToPackage) {
        this.connectedToPackage = connectedToPackage;
    }

    /**
     * @return true if keys are wrapped for a skewed join before being written
     */
    public boolean isSkewedJoin() {
        return isSkewedJoin;
    }

    /**
     * @param isSkewedJoin whether keys should be wrapped for a skewed join
     */
    public void setSkewedJoin(boolean isSkewedJoin) {
        this.isSkewedJoin = isSkewedJoin;
    }

    /**
     * @return a single element array holding the output key
     */
    @Override
    public String[] getTezOutputs() {
        return new String[] { outputKey };
    }

    /**
     * Replaces the output key when it matches {@code oldOutputKey}; otherwise
     * does nothing.
     *
     * @param oldOutputKey key to be replaced
     * @param newOutputKey replacement key
     */
    @Override
    public void replaceOutput(String oldOutputKey, String newOutputKey) {
        if (oldOutputKey.equals(outputKey)) {
            outputKey = newOutputKey;
        }
    }

    /**
     * Looks up the logical output registered under the output key and keeps
     * its writer for use by {@link #getNextTuple()}.
     *
     * @param outputs available logical outputs by name
     * @param conf configuration; not used by this implementation
     * @throws ExecException if no output is registered under the output key,
     *         or if obtaining the writer fails
     */
    @Override
    public void attachOutputs(Map<String, LogicalOutput> outputs,
            Configuration conf) throws ExecException {
        LogicalOutput output = outputs.get(outputKey);
        if (output == null) {
            throw new ExecException("Output to vertex " + outputKey + " is missing");
        }
        try {
            writer = (KeyValueWriter) output.getWriter();
            LOG.info("Attached output to vertex " + outputKey + " : output=" + output + ", writer=" + writer);
        } catch (Exception e) {
            throw new ExecException(e);
        }
    }

    /**
     * Returns the superclass result directly, without writing anything to
     * the attached output.
     *
     * @return the result of {@code super.getNextTuple()}
     * @throws ExecException if the superclass call fails
     */
    protected Result getRearrangedTuple() throws ExecException {
        return super.getNextTuple();
    }

    /**
     * Fetches the next rearranged tuple from the superclass and, when a
     * writer is attached, writes it to the output.
     *
     * <p>Without a writer the superclass result is returned untouched. With
     * a writer, a STATUS_OK result is consumed: fields 0, 1 and 2 of the
     * tuple are used as index, key and value, the pair is written (or only
     * marked up when an illustrator is set) and the result is replaced by
     * RESULT_EMPTY. Results with any other status are returned as received.
     *
     * @return the superclass result, or RESULT_EMPTY after a STATUS_OK result
     *         has been consumed
     * @throws ExecException with error code 2135 if writing fails with an
     *         IOException, or if the superclass call fails
     */
    @Override
    public Result getNextTuple() throws ExecException {
        res = super.getNextTuple();
        if (writer == null) { // In the case of combiner
            return res;
        }
        try {
            switch (res.returnStatus) {
            case POStatus.STATUS_OK:
                if (illustrator == null) {
                    Tuple result = (Tuple) res.result;
                    Byte index = (Byte) result.get(0);
                    PigNullableWritable key = HDataType.getWritableComparableTypes(result.get(1), keyType);
                    NullableTuple val = new NullableTuple((Tuple) result.get(2));
                    // Both the key and the value need the index. The key needs it so
                    // that it can be sorted on the index in addition to the key
                    // value. The value needs it so that POPackage can properly
                    // assign the tuple to its slot in the projection.
                    key.setIndex(index);
                    val.setIndex(index);
                    if (isSkewedJoin) {
                        // Wrap into a NullablePartitionWritable to match the key
                        // of the right table from POPartitionRearrangeTez for the skewed join
                        NullablePartitionWritable wrappedKey = new NullablePartitionWritable(key);
                        wrappedKey.setPartition(-1);
                        key = wrappedKey;
                    }
                    writer.write(key, val);
                } else {
                    illustratorMarkup(res.result, res.result, 0);
                }
                // The tuple has been handed off, so report an empty result.
                res = RESULT_EMPTY;
                break;
            case POStatus.STATUS_EOP:
            case POStatus.STATUS_ERR:
            case POStatus.STATUS_NULL:
            default:
                break;
            }
        } catch (IOException ioe) {
            int errCode = 2135;
            String msg = "Received error from POLocalRearrage function." + ioe.getMessage();
            throw new ExecException(msg, errCode, ioe);
        }
        // Return the result tracked above, so that the RESULT_EMPTY set after
        // a successful write is what the caller actually sees.
        return res;
    }

    /**
     * @return the clone produced by the superclass, cast to this type
     * @throws CloneNotSupportedException if the superclass cannot be cloned
     */
    @Override
    public POLocalRearrangeTez clone() throws CloneNotSupportedException {
        return (POLocalRearrangeTez) super.clone();
    }

    /**
     * @return the superclass name followed by an arrow and the output key
     */
    @Override
    public String name() {
        return super.name() + "\t->\t " + outputKey;
    }
}