| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.hbase.spark; |
| |
| import java.io.IOException; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| |
| import org.apache.hadoop.hbase.Cell; |
| import org.apache.hadoop.hbase.exceptions.DeserializationException; |
| import org.apache.hadoop.hbase.filter.FilterBase; |
| import org.apache.hadoop.hbase.spark.datasources.BytesEncoder; |
| import org.apache.hadoop.hbase.spark.datasources.Field; |
| import org.apache.hadoop.hbase.spark.datasources.JavaBytesEncoder; |
| import org.apache.hadoop.hbase.spark.protobuf.generated.SparkFilterProtos; |
| import org.apache.hadoop.hbase.util.ByteStringer; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.yetus.audience.InterfaceAudience; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; |
| import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; |
| |
| import scala.collection.mutable.MutableList; |
| |
| /** |
| * This filter will push down all qualifier logic given to us |
| * by SparkSQL so that we have make the filters at the region server level |
| * and avoid sending the data back to the client to be filtered. |
| */ |
| @InterfaceAudience.Private |
| public class SparkSQLPushDownFilter extends FilterBase{ |
| protected static final Logger log = LoggerFactory.getLogger(SparkSQLPushDownFilter.class); |
| |
| //The following values are populated with protobuffer |
| DynamicLogicExpression dynamicLogicExpression; |
| byte[][] valueFromQueryArray; |
| HashMap<ByteArrayComparable, HashMap<ByteArrayComparable, String>> |
| currentCellToColumnIndexMap; |
| |
| //The following values are transient |
| HashMap<String, ByteArrayComparable> columnToCurrentRowValueMap = null; |
| |
| static final byte[] rowKeyFamily = new byte[0]; |
| static final byte[] rowKeyQualifier = Bytes.toBytes("key"); |
| |
| String encoderClassName; |
| |
| public SparkSQLPushDownFilter(DynamicLogicExpression dynamicLogicExpression, |
| byte[][] valueFromQueryArray, |
| HashMap<ByteArrayComparable, |
| HashMap<ByteArrayComparable, String>> |
| currentCellToColumnIndexMap, String encoderClassName) { |
| this.dynamicLogicExpression = dynamicLogicExpression; |
| this.valueFromQueryArray = valueFromQueryArray; |
| this.currentCellToColumnIndexMap = currentCellToColumnIndexMap; |
| this.encoderClassName = encoderClassName; |
| } |
| |
| public SparkSQLPushDownFilter(DynamicLogicExpression dynamicLogicExpression, |
| byte[][] valueFromQueryArray, |
| MutableList<Field> fields, String encoderClassName) { |
| this.dynamicLogicExpression = dynamicLogicExpression; |
| this.valueFromQueryArray = valueFromQueryArray; |
| this.encoderClassName = encoderClassName; |
| |
| //generate family qualifier to index mapping |
| this.currentCellToColumnIndexMap = |
| new HashMap<>(); |
| |
| for (int i = 0; i < fields.size(); i++) { |
| Field field = fields.apply(i); |
| |
| byte[] cfBytes = field.cfBytes(); |
| ByteArrayComparable familyByteComparable = |
| new ByteArrayComparable(cfBytes, 0, cfBytes.length); |
| |
| HashMap<ByteArrayComparable, String> qualifierIndexMap = |
| currentCellToColumnIndexMap.get(familyByteComparable); |
| |
| if (qualifierIndexMap == null) { |
| qualifierIndexMap = new HashMap<>(); |
| currentCellToColumnIndexMap.put(familyByteComparable, qualifierIndexMap); |
| } |
| byte[] qBytes = field.colBytes(); |
| ByteArrayComparable qualifierByteComparable = |
| new ByteArrayComparable(qBytes, 0, qBytes.length); |
| |
| qualifierIndexMap.put(qualifierByteComparable, field.colName()); |
| } |
| } |
| |
| @Override |
| public ReturnCode filterCell(final Cell c) throws IOException { |
| |
| //If the map RowValueMap is empty then we need to populate |
| // the row key |
| if (columnToCurrentRowValueMap == null) { |
| columnToCurrentRowValueMap = new HashMap<>(); |
| HashMap<ByteArrayComparable, String> qualifierColumnMap = |
| currentCellToColumnIndexMap.get( |
| new ByteArrayComparable(rowKeyFamily, 0, rowKeyFamily.length)); |
| |
| if (qualifierColumnMap != null) { |
| String rowKeyColumnName = |
| qualifierColumnMap.get( |
| new ByteArrayComparable(rowKeyQualifier, 0, |
| rowKeyQualifier.length)); |
| //Make sure that the rowKey is part of the where clause |
| if (rowKeyColumnName != null) { |
| columnToCurrentRowValueMap.put(rowKeyColumnName, |
| new ByteArrayComparable(c.getRowArray(), |
| c.getRowOffset(), c.getRowLength())); |
| } |
| } |
| } |
| |
| //Always populate the column value into the RowValueMap |
| ByteArrayComparable currentFamilyByteComparable = |
| new ByteArrayComparable(c.getFamilyArray(), |
| c.getFamilyOffset(), |
| c.getFamilyLength()); |
| |
| HashMap<ByteArrayComparable, String> qualifierColumnMap = |
| currentCellToColumnIndexMap.get( |
| currentFamilyByteComparable); |
| |
| if (qualifierColumnMap != null) { |
| |
| String columnName = |
| qualifierColumnMap.get( |
| new ByteArrayComparable(c.getQualifierArray(), |
| c.getQualifierOffset(), |
| c.getQualifierLength())); |
| |
| if (columnName != null) { |
| columnToCurrentRowValueMap.put(columnName, |
| new ByteArrayComparable(c.getValueArray(), |
| c.getValueOffset(), c.getValueLength())); |
| } |
| } |
| |
| return ReturnCode.INCLUDE; |
| } |
| |
| |
| @Override |
| public boolean filterRow() throws IOException { |
| |
| try { |
| boolean result = |
| dynamicLogicExpression.execute(columnToCurrentRowValueMap, |
| valueFromQueryArray); |
| columnToCurrentRowValueMap = null; |
| return !result; |
| } catch (Throwable e) { |
| log.error("Error running dynamic logic on row", e); |
| } |
| return false; |
| } |
| |
| |
| /** |
| * @param pbBytes A pb serialized instance |
| * @return An instance of SparkSQLPushDownFilter |
| * @throws DeserializationException if the filter cannot be parsed from the given bytes |
| */ |
| @SuppressWarnings("unused") |
| public static SparkSQLPushDownFilter parseFrom(final byte[] pbBytes) |
| throws DeserializationException { |
| |
| SparkFilterProtos.SQLPredicatePushDownFilter proto; |
| try { |
| proto = SparkFilterProtos.SQLPredicatePushDownFilter.parseFrom(pbBytes); |
| } catch (InvalidProtocolBufferException e) { |
| throw new DeserializationException(e); |
| } |
| |
| String encoder = proto.getEncoderClassName(); |
| BytesEncoder enc = JavaBytesEncoder.create(encoder); |
| |
| //Load DynamicLogicExpression |
| DynamicLogicExpression dynamicLogicExpression = |
| DynamicLogicExpressionBuilder.build(proto.getDynamicLogicExpression(), enc); |
| |
| //Load valuesFromQuery |
| final List<ByteString> valueFromQueryArrayList = proto.getValueFromQueryArrayList(); |
| byte[][] valueFromQueryArray = new byte[valueFromQueryArrayList.size()][]; |
| for (int i = 0; i < valueFromQueryArrayList.size(); i++) { |
| valueFromQueryArray[i] = valueFromQueryArrayList.get(i).toByteArray(); |
| } |
| |
| //Load mapping from HBase family/qualifier to Spark SQL columnName |
| HashMap<ByteArrayComparable, HashMap<ByteArrayComparable, String>> |
| currentCellToColumnIndexMap = new HashMap<>(); |
| |
| for (SparkFilterProtos.SQLPredicatePushDownCellToColumnMapping |
| sqlPredicatePushDownCellToColumnMapping : |
| proto.getCellToColumnMappingList()) { |
| |
| byte[] familyArray = |
| sqlPredicatePushDownCellToColumnMapping.getColumnFamily().toByteArray(); |
| ByteArrayComparable familyByteComparable = |
| new ByteArrayComparable(familyArray, 0, familyArray.length); |
| HashMap<ByteArrayComparable, String> qualifierMap = |
| currentCellToColumnIndexMap.get(familyByteComparable); |
| |
| if (qualifierMap == null) { |
| qualifierMap = new HashMap<>(); |
| currentCellToColumnIndexMap.put(familyByteComparable, qualifierMap); |
| } |
| byte[] qualifierArray = |
| sqlPredicatePushDownCellToColumnMapping.getQualifier().toByteArray(); |
| |
| ByteArrayComparable qualifierByteComparable = |
| new ByteArrayComparable(qualifierArray, 0 ,qualifierArray.length); |
| |
| qualifierMap.put(qualifierByteComparable, |
| sqlPredicatePushDownCellToColumnMapping.getColumnName()); |
| } |
| |
| return new SparkSQLPushDownFilter(dynamicLogicExpression, |
| valueFromQueryArray, currentCellToColumnIndexMap, encoder); |
| } |
| |
| /** |
| * @return The filter serialized using pb |
| */ |
| public byte[] toByteArray() { |
| |
| SparkFilterProtos.SQLPredicatePushDownFilter.Builder builder = |
| SparkFilterProtos.SQLPredicatePushDownFilter.newBuilder(); |
| |
| SparkFilterProtos.SQLPredicatePushDownCellToColumnMapping.Builder columnMappingBuilder = |
| SparkFilterProtos.SQLPredicatePushDownCellToColumnMapping.newBuilder(); |
| |
| builder.setDynamicLogicExpression(dynamicLogicExpression.toExpressionString()); |
| for (byte[] valueFromQuery: valueFromQueryArray) { |
| builder.addValueFromQueryArray(ByteString.copyFrom(valueFromQuery)); |
| } |
| |
| for (Map.Entry<ByteArrayComparable, HashMap<ByteArrayComparable, String>> |
| familyEntry : currentCellToColumnIndexMap.entrySet()) { |
| for (Map.Entry<ByteArrayComparable, String> qualifierEntry : |
| familyEntry.getValue().entrySet()) { |
| columnMappingBuilder.setColumnFamily( |
| ByteString.copyFrom(familyEntry.getKey().bytes())); |
| columnMappingBuilder.setQualifier( |
| ByteString.copyFrom(qualifierEntry.getKey().bytes())); |
| columnMappingBuilder.setColumnName(qualifierEntry.getValue()); |
| builder.addCellToColumnMapping(columnMappingBuilder.build()); |
| } |
| } |
| builder.setEncoderClassName(encoderClassName); |
| |
| |
| return builder.build().toByteArray(); |
| } |
| |
| @Override |
| public boolean equals(Object obj) { |
| if (!(obj instanceof SparkSQLPushDownFilter)) { |
| return false; |
| } |
| if (this == obj) { |
| return true; |
| } |
| SparkSQLPushDownFilter f = (SparkSQLPushDownFilter) obj; |
| if (this.valueFromQueryArray.length != f.valueFromQueryArray.length) { |
| return false; |
| } |
| int i = 0; |
| for (byte[] val : this.valueFromQueryArray) { |
| if (!Bytes.equals(val, f.valueFromQueryArray[i])) { |
| return false; |
| } |
| i++; |
| } |
| return this.dynamicLogicExpression.equals(f.dynamicLogicExpression) && |
| this.currentCellToColumnIndexMap.equals(f.currentCellToColumnIndexMap) && |
| this.encoderClassName.equals(f.encoderClassName); |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hash(this.dynamicLogicExpression, Arrays.hashCode(this.valueFromQueryArray), |
| this.currentCellToColumnIndexMap, this.encoderClassName); |
| } |
| } |