| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hive.ql.exec.tez; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Iterator; |
| import java.util.List; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator; |
| import org.apache.hadoop.hive.ql.exec.Operator; |
| import org.apache.hadoop.hive.ql.exec.Utilities; |
| import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; |
| import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; |
| import org.apache.hadoop.hive.ql.exec.vector.VectorDeserializeRow; |
| import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil; |
| import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; |
| import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; |
| import org.apache.hadoop.hive.ql.log.PerfLogger; |
| import org.apache.hadoop.hive.ql.metadata.HiveException; |
| import org.apache.hadoop.hive.ql.plan.TableDesc; |
| import org.apache.hadoop.hive.ql.session.SessionState; |
| import org.apache.hadoop.hive.serde2.AbstractSerDe; |
| import org.apache.hadoop.hive.serde2.SerDeException; |
| import org.apache.hadoop.hive.serde2.SerDeUtils; |
| import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe; |
| import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableDeserializeRead; |
| import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinaryDeserializeRead; |
| import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; |
| import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; |
| import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; |
| import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; |
| import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; |
| import org.apache.hadoop.io.BytesWritable; |
| import org.apache.hadoop.mapred.JobConf; |
| import org.apache.hadoop.util.ReflectionUtils; |
| import org.apache.hadoop.util.StringUtils; |
| import org.apache.tez.runtime.api.Reader; |
| import org.apache.tez.runtime.library.api.KeyValueReader; |
| import org.apache.tez.runtime.library.api.KeyValuesReader; |
| |
| import com.google.common.base.Preconditions; |
| |
| /** |
| * Process input from tez LogicalInput and write output - for a reduce plan. |
| * Reads the shuffled key/value records, deserializes them and pumps the rows |
| * (or vectorized row batches) through the reduce-side operator tree. |
| */ |
| @SuppressWarnings("deprecation") |
| public class ReduceRecordSource implements RecordSource { |
| |
| public static final Logger l4j = LoggerFactory.getLogger(ReduceRecordSource.class); |
| |
| private static final String CLASS_NAME = ReduceRecordSource.class.getName(); |
| |
| private byte tag; |
| |
| private boolean abort = false; |
| |
| private AbstractSerDe inputKeySerDe; |
| |
| // SerDe used to deserialize the shuffled input values for this tag |
| private AbstractSerDe inputValueSerDe; |
| |
| private TableDesc keyTableDesc; |
| private TableDesc valueTableDesc; |
| |
| private ObjectInspector rowObjectInspector; |
| private Operator<?> reducer; |
| |
| private Object keyObject = null; |
| private BytesWritable groupKey; |
| |
| private boolean vectorized = false; |
| |
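| // Vectorized path: shuffle keys arrive in BinarySortable encoding and values in |
| // LazyBinary encoding; these deserializers write them straight into the columns |
| // of the vectorized row batch. |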
| private VectorDeserializeRow<BinarySortableDeserializeRead> keyBinarySortableDeserializeToRow; |
| |
| private VectorDeserializeRow<LazyBinaryDeserializeRead> valueLazyBinaryDeserializeToRow; |
| |
| private VectorizedRowBatchCtx batchContext; |
| private VectorizedRowBatch batch; |
| |
| // number of columns pertaining to keys in a vectorized row batch |
| private int firstValueColumnOffset; |
| |
| private static final int BATCH_BYTES = VectorizedRowBatch.DEFAULT_BYTES; |
| |
| private StructObjectInspector keyStructInspector; |
| private StructObjectInspector valueStructInspectors; |
| |
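| // Adapter that presents the underlying Tez KeyValueReader or KeyValuesReader as a |
| // uniform (key, values) stream. |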
| private KeyValuesAdapter reader; |
| |
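| // When true, group boundaries are signalled to the reducer: startGroup()/endGroup() |
| // in row mode, setNextVectorBatchGroupStatus() when vectorized. |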
| private boolean handleGroupKey; |
| |
| private ObjectInspector valueObjectInspector; |
| |
| private final PerfLogger perfLogger = SessionState.getPerfLogger(); |
| |
| private Iterable<Object> valueWritables; |
| |
| private final GroupIterator groupIterator = new GroupIterator(); |
| |
| private long vectorizedVertexNum; |
| private int vectorizedTestingReducerBatchSize; |
| |
| // Flush the last record when reader is out of records |
| private boolean flushLastRecord = false; |
| |
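| /** |
| * Initializes the record source: instantiates the key/value SerDes, builds the row |
| * object inspector and, when vectorization is enabled, sets up the vectorized row |
| * batch and its key/value deserializers. |
| */ |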
| void init(JobConf jconf, Operator<?> reducer, boolean vectorized, TableDesc keyTableDesc, |
| TableDesc valueTableDesc, Reader reader, boolean handleGroupKey, byte tag, |
| VectorizedRowBatchCtx batchContext, long vectorizedVertexNum, |
| int vectorizedTestingReducerBatchSize) |
| throws Exception { |
| |
| this.vectorizedVertexNum = vectorizedVertexNum; |
| if (vectorizedTestingReducerBatchSize > VectorizedRowBatch.DEFAULT_SIZE) { |
| |
| // For now, we don't go higher than the default batch size unless we do more work |
| // to verify every vectorized operator downstream can handle a larger batch size. |
| vectorizedTestingReducerBatchSize = VectorizedRowBatch.DEFAULT_SIZE; |
| } |
| this.vectorizedTestingReducerBatchSize = vectorizedTestingReducerBatchSize; |
| ObjectInspector keyObjectInspector; |
| |
| this.reducer = reducer; |
| this.vectorized = vectorized; |
| this.keyTableDesc = keyTableDesc; |
| if (reader instanceof KeyValueReader) { |
| this.reader = new KeyValuesFromKeyValue((KeyValueReader) reader); |
| } else { |
| this.reader = new KeyValuesFromKeyValues((KeyValuesReader) reader); |
| } |
| this.handleGroupKey = handleGroupKey; |
| this.tag = tag; |
| |
| try { |
| inputKeySerDe = ReflectionUtils.newInstance(keyTableDesc.getSerDeClass(), null); |
| inputKeySerDe.initialize(null, keyTableDesc.getProperties(), null); |
| |
| keyObjectInspector = inputKeySerDe.getObjectInspector(); |
| |
| if (vectorized) { |
| keyStructInspector = (StructObjectInspector) keyObjectInspector; |
| firstValueColumnOffset = keyStructInspector.getAllStructFieldRefs().size(); |
| } |
| |
| // We should initialize the SerDe with the TypeInfo when available. |
| this.valueTableDesc = valueTableDesc; |
| inputValueSerDe = (AbstractSerDe) ReflectionUtils.newInstance(valueTableDesc.getSerDeClass(), null); |
| inputValueSerDe.initialize(null, valueTableDesc.getProperties(), null); |
| valueObjectInspector = inputValueSerDe.getObjectInspector(); |
| |
| ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>(); |
| |
| if (vectorized) { |
| /* vectorization only works with struct object inspectors */ |
| valueStructInspectors = (StructObjectInspector) valueObjectInspector; |
| |
| final int totalColumns = firstValueColumnOffset + |
| valueStructInspectors.getAllStructFieldRefs().size(); |
| |
| rowObjectInspector = Utilities.constructVectorizedReduceRowOI(keyStructInspector, |
| valueStructInspectors); |
| this.batchContext = batchContext; |
| batch = batchContext.createVectorizedRowBatch(); |
| |
| // Setup vectorized deserialization for the key and value. |
| BinarySortableSerDe binarySortableSerDe = (BinarySortableSerDe) inputKeySerDe; |
| |
| keyBinarySortableDeserializeToRow = |
| new VectorDeserializeRow<BinarySortableDeserializeRead>( |
| new BinarySortableDeserializeRead( |
| VectorizedBatchUtil.typeInfosFromStructObjectInspector( |
| keyStructInspector), |
| (batchContext.getRowdataTypePhysicalVariations().length > firstValueColumnOffset) |
| ? Arrays.copyOfRange(batchContext.getRowdataTypePhysicalVariations(), 0, |
| firstValueColumnOffset) |
| : batchContext.getRowdataTypePhysicalVariations(), |
| /* useExternalBuffer */ true, |
| binarySortableSerDe.getSortOrders(), |
| binarySortableSerDe.getNullMarkers(), |
| binarySortableSerDe.getNotNullMarkers())); |
| keyBinarySortableDeserializeToRow.init(0); |
| |
| final int valuesSize = valueStructInspectors.getAllStructFieldRefs().size(); |
| if (valuesSize > 0) { |
| valueLazyBinaryDeserializeToRow = |
| new VectorDeserializeRow<LazyBinaryDeserializeRead>( |
| new LazyBinaryDeserializeRead( |
| VectorizedBatchUtil.typeInfosFromStructObjectInspector( |
| valueStructInspectors), |
| (batchContext.getRowdataTypePhysicalVariations().length >= totalColumns) |
| ? Arrays.copyOfRange(batchContext.getRowdataTypePhysicalVariations(), |
| firstValueColumnOffset, totalColumns) |
| : null, |
| /* useExternalBuffer */ true)); |
| valueLazyBinaryDeserializeToRow.init(firstValueColumnOffset); |
| |
| // Create data buffers for value bytes column vectors. |
| for (int i = firstValueColumnOffset; i < batch.numCols; i++) { |
| ColumnVector colVector = batch.cols[i]; |
| if (colVector instanceof BytesColumnVector) { |
| BytesColumnVector bytesColumnVector = (BytesColumnVector) colVector; |
| bytesColumnVector.initBuffer(); |
| } |
| } |
| } |
| } else { |
| ois.add(keyObjectInspector); |
| ois.add(valueObjectInspector); |
| rowObjectInspector = |
| ObjectInspectorFactory.getStandardStructObjectInspector(Utilities.reduceFieldNameList, |
| ois); |
| } |
| } catch (Throwable e) { |
| abort = true; |
| if (e instanceof OutOfMemoryError) { |
| // Don't create a new object if we are already out of memory |
| throw (OutOfMemoryError) e; |
| } else { |
| throw new RuntimeException("Reduce operator initialization failed", e); |
| } |
| } |
| perfLogger.perfLogEnd(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS); |
| } |
| |
| public TableDesc getKeyTableDesc() { |
| return keyTableDesc; |
| } |
| |
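| /** |
| * The vectorized path pushes a whole key group per pushRecord() call, so its output |
| * is grouped; the row-mode path pushes one row at a time. |
| */ |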
| @Override |
| public final boolean isGrouped() { |
| return vectorized; |
| } |
| |
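| /** |
| * Reads the next record (or, when vectorized, the next key group) from the shuffle |
| * input and pushes it into the reduce operator tree, handling group boundaries when |
| * handleGroupKey is set. |
| * |
| * @return true if a record was pushed, false once the input is exhausted |
| */ |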
| @Override |
| public boolean pushRecord() throws HiveException { |
| |
| if (vectorized) { |
| return pushRecordVector(); |
| } |
| |
| if (groupIterator.hasNext()) { |
| // if we have records left in the group we push one of those |
| groupIterator.next(); |
| return true; |
| } |
| |
| try { |
| if (!reader.next()) { |
| if (flushLastRecord) { |
| reducer.flushRecursive(); |
| } |
| return false; |
| } |
| |
| BytesWritable keyWritable = (BytesWritable) reader.getCurrentKey(); |
| valueWritables = reader.getCurrentValues(); |
| |
| // Set the key and check whether this is a new group or the same group |
| try { |
| keyObject = inputKeySerDe.deserialize(keyWritable); |
| } catch (Exception e) { |
| throw new HiveException("Hive Runtime Error: Unable to deserialize reduce input key from " |
| + Utilities.formatBinaryString(keyWritable.getBytes(), 0, keyWritable.getLength()) |
| + " with properties " + keyTableDesc.getProperties(), e); |
| } |
| |
| if (handleGroupKey && !keyWritable.equals(this.groupKey)) { |
| // If an operator wants to do some work at the beginning of a group |
| if (groupKey == null) { // the first group |
| this.groupKey = new BytesWritable(); |
| } else { |
| // If an operator wants to do some work at the end of a group |
| reducer.endGroup(); |
| } |
| |
| groupKey.set(keyWritable.getBytes(), 0, keyWritable.getLength()); |
| reducer.startGroup(); |
| reducer.setGroupKeyObject(keyObject); |
| } |
| |
| groupIterator.initialize(valueWritables, keyObject, tag); |
| if (groupIterator.hasNext()) { |
| groupIterator.next(); // push first record of group |
| } |
| return true; |
| } catch (Throwable e) { |
| abort = true; |
| if (e instanceof OutOfMemoryError) { |
| // Don't create a new object if we are already out of memory |
| throw (OutOfMemoryError) e; |
| } else { |
| l4j.error(StringUtils.stringifyException(e)); |
| throw new RuntimeException(e); |
| } |
| } |
| } |
| |
| private Object deserializeValue(BytesWritable valueWritable, byte tag) |
| throws HiveException { |
| |
| try { |
| return inputValueSerDe.deserialize(valueWritable); |
| } catch (SerDeException e) { |
| throw new HiveException( |
| "Hive Runtime Error: Unable to deserialize reduce input value (tag=" |
| + tag |
| + ") from " |
| + Utilities.formatBinaryString(valueWritable.getBytes(), 0, valueWritable.getLength()) |
| + " with properties " + valueTableDesc.getProperties(), e); |
| } |
| } |
| |
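| /** |
| * Row-mode iterator over the values of the current key group: builds a (key, value) |
| * row for each value and pushes it into the reducer. |
| */ |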
| private class GroupIterator { |
| private final List<Object> row = new ArrayList<Object>(Utilities.reduceFieldNameList.size()); |
| private List<Object> passDownKey = null; |
| private Iterator<Object> values; |
| private byte tag; |
| private Object keyObject; |
| |
| public void initialize(Iterable<Object> values, Object keyObject, byte tag) { |
| this.passDownKey = null; |
| this.values = values.iterator(); |
| this.tag = tag; |
| this.keyObject = keyObject; |
| } |
| |
| public boolean hasNext() { |
| return values != null && values.hasNext(); |
| } |
| |
| public void next() throws HiveException { |
| row.clear(); |
| Object value = values.next(); |
| BytesWritable valueWritable = (BytesWritable) value; |
| |
| if (passDownKey == null) { |
| row.add(this.keyObject); |
| } else { |
| row.add(passDownKey.get(0)); |
| } |
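| // For a merge join with more rows to come, replace the key with a copy made into |
| // standard objects and reuse that copy for the remaining rows of the group, so the |
| // join sees a key that is independent of the deserializer's reused keyObject. |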
| if ((passDownKey == null) && (reducer instanceof CommonMergeJoinOperator) && hasNext()) { |
| passDownKey = |
| (List<Object>) ObjectInspectorUtils.copyToStandardObject(row, |
| reducer.getInputObjInspectors()[tag], ObjectInspectorCopyOption.WRITABLE); |
| row.remove(0); |
| row.add(0, passDownKey.get(0)); |
| } |
| |
| row.add(deserializeValue(valueWritable, tag)); |
| |
| try { |
| reducer.process(row, tag); |
| } catch (Exception e) { |
| String rowString = null; |
| try { |
| rowString = SerDeUtils.getJSONString(row, rowObjectInspector); |
| } catch (Exception e2) { |
| rowString = "[Error getting row data with exception " |
| + StringUtils.stringifyException(e2) + " ]"; |
| } |
| |
| // Log the contents of the row that caused the exception so they are available for |
| // debugging. When exposed through an error message they could leak sensitive |
| // information, even to the client application, so only log them at trace level. |
| l4j.trace("Hive Runtime Error while processing row (tag=" |
| + tag + ") " + rowString); |
| throw new HiveException("Hive Runtime Error while processing row", e); |
| } |
| } |
| } |
| |
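| /** |
| * Reads the next key group and processes it as vectorized row batches. |
| * |
| * @return true if a group was processed, false once the input is exhausted |
| */ |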
| private boolean pushRecordVector() { |
| try { |
| if (!reader.next()) { |
| return false; |
| } |
| |
| BytesWritable keyWritable = (BytesWritable) reader.getCurrentKey(); |
| valueWritables = reader.getCurrentValues(); |
| |
| processVectorGroup(keyWritable, valueWritables, tag); |
| return true; |
| } catch (Throwable e) { |
| abort = true; |
| if (e instanceof OutOfMemoryError) { |
| // Don't create a new object if we are already out of memory |
| throw (OutOfMemoryError) e; |
| } else { |
| l4j.error(StringUtils.stringifyException(e)); |
| throw new RuntimeException(e); |
| } |
| } |
| } |
| |
| /** |
| * Deserializes one reduce group and pushes it through the reducer as one or more |
| * vectorized row batches. |
| * |
| * @param keyWritable the serialized (BinarySortable) key shared by the whole group |
| * @param values the serialized (LazyBinary) values belonging to the group |
| * @param tag the tag of the input this group came from |
| * @throws HiveException |
| * @throws IOException |
| */ |
| private void processVectorGroup(BytesWritable keyWritable, |
| Iterable<Object> values, byte tag) throws HiveException, IOException { |
| |
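| // Operators that hold on to the batch they receive need a freshly allocated batch |
| // per group; otherwise the same batch is reused and reset at the end of this method. |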
| if (reducer.batchNeedsClone()) { |
| batch = batchContext.createVectorizedRowBatch(); |
| } |
| Preconditions.checkState(batch.size == 0); |
| |
| // Deserialize key into vector row columns. |
| // |
| byte[] keyBytes = keyWritable.getBytes(); |
| int keyLength = keyWritable.getLength(); |
| |
| // l4j.info("ReduceRecordSource processVectorGroup keyBytes " + keyLength + " " + |
| // VectorizedBatchUtil.displayBytes(keyBytes, 0, keyLength)); |
| |
| keyBinarySortableDeserializeToRow.setBytes(keyBytes, 0, keyLength); |
| try { |
| keyBinarySortableDeserializeToRow.deserialize(batch, 0); |
| } catch (Exception e) { |
| throw new HiveException( |
| "\nDeserializeRead details: " + |
| keyBinarySortableDeserializeToRow.getDetailedReadPositionString(), |
| e); |
| } |
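| // Every row in the group shares this key, so the key deserialized into row 0 is |
| // marked as repeating for all key columns. |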
| for (int i = 0; i < firstValueColumnOffset; i++) { |
| VectorizedBatchUtil.setRepeatingColumn(batch, i); |
| } |
| |
| final int maxSize = |
| (vectorizedTestingReducerBatchSize > 0 ? |
| Math.min(vectorizedTestingReducerBatchSize, batch.getMaxSize()) : |
| batch.getMaxSize()); |
| Preconditions.checkState(maxSize > 0); |
| int rowIdx = 0; |
| int batchBytes = keyBytes.length; |
| try { |
| for (Object value : values) { |
| if (rowIdx >= maxSize || |
| (rowIdx > 0 && batchBytes >= BATCH_BYTES)) { |
| |
| // Batch is full AND we have at least 1 more row... |
| batch.size = rowIdx; |
| if (handleGroupKey) { |
| reducer.setNextVectorBatchGroupStatus(/* isLastGroupBatch */ false); |
| } |
| reducer.process(batch, tag); |
| |
| // Do the non-column batch reset logic. |
| batch.selectedInUse = false; |
| batch.size = 0; |
| batch.endOfFile = false; |
| |
| // Reset just the value columns and value buffer. |
| for (int i = firstValueColumnOffset; i < batch.numCols; i++) { |
| // Note that reset also resets the data buffer for bytes column vectors. |
| batch.cols[i].reset(); |
| } |
| rowIdx = 0; |
| batchBytes = keyBytes.length; |
| } |
| if (valueLazyBinaryDeserializeToRow != null) { |
| // Deserialize value into vector row columns. |
| BytesWritable valueWritable = (BytesWritable) value; |
| byte[] valueBytes = valueWritable.getBytes(); |
| int valueLength = valueWritable.getLength(); |
| batchBytes += valueLength; |
| |
| valueLazyBinaryDeserializeToRow.setBytes(valueBytes, 0, valueLength); |
| valueLazyBinaryDeserializeToRow.deserialize(batch, rowIdx); |
| } |
| rowIdx++; |
| } |
| if (rowIdx > 0) { |
| // Flush final partial batch. |
| batch.size = rowIdx; |
| if (handleGroupKey) { |
| reducer.setNextVectorBatchGroupStatus(/* isLastGroupBatch */ true); |
| } |
| reducer.process(batch, tag); |
| } |
| // reset only when we reuse the batch |
| if (!reducer.batchNeedsClone()) { |
| batch.reset(); |
| } |
| } catch (Exception e) { |
| String rowString = null; |
| try { |
| rowString = batch.toString(); |
| } catch (Exception e2) { |
| rowString = "[Error getting row data with exception " |
| + StringUtils.stringifyException(e2) + " ]"; |
| } |
| l4j.error("Hive Runtime Error while processing vector batch (tag=" + tag |
| + ") (vectorizedVertexNum " + vectorizedVertexNum + ") " + rowString, e); |
| throw new HiveException("Hive Runtime Error while processing vector batch (tag=" |
| + tag + ") (vectorizedVertexNum " + vectorizedVertexNum + ")", e); |
| } |
| } |
| |
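| /** |
| * Ends the last group (when group handling is enabled) and reports whether processing |
| * was aborted. |
| * |
| * @return true if processing was aborted |
| */ |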
| boolean close() throws Exception { |
| try { |
| if (handleGroupKey && groupKey != null) { |
| // If an operator wants to do some work at the end of a group |
| reducer.endGroup(); |
| } |
| } catch (Exception e) { |
| if (!abort) { |
| // signal new failure to map-reduce |
| throw new RuntimeException("Hive Runtime Error while closing operators: " |
| + e.getMessage(), e); |
| } |
| } |
| return abort; |
| } |
| |
| public ObjectInspector getObjectInspector() { |
| return rowObjectInspector; |
| } |
| |
| @Override |
| public void setFlushLastRecord(boolean flushLastRecord) { |
| this.flushLastRecord = flushLastRecord; |
| } |
| } |