| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.drill.exec.physical.impl.filter; |
| |
| import java.io.IOException; |
| import java.util.List; |
| |
| import org.apache.drill.common.expression.ErrorCollector; |
| import org.apache.drill.common.expression.ErrorCollectorImpl; |
| import org.apache.drill.common.expression.LogicalExpression; |
| import org.apache.drill.exec.exception.ClassTransformationException; |
| import org.apache.drill.exec.exception.OutOfMemoryException; |
| import org.apache.drill.exec.exception.SchemaChangeException; |
| import org.apache.drill.exec.expr.ClassGenerator; |
| import org.apache.drill.exec.expr.CodeGenerator; |
| import org.apache.drill.exec.expr.ExpressionTreeMaterializer; |
| import org.apache.drill.exec.ops.FragmentContext; |
| import org.apache.drill.exec.physical.config.Filter; |
| import org.apache.drill.exec.record.AbstractSingleRecordBatch; |
| import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; |
| import org.apache.drill.exec.record.RecordBatch; |
| import org.apache.drill.exec.record.TransferPair; |
| import org.apache.drill.exec.record.VectorWrapper; |
| import org.apache.drill.exec.record.selection.SelectionVector2; |
| import org.apache.drill.exec.record.selection.SelectionVector4; |
| import org.apache.drill.exec.vector.ValueVector; |
| import org.apache.drill.shaded.guava.com.google.common.collect.Lists; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter> { |
| |
| private static final Logger logger = LoggerFactory.getLogger(FilterRecordBatch.class); |
| |
| private SelectionVector2 sv2; |
| private SelectionVector4 sv4; |
| private Filterer filter; |
| |
| public FilterRecordBatch(Filter pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException { |
| super(pop, context, incoming); |
| } |
| |
| @Override |
| public FragmentContext getContext() { |
| return context; |
| } |
| |
| @Override |
| public int getRecordCount() { |
| return sv2 != null ? sv2.getCount() : sv4.getCount(); |
| } |
| |
| @Override |
| public SelectionVector2 getSelectionVector2() { |
| return sv2; |
| } |
| |
| @Override |
| public SelectionVector4 getSelectionVector4() { |
| return sv4; |
| } |
| |
| @Override |
| protected IterOutcome doWork() { |
| try { |
| container.zeroVectors(); |
| int recordCount = incoming.getRecordCount(); |
| filter.filterBatch(recordCount); |
| // The container needs the actual number of values in |
| // its contained vectors (not the filtered count) |
| // Not sure the SV4 case is actually supported... |
| container.setRecordCount( |
| sv2 != null ? sv2.getBatchActualRecordCount() : recordCount); |
| return getFinalOutcome(false); |
| } catch (SchemaChangeException e) { |
| throw new UnsupportedOperationException(e); |
| } |
| } |
| |
| @Override |
| public void close() { |
| clearSv(); |
| super.close(); |
| } |
| |
| private void clearSv() { |
| if (sv2 != null) { |
| sv2.clear(); |
| } |
| if (sv4 != null) { |
| sv4.clear(); |
| } |
| } |
| |
| @Override |
| protected boolean setupNewSchema() throws SchemaChangeException { |
| clearSv(); |
| |
| switch (incoming.getSchema().getSelectionVectorMode()) { |
| case NONE: |
| if (sv2 == null) { |
| sv2 = new SelectionVector2(oContext.getAllocator()); |
| } |
| filter = generateSV2Filterer(); |
| break; |
| case TWO_BYTE: |
| sv2 = new SelectionVector2(oContext.getAllocator()); |
| filter = generateSV2Filterer(); |
| break; |
| case FOUR_BYTE: |
| /* |
| * Filter does not support SV4 handling. There are couple of minor issues in the |
| * logic that handles SV4 + filter should always be pushed beyond sort so disabling |
| * it in FilterPrel. |
| */ |
| default: |
| throw new UnsupportedOperationException(); |
| } |
| |
| if (container.isSchemaChanged()) { |
| container.buildSchema(SelectionVectorMode.TWO_BYTE); |
| return true; |
| } |
| return false; |
| } |
| |
| protected Filterer generateSV4Filterer() throws SchemaChangeException { |
| final ErrorCollector collector = new ErrorCollectorImpl(); |
| final List<TransferPair> transfers = Lists.newArrayList(); |
| final ClassGenerator<Filterer> cg = CodeGenerator.getRoot(Filterer.TEMPLATE_DEFINITION4, context.getOptions()); |
| |
| final LogicalExpression expr = ExpressionTreeMaterializer.materialize(popConfig.getExpr(), incoming, collector, context.getFunctionRegistry()); |
| if (collector.hasErrors()) { |
| throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString())); |
| } |
| |
| cg.addExpr(new ReturnValueExpression(expr), ClassGenerator.BlkCreateMode.FALSE); |
| |
| for (final VectorWrapper<?> vw : incoming) { |
| for (final ValueVector vv : vw.getValueVectors()) { |
| final TransferPair pair = vv.getTransferPair(oContext.getAllocator()); |
| container.add(pair.getTo()); |
| transfers.add(pair); |
| } |
| } |
| |
| // allocate outgoing sv4 |
| container.buildSchema(SelectionVectorMode.FOUR_BYTE); |
| |
| try { |
| final TransferPair[] tx = transfers.toArray(new TransferPair[transfers.size()]); |
| final Filterer filter = context.getImplementationClass(cg); |
| filter.setup(context, incoming, this, tx); |
| return filter; |
| } catch (ClassTransformationException | IOException e) { |
| throw new SchemaChangeException("Failure while attempting to load generated class", e); |
| } |
| } |
| |
| protected Filterer generateSV2Filterer() throws SchemaChangeException { |
| final ErrorCollector collector = new ErrorCollectorImpl(); |
| final List<TransferPair> transfers = Lists.newArrayList(); |
| final ClassGenerator<Filterer> cg = CodeGenerator.getRoot(Filterer.TEMPLATE_DEFINITION2, context.getOptions()); |
| cg.getCodeGenerator().plainJavaCapable(true); |
| // Uncomment the following line to enable saving generated code file for debugging |
| // cg.getCodeGenerator().saveCodeForDebugging(true); |
| |
| final LogicalExpression expr = ExpressionTreeMaterializer.materialize(popConfig.getExpr(), incoming, collector, |
| context.getFunctionRegistry(), false, unionTypeEnabled); |
| if (collector.hasErrors()) { |
| throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString())); |
| } |
| |
| cg.addExpr(new ReturnValueExpression(expr), ClassGenerator.BlkCreateMode.FALSE); |
| |
| for (final VectorWrapper<?> v : incoming) { |
| final TransferPair pair = v.getValueVector().makeTransferPair(container.addOrGet(v.getField(), callBack)); |
| transfers.add(pair); |
| } |
| |
| try { |
| final TransferPair[] tx = transfers.toArray(new TransferPair[transfers.size()]); |
| CodeGenerator<Filterer> codeGen = cg.getCodeGenerator(); |
| codeGen.plainJavaCapable(true); |
| final Filterer filter = context.getImplementationClass(codeGen); |
| filter.setup(context, incoming, this, tx); |
| return filter; |
| } catch (ClassTransformationException | IOException e) { |
| throw new SchemaChangeException("Failure while attempting to load generated class", e); |
| } |
| } |
| |
| @Override |
| public void dump() { |
| logger.error("FilterRecordBatch[container={}, selectionVector2={}, filter={}, popConfig={}]", container, sv2, filter, popConfig); |
| } |
| } |