| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.phoenix.execute; |
| |
| import java.io.IOException; |
| import java.nio.MappedByteBuffer; |
| import java.sql.ParameterMetaData; |
| import java.sql.SQLException; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Queue; |
| import java.util.Set; |
| |
| import org.apache.hadoop.hbase.KeyValue; |
| import org.apache.hadoop.hbase.KeyValueUtil; |
| import org.apache.hadoop.hbase.client.Result; |
| import org.apache.hadoop.hbase.client.Scan; |
| import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.phoenix.compile.ExplainPlan; |
| import org.apache.phoenix.compile.GroupByCompiler.GroupBy; |
| import org.apache.phoenix.compile.OrderByCompiler.OrderBy; |
| import org.apache.phoenix.compile.QueryPlan; |
| import org.apache.phoenix.compile.RowProjector; |
| import org.apache.phoenix.compile.StatementContext; |
| import org.apache.phoenix.exception.SQLExceptionCode; |
| import org.apache.phoenix.exception.SQLExceptionInfo; |
| import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple; |
| import org.apache.phoenix.expression.Expression; |
| import org.apache.phoenix.iterate.DefaultParallelScanGrouper; |
| import org.apache.phoenix.iterate.MappedByteBufferQueue; |
| import org.apache.phoenix.iterate.ParallelScanGrouper; |
| import org.apache.phoenix.iterate.ResultIterator; |
| import org.apache.phoenix.jdbc.PhoenixParameterMetaData; |
| import org.apache.phoenix.jdbc.PhoenixStatement.Operation; |
| import org.apache.phoenix.parse.FilterableStatement; |
| import org.apache.phoenix.parse.JoinTableNode.JoinType; |
| import org.apache.phoenix.query.KeyRange; |
| import org.apache.phoenix.query.QueryServices; |
| import org.apache.phoenix.query.QueryServicesOptions; |
| import org.apache.phoenix.schema.KeyValueSchema; |
| import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder; |
| import org.apache.phoenix.schema.PColumn; |
| import org.apache.phoenix.schema.PTable; |
| import org.apache.phoenix.schema.TableRef; |
| import org.apache.phoenix.schema.ValueBitSet; |
| import org.apache.phoenix.schema.tuple.ResultTuple; |
| import org.apache.phoenix.schema.tuple.Tuple; |
| import org.apache.phoenix.util.ResultUtil; |
| import org.apache.phoenix.util.SchemaUtil; |
| |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Sets; |
| |
| public class SortMergeJoinPlan implements QueryPlan { |
| private static final byte[] EMPTY_PTR = new byte[0]; |
| |
| private final StatementContext context; |
| private final FilterableStatement statement; |
| private final TableRef table; |
| private final JoinType type; |
| private final QueryPlan lhsPlan; |
| private final QueryPlan rhsPlan; |
| private final List<Expression> lhsKeyExpressions; |
| private final List<Expression> rhsKeyExpressions; |
| private final KeyValueSchema joinedSchema; |
| private final KeyValueSchema lhsSchema; |
| private final KeyValueSchema rhsSchema; |
| private final int rhsFieldPosition; |
| private final boolean isSingleValueOnly; |
| private final Set<TableRef> tableRefs; |
| private final int thresholdBytes; |
| |
| public SortMergeJoinPlan(StatementContext context, FilterableStatement statement, TableRef table, |
| JoinType type, QueryPlan lhsPlan, QueryPlan rhsPlan, List<Expression> lhsKeyExpressions, List<Expression> rhsKeyExpressions, |
| PTable joinedTable, PTable lhsTable, PTable rhsTable, int rhsFieldPosition, boolean isSingleValueOnly) { |
| if (type == JoinType.Right) throw new IllegalArgumentException("JoinType should not be " + type); |
| this.context = context; |
| this.statement = statement; |
| this.table = table; |
| this.type = type; |
| this.lhsPlan = lhsPlan; |
| this.rhsPlan = rhsPlan; |
| this.lhsKeyExpressions = lhsKeyExpressions; |
| this.rhsKeyExpressions = rhsKeyExpressions; |
| this.joinedSchema = buildSchema(joinedTable); |
| this.lhsSchema = buildSchema(lhsTable); |
| this.rhsSchema = buildSchema(rhsTable); |
| this.rhsFieldPosition = rhsFieldPosition; |
| this.isSingleValueOnly = isSingleValueOnly; |
| this.tableRefs = Sets.newHashSetWithExpectedSize(lhsPlan.getSourceRefs().size() + rhsPlan.getSourceRefs().size()); |
| this.tableRefs.addAll(lhsPlan.getSourceRefs()); |
| this.tableRefs.addAll(rhsPlan.getSourceRefs()); |
| this.thresholdBytes = context.getConnection().getQueryServices().getProps().getInt( |
| QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES); |
| } |
| |
| @Override |
| public Operation getOperation() { |
| return statement.getOperation(); |
| } |
| |
| private static KeyValueSchema buildSchema(PTable table) { |
| KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0); |
| if (table != null) { |
| for (PColumn column : table.getColumns()) { |
| if (!SchemaUtil.isPKColumn(column)) { |
| builder.addField(column); |
| } |
| } |
| } |
| return builder.build(); |
| } |
| |
| @Override |
| public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException { |
| return iterator(scanGrouper, null); |
| } |
| |
| @Override |
| public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { |
| return type == JoinType.Semi || type == JoinType.Anti ? |
| new SemiAntiJoinIterator(lhsPlan.iterator(scanGrouper), rhsPlan.iterator(scanGrouper)) : |
| new BasicJoinIterator(lhsPlan.iterator(scanGrouper), rhsPlan.iterator(scanGrouper)); |
| } |
| |
| @Override |
| public ResultIterator iterator() throws SQLException { |
| return iterator(DefaultParallelScanGrouper.getInstance()); |
| } |
| |
| @Override |
| public ExplainPlan getExplainPlan() throws SQLException { |
| List<String> steps = Lists.newArrayList(); |
| steps.add("SORT-MERGE-JOIN (" + type.toString().toUpperCase() + ") TABLES"); |
| for (String step : lhsPlan.getExplainPlan().getPlanSteps()) { |
| steps.add(" " + step); |
| } |
| steps.add("AND" + (rhsSchema.getFieldCount() == 0 ? " (SKIP MERGE)" : "")); |
| for (String step : rhsPlan.getExplainPlan().getPlanSteps()) { |
| steps.add(" " + step); |
| } |
| return new ExplainPlan(steps); |
| } |
| |
| @Override |
| public StatementContext getContext() { |
| return context; |
| } |
| |
| @Override |
| public ParameterMetaData getParameterMetaData() { |
| return PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA; |
| } |
| |
| @Override |
| public long getEstimatedSize() { |
| return lhsPlan.getEstimatedSize() + rhsPlan.getEstimatedSize(); |
| } |
| |
| @Override |
| public TableRef getTableRef() { |
| return table; |
| } |
| |
| @Override |
| public RowProjector getProjector() { |
| return null; |
| } |
| |
| @Override |
| public Integer getLimit() { |
| return null; |
| } |
| |
| @Override |
| public Integer getOffset() { |
| return null; |
| } |
| |
| @Override |
| public OrderBy getOrderBy() { |
| return null; |
| } |
| |
| @Override |
| public GroupBy getGroupBy() { |
| return null; |
| } |
| |
| @Override |
| public List<KeyRange> getSplits() { |
| return Collections.<KeyRange> emptyList(); |
| } |
| |
| @Override |
| public List<List<Scan>> getScans() { |
| return Collections.<List<Scan>> emptyList(); |
| } |
| |
| @Override |
| public FilterableStatement getStatement() { |
| return statement; |
| } |
| |
| @Override |
| public boolean isDegenerate() { |
| return false; |
| } |
| |
| @Override |
| public boolean isRowKeyOrdered() { |
| return false; |
| } |
| |
| private class BasicJoinIterator implements ResultIterator { |
| private final ResultIterator lhsIterator; |
| private final ResultIterator rhsIterator; |
| private boolean initialized; |
| private Tuple lhsTuple; |
| private Tuple rhsTuple; |
| private JoinKey lhsKey; |
| private JoinKey rhsKey; |
| private Tuple nextLhsTuple; |
| private Tuple nextRhsTuple; |
| private JoinKey nextLhsKey; |
| private JoinKey nextRhsKey; |
| private ValueBitSet destBitSet; |
| private ValueBitSet lhsBitSet; |
| private ValueBitSet rhsBitSet; |
| private byte[] emptyProjectedValue; |
| private MappedByteBufferTupleQueue queue; |
| private Iterator<Tuple> queueIterator; |
| |
| public BasicJoinIterator(ResultIterator lhsIterator, ResultIterator rhsIterator) { |
| this.lhsIterator = lhsIterator; |
| this.rhsIterator = rhsIterator; |
| this.initialized = false; |
| this.lhsTuple = null; |
| this.rhsTuple = null; |
| this.lhsKey = new JoinKey(lhsKeyExpressions); |
| this.rhsKey = new JoinKey(rhsKeyExpressions); |
| this.nextLhsTuple = null; |
| this.nextRhsTuple = null; |
| this.nextLhsKey = new JoinKey(lhsKeyExpressions); |
| this.nextRhsKey = new JoinKey(rhsKeyExpressions); |
| this.destBitSet = ValueBitSet.newInstance(joinedSchema); |
| this.lhsBitSet = ValueBitSet.newInstance(lhsSchema); |
| this.rhsBitSet = ValueBitSet.newInstance(rhsSchema); |
| lhsBitSet.clear(); |
| int len = lhsBitSet.getEstimatedLength(); |
| this.emptyProjectedValue = new byte[len]; |
| lhsBitSet.toBytes(emptyProjectedValue, 0); |
| this.queue = new MappedByteBufferTupleQueue(thresholdBytes); |
| this.queueIterator = null; |
| } |
| |
| @Override |
| public void close() throws SQLException { |
| lhsIterator.close(); |
| rhsIterator.close(); |
| queue.close(); |
| } |
| |
| @Override |
| public Tuple next() throws SQLException { |
| if (!initialized) { |
| init(); |
| } |
| |
| Tuple next = null; |
| while (next == null && !isEnd()) { |
| if (queueIterator != null) { |
| if (queueIterator.hasNext()) { |
| next = join(lhsTuple, queueIterator.next()); |
| } else { |
| boolean eq = nextLhsTuple != null && lhsKey.equals(nextLhsKey); |
| advance(true); |
| if (eq) { |
| queueIterator = queue.iterator(); |
| } else { |
| queue.clear(); |
| queueIterator = null; |
| } |
| } |
| } else if (lhsTuple != null) { |
| if (rhsTuple != null) { |
| if (lhsKey.equals(rhsKey)) { |
| next = join(lhsTuple, rhsTuple); |
| if (nextLhsTuple != null && lhsKey.equals(nextLhsKey)) { |
| queue.offer(rhsTuple); |
| if (nextRhsTuple == null || !rhsKey.equals(nextRhsKey)) { |
| queueIterator = queue.iterator(); |
| advance(true); |
| } else if (isSingleValueOnly) { |
| throw new SQLExceptionInfo.Builder(SQLExceptionCode.SINGLE_ROW_SUBQUERY_RETURNS_MULTIPLE_ROWS).build().buildException(); |
| } |
| } else if (nextRhsTuple == null || !rhsKey.equals(nextRhsKey)) { |
| advance(true); |
| } else if (isSingleValueOnly) { |
| throw new SQLExceptionInfo.Builder(SQLExceptionCode.SINGLE_ROW_SUBQUERY_RETURNS_MULTIPLE_ROWS).build().buildException(); |
| } |
| advance(false); |
| } else if (lhsKey.compareTo(rhsKey) < 0) { |
| if (type == JoinType.Full || type == JoinType.Left) { |
| next = join(lhsTuple, null); |
| } |
| advance(true); |
| } else { |
| if (type == JoinType.Full) { |
| next = join(null, rhsTuple); |
| } |
| advance(false); |
| } |
| } else { // left-join or full-join |
| next = join(lhsTuple, null); |
| advance(true); |
| } |
| } else { // full-join |
| next = join(null, rhsTuple); |
| advance(false); |
| } |
| } |
| |
| return next; |
| } |
| |
| @Override |
| public void explain(List<String> planSteps) { |
| } |
| |
| private void init() throws SQLException { |
| nextLhsTuple = lhsIterator.next(); |
| if (nextLhsTuple != null) { |
| nextLhsKey.evaluate(nextLhsTuple); |
| } |
| advance(true); |
| nextRhsTuple = rhsIterator.next(); |
| if (nextRhsTuple != null) { |
| nextRhsKey.evaluate(nextRhsTuple); |
| } |
| advance(false); |
| initialized = true; |
| } |
| |
| private void advance(boolean lhs) throws SQLException { |
| if (lhs) { |
| lhsTuple = nextLhsTuple; |
| lhsKey.set(nextLhsKey); |
| if (lhsTuple != null) { |
| nextLhsTuple = lhsIterator.next(); |
| if (nextLhsTuple != null) { |
| nextLhsKey.evaluate(nextLhsTuple); |
| } else { |
| nextLhsKey.clear(); |
| } |
| } |
| } else { |
| rhsTuple = nextRhsTuple; |
| rhsKey.set(nextRhsKey); |
| if (rhsTuple != null) { |
| nextRhsTuple = rhsIterator.next(); |
| if (nextRhsTuple != null) { |
| nextRhsKey.evaluate(nextRhsTuple); |
| } else { |
| nextRhsKey.clear(); |
| } |
| } |
| } |
| } |
| |
| private boolean isEnd() { |
| return (lhsTuple == null && (rhsTuple == null || type != JoinType.Full)) |
| || (queueIterator == null && rhsTuple == null && type == JoinType.Inner); |
| } |
| |
| private Tuple join(Tuple lhs, Tuple rhs) throws SQLException { |
| try { |
| ProjectedValueTuple t = null; |
| if (lhs == null) { |
| t = new ProjectedValueTuple(rhs, rhs.getValue(0).getTimestamp(), |
| this.emptyProjectedValue, 0, this.emptyProjectedValue.length, |
| this.emptyProjectedValue.length); |
| } else if (lhs instanceof ProjectedValueTuple) { |
| t = (ProjectedValueTuple) lhs; |
| } else { |
| ImmutableBytesWritable ptr = context.getTempPtr(); |
| TupleProjector.decodeProjectedValue(lhs, ptr); |
| lhsBitSet.clear(); |
| lhsBitSet.or(ptr); |
| int bitSetLen = lhsBitSet.getEstimatedLength(); |
| t = new ProjectedValueTuple(lhs, lhs.getValue(0).getTimestamp(), |
| ptr.get(), ptr.getOffset(), ptr.getLength(), bitSetLen); |
| |
| } |
| return rhsBitSet == ValueBitSet.EMPTY_VALUE_BITSET ? |
| t : TupleProjector.mergeProjectedValue( |
| t, joinedSchema, destBitSet, |
| rhs, rhsSchema, rhsBitSet, rhsFieldPosition, true); |
| } catch (IOException e) { |
| throw new SQLException(e); |
| } |
| } |
| } |
| |
| private class SemiAntiJoinIterator implements ResultIterator { |
| private final ResultIterator lhsIterator; |
| private final ResultIterator rhsIterator; |
| private final boolean isSemi; |
| private boolean initialized; |
| private Tuple lhsTuple; |
| private Tuple rhsTuple; |
| private JoinKey lhsKey; |
| private JoinKey rhsKey; |
| |
| public SemiAntiJoinIterator(ResultIterator lhsIterator, ResultIterator rhsIterator) { |
| if (type != JoinType.Semi && type != JoinType.Anti) throw new IllegalArgumentException("Type " + type + " is not allowed by " + SemiAntiJoinIterator.class.getName()); |
| this.lhsIterator = lhsIterator; |
| this.rhsIterator = rhsIterator; |
| this.isSemi = type == JoinType.Semi; |
| this.initialized = false; |
| this.lhsTuple = null; |
| this.rhsTuple = null; |
| this.lhsKey = new JoinKey(lhsKeyExpressions); |
| this.rhsKey = new JoinKey(rhsKeyExpressions); |
| } |
| |
| @Override |
| public void close() throws SQLException { |
| lhsIterator.close(); |
| rhsIterator.close(); |
| } |
| |
| @Override |
| public Tuple next() throws SQLException { |
| if (!initialized) { |
| advance(true); |
| advance(false); |
| initialized = true; |
| } |
| |
| Tuple next = null; |
| while (lhsTuple != null && next == null) { |
| if (rhsTuple != null) { |
| if (lhsKey.equals(rhsKey)) { |
| if (isSemi) { |
| next = lhsTuple; |
| } |
| advance(true); |
| } else if (lhsKey.compareTo(rhsKey) < 0) { |
| if (!isSemi) { |
| next = lhsTuple; |
| } |
| advance(true); |
| } else { |
| advance(false); |
| } |
| } else { |
| if (!isSemi) { |
| next = lhsTuple; |
| } |
| advance(true); |
| } |
| } |
| |
| return next; |
| } |
| |
| @Override |
| public void explain(List<String> planSteps) { |
| } |
| |
| private void advance(boolean lhs) throws SQLException { |
| if (lhs) { |
| lhsTuple = lhsIterator.next(); |
| if (lhsTuple != null) { |
| lhsKey.evaluate(lhsTuple); |
| } else { |
| lhsKey.clear(); |
| } |
| } else { |
| rhsTuple = rhsIterator.next(); |
| if (rhsTuple != null) { |
| rhsKey.evaluate(rhsTuple); |
| } else { |
| rhsKey.clear(); |
| } |
| } |
| } |
| } |
| |
| private static class JoinKey implements Comparable<JoinKey> { |
| private final List<Expression> expressions; |
| private final List<ImmutableBytesWritable> keys; |
| |
| public JoinKey(List<Expression> expressions) { |
| this.expressions = expressions; |
| this.keys = Lists.newArrayListWithExpectedSize(expressions.size()); |
| for (int i = 0; i < expressions.size(); i++) { |
| this.keys.add(new ImmutableBytesWritable(EMPTY_PTR)); |
| } |
| } |
| |
| public void evaluate(Tuple tuple) { |
| for (int i = 0; i < keys.size(); i++) { |
| if (!expressions.get(i).evaluate(tuple, keys.get(i))) { |
| keys.get(i).set(EMPTY_PTR); |
| } |
| } |
| } |
| |
| public void set(JoinKey other) { |
| for (int i = 0; i < keys.size(); i++) { |
| ImmutableBytesWritable key = other.keys.get(i); |
| this.keys.get(i).set(key.get(), key.getOffset(), key.getLength()); |
| } |
| } |
| |
| public void clear() { |
| for (int i = 0; i < keys.size(); i++) { |
| this.keys.get(i).set(EMPTY_PTR); |
| } |
| } |
| |
| @Override |
| public boolean equals(Object other) { |
| if (!(other instanceof JoinKey)) |
| return false; |
| return this.compareTo((JoinKey) other) == 0; |
| } |
| |
| @Override |
| public int compareTo(JoinKey other) { |
| for (int i = 0; i < keys.size(); i++) { |
| int comp = this.keys.get(i).compareTo(other.keys.get(i)); |
| if (comp != 0) |
| return comp; |
| } |
| |
| return 0; |
| } |
| } |
| |
| private static class MappedByteBufferTupleQueue extends MappedByteBufferQueue<Tuple> { |
| |
| public MappedByteBufferTupleQueue(int thresholdBytes) { |
| super(thresholdBytes); |
| } |
| |
| @Override |
| protected MappedByteBufferSegmentQueue<Tuple> createSegmentQueue( |
| int index, int thresholdBytes) { |
| return new MappedByteBufferTupleSegmentQueue(index, thresholdBytes, false); |
| } |
| |
| @Override |
| protected Comparator<MappedByteBufferSegmentQueue<Tuple>> getSegmentQueueComparator() { |
| return new Comparator<MappedByteBufferSegmentQueue<Tuple>>() { |
| @Override |
| public int compare(MappedByteBufferSegmentQueue<Tuple> q1, |
| MappedByteBufferSegmentQueue<Tuple> q2) { |
| return q1.index() - q2.index(); |
| } |
| }; |
| } |
| |
| @Override |
| public Iterator<Tuple> iterator() { |
| return new Iterator<Tuple>() { |
| private Iterator<MappedByteBufferSegmentQueue<Tuple>> queueIter; |
| private Iterator<Tuple> currentIter; |
| { |
| this.queueIter = getSegmentQueues().iterator(); |
| this.currentIter = queueIter.hasNext() ? queueIter.next().iterator() : null; |
| } |
| |
| @Override |
| public boolean hasNext() { |
| return currentIter != null && currentIter.hasNext(); |
| } |
| |
| @Override |
| public Tuple next() { |
| if (!hasNext()) |
| return null; |
| |
| Tuple ret = currentIter.next(); |
| if (!currentIter.hasNext()) { |
| this.currentIter = queueIter.hasNext() ? queueIter.next().iterator() : null; |
| } |
| |
| return ret; |
| } |
| |
| @Override |
| public void remove() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| }; |
| } |
| |
| private static class MappedByteBufferTupleSegmentQueue extends MappedByteBufferSegmentQueue<Tuple> { |
| private LinkedList<Tuple> results; |
| |
| public MappedByteBufferTupleSegmentQueue(int index, |
| int thresholdBytes, boolean hasMaxQueueSize) { |
| super(index, thresholdBytes, hasMaxQueueSize); |
| this.results = Lists.newLinkedList(); |
| } |
| |
| @Override |
| protected Queue<Tuple> getInMemoryQueue() { |
| return results; |
| } |
| |
| @Override |
| protected int sizeOf(Tuple e) { |
| KeyValue kv = KeyValueUtil.ensureKeyValue(e.getValue(0)); |
| return Bytes.SIZEOF_INT * 2 + kv.getLength(); |
| } |
| |
| @SuppressWarnings("deprecation") |
| @Override |
| protected void writeToBuffer(MappedByteBuffer buffer, Tuple e) { |
| KeyValue kv = KeyValueUtil.ensureKeyValue(e.getValue(0)); |
| buffer.putInt(kv.getLength() + Bytes.SIZEOF_INT); |
| buffer.putInt(kv.getLength()); |
| buffer.put(kv.getBuffer(), kv.getOffset(), kv.getLength()); |
| } |
| |
| @Override |
| protected Tuple readFromBuffer(MappedByteBuffer buffer) { |
| int length = buffer.getInt(); |
| if (length < 0) |
| return null; |
| |
| byte[] b = new byte[length]; |
| buffer.get(b); |
| Result result = ResultUtil.toResult(new ImmutableBytesWritable(b)); |
| return new ResultTuple(result); |
| } |
| |
| } |
| } |
| |
| @Override |
| public boolean useRoundRobinIterator() { |
| return false; |
| } |
| |
| @Override |
| public Set<TableRef> getSourceRefs() { |
| return tableRefs; |
| } |
| } |