blob: d3886dbbfaffb8048cc7a35768f05e3e48ff22cc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.util;
import static org.apache.phoenix.util.SchemaUtil.getEscapedFullColumnName;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Properties;
import javax.annotation.Nullable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
import org.apache.phoenix.parse.HintNode;
import org.apache.phoenix.parse.HintNode.Hint;
import org.apache.phoenix.parse.WildcardParseNode;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PInteger;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
public final class QueryUtil {
private static final Log LOG = LogFactory.getLog(QueryUtil.class);
/**
* Column family name index within ResultSet resulting from {@link DatabaseMetaData#getColumns(String, String, String, String)}
*/
public static final int COLUMN_FAMILY_POSITION = 25;
/**
* Column name index within ResultSet resulting from {@link DatabaseMetaData#getColumns(String, String, String, String)}
*/
public static final int COLUMN_NAME_POSITION = 4;
/**
* Data type index within ResultSet resulting from {@link DatabaseMetaData#getColumns(String, String, String, String)}
*/
public static final int DATA_TYPE_POSITION = 5;
/**
* Index of the column containing the datatype name within ResultSet resulting from {@link
* DatabaseMetaData#getColumns(String, String, String, String)}.
*/
public static final int DATA_TYPE_NAME_POSITION = 6;
private static final String SELECT = "SELECT";
private static final String FROM = "FROM";
private static final String WHERE = "WHERE";
private static final String AND = "AND";
private static final String[] CompareOpString = new String[CompareOp.values().length];
static {
CompareOpString[CompareOp.EQUAL.ordinal()] = "=";
CompareOpString[CompareOp.NOT_EQUAL.ordinal()] = "!=";
CompareOpString[CompareOp.GREATER.ordinal()] = ">";
CompareOpString[CompareOp.LESS.ordinal()] = "<";
CompareOpString[CompareOp.GREATER_OR_EQUAL.ordinal()] = ">=";
CompareOpString[CompareOp.LESS_OR_EQUAL.ordinal()] = "<=";
}
public static String toSQL(CompareOp op) {
return CompareOpString[op.ordinal()];
}
/**
* Private constructor
*/
private QueryUtil() {
}
/**
* Generate an upsert statement based on a list of {@code ColumnInfo}s with parameter markers. The list of
* {@code ColumnInfo}s must contain at least one element.
*
* @param tableName name of the table for which the upsert statement is to be created
* @param columnInfos list of column to be included in the upsert statement
* @return the created {@code UPSERT} statement
*/
public static String constructUpsertStatement(String tableName, List<ColumnInfo> columnInfos) {
if (columnInfos.isEmpty()) {
throw new IllegalArgumentException("At least one column must be provided for upserts");
}
final List<String> columnNames = Lists.transform(columnInfos, new Function<ColumnInfo,String>() {
@Override
public String apply(ColumnInfo columnInfo) {
return columnInfo.getColumnName();
}
});
return constructUpsertStatement(tableName, columnNames, null);
}
/**
* Generate an upsert statement based on a list of {@code ColumnInfo}s with parameter markers. The list of
* {@code ColumnInfo}s must contain at least one element.
*
* @param tableName name of the table for which the upsert statement is to be created
* @param columns list of columns to be included in the upsert statement
* @param hint hint to be added to the UPSERT statement.
* @return the created {@code UPSERT} statement
*/
public static String constructUpsertStatement(String tableName, List<String> columns, Hint hint) {
if (columns.isEmpty()) {
throw new IllegalArgumentException("At least one column must be provided for upserts");
}
String hintStr = "";
if(hint != null) {
final HintNode node = new HintNode(hint.name());
hintStr = node.toString();
}
List<String> parameterList = Lists.newArrayList();
for (int i = 0; i < columns.size(); i++) {
parameterList.add("?");
}
return String.format(
"UPSERT %s INTO %s (%s) VALUES (%s)",
hintStr,
tableName,
Joiner.on(", ").join(
Iterables.transform(
columns,
new Function<String, String>() {
@Nullable
@Override
public String apply(@Nullable String columnName) {
return getEscapedFullColumnName(columnName);
}
})),
Joiner.on(", ").join(parameterList));
}
/**
* Generate a generic upsert statement based on a number of columns. The created upsert statement will not include
* any named columns, but will include parameter markers for the given number of columns. The number of columns
* must be greater than zero.
*
* @param tableName name of the table for which the upsert statement is to be created
* @param numColumns number of columns to be included in the upsert statement
* @return the created {@code UPSERT} statement
*/
public static String constructGenericUpsertStatement(String tableName, int numColumns) {
if (numColumns == 0) {
throw new IllegalArgumentException("At least one column must be provided for upserts");
}
List<String> parameterList = Lists.newArrayListWithCapacity(numColumns);
for (int i = 0; i < numColumns; i++) {
parameterList.add("?");
}
return String.format("UPSERT INTO %s VALUES (%s)", tableName, Joiner.on(", ").join(parameterList));
}
/**
*
* @param fullTableName name of the table for which the select statement needs to be created.
* @param columnInfos list of columns to be projected in the select statement.
* @param conditions The condition clause to be added to the WHERE condition
* @return Select Query
*/
public static String constructSelectStatement(String fullTableName, List<ColumnInfo> columnInfos,final String conditions) {
Preconditions.checkNotNull(fullTableName,"Table name cannot be null");
if(columnInfos == null || columnInfos.isEmpty()) {
throw new IllegalArgumentException("At least one column must be provided");
}
StringBuilder query = new StringBuilder();
query.append("SELECT ");
for (ColumnInfo cinfo : columnInfos) {
if (cinfo != null) {
String fullColumnName = getEscapedFullColumnName(cinfo.getColumnName());
query.append(fullColumnName);
query.append(",");
}
}
// Remove the trailing comma
query.setLength(query.length() - 1);
query.append(" FROM ");
query.append(fullTableName);
if(conditions != null && conditions.length() > 0) {
query.append(" WHERE (").append(conditions).append(")");
}
return query.toString();
}
/**
* Create the Phoenix JDBC connection URL from the provided cluster connection details.
*/
public static String getUrl(String zkQuorum) {
return getUrlInternal(zkQuorum, null, null, null);
}
/**
* Create the Phoenix JDBC connection URL from the provided cluster connection details.
*/
public static String getUrl(String zkQuorum, int clientPort) {
return getUrlInternal(zkQuorum, clientPort, null, null);
}
/**
* Create the Phoenix JDBC connection URL from the provided cluster connection details.
*/
public static String getUrl(String zkQuorum, String znodeParent) {
return getUrlInternal(zkQuorum, null, znodeParent, null);
}
/**
* Create the Phoenix JDBC connection URL from the provided cluster connection details.
*/
public static String getUrl(String zkQuorum, int port, String znodeParent, String principal) {
return getUrlInternal(zkQuorum, port, znodeParent, principal);
}
/**
* Create the Phoenix JDBC connection URL from the provided cluster connection details.
*/
public static String getUrl(String zkQuorum, int port, String znodeParent) {
return getUrlInternal(zkQuorum, port, znodeParent, null);
}
/**
* Create the Phoenix JDBC connection URL from the provided cluster connection details.
*/
public static String getUrl(String zkQuorum, Integer port, String znodeParent) {
return getUrlInternal(zkQuorum, port, znodeParent, null);
}
/**
* Create the Phoenix JDBC connection URL from the provided cluster connection details.
*/
public static String getUrl(String zkQuorum, Integer port, String znodeParent, String principal) {
return getUrlInternal(zkQuorum, port, znodeParent, principal);
}
private static String getUrlInternal(String zkQuorum, Integer port, String znodeParent, String principal) {
return new PhoenixEmbeddedDriver.ConnectionInfo(zkQuorum, port, znodeParent, principal, null).toUrl()
+ PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
}
public static String getExplainPlan(ResultSet rs) throws SQLException {
StringBuilder buf = new StringBuilder();
while (rs.next()) {
buf.append(rs.getString(1));
buf.append('\n');
}
if (buf.length() > 0) {
buf.setLength(buf.length()-1);
}
return buf.toString();
}
public static String getExplainPlan(ResultIterator iterator) throws SQLException {
List<String> steps = Lists.newArrayList();
iterator.explain(steps);
StringBuilder buf = new StringBuilder();
for (String step : steps) {
buf.append(step);
buf.append('\n');
}
if (buf.length() > 0) {
buf.setLength(buf.length()-1);
}
return buf.toString();
}
/**
* @return {@link PhoenixConnection} with {@value UpgradeUtil#RUN_UPGRADE} set so that we don't initiate server upgrade
*/
public static Connection getConnectionOnServer(Configuration conf) throws ClassNotFoundException,
SQLException {
return getConnectionOnServer(new Properties(), conf);
}
/**
* @return {@link PhoenixConnection} with {@value UpgradeUtil#DO_NOT_UPGRADE} set so that we don't initiate metadata upgrade.
*/
public static Connection getConnectionOnServer(Properties props, Configuration conf)
throws ClassNotFoundException,
SQLException {
UpgradeUtil.doNotUpgradeOnFirstConnection(props);
return getConnection(props, conf);
}
public static Connection getConnection(Configuration conf) throws ClassNotFoundException,
SQLException {
return getConnection(new Properties(), conf);
}
private static Connection getConnection(Properties props, Configuration conf)
throws ClassNotFoundException, SQLException {
String url = getConnectionUrl(props, conf);
LOG.info("Creating connection with the jdbc url: " + url);
PropertiesUtil.extractProperties(props, conf);
return DriverManager.getConnection(url, props);
}
public static String getConnectionUrl(Properties props, Configuration conf)
throws ClassNotFoundException, SQLException {
return getConnectionUrl(props, conf, null);
}
public static String getConnectionUrl(Properties props, Configuration conf, String principal)
throws ClassNotFoundException, SQLException {
// read the hbase properties from the configuration
int port = conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
// Build the ZK quorum server string with "server:clientport" list, separated by ','
final String server =
conf.get(HConstants.ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
String znodeParent = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
String url = getUrl(server, port, znodeParent, principal);
// Mainly for testing to tack on the test=true part to ensure driver is found on server
String extraArgs = props.getProperty(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, conf.get(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS));
if (extraArgs.length() > 0) {
url += extraArgs + PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
}
return url;
}
public static String getViewStatement(String schemaName, String tableName, String where) {
// Only form we currently support for VIEWs: SELECT * FROM t WHERE ...
return SELECT + " " + WildcardParseNode.NAME + " " + FROM + " " +
(schemaName == null || schemaName.length() == 0 ? "" : ("\"" + schemaName + "\".")) +
("\"" + tableName + "\" ") +
(WHERE + " " + where);
}
public static Integer getOffsetLimit(Integer limit, Integer offset) {
if (limit == null) {
return null;
} else if (offset == null) {
return limit;
} else {
return limit + offset;
}
}
public static Integer getRemainingOffset(Tuple offsetTuple) {
if (offsetTuple != null) {
ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr();
offsetTuple.getKey(rowKeyPtr);
if (QueryConstants.OFFSET_ROW_KEY_PTR.compareTo(rowKeyPtr) == 0) {
Cell cell = offsetTuple.getValue(QueryConstants.OFFSET_FAMILY, QueryConstants.OFFSET_COLUMN);
return PInteger.INSTANCE.toObject(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(), PInteger.INSTANCE, SortOrder.ASC, null, null);
}
}
return null;
}
public static String getViewPartitionClause(String partitionColumnName, long autoPartitionNum) {
return partitionColumnName + " " + toSQL(CompareOp.EQUAL) + " " + autoPartitionNum;
}
}