blob: e308b3dd4984bc0f86fda0f8bfcea5fbb717eec2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.util;
import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.DEFAULT_INDEX_WRITER_RPC_PAUSE;
import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER;
import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER_RPC_PAUSE;
import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER_RPC_RETRIES_NUMBER;
import java.io.IOException;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Region.RowLock;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessor.HashJoinCacheNotFoundException;
import org.apache.phoenix.exception.PhoenixIOException;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
import org.apache.phoenix.hbase.index.util.VersionUtil;
import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ServerUtil {
private static final int COPROCESSOR_SCAN_WORKS = VersionUtil.encodeVersion("0.98.6");
private static final Logger LOGGER = LoggerFactory.getLogger(ServerUtil.class);
private static final String FORMAT = "ERROR %d (%s): %s";
private static final Pattern PATTERN = Pattern.compile("ERROR (\\d+) \\((\\w+)\\): (.*)");
private static final Pattern HASH_JOIN_EXCEPTION_PATTERN = Pattern.compile("joinId: (-?\\d+)");
private static final Pattern PATTERN_FOR_TS = Pattern.compile(",serverTimestamp=(\\d+),");
private static final String FORMAT_FOR_TIMESTAMP = ",serverTimestamp=%d,";
private static final Map<Class<? extends Exception>, SQLExceptionCode> errorcodeMap
= new HashMap<Class<? extends Exception>, SQLExceptionCode>();
static {
// Map a normal exception into a corresponding SQLException.
errorcodeMap.put(ArithmeticException.class, SQLExceptionCode.SERVER_ARITHMETIC_ERROR);
}
public static void throwIOException(String msg, Throwable t) throws IOException {
throw createIOException(msg, t);
}
public static IOException createIOException(String msg, Throwable t) {
// First unwrap SQLExceptions if it's root cause is an IOException.
if (t instanceof SQLException) {
Throwable cause = t.getCause();
if (cause instanceof IOException) {
t = cause;
}
}
// Throw immediately if DoNotRetryIOException
if (t instanceof DoNotRetryIOException) {
return (DoNotRetryIOException) t;
} else if (t instanceof IOException) {
// If the IOException does not wrap any exception, then bubble it up.
Throwable cause = t.getCause();
if (cause instanceof RetriesExhaustedWithDetailsException)
return new DoNotRetryIOException(t.getMessage(), cause);
else if (cause == null || cause instanceof IOException) {
return (IOException) t;
}
// Else assume it's been wrapped, so throw as DoNotRetryIOException to prevent client hanging while retrying
return new DoNotRetryIOException(t.getMessage(), cause);
} else if (t instanceof SQLException) {
// If it's already an SQLException, construct an error message so we can parse and reconstruct on the client side.
return new DoNotRetryIOException(constructSQLErrorMessage((SQLException) t, msg), t);
} else {
// Not a DoNotRetryIOException, IOException or SQLException. Map the exception type to a general SQLException
// and construct the error message so it can be reconstruct on the client side.
//
// If no mapping exists, rethrow it as a generic exception.
SQLExceptionCode code = errorcodeMap.get(t.getClass());
if (code == null) {
return new DoNotRetryIOException(msg + ": " + t.getMessage(), t);
} else {
return new DoNotRetryIOException(constructSQLErrorMessage(code, t, msg), t);
}
}
}
private static String constructSQLErrorMessage(SQLExceptionCode code, Throwable e, String message) {
return constructSQLErrorMessage(code.getErrorCode(), code.getSQLState(), code.getMessage() + " " + e.getMessage() + " " + message);
}
private static String constructSQLErrorMessage(SQLException e, String message) {
return constructSQLErrorMessage(e.getErrorCode(), e.getSQLState(), e.getMessage() + " " + message);
}
private static String constructSQLErrorMessage(int errorCode, String SQLState, String message) {
return String.format(FORMAT, errorCode, SQLState, message);
}
public static SQLException parseServerException(Throwable t) {
SQLException e = parseServerExceptionOrNull(t);
if (e != null) {
return e;
}
return new PhoenixIOException(t);
}
/**
* Return the first SQLException in the exception chain, otherwise parse it.
* When we're receiving an exception locally, there's no need to string parse,
* as the SQLException will already be part of the chain.
* @param t
* @return the SQLException, or null if none found
*/
public static SQLException parseLocalOrRemoteServerException(Throwable t) {
while (t.getCause() != null) {
if (t instanceof NotServingRegionException) {
return parseRemoteException(new StaleRegionBoundaryCacheException());
} else if (t instanceof SQLException) {
return (SQLException) t;
}
t = t.getCause();
}
return parseRemoteException(t);
}
public static SQLException parseServerExceptionOrNull(Throwable t) {
while (t.getCause() != null) {
if (t instanceof NotServingRegionException) {
return parseRemoteException(new StaleRegionBoundaryCacheException());
}
t = t.getCause();
}
return parseRemoteException(t);
}
private static SQLException parseRemoteException(Throwable t) {
String message = t.getLocalizedMessage();
if (message != null) {
// If the message matches the standard pattern, recover the SQLException and throw it.
Matcher matcher = PATTERN.matcher(t.getLocalizedMessage());
if (matcher.find()) {
int statusCode = Integer.parseInt(matcher.group(1));
SQLExceptionCode code = SQLExceptionCode.fromErrorCode(statusCode);
if(code.equals(SQLExceptionCode.HASH_JOIN_CACHE_NOT_FOUND)){
Matcher m = HASH_JOIN_EXCEPTION_PATTERN.matcher(t.getLocalizedMessage());
if (m.find()) { return new HashJoinCacheNotFoundException(Long.parseLong(m.group(1))); }
}
return new SQLExceptionInfo.Builder(code).setMessage(matcher.group()).setRootCause(t).build().buildException();
}
}
return null;
}
private static boolean coprocessorScanWorks(RegionCoprocessorEnvironment env) {
return (VersionUtil.encodeVersion(env.getHBaseVersion()) >= COPROCESSOR_SCAN_WORKS);
}
/*
* This code works around HBASE-11837 which causes HTableInterfaces retrieved from
* RegionCoprocessorEnvironment to not read local data.
*/
private static Table getTableFromSingletonPool(RegionCoprocessorEnvironment env, TableName tableName) throws IOException {
// It's ok to not ever do a pool.close() as we're storing a single
// table only. The HTablePool holds no other resources that this table
// which will be closed itself when it's no longer needed.
Connection conn = ConnectionFactory.getConnection(ConnectionType.DEFAULT_SERVER_CONNECTION, env);
try {
return conn.getTable(tableName);
} catch (RuntimeException t) {
// handle cases that an IOE is wrapped inside a RuntimeException like HTableInterface#createHTableInterface
if(t.getCause() instanceof IOException) {
throw (IOException)t.getCause();
} else {
throw t;
}
}
}
public static Table getHTableForCoprocessorScan (RegionCoprocessorEnvironment env,
Table writerTable) throws IOException {
if (coprocessorScanWorks(env)) {
return writerTable;
}
return getTableFromSingletonPool(env, writerTable.getName());
}
public static Table getHTableForCoprocessorScan (RegionCoprocessorEnvironment env, TableName tableName) throws IOException {
if (coprocessorScanWorks(env)) {
return env.getConnection().getTable(tableName);
}
return getTableFromSingletonPool(env, tableName);
}
public static long parseServerTimestamp(Throwable t) {
while (t.getCause() != null) {
t = t.getCause();
}
return parseTimestampFromRemoteException(t);
}
public static long parseTimestampFromRemoteException(Throwable t) {
String message = t.getLocalizedMessage();
if (message != null) {
// If the message matches the standard pattern, recover the SQLException and throw it.
Matcher matcher = PATTERN_FOR_TS.matcher(t.getLocalizedMessage());
if (matcher.find()) {
String tsString = matcher.group(1);
if (tsString != null) {
return Long.parseLong(tsString);
}
}
}
return HConstants.LATEST_TIMESTAMP;
}
public static DoNotRetryIOException wrapInDoNotRetryIOException(String msg, Throwable t, long timestamp) {
if (msg == null) {
msg = "";
}
if (t instanceof SQLException) {
msg = t.getMessage() + " " + msg;
}
msg += String.format(FORMAT_FOR_TIMESTAMP, timestamp);
return new DoNotRetryIOException(msg, t);
}
public static boolean readyToCommit(int rowCount, long mutationSize, int maxBatchSize, long maxBatchSizeBytes) {
return maxBatchSize > 0 && rowCount >= maxBatchSize
|| (maxBatchSizeBytes > 0 && mutationSize >= maxBatchSizeBytes);
}
public static boolean isKeyInRegion(byte[] key, Region region) {
byte[] startKey = region.getRegionInfo().getStartKey();
byte[] endKey = region.getRegionInfo().getEndKey();
return (Bytes.compareTo(startKey, key) <= 0
&& (Bytes.compareTo(HConstants.LAST_ROW, endKey) == 0 || Bytes.compareTo(key,
endKey) < 0));
}
public static RowLock acquireLock(Region region, byte[] key, List<RowLock> locks)
throws IOException {
RowLock rowLock = region.getRowLock(key, false);
if (rowLock == null) {
throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key));
}
if (locks != null) {
locks.add(rowLock);
}
return rowLock;
}
public static void releaseRowLocks(List<RowLock> rowLocks) {
if (rowLocks != null) {
for (RowLock rowLock : rowLocks) {
rowLock.release();
}
rowLocks.clear();
}
}
public static enum ConnectionType {
COMPACTION_CONNECTION,
INDEX_WRITER_CONNECTION,
INDEX_WRITER_CONNECTION_WITH_CUSTOM_THREADS,
INDEX_WRITER_CONNECTION_WITH_CUSTOM_THREADS_NO_RETRIES,
DEFAULT_SERVER_CONNECTION;
}
public static class ConnectionFactory {
private static Map<ConnectionType, Connection> connections =
new ConcurrentHashMap<ConnectionType, Connection>();
public static Connection getConnection(final ConnectionType connectionType, final RegionCoprocessorEnvironment env) {
return connections.computeIfAbsent(connectionType, new Function<ConnectionType, Connection>() {
@Override
public Connection apply(ConnectionType t) {
try {
return env.createConnection(getTypeSpecificConfiguration(connectionType, env.getConfiguration()));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
}
public static Configuration getTypeSpecificConfiguration(ConnectionType connectionType, Configuration conf) {
switch (connectionType) {
case COMPACTION_CONNECTION:
return getCompactionConfig(conf);
case DEFAULT_SERVER_CONNECTION:
return conf;
case INDEX_WRITER_CONNECTION:
return getIndexWriterConnection(conf);
case INDEX_WRITER_CONNECTION_WITH_CUSTOM_THREADS:
return getIndexWriterConfigurationWithCustomThreads(conf);
case INDEX_WRITER_CONNECTION_WITH_CUSTOM_THREADS_NO_RETRIES:
return getNoRetriesIndexWriterConfigurationWithCustomThreads(conf);
default:
return conf;
}
}
public static void shutdown() {
synchronized (ConnectionFactory.class) {
for (Connection connection : connections.values()) {
try {
connection.close();
} catch (IOException e) {
LOGGER.warn("Unable to close coprocessor connection", e);
}
}
connections.clear();
}
}
public static int getConnectionsCount() {
return connections.size();
}
}
public static Configuration getCompactionConfig(Configuration conf) {
Configuration compactionConfig = PropertiesUtil.cloneConfig(conf);
// lower the number of rpc retries, so we don't hang the compaction
compactionConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
conf.getInt(QueryServices.METADATA_WRITE_RETRIES_NUMBER,
QueryServicesOptions.DEFAULT_METADATA_WRITE_RETRIES_NUMBER));
compactionConfig.setInt(HConstants.HBASE_CLIENT_PAUSE,
conf.getInt(QueryServices.METADATA_WRITE_RETRY_PAUSE,
QueryServicesOptions.DEFAULT_METADATA_WRITE_RETRY_PAUSE));
return compactionConfig;
}
public static Configuration getIndexWriterConnection(Configuration conf) {
Configuration clonedConfig = PropertiesUtil.cloneConfig(conf);
/*
* Set the rpc controller factory so that the HTables used by IndexWriter would
* set the correct priorities on the remote RPC calls.
*/
clonedConfig.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
InterRegionServerIndexRpcControllerFactory.class, RpcControllerFactory.class);
// lower the number of rpc retries. We inherit config from HConnectionManager#setServerSideHConnectionRetries,
// which by default uses a multiplier of 10. That is too many retries for our synchronous index writes
clonedConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
conf.getInt(INDEX_WRITER_RPC_RETRIES_NUMBER,
DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER));
clonedConfig.setInt(HConstants.HBASE_CLIENT_PAUSE, conf
.getInt(INDEX_WRITER_RPC_PAUSE, DEFAULT_INDEX_WRITER_RPC_PAUSE));
return clonedConfig;
}
public static Configuration getIndexWriterConfigurationWithCustomThreads(Configuration conf) {
Configuration clonedConfig = getIndexWriterConnection(conf);
setHTableThreads(clonedConfig);
return clonedConfig;
}
private static void setHTableThreads(Configuration conf) {
// set the number of threads allowed per table.
int htableThreads =
conf.getInt(IndexWriterUtils.INDEX_WRITER_PER_TABLE_THREADS_CONF_KEY,
IndexWriterUtils.DEFAULT_NUM_PER_TABLE_THREADS);
IndexManagementUtil.setIfNotSet(conf, IndexWriterUtils.HTABLE_THREAD_KEY, htableThreads);
}
public static Configuration getNoRetriesIndexWriterConfigurationWithCustomThreads(Configuration conf) {
Configuration clonedConf = getIndexWriterConfigurationWithCustomThreads(conf);
// note in HBase 2+, numTries = numRetries + 1
// in prior versions, numTries = numRetries
clonedConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
return clonedConf;
}
}