blob: a573f07cc125503a3e633b6ad9c3314fcfc3d3e3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.kudu;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.kudu.KuduSubScan.KuduSubScanSpec;
import org.apache.drill.exec.vector.BigIntVector;
import org.apache.drill.exec.vector.BitVector;
import org.apache.drill.exec.vector.Float4Vector;
import org.apache.drill.exec.vector.Float8Vector;
import org.apache.drill.exec.vector.IntVector;
import org.apache.drill.exec.vector.NullableBigIntVector;
import org.apache.drill.exec.vector.NullableBitVector;
import org.apache.drill.exec.vector.NullableFloat4Vector;
import org.apache.drill.exec.vector.NullableFloat8Vector;
import org.apache.drill.exec.vector.NullableIntVector;
import org.apache.drill.exec.vector.NullableTimeStampVector;
import org.apache.drill.exec.vector.NullableVarBinaryVector;
import org.apache.drill.exec.vector.NullableVarCharVector;
import org.apache.drill.exec.vector.TimeStampVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.VarBinaryVector;
import org.apache.drill.exec.vector.VarCharVector;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.KuduScanner.KuduScannerBuilder;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.RowResult;
import org.apache.kudu.client.RowResultIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
public class KuduRecordReader extends AbstractRecordReader {
private static final Logger logger = LoggerFactory.getLogger(KuduRecordReader.class);
private static final int TARGET_RECORD_COUNT = 4000;
private final KuduClient client;
private final KuduSubScanSpec scanSpec;
private KuduScanner scanner;
private RowResultIterator iterator;
private OutputMutator output;
private OperatorContext context;
private String lastColumnName;
private Type lastColumnType;
private static class ProjectedColumnInfo {
int index;
ValueVector vv;
ColumnSchema kuduColumn;
}
private ImmutableList<ProjectedColumnInfo> projectedCols;
public KuduRecordReader(KuduClient client, KuduSubScan.KuduSubScanSpec subScanSpec, List<SchemaPath> projectedColumns) {
setColumns(projectedColumns);
this.client = client;
scanSpec = subScanSpec;
logger.debug("Scan spec: {}", subScanSpec);
}
@Override
public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
this.output = output;
this.context = context;
try {
KuduTable table = client.openTable(scanSpec.getTableName());
KuduScannerBuilder builder = client.newScannerBuilder(table);
if (!isStarQuery()) {
List<String> colNames = Lists.newArrayList();
for (SchemaPath p : this.getColumns()) {
colNames.add(p.getRootSegmentPath());
}
builder.setProjectedColumnNames(colNames);
}
context.getStats().startWait();
try {
scanner = builder
.lowerBoundRaw(scanSpec.getStartKey())
.exclusiveUpperBoundRaw(scanSpec.getEndKey())
.build();
} finally {
context.getStats().stopWait();
}
} catch (Exception e) {
throw new ExecutionSetupException(e);
}
}
static final Map<Type, MinorType> TYPES;
static {
TYPES = ImmutableMap.<Type, MinorType> builder()
.put(Type.BINARY, MinorType.VARBINARY)
.put(Type.BOOL, MinorType.BIT)
.put(Type.DOUBLE, MinorType.FLOAT8)
.put(Type.FLOAT, MinorType.FLOAT4)
.put(Type.INT8, MinorType.INT)
.put(Type.INT16, MinorType.INT)
.put(Type.INT32, MinorType.INT)
.put(Type.INT64, MinorType.BIGINT)
.put(Type.STRING, MinorType.VARCHAR)
.put(Type.UNIXTIME_MICROS, MinorType.TIMESTAMP)
.build();
}
@Override
public int next() {
int rowCount = 0;
try {
while (iterator == null || !iterator.hasNext()) {
if (!scanner.hasMoreRows()) {
iterator = null;
return 0;
}
context.getStats().startWait();
try {
iterator = scanner.nextRows();
} finally {
context.getStats().stopWait();
}
}
for (; rowCount < TARGET_RECORD_COUNT && iterator.hasNext(); rowCount++) {
addRowResult(iterator.next(), rowCount);
}
} catch (Exception ex) {
throw new RuntimeException(ex);
}
for (ProjectedColumnInfo pci : projectedCols) {
pci.vv.getMutator().setValueCount(rowCount);
}
return rowCount;
}
private void initCols(Schema schema) throws SchemaChangeException {
ImmutableList.Builder<ProjectedColumnInfo> pciBuilder = ImmutableList.builder();
for (int i = 0; i < schema.getColumnCount(); i++) {
ColumnSchema col = schema.getColumnByIndex(i);
final String name = col.getName();
final Type kuduType = col.getType();
lastColumnName = name;
lastColumnType = kuduType;
MinorType minorType = TYPES.get(kuduType);
if (minorType == null) {
logger.warn("Ignoring column that is unsupported.", UserException
.unsupportedError()
.message(
"A column you queried has a data type that is not currently supported by the Kudu storage plugin. "
+ "The column's name was %s and its Kudu data type was %s. ",
name, kuduType.toString())
.addContext("column Name", name)
.addContext("plugin", "kudu")
.build(logger));
continue;
}
MajorType majorType;
if (col.isNullable()) {
majorType = Types.optional(minorType);
} else {
majorType = Types.required(minorType);
}
MaterializedField field = MaterializedField.create(name, majorType);
final Class<? extends ValueVector> clazz = TypeHelper.getValueVectorClass(
minorType, majorType.getMode());
ValueVector vector = output.addField(field, clazz);
vector.allocateNew();
ProjectedColumnInfo pci = new ProjectedColumnInfo();
pci.vv = vector;
pci.kuduColumn = col;
pci.index = i;
pciBuilder.add(pci);
}
projectedCols = pciBuilder.build();
}
private void addRowResult(RowResult result, int rowIndex) throws SchemaChangeException {
if (projectedCols == null) {
initCols(result.getColumnProjection());
}
for (ProjectedColumnInfo pci : projectedCols) {
if (result.isNull(pci.index)) {
continue;
}
switch (pci.kuduColumn.getType()) {
case BINARY: {
ByteBuffer value = result.getBinary(pci.index);
if (pci.kuduColumn.isNullable()) {
((NullableVarBinaryVector.Mutator) pci.vv.getMutator())
.setSafe(rowIndex, value, 0, value.remaining());
} else {
((VarBinaryVector.Mutator) pci.vv.getMutator())
.setSafe(rowIndex, value, 0, value.remaining());
}
break;
}
case STRING: {
ByteBuffer value = ByteBuffer.wrap(result.getString(pci.index).getBytes(StandardCharsets.UTF_8));
if (pci.kuduColumn.isNullable()) {
((NullableVarCharVector.Mutator) pci.vv.getMutator())
.setSafe(rowIndex, value, 0, value.remaining());
} else {
((VarCharVector.Mutator) pci.vv.getMutator())
.setSafe(rowIndex, value, 0, value.remaining());
}
break;
}
case BOOL:
if (pci.kuduColumn.isNullable()) {
((NullableBitVector.Mutator) pci.vv.getMutator())
.setSafe(rowIndex, result.getBoolean(pci.index) ? 1 : 0);
} else {
((BitVector.Mutator) pci.vv.getMutator())
.setSafe(rowIndex, result.getBoolean(pci.index) ? 1 : 0);
}
break;
case DOUBLE:
if (pci.kuduColumn.isNullable()) {
((NullableFloat8Vector.Mutator) pci.vv.getMutator())
.setSafe(rowIndex, result.getDouble(pci.index));
} else {
((Float8Vector.Mutator) pci.vv.getMutator())
.setSafe(rowIndex, result.getDouble(pci.index));
}
break;
case FLOAT:
if (pci.kuduColumn.isNullable()) {
((NullableFloat4Vector.Mutator) pci.vv.getMutator())
.setSafe(rowIndex, result.getFloat(pci.index));
} else {
((Float4Vector.Mutator) pci.vv.getMutator())
.setSafe(rowIndex, result.getFloat(pci.index));
}
break;
case INT16:
if (pci.kuduColumn.isNullable()) {
((NullableIntVector.Mutator) pci.vv.getMutator())
.setSafe(rowIndex, result.getShort(pci.index));
} else {
((IntVector.Mutator) pci.vv.getMutator())
.setSafe(rowIndex, result.getShort(pci.index));
}
break;
case INT32:
if (pci.kuduColumn.isNullable()) {
((NullableIntVector.Mutator) pci.vv.getMutator())
.setSafe(rowIndex, result.getInt(pci.index));
} else {
((IntVector.Mutator) pci.vv.getMutator())
.setSafe(rowIndex, result.getInt(pci.index));
}
break;
case INT8:
if (pci.kuduColumn.isNullable()) {
((NullableIntVector.Mutator) pci.vv.getMutator())
.setSafe(rowIndex, result.getByte(pci.index));
} else {
((IntVector.Mutator) pci.vv.getMutator())
.setSafe(rowIndex, result.getByte(pci.index));
}
break;
case INT64:
if (pci.kuduColumn.isNullable()) {
((NullableBigIntVector.Mutator) pci.vv.getMutator())
.setSafe(rowIndex, result.getLong(pci.index));
} else {
((BigIntVector.Mutator) pci.vv.getMutator())
.setSafe(rowIndex, result.getLong(pci.index));
}
break;
case UNIXTIME_MICROS:
if (pci.kuduColumn.isNullable()) {
((NullableTimeStampVector.Mutator) pci.vv.getMutator())
.setSafe(rowIndex, result.getLong(pci.index) / 1000);
} else {
((TimeStampVector.Mutator) pci.vv.getMutator())
.setSafe(rowIndex, result.getLong(pci.index) / 1000);
}
break;
default:
throw new SchemaChangeException("unknown type"); // TODO make better
}
}
}
@Override
public void close() {
}
@Override
public String toString() {
return "KuduRecordReader[Column=" + lastColumnName
+ ", Type=" + lastColumnType
+ "]";
}
}