blob: 32d04698c4a3578e64a7231bed81849a801ee593 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.join;
import java.io.DataOutputStream;
import java.io.IOException;
import java.sql.SQLException;
import java.util.List;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.cache.ServerCacheClient;
import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.ExpressionType;
import org.apache.phoenix.expression.LiteralExpression;
import org.apache.phoenix.expression.RowValueConstructorExpression;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.ServerUtil;
import org.apache.phoenix.util.TrustedByteArrayOutputStream;
import org.apache.phoenix.util.TupleUtil;
import org.iq80.snappy.Snappy;
import com.google.common.collect.Lists;
/**
*
* Client for adding cache of one side of a join to region servers
*
*
* @since 0.1
*/
public class HashCacheClient {
private final ServerCacheClient serverCache;
/**
* Construct client used to create a serialized cached snapshot of a table and send it to each region server
* for caching during hash join processing.
* @param connection the client connection
*/
public HashCacheClient(PhoenixConnection connection) {
serverCache = new ServerCacheClient(connection);
}
/**
* Send the results of scanning through the scanner to all
* region servers for regions of the table that will use the cache
* that intersect with the minMaxKeyRange.
* @param scanner scanner for the table or intermediate results being cached
* @return client-side {@link ServerCache} representing the added hash cache
* @throws SQLException
* @throws MaxServerCacheSizeExceededException if size of hash cache exceeds max allowed
* size
*/
public ServerCache addHashCache(ScanRanges keyRanges, ResultIterator iterator, long estimatedSize, List<Expression> onExpressions, boolean singleValueOnly, TableRef cacheUsingTableRef, Expression keyRangeRhsExpression, List<Expression> keyRangeRhsValues) throws SQLException {
/**
* Serialize and compress hashCacheTable
*/
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
serialize(ptr, iterator, estimatedSize, onExpressions, singleValueOnly, keyRangeRhsExpression, keyRangeRhsValues);
return serverCache.addServerCache(keyRanges, ptr, ByteUtil.EMPTY_BYTE_ARRAY, new HashCacheFactory(), cacheUsingTableRef);
}
private void serialize(ImmutableBytesWritable ptr, ResultIterator iterator, long estimatedSize, List<Expression> onExpressions, boolean singleValueOnly, Expression keyRangeRhsExpression, List<Expression> keyRangeRhsValues) throws SQLException {
long maxSize = serverCache.getConnection().getQueryServices().getProps().getLong(QueryServices.MAX_SERVER_CACHE_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_SIZE);
estimatedSize = Math.min(estimatedSize, maxSize);
if (estimatedSize > Integer.MAX_VALUE) {
throw new IllegalStateException("Estimated size(" + estimatedSize + ") must not be greater than Integer.MAX_VALUE(" + Integer.MAX_VALUE + ")");
}
try {
TrustedByteArrayOutputStream baOut = new TrustedByteArrayOutputStream((int)estimatedSize);
DataOutputStream out = new DataOutputStream(baOut);
// Write onExpressions first, for hash key evaluation along with deserialization
out.writeInt(onExpressions.size());
for (Expression expression : onExpressions) {
WritableUtils.writeVInt(out, ExpressionType.valueOf(expression).ordinal());
expression.write(out);
}
int exprSize = baOut.size() + Bytes.SIZEOF_INT;
out.writeInt(exprSize * (singleValueOnly ? -1 : 1));
int nRows = 0;
out.writeInt(nRows); // In the end will be replaced with total number of rows
ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
for (Tuple result = iterator.next(); result != null; result = iterator.next()) {
TupleUtil.write(result, out);
if (baOut.size() > maxSize) {
throw new MaxServerCacheSizeExceededException("Size of hash cache (" + baOut.size() + " bytes) exceeds the maximum allowed size (" + maxSize + " bytes)");
}
// Evaluate key expressions for hash join key range optimization.
if (keyRangeRhsExpression != null) {
keyRangeRhsValues.add(evaluateKeyExpression(keyRangeRhsExpression, result, tempPtr));
}
nRows++;
}
TrustedByteArrayOutputStream sizeOut = new TrustedByteArrayOutputStream(Bytes.SIZEOF_INT);
DataOutputStream dataOut = new DataOutputStream(sizeOut);
try {
dataOut.writeInt(nRows);
dataOut.flush();
byte[] cache = baOut.getBuffer();
// Replace number of rows written above with the correct value.
System.arraycopy(sizeOut.getBuffer(), 0, cache, exprSize, sizeOut.size());
// Reallocate to actual size plus compressed buffer size (which is allocated below)
int maxCompressedSize = Snappy.maxCompressedLength(baOut.size());
byte[] compressed = new byte[maxCompressedSize]; // size for worst case
int compressedSize = Snappy.compress(baOut.getBuffer(), 0, baOut.size(), compressed, 0);
// Last realloc to size of compressed buffer.
ptr.set(compressed,0,compressedSize);
} finally {
dataOut.close();
}
} catch (IOException e) {
throw ServerUtil.parseServerException(e);
} finally {
iterator.close();
}
}
/**
* Evaluate the RHS key expression and wrap the result as a new Expression.
* Unlike other types of Expression which will be evaluated and wrapped as a
* single LiteralExpression, RowValueConstructorExpression should be handled
* differently. We should evaluate each child of RVC and wrap them into a new
* RVC Expression, in order to make sure that the later coercion between the
* LHS key expression and this RHS key expression will be successful.
*
* @param keyExpression the RHS key expression
* @param tuple the input tuple
* @param ptr the temporary pointer
* @return the Expression containing the evaluated result
* @throws SQLException
*/
public static Expression evaluateKeyExpression(Expression keyExpression, Tuple tuple, ImmutableBytesWritable ptr) throws SQLException {
if (!(keyExpression instanceof RowValueConstructorExpression)) {
PDataType type = keyExpression.getDataType();
keyExpression.reset();
if (keyExpression.evaluate(tuple, ptr)) {
return LiteralExpression.newConstant(type.toObject(ptr, keyExpression.getSortOrder()), type);
}
return LiteralExpression.newConstant(null, type);
}
List<Expression> children = keyExpression.getChildren();
List<Expression> values = Lists.newArrayListWithExpectedSize(children.size());
for (Expression child : children) {
PDataType type = child.getDataType();
child.reset();
if (child.evaluate(tuple, ptr)) {
values.add(LiteralExpression.newConstant(type.toObject(ptr, child.getSortOrder()), type));
} else {
values.add(LiteralExpression.newConstant(null, type));
}
}
// The early evaluation of this constant expression is not necessary, for it
// might be coerced later.
return new RowValueConstructorExpression(values, false);
}
}