/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.llap.io.api.impl;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.ConsumerFeedback;
import org.apache.hadoop.hive.llap.counters.FragmentCountersMap;
import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
import org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool;
import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer;
import org.apache.hadoop.hive.llap.io.decode.ReadPipeline;
import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.io.BatchToRowInputFormat;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat.AvoidSplitCombination;
import org.apache.hadoop.hive.ql.io.LlapAwareSplit;
import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hive.common.util.HiveStringUtils;
import org.apache.orc.TypeDescription;
import org.apache.orc.impl.SchemaEvolution;
import org.apache.tez.common.counters.TezCounters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
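/**
 * InputFormat wrapper that reads data through the LLAP IO elevator and returns it to callers as
 * VectorizedRowBatch-es. It delegates to the wrapped source input format (ORC-only for now)
 * whenever LLAP IO cannot handle a particular split or schema.
 */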
public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowBatch>,
VectorizedInputFormatInterface, SelfDescribingInputFormatInterface,
AvoidSplitCombination {
private static final String NONVECTOR_SETTING_MESSAGE = "disable "
+ ConfVars.LLAP_IO_NONVECTOR_WRAPPER_ENABLED.varname + " to work around this error";
@SuppressWarnings("rawtypes")
private final InputFormat sourceInputFormat;
private final AvoidSplitCombination sourceASC;
private final ColumnVectorProducer cvp;
private final ExecutorService executor;
private final String hostName;
@SuppressWarnings("rawtypes")
LlapInputFormat(InputFormat sourceInputFormat, ColumnVectorProducer cvp,
ExecutorService executor) {
// TODO: right now, we do nothing with source input format, ORC-only in the first cut.
// We'd need to plumb it thru and use it to get data to cache/etc.
assert sourceInputFormat instanceof OrcInputFormat;
this.executor = executor;
this.cvp = cvp;
this.sourceInputFormat = sourceInputFormat;
this.sourceASC = (sourceInputFormat instanceof AvoidSplitCombination)
? (AvoidSplitCombination)sourceInputFormat : null;
this.hostName = HiveStringUtils.getHostname();
}
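  /**
   * Creates a record reader for the split. LLAP IO is used only if the split allows it and the
   * projected column types are supported; otherwise, or if the LLAP reader cannot be initialized
   * for the split, the call is delegated to the source input format. For non-vectorized callers
   * the batch reader is wrapped via BatchToRowInputFormat.
   */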
@Override
public RecordReader<NullWritable, VectorizedRowBatch> getRecordReader(
InputSplit split, JobConf job, Reporter reporter) throws IOException {
boolean useLlapIo = true;
if (split instanceof LlapAwareSplit) {
useLlapIo = ((LlapAwareSplit) split).canUseLlapIo();
}
boolean isVectorized = Utilities.getUseVectorizedInputFileFormat(job);
// validate for supported types. Until we fix HIVE-14089 we need this check.
if (useLlapIo) {
useLlapIo = Utilities.checkLlapIOSupportedTypes(job);
}
if (!useLlapIo) {
LlapIoImpl.LOG.warn("Not using LLAP IO for an unsupported split: " + split);
return sourceInputFormat.getRecordReader(split, job, reporter);
}
FileSplit fileSplit = (FileSplit) split;
reporter.setStatus(fileSplit.toString());
try {
List<Integer> includedCols = ColumnProjectionUtils.isReadAllColumns(job)
? null : ColumnProjectionUtils.getReadColumnIDs(job);
LlapRecordReader rr = new LlapRecordReader(job, fileSplit, includedCols, hostName);
if (!rr.init()) {
return sourceInputFormat.getRecordReader(split, job, reporter);
}
// vectorized row batch reader
if (isVectorized) {
return rr;
}
// row batch to row-by-row reader
if (sourceInputFormat instanceof BatchToRowInputFormat) {
return bogusCast(((BatchToRowInputFormat) sourceInputFormat).getWrapper(
rr, rr.getVectorizedRowBatchCtx(), includedCols));
}
return sourceInputFormat.getRecordReader(split, job, reporter);
} catch (Exception ex) {
throw new IOException(ex);
}
}
// Returning either a vectorized or non-vectorized reader from the same call requires breaking
// generics... this is how vectorization currently works.
@SuppressWarnings("unchecked")
private static <A, B, C, D> RecordReader<A, B> bogusCast(RecordReader<C, D> rr) {
return (RecordReader<A, B>)rr;
}
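  // Split generation is unchanged; LLAP IO only affects how the splits are read.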
@Override
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
return sourceInputFormat.getSplits(job, numSplits);
}
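  /**
   * Record reader that consumes ColumnVectorBatch-es produced asynchronously by the LLAP read
   * pipeline (delivered through the Consumer callbacks below) and exposes them to the caller as
   * VectorizedRowBatch-es. Decoded batches are buffered in pendingData until next() picks them up.
   */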
private class LlapRecordReader
implements RecordReader<NullWritable, VectorizedRowBatch>, Consumer<ColumnVectorBatch> {
private final Logger LOG = LoggerFactory.getLogger(LlapRecordReader.class);
private final FileSplit split;
private final List<Integer> columnIds;
private final SearchArgument sarg;
private final String[] columnNames;
private final VectorizedRowBatchCtx rbCtx;
private final Object[] partitionValues;
private final LinkedList<ColumnVectorBatch> pendingData = new LinkedList<ColumnVectorBatch>();
    /** Vector that is currently being processed by our user. */
    private ColumnVectorBatch lastCvb = null;
    private boolean isFirst = true;
    private Throwable pendingError = null;
    /** Whether the producer has signaled that no more batches are coming. */
    private boolean isDone = false;
    private boolean isClosed = false;
private final ConsumerFeedback<ColumnVectorBatch> feedback;
private final QueryFragmentCounters counters;
private long firstReturnTime;
private final JobConf jobConf;
private final TypeDescription fileSchema;
private final boolean[] includedColumns;
private final ReadPipeline rp;
public LlapRecordReader(JobConf job, FileSplit split, List<Integer> includedCols,
String hostName) throws IOException, HiveException {
this.jobConf = job;
this.split = split;
this.columnIds = includedCols;
this.sarg = ConvertAstToSearchArg.createFromConf(job);
this.columnNames = ColumnProjectionUtils.getReadColumnNames(job);
final String fragmentId = LlapTezUtils.getFragmentId(job);
final String dagId = LlapTezUtils.getDagId(job);
final String queryId = HiveConf.getVar(job, HiveConf.ConfVars.HIVEQUERYID);
MDC.put("dagId", dagId);
MDC.put("queryId", queryId);
TezCounters taskCounters = null;
if (fragmentId != null) {
MDC.put("fragmentId", fragmentId);
taskCounters = FragmentCountersMap.getCountersForFragment(fragmentId);
LOG.info("Received fragment id: {}", fragmentId);
} else {
LOG.warn("Not using tez counters as fragment id string is null");
}
this.counters = new QueryFragmentCounters(job, taskCounters);
this.counters.setDesc(QueryFragmentCounters.Desc.MACHINE, hostName);
MapWork mapWork = Utilities.getMapWork(job);
VectorizedRowBatchCtx ctx = mapWork.getVectorizedRowBatchCtx();
rbCtx = ctx != null ? ctx : createFakeVrbCtx(mapWork);
int partitionColumnCount = rbCtx.getPartitionColumnCount();
if (partitionColumnCount > 0) {
partitionValues = new Object[partitionColumnCount];
VectorizedRowBatchCtx.getPartitionValues(rbCtx, job, split, partitionValues);
} else {
partitionValues = null;
}
// Create the consumer of encoded data; it will coordinate decoding to CVBs.
rp = cvp.createReadPipeline(this, split, columnIds, sarg, columnNames, counters);
feedback = rp;
fileSchema = rp.getFileSchema();
includedColumns = rp.getIncludedColumns();
}
    /**
     * Starts the asynchronous data read pipeline.
     * @return false if schema evolution for one of the included columns is not PPD-safe, in which
     *         case the caller should fall back to the source input format; true otherwise.
     */
public boolean init() {
boolean isAcidScan = HiveConf.getBoolVar(jobConf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN);
TypeDescription readerSchema = OrcInputFormat.getDesiredRowTypeDescr(jobConf, isAcidScan,
Integer.MAX_VALUE);
SchemaEvolution schemaEvolution = new SchemaEvolution(fileSchema, readerSchema,
includedColumns);
for (Integer colId : columnIds) {
if (!schemaEvolution.isPPDSafeConversion(colId)) {
LlapIoImpl.LOG.warn("Unsupported schema evolution! Disabling Llap IO for {}", split);
return false;
}
}
// perform the data read asynchronously
if (executor instanceof StatsRecordingThreadPool) {
// Every thread created by this thread pool will use the same handler
((StatsRecordingThreadPool) executor)
.setUncaughtExceptionHandler(new IOUncaughtExceptionHandler());
}
executor.submit(rp.getReadCallable());
return true;
}
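    /**
     * Fills the caller's pre-allocated batch with the next set of column vectors. Partition
     * columns are populated on the first call; on every call, vectors from the pipeline batch
     * are swapped into the caller's batch. Returns false once the pipeline has no more data.
     */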
@Override
public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {
assert value != null;
if (isClosed) {
throw new AssertionError("next called after close");
}
// Add partition cols if necessary (see VectorizedOrcInputFormat for details).
boolean wasFirst = isFirst;
if (isFirst) {
if (partitionValues != null) {
rbCtx.addPartitionColsToBatch(value, partitionValues);
}
isFirst = false;
}
ColumnVectorBatch cvb = null;
try {
cvb = nextCvb();
} catch (InterruptedException e) {
// Query might have been canceled. Stop the background processing.
feedback.stop();
throw new IOException(e);
}
if (cvb == null) {
if (wasFirst) {
firstReturnTime = counters.startTimeCounter();
}
counters.incrTimeCounter(LlapIOCounters.CONSUMER_TIME_NS, firstReturnTime);
return false;
}
if (columnIds.size() != cvb.cols.length) {
throw new RuntimeException("Unexpected number of columns, VRB has " + columnIds.size()
+ " included, but the reader returned " + cvb.cols.length);
}
// VRB was created from VrbCtx, so we already have pre-allocated column vectors
for (int i = 0; i < cvb.cols.length; ++i) {
// Return old CVs (if any) to caller. We assume these things all have the same schema.
cvb.swapColumnVector(i, value.cols, columnIds.get(i));
}
value.selectedInUse = false;
value.size = cvb.size;
if (wasFirst) {
firstReturnTime = counters.startTimeCounter();
}
return true;
}
public VectorizedRowBatchCtx getVectorizedRowBatchCtx() {
return rbCtx;
}
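    /** Routes uncaught exceptions from pipeline reader threads into setError(). */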
private final class IOUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
@Override
public void uncaughtException(final Thread t, final Throwable e) {
LlapIoImpl.LOG.error("Unhandled error from reader thread. threadName: {} threadId: {}" +
" Message: {}", t.getName(), t.getId(), e.getMessage());
setError(e);
}
}
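    /**
     * Returns the previously processed batch to the pipeline (if any), then blocks until the
     * next batch arrives, the pipeline signals completion, or an error is reported.
     */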
ColumnVectorBatch nextCvb() throws InterruptedException, IOException {
boolean isFirst = (lastCvb == null);
if (!isFirst) {
feedback.returnData(lastCvb);
}
synchronized (pendingData) {
// We are waiting for next block. Either we will get it, or be told we are done.
boolean doLogBlocking = LlapIoImpl.LOG.isTraceEnabled() && isNothingToReport();
if (doLogBlocking) {
LlapIoImpl.LOG.trace("next will block");
}
while (isNothingToReport()) {
pendingData.wait(100);
}
if (doLogBlocking) {
LlapIoImpl.LOG.trace("next is unblocked");
}
rethrowErrorIfAny();
lastCvb = pendingData.poll();
}
if (LlapIoImpl.LOG.isTraceEnabled() && lastCvb != null) {
LlapIoImpl.LOG.trace("Processing will receive vector {}", lastCvb);
}
return lastCvb;
}
private boolean isNothingToReport() {
return !isDone && pendingData.isEmpty() && pendingError == null;
}
@Override
public NullWritable createKey() {
return NullWritable.get();
}
@Override
public VectorizedRowBatch createValue() {
return rbCtx.createVectorizedRowBatch();
}
@Override
public long getPos() throws IOException {
return -1; // Position doesn't make sense for async reader, chunk order is arbitrary.
}
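    /** Stops the read pipeline, logs the LLAP IO counters, and rethrows any pending error. */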
@Override
public void close() throws IOException {
if (LlapIoImpl.LOG.isTraceEnabled()) {
LlapIoImpl.LOG.trace("close called; closed {}, done {}, err {}, pending {}",
isClosed, isDone, pendingError, pendingData.size());
}
LlapIoImpl.LOG.info("Llap counters: {}" ,counters); // This is where counters are logged!
feedback.stop();
rethrowErrorIfAny();
MDC.clear();
}
private void rethrowErrorIfAny() throws IOException {
if (pendingError == null) return;
if (pendingError instanceof IOException) {
throw (IOException)pendingError;
}
throw new IOException(pendingError);
}
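    /** Producer callback: no more batches are coming; wakes up a next() call that is waiting. */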
@Override
public void setDone() {
if (LlapIoImpl.LOG.isTraceEnabled()) {
LlapIoImpl.LOG.trace("setDone called; closed {}, done {}, err {}, pending {}",
isClosed, isDone, pendingError, pendingData.size());
}
synchronized (pendingData) {
isDone = true;
pendingData.notifyAll();
}
}
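    /** Producer callback: queues a decoded batch for next(); ignored if the reader is closed. */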
@Override
public void consumeData(ColumnVectorBatch data) {
if (LlapIoImpl.LOG.isTraceEnabled()) {
LlapIoImpl.LOG.trace("consume called; closed {}, done {}, err {}, pending {}",
isClosed, isDone, pendingError, pendingData.size());
}
synchronized (pendingData) {
if (isClosed) {
return;
}
pendingData.add(data);
pendingData.notifyAll();
}
}
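    /** Producer callback: records a failure to be rethrown to the caller from next() or close(). */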
@Override
public void setError(Throwable t) {
counters.incrCounter(LlapIOCounters.NUM_ERRORS);
LlapIoImpl.LOG.info("setError called; closed {}, done {}, err {}, pending {}",
isClosed, isDone, pendingError, pendingData.size());
assert t != null;
synchronized (pendingData) {
pendingError = t;
pendingData.notifyAll();
}
}
@Override
public float getProgress() throws IOException {
// TODO: plumb progress info thru the reader if we can get metadata from loader first.
return 0.0f;
}
}
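  // Defer the combine decision to the source input format when it supports it.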
@Override
public boolean shouldSkipCombine(Path path, Configuration conf) throws IOException {
    return sourceASC != null && sourceASC.shouldSkipCombine(path, conf);
}
private static VectorizedRowBatchCtx createFakeVrbCtx(MapWork mapWork) throws HiveException {
// This is based on Vectorizer code, minus the validation.
// Add all non-virtual columns from the TableScan operator.
RowSchema rowSchema = findTsOp(mapWork).getSchema();
final List<String> colNames = new ArrayList<String>(rowSchema.getSignature().size());
final List<TypeInfo> colTypes = new ArrayList<TypeInfo>(rowSchema.getSignature().size());
for (ColumnInfo c : rowSchema.getSignature()) {
String columnName = c.getInternalName();
if (VirtualColumn.VIRTUAL_COLUMN_NAMES.contains(columnName)) continue;
colNames.add(columnName);
colTypes.add(TypeInfoUtils.getTypeInfoFromTypeString(c.getTypeName()));
}
// Determine the partition columns using the first partition descriptor.
// Note - like vectorizer, this assumes partition columns go after data columns.
int partitionColumnCount = 0;
Iterator<Path> paths = mapWork.getPathToAliases().keySet().iterator();
if (paths.hasNext()) {
PartitionDesc partDesc = mapWork.getPathToPartitionInfo().get(paths.next());
if (partDesc != null) {
LinkedHashMap<String, String> partSpec = partDesc.getPartSpec();
        if (partSpec != null && !partSpec.isEmpty()) {
partitionColumnCount = partSpec.size();
}
}
}
return new VectorizedRowBatchCtx(colNames.toArray(new String[colNames.size()]),
colTypes.toArray(new TypeInfo[colTypes.size()]), null, partitionColumnCount, new String[0]);
}
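  /**
   * Finds the single TableScanOperator in the map work. Fails if the alias-to-work mapping is
   * missing or more than one table scan is present.
   */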
static TableScanOperator findTsOp(MapWork mapWork) throws HiveException {
if (mapWork.getAliasToWork() == null) {
throw new HiveException("Unexpected - aliasToWork is missing; " + NONVECTOR_SETTING_MESSAGE);
}
Iterator<Operator<?>> ops = mapWork.getAliasToWork().values().iterator();
TableScanOperator tableScanOperator = null;
while (ops.hasNext()) {
Operator<?> op = ops.next();
if (op instanceof TableScanOperator) {
if (tableScanOperator != null) {
throw new HiveException("Unexpected - more than one TSOP; " + NONVECTOR_SETTING_MESSAGE);
}
tableScanOperator = (TableScanOperator)op;
}
}
return tableScanOperator;
}
}