| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.drill.exec.physical.impl.mergereceiver; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Comparator; |
| import java.util.List; |
| import java.util.PriorityQueue; |
| |
| import org.apache.calcite.rel.RelFieldCollation.Direction; |
| import org.apache.drill.common.exceptions.DrillRuntimeException; |
| import org.apache.drill.common.expression.ErrorCollector; |
| import org.apache.drill.common.expression.ErrorCollectorImpl; |
| import org.apache.drill.common.expression.LogicalExpression; |
| import org.apache.drill.common.logical.data.Order.Ordering; |
| import org.apache.drill.exec.compile.sig.GeneratorMapping; |
| import org.apache.drill.exec.compile.sig.MappingSet; |
| import org.apache.drill.exec.exception.OutOfMemoryException; |
| import org.apache.drill.exec.exception.SchemaChangeException; |
| import org.apache.drill.exec.expr.ClassGenerator; |
| import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer; |
| import org.apache.drill.exec.expr.CodeGenerator; |
| import org.apache.drill.exec.expr.ExpressionTreeMaterializer; |
| import org.apache.drill.exec.expr.fn.FunctionGenerationHelper; |
| import org.apache.drill.exec.ops.ExchangeFragmentContext; |
| import org.apache.drill.exec.ops.FragmentContext; |
| import org.apache.drill.exec.ops.MetricDef; |
| import org.apache.drill.exec.ops.QueryCancelledException; |
| import org.apache.drill.exec.physical.MinorFragmentEndpoint; |
| import org.apache.drill.exec.physical.config.MergingReceiverPOP; |
| import org.apache.drill.exec.proto.BitControl.FinishedReceiver; |
| import org.apache.drill.exec.proto.BitData; |
| import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; |
| import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; |
| import org.apache.drill.exec.proto.UserBitShared; |
| import org.apache.drill.exec.proto.UserBitShared.SerializedField; |
| import org.apache.drill.exec.record.AbstractRecordBatch; |
| import org.apache.drill.exec.record.BatchSchema; |
| import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; |
| import org.apache.drill.exec.record.ExpandableHyperContainer; |
| import org.apache.drill.exec.record.MaterializedField; |
| import org.apache.drill.exec.record.RawFragmentBatch; |
| import org.apache.drill.exec.record.RawFragmentBatchProvider; |
| import org.apache.drill.exec.record.RecordBatch; |
| import org.apache.drill.exec.record.RecordBatchLoader; |
| import org.apache.drill.exec.record.SchemaBuilder; |
| import org.apache.drill.exec.record.VectorAccessible; |
| import org.apache.drill.exec.record.VectorWrapper; |
| import org.apache.drill.exec.rpc.RpcException; |
| import org.apache.drill.exec.rpc.RpcOutcomeListener; |
| import org.apache.drill.exec.testing.ControlsInjector; |
| import org.apache.drill.exec.testing.ControlsInjectorFactory; |
| import org.apache.drill.exec.vector.AllocationHelper; |
| import org.apache.drill.exec.vector.CopyUtil; |
| import org.apache.drill.exec.vector.FixedWidthVector; |
| import org.apache.drill.exec.vector.ValueVector; |
| import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; |
| import org.apache.drill.shaded.guava.com.google.common.collect.Lists; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.sun.codemodel.JConditional; |
| import com.sun.codemodel.JExpr; |
| |
| import io.netty.buffer.ByteBuf; |
| |
| /** |
| * The MergingRecordBatch merges pre-sorted record batches from remote senders. |
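| * <p> |
| * Each sender supplies batches that are already sorted on the receiver's ordering. This operator |
| * keeps the current batch from every sender in a {@link RecordBatchLoader}, feeds the head record |
| * of each loaded batch into a priority queue backed by a generated comparator, and copies the |
| * smallest record into the outgoing container until the outgoing batch fills up or every sender |
| * is exhausted. |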
| */ |
| public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> implements RecordBatch { |
| private static final Logger logger = LoggerFactory.getLogger(MergingRecordBatch.class); |
| private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(MergingRecordBatch.class); |
| |
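| // Maximum number of records copied into a single outgoing batch; copyRecordToOutgoingBatch() |
| // reports the batch as full once this many records have been written. |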
| private static final int OUTGOING_BATCH_SIZE = 32 * 1024; |
| |
| private RecordBatchLoader[] batchLoaders; |
| private final RawFragmentBatchProvider[] fragProviders; |
| private final ExchangeFragmentContext context; |
| private MergingReceiverGeneratorBase merger; |
| private final MergingReceiverPOP config; |
| private boolean hasRun; |
| private boolean outgoingBatchHasSpace = true; |
| private boolean hasMoreIncoming = true; |
| |
| private int outgoingPosition; |
| private int senderCount; |
| private RawFragmentBatch[] incomingBatches; |
| private int[] batchOffsets; |
| private PriorityQueue<Node> pqueue; |
| private RawFragmentBatch[] tempBatchHolder; |
| private final long[] inputCounts; |
| private final long[] outputCounts; |
| |
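| /** |
| * Operator metrics reported by this receiver; the metric id is simply the enum ordinal. |
| */ |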
| public enum Metric implements MetricDef { |
| BYTES_RECEIVED, |
| NUM_SENDERS, |
| NEXT_WAIT_NANOS; |
| |
| @Override |
| public int metricId() { |
| return ordinal(); |
| } |
| } |
| |
| public MergingRecordBatch(final ExchangeFragmentContext context, |
| final MergingReceiverPOP config, |
| final RawFragmentBatchProvider[] fragProviders) throws OutOfMemoryException { |
| super(config, context, true, context.newOperatorContext(config)); |
| this.fragProviders = fragProviders; |
| this.context = context; |
| this.stats.setLongStat(Metric.NUM_SENDERS, config.getNumSenders()); |
| this.config = config; |
| this.inputCounts = new long[config.getNumSenders()]; |
| this.outputCounts = new long[config.getNumSenders()]; |
| |
| // Register this operator's buffer allocator so that incoming buffers are owned by this allocator |
| context.getBuffers().getCollector(config.getOppositeMajorFragmentId()).setAllocator(oContext.getAllocator()); |
| } |
| |
| private RawFragmentBatch getNext(final int providerIndex) throws IOException { |
| stats.startWait(); |
| final RawFragmentBatchProvider provider = fragProviders[providerIndex]; |
| try { |
| injector.injectInterruptiblePause(context.getExecutionControls(), "waiting-for-data", logger); |
| final RawFragmentBatch b = provider.getNext(); |
| if (b != null) { |
| stats.addLongStat(Metric.BYTES_RECEIVED, b.getByteCount()); |
| stats.batchReceived(0, b.getHeader().getDef().getRecordCount(), false); |
| inputCounts[providerIndex] += b.getHeader().getDef().getRecordCount(); |
| } |
| return b; |
| } catch(final InterruptedException e) { |
| // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the |
| // interruption and respond to it if it wants to. |
| Thread.currentThread().interrupt(); |
| |
| throw new QueryCancelledException(); |
| } finally { |
| stats.stopWait(); |
| } |
| } |
| |
| private void clearBatches(List<RawFragmentBatch> batches) { |
| for (RawFragmentBatch batch : batches) { |
| if (batch != null) { |
| batch.release(); |
| } |
| } |
| } |
| |
| @Override |
| public IterOutcome innerNext() { |
| if (fragProviders.length == 0) { |
| return IterOutcome.NONE; |
| } |
| boolean schemaChanged = false; |
| |
| if (!outgoingBatchHasSpace) { |
| logger.debug("Outgoing vectors were full on last iteration"); |
| allocateOutgoing(); |
| outgoingPosition = 0; |
| outgoingBatchHasSpace = true; |
| } |
| |
| if (!hasMoreIncoming) { |
| logger.debug("next() was called after all values have been processed"); |
| outgoingPosition = 0; |
| return IterOutcome.NONE; |
| } |
| |
| List<UserBitShared.SerializedField> fieldList = null; |
| boolean createDummyBatch = false; |
| |
| // lazy initialization |
| if (!hasRun) { |
| schemaChanged = true; // first iteration is always a schema change |
| |
| // set up each (non-empty) incoming record batch |
| final List<RawFragmentBatch> rawBatches = Lists.newArrayList(); |
| try { |
| int p = 0; |
| for (@SuppressWarnings("unused") final RawFragmentBatchProvider provider : fragProviders) { |
| RawFragmentBatch rawBatch; |
| // check if there is a batch in temp holder before calling getNext(), as it may have been used when building schema |
| if (tempBatchHolder[p] != null) { |
| rawBatch = tempBatchHolder[p]; |
| tempBatchHolder[p] = null; |
| } else { |
| try { |
| rawBatch = getNext(p); |
| } catch (final IOException e) { |
| context.getExecutorState().fail(e); |
| return IterOutcome.STOP; |
| } |
| } |
| checkContinue(); |
| |
| // If rawBatch is null, go ahead and add it to the list. We will create dummy batches |
| // for all null batches later. |
| if (rawBatch == null) { |
| checkContinue(); |
| createDummyBatch = true; |
| rawBatches.add(rawBatch); |
| p++; // move to next sender |
| continue; |
| } |
| |
| if (fieldList == null && rawBatch.getHeader().getDef().getFieldCount() != 0) { |
| // save the schema to fix up empty batches with no schema if needed. |
| fieldList = rawBatch.getHeader().getDef().getFieldList(); |
| } |
| |
| if (rawBatch.getHeader().getDef().getRecordCount() != 0) { |
| rawBatches.add(rawBatch); |
| } else { |
| // keep reading until we get a batch with record count > 0, or we run out of batches (i.e. getNext() returns null) |
| try { |
| while ((rawBatch = getNext(p)) != null && rawBatch.getHeader().getDef().getRecordCount() == 0) { |
| // Do nothing |
| } |
| if (rawBatch == null) { |
| checkContinue(); |
| createDummyBatch = true; |
| } |
| } catch (final IOException e) { |
| context.getExecutorState().fail(e); |
| clearBatches(rawBatches); |
| return IterOutcome.STOP; |
| } |
| if (rawBatch == null || rawBatch.getHeader().getDef().getFieldCount() == 0) { |
| createDummyBatch = true; |
| } |
| // Even if rawBatch is null, go ahead and add it to the list. |
| // We will create dummy batches for all null batches later. |
| rawBatches.add(rawBatch); |
| } |
| p++; |
| } |
| |
| // If no batch arrived with schema from any of the providers, just return NONE. |
| if (fieldList == null) { |
| return IterOutcome.NONE; |
| } |
| |
| // Go through and fix schema for empty batches. |
| if (createDummyBatch) { |
| // Create dummy record batch definition with 0 record count |
| UserBitShared.RecordBatchDef dummyDef = UserBitShared.RecordBatchDef.newBuilder() |
| // We cannot reuse or modify the original field list because it belongs to a valid |
| // record batch, so create a copy with valueCount = 0 for every field. |
| .addAllField(createDummyFieldList(fieldList)) |
| .setRecordCount(0) |
| .build(); |
| |
| // Create dummy header |
| BitData.FragmentRecordBatch dummyHeader = BitData.FragmentRecordBatch.newBuilder() |
| .setIsLastBatch(true) |
| .setDef(dummyDef) |
| .build(); |
| |
| for (int i = 0; i < p; i++) { |
| RawFragmentBatch rawBatch = rawBatches.get(i); |
| if (rawBatch == null || rawBatch.getHeader().getDef().getFieldCount() == 0) { |
| rawBatch = new RawFragmentBatch(dummyHeader, null, null); |
| rawBatches.set(i, rawBatch); |
| } |
| } |
| } |
| } catch (Throwable t) { |
| clearBatches(rawBatches); |
| throw t; |
| } |
| |
| // allocate the incoming record batch loaders |
| senderCount = rawBatches.size(); |
| incomingBatches = new RawFragmentBatch[senderCount]; |
| batchOffsets = new int[senderCount]; |
| batchLoaders = new RecordBatchLoader[senderCount]; |
| for (int i = 0; i < senderCount; ++i) { |
| incomingBatches[i] = rawBatches.get(i); |
| batchLoaders[i] = new RecordBatchLoader(oContext.getAllocator()); |
| } |
| |
| // after this point all batches have moved to incomingBatches |
| rawBatches.clear(); |
| |
| int i = 0; |
| for (final RawFragmentBatch batch : incomingBatches) { |
| // initialize the incoming batchLoaders |
| final UserBitShared.RecordBatchDef rbd = batch.getHeader().getDef(); |
| try { |
| batchLoaders[i].load(rbd, batch.getBody()); |
| // TODO: Clean: DRILL-2933: That load(...) no longer throws |
| // SchemaChangeException, so check/clean catch clause below. |
| } catch(final SchemaChangeException e) { |
| logger.error("MergingReceiver failed to load record batch from remote host. {}", e); |
| context.getExecutorState().fail(e); |
| return IterOutcome.STOP; |
| } |
| batch.release(); |
| ++batchOffsets[i]; |
| ++i; |
| } |
| |
| // after this point all batches have been released and their buffers are owned by the batchLoaders |
| |
| // Ensure all the incoming batches have the identical schema. |
| // Note: RecordBatchLoader permutes the columns to obtain the same column order for all batches. |
| if (!isSameSchemaAmongBatches(batchLoaders)) { |
| context.getExecutorState().fail(new SchemaChangeException("Incoming batches for merging receiver have different schemas!")); |
| return IterOutcome.STOP; |
| } |
| |
| // create the outgoing schema and vector container, and allocate the initial batch |
| final SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE); |
| for (final VectorWrapper<?> v : batchLoaders[0]) { |
| |
| // add field to the output schema |
| bldr.addField(v.getField()); |
| |
| // allocate a new value vector |
| container.addOrGet(v.getField()); |
| } |
| allocateOutgoing(); |
| |
| container.buildSchema(BatchSchema.SelectionVectorMode.NONE); |
| |
| // generate code for merge operations (copy and compare) |
| merger = createMerger(); |
| |
| // allocate the priority queue with the generated comparator |
| this.pqueue = new PriorityQueue<>(fragProviders.length, new Comparator<Node>() { |
| @Override |
| public int compare(final Node node1, final Node node2) { |
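| // The generated comparator addresses records in the hyper-batch with a single int: |
| // the upper 16 bits select the incoming batch, the lower 16 bits the record within it. |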
| final int leftIndex = (node1.batchId << 16) + node1.valueIndex; |
| final int rightIndex = (node2.batchId << 16) + node2.valueIndex; |
| try { |
| return merger.doEval(leftIndex, rightIndex); |
| } catch (SchemaChangeException e) { |
| throw new UnsupportedOperationException(e); |
| } |
| } |
| }); |
| |
| // populate the priority queue with initial values |
| for (int b = 0; b < senderCount; ++b) { |
| while (batchLoaders[b] != null && batchLoaders[b].getRecordCount() == 0) { |
| try { |
| final RawFragmentBatch batch = getNext(b); |
| incomingBatches[b] = batch; |
| if (batch != null) { |
| batchLoaders[b].load(batch.getHeader().getDef(), batch.getBody()); |
| } else { |
| batchLoaders[b].clear(); |
| batchLoaders[b] = null; |
| checkContinue(); |
| } |
| } catch (IOException | SchemaChangeException e) { |
| context.getExecutorState().fail(e); |
| return IterOutcome.STOP; |
| } |
| } |
| if (batchLoaders[b] != null) { |
| pqueue.add(new Node(b, 0)); |
| } |
| } |
| |
| hasRun = true; |
| // finished lazy initialization |
| } |
| |
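| // merge phase: repeatedly copy the smallest queued record into the outgoing batch, then either |
| // advance within the same incoming batch or fetch the next batch from that sender |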
| while (outgoingBatchHasSpace) { |
| // poll next value from pq and copy to outgoing batch |
| final Node node = pqueue.poll(); |
| if (node == null) { |
| break; |
| } |
| outgoingBatchHasSpace = copyRecordToOutgoingBatch(node); |
| |
| if (node.valueIndex == batchLoaders[node.batchId].getRecordCount() - 1) { |
| // reached the end of an incoming record batch |
| RawFragmentBatch nextBatch; |
| try { |
| nextBatch = getNext(node.batchId); |
| |
| while (nextBatch != null && nextBatch.getHeader().getDef().getRecordCount() == 0) { |
| nextBatch = getNext(node.batchId); |
| } |
| |
| assert nextBatch != null || inputCounts[node.batchId] == outputCounts[node.batchId] |
| : String.format("Stream %d input count: %d output count %d", node.batchId, inputCounts[node.batchId], outputCounts[node.batchId]); |
| if (nextBatch == null) { |
| checkContinue(); |
| } |
| } catch (final IOException e) { |
| context.getExecutorState().fail(e); |
| return IterOutcome.STOP; |
| } |
| |
| incomingBatches[node.batchId] = nextBatch; |
| |
| if (nextBatch == null) { |
| // this sender has no more batches |
| boolean allBatchesEmpty = true; |
| |
| for (final RawFragmentBatch batch : incomingBatches) { |
| // see if all batches are empty so we can return OK_* or NONE |
| if (batch != null) { |
| allBatchesEmpty = false; |
| break; |
| } |
| } |
| |
| if (allBatchesEmpty) { |
| hasMoreIncoming = false; |
| break; |
| } |
| |
| // this sender is exhausted; since the pqueue no longer references it, it will be |
| // skipped in subsequent iterations. |
| continue; |
| } |
| |
| final UserBitShared.RecordBatchDef rbd = incomingBatches[node.batchId].getHeader().getDef(); |
| try { |
| batchLoaders[node.batchId].load(rbd, incomingBatches[node.batchId].getBody()); |
| // TODO: Clean: DRILL-2933: That load(...) no longer throws |
| // SchemaChangeException, so check/clean catch clause below. |
| } catch(final SchemaChangeException ex) { |
| context.getExecutorState().fail(ex); |
| return IterOutcome.STOP; |
| } |
| incomingBatches[node.batchId].release(); |
| batchOffsets[node.batchId] = 0; |
| |
| // add front value from batch[x] to priority queue |
| if (batchLoaders[node.batchId].getRecordCount() != 0) { |
| node.valueIndex = 0; |
| pqueue.add(node); |
| } |
| |
| } else { |
| node.valueIndex++; |
| pqueue.add(node); |
| } |
| |
| } |
| |
| // set the value counts in the outgoing vectors |
| container.setValueCount(outgoingPosition); |
| |
| if (pqueue.isEmpty()) { |
| state = BatchState.DONE; |
| } |
| |
| if (schemaChanged) { |
| return IterOutcome.OK_NEW_SCHEMA; |
| } |
| else { |
| return IterOutcome.OK; |
| } |
| } |
| |
| // Create dummy field that will be used for empty batches. |
| private UserBitShared.SerializedField createDummyField(UserBitShared.SerializedField field) { |
| UserBitShared.SerializedField.Builder newDummyFieldBuilder = UserBitShared.SerializedField.newBuilder() |
| .setVarByteLength(0) |
| .setBufferLength(0) |
| .setValueCount(0) |
| .setNamePart(field.getNamePart()) |
| .setMajorType(field.getMajorType()); |
| |
| int index = 0; |
| for (UserBitShared.SerializedField childField : field.getChildList()) { |
| // copy all children as well, so we do not corrupt the original fieldList. |
| // This recurses into nested fields. |
| newDummyFieldBuilder.addChild(index, createDummyField(childField)); |
| index++; |
| } |
| |
| return newDummyFieldBuilder.build(); |
| } |
| |
| // Create a dummy field list that we can use for empty batches. |
| private List<UserBitShared.SerializedField> createDummyFieldList(List<UserBitShared.SerializedField> fieldList) { |
| List<UserBitShared.SerializedField> dummyFieldList = new ArrayList<>(); |
| |
| for (UserBitShared.SerializedField field : fieldList) { |
| dummyFieldList.add(createDummyField(field)); |
| } |
| |
| return dummyFieldList; |
| } |
| |
| @Override |
| public FragmentContext getContext() { |
| return context; |
| } |
| |
| @Override |
| public BatchSchema getSchema() { |
| if (container.hasSchema()) { |
| return container.getSchema(); |
| } |
| return null; |
| } |
| |
| @Override |
| public void buildSchema() throws SchemaChangeException { |
| // find frag provider that has data to use to build schema, and put in tempBatchHolder for later use |
| tempBatchHolder = new RawFragmentBatch[fragProviders.length]; |
| int i = 0; |
| try { |
| while (true) { |
| if (i >= fragProviders.length) { |
| state = BatchState.DONE; |
| return; |
| } |
| final RawFragmentBatch batch = getNext(i); |
| if (batch == null) { |
| checkContinue(); |
| } |
| if (batch.getHeader().getDef().getFieldCount() == 0) { |
| i++; |
| continue; |
| } |
| tempBatchHolder[i] = batch; |
| for (final SerializedField field : batch.getHeader().getDef().getFieldList()) { |
| final ValueVector v = container.addOrGet(MaterializedField.create(field)); |
| v.allocateNew(); |
| } |
| break; |
| } |
| } catch (final IOException e) { |
| throw new DrillRuntimeException(e); |
| } |
| container.buildSchema(SelectionVectorMode.NONE); |
| container.setEmpty(); |
| } |
| |
| @Override |
| public int getRecordCount() { |
| return outgoingPosition; |
| } |
| |
| @Override |
| public void kill(final boolean sendUpstream) { |
| if (sendUpstream) { |
| informSenders(); |
| } else { |
| close(); |
| } |
| |
| for (final RawFragmentBatchProvider provider : fragProviders) { |
| provider.kill(context); |
| } |
| } |
| |
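| // Ask every providing sender to stop sending by delivering a FinishedReceiver message |
| // over the control tunnel; responses are handled asynchronously by OutcomeListener. |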
| private void informSenders() { |
| logger.info("Informing senders of request to terminate sending."); |
| final FragmentHandle handlePrototype = FragmentHandle.newBuilder() |
| .setMajorFragmentId(config.getOppositeMajorFragmentId()) |
| .setQueryId(context.getHandle().getQueryId()) |
| .build(); |
| for (final MinorFragmentEndpoint providingEndpoint : config.getProvidingEndpoints()) { |
| final FragmentHandle sender = FragmentHandle.newBuilder(handlePrototype) |
| .setMinorFragmentId(providingEndpoint.getId()) |
| .build(); |
| final FinishedReceiver finishedReceiver = FinishedReceiver.newBuilder() |
| .setReceiver(context.getHandle()) |
| .setSender(sender) |
| .build(); |
| context.getController() |
| .getTunnel(providingEndpoint.getEndpoint()) |
| .informReceiverFinished(new OutcomeListener(), finishedReceiver); |
| } |
| } |
| |
| // TODO: Code duplication. UnorderedReceiverBatch has the same implementation. |
| private class OutcomeListener implements RpcOutcomeListener<Ack> { |
| |
| @Override |
| public void failed(final RpcException ex) { |
| logger.warn("Failed to inform upstream that receiver is finished"); |
| } |
| |
| @Override |
| public void success(final Ack value, final ByteBuf buffer) { |
| // Do nothing |
| } |
| |
| @Override |
| public void interrupted(final InterruptedException e) { |
| if (context.getExecutorState().shouldContinue()) { |
| final String errMsg = "Received an interrupt RPC outcome while sending ReceiverFinished message"; |
| logger.error(errMsg, e); |
| context.getExecutorState().fail(new RpcException(errMsg, e)); |
| } |
| } |
| } |
| |
| @Override |
| protected void killIncoming(final boolean sendUpstream) { |
| //No op |
| } |
| |
| private boolean isSameSchemaAmongBatches(final RecordBatchLoader[] batchLoaders) { |
| Preconditions.checkArgument(batchLoaders.length > 0, "At least one batch loader is required!"); |
| |
| final BatchSchema schema = batchLoaders[0].getSchema(); |
| |
| for (int i = 1; i < batchLoaders.length; i++) { |
| if (!schema.equals(batchLoaders[i].getSchema())) { |
| logger.error("Schemas are different. Schema 1 : " + schema + ", Schema 2: " + batchLoaders[i].getSchema() ); |
| return false; |
| } |
| } |
| return true; |
| } |
| |
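| // (Re)allocate the outgoing value vectors; fixed-width vectors are sized for a full |
| // outgoing batch, variable-width vectors use their default allocation. |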
| private void allocateOutgoing() { |
| for (final VectorWrapper<?> w : container) { |
| final ValueVector v = w.getValueVector(); |
| if (v instanceof FixedWidthVector) { |
| AllocationHelper.allocate(v, OUTGOING_BATCH_SIZE, 1); |
| } else { |
| v.allocateNewSafe(); |
| } |
| } |
| } |
| |
| /** |
| * Creates a generated class which implements the copy and compare methods. |
| * |
| * @return instance of a new merger based on generated code |
| */ |
| private MergingReceiverGeneratorBase createMerger() { |
| |
| final CodeGenerator<MergingReceiverGeneratorBase> cg = |
| CodeGenerator.get(MergingReceiverGeneratorBase.TEMPLATE_DEFINITION, |
| context.getOptions()); |
| cg.plainJavaCapable(true); |
| // Uncomment this line to debug the generated code. |
| // cg.saveCodeForDebugging(true); |
| final ClassGenerator<MergingReceiverGeneratorBase> g = cg.getRoot(); |
| |
| ExpandableHyperContainer batch = null; |
| boolean first = true; |
| for (final RecordBatchLoader loader : batchLoaders) { |
| if (first) { |
| batch = new ExpandableHyperContainer(loader); |
| first = false; |
| } else { |
| batch.addBatch(loader); |
| } |
| } |
| |
| try { |
| generateComparisons(g, batch); |
| |
| g.setMappingSet(COPIER_MAPPING_SET); |
| CopyUtil.generateCopies(g, batch, true); |
| g.setMappingSet(MAIN_MAPPING); |
| final MergingReceiverGeneratorBase merger = context.getImplementationClass(cg); |
| |
| merger.doSetup(context, batch, container); |
| return merger; |
| } catch (SchemaChangeException e) { |
| throw schemaChangeException(e, logger); |
| } |
| } |
| |
| private final MappingSet MAIN_MAPPING = new MappingSet((String) null, null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP); |
| private final MappingSet LEFT_MAPPING = new MappingSet("leftIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP); |
| private final MappingSet RIGHT_MAPPING = new MappingSet("rightIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP); |
| private final GeneratorMapping COPIER_MAPPING = new GeneratorMapping("doSetup", "doCopy", null, null); |
| private final MappingSet COPIER_MAPPING_SET = new MappingSet(COPIER_MAPPING, COPIER_MAPPING); |
| |
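| // Generates the doEval() comparison: for each ordering expression, evaluate it against the |
| // left and right composite indexes and return the first non-zero comparison result (negated |
| // for descending order); equal rows fall through to return 0. |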
| private void generateComparisons(final ClassGenerator<?> g, final VectorAccessible batch) throws SchemaChangeException { |
| g.setMappingSet(MAIN_MAPPING); |
| |
| for (final Ordering od : popConfig.getOrderings()) { |
| // first, we rewrite the evaluation stack for each side of the comparison. |
| final ErrorCollector collector = new ErrorCollectorImpl(); |
| final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry()); |
| if (collector.hasErrors()) { |
| throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); |
| } |
| g.setMappingSet(LEFT_MAPPING); |
| final HoldingContainer left = g.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE); |
| g.setMappingSet(RIGHT_MAPPING); |
| final HoldingContainer right = g.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE); |
| g.setMappingSet(MAIN_MAPPING); |
| |
| // next we wrap the two comparison sides and add the expression block for the comparison. |
| final LogicalExpression fh = |
| FunctionGenerationHelper.getOrderingComparator(od.nullsSortHigh(), left, right, |
| context.getFunctionRegistry()); |
| final HoldingContainer out = g.addExpr(fh, ClassGenerator.BlkCreateMode.FALSE); |
| final JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0))); |
| |
| if (od.getDirection() == Direction.ASCENDING) { |
| jc._then()._return(out.getValue()); |
| } else { |
| jc._then()._return(out.getValue().minus()); |
| } |
| } |
| |
| g.getEvalBlock()._return(JExpr.lit(0)); |
| } |
| |
| /** |
| * Copy the record referenced by the supplied node to the next output position. |
| * Side effect: increments the outgoing position if successful. |
| * |
| * @param node Reference to the next record to copy from the incoming batches |
| * @return true if the outgoing batch still has space after this copy, false if it is now full |
| */ |
| private boolean copyRecordToOutgoingBatch(final Node node) { |
| assert outgoingPosition < OUTGOING_BATCH_SIZE |
| : String.format("Outgoing position %d must be less than bath size %d", outgoingPosition, OUTGOING_BATCH_SIZE); |
| assert ++outputCounts[node.batchId] <= inputCounts[node.batchId] |
| : String.format("Stream %d input count: %d output count %d", node.batchId, inputCounts[node.batchId], outputCounts[node.batchId]); |
| final int inIndex = (node.batchId << 16) + node.valueIndex; |
| try { |
| merger.doCopy(inIndex, outgoingPosition); |
| } catch (SchemaChangeException e) { |
| throw new UnsupportedOperationException(e); |
| } |
| if (++outgoingPosition == OUTGOING_BATCH_SIZE) { |
| logger.debug("Outgoing vectors space is full (batch size {}).", OUTGOING_BATCH_SIZE); |
| return false; |
| } |
| return true; |
| } |
| |
| /** |
| * A Node contains a reference to a single value in a specific incoming batch. It is used |
| * as a wrapper for the priority queue. |
| */ |
| public class Node { |
| public int batchId; // incoming batch |
| public int valueIndex; // value within the batch |
| Node(final int batchId, final int valueIndex) { |
| this.batchId = batchId; |
| this.valueIndex = valueIndex; |
| } |
| } |
| |
| @Override |
| public void close() { |
| container.clear(); |
| if (batchLoaders != null) { |
| for (final RecordBatchLoader rbl : batchLoaders) { |
| if (rbl != null) { |
| rbl.clear(); |
| } |
| } |
| } |
| super.close(); |
| } |
| |
| @Override |
| public void dump() { |
| logger.error("MergingRecordBatch[container={}, outgoingPosition={}, incomingBatches={}, batchOffsets={}, " |
| + "tempBatchHolder={}, inputCounts={}, outputCounts={}]", |
| container, outgoingPosition, Arrays.toString(incomingBatches), Arrays.toString(batchOffsets), |
| Arrays.toString(tempBatchHolder), Arrays.toString(inputCounts), Arrays.toString(outputCounts)); |
| } |
| } |