/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.planner.index;
import org.apache.drill.common.parser.LogicalExpressionParser;
import com.mapr.db.Admin;
import com.mapr.db.MapRDB;
import com.mapr.db.exceptions.DBException;
import com.mapr.db.index.IndexDesc;
import com.mapr.db.index.IndexDesc.MissingAndNullOrdering;
import com.mapr.db.index.IndexFieldDesc;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelFieldCollation.NullDirection;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.common.util.DrillFileUtils;
import org.apache.drill.exec.physical.base.AbstractDbGroupScan;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.planner.common.DrillScanRelBase;
import org.apache.drill.exec.planner.logical.DrillTable;
import org.apache.drill.exec.planner.physical.ScanPrel;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.FileSelection;
import org.apache.drill.exec.store.dfs.FileSystemPlugin;
import org.apache.drill.exec.store.mapr.db.MapRDBFormatMatcher;
import org.apache.drill.exec.store.mapr.db.MapRDBFormatPlugin;
import org.apache.drill.exec.store.mapr.db.MapRDBGroupScan;
import org.apache.drill.exec.store.mapr.db.json.FieldPathHelper;
import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.ojai.FieldPath;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.google.common.collect.Maps;
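/**
* Discovers the secondary indexes defined on a MapR-DB table and exposes them
* to the planner as {@link IndexCollection} and {@link DrillIndexDescriptor}
* instances.
*/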
public class MapRDBIndexDiscover extends IndexDiscoverBase implements IndexDiscover {
public static final String DEFAULT_STRING_CAST_LEN_STR = "256";
public MapRDBIndexDiscover(GroupScan inScan, DrillScanRelBase scanRel) {
super((AbstractDbGroupScan) inScan, scanRel);
}
public MapRDBIndexDiscover(GroupScan inScan, ScanPrel scanRel) {
super((AbstractDbGroupScan) inScan, scanRel);
}
@Override
public IndexCollection getTableIndex(String tableName) {
return getTableIndexFromMFS(tableName);
}
/**
* For a given table name, get the list of indexes defined on the table that
* are visible to the current user based on permissions.
* @param tableName table name
* @return an IndexCollection representing the indexes for that table, or null
* if no usable index is found
*/
private IndexCollection getTableIndexFromMFS(String tableName) {
try {
Set<DrillIndexDescriptor> idxSet = new HashSet<>();
Collection<IndexDesc> indexes = admin().getTableIndexes(new Path(tableName));
if (indexes.isEmpty()) {
logger.error("No index returned from Admin.getTableIndexes for table {}", tableName);
return null;
}
for (IndexDesc idx : indexes) {
DrillIndexDescriptor indexDescriptor = buildIndexDescriptor(tableName, idx);
if (indexDescriptor == null) {
// unable to build a valid index descriptor from the index info returned by MFS
logger.error("Unable to build index descriptor for {}", idx);
continue;
}
idxSet.add(indexDescriptor);
}
if (idxSet.isEmpty()) {
logger.error("No index found for table {}.", tableName);
return null;
}
return new DrillIndexCollection(getOriginalScanRel(), idxSet);
} catch (DBException ex) {
logger.error("Could not get table index from file system.", ex);
} catch (InvalidIndexDefinitionException ex) {
logger.error("Invalid index definition detected.", ex);
}
return null;
}
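/**
* Derives the file selection for the table of the given index descriptor by
* splitting the full table path into its parent directory and table name.
*/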
FileSelection deriveFSSelection(DrillFileSystem fs, IndexDescriptor idxDesc) throws IOException {
String tableName = idxDesc.getTableName();
String[] tablePath = tableName.split(DrillFileUtils.SEPARATOR);
String tableParent = tableName.substring(0, tableName.lastIndexOf(DrillFileUtils.SEPARATOR));
return FileSelection.create(fs, tableParent, tablePath[tablePath.length - 1], false);
}
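/**
* Resolves the index described by {@code idxDescriptor} to the DrillTable
* backing its physical index table, or returns null if the original scan is
* not a MapR-DB scan or the index table is not readable.
*/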
@Override
public DrillTable getNativeDrillTable(IndexDescriptor idxDescriptor) {
try {
final AbstractDbGroupScan origScan = getOriginalScan();
if (!(origScan instanceof MapRDBGroupScan)) {
return null;
}
MapRDBFormatPlugin maprFormatPlugin = ((MapRDBGroupScan) origScan).getFormatPlugin();
FileSystemPlugin fsPlugin = (FileSystemPlugin) (origScan.getStoragePlugin());
DrillFileSystem fs = ImpersonationUtil.createFileSystem(origScan.getUserName(), fsPlugin.getFsConf());
MapRDBFormatMatcher matcher = (MapRDBFormatMatcher) (maprFormatPlugin.getMatcher());
FileSelection fsSelection = deriveFSSelection(fs, idxDescriptor);
return matcher.isReadableIndex(fs, fsSelection, fsPlugin, fsPlugin.getName(),
origScan.getUserName(), idxDescriptor);
} catch (Exception e) {
logger.error("Failed to get native DrillTable.", e);
}
return null;
}
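/**
* Converts a MapR-DB field name into a Drill SchemaPath. A prefix separated
* by ':' (e.g. a column-family qualifier) is stripped first, and dotted names
* are parsed into nested path segments.
*/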
private SchemaPath fieldName2SchemaPath(String fieldName) {
if (fieldName.contains(":")) {
fieldName = fieldName.split(":")[1];
}
if (fieldName.contains(".")) {
return FieldPathHelper.fieldPath2SchemaPath(FieldPath.parseFrom(fieldName));
}
return SchemaPath.getSimplePath(fieldName);
}
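/**
* Maps a MapR-DB index type string to the Drill type string used in a cast
* expression, e.g. "string(100)" becomes "VARCHAR(100)" and "long" becomes
* "BIGINT". Returns null for types that cannot be cast (ANY, DECIMAL).
*/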
String getDrillTypeStr(String maprdbTypeStr) {
String typeStr = maprdbTypeStr.toUpperCase();
String[] typeTokens = typeStr.split("[)(]");
String typeData = DEFAULT_STRING_CAST_LEN_STR;
if (typeTokens.length > 1) {
typeStr = typeTokens[0];
typeData = typeTokens[1];
}
switch (typeStr) {
case "STRING":
// use the specified length, or the default cast length if none was given
return "VARCHAR(" + typeData + ")";
case "LONG":
return "BIGINT";
case "INT":
case "INTEGER":
return "INT";
case "FLOAT":
return "FLOAT4";
case "DOUBLE":
return "FLOAT8";
case "INTERVAL_YEAR_MONTH":
return "INTERVALYEAR";
case "INTERVAL_DAY_TIME":
return "INTERVALDAY";
case "BOOLEAN":
return "BIT";
case "BINARY":
return "VARBINARY";
case "ANY":
case "DECIMAL":
return null;
default: return typeStr;
}
}
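/**
* Maps a Drill type string to the corresponding required MajorType. VARCHAR
* width defaults to the type factory's default VARCHAR precision since the
* index definition does not carry one. Returns null for ANY and DECIMAL.
*/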
TypeProtos.MajorType getDrillType(String typeStr) {
switch (typeStr) {
case "VARCHAR":
case "CHAR":
case "STRING":
// set default width since it is not specified
return Types.required(TypeProtos.MinorType.VARCHAR).toBuilder()
.setWidth(getOriginalScanRel().getCluster().getTypeFactory()
.createSqlType(SqlTypeName.VARCHAR).getPrecision())
.build();
case "LONG":
case "BIGINT":
return Types.required(TypeProtos.MinorType.BIGINT);
case "INT":
case "INTEGER":
return Types.required(TypeProtos.MinorType.INT);
case "FLOAT":
return Types.required(TypeProtos.MinorType.FLOAT4);
case "DOUBLE":
return Types.required(TypeProtos.MinorType.FLOAT8);
case "INTERVAL_YEAR_MONTH":
return Types.required(TypeProtos.MinorType.INTERVALYEAR);
case "INTERVAL_DAY_TIME":
return Types.required(TypeProtos.MinorType.INTERVALDAY);
case "BOOLEAN":
return Types.required(TypeProtos.MinorType.BIT);
case "BINARY":
return Types.required(TypeProtos.MinorType.VARBINARY);
case "ANY":
case "DECIMAL":
return null;
default: return Types.required(TypeProtos.MinorType.valueOf(typeStr));
}
}
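/**
* Builds the logical expression for a cast in SQL syntax, e.g. field "a.b"
* with type "string(100)" yields the expression parsed from
* "cast( a.b as VARCHAR(100))".
*/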
private LogicalExpression castFunctionSQLSyntax(String field, String type) throws InvalidIndexDefinitionException {
// derive the Drill cast type so we can construct the cast in SQL syntax,
// since MapR-DB does not provide such syntax itself
String castTypeStr = getDrillTypeStr(type);
if (castTypeStr == null) { // type is not castable
throw new InvalidIndexDefinitionException("cast function type not recognized: " + type + " for field " + field);
}
try {
String castFunc = String.format("cast( %s as %s)", field, castTypeStr);
return LogicalExpressionParser.parse(castFunc);
} catch (Exception ex) {
logger.error("Failed to parse cast function for field {}", field, ex);
}
return null;
}
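/**
* Builds the logical expression for one indexed field: a plain field becomes
* a SchemaPath, while a functional field (currently restricted to CAST) is
* parsed into the corresponding cast expression.
*/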
private LogicalExpression getIndexExpression(IndexFieldDesc desc) throws InvalidIndexDefinitionException {
final String fieldName = desc.getFieldPath().asPathString();
final String functionDef = desc.getFunctionName();
if (functionDef != null) { // this is a function
String[] tokens = functionDef.split("\\s+");
if (tokens[0].equalsIgnoreCase("cast")) {
if (tokens.length != 3) {
throw new InvalidIndexDefinitionException("cast function definition not recognized: " + functionDef);
}
LogicalExpression idxExpr = castFunctionSQLSyntax(fieldName, tokens[2]);
if (idxExpr == null) {
throw new InvalidIndexDefinitionException("got null expression for function definition: " + functionDef);
}
return idxExpr;
} else {
throw new InvalidIndexDefinitionException("function definition is not supported for indexing: " + functionDef);
}
}
// else it is a schemaPath
return fieldName2SchemaPath(fieldName);
}
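/**
* Converts a collection of index field descriptors into their logical
* expressions (schema paths or cast expressions).
*/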
private List<LogicalExpression> field2SchemaPath(Collection<IndexFieldDesc> descCollection)
throws InvalidIndexDefinitionException {
List<LogicalExpression> listSchema = new ArrayList<>();
for (IndexFieldDesc field : descCollection) {
listSchema.add(getIndexExpression(field));
}
return listSchema;
}
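/**
* Builds the collation for the leading indexed fields that carry a sort
* order. The first field without a sort order terminates the collation
* prefix, since sortedness beyond it cannot be relied upon.
*/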
private List<RelFieldCollation> getFieldCollations(IndexDesc desc, Collection<IndexFieldDesc> descCollection) {
List<RelFieldCollation> fieldCollations = new ArrayList<>();
int i = 0;
for (IndexFieldDesc field : descCollection) {
RelFieldCollation.Direction direction = (field.getSortOrder() == IndexFieldDesc.Order.Asc) ?
RelFieldCollation.Direction.ASCENDING : (field.getSortOrder() == IndexFieldDesc.Order.Desc ?
RelFieldCollation.Direction.DESCENDING : null);
if (direction != null) {
// MapR-DB does not yet expose the NULLS ordering of an index, so assume
// NULLS LAST for ascending and NULLS FIRST for descending fields
RelFieldCollation.NullDirection nulldir =
direction == RelFieldCollation.Direction.ASCENDING ? NullDirection.LAST :
(direction == RelFieldCollation.Direction.DESCENDING ?
NullDirection.FIRST : NullDirection.UNSPECIFIED);
RelFieldCollation c = new RelFieldCollation(i++, direction, nulldir);
fieldCollations.add(c);
} else {
// if the direction is not present for a field, no need to examine remaining fields
break;
}
}
return fieldCollations;
}
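/**
* Pairs each leading index field with its collation and wraps the mapping in
* a CollationContext; only the collation prefix computed by
* {@link #getFieldCollations} is mapped.
*/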
private CollationContext buildCollationContext(List<LogicalExpression> indexFields,
List<RelFieldCollation> indexFieldCollations) {
assert indexFieldCollations.size() <= indexFields.size();
Map<LogicalExpression, RelFieldCollation> collationMap = Maps.newHashMap();
for (int i = 0; i < indexFieldCollations.size(); i++) {
collationMap.put(indexFields.get(i), indexFieldCollations.get(i));
}
return new CollationContext(collationMap, indexFieldCollations);
}
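/**
* Builds a DrillIndexDescriptor from a MapR-DB index definition. External
* indexes are not supported and yield null; "_id" is always added to the
* covering fields, and hashed indexes carry no collation.
*/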
private DrillIndexDescriptor buildIndexDescriptor(String tableName, IndexDesc desc)
throws InvalidIndexDefinitionException {
if (desc.isExternal()) {
// External index is not currently supported
return null;
}
IndexDescriptor.IndexType idxType = IndexDescriptor.IndexType.NATIVE_SECONDARY_INDEX;
List<LogicalExpression> indexFields = field2SchemaPath(desc.getIndexedFields());
List<LogicalExpression> coveringFields = field2SchemaPath(desc.getIncludedFields());
coveringFields.add(SchemaPath.getSimplePath("_id"));
CollationContext collationContext = null;
if (!desc.isHashed()) { // hash index has no collation property
List<RelFieldCollation> indexFieldCollations = getFieldCollations(desc, desc.getIndexedFields());
collationContext = buildCollationContext(indexFields, indexFieldCollations);
}
DrillIndexDescriptor idx = new MapRDBIndexDescriptor(
indexFields,
collationContext,
coveringFields,
null,
desc.getIndexName(),
tableName,
idxType,
desc,
this.getOriginalScan(),
desc.getMissingAndNullOrdering() == MissingAndNullOrdering.MissingAndNullFirst ? NullDirection.FIRST :
(desc.getMissingAndNullOrdering() == MissingAndNullOrdering.MissingAndNullLast ?
NullDirection.LAST : NullDirection.UNSPECIFIED));
String storageName = this.getOriginalScan().getStoragePlugin().getName();
materializeIndex(storageName, idx);
return idx;
}
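/**
* Obtains a MapR-DB Admin handle as the scan's user via impersonation.
* @throws DrillRuntimeException if the Admin instance cannot be created
*/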
@SuppressWarnings("deprecation")
private Admin admin() {
assert getOriginalScan() instanceof MapRDBGroupScan;
final MapRDBGroupScan dbGroupScan = (MapRDBGroupScan) getOriginalScan();
final UserGroupInformation currentUser = ImpersonationUtil.createProxyUgi(dbGroupScan.getUserName());
final Configuration conf = dbGroupScan.getFormatPlugin().getFsConf();
final Admin admin;
try {
admin = currentUser.doAs((PrivilegedExceptionAction<Admin>) () -> MapRDB.getAdmin(conf));
} catch (Exception e) {
throw new DrillRuntimeException("Failed to get Admin instance for user: " + currentUser.getUserName(), e);
}
return admin;
}
}