| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.pig.backend.hadoop.executionengine.tez.plan.operator; |
| |
| import java.io.IOException; |
| import java.util.Map; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.pig.backend.executionengine.ExecException; |
| import org.apache.pig.backend.hadoop.HDataType; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange; |
| import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput; |
| import org.apache.pig.data.Tuple; |
| import org.apache.pig.impl.io.NullablePartitionWritable; |
| import org.apache.pig.impl.io.NullableTuple; |
| import org.apache.pig.impl.io.PigNullableWritable; |
| import org.apache.pig.impl.plan.OperatorKey; |
| import org.apache.tez.runtime.api.LogicalOutput; |
| import org.apache.tez.runtime.library.api.KeyValueWriter; |
| |
| /** |
| * POLocalRearrangeTez is used to write to a Tez OrderedPartitionedKVOutput |
| * (shuffle) or UnorderedKVOutput (broadcast) |
| */ |
| public class POLocalRearrangeTez extends POLocalRearrange implements TezOutput { |
| |
| private static final long serialVersionUID = 1L; |
| private static final Log LOG = LogFactory.getLog(POLocalRearrangeTez.class); |
| |
| protected String outputKey; |
| protected boolean connectedToPackage = true; |
| protected boolean isSkewedJoin = false; |
| |
| protected transient KeyValueWriter writer; |
| |
| public POLocalRearrangeTez(OperatorKey k) { |
| super(k); |
| } |
| |
| public POLocalRearrangeTez(OperatorKey k, int rp) { |
| super(k, rp); |
| } |
| |
| public POLocalRearrangeTez(POLocalRearrange copy) { |
| super(copy); |
| if (copy instanceof POLocalRearrangeTez) { |
| POLocalRearrangeTez copyTez = (POLocalRearrangeTez) copy; |
| this.isSkewedJoin = copyTez.isSkewedJoin; |
| this.connectedToPackage = copyTez.connectedToPackage; |
| this.outputKey = copyTez.outputKey; |
| } |
| } |
| |
| public boolean containsOutputKey(String key) { |
| return outputKey.equals(key); |
| } |
| |
| public void setOutputKey(String outputKey) { |
| this.outputKey = outputKey; |
| } |
| |
| public boolean isConnectedToPackage() { |
| return connectedToPackage; |
| } |
| |
| public void setConnectedToPackage(boolean connectedToPackage) { |
| this.connectedToPackage = connectedToPackage; |
| } |
| |
| public boolean isSkewedJoin() { |
| return isSkewedJoin; |
| } |
| |
| public void setSkewedJoin(boolean isSkewedJoin) { |
| this.isSkewedJoin = isSkewedJoin; |
| } |
| |
| @Override |
| public String[] getTezOutputs() { |
| return new String[] { outputKey }; |
| } |
| |
| @Override |
| public void replaceOutput(String oldOutputKey, String newOutputKey) { |
| if (oldOutputKey.equals(outputKey)) { |
| outputKey = newOutputKey; |
| } |
| } |
| |
| @Override |
| public void attachOutputs(Map<String, LogicalOutput> outputs, |
| Configuration conf) throws ExecException { |
| LogicalOutput output = outputs.get(outputKey); |
| if (output == null) { |
| throw new ExecException("Output to vertex " + outputKey + " is missing"); |
| } |
| try { |
| writer = (KeyValueWriter) output.getWriter(); |
| LOG.info("Attached output to vertex " + outputKey + " : output=" + output + ", writer=" + writer); |
| } catch (Exception e) { |
| throw new ExecException(e); |
| } |
| } |
| |
| protected Result getRearrangedTuple() throws ExecException { |
| return super.getNextTuple(); |
| } |
| |
| @Override |
| public Result getNextTuple() throws ExecException { |
| res = super.getNextTuple(); |
| if (writer == null) { // In the case of combiner |
| return res; |
| } |
| |
| try { |
| switch (res.returnStatus) { |
| case POStatus.STATUS_OK: |
| if (illustrator == null) { |
| Tuple result = (Tuple) res.result; |
| Byte index = (Byte) result.get(0); |
| PigNullableWritable key = HDataType.getWritableComparableTypes(result.get(1), keyType); |
| NullableTuple val = new NullableTuple((Tuple)result.get(2)); |
| |
| // Both the key and the value need the index. The key needs it so |
| // that it can be sorted on the index in addition to the key |
| // value. The value needs it so that POPackage can properly |
| // assign the tuple to its slot in the projection. |
| key.setIndex(index); |
| val.setIndex(index); |
| if (isSkewedJoin) { |
| // Wrap into a NullablePartitionWritable to match the key |
| // of the right table from POPartitionRearrangeTez for the skewed join |
| NullablePartitionWritable wrappedKey = new NullablePartitionWritable(key); |
| wrappedKey.setPartition(-1); |
| key = wrappedKey; |
| } |
| writer.write(key, val); |
| } else { |
| illustratorMarkup(res.result, res.result, 0); |
| } |
| res = RESULT_EMPTY; |
| break; |
| case POStatus.STATUS_EOP: |
| case POStatus.STATUS_ERR: |
| case POStatus.STATUS_NULL: |
| default: |
| break; |
| } |
| } catch (IOException ioe) { |
| int errCode = 2135; |
| String msg = "Received error from POLocalRearrage function." + ioe.getMessage(); |
| throw new ExecException(msg, errCode, ioe); |
| } |
| return inp; |
| } |
| |
| @Override |
| public POLocalRearrangeTez clone() throws CloneNotSupportedException { |
| return (POLocalRearrangeTez) super.clone(); |
| } |
| |
| @Override |
| public String name() { |
| return super.name() + "\t->\t " + outputKey; |
| } |
| } |