| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.drill.exec.physical.impl.window; |
| |
| import com.google.common.collect.Lists; |
| import com.sun.codemodel.JExpr; |
| import org.apache.drill.common.exceptions.DrillException; |
| import org.apache.drill.common.expression.ErrorCollector; |
| import org.apache.drill.common.expression.ErrorCollectorImpl; |
| import org.apache.drill.common.expression.LogicalExpression; |
| import org.apache.drill.common.logical.data.NamedExpression; |
| import org.apache.drill.common.logical.data.Order; |
| import org.apache.drill.exec.compile.sig.GeneratorMapping; |
| import org.apache.drill.exec.compile.sig.MappingSet; |
| import org.apache.drill.exec.exception.ClassTransformationException; |
| import org.apache.drill.exec.exception.SchemaChangeException; |
| import org.apache.drill.exec.expr.ClassGenerator; |
| import org.apache.drill.exec.expr.CodeGenerator; |
| import org.apache.drill.exec.expr.ExpressionTreeMaterializer; |
| import org.apache.drill.exec.expr.ValueVectorReadExpression; |
| import org.apache.drill.exec.expr.ValueVectorWriteExpression; |
| import org.apache.drill.exec.expr.fn.FunctionGenerationHelper; |
| import org.apache.drill.exec.memory.OutOfMemoryException; |
| import org.apache.drill.exec.memory.OutOfMemoryRuntimeException; |
| import org.apache.drill.exec.ops.FragmentContext; |
| import org.apache.drill.exec.physical.config.WindowPOP; |
| import org.apache.drill.exec.physical.impl.sort.RecordBatchData; |
| import org.apache.drill.exec.record.AbstractRecordBatch; |
| import org.apache.drill.exec.record.BatchSchema; |
| import org.apache.drill.exec.record.MaterializedField; |
| import org.apache.drill.exec.record.RecordBatch; |
| import org.apache.drill.exec.record.TypedFieldId; |
| import org.apache.drill.exec.record.VectorAccessible; |
| import org.apache.drill.exec.record.VectorWrapper; |
| import org.apache.drill.exec.vector.ValueVector; |
| |
| import java.io.IOException; |
| import java.util.List; |
| |
| /** |
| * support for OVER(PARTITION BY expression1,expression2,... [ORDER BY expressionA, expressionB,...]) |
| * |
 * Doesn't support distinct partitions: multiple windows with different PARTITION BY clauses.
| */ |
| public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> { |
| static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WindowFrameRecordBatch.class); |
| |
| private final RecordBatch incoming; |
| private List<RecordBatchData> batches; |
| private WindowFramer framer; |
| |
| private boolean noMoreBatches; |
| private BatchSchema schema; |
| |
| public WindowFrameRecordBatch(WindowPOP popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException { |
| super(popConfig, context); |
| this.incoming = incoming; |
| batches = Lists.newArrayList(); |
| } |
| |
| /** |
| * Let's assume we have the following 3 batches of data: |
| * <p><pre> |
| * +---------+--------+--------------+--------+ |
| * | b0 | b1 | b2 | b3 | |
| * +----+----+--------+----+----+----+--------+ |
| * | p0 | p1 | p1 | p2 | p3 | p4 | p5 | |
| * +----+----+--------+----+----+----+--------+ |
| * </pre></p> |
| * |
| * batch b0 contains partitions p0 and p1 |
| * batch b1 contains partition p1 |
| * batch b2 contains partitions p2 p3 and p4 |
| * batch b3 contains partition p5 |
| * |
| * <p><pre> |
| * when innerNext() is called: |
| * call next(incoming), we receive and save b0 in a list of RecordDataBatch |
| * we can't process b0 yet because we don't know if p1 has more rows upstream |
| * call next(incoming), we receive and save b1 |
| * we can't process b0 yet for the same reason previously stated |
| * call next(incoming), we receive and save b2 |
| * we process b0 (using the framer) and pass the container downstream |
| * when innerNext() is called: |
| * we process b1 and pass the container downstream, b0 and b1 are released from memory |
| * when innerNext() is called: |
| * call next(incoming), we receive and save b3 |
| * we process b2 and pass the container downstream, b2 is released from memory |
| * when innerNext() is called: |
| * call next(incoming) and receive NONE |
| * we process b3 and pass the container downstream, b3 is released from memory |
| * when innerNext() is called: |
| * we return NONE |
| * </pre></p> |
| * |
| */ |
| @Override |
| public IterOutcome innerNext() { |
| logger.trace("innerNext(), noMoreBatches = {}", noMoreBatches); |
| |
| // Short circuit if record batch has already sent all data and is done |
| if (state == BatchState.DONE) { |
| return IterOutcome.NONE; |
| } |
| |
| // keep saving incoming batches until the first unprocessed batch can be processed, or upstream == NONE |
| while (!noMoreBatches && !framer.canDoWork()) { |
| IterOutcome upstream = next(incoming); |
| logger.trace("next(incoming) returned {}", upstream); |
| |
| switch (upstream) { |
| case NONE: |
| noMoreBatches = true; |
| break; |
| case NOT_YET: |
| case STOP: |
| return upstream; |
| case OK_NEW_SCHEMA: |
| // when a partition of rows exceeds the current processed batch, it will be kept as "pending" and processed |
| // when innerNext() is called again. If the schema changes, the framer is "rebuilt" and the pending information |
| // will be lost which may lead to incorrect results. |
| |
| // only change in the case that the schema truly changes. Artificial schema changes are ignored. |
| if (!incoming.getSchema().equals(schema)) { |
| if (schema != null) { |
| throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas."); |
| } |
| this.schema = incoming.getSchema(); |
| } |
| case OK: |
| batches.add(new RecordBatchData(incoming)); |
| break; |
| default: |
| throw new UnsupportedOperationException(); |
| } |
| } |
| |
| if (batches.isEmpty()) { |
| logger.trace("no more batches to handle, we are DONE"); |
| state = BatchState.DONE; |
| return IterOutcome.NONE; |
| } |
| |
| // process a saved batch |
| try { |
| framer.doWork(); |
| } catch (DrillException | OutOfMemoryRuntimeException e) { |
| context.fail(e); |
| if (framer != null) { |
| framer.cleanup(); |
| framer = null; |
| } |
| return IterOutcome.STOP; |
| } |
| |
| if (state == BatchState.FIRST) { |
| state = BatchState.NOT_FIRST; |
| } |
| |
| return IterOutcome.OK; |
| } |
| |
| @Override |
| protected void buildSchema() throws SchemaChangeException { |
| logger.trace("buildSchema()"); |
| if (next(incoming) == IterOutcome.NONE) { |
| state = BatchState.DONE; |
| container.buildSchema(BatchSchema.SelectionVectorMode.NONE); |
| return; |
| } |
| |
| try { |
| framer = createFramer(incoming); |
| } catch (IOException | ClassTransformationException e) { |
| throw new SchemaChangeException("Exception when creating the schema", e); |
| } |
| } |
| |
| private WindowFramer createFramer(VectorAccessible batch) throws SchemaChangeException, IOException, ClassTransformationException { |
| logger.trace("creating framer"); |
| |
| container.clear(); |
| |
| if (framer != null) { |
| framer.cleanup(); |
| framer = null; |
| } |
| |
| ErrorCollector collector = new ErrorCollectorImpl(); |
| |
| // setup code generation to copy all incoming vectors to the container |
| // we can't just transfer them because after we pass the container downstream, some values will be needed when |
| // processing the next batches |
| int j = 0; |
| LogicalExpression[] windowExprs = new LogicalExpression[batch.getSchema().getFieldCount()]; |
| for (VectorWrapper wrapper : batch) { |
| // read value from saved batch |
| final LogicalExpression expr = ExpressionTreeMaterializer.materialize( |
| new ValueVectorReadExpression(new TypedFieldId(wrapper.getField().getType(), wrapper.isHyper(), j)), |
| batch, collector, context.getFunctionRegistry()); |
| |
| ValueVector vv = container.addOrGet(wrapper.getField()); |
| vv.allocateNew(); |
| |
| // write value into container |
| TypedFieldId id = container.getValueVectorId(vv.getField().getPath()); |
| windowExprs[j] = new ValueVectorWriteExpression(id, expr, true); |
| j++; |
| } |
| |
| // add aggregation vectors to the container, and materialize corresponding expressions |
| LogicalExpression[] aggExprs = new LogicalExpression[popConfig.getAggregations().length]; |
| for (int i = 0; i < aggExprs.length; i++) { |
| // evaluate expression over saved batch |
| NamedExpression ne = popConfig.getAggregations()[i]; |
| final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), batch, collector, context.getFunctionRegistry()); |
| |
| // add corresponding ValueVector to container |
| final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType()); |
| ValueVector vv = container.addOrGet(outputField); |
| vv.allocateNew(); |
| |
| // write value into container |
| TypedFieldId id = container.getValueVectorId(ne.getRef()); |
| aggExprs[i] = new ValueVectorWriteExpression(id, expr, true); |
| } |
| |
| if (container.isSchemaChanged()) { |
| container.buildSchema(BatchSchema.SelectionVectorMode.NONE); |
| } |
| |
| // materialize partition by expressions |
| LogicalExpression[] keyExprs = new LogicalExpression[popConfig.getWithins().length]; |
| for (int i = 0; i < keyExprs.length; i++) { |
| NamedExpression ne = popConfig.getWithins()[i]; |
| keyExprs[i] = ExpressionTreeMaterializer.materialize(ne.getExpr(), batch, collector, context.getFunctionRegistry()); |
| } |
| |
| // materialize order by expressions |
| LogicalExpression[] orderExprs = new LogicalExpression[popConfig.getOrderings().length]; |
| for (int i = 0; i < orderExprs.length; i++) { |
| Order.Ordering oe = popConfig.getOrderings()[i]; |
| orderExprs[i] = ExpressionTreeMaterializer.materialize(oe.getExpr(), batch, collector, context.getFunctionRegistry()); |
| } |
| |
| if (collector.hasErrors()) { |
| throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); |
| } |
| |
| // generate framer code |
| |
| final ClassGenerator<WindowFramer> cg = CodeGenerator.getRoot(WindowFramer.TEMPLATE_DEFINITION, context.getFunctionRegistry()); |
| // setup for isSamePartition() |
| setupIsFunction(cg, keyExprs, isaB1, isaB2); |
| // setup for isPeer() |
| setupIsFunction(cg, orderExprs, isaP1, isaP2); |
| setupAddRecords(cg, aggExprs); |
| setupOutputWindowValues(cg, windowExprs); |
| |
| cg.getBlock("resetValues")._return(JExpr.TRUE); |
| |
| WindowFramer framer = context.getImplementationClass(cg); |
| framer.setup(batches, container); |
| |
| return framer; |
| } |
| |
| private static final GeneratorMapping IS_SAME_RECORD_BATCH_DATA_READ = GeneratorMapping.create("isSamePartition", "isSamePartition", null, null); |
| private final MappingSet isaB1 = new MappingSet("b1Index", null, "b1", null, IS_SAME_RECORD_BATCH_DATA_READ, IS_SAME_RECORD_BATCH_DATA_READ); |
| private final MappingSet isaB2 = new MappingSet("b2Index", null, "b2", null, IS_SAME_RECORD_BATCH_DATA_READ, IS_SAME_RECORD_BATCH_DATA_READ); |
| |
| private static final GeneratorMapping IS_SAME_PEER = GeneratorMapping.create("isPeer", "isPeer", null, null); |
| private final MappingSet isaP1 = new MappingSet("b1Index", null, "b1", null, IS_SAME_PEER, IS_SAME_PEER); |
| private final MappingSet isaP2 = new MappingSet("b2Index", null, "b2", null, IS_SAME_PEER, IS_SAME_PEER); |
| |
| /** |
| * setup comparison functions isSamePartition and isPeer |
| */ |
| private void setupIsFunction(ClassGenerator<WindowFramer> cg, LogicalExpression[] exprs, MappingSet leftMapping, MappingSet rightMapping) { |
| cg.setMappingSet(leftMapping); |
| for (LogicalExpression expr : exprs) { |
| cg.setMappingSet(leftMapping); |
| ClassGenerator.HoldingContainer first = cg.addExpr(expr, false); |
| cg.setMappingSet(rightMapping); |
| ClassGenerator.HoldingContainer second = cg.addExpr(expr, false); |
| |
| LogicalExpression fh = |
| FunctionGenerationHelper |
| .getOrderingComparatorNullsHigh(first, second, context.getFunctionRegistry()); |
| ClassGenerator.HoldingContainer out = cg.addExpr(fh, false); |
| cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)))._then()._return(JExpr.FALSE); |
| } |
| cg.getEvalBlock()._return(JExpr.TRUE); |
| } |
| |
| private static final GeneratorMapping EVAL_INSIDE = GeneratorMapping.create("setupIncoming", "addRecord", null, null); |
| private static final GeneratorMapping EVAL_OUTSIDE = GeneratorMapping.create("setupOutgoing", "outputRecordValues", "resetValues", "cleanup"); |
| private final MappingSet eval = new MappingSet("index", "outIndex", EVAL_INSIDE, EVAL_OUTSIDE, EVAL_INSIDE); |
| |
| /** |
| * setup for addRecords() and outputRecordValues() |
| */ |
| private void setupAddRecords(ClassGenerator<WindowFramer> cg, LogicalExpression[] valueExprs) { |
| cg.setMappingSet(eval); |
| for (LogicalExpression ex : valueExprs) { |
| cg.addExpr(ex); |
| } |
| } |
| |
| private final static GeneratorMapping OUTPUT_WINDOW_VALUES = GeneratorMapping.create("setupCopy", "outputWindowValues", null, null); |
| private final MappingSet windowValues = new MappingSet("index", "index", OUTPUT_WINDOW_VALUES, OUTPUT_WINDOW_VALUES); |
| |
| private void setupOutputWindowValues(ClassGenerator<WindowFramer> cg, LogicalExpression[] valueExprs) { |
| cg.setMappingSet(windowValues); |
| for (LogicalExpression valueExpr : valueExprs) { |
| cg.addExpr(valueExpr); |
| } |
| } |
| |
| @Override |
| public void cleanup() { |
| if (framer != null) { |
| framer.cleanup(); |
| framer = null; |
| } |
| super.cleanup(); |
| incoming.cleanup(); |
| } |
| |
| @Override |
| protected void killIncoming(boolean sendUpstream) { |
| incoming.kill(sendUpstream); |
| } |
| |
| @Override |
| public int getRecordCount() { |
| return framer.getOutputCount(); |
| } |
| } |