blob: 3161eb492c0ae563ea77ea9c655b96b007476f50 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.join;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.ExpressionType;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.parse.JoinTableNode.JoinType;
import org.apache.phoenix.schema.KeyValueSchema;
import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.SchemaUtil;
public class HashJoinInfo {
private static final String HASH_JOIN = "HashJoin";
private KeyValueSchema joinedSchema;
private ImmutableBytesPtr[] joinIds;
private List<Expression>[] joinExpressions;
private JoinType[] joinTypes;
private boolean[] earlyEvaluation;
private KeyValueSchema[] schemas;
private int[] fieldPositions;
private Expression postJoinFilterExpression;
private Integer limit;
private boolean forceProjection; // always true now, but for backward compatibility.
public HashJoinInfo(PTable joinedTable, ImmutableBytesPtr[] joinIds, List<Expression>[] joinExpressions, JoinType[] joinTypes, boolean[] earlyEvaluation, PTable[] tables, int[] fieldPositions, Expression postJoinFilterExpression, Integer limit) {
this(buildSchema(joinedTable), joinIds, joinExpressions, joinTypes, earlyEvaluation, buildSchemas(tables), fieldPositions, postJoinFilterExpression, limit, true);
}
private static KeyValueSchema[] buildSchemas(PTable[] tables) {
KeyValueSchema[] schemas = new KeyValueSchema[tables.length];
for (int i = 0; i < tables.length; i++) {
schemas[i] = buildSchema(tables[i]);
}
return schemas;
}
private static KeyValueSchema buildSchema(PTable table) {
KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
if (table != null) {
for (PColumn column : table.getColumns()) {
if (!SchemaUtil.isPKColumn(column)) {
builder.addField(column);
}
}
}
return builder.build();
}
private HashJoinInfo(KeyValueSchema joinedSchema, ImmutableBytesPtr[] joinIds, List<Expression>[] joinExpressions, JoinType[] joinTypes, boolean[] earlyEvaluation, KeyValueSchema[] schemas, int[] fieldPositions, Expression postJoinFilterExpression, Integer limit, boolean forceProjection) {
this.joinedSchema = joinedSchema;
this.joinIds = joinIds;
this.joinExpressions = joinExpressions;
this.joinTypes = joinTypes;
this.earlyEvaluation = earlyEvaluation;
this.schemas = schemas;
this.fieldPositions = fieldPositions;
this.postJoinFilterExpression = postJoinFilterExpression;
this.limit = limit;
this.forceProjection = forceProjection;
}
public KeyValueSchema getJoinedSchema() {
return joinedSchema;
}
public ImmutableBytesPtr[] getJoinIds() {
return joinIds;
}
public List<Expression>[] getJoinExpressions() {
return joinExpressions;
}
public JoinType[] getJoinTypes() {
return joinTypes;
}
public boolean[] earlyEvaluation() {
return earlyEvaluation;
}
public KeyValueSchema[] getSchemas() {
return schemas;
}
public int[] getFieldPositions() {
return fieldPositions;
}
public Expression getPostJoinFilterExpression() {
return postJoinFilterExpression;
}
public Integer getLimit() {
return limit;
}
public boolean forceProjection() {
return forceProjection;
}
public static void serializeHashJoinIntoScan(Scan scan, HashJoinInfo joinInfo) {
ByteArrayOutputStream stream = new ByteArrayOutputStream();
try {
DataOutputStream output = new DataOutputStream(stream);
joinInfo.joinedSchema.write(output);
int count = joinInfo.joinIds.length;
WritableUtils.writeVInt(output, count);
for (int i = 0; i < count; i++) {
joinInfo.joinIds[i].write(output);
WritableUtils.writeVInt(output, joinInfo.joinExpressions[i].size());
for (Expression expr : joinInfo.joinExpressions[i]) {
WritableUtils.writeVInt(output, ExpressionType.valueOf(expr).ordinal());
expr.write(output);
}
WritableUtils.writeVInt(output, joinInfo.joinTypes[i].ordinal());
output.writeBoolean(joinInfo.earlyEvaluation[i]);
joinInfo.schemas[i].write(output);
WritableUtils.writeVInt(output, joinInfo.fieldPositions[i]);
}
if (joinInfo.postJoinFilterExpression != null) {
WritableUtils.writeVInt(output, ExpressionType.valueOf(joinInfo.postJoinFilterExpression).ordinal());
joinInfo.postJoinFilterExpression.write(output);
} else {
WritableUtils.writeVInt(output, -1);
}
WritableUtils.writeVInt(output, joinInfo.limit == null ? -1 : joinInfo.limit);
output.writeBoolean(joinInfo.forceProjection);
scan.setAttribute(HASH_JOIN, stream.toByteArray());
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
try {
stream.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
@SuppressWarnings("unchecked")
public static HashJoinInfo deserializeHashJoinFromScan(Scan scan) {
byte[] join = scan.getAttribute(HASH_JOIN);
if (join == null) {
return null;
}
ByteArrayInputStream stream = new ByteArrayInputStream(join);
try {
DataInputStream input = new DataInputStream(stream);
KeyValueSchema joinedSchema = new KeyValueSchema();
joinedSchema.readFields(input);
int count = WritableUtils.readVInt(input);
ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[count];
List<Expression>[] joinExpressions = new List[count];
JoinType[] joinTypes = new JoinType[count];
boolean[] earlyEvaluation = new boolean[count];
KeyValueSchema[] schemas = new KeyValueSchema[count];
int[] fieldPositions = new int[count];
for (int i = 0; i < count; i++) {
joinIds[i] = new ImmutableBytesPtr();
joinIds[i].readFields(input);
int nExprs = WritableUtils.readVInt(input);
joinExpressions[i] = new ArrayList<Expression>(nExprs);
for (int j = 0; j < nExprs; j++) {
int expressionOrdinal = WritableUtils.readVInt(input);
Expression expression = ExpressionType.values()[expressionOrdinal].newInstance();
expression.readFields(input);
joinExpressions[i].add(expression);
}
int type = WritableUtils.readVInt(input);
joinTypes[i] = JoinType.values()[type];
earlyEvaluation[i] = input.readBoolean();
schemas[i] = new KeyValueSchema();
schemas[i].readFields(input);
fieldPositions[i] = WritableUtils.readVInt(input);
}
Expression postJoinFilterExpression = null;
int expressionOrdinal = WritableUtils.readVInt(input);
if (expressionOrdinal != -1) {
postJoinFilterExpression = ExpressionType.values()[expressionOrdinal].newInstance();
postJoinFilterExpression.readFields(input);
}
int limit = -1;
boolean forceProjection = false;
// Read these and ignore if we don't find them as they were not
// present in Apache Phoenix 3.0.0 release. This allows a newer
// 3.1 server to work with an older 3.0 client without force
// both to be upgraded in lock step.
try {
limit = WritableUtils.readVInt(input);
forceProjection = input.readBoolean();
} catch (EOFException ignore) {
}
return new HashJoinInfo(joinedSchema, joinIds, joinExpressions, joinTypes, earlyEvaluation, schemas, fieldPositions, postJoinFilterExpression, limit >= 0 ? limit : null, forceProjection);
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
try {
stream.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}