/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.query;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.hadoop.hbase.HColumnDescriptor.TTL;
import static org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP;
import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MAJOR_VERSION;
import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MINOR_VERSION;
import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_PATCH_NUMBER;
import static org.apache.phoenix.coprocessor.MetaDataProtocol.getVersion;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_STATS_NAME;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_ENABLED;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_THRESHOLD_MILLISECONDS;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS;
import static org.apache.phoenix.util.UpgradeUtil.getSysCatalogSnapshotName;
import static org.apache.phoenix.util.UpgradeUtil.upgradeTo4_5_0;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.concurrent.GuardedBy;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.regionserver.IndexHalfStoreFileReaderGenerator;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.phoenix.compile.MutationPlan;
import org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver;
import org.apache.phoenix.coprocessor.MetaDataEndpointImpl;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
import org.apache.phoenix.coprocessor.MetaDataRegionObserver;
import org.apache.phoenix.coprocessor.PhoenixTransactionalProcessor;
import org.apache.phoenix.coprocessor.ScanRegionObserver;
import org.apache.phoenix.coprocessor.SequenceRegionObserver;
import org.apache.phoenix.coprocessor.ServerCachingEndpointImpl;
import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.AddColumnRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearTableFromCacheRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearTableFromCacheResponse;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.CreateFunctionRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.CreateSchemaRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.CreateTableRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropColumnRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropFunctionRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropSchemaRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropTableRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetFunctionsRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetSchemaRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetTableRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionResponse;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest;
import org.apache.phoenix.exception.PhoenixIOException;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.hbase.index.IndexRegionSplitPolicy;
import org.apache.phoenix.hbase.index.Indexer;
import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.hbase.index.util.VersionUtil;
import org.apache.phoenix.index.PhoenixIndexBuilder;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.index.PhoenixTransactionalIndexer;
import org.apache.phoenix.iterate.TableResultIterator;
import org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
import org.apache.phoenix.parse.PFunction;
import org.apache.phoenix.parse.PSchema;
import org.apache.phoenix.protobuf.ProtobufUtil;
import org.apache.phoenix.schema.ColumnAlreadyExistsException;
import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
import org.apache.phoenix.schema.EmptySequenceCacheException;
import org.apache.phoenix.schema.FunctionNotFoundException;
import org.apache.phoenix.schema.MetaDataSplitPolicy;
import org.apache.phoenix.schema.NewerSchemaAlreadyExistsException;
import org.apache.phoenix.schema.NewerTableAlreadyExistsException;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PColumnFamily;
import org.apache.phoenix.schema.PMetaData;
import org.apache.phoenix.schema.PMetaDataImpl;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PNameFactory;
import org.apache.phoenix.schema.PSynchronizedMetaData;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.ReadOnlyTableException;
import org.apache.phoenix.schema.SaltingUtil;
import org.apache.phoenix.schema.Sequence;
import org.apache.phoenix.schema.SequenceAllocation;
import org.apache.phoenix.schema.SequenceKey;
import org.apache.phoenix.schema.TableAlreadyExistsException;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.TableProperty;
import org.apache.phoenix.schema.stats.PTableStats;
import org.apache.phoenix.schema.types.PBoolean;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.schema.types.PUnsignedTinyint;
import org.apache.phoenix.schema.types.PVarchar;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.Closeables;
import org.apache.phoenix.util.ConfigUtil;
import org.apache.phoenix.util.JDBCUtil;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixContextExecutor;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PhoenixStopWatch;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.ServerUtil;
import org.apache.phoenix.util.UpgradeUtil;
import org.apache.tephra.TransactionSystemClient;
import org.apache.tephra.TxConstants;
import org.apache.tephra.distributed.PooledClientProvider;
import org.apache.tephra.distributed.TransactionServiceClient;
import org.apache.tephra.zookeeper.TephraZKClientService;
import org.apache.twill.discovery.ZKDiscoveryService;
import org.apache.twill.zookeeper.RetryStrategies;
import org.apache.twill.zookeeper.ZKClientService;
import org.apache.twill.zookeeper.ZKClientServices;
import org.apache.twill.zookeeper.ZKClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
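/**
 * {@link ConnectionQueryServices} implementation backed by an HBase cluster. Manages the
 * underlying HConnection, the client-side metadata cache, creation of physical tables and
 * their coprocessors, sequences, the table statistics cache, and scanner lease renewal for
 * Phoenix connections.
 */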
public class ConnectionQueryServicesImpl extends DelegateQueryServices implements ConnectionQueryServices {
private static final Logger logger = LoggerFactory.getLogger(ConnectionQueryServicesImpl.class);
private static final int INITIAL_CHILD_SERVICES_CAPACITY = 100;
private static final int DEFAULT_OUT_OF_ORDER_MUTATIONS_WAIT_TIME_MS = 1000;
private static final int TTL_FOR_MUTEX = 15 * 60; // 15min
protected final Configuration config;
private final ConnectionInfo connectionInfo;
// Copy of config.getProps(), but read-only to prevent synchronization that we
// don't need.
private final ReadOnlyProps props;
private final String userName;
private final ConcurrentHashMap<ImmutableBytesWritable,ConnectionQueryServices> childServices;
private final TableStatsCache tableStatsCache;
// Cache the latest meta data here for future connections
// writes guarded by "latestMetaDataLock"
private volatile PMetaData latestMetaData;
private final Object latestMetaDataLock = new Object();
// Lowest HBase version on the cluster.
private int lowestClusterHBaseVersion = Integer.MAX_VALUE;
private boolean hasIndexWALCodec = true;
@GuardedBy("connectionCountLock")
private int connectionCount = 0;
private final Object connectionCountLock = new Object();
private final boolean returnSequenceValues;
private HConnection connection;
private TransactionServiceClient txServiceClient;
private volatile boolean initialized;
private volatile int nSequenceSaltBuckets;
// writes guarded by "this"
private volatile boolean closed;
private volatile SQLException initializationException;
// setting this member variable guarded by "connectionCountLock"
private volatile ConcurrentMap<SequenceKey,Sequence> sequenceMap = Maps.newConcurrentMap();
private KeyValueBuilder kvBuilder;
private final int renewLeaseTaskFrequency;
private final int renewLeasePoolSize;
private final int renewLeaseThreshold;
// List of queues instead of a single queue to provide reduced contention via lock striping
private final List<LinkedBlockingQueue<WeakReference<PhoenixConnection>>> connectionQueues;
private ScheduledExecutorService renewLeaseExecutor;
private final boolean renewLeaseEnabled;
private static final byte[] UPGRADE_MUTEX = "UPGRADE_MUTEX".getBytes();
private static final byte[] UPGRADE_MUTEX_LOCKED = "UPGRADE_MUTEX_LOCKED".getBytes();
private static final byte[] UPGRADE_MUTEX_UNLOCKED = "UPGRADE_MUTEX_UNLOCKED".getBytes();
private static interface FeatureSupported {
boolean isSupported(ConnectionQueryServices services);
}
private final Map<Feature, FeatureSupported> featureMap = ImmutableMap.<Feature, FeatureSupported>of(
Feature.LOCAL_INDEX, new FeatureSupported() {
@Override
public boolean isSupported(ConnectionQueryServices services) {
int hbaseVersion = services.getLowestClusterHBaseVersion();
return hbaseVersion < PhoenixDatabaseMetaData.MIN_LOCAL_SI_VERSION_DISALLOW || hbaseVersion > PhoenixDatabaseMetaData.MAX_LOCAL_SI_VERSION_DISALLOW;
}
},
Feature.RENEW_LEASE, new FeatureSupported() {
@Override
public boolean isSupported(ConnectionQueryServices services) {
int hbaseVersion = services.getLowestClusterHBaseVersion();
return hbaseVersion >= PhoenixDatabaseMetaData.MIN_RENEW_LEASE_VERSION;
}
});
private PMetaData newEmptyMetaData() {
return new PSynchronizedMetaData(new PMetaDataImpl(INITIAL_META_DATA_TABLE_CAPACITY, getProps()));
}
/**
* Construct a ConnectionQueryServicesImpl that represents a connection to an HBase
* cluster.
* @param services base services from where we derive our default configuration
* @param connectionInfo to provide connection information
* @param info HBase configuration properties
*/
public ConnectionQueryServicesImpl(QueryServices services, ConnectionInfo connectionInfo, Properties info) {
super(services);
Configuration config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
for (Entry<String,String> entry : services.getProps()) {
config.set(entry.getKey(), entry.getValue());
}
if (info != null) {
for (Object key : info.keySet()) {
config.set((String) key, info.getProperty((String) key));
}
}
for (Entry<String,String> entry : connectionInfo.asProps()) {
config.set(entry.getKey(), entry.getValue());
}
this.connectionInfo = connectionInfo;
// Without making a copy of the configuration we cons up, we lose some of our properties
// on the server side during testing.
this.config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(config);
// set replication required parameter
ConfigUtil.setReplicationConfigIfAbsent(this.config);
this.props = new ReadOnlyProps(this.config.iterator());
this.userName = connectionInfo.getPrincipal();
this.latestMetaData = newEmptyMetaData();
// TODO: should we track connection wide memory usage or just org-wide usage?
// If connection-wide, create a MemoryManager here, otherwise just use the one from the delegate
this.childServices = new ConcurrentHashMap<ImmutableBytesWritable,ConnectionQueryServices>(INITIAL_CHILD_SERVICES_CAPACITY);
// find the HBase version and use that to determine the KeyValueBuilder that should be used
String hbaseVersion = VersionInfo.getVersion();
this.kvBuilder = KeyValueBuilder.get(hbaseVersion);
this.returnSequenceValues = props.getBoolean(QueryServices.RETURN_SEQUENCE_VALUES_ATTRIB, QueryServicesOptions.DEFAULT_RETURN_SEQUENCE_VALUES);
this.renewLeaseEnabled = config.getBoolean(RENEW_LEASE_ENABLED, DEFAULT_RENEW_LEASE_ENABLED);
this.renewLeasePoolSize = config.getInt(RENEW_LEASE_THREAD_POOL_SIZE, DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE);
this.renewLeaseThreshold = config.getInt(RENEW_LEASE_THRESHOLD_MILLISECONDS, DEFAULT_RENEW_LEASE_THRESHOLD_MILLISECONDS);
this.renewLeaseTaskFrequency = config.getInt(RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS, DEFAULT_RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS);
List<LinkedBlockingQueue<WeakReference<PhoenixConnection>>> list = Lists.newArrayListWithCapacity(renewLeasePoolSize);
for (int i = 0; i < renewLeasePoolSize; i++) {
LinkedBlockingQueue<WeakReference<PhoenixConnection>> queue = new LinkedBlockingQueue<WeakReference<PhoenixConnection>>();
list.add(queue);
}
connectionQueues = ImmutableList.copyOf(list);
// A little bit of a smell to leak `this` here, but should not be a problem
this.tableStatsCache = new TableStatsCache(this, config);
}
@Override
public TransactionSystemClient getTransactionSystemClient() {
return txServiceClient;
}
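/**
 * Initializes the Tephra transaction service client: builds a ZooKeeper client for the
 * configured transaction service quorum (falling back to the HBase quorum and port), starts
 * it, and wraps it in a discovery-based pooled client provider.
 */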
private void initTxServiceClient() {
String zkQuorumServersString = this.getProps().get(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM);
if (zkQuorumServersString==null) {
zkQuorumServersString = connectionInfo.getZookeeperQuorum()+":"+connectionInfo.getPort();
}
int timeOut = props.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
// Create instance of the tephra zookeeper client
ZKClientService tephraZKClientService = new TephraZKClientService(zkQuorumServersString, timeOut, null, ArrayListMultimap.<String, byte[]>create());
ZKClientService zkClientService = ZKClientServices.delegate(
ZKClients.reWatchOnExpire(
ZKClients.retryOnFailure(tephraZKClientService, RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS))
)
);
zkClientService.startAndWait();
ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(zkClientService);
PooledClientProvider pooledClientProvider = new PooledClientProvider(
config, zkDiscoveryService);
this.txServiceClient = new TransactionServiceClient(config,pooledClientProvider);
}
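/**
 * Opens the underlying HConnection to the cluster, initializing the transaction service
 * client first if transactions are enabled.
 * @throws SQLException with CANNOT_ESTABLISH_CONNECTION if the connection cannot be opened
 */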
private void openConnection() throws SQLException {
try {
boolean transactionsEnabled = props.getBoolean(
QueryServices.TRANSACTIONS_ENABLED,
QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED);
// only initialize the tx service client if needed
if (transactionsEnabled) {
initTxServiceClient();
}
this.connection = HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config);
} catch (IOException e) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION)
.setRootCause(e).build().buildException();
}
if (this.connection.isClosed()) { // TODO: why the heck doesn't this throw above?
throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION).build().buildException();
}
}
@Override
public HTableInterface getTable(byte[] tableName) throws SQLException {
try {
return HBaseFactoryProvider.getHTableFactory().getTable(tableName, connection, null);
} catch (org.apache.hadoop.hbase.TableNotFoundException e) {
throw new TableNotFoundException(SchemaUtil.getSchemaNameFromFullName(tableName), SchemaUtil.getTableNameFromFullName(tableName));
} catch (IOException e) {
throw new SQLException(e);
}
}
@Override
public HTableDescriptor getTableDescriptor(byte[] tableName) throws SQLException {
HTableInterface htable = getTable(tableName);
try {
return htable.getTableDescriptor();
} catch (IOException e) {
if(e instanceof org.apache.hadoop.hbase.TableNotFoundException
|| e.getCause() instanceof org.apache.hadoop.hbase.TableNotFoundException) {
byte[][] schemaAndTableName = new byte[2][];
SchemaUtil.getVarChars(tableName, schemaAndTableName);
throw new TableNotFoundException(Bytes.toString(schemaAndTableName[0]), Bytes.toString(schemaAndTableName[1]));
}
throw new RuntimeException(e);
} finally {
Closeables.closeQuietly(htable);
}
}
@Override
public ReadOnlyProps getProps() {
return props;
}
/**
* Closes the underlying connection to ZooKeeper. The QueryServices
* may not be used after that point. When a Connection is closed,
* this is not called, since these instances are pooled by the
* Driver. Instead, the Driver should call this if the QueryServices
* is ever removed from the pool.
*/
@Override
public void close() throws SQLException {
if (closed) {
return;
}
synchronized (this) {
if (closed) {
return;
}
closed = true;
SQLException sqlE = null;
try {
// Attempt to return any unused sequences.
if (connection != null) returnAllSequences(this.sequenceMap);
} catch (SQLException e) {
sqlE = e;
} finally {
try {
childServices.clear();
if (renewLeaseExecutor != null) {
renewLeaseExecutor.shutdownNow();
}
synchronized (latestMetaDataLock) {
latestMetaData = null;
latestMetaDataLock.notifyAll();
}
if (connection != null) connection.close();
} catch (IOException e) {
if (sqlE == null) {
sqlE = ServerUtil.parseServerException(e);
} else {
sqlE.setNextException(ServerUtil.parseServerException(e));
}
} finally {
try {
tableStatsCache.invalidateAll();
super.close();
} catch (SQLException e) {
if (sqlE == null) {
sqlE = e;
} else {
sqlE.setNextException(e);
}
} finally {
if (sqlE != null) { throw sqlE; }
}
}
}
}
}
protected ConnectionQueryServices newChildQueryService() {
return new ChildQueryServices(this);
}
/**
* Get (and create if necessary) a child QueryService for a given tenantId.
* The QueryService will be cached for the lifetime of the parent QueryService
* @param tenantId the tenant ID
* @return the child QueryService
*/
@Override
public ConnectionQueryServices getChildQueryServices(ImmutableBytesWritable tenantId) {
ConnectionQueryServices childQueryService = childServices.get(tenantId);
if (childQueryService == null) {
childQueryService = newChildQueryService();
ConnectionQueryServices prevQueryService = childServices.putIfAbsent(tenantId, childQueryService);
return prevQueryService == null ? childQueryService : prevQueryService;
}
return childQueryService;
}
@Override
public void clearTableRegionCache(byte[] tableName) throws SQLException {
connection.clearRegionCache(TableName.valueOf(tableName));
}
@Override
public List<HRegionLocation> getAllTableRegions(byte[] tableName) throws SQLException {
/*
* Use HConnection.getRegionLocation as it uses the cache in HConnection, while getting
* all region locations from the HTable doesn't.
*/
int retryCount = 0, maxRetryCount = 1;
boolean reload = false;
while (true) {
try {
// We could surface the package-private HConnectionImplementation.getNumberOfCachedRegionLocations
// to get the sizing info we need, but this would require a new class in the same package and a cast
// to this implementation class, so it's probably not worth it.
List<HRegionLocation> locations = Lists.newArrayList();
byte[] currentKey = HConstants.EMPTY_START_ROW;
do {
HRegionLocation regionLocation = connection.getRegionLocation(
TableName.valueOf(tableName), currentKey, reload);
locations.add(regionLocation);
currentKey = regionLocation.getRegionInfo().getEndKey();
} while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW));
return locations;
} catch (org.apache.hadoop.hbase.TableNotFoundException e) {
String fullName = Bytes.toString(tableName);
throw new TableNotFoundException(fullName);
} catch (IOException e) {
if (retryCount++ < maxRetryCount) { // One retry, in case split occurs while navigating
reload = true;
continue;
}
throw new SQLExceptionInfo.Builder(SQLExceptionCode.GET_TABLE_REGIONS_FAIL)
.setRootCause(e).build().buildException();
}
}
}
@Override
public void addTable(PTable table, long resolvedTime) throws SQLException {
synchronized (latestMetaDataLock) {
try {
throwConnectionClosedIfNullMetaData();
// If existing table isn't older than new table, don't replace
// If a client opens a connection at an earlier timestamp, this can happen
PTable existingTable = latestMetaData.getTableRef(new PTableKey(table.getTenantId(), table.getName().getString())).getTable();
if (existingTable.getTimeStamp() >= table.getTimeStamp()) {
return;
}
} catch (TableNotFoundException e) {}
latestMetaData.addTable(table, resolvedTime);
latestMetaDataLock.notifyAll();
}
}
@Override
public void updateResolvedTimestamp(PTable table, long resolvedTime) throws SQLException {
synchronized (latestMetaDataLock) {
throwConnectionClosedIfNullMetaData();
latestMetaData.updateResolvedTimestamp(table, resolvedTime);
latestMetaDataLock.notifyAll();
}
}
private static interface Mutator {
void mutate(PMetaData metaData) throws SQLException;
}
/**
* Ensures that metaData mutations are handled in the correct order
*/
private PMetaData metaDataMutated(PName tenantId, String tableName, long tableSeqNum, Mutator mutator) throws SQLException {
synchronized (latestMetaDataLock) {
throwConnectionClosedIfNullMetaData();
PMetaData metaData = latestMetaData;
PTable table;
long endTime = System.currentTimeMillis() + DEFAULT_OUT_OF_ORDER_MUTATIONS_WAIT_TIME_MS;
while (true) {
try {
try {
table = metaData.getTableRef(new PTableKey(tenantId, tableName)).getTable();
/* If the table is at the prior sequence number, then we're good to go.
* We know if we've got this far, that the server validated the mutations,
* so we'd just need to wait until the other connection that mutated the same
* table is processed.
*/
if (table.getSequenceNumber() + 1 == tableSeqNum) {
// TODO: assert that timeStamp is bigger than table timeStamp?
mutator.mutate(metaData);
break;
} else if (table.getSequenceNumber() >= tableSeqNum) {
logger.warn("Attempt to cache older version of " + tableName + ": current= " + table.getSequenceNumber() + ", new=" + tableSeqNum);
break;
}
} catch (TableNotFoundException e) {
}
long waitTime = endTime - System.currentTimeMillis();
// We waited long enough - just remove the table from the cache
// and the next time it's used it'll be pulled over from the server.
if (waitTime <= 0) {
logger.warn("Unable to update meta data repo within " + (DEFAULT_OUT_OF_ORDER_MUTATIONS_WAIT_TIME_MS/1000) + " seconds for " + tableName);
// There will never be a parentTableName here, as that would only
// be non-null for an index, and we never add/remove columns from an index.
metaData.removeTable(tenantId, tableName, null, HConstants.LATEST_TIMESTAMP);
break;
}
latestMetaDataLock.wait(waitTime);
} catch (InterruptedException e) {
// restore the interrupt status
Thread.currentThread().interrupt();
throw new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION)
.setRootCause(e).build().buildException(); // FIXME
}
}
latestMetaData = metaData;
latestMetaDataLock.notifyAll();
return metaData;
}
}
@Override
public void addColumn(final PName tenantId, final String tableName, final List<PColumn> columns,
final long tableTimeStamp, final long tableSeqNum, final boolean isImmutableRows,
final boolean isWalDisabled, final boolean isMultitenant, final boolean storeNulls,
final boolean isTransactional, final long updateCacheFrequency, final boolean isNamespaceMapped,
final long resolvedTime) throws SQLException {
metaDataMutated(tenantId, tableName, tableSeqNum, new Mutator() {
@Override
public void mutate(PMetaData metaData) throws SQLException {
try {
metaData.addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum,
isImmutableRows, isWalDisabled, isMultitenant, storeNulls, isTransactional,
updateCacheFrequency, isNamespaceMapped, resolvedTime);
} catch (TableNotFoundException e) {
// The DROP TABLE may have been processed first, so just ignore.
}
}
});
}
@Override
public void removeTable(PName tenantId, final String tableName, String parentTableName, long tableTimeStamp) throws SQLException {
synchronized (latestMetaDataLock) {
throwConnectionClosedIfNullMetaData();
latestMetaData.removeTable(tenantId, tableName, parentTableName, tableTimeStamp);
latestMetaDataLock.notifyAll();
}
}
@Override
public void removeColumn(final PName tenantId, final String tableName, final List<PColumn> columnsToRemove, final long tableTimeStamp, final long tableSeqNum, final long resolvedTime) throws SQLException {
metaDataMutated(tenantId, tableName, tableSeqNum, new Mutator() {
@Override
public void mutate(PMetaData metaData) throws SQLException {
try {
metaData.removeColumn(tenantId, tableName, columnsToRemove, tableTimeStamp, tableSeqNum, resolvedTime);
} catch (TableNotFoundException e) {
// The DROP TABLE may have been processed first, so just ignore.
}
}
});
}
@Override
public PhoenixConnection connect(String url, Properties info) throws SQLException {
checkClosed();
PMetaData metadata = latestMetaData;
throwConnectionClosedIfNullMetaData();
metadata = metadata.clone();
return new PhoenixConnection(this, url, info, metadata);
}
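/**
 * Builds the HColumnDescriptor for a column family. For anything other than a VIEW, applies
 * the default KEEP_DELETED_CELLS setting, the default data block encoding, and any
 * family-level properties supplied by the caller.
 */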
private HColumnDescriptor generateColumnFamilyDescriptor(Pair<byte[],Map<String,Object>> family, PTableType tableType) throws SQLException {
HColumnDescriptor columnDesc = new HColumnDescriptor(family.getFirst());
if (tableType != PTableType.VIEW) {
if(props.get(QueryServices.DEFAULT_KEEP_DELETED_CELLS_ATTRIB) != null){
columnDesc.setKeepDeletedCells(props.getBoolean(
QueryServices.DEFAULT_KEEP_DELETED_CELLS_ATTRIB, QueryServicesOptions.DEFAULT_KEEP_DELETED_CELLS));
}
columnDesc.setDataBlockEncoding(SchemaUtil.DEFAULT_DATA_BLOCK_ENCODING);
for (Entry<String,Object> entry : family.getSecond().entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
setHColumnDescriptorValue(columnDesc, key, value);
}
}
return columnDesc;
}
// Workaround HBASE-14737
private static void setHColumnDescriptorValue(HColumnDescriptor columnDesc, String key, Object value) {
if (HConstants.VERSIONS.equals(key)) {
columnDesc.setMaxVersions(getMaxVersion(value));
} else {
columnDesc.setValue(key, value == null ? null : value.toString());
}
}
private static int getMaxVersion(Object value) {
if (value == null) {
return -1; // HColumnDescriptor.UNINITIALIZED is private
}
if (value instanceof Number) {
return ((Number)value).intValue();
}
String stringValue = value.toString();
if (stringValue.isEmpty()) {
return -1;
}
return Integer.parseInt(stringValue);
}
private void modifyColumnFamilyDescriptor(HColumnDescriptor hcd, Map<String,Object> props) throws SQLException {
for (Entry<String, Object> entry : props.entrySet()) {
String propName = entry.getKey();
Object value = entry.getValue();
setHColumnDescriptorValue(hcd, propName, value);
}
}
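/**
 * Builds the HTableDescriptor for a Phoenix table, starting from the existing descriptor if
 * one is supplied. Copies non-Phoenix table properties, adds or modifies column family
 * descriptors, installs the required coprocessors, and sets the RPC priority for system
 * tables and global mutable index tables (PHOENIX-3072).
 */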
private HTableDescriptor generateTableDescriptor(byte[] tableName, HTableDescriptor existingDesc,
PTableType tableType, Map<String, Object> tableProps, List<Pair<byte[], Map<String, Object>>> families,
byte[][] splits, boolean isNamespaceMapped) throws SQLException {
String defaultFamilyName = (String)tableProps.remove(PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME);
HTableDescriptor tableDescriptor = (existingDesc != null) ? new HTableDescriptor(existingDesc)
: new HTableDescriptor(
SchemaUtil.getPhysicalHBaseTableName(tableName, isNamespaceMapped, tableType).getBytes());
for (Entry<String,Object> entry : tableProps.entrySet()) {
String key = entry.getKey();
if (!TableProperty.isPhoenixTableProperty(key)) {
Object value = entry.getValue();
tableDescriptor.setValue(key, value == null ? null : value.toString());
}
}
if (families.isEmpty()) {
if (tableType != PTableType.VIEW) {
byte[] defaultFamilyByes = defaultFamilyName == null ? QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES : Bytes.toBytes(defaultFamilyName);
// Add a dummy column family so we have key values for tables that declare no column families
HColumnDescriptor columnDescriptor = generateColumnFamilyDescriptor(new Pair<byte[],Map<String,Object>>(defaultFamilyByes,Collections.<String,Object>emptyMap()), tableType);
tableDescriptor.addFamily(columnDescriptor);
}
} else {
for (Pair<byte[],Map<String,Object>> family : families) {
// If the family exists only in the Phoenix metadata, add it; otherwise, modify its properties accordingly.
byte[] familyByte = family.getFirst();
if (tableDescriptor.getFamily(familyByte) == null) {
if (tableType == PTableType.VIEW) {
String fullTableName = Bytes.toString(tableName);
throw new ReadOnlyTableException(
"The HBase column families for a read-only table must already exist",
SchemaUtil.getSchemaNameFromFullName(fullTableName),
SchemaUtil.getTableNameFromFullName(fullTableName),
Bytes.toString(familyByte));
}
HColumnDescriptor columnDescriptor = generateColumnFamilyDescriptor(family, tableType);
tableDescriptor.addFamily(columnDescriptor);
} else {
if (tableType != PTableType.VIEW) {
HColumnDescriptor columnDescriptor = tableDescriptor.getFamily(familyByte);
if (columnDescriptor == null) {
throw new IllegalArgumentException("Unable to find column descriptor with family name " + Bytes.toString(family.getFirst()));
}
modifyColumnFamilyDescriptor(columnDescriptor, family.getSecond());
}
}
}
}
addCoprocessors(tableName, tableDescriptor, tableType, tableProps);
// PHOENIX-3072: Set index priority if this is a system table or index table
if (tableType == PTableType.SYSTEM) {
tableDescriptor.setValue(QueryConstants.PRIORITY,
String.valueOf(PhoenixRpcSchedulerFactory.getMetadataPriority(config)));
} else if (tableType == PTableType.INDEX // Global, mutable index
&& !isLocalIndexTable(tableDescriptor.getFamiliesKeys())
&& !Boolean.TRUE.equals(tableProps.get(PhoenixDatabaseMetaData.IMMUTABLE_ROWS))) {
tableDescriptor.setValue(QueryConstants.PRIORITY,
String.valueOf(PhoenixRpcSchedulerFactory.getIndexPriority(config)));
}
return tableDescriptor;
}
private boolean isLocalIndexTable(Collection<byte[]> families) {
// no easier way to know local index table?
for (byte[] family: families) {
if (Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) {
return true;
}
}
return false;
}
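/**
 * Installs the Phoenix coprocessors appropriate for the table type: the scan and aggregate
 * observers, the server cache endpoint, index maintenance (transactional or not), the
 * metadata, sequence, and stats endpoints for the relevant system tables, and the
 * transactional processor when the table is transactional.
 */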
private void addCoprocessors(byte[] tableName, HTableDescriptor descriptor, PTableType tableType, Map<String,Object> tableProps) throws SQLException {
// The phoenix jar must be available on HBase classpath
int priority = props.getInt(QueryServices.COPROCESSOR_PRIORITY_ATTRIB, QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY);
try {
if (!descriptor.hasCoprocessor(ScanRegionObserver.class.getName())) {
descriptor.addCoprocessor(ScanRegionObserver.class.getName(), null, priority, null);
}
if (!descriptor.hasCoprocessor(UngroupedAggregateRegionObserver.class.getName())) {
descriptor.addCoprocessor(UngroupedAggregateRegionObserver.class.getName(), null, priority, null);
}
if (!descriptor.hasCoprocessor(GroupedAggregateRegionObserver.class.getName())) {
descriptor.addCoprocessor(GroupedAggregateRegionObserver.class.getName(), null, priority, null);
}
if (!descriptor.hasCoprocessor(ServerCachingEndpointImpl.class.getName())) {
descriptor.addCoprocessor(ServerCachingEndpointImpl.class.getName(), null, priority, null);
}
boolean isTransactional =
Boolean.TRUE.equals(tableProps.get(TableProperty.TRANSACTIONAL.name())) ||
Boolean.TRUE.equals(tableProps.get(TxConstants.READ_NON_TX_DATA)); // For ALTER TABLE
// TODO: better encapsulation for this
// Since indexes can't have indexes, don't install our indexing coprocessor for indexes.
// Also don't install on the SYSTEM.CATALOG and SYSTEM.STATS tables because we use the
// all-or-none mutate class, which breaks when this coprocessor is installed (PHOENIX-1318).
if ((tableType != PTableType.INDEX && tableType != PTableType.VIEW)
&& !SchemaUtil.isMetaTable(tableName)
&& !SchemaUtil.isStatsTable(tableName)) {
if (isTransactional) {
if (!descriptor.hasCoprocessor(PhoenixTransactionalIndexer.class.getName())) {
descriptor.addCoprocessor(PhoenixTransactionalIndexer.class.getName(), null, priority, null);
}
// For alter table, remove non transactional index coprocessor
if (descriptor.hasCoprocessor(Indexer.class.getName())) {
descriptor.removeCoprocessor(Indexer.class.getName());
}
} else {
if (!descriptor.hasCoprocessor(Indexer.class.getName())) {
// If exception on alter table to transition back to non transactional
if (descriptor.hasCoprocessor(PhoenixTransactionalIndexer.class.getName())) {
descriptor.removeCoprocessor(PhoenixTransactionalIndexer.class.getName());
}
Map<String, String> opts = Maps.newHashMapWithExpectedSize(1);
opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
Indexer.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts, priority);
}
}
}
if (SchemaUtil.isStatsTable(tableName) && !descriptor.hasCoprocessor(MultiRowMutationEndpoint.class.getName())) {
descriptor.addCoprocessor(MultiRowMutationEndpoint.class.getName(),
null, priority, null);
}
Set<byte[]> familiesKeys = descriptor.getFamiliesKeys();
for(byte[] family: familiesKeys) {
if(Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) {
if (!descriptor.hasCoprocessor(IndexHalfStoreFileReaderGenerator.class.getName())) {
descriptor.addCoprocessor(IndexHalfStoreFileReaderGenerator.class.getName(),
null, priority, null);
break;
}
}
}
// Setup split policy on Phoenix metadata table to ensure that the key values of a Phoenix table
// stay on the same region.
if (SchemaUtil.isMetaTable(tableName) || SchemaUtil.isFunctionTable(tableName)) {
if (!descriptor.hasCoprocessor(MetaDataEndpointImpl.class.getName())) {
descriptor.addCoprocessor(MetaDataEndpointImpl.class.getName(), null, priority, null);
}
if(SchemaUtil.isMetaTable(tableName) ) {
if (!descriptor.hasCoprocessor(MetaDataRegionObserver.class.getName())) {
descriptor.addCoprocessor(MetaDataRegionObserver.class.getName(), null, priority + 1, null);
}
}
} else if (SchemaUtil.isSequenceTable(tableName)) {
if (!descriptor.hasCoprocessor(SequenceRegionObserver.class.getName())) {
descriptor.addCoprocessor(SequenceRegionObserver.class.getName(), null, priority, null);
}
}
if (isTransactional) {
if (!descriptor.hasCoprocessor(PhoenixTransactionalProcessor.class.getName())) {
descriptor.addCoprocessor(PhoenixTransactionalProcessor.class.getName(), null, priority - 10, null);
}
} else {
// If exception on alter table to transition back to non transactional
if (descriptor.hasCoprocessor(PhoenixTransactionalProcessor.class.getName())) {
descriptor.removeCoprocessor(PhoenixTransactionalProcessor.class.getName());
}
}
} catch (IOException e) {
throw ServerUtil.parseServerException(e);
}
}
private static interface RetriableOperation {
boolean checkForCompletion() throws TimeoutException, IOException;
String getOperationName();
}
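/**
 * Polls the cluster until the table descriptor reported by the HBase admin matches the
 * descriptor we just applied. Used after an online schema update.
 */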
private void pollForUpdatedTableDescriptor(final HBaseAdmin admin, final HTableDescriptor newTableDescriptor,
final byte[] tableName) throws InterruptedException, TimeoutException {
checkAndRetry(new RetriableOperation() {
@Override
public String getOperationName() {
return "UpdateOrNewTableDescriptor";
}
@Override
public boolean checkForCompletion() throws TimeoutException, IOException {
HTableDescriptor tableDesc = admin.getTableDescriptor(tableName);
return newTableDescriptor.equals(tableDesc);
}
});
}
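/**
 * Runs the given operation until it reports completion, retrying up to the configured number
 * of times with a fixed sleep between attempts, and throws a TimeoutException if it never
 * completes.
 */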
private void checkAndRetry(RetriableOperation op) throws InterruptedException, TimeoutException {
int maxRetries = ConnectionQueryServicesImpl.this.props.getInt(
QueryServices.NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK,
QueryServicesOptions.DEFAULT_RETRIES_FOR_SCHEMA_UPDATE_CHECK);
long sleepInterval = ConnectionQueryServicesImpl.this.props
.getLong(QueryServices.DELAY_FOR_SCHEMA_UPDATE_CHECK,
QueryServicesOptions.DEFAULT_DELAY_FOR_SCHEMA_UPDATE_CHECK);
boolean success = false;
int numTries = 1;
PhoenixStopWatch watch = new PhoenixStopWatch();
watch.start();
do {
try {
success = op.checkForCompletion();
} catch (Exception ex) {
// If we encounter any exception on the first or last try, propagate the exception and fail.
// Else, we swallow the exception and retry till we reach maxRetries.
if (numTries == 1 || numTries == maxRetries) {
watch.stop();
TimeoutException toThrow = new TimeoutException("Operation " + op.getOperationName()
+ " didn't complete because of exception. Time elapsed: " + watch.elapsedMillis());
toThrow.initCause(ex);
throw toThrow;
}
}
numTries++;
Thread.sleep(sleepInterval);
} while (numTries < maxRetries && !success);
watch.stop();
if (!success) {
throw new TimeoutException("Operation " + op.getOperationName() + " didn't complete within "
+ watch.elapsedMillis() + " ms "
+ (numTries > 1 ? ("after trying " + numTries + (numTries > 1 ? "times." : "time.")) : ""));
} else {
if (logger.isDebugEnabled()) {
logger.debug("Operation "
+ op.getOperationName()
+ " completed within "
+ watch.elapsedMillis()
+ "ms "
+ (numTries > 1 ? ("after trying " + numTries + (numTries > 1 ? "times." : "time.")) : ""));
}
}
}
private boolean allowOnlineTableSchemaUpdate() {
return props.getBoolean(
QueryServices.ALLOW_ONLINE_TABLE_SCHEMA_UPDATE,
QueryServicesOptions.DEFAULT_ALLOW_ONLINE_TABLE_SCHEMA_UPDATE);
}
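/**
 * Returns the HBase namespace descriptor for the given schema, creating the namespace if it
 * does not already exist.
 */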
private NamespaceDescriptor ensureNamespaceCreated(String schemaName) throws SQLException {
SQLException sqlE = null;
try (HBaseAdmin admin = getAdmin()) {
NamespaceDescriptor namespaceDescriptor = null;
try {
namespaceDescriptor = admin.getNamespaceDescriptor(schemaName);
} catch (org.apache.hadoop.hbase.NamespaceNotFoundException e) {
}
if (namespaceDescriptor == null) {
namespaceDescriptor = NamespaceDescriptor.create(schemaName).build();
admin.createNamespace(namespaceDescriptor);
}
return namespaceDescriptor;
} catch (IOException e) {
sqlE = ServerUtil.parseServerException(e);
} finally {
if (sqlE != null) { throw sqlE; }
}
return null; // will never make it here
}
/**
 * Ensures that the physical HBase table backing a Phoenix table exists with an up-to-date
 * descriptor (column families, properties, coprocessors).
 * @param tableName Phoenix table name from which the physical table name is derived
 * @param splits split points to use if the table has to be created
 * @param modifyExistingMetaData whether descriptor changes should be applied to an existing table
 * @return the existing descriptor if modifyExistingMetaData is false, the new descriptor if an
 *         existing table's metadata was modified, otherwise null
 * @throws SQLException
 */
private HTableDescriptor ensureTableCreated(byte[] tableName, PTableType tableType, Map<String, Object> props,
List<Pair<byte[], Map<String, Object>>> families, byte[][] splits, boolean modifyExistingMetaData,
boolean isNamespaceMapped) throws SQLException {
SQLException sqlE = null;
HTableDescriptor existingDesc = null;
boolean isMetaTable = SchemaUtil.isMetaTable(tableName);
byte[] physicalTable = SchemaUtil.getPhysicalHBaseTableName(tableName, isNamespaceMapped, tableType).getBytes();
boolean tableExist = true;
try (HBaseAdmin admin = getAdmin()) {
final String quorum = ZKConfig.getZKQuorumServersString(config);
final String znode = this.props.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
logger.debug("Found quorum: " + quorum + ":" + znode);
try {
existingDesc = admin.getTableDescriptor(physicalTable);
} catch (org.apache.hadoop.hbase.TableNotFoundException e) {
tableExist = false;
if (tableType == PTableType.VIEW) {
String fullTableName = Bytes.toString(tableName);
throw new ReadOnlyTableException(
"An HBase table for a VIEW must already exist",
SchemaUtil.getSchemaNameFromFullName(fullTableName),
SchemaUtil.getTableNameFromFullName(fullTableName));
}
}
HTableDescriptor newDesc = generateTableDescriptor(tableName, existingDesc, tableType, props, families,
splits, isNamespaceMapped);
if (!tableExist) {
if (newDesc.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES) != null && Boolean.TRUE.equals(
PBoolean.INSTANCE.toObject(newDesc.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES)))) {
newDesc.setValue(HTableDescriptor.SPLIT_POLICY, IndexRegionSplitPolicy.class.getName());
}
// Remove the splitPolicy attribute to prevent HBASE-12570
if (isMetaTable) {
newDesc.remove(HTableDescriptor.SPLIT_POLICY);
}
try {
if (splits == null) {
admin.createTable(newDesc);
} else {
admin.createTable(newDesc, splits);
}
} catch (TableExistsException e) {
// We can ignore this, as it just means that another client beat us
// to creating the HBase metadata.
return null;
}
if (isMetaTable) {
checkClientServerCompatibility(SchemaUtil.getPhysicalName(SYSTEM_CATALOG_NAME_BYTES, this.getProps()).getName());
/*
* Now we modify the table to add the split policy, since we know that the client and
* server are compatible. This works around HBASE-12570, which causes the cluster to be
* brought down.
*/
newDesc.setValue(HTableDescriptor.SPLIT_POLICY, MetaDataSplitPolicy.class.getName());
modifyTable(physicalTable, newDesc, true);
}
return null;
} else {
if (isMetaTable) {
checkClientServerCompatibility(SchemaUtil.getPhysicalName(SYSTEM_CATALOG_NAME_BYTES, this.getProps()).getName());
} else {
for(Pair<byte[],Map<String,Object>> family: families) {
if ((newDesc.getValue(HTableDescriptor.SPLIT_POLICY)==null || !newDesc.getValue(HTableDescriptor.SPLIT_POLICY).equals(
IndexRegionSplitPolicy.class.getName()))
&& Bytes.toString(family.getFirst()).startsWith(
QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) {
newDesc.setValue(HTableDescriptor.SPLIT_POLICY, IndexRegionSplitPolicy.class.getName());
break;
}
}
}
if (!modifyExistingMetaData) {
return existingDesc; // Caller already knows that no metadata was changed
}
boolean willBeTx = Boolean.TRUE.equals(props.get(TableProperty.TRANSACTIONAL.name()));
// If mapping an existing table as transactional, set property so that existing
// data is correctly read.
if (willBeTx) {
newDesc.setValue(TxConstants.READ_NON_TX_DATA, Boolean.TRUE.toString());
} else {
// If we think we're creating a non transactional table when it's already
// transactional, don't allow.
if (existingDesc.hasCoprocessor(PhoenixTransactionalProcessor.class.getName())) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_MAY_NOT_SWITCH_TO_NON_TX)
.setSchemaName(SchemaUtil.getSchemaNameFromFullName(tableName))
.setTableName(SchemaUtil.getTableNameFromFullName(tableName)).build().buildException();
}
newDesc.remove(TxConstants.READ_NON_TX_DATA);
}
if (existingDesc.equals(newDesc)) {
return null; // Indicate that no metadata was changed
}
modifyTable(physicalTable, newDesc, true);
return newDesc;
}
} catch (IOException e) {
sqlE = ServerUtil.parseServerException(e);
} catch (InterruptedException e) {
// restore the interrupt status
Thread.currentThread().interrupt();
sqlE = new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION).setRootCause(e).build().buildException();
} catch (TimeoutException e) {
sqlE = new SQLExceptionInfo.Builder(SQLExceptionCode.OPERATION_TIMED_OUT).setRootCause(e.getCause() != null ? e.getCause() : e).build().buildException();
} finally {
if (sqlE != null) {
throw sqlE;
}
}
return null; // will never make it here
}
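/**
 * Applies a new table descriptor. If online schema updates are allowed, modifies the table in
 * place (optionally polling until the change is visible); otherwise disables the table,
 * modifies it, and re-enables it.
 */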
private void modifyTable(byte[] tableName, HTableDescriptor newDesc, boolean shouldPoll) throws IOException,
InterruptedException, TimeoutException, SQLException {
try (HBaseAdmin admin = getAdmin()) {
if (!allowOnlineTableSchemaUpdate()) {
admin.disableTable(tableName);
admin.modifyTable(tableName, newDesc);
admin.enableTable(tableName);
} else {
admin.modifyTable(tableName, newDesc);
if (shouldPoll) {
pollForUpdatedTableDescriptor(admin, newDesc, tableName);
}
}
}
}
private static boolean hasIndexWALCodec(Long serverVersion) {
if (serverVersion == null) {
return true;
}
return MetaDataUtil.decodeHasIndexWALCodec(serverVersion);
}
private static boolean isCompatible(Long serverVersion) {
if (serverVersion == null) {
return false;
}
return MetaDataUtil.areClientAndServerCompatible(serverVersion);
}
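/**
 * Calls the MetaDataService getVersion endpoint on the metadata table's regions to verify
 * that the deployed server jars are compatible with this client and that namespace mapping is
 * configured consistently, and records the lowest HBase version found in the cluster.
 */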
private void checkClientServerCompatibility(byte[] metaTable) throws SQLException {
StringBuilder buf = new StringBuilder("The following servers require an updated " + QueryConstants.DEFAULT_COPROCESS_PATH + " to be put in the classpath of HBase: ");
boolean isIncompatible = false;
int minHBaseVersion = Integer.MAX_VALUE;
boolean isTableNamespaceMappingEnabled = false;
HTableInterface ht = null;
try {
List<HRegionLocation> locations = this
.getAllTableRegions(metaTable);
Set<HRegionLocation> serverMap = Sets.newHashSetWithExpectedSize(locations.size());
TreeMap<byte[], HRegionLocation> regionMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
List<byte[]> regionKeys = Lists.newArrayListWithExpectedSize(locations.size());
for (HRegionLocation entry : locations) {
if (!serverMap.contains(entry)) {
regionKeys.add(entry.getRegionInfo().getStartKey());
regionMap.put(entry.getRegionInfo().getRegionName(), entry);
serverMap.add(entry);
}
}
ht = this.getTable(metaTable);
final Map<byte[], Long> results =
ht.coprocessorService(MetaDataService.class, null, null, new Batch.Call<MetaDataService,Long>() {
@Override
public Long call(MetaDataService instance) throws IOException {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<GetVersionResponse> rpcCallback =
new BlockingRpcCallback<GetVersionResponse>();
GetVersionRequest.Builder builder = GetVersionRequest.newBuilder();
builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
instance.getVersion(controller, builder.build(), rpcCallback);
if(controller.getFailedOn() != null) {
throw controller.getFailedOn();
}
return rpcCallback.get().getVersion();
}
});
for (Map.Entry<byte[],Long> result : results.entrySet()) {
// This is the "phoenix.jar" is in-place, but server is out-of-sync with client case.
long version = result.getValue();
isTableNamespaceMappingEnabled |= MetaDataUtil.decodeTableNamespaceMappingEnabled(version);
if (!isCompatible(result.getValue())) {
isIncompatible = true;
HRegionLocation name = regionMap.get(result.getKey());
buf.append(name);
buf.append(';');
}
hasIndexWALCodec &= hasIndexWALCodec(result.getValue());
if (minHBaseVersion > MetaDataUtil.decodeHBaseVersion(result.getValue())) {
minHBaseVersion = MetaDataUtil.decodeHBaseVersion(result.getValue());
}
}
if (isTableNamespaceMappingEnabled != SchemaUtil.isNamespaceMappingEnabled(PTableType.TABLE,
getProps())) { throw new SQLExceptionInfo.Builder(
SQLExceptionCode.INCONSISTENET_NAMESPACE_MAPPING_PROPERTIES)
.setMessage(
"Ensure that config " + QueryServices.IS_NAMESPACE_MAPPING_ENABLED
+ " is consitent on client and server.")
.build().buildException(); }
lowestClusterHBaseVersion = minHBaseVersion;
} catch (SQLException e) {
throw e;
} catch (Throwable t) {
// This is the case if the "phoenix.jar" is not on the classpath of HBase on the region server
throw new SQLExceptionInfo.Builder(SQLExceptionCode.INCOMPATIBLE_CLIENT_SERVER_JAR).setRootCause(t)
.setMessage("Ensure that " + QueryConstants.DEFAULT_COPROCESS_PATH + " is put on the classpath of HBase in every region server: " + t.getMessage())
.build().buildException();
} finally {
if (ht != null) {
try {
ht.close();
} catch (IOException e) {
logger.warn("Could not close HTable", e);
}
}
}
if (isIncompatible) {
buf.setLength(buf.length()-1);
throw new SQLExceptionInfo.Builder(SQLExceptionCode.OUTDATED_JARS).setMessage(buf.toString()).build().buildException();
}
}
/**
* Invoke meta data coprocessor with one retry if the key was found to not be in the regions
* (due to a table split)
*/
private MetaDataMutationResult metaDataCoprocessorExec(byte[] tableKey,
Batch.Call<MetaDataService, MetaDataResponse> callable) throws SQLException {
return metaDataCoprocessorExec(tableKey, callable, PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
}
/**
* Invoke meta data coprocessor with one retry if the key was found to not be in the regions
* (due to a table split)
*/
private MetaDataMutationResult metaDataCoprocessorExec(byte[] tableKey,
Batch.Call<MetaDataService, MetaDataResponse> callable, byte[] tableName) throws SQLException {
try {
boolean retried = false;
while (true) {
if (retried) {
connection.relocateRegion(SchemaUtil.getPhysicalName(tableName, this.getProps()), tableKey);
}
HTableInterface ht = this.getTable(SchemaUtil.getPhysicalName(tableName, this.getProps()).getName());
try {
final Map<byte[], MetaDataResponse> results =
ht.coprocessorService(MetaDataService.class, tableKey, tableKey, callable);
assert(results.size() == 1);
MetaDataResponse result = results.values().iterator().next();
if (result.getReturnCode() == MetaDataProtos.MutationCode.TABLE_NOT_IN_REGION
|| result.getReturnCode() == MetaDataProtos.MutationCode.FUNCTION_NOT_IN_REGION) {
if (retried) return MetaDataMutationResult.constructFromProto(result);
retried = true;
continue;
}
return MetaDataMutationResult.constructFromProto(result);
} finally {
Closeables.closeQuietly(ht);
}
}
} catch (IOException e) {
throw ServerUtil.parseServerException(e);
} catch (Throwable t) {
throw new SQLException(t);
}
}
// Our property values are translated using toString, so we need to "string-ify" this.
private static final String TRUE_BYTES_AS_STRING = Bytes.toString(PDataType.TRUE_BYTES);
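/**
 * Ensures that the shared physical HBase table backing view indexes exists for the given base
 * table, creating it (marked with the view-index table property) if necessary.
 */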
private void ensureViewIndexTableCreated(byte[] physicalTableName, Map<String, Object> tableProps,
List<Pair<byte[], Map<String, Object>>> families, byte[][] splits, long timestamp,
boolean isNamespaceMapped) throws SQLException {
byte[] physicalIndexName = MetaDataUtil.getViewIndexPhysicalName(physicalTableName);
tableProps.put(MetaDataUtil.IS_VIEW_INDEX_TABLE_PROP_NAME, TRUE_BYTES_AS_STRING);
HTableDescriptor desc = ensureTableCreated(physicalIndexName, PTableType.TABLE, tableProps, families, splits,
false, isNamespaceMapped);
if (desc != null) {
if (!Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(desc.getValue(MetaDataUtil.IS_VIEW_INDEX_TABLE_PROP_BYTES)))) {
String fullTableName = Bytes.toString(physicalIndexName);
throw new TableAlreadyExistsException(
"Unable to create shared physical table for indexes on views.",
SchemaUtil.getSchemaNameFromFullName(fullTableName),
SchemaUtil.getTableNameFromFullName(fullTableName));
}
}
}
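/**
 * Drops the shared view index physical table for the given base table when DROP_METADATA is
 * enabled, invalidating its stats cache entry and region cache. Returns true if the table was
 * deleted.
 */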
private boolean ensureViewIndexTableDropped(byte[] physicalTableName, long timestamp) throws SQLException {
byte[] physicalIndexName = MetaDataUtil.getViewIndexPhysicalName(physicalTableName);
HTableDescriptor desc = null;
boolean wasDeleted = false;
try (HBaseAdmin admin = getAdmin()) {
try {
desc = admin.getTableDescriptor(physicalIndexName);
if (Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(desc.getValue(MetaDataUtil.IS_VIEW_INDEX_TABLE_PROP_BYTES)))) {
this.tableStatsCache.invalidate(new ImmutableBytesPtr(physicalIndexName));
final ReadOnlyProps props = this.getProps();
final boolean dropMetadata = props.getBoolean(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA);
if (dropMetadata) {
admin.disableTable(physicalIndexName);
admin.deleteTable(physicalIndexName);
clearTableRegionCache(physicalIndexName);
wasDeleted = true;
}
}
} catch (org.apache.hadoop.hbase.TableNotFoundException ignore) {
// Ignore, as we may never have created a view index table
}
} catch (IOException e) {
throw ServerUtil.parseServerException(e);
}
return wasDeleted;
}
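/**
 * Removes the local index column families from the given physical table when DROP_METADATA is
 * enabled, invalidating its stats cache entry and region cache. Returns true if the metadata
 * was dropped.
 */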
private boolean ensureLocalIndexTableDropped(byte[] physicalTableName, long timestamp) throws SQLException {
HTableDescriptor desc = null;
boolean wasDeleted = false;
try (HBaseAdmin admin = getAdmin()) {
try {
desc = admin.getTableDescriptor(physicalTableName);
this.tableStatsCache.invalidate(new ImmutableBytesPtr(physicalTableName));
final ReadOnlyProps props = this.getProps();
final boolean dropMetadata = props.getBoolean(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA);
if (dropMetadata) {
List<String> columnFamilies = new ArrayList<String>();
for(HColumnDescriptor cf : desc.getColumnFamilies()) {
if(cf.getNameAsString().startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) {
columnFamilies.add(cf.getNameAsString());
}
}
for(String cf: columnFamilies) {
admin.deleteColumn(physicalTableName, cf);
}
clearTableRegionCache(physicalTableName);
wasDeleted = true;
}
} catch (org.apache.hadoop.hbase.TableNotFoundException ignore) {
// Ignore, as the physical table may never have been created
}
} catch (IOException e) {
throw ServerUtil.parseServerException(e);
}
return wasDeleted;
}
@Override
public MetaDataMutationResult createTable(final List<Mutation> tableMetaData, byte[] physicalTableName,
PTableType tableType, Map<String, Object> tableProps,
final List<Pair<byte[], Map<String, Object>>> families, byte[][] splits, boolean isNamespaceMapped)
throws SQLException {
byte[][] rowKeyMetadata = new byte[3][];
Mutation m = MetaDataUtil.getPutOnlyTableHeaderRow(tableMetaData);
byte[] key = m.getRow();
SchemaUtil.getVarChars(key, rowKeyMetadata);
byte[] tenantIdBytes = rowKeyMetadata[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
byte[] schemaBytes = rowKeyMetadata[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
byte[] tableBytes = rowKeyMetadata[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
byte[] tableName = physicalTableName != null ? physicalTableName : SchemaUtil.getTableNameAsBytes(schemaBytes, tableBytes);
boolean localIndexTable = false;
for(Pair<byte[], Map<String, Object>> family: families) {
if(Bytes.toString(family.getFirst()).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) {
localIndexTable = true;
break;
}
}
if ((tableType == PTableType.VIEW && physicalTableName != null) || (tableType != PTableType.VIEW && (physicalTableName == null || localIndexTable))) {
// For views this will ensure that metadata already exists
// For tables and indexes, this will create the metadata if it doesn't already exist
ensureTableCreated(tableName, tableType, tableProps, families, splits, true, isNamespaceMapped);
}
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
if (tableType == PTableType.INDEX) { // Index on view
// Physical index table created up front for multi tenant
// TODO: if viewIndexId is Short.MIN_VALUE, then we don't need to attempt to create it
if (physicalTableName != null) {
if (!localIndexTable && !MetaDataUtil.isMultiTenant(m, kvBuilder, ptr)) {
ensureViewIndexTableCreated(tenantIdBytes.length == 0 ? null : PNameFactory.newName(tenantIdBytes),
physicalTableName, MetaDataUtil.getClientTimeStamp(m), isNamespaceMapped);
}
}
} else if (tableType == PTableType.TABLE && MetaDataUtil.isMultiTenant(m, kvBuilder, ptr)) { // Create view index table up front for multi tenant tables
ptr.set(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES);
MetaDataUtil.getMutationValue(m, PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME_BYTES, kvBuilder, ptr);
List<Pair<byte[],Map<String,Object>>> familiesPlusDefault = null;
for (Pair<byte[],Map<String,Object>> family : families) {
byte[] cf = family.getFirst();
if (Bytes.compareTo(cf, 0, cf.length, ptr.get(), ptr.getOffset(),ptr.getLength()) == 0) {
familiesPlusDefault = families;
break;
}
}
// Don't override if default family already present
if (familiesPlusDefault == null) {
byte[] defaultCF = ByteUtil.copyKeyBytesIfNecessary(ptr);
// Only use splits if table is salted, otherwise it may not be applicable
// Always add default column family, as we don't know in advance if we'll need it
familiesPlusDefault = Lists.newArrayList(families);
familiesPlusDefault.add(new Pair<byte[],Map<String,Object>>(defaultCF,Collections.<String,Object>emptyMap()));
}
ensureViewIndexTableCreated(
SchemaUtil.getPhysicalHBaseTableName(tableName, isNamespaceMapped, tableType).getBytes(),
tableProps, familiesPlusDefault, MetaDataUtil.isSalted(m, kvBuilder, ptr) ? splits : null,
MetaDataUtil.getClientTimeStamp(m), isNamespaceMapped);
}
byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaBytes, tableBytes);
MetaDataMutationResult result = metaDataCoprocessorExec(tableKey,
new Batch.Call<MetaDataService, MetaDataResponse>() {
@Override
public MetaDataResponse call(MetaDataService instance) throws IOException {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<MetaDataResponse> rpcCallback =
new BlockingRpcCallback<MetaDataResponse>();
CreateTableRequest.Builder builder = CreateTableRequest.newBuilder();
for (Mutation m : tableMetaData) {
MutationProto mp = ProtobufUtil.toProto(m);
builder.addTableMetadataMutations(mp.toByteString());
}
builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
CreateTableRequest build = builder.build();
instance.createTable(controller, build, rpcCallback);
if(controller.getFailedOn() != null) {
throw controller.getFailedOn();
}
return rpcCallback.get();
}
});
return result;
}
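/**
* Resolves a table by issuing a GetTableRequest to the MetaDataService coprocessor hosting the
* SYSTEM.CATALOG row for the given tenant/schema/table key.
*/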
@Override
public MetaDataMutationResult getTable(final PName tenantId, final byte[] schemaBytes, final byte[] tableBytes,
final long tableTimestamp, final long clientTimestamp) throws SQLException {
final byte[] tenantIdBytes = tenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY : tenantId.getBytes();
byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaBytes, tableBytes);
return metaDataCoprocessorExec(tableKey,
new Batch.Call<MetaDataService, MetaDataResponse>() {
@Override
public MetaDataResponse call(MetaDataService instance) throws IOException {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<MetaDataResponse> rpcCallback =
new BlockingRpcCallback<MetaDataResponse>();
GetTableRequest.Builder builder = GetTableRequest.newBuilder();
builder.setTenantId(ByteStringer.wrap(tenantIdBytes));
builder.setSchemaName(ByteStringer.wrap(schemaBytes));
builder.setTableName(ByteStringer.wrap(tableBytes));
builder.setTableTimestamp(tableTimestamp);
builder.setClientTimestamp(clientTimestamp);
builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
instance.getTable(controller, builder.build(), rpcCallback);
if(controller.getFailedOn() != null) {
throw controller.getFailedOn();
}
return rpcCallback.get();
}
});
}
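/**
* Drops a table, view or index by invoking the MetaDataService dropTable coprocessor call and,
* when DROP_METADATA_ATTRIB is enabled, also deletes the underlying HBase tables (including any
* view index and local index artifacts) and invalidates the relevant client side caches.
*/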
@Override
public MetaDataMutationResult dropTable(final List<Mutation> tableMetaData, final PTableType tableType,
final boolean cascade) throws SQLException {
byte[][] rowKeyMetadata = new byte[3][];
SchemaUtil.getVarChars(tableMetaData.get(0).getRow(), rowKeyMetadata);
byte[] tenantIdBytes = rowKeyMetadata[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
byte[] schemaBytes = rowKeyMetadata[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
byte[] tableBytes = rowKeyMetadata[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes == null ? ByteUtil.EMPTY_BYTE_ARRAY : tenantIdBytes, schemaBytes, tableBytes);
final MetaDataMutationResult result = metaDataCoprocessorExec(tableKey,
new Batch.Call<MetaDataService, MetaDataResponse>() {
@Override
public MetaDataResponse call(MetaDataService instance) throws IOException {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<MetaDataResponse> rpcCallback =
new BlockingRpcCallback<MetaDataResponse>();
DropTableRequest.Builder builder = DropTableRequest.newBuilder();
for (Mutation m : tableMetaData) {
MutationProto mp = ProtobufUtil.toProto(m);
builder.addTableMetadataMutations(mp.toByteString());
}
builder.setTableType(tableType.getSerializedValue());
builder.setCascade(cascade);
builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
instance.dropTable(controller, builder.build(), rpcCallback);
if(controller.getFailedOn() != null) {
throw controller.getFailedOn();
}
return rpcCallback.get();
}
});
final MutationCode code = result.getMutationCode();
switch(code) {
case TABLE_ALREADY_EXISTS:
ReadOnlyProps props = this.getProps();
boolean dropMetadata = props.getBoolean(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA);
PTable table = result.getTable();
if (dropMetadata) {
flushParentPhysicalTable(table);
dropTables(result.getTableNamesToDelete());
}
invalidateTables(result.getTableNamesToDelete());
long timestamp = MetaDataUtil.getClientTimeStamp(tableMetaData);
if (tableType == PTableType.TABLE) {
byte[] physicalName = table.getPhysicalName().getBytes();
ensureViewIndexTableDropped(physicalName, timestamp);
ensureLocalIndexTableDropped(physicalName, timestamp);
tableStatsCache.invalidate(new ImmutableBytesPtr(physicalName));
}
break;
default:
break;
}
return result;
}
/*
* PHOENIX-2915: while dropping an index, flush the data table to avoid stale WAL edits of indexes.
* Flush the parent physical table when:
* 1. dropping a view that has indexes
* 2. dropping table indexes
* 3. dropping view indexes
*/
private void flushParentPhysicalTable(PTable table) throws SQLException {
byte[] parentPhysicalTableName = null;
if (PTableType.VIEW == table.getType()) {
if (!table.getIndexes().isEmpty()) {
parentPhysicalTableName = table.getPhysicalName().getBytes();
}
} else if (PTableType.INDEX == table.getType()) {
PTable parentTable = getTable(table.getTenantId(), table.getParentName().getString(), HConstants.LATEST_TIMESTAMP);
parentPhysicalTableName = parentTable.getPhysicalName().getBytes();
}
if (parentPhysicalTableName != null) {
flushTable(parentPhysicalTableName);
}
}
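/**
* Drops a user defined function by invoking the MetaDataService dropFunction coprocessor call
* against SYSTEM.FUNCTION.
*/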
@Override
public MetaDataMutationResult dropFunction(final List<Mutation> functionData, final boolean ifExists) throws SQLException {
byte[][] rowKeyMetadata = new byte[2][];
byte[] key = functionData.get(0).getRow();
SchemaUtil.getVarChars(key, rowKeyMetadata);
byte[] tenantIdBytes = rowKeyMetadata[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
byte[] functionBytes = rowKeyMetadata[PhoenixDatabaseMetaData.FUNTION_NAME_INDEX];
byte[] functionKey = SchemaUtil.getFunctionKey(tenantIdBytes, functionBytes);
final MetaDataMutationResult result = metaDataCoprocessorExec(functionKey,
new Batch.Call<MetaDataService, MetaDataResponse>() {
@Override
public MetaDataResponse call(MetaDataService instance) throws IOException {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<MetaDataResponse> rpcCallback =
new BlockingRpcCallback<MetaDataResponse>();
DropFunctionRequest.Builder builder = DropFunctionRequest.newBuilder();
for (Mutation m : functionData) {
MutationProto mp = ProtobufUtil.toProto(m);
builder.addTableMetadataMutations(mp.toByteString());
}
builder.setIfExists(ifExists);
builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
instance.dropFunction(controller, builder.build(), rpcCallback);
if(controller.getFailedOn() != null) {
throw controller.getFailedOn();
}
return rpcCallback.get();
}
}, PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES);
return result;
}
private void invalidateTables(final List<byte[]> tableNamesToDelete) {
if (tableNamesToDelete != null) {
for (byte[] tableName : tableNamesToDelete) {
tableStatsCache.invalidate(new ImmutableBytesPtr(Bytes.toString(tableName)
.replace(QueryConstants.NAMESPACE_SEPARATOR, QueryConstants.NAME_SEPARATOR).getBytes()));
}
}
}
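/**
* Disables and deletes each of the given HBase tables if it exists, clearing the region cache
* for each deleted table.
*/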
private void dropTables(final List<byte[]> tableNamesToDelete) throws SQLException {
SQLException sqlE = null;
try (HBaseAdmin admin = getAdmin()) {
if (tableNamesToDelete != null){
for ( byte[] tableName : tableNamesToDelete ) {
if ( admin.tableExists(tableName) ) {
admin.disableTable(tableName);
admin.deleteTable(tableName);
clearTableRegionCache(tableName);
}
}
}
} catch (IOException e) {
sqlE = ServerUtil.parseServerException(e);
} finally {
if (sqlE != null) {
throw sqlE;
}
}
}
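/**
* Converts an HTableDescriptor's raw key/value properties into a String keyed map so they can
* be reapplied when creating derived tables (e.g. the shared view index table).
*/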
private static Map<String,Object> createPropertiesMap(Map<ImmutableBytesWritable,ImmutableBytesWritable> htableProps) {
Map<String,Object> props = Maps.newHashMapWithExpectedSize(htableProps.size());
for (Map.Entry<ImmutableBytesWritable,ImmutableBytesWritable> entry : htableProps.entrySet()) {
ImmutableBytesWritable key = entry.getKey();
ImmutableBytesWritable value = entry.getValue();
props.put(Bytes.toString(key.get(), key.getOffset(), key.getLength()), Bytes.toString(value.get(), value.getOffset(), value.getLength()));
}
return props;
}
private void ensureViewIndexTableCreated(PName tenantId, byte[] physicalIndexTableName, long timestamp,
boolean isNamespaceMapped) throws SQLException {
String name = Bytes
.toString(SchemaUtil.getParentTableNameFromIndexTable(physicalIndexTableName,
MetaDataUtil.VIEW_INDEX_TABLE_PREFIX))
.replace(QueryConstants.NAMESPACE_SEPARATOR, QueryConstants.NAME_SEPARATOR);
PTable table = getTable(tenantId, name, timestamp);
ensureViewIndexTableCreated(table, timestamp, isNamespaceMapped);
}
private PTable getTable(PName tenantId, String fullTableName, long timestamp) throws SQLException {
PTable table;
try {
PMetaData metadata = latestMetaData;
throwConnectionClosedIfNullMetaData();
table = metadata.getTableRef(new PTableKey(tenantId, fullTableName)).getTable();
if (table.getTimeStamp() >= timestamp) { // Table in cache is newer than client timestamp which shouldn't be
// the case
throw new TableNotFoundException(table.getSchemaName().getString(), table.getTableName().getString());
}
} catch (TableNotFoundException e) {
byte[] schemaName = Bytes.toBytes(SchemaUtil.getSchemaNameFromFullName(fullTableName));
byte[] tableName = Bytes.toBytes(SchemaUtil.getTableNameFromFullName(fullTableName));
MetaDataMutationResult result = this.getTable(tenantId, schemaName, tableName, HConstants.LATEST_TIMESTAMP,
timestamp);
table = result.getTable();
if (table == null) { throw e; }
}
return table;
}
private void ensureViewIndexTableCreated(PTable table, long timestamp, boolean isNamespaceMapped)
throws SQLException {
byte[] physicalTableName = table.getPhysicalName().getBytes();
HTableDescriptor htableDesc = this.getTableDescriptor(physicalTableName);
Map<String,Object> tableProps = createPropertiesMap(htableDesc.getValues());
List<Pair<byte[],Map<String,Object>>> families = Lists.newArrayListWithExpectedSize(Math.max(1, table.getColumnFamilies().size()+1));
if (families.isEmpty()) {
byte[] familyName = SchemaUtil.getEmptyColumnFamily(table);
Map<String,Object> familyProps = createPropertiesMap(htableDesc.getFamily(familyName).getValues());
families.add(new Pair<byte[],Map<String,Object>>(familyName, familyProps));
} else {
for (PColumnFamily family : table.getColumnFamilies()) {
byte[] familyName = family.getName().getBytes();
Map<String,Object> familyProps = createPropertiesMap(htableDesc.getFamily(familyName).getValues());
families.add(new Pair<byte[],Map<String,Object>>(familyName, familyProps));
}
// Always create default column family, because we don't know in advance if we'll
// need it for an index with no covered columns.
families.add(new Pair<byte[],Map<String,Object>>(table.getDefaultFamilyName().getBytes(), Collections.<String,Object>emptyMap()));
}
byte[][] splits = null;
if (table.getBucketNum() != null) {
splits = SaltingUtil.getSalteByteSplitPoints(table.getBucketNum());
}
// Transfer over table values into tableProps
// TODO: encapsulate better
tableProps.put(PhoenixDatabaseMetaData.TRANSACTIONAL, table.isTransactional());
tableProps.put(PhoenixDatabaseMetaData.IMMUTABLE_ROWS, table.isImmutableRows());
ensureViewIndexTableCreated(physicalTableName, tableProps, families, splits, timestamp, isNamespaceMapped);
}
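/**
* Adds one or more columns to an existing table or view. HBase table descriptors are updated
* first (including index tables when transitioning from non-transactional to transactional),
* then the metadata mutations are applied through the MetaDataService addColumn coprocessor
* call. If the metadata update fails after a non-transactional to transactional transition,
* the original descriptors are restored.
*/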
@Override
public MetaDataMutationResult addColumn(final List<Mutation> tableMetaData, PTable table, Map<String, List<Pair<String,Object>>> stmtProperties, Set<String> colFamiliesForPColumnsToBeAdded) throws SQLException {
List<Pair<byte[], Map<String, Object>>> families = new ArrayList<>(stmtProperties.size());
Map<String, Object> tableProps = new HashMap<String, Object>();
Set<HTableDescriptor> tableDescriptors = Collections.emptySet();
Set<HTableDescriptor> origTableDescriptors = Collections.emptySet();
boolean nonTxToTx = false;
Pair<HTableDescriptor,HTableDescriptor> tableDescriptorPair = separateAndValidateProperties(table, stmtProperties, colFamiliesForPColumnsToBeAdded, families, tableProps);
HTableDescriptor tableDescriptor = tableDescriptorPair.getSecond();
HTableDescriptor origTableDescriptor = tableDescriptorPair.getFirst();
if (tableDescriptor != null) {
tableDescriptors = Sets.newHashSetWithExpectedSize(3 + table.getIndexes().size());
origTableDescriptors = Sets.newHashSetWithExpectedSize(3 + table.getIndexes().size());
tableDescriptors.add(tableDescriptor);
origTableDescriptors.add(origTableDescriptor);
nonTxToTx = Boolean.TRUE.equals(tableProps.get(TxConstants.READ_NON_TX_DATA));
/*
* If the table was transitioned from non transactional to transactional, we need
* to also transition the index tables.
*/
if (nonTxToTx) {
updateDescriptorForTx(table, tableProps, tableDescriptor, Boolean.TRUE.toString(), tableDescriptors, origTableDescriptors);
}
}
boolean success = false;
boolean metaDataUpdated = !tableDescriptors.isEmpty();
boolean pollingNeeded = !(!tableProps.isEmpty() && families.isEmpty() && colFamiliesForPColumnsToBeAdded.isEmpty());
MetaDataMutationResult result = null;
try {
boolean modifyHTable = true;
if (table.getType() == PTableType.VIEW) {
boolean canViewsAddNewCF = props.getBoolean(QueryServices.ALLOW_VIEWS_ADD_NEW_CF_BASE_TABLE,
QueryServicesOptions.DEFAULT_ALLOW_VIEWS_ADD_NEW_CF_BASE_TABLE);
// When adding a column to a view, base physical table should only be modified when new column families are being added.
modifyHTable = canViewsAddNewCF && !existingColumnFamiliesForBaseTable(table.getPhysicalName()).containsAll(colFamiliesForPColumnsToBeAdded);
}
if (modifyHTable) {
sendHBaseMetaData(tableDescriptors, pollingNeeded);
}
// Special case for call during drop table to ensure that the empty column family exists.
// In this case, we only include the table header row, as until we add schemaBytes and tableBytes
// as args to this function, we have no way of getting them in this case.
// TODO: change to if (tableMetaData.isEmpty()) once we pass through schemaBytes and tableBytes
// Also, could be used to update property values on ALTER TABLE t SET prop=xxx
if ((tableMetaData.isEmpty()) || (tableMetaData.size() == 1 && tableMetaData.get(0).isEmpty())) {
return new MetaDataMutationResult(MutationCode.NO_OP, System.currentTimeMillis(), table);
}
byte[][] rowKeyMetaData = new byte[3][];
PTableType tableType = table.getType();
Mutation m = tableMetaData.get(0);
byte[] rowKey = m.getRow();
SchemaUtil.getVarChars(rowKey, rowKeyMetaData);
byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
byte[] schemaBytes = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
byte[] tableBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaBytes, tableBytes);
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
result = metaDataCoprocessorExec(tableKey,
new Batch.Call<MetaDataService, MetaDataResponse>() {
@Override
public MetaDataResponse call(MetaDataService instance) throws IOException {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<MetaDataResponse> rpcCallback =
new BlockingRpcCallback<MetaDataResponse>();
AddColumnRequest.Builder builder = AddColumnRequest.newBuilder();
for (Mutation m : tableMetaData) {
MutationProto mp = ProtobufUtil.toProto(m);
builder.addTableMetadataMutations(mp.toByteString());
}
builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
instance.addColumn(controller, builder.build(), rpcCallback);
if(controller.getFailedOn() != null) {
throw controller.getFailedOn();
}
return rpcCallback.get();
}
});
if (result.getMutationCode() == MutationCode.COLUMN_NOT_FOUND || result.getMutationCode() == MutationCode.TABLE_ALREADY_EXISTS) { // Success
success = true;
// Flush the table if transitioning DISABLE_WAL from TRUE to FALSE
if ( MetaDataUtil.getMutationValue(m,PhoenixDatabaseMetaData.DISABLE_WAL_BYTES, kvBuilder, ptr)
&& Boolean.FALSE.equals(PBoolean.INSTANCE.toObject(ptr))) {
flushTable(table.getPhysicalName().getBytes());
}
if (tableType == PTableType.TABLE) {
// If we're changing MULTI_TENANT to true or false, create or drop the view index table
if (MetaDataUtil.getMutationValue(m, PhoenixDatabaseMetaData.MULTI_TENANT_BYTES, kvBuilder, ptr)){
long timestamp = MetaDataUtil.getClientTimeStamp(m);
if (Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(ptr.get(), ptr.getOffset(), ptr.getLength()))) {
this.ensureViewIndexTableCreated(table, timestamp,table.isNamespaceMapped());
} else {
this.ensureViewIndexTableDropped(table.getPhysicalName().getBytes(), timestamp);
}
}
}
}
} finally {
// If we weren't successful with our metadata update
// and we've already pushed the HBase metadata changes to the server
// and we've tried to go from non transactional to transactional
// then we must undo the metadata change otherwise the table will
// no longer function correctly.
// Note that if this fails, we're in a corrupt state.
if (!success && metaDataUpdated && nonTxToTx) {
sendHBaseMetaData(origTableDescriptors, pollingNeeded);
}
}
return result;
}
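/**
* When a table transitions to transactional, updates the descriptors of its index tables
* (global indexes plus the view index and local index physical tables) so their max versions
* and TTL settings match the data table, and marks them transactional as well.
*/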
private void updateDescriptorForTx(PTable table, Map<String, Object> tableProps, HTableDescriptor tableDescriptor,
String txValue, Set<HTableDescriptor> descriptorsToUpdate, Set<HTableDescriptor> origDescriptors) throws SQLException {
byte[] physicalTableName = table.getPhysicalName().getBytes();
try (HBaseAdmin admin = getAdmin()) {
setTransactional(tableDescriptor, table.getType(), txValue, tableProps);
Map<String, Object> indexTableProps;
if (txValue == null) {
indexTableProps = Collections.<String,Object>emptyMap();
} else {
indexTableProps = Maps.newHashMapWithExpectedSize(1);
indexTableProps.put(TxConstants.READ_NON_TX_DATA, Boolean.valueOf(txValue));
}
for (PTable index : table.getIndexes()) {
HTableDescriptor indexDescriptor = admin.getTableDescriptor(index.getPhysicalName().getBytes());
origDescriptors.add(indexDescriptor);
indexDescriptor = new HTableDescriptor(indexDescriptor);
descriptorsToUpdate.add(indexDescriptor);
if (index.getColumnFamilies().isEmpty()) {
byte[] dataFamilyName = SchemaUtil.getEmptyColumnFamily(table);
byte[] indexFamilyName = SchemaUtil.getEmptyColumnFamily(index);
HColumnDescriptor indexColDescriptor = indexDescriptor.getFamily(indexFamilyName);
HColumnDescriptor tableColDescriptor = tableDescriptor.getFamily(dataFamilyName);
indexColDescriptor.setMaxVersions(tableColDescriptor.getMaxVersions());
indexColDescriptor.setValue(TxConstants.PROPERTY_TTL, tableColDescriptor.getValue(TxConstants.PROPERTY_TTL));
} else {
for (PColumnFamily family : index.getColumnFamilies()) {
byte[] familyName = family.getName().getBytes();
indexDescriptor.getFamily(familyName).setMaxVersions(tableDescriptor.getFamily(familyName).getMaxVersions());
HColumnDescriptor indexColDescriptor = indexDescriptor.getFamily(familyName);
HColumnDescriptor tableColDescriptor = tableDescriptor.getFamily(familyName);
indexColDescriptor.setMaxVersions(tableColDescriptor.getMaxVersions());
indexColDescriptor.setValue(TxConstants.PROPERTY_TTL, tableColDescriptor.getValue(TxConstants.PROPERTY_TTL));
}
}
setTransactional(indexDescriptor, index.getType(), txValue, indexTableProps);
}
try {
HTableDescriptor indexDescriptor = admin.getTableDescriptor(MetaDataUtil.getViewIndexPhysicalName(physicalTableName));
origDescriptors.add(indexDescriptor);
indexDescriptor = new HTableDescriptor(indexDescriptor);
descriptorsToUpdate.add(indexDescriptor);
setSharedIndexMaxVersion(table, tableDescriptor, indexDescriptor);
setTransactional(indexDescriptor, PTableType.INDEX, txValue, indexTableProps);
} catch (org.apache.hadoop.hbase.TableNotFoundException ignore) {
// Ignore, as we may never have created a view index table
}
try {
HTableDescriptor indexDescriptor = admin.getTableDescriptor(MetaDataUtil.getLocalIndexPhysicalName(physicalTableName));
origDescriptors.add(indexDescriptor);
indexDescriptor = new HTableDescriptor(indexDescriptor);
descriptorsToUpdate.add(indexDescriptor);
setSharedIndexMaxVersion(table, tableDescriptor, indexDescriptor);
setTransactional(indexDescriptor, PTableType.INDEX, txValue, indexTableProps);
} catch (org.apache.hadoop.hbase.TableNotFoundException ignore) {
// Ignore, as we may never have created a local index table
}
} catch (IOException e) {
throw ServerUtil.parseServerException(e);
}
}
private void setSharedIndexMaxVersion(PTable table, HTableDescriptor tableDescriptor,
HTableDescriptor indexDescriptor) {
if (table.getColumnFamilies().isEmpty()) {
byte[] familyName = SchemaUtil.getEmptyColumnFamily(table);
HColumnDescriptor indexColDescriptor = indexDescriptor.getFamily(familyName);
HColumnDescriptor tableColDescriptor = tableDescriptor.getFamily(familyName);
indexColDescriptor.setMaxVersions(tableColDescriptor.getMaxVersions());
indexColDescriptor.setValue(TxConstants.PROPERTY_TTL, tableColDescriptor.getValue(TxConstants.PROPERTY_TTL));
} else {
for (PColumnFamily family : table.getColumnFamilies()) {
byte[] familyName = family.getName().getBytes();
HColumnDescriptor indexColDescriptor = indexDescriptor.getFamily(familyName);
if (indexColDescriptor != null) {
HColumnDescriptor tableColDescriptor = tableDescriptor.getFamily(familyName);
indexColDescriptor.setMaxVersions(tableColDescriptor.getMaxVersions());
indexColDescriptor.setValue(TxConstants.PROPERTY_TTL, tableColDescriptor.getValue(TxConstants.PROPERTY_TTL));
}
}
}
}
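/**
* Applies the given table descriptors via modifyTable, translating any IOException, interrupt
* or timeout into a SQLException.
*/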
private void sendHBaseMetaData(Set<HTableDescriptor> tableDescriptors, boolean pollingNeeded) throws SQLException {
SQLException sqlE = null;
for (HTableDescriptor descriptor : tableDescriptors) {
try {
modifyTable(descriptor.getName(), descriptor, pollingNeeded);
} catch (IOException e) {
sqlE = ServerUtil.parseServerException(e);
} catch (InterruptedException e) {
// restore the interrupt status
Thread.currentThread().interrupt();
sqlE = new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION).setRootCause(e).build().buildException();
} catch (TimeoutException e) {
sqlE = new SQLExceptionInfo.Builder(SQLExceptionCode.OPERATION_TIMED_OUT).setRootCause(e.getCause() != null ? e.getCause() : e).build().buildException();
} finally {
if (sqlE != null) {
throw sqlE;
}
}
}
}
private void setTransactional(HTableDescriptor tableDescriptor, PTableType tableType, String txValue, Map<String, Object> tableProps) throws SQLException {
if (txValue == null) {
tableDescriptor.remove(TxConstants.READ_NON_TX_DATA);
} else {
tableDescriptor.setValue(TxConstants.READ_NON_TX_DATA, txValue);
}
this.addCoprocessors(tableDescriptor.getName(), tableDescriptor, tableType, tableProps);
}
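/**
* Splits the properties supplied in an ALTER TABLE statement into HTable level properties,
* per-column-family properties and Phoenix table properties, validating the allowed
* combinations. Returns a pair of (original, new) HTableDescriptor; both are null when no
* HBase level change is required.
*/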
private Pair<HTableDescriptor,HTableDescriptor> separateAndValidateProperties(PTable table, Map<String, List<Pair<String, Object>>> properties, Set<String> colFamiliesForPColumnsToBeAdded, List<Pair<byte[], Map<String, Object>>> families, Map<String, Object> tableProps) throws SQLException {
Map<String, Map<String, Object>> stmtFamiliesPropsMap = new HashMap<>(properties.size());
Map<String,Object> commonFamilyProps = new HashMap<>();
boolean addingColumns = colFamiliesForPColumnsToBeAdded != null && !colFamiliesForPColumnsToBeAdded.isEmpty();
HashSet<String> existingColumnFamilies = existingColumnFamilies(table);
Map<String, Map<String, Object>> allFamiliesProps = new HashMap<>(existingColumnFamilies.size());
boolean isTransactional = table.isTransactional();
boolean willBeTransactional = false;
boolean isOrWillBeTransactional = isTransactional;
Integer newTTL = null;
for (String family : properties.keySet()) {
List<Pair<String, Object>> propsList = properties.get(family);
if (propsList != null && propsList.size() > 0) {
Map<String, Object> colFamilyPropsMap = new HashMap<String, Object>(propsList.size());
for (Pair<String, Object> prop : propsList) {
String propName = prop.getFirst();
Object propValue = prop.getSecond();
if ((MetaDataUtil.isHTableProperty(propName) || TableProperty.isPhoenixTableProperty(propName)) && addingColumns) {
// setting HTable and PhoenixTable properties while adding a column is not allowed.
throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SET_TABLE_PROPERTY_ADD_COLUMN)
.setMessage("Property: " + propName).build()
.buildException();
}
if (MetaDataUtil.isHTableProperty(propName)) {
// Can't have a column family name for a property that's an HTableProperty
if (!family.equals(QueryConstants.ALL_FAMILY_PROPERTIES_KEY)) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.COLUMN_FAMILY_NOT_ALLOWED_TABLE_PROPERTY)
.setMessage("Column Family: " + family + ", Property: " + propName).build()
.buildException();
}
tableProps.put(propName, propValue);
} else {
if (TableProperty.isPhoenixTableProperty(propName)) {
TableProperty.valueOf(propName).validate(true, !family.equals(QueryConstants.ALL_FAMILY_PROPERTIES_KEY), table.getType());
if (propName.equals(TTL)) {
newTTL = ((Number)prop.getSecond()).intValue();
// Even though TTL is really an HColumnProperty, we treat it specially.
// We enforce that all column families have the same TTL.
commonFamilyProps.put(propName, prop.getSecond());
} else if (propName.equals(PhoenixDatabaseMetaData.TRANSACTIONAL) && Boolean.TRUE.equals(propValue)) {
willBeTransactional = isOrWillBeTransactional = true;
tableProps.put(TxConstants.READ_NON_TX_DATA, propValue);
}
} else {
if (MetaDataUtil.isHColumnProperty(propName)) {
if (family.equals(QueryConstants.ALL_FAMILY_PROPERTIES_KEY)) {
commonFamilyProps.put(propName, propValue);
} else {
colFamilyPropsMap.put(propName, propValue);
}
} else {
// invalid property - neither an HTableProp, HColumnProp nor PhoenixTableProp
// FIXME: This isn't getting triggered as currently a property gets evaluated
// as an HTableProp if it's neither an HColumnProp nor a PhoenixTableProp.
throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ALTER_PROPERTY)
.setMessage("Column Family: " + family + ", Property: " + propName).build()
.buildException();
}
}
}
}
if (!colFamilyPropsMap.isEmpty()) {
stmtFamiliesPropsMap.put(family, colFamilyPropsMap);
}
}
}
commonFamilyProps = Collections.unmodifiableMap(commonFamilyProps);
boolean isAddingPkColOnly = colFamiliesForPColumnsToBeAdded.size() == 1 && colFamiliesForPColumnsToBeAdded.contains(null);
if (!commonFamilyProps.isEmpty()) {
if (!addingColumns) {
// Add the common family props to all existing column families
for (String existingColFamily : existingColumnFamilies) {
Map<String, Object> m = new HashMap<String, Object>(commonFamilyProps.size());
m.putAll(commonFamilyProps);
allFamiliesProps.put(existingColFamily, m);
}
} else {
// Add the common family props to the column families of the columns being added
for (String colFamily : colFamiliesForPColumnsToBeAdded) {
if (colFamily != null) {
// only set properties for key value columns
Map<String, Object> m = new HashMap<String, Object>(commonFamilyProps.size());
m.putAll(commonFamilyProps);
allFamiliesProps.put(colFamily, m);
} else if (isAddingPkColOnly) {
// Setting HColumnProperty for a pk column is invalid
// because it will be part of the row key and not a key value column family.
// However, if both pk cols as well as key value columns are getting added
// together, then it's allowed. The above if block will make sure that we add properties
// only for the kv cols and not pk cols.
throw new SQLExceptionInfo.Builder(SQLExceptionCode.SET_UNSUPPORTED_PROP_ON_ALTER_TABLE)
.build().buildException();
}
}
}
}
// Now go through the column family properties specified in the statement
// and merge them with the common family properties.
for (String f : stmtFamiliesPropsMap.keySet()) {
if (!addingColumns && !existingColumnFamilies.contains(f)) {
throw new ColumnFamilyNotFoundException(f);
}
if (addingColumns && !colFamiliesForPColumnsToBeAdded.contains(f)) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SET_PROPERTY_FOR_COLUMN_NOT_ADDED).build().buildException();
}
Map<String, Object> commonProps = allFamiliesProps.get(f);
Map<String, Object> stmtProps = stmtFamiliesPropsMap.get(f);
if (commonProps != null) {
if (stmtProps != null) {
// merge common props with statement props for the family
commonProps.putAll(stmtProps);
}
} else {
// if no common props were specified, then assign family specific props
if (stmtProps != null) {
allFamiliesProps.put(f, stmtProps);
}
}
}
// Case when a column family is being added but no properties are specified for it.
// For example, in DROP COLUMN when a new empty CF needs to be added since all
// the columns of the existing empty CF are getting dropped. Or the case
// when one is just adding a column for a column family, like this:
// ALTER TABLE ADD CF.COL
for (String cf : colFamiliesForPColumnsToBeAdded) {
if (cf != null && allFamiliesProps.get(cf) == null) {
allFamiliesProps.put(cf, new HashMap<String, Object>());
}
}
if (table.getColumnFamilies().isEmpty() && !addingColumns && !commonFamilyProps.isEmpty()) {
allFamiliesProps.put(Bytes.toString(table.getDefaultFamilyName() == null ? QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES : table.getDefaultFamilyName().getBytes() ), commonFamilyProps);
}
// Views are not allowed to have any of these properties.
if (table.getType() == PTableType.VIEW && (!stmtFamiliesPropsMap.isEmpty() || !commonFamilyProps.isEmpty() || !tableProps.isEmpty())) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.VIEW_WITH_PROPERTIES).build()
.buildException();
}
HTableDescriptor newTableDescriptor = null;
HTableDescriptor origTableDescriptor = null;
if (!allFamiliesProps.isEmpty() || !tableProps.isEmpty()) {
byte[] tableNameBytes = Bytes.toBytes(table.getPhysicalName().getString());
HTableDescriptor existingTableDescriptor = origTableDescriptor = getTableDescriptor(tableNameBytes);
newTableDescriptor = new HTableDescriptor(existingTableDescriptor);
if (!tableProps.isEmpty()) {
// add all the table properties to the existing table descriptor
for (Entry<String, Object> entry : tableProps.entrySet()) {
newTableDescriptor.setValue(entry.getKey(), entry.getValue() != null ? entry.getValue().toString() : null);
}
}
if (addingColumns) {
// Make sure that all the CFs of the table have the same TTL as the empty CF.
setTTLForNewCFs(allFamiliesProps, table, newTableDescriptor, newTTL);
}
// Set TTL on all table column families, even if they're not referenced here
if (newTTL != null) {
for (PColumnFamily family : table.getColumnFamilies()) {
if (!allFamiliesProps.containsKey(family.getName().getString())) {
Map<String,Object> familyProps = Maps.newHashMapWithExpectedSize(1);
familyProps.put(TTL, newTTL);
allFamiliesProps.put(family.getName().getString(), familyProps);
}
}
}
Integer defaultTxMaxVersions = null;
if (isOrWillBeTransactional) {
// Calculate default for max versions
Map<String, Object> emptyFamilyProps = allFamiliesProps.get(SchemaUtil.getEmptyColumnFamilyAsString(table));
if (emptyFamilyProps != null) {
defaultTxMaxVersions = (Integer)emptyFamilyProps.get(HConstants.VERSIONS);
}
if (defaultTxMaxVersions == null) {
if (isTransactional) {
defaultTxMaxVersions = newTableDescriptor.getFamily(SchemaUtil.getEmptyColumnFamily(table)).getMaxVersions();
} else {
defaultTxMaxVersions =
this.getProps().getInt(
QueryServices.MAX_VERSIONS_TRANSACTIONAL_ATTRIB,
QueryServicesOptions.DEFAULT_MAX_VERSIONS_TRANSACTIONAL);
}
}
if (willBeTransactional) {
// Set VERSIONS for all column families when transitioning to transactional
for (PColumnFamily family : table.getColumnFamilies()) {
if (!allFamiliesProps.containsKey(family.getName().getString())) {
Map<String,Object> familyProps = Maps.newHashMapWithExpectedSize(1);
familyProps.put(HConstants.VERSIONS, defaultTxMaxVersions);
allFamiliesProps.put(family.getName().getString(), familyProps);
}
}
}
}
// Set Tephra's TTL property based on HBase property if we're
// transitioning to become transactional or setting TTL on
// an already transactional table.
if (isOrWillBeTransactional) {
int ttl = getTTL(table, newTableDescriptor, newTTL);
if (ttl != HColumnDescriptor.DEFAULT_TTL) {
for (Map.Entry<String, Map<String, Object>> entry : allFamiliesProps.entrySet()) {
Map<String, Object> props = entry.getValue();
if (props == null) {
props = new HashMap<String, Object>();
}
props.put(TxConstants.PROPERTY_TTL, ttl);
// Remove the HBase TTL only if we're not transitioning an existing table to become transactional
// and the existing transactional table wasn't originally non-transactional.
if (!willBeTransactional && !Boolean.valueOf(newTableDescriptor.getValue(TxConstants.READ_NON_TX_DATA))) {
props.remove(TTL);
}
}
}
}
for (Entry<String, Map<String, Object>> entry : allFamiliesProps.entrySet()) {
Map<String,Object> familyProps = entry.getValue();
if (isOrWillBeTransactional) {
if (!familyProps.containsKey(HConstants.VERSIONS)) {
familyProps.put(HConstants.VERSIONS, defaultTxMaxVersions);
}
}
byte[] cf = Bytes.toBytes(entry.getKey());
HColumnDescriptor colDescriptor = newTableDescriptor.getFamily(cf);
if (colDescriptor == null) {
// new column family
colDescriptor = generateColumnFamilyDescriptor(new Pair<>(cf, familyProps), table.getType());
newTableDescriptor.addFamily(colDescriptor);
} else {
modifyColumnFamilyDescriptor(colDescriptor, familyProps);
}
if (isOrWillBeTransactional) {
checkTransactionalVersionsValue(colDescriptor);
}
}
}
return new Pair<>(origTableDescriptor, newTableDescriptor);
}
private void checkTransactionalVersionsValue(HColumnDescriptor colDescriptor) throws SQLException {
int maxVersions = colDescriptor.getMaxVersions();
if (maxVersions <= 1) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_MAX_VERSIONS_MUST_BE_GREATER_THAN_ONE)
.setFamilyName(colDescriptor.getNameAsString())
.build().buildException();
}
}
private HashSet<String> existingColumnFamiliesForBaseTable(PName baseTableName) throws TableNotFoundException {
throwConnectionClosedIfNullMetaData();
PTable table = latestMetaData.getTableRef(new PTableKey(null, baseTableName.getString())).getTable();
return existingColumnFamilies(table);
}
private HashSet<String> existingColumnFamilies(PTable table) {
List<PColumnFamily> cfs = table.getColumnFamilies();
HashSet<String> cfNames = new HashSet<>(cfs.size());
for (PColumnFamily cf : table.getColumnFamilies()) {
cfNames.add(cf.getName().getString());
}
return cfNames;
}
private static int getTTL(PTable table, HTableDescriptor tableDesc, Integer newTTL) throws SQLException {
// If we're setting TTL now, then use that value. Otherwise, use empty column family value
int ttl = newTTL != null ? newTTL
: tableDesc.getFamily(SchemaUtil.getEmptyColumnFamily(table)).getTimeToLive();
return ttl;
}
private static void setTTLForNewCFs(Map<String, Map<String, Object>> familyProps, PTable table,
HTableDescriptor tableDesc, Integer newTTL) throws SQLException {
if (!familyProps.isEmpty()) {
int ttl = getTTL(table, tableDesc, newTTL);
for (Map.Entry<String, Map<String, Object>> entry : familyProps.entrySet()) {
Map<String, Object> props = entry.getValue();
if (props == null) {
props = new HashMap<String, Object>();
}
props.put(TTL, ttl);
}
}
}
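/**
* Drops one or more columns by invoking the MetaDataService dropColumn coprocessor call and,
* when DROP_METADATA_ATTRIB is enabled, deleting any HBase tables the server reports as no
* longer needed.
*/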
@Override
public MetaDataMutationResult dropColumn(final List<Mutation> tableMetaData, PTableType tableType) throws SQLException {
byte[][] rowKeyMetadata = new byte[3][];
SchemaUtil.getVarChars(tableMetaData.get(0).getRow(), rowKeyMetadata);
byte[] tenantIdBytes = rowKeyMetadata[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
byte[] schemaBytes = rowKeyMetadata[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
byte[] tableBytes = rowKeyMetadata[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaBytes, tableBytes);
MetaDataMutationResult result = metaDataCoprocessorExec(tableKey,
new Batch.Call<MetaDataService, MetaDataResponse>() {
@Override
public MetaDataResponse call(MetaDataService instance) throws IOException {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<MetaDataResponse> rpcCallback =
new BlockingRpcCallback<MetaDataResponse>();
DropColumnRequest.Builder builder = DropColumnRequest.newBuilder();
for (Mutation m : tableMetaData) {
MutationProto mp = ProtobufUtil.toProto(m);
builder.addTableMetadataMutations(mp.toByteString());
}
builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
instance.dropColumn(controller, builder.build(), rpcCallback);
if(controller.getFailedOn() != null) {
throw controller.getFailedOn();
}
return rpcCallback.get();
}
});
final MutationCode code = result.getMutationCode();
switch(code) {
case TABLE_ALREADY_EXISTS:
final ReadOnlyProps props = this.getProps();
final boolean dropMetadata = props.getBoolean(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA);
if (dropMetadata) {
dropTables(result.getTableNamesToDelete());
}
invalidateTables(result.getTableNamesToDelete());
break;
default:
break;
}
return result;
}
/**
* Adds the given columns to the given table via an ALTER TABLE ... ADD statement executed at
* the given timestamp, e.g. "ALTER TABLE SYSTEM.CATALOG ADD IF NOT EXISTS STORE_NULLS BOOLEAN".
* Note that this closes the passed connection and returns a new meta connection.
*/
private PhoenixConnection addColumn(PhoenixConnection oldMetaConnection, String tableName, long timestamp, String columns, boolean addIfNotExists) throws SQLException {
Properties props = PropertiesUtil.deepCopy(oldMetaConnection.getClientInfo());
props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(timestamp));
// Cannot go through DriverManager or you end up in an infinite loop because it'll call init again
PhoenixConnection metaConnection = new PhoenixConnection(oldMetaConnection, this, props);
SQLException sqlE = null;
try {
metaConnection.createStatement().executeUpdate("ALTER TABLE " + tableName + " ADD " + (addIfNotExists ? " IF NOT EXISTS " : "") + columns );
} catch (NewerTableAlreadyExistsException e) {
logger.warn("Table already modified at this timestamp, so assuming add of these columns already done: " + columns);
} catch (SQLException e) {
logger.warn("Add column failed due to:" + e);
sqlE = e;
} finally {
try {
oldMetaConnection.close();
} catch (SQLException e) {
if (sqlE != null) {
sqlE.setNextException(e);
} else {
sqlE = e;
}
}
if (sqlE != null) {
throw sqlE;
}
}
return metaConnection;
}
/**
* Keeping this to use for further upgrades. This method closes the oldMetaConnection.
*/
private PhoenixConnection addColumnsIfNotExists(PhoenixConnection oldMetaConnection,
String tableName, long timestamp, String columns) throws SQLException {
return addColumn(oldMetaConnection, tableName, timestamp, columns, true);
}
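/**
* Lazily initializes this instance: opens the HBase connection, creates or upgrades the SYSTEM
* tables (CATALOG, SEQUENCE, STATS, FUNCTION) as needed, taking a snapshot of SYSTEM.CATALOG
* before upgrading and restoring it on failure, and schedules the renew lease tasks.
* Initialization failures are remembered so subsequent callers fail fast with the same
* exception.
*/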
@Override
public void init(final String url, final Properties props) throws SQLException {
try {
PhoenixContextExecutor.call(new Callable<Void>() {
@Override
public Void call() throws Exception {
if (initialized) {
if (initializationException != null) {
// Throw previous initialization exception, as we won't reuse this instance
throw initializationException;
}
return null;
}
synchronized (ConnectionQueryServicesImpl.this) {
if (initialized) {
if (initializationException != null) {
// Throw previous initialization exception, as we won't reuse this instance
throw initializationException;
}
return null;
}
checkClosed();
PhoenixConnection metaConnection = null;
boolean success = false;
String snapshotName = null;
String sysCatalogTableName = null;
boolean hConnectionEstablished = false;
boolean acquiredMutex = false;
byte[] mutexRowKey = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE);
boolean snapshotCreated = false;
try {
openConnection();
hConnectionEstablished = true;
String noUpgradeProp = props.getProperty(PhoenixRuntime.NO_UPGRADE_ATTRIB);
boolean upgradeSystemTables = !Boolean.TRUE.equals(Boolean.valueOf(noUpgradeProp));
Properties scnProps = PropertiesUtil.deepCopy(props);
scnProps.setProperty(
PhoenixRuntime.CURRENT_SCN_ATTRIB,
Long.toString(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP));
scnProps.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
String globalUrl = JDBCUtil.removeProperty(url, PhoenixRuntime.TENANT_ID_ATTRIB);
metaConnection = new PhoenixConnection(
ConnectionQueryServicesImpl.this, globalUrl, scnProps, newEmptyMetaData());
try (HBaseAdmin admin = getAdmin()) {
boolean mappedSystemCatalogExists = admin
.tableExists(SchemaUtil.getPhysicalTableName(SYSTEM_CATALOG_NAME_BYTES, true));
if (SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM,
ConnectionQueryServicesImpl.this.getProps())) {
if (admin.tableExists(SYSTEM_CATALOG_NAME_BYTES)) {
// Check if the server is already upgraded and has the namespace config properly set.
checkClientServerCompatibility(SYSTEM_CATALOG_NAME_BYTES);
}
ensureSystemTablesUpgraded(ConnectionQueryServicesImpl.this.getProps());
} else if (mappedSystemCatalogExists) { throw new SQLExceptionInfo.Builder(
SQLExceptionCode.INCONSISTENET_NAMESPACE_MAPPING_PROPERTIES)
.setMessage("Cannot initiate connection as "
+ SchemaUtil.getPhysicalTableName(
SYSTEM_CATALOG_NAME_BYTES, true)
+ " is found but client does not have "
+ IS_NAMESPACE_MAPPING_ENABLED + " enabled")
.build().buildException(); }
}
try {
metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_TABLE_METADATA);
} catch (NewerTableAlreadyExistsException ignore) {
// Ignore, as this will happen if the SYSTEM.CATALOG already exists at this fixed timestamp.
// A TableAlreadyExistsException is not thrown, since the table only exists *after* this fixed timestamp.
} catch (TableAlreadyExistsException e) {
if (upgradeSystemTables) {
long currentServerSideTableTimeStamp = e.getTable().getTimeStamp();
sysCatalogTableName = e.getTable().getPhysicalName().getString();
if (currentServerSideTableTimeStamp < MIN_SYSTEM_TABLE_TIMESTAMP
&& (acquiredMutex = acquireUpgradeMutex(currentServerSideTableTimeStamp, e
.getTable().getPhysicalName().getBytes()))) {
snapshotName = getSysCatalogSnapshotName(currentServerSideTableTimeStamp);
createSnapshot(snapshotName, sysCatalogTableName);
snapshotCreated = true;
}
String columnsToAdd = "";
// This will occur if we have an older SYSTEM.CATALOG and we need to update it to include
// any new columns we've added.
if(currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0) {
// We know that we always need to add the STORE_NULLS column for 4.3 release
columnsToAdd = addColumn(columnsToAdd, PhoenixDatabaseMetaData.STORE_NULLS + " " + PBoolean.INSTANCE.getSqlTypeName());
try (HBaseAdmin admin = getAdmin()) {
HTableDescriptor[] localIndexTables = admin.listTables(MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX+".*");
for (HTableDescriptor table : localIndexTables) {
if (table.getValue(MetaDataUtil.PARENT_TABLE_KEY) == null
&& table.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_NAME) != null) {
table.setValue(MetaDataUtil.PARENT_TABLE_KEY,
MetaDataUtil.getUserTableName(table
.getNameAsString()));
// Explicitly disable, modify and enable the table to ensure co-location of data
// and index regions. If we just modify the table descriptor while online schema
// change is enabled, the region may be reopened on the same region server instead
// of following the data region.
admin.disableTable(table.getTableName());
admin.modifyTable(table.getTableName(), table);
admin.enableTable(table.getTableName());
}
}
}
}
// If the server side schema is before MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0 then
// we need to add INDEX_TYPE and INDEX_DISABLE_TIMESTAMP columns too.
// TODO: Once https://issues.apache.org/jira/browse/PHOENIX-1614 is fixed,
// we should just have a ALTER TABLE ADD IF NOT EXISTS statement with all
// the column names that have been added to SYSTEM.CATALOG since 4.0.
if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0) {
columnsToAdd = addColumn(columnsToAdd, PhoenixDatabaseMetaData.INDEX_TYPE + " " + PUnsignedTinyint.INSTANCE.getSqlTypeName()
+ ", " + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " " + PLong.INSTANCE.getSqlTypeName());
}
// If we have some new columns from 4.1-4.3 to add, add them now.
if (!columnsToAdd.isEmpty()) {
// Ugh..need to assign to another local variable to keep eclipse happy.
PhoenixConnection newMetaConnection = addColumnsIfNotExists(metaConnection,
PhoenixDatabaseMetaData.SYSTEM_CATALOG,
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0, columnsToAdd);
metaConnection = newMetaConnection;
}
if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0) {
columnsToAdd = PhoenixDatabaseMetaData.BASE_COLUMN_COUNT + " "
+ PInteger.INSTANCE.getSqlTypeName();
try {
metaConnection = addColumn(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0, columnsToAdd, false);
upgradeTo4_5_0(metaConnection);
} catch (ColumnAlreadyExistsException ignored) {
/*
* Upgrade to 4.5 is a slightly special case. We use the fact that the column
* BASE_COLUMN_COUNT is already part of the meta-data schema as the signal that
* the server side upgrade has finished or is in progress.
*/
logger.debug("No need to run 4.5 upgrade");
}
Properties props = PropertiesUtil.deepCopy(metaConnection.getClientInfo());
props.remove(PhoenixRuntime.CURRENT_SCN_ATTRIB);
props.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
PhoenixConnection conn = new PhoenixConnection(ConnectionQueryServicesImpl.this, metaConnection.getURL(), props, metaConnection.getMetaDataCache());
try {
List<String> tablesNeedingUpgrade = UpgradeUtil.getPhysicalTablesWithDescRowKey(conn);
if (!tablesNeedingUpgrade.isEmpty()) {
logger.warn("The following tables require upgrade due to a bug causing the row key to be incorrect for descending columns and ascending BINARY columns (PHOENIX-2067 and PHOENIX-2120):\n" + Joiner.on(' ').join(tablesNeedingUpgrade) + "\nTo upgrade issue the \"bin/psql.py -u\" command.");
}
List<String> unsupportedTables = UpgradeUtil.getPhysicalTablesWithDescVarbinaryRowKey(conn);
if (!unsupportedTables.isEmpty()) {
logger.warn("The following tables use an unsupported VARBINARY DESC construct and need to be changed:\n" + Joiner.on(' ').join(unsupportedTables));
}
} catch (Exception ex) {
logger.error("Unable to determine tables requiring upgrade due to PHOENIX-2067", ex);
} finally {
conn.close();
}
}
// Add these columns one at a time, each with different timestamps so that if folks have
// run the upgrade code already for a snapshot, we'll still enter this block (and do the
// parts we haven't yet done).
if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0) {
columnsToAdd = PhoenixDatabaseMetaData.IS_ROW_TIMESTAMP + " " + PBoolean.INSTANCE.getSqlTypeName();
metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0, columnsToAdd);
}
if(currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0) {
// Drop old stats table so that new stats table is created
metaConnection = dropStatsTable(metaConnection,
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 4);
metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 3,
PhoenixDatabaseMetaData.TRANSACTIONAL + " " + PBoolean.INSTANCE.getSqlTypeName());
metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 2,
PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY + " " + PLong.INSTANCE.getSqlTypeName());
metaConnection = setImmutableTableIndexesImmutable(metaConnection,
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 1);
metaConnection = updateSystemCatalogTimestamp(metaConnection,
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0);
ConnectionQueryServicesImpl.this.removeTable(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null, MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0);
clearCache();
}
if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0) {
metaConnection = addColumnsIfNotExists(metaConnection,
PhoenixDatabaseMetaData.SYSTEM_CATALOG,
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 - 2,
PhoenixDatabaseMetaData.IS_NAMESPACE_MAPPED + " "
+ PBoolean.INSTANCE.getSqlTypeName());
metaConnection = addColumnsIfNotExists(metaConnection,
PhoenixDatabaseMetaData.SYSTEM_CATALOG,
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 - 1,
PhoenixDatabaseMetaData.AUTO_PARTITION_SEQ + " "
+ PVarchar.INSTANCE.getSqlTypeName());
metaConnection = addColumnsIfNotExists(metaConnection,
PhoenixDatabaseMetaData.SYSTEM_CATALOG,
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0,
PhoenixDatabaseMetaData.APPEND_ONLY_SCHEMA + " "
+ PBoolean.INSTANCE.getSqlTypeName());
metaConnection = UpgradeUtil.disableViewIndexes(metaConnection);
if(getProps().getBoolean(QueryServices.LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB,
QueryServicesOptions.DEFAULT_LOCAL_INDEX_CLIENT_UPGRADE)) {
metaConnection = UpgradeUtil.upgradeLocalIndexes(metaConnection);
}
ConnectionQueryServicesImpl.this.removeTable(null,
PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null,
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0);
clearCache();
}
}
}
int nSaltBuckets = ConnectionQueryServicesImpl.this.props.getInt(QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB,
QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
try {
String createSequenceTable = Sequence.getCreateTableStatement(nSaltBuckets);
metaConnection.createStatement().executeUpdate(createSequenceTable);
nSequenceSaltBuckets = nSaltBuckets;
} catch (NewerTableAlreadyExistsException e) {
// Ignore, as this will happen if the SYSTEM.SEQUENCE already exists at this fixed timestamp.
// A TableAlreadyExistsException is not thrown, since the table only exists *after* this fixed timestamp.
nSequenceSaltBuckets = getSaltBuckets(e);
} catch (TableAlreadyExistsException e) {
if (upgradeSystemTables) {
// This will occur if we have an older SYSTEM.SEQUENCE and we need to update it to include
// any new columns we've added.
long currentServerSideTableTimeStamp = e.getTable().getTimeStamp();
if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0) {
// If the table time stamp is before 4.1.0 then we need to add below columns
// to the SYSTEM.SEQUENCE table.
String columnsToAdd = PhoenixDatabaseMetaData.MIN_VALUE + " " + PLong.INSTANCE.getSqlTypeName()
+ ", " + PhoenixDatabaseMetaData.MAX_VALUE + " " + PLong.INSTANCE.getSqlTypeName()
+ ", " + PhoenixDatabaseMetaData.CYCLE_FLAG + " " + PBoolean.INSTANCE.getSqlTypeName()
+ ", " + PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG + " " + PBoolean.INSTANCE.getSqlTypeName();
addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, columnsToAdd);
}
// If the table timestamp is before 4.2.1 then run the upgrade script
if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_2_1) {
if (UpgradeUtil.upgradeSequenceTable(metaConnection, nSaltBuckets, e.getTable())) {
metaConnection.removeTable(null,
PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_SCHEMA,
PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_TABLE,
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP);
clearTableFromCache(ByteUtil.EMPTY_BYTE_ARRAY,
PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_SCHEMA_BYTES,
PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_TABLE_BYTES,
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP);
clearTableRegionCache(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME_BYTES);
}
nSequenceSaltBuckets = nSaltBuckets;
} else {
nSequenceSaltBuckets = getSaltBuckets(e);
}
}
}
try {
metaConnection.createStatement().executeUpdate(
QueryConstants.CREATE_STATS_TABLE_METADATA);
} catch (NewerTableAlreadyExistsException ignore) {
} catch(TableAlreadyExistsException e) {
if (upgradeSystemTables) {
long currentServerSideTableTimeStamp = e.getTable().getTimeStamp();
if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0) {
metaConnection = addColumnsIfNotExists(
metaConnection,
SYSTEM_STATS_NAME,
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP,
PhoenixDatabaseMetaData.GUIDE_POSTS_ROW_COUNT + " "
+ PLong.INSTANCE.getSqlTypeName());
}
}
}
try {
metaConnection.createStatement().executeUpdate(
QueryConstants.CREATE_FUNCTION_METADATA);
} catch (NewerTableAlreadyExistsException e) {
} catch (TableAlreadyExistsException e) {
}
if (SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM,
ConnectionQueryServicesImpl.this.getProps())) {
try {
metaConnection.createStatement().executeUpdate("CREATE SCHEMA IF NOT EXISTS "
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA);
} catch (NewerSchemaAlreadyExistsException e) {}
}
success = true;
scheduleRenewLeaseTasks();
} catch (UpgradeInProgressException e) {
// Don't set it as initializationException, because otherwise the client won't be able to retry
throw e;
} catch (Exception e) {
if (e instanceof SQLException) {
initializationException = (SQLException)e;
} else {
// wrap every other exception into a SQLException
initializationException = new SQLException(e);
}
} finally {
try {
if (metaConnection != null) metaConnection.close();
} catch (SQLException e) {
if (initializationException != null) {
initializationException.setNextException(e);
} else {
initializationException = e;
}
} finally {
try {
if (snapshotCreated) {
restoreFromSnapshot(sysCatalogTableName, snapshotName, success);
}
} catch (SQLException e) {
if (initializationException != null) {
initializationException.setNextException(e);
} else {
initializationException = e;
}
} finally {
try {
if (!success && hConnectionEstablished) {
connection.close();
}
} catch (IOException e) {
SQLException ex = new SQLException(e);
if (initializationException != null) {
initializationException.setNextException(ex);
} else {
initializationException = ex;
}
} finally {
initialized = true;
if (acquiredMutex) {
releaseUpgradeMutex(mutexRowKey);
}
if (initializationException != null) {
throw initializationException;
}
}
}
}
}
}
return null;
}
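/**
* Takes an HBase snapshot of the given table (used to protect SYSTEM.CATALOG before an
* upgrade), converting any failure into a SQLException.
*/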
private void createSnapshot(String snapshotName, String tableName)
throws SQLException {
HBaseAdmin admin = null;
SQLException sqlE = null;
try {
admin = getAdmin();
admin.snapshot(snapshotName, tableName);
logger.info("Successfully created snapshot " + snapshotName + " for "
+ tableName);
} catch (Exception e) {
sqlE = new SQLException(e);
} finally {
try {
if (admin != null) {
admin.close();
}
} catch (Exception e) {
SQLException adminCloseEx = new SQLException(e);
if (sqlE == null) {
sqlE = adminCloseEx;
} else {
sqlE.setNextException(adminCloseEx);
}
} finally {
if (sqlE != null) {
throw sqlE;
}
}
}
}
private void restoreFromSnapshot(String tableName, String snapshotName,
boolean success) throws SQLException {
boolean snapshotRestored = false;
boolean tableDisabled = false;
if (!success && snapshotName != null) {
SQLException sqlE = null;
HBaseAdmin admin = null;
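// HBase requires the table to be disabled before a snapshot can be restored; the table is
// re-enabled in the finally block whether or not the restore succeeded.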
try {
logger.warn("Starting restore of " + tableName + " using snapshot "
+ snapshotName + " because upgrade failed");
admin = getAdmin();
admin.disableTable(tableName);
tableDisabled = true;
admin.restoreSnapshot(snapshotName);
snapshotRestored = true;
logger.warn("Successfully restored " + tableName + " using snapshot "
+ snapshotName);
} catch (Exception e) {
sqlE = new SQLException(e);
} finally {
if (admin != null && tableDisabled) {
try {
admin.enableTable(tableName);
if (snapshotRestored) {
logger.warn("Successfully restored and enabled " + tableName + " using snapshot "
+ snapshotName);
} else {
logger.warn("Successfully enabled " + tableName + " after restoring using snapshot "
+ snapshotName + " failed. ");
}
} catch (Exception e1) {
SQLException enableTableEx = new SQLException(e1);
if (sqlE == null) {
sqlE = enableTableEx;
} else {
sqlE.setNextException(enableTableEx);
}
logger.error("Failure in enabling "
+ tableName
+ (snapshotRestored ? " after successfully restoring using snapshot"
+ snapshotName
: " after restoring using snapshot "
+ snapshotName + " failed. "));
} finally {
try {
admin.close();
} catch (Exception e2) {
SQLException adminCloseEx = new SQLException(e2);
if (sqlE == null) {
sqlE = adminCloseEx;
} else {
sqlE.setNextException(adminCloseEx);
}
} finally {
if (sqlE != null) {
throw sqlE;
}
}
}
}
}
}
}
private void ensureSystemTablesUpgraded(ReadOnlyProps props)
throws SQLException, IOException, IllegalArgumentException, InterruptedException {
if (!SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM, props)) { return; }
HTableInterface metatable = null;
try (HBaseAdmin admin = getAdmin()) {
ensureNamespaceCreated(QueryConstants.SYSTEM_SCHEMA_NAME);
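// Map any SYSTEM tables that have not yet been moved into the SYSTEM namespace.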
List<TableName> tableNames = Arrays
.asList(admin.listTableNames(QueryConstants.SYSTEM_SCHEMA_NAME + "\\..*"));
if (tableNames.size() == 0) { return; }
if (tableNames.size() > 4) { throw new IllegalArgumentException(
"Expected 4 system table only but found " + tableNames.size() + ":" + tableNames); }
byte[] mappedSystemTable = SchemaUtil
.getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, props).getName();
metatable = getTable(mappedSystemTable);
if (tableNames.contains(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME)) {
if (!admin.tableExists(mappedSystemTable)) {
UpgradeUtil.mapTableToNamespace(admin, metatable,
PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, props, null, PTableType.SYSTEM,
null);
ConnectionQueryServicesImpl.this.removeTable(null,
PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null,
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0);
}
tableNames.remove(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME);
}
for (TableName table : tableNames) {
UpgradeUtil.mapTableToNamespace(admin, metatable, table.getNameAsString(), props, null, PTableType.SYSTEM,
null);
ConnectionQueryServicesImpl.this.removeTable(null, table.getNameAsString(), null,
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0);
}
if (!tableNames.isEmpty()) {
clearCache();
}
} finally {
if (metatable != null) {
metatable.close();
}
}
}
});
} catch (Exception e) {
Throwables.propagateIfInstanceOf(e, SQLException.class);
Throwables.propagate(e);
}
}
/**
* Acquire a distributed mutex of sorts, using HBase's checkAndPut API, to ensure that only
* one JVM is able to run the upgrade code at a time.
*
* @return true if this client acquired the mutex; otherwise an
* {@link UpgradeInProgressException} is thrown
* @throws IOException
* @throws SQLException
*/
@VisibleForTesting
public boolean acquireUpgradeMutex(long currentServerSideTableTimestamp, byte[] rowToLock) throws IOException,
SQLException {
Preconditions.checkArgument(currentServerSideTableTimestamp < MIN_SYSTEM_TABLE_TIMESTAMP);
try (HBaseAdmin admin = getAdmin()) {
try {
HTableDescriptor tableDesc = new HTableDescriptor(
TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES));
HColumnDescriptor columnDesc = new HColumnDescriptor(
PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES);
columnDesc.setTimeToLive(TTL_FOR_MUTEX); // Let mutex expire after some time
tableDesc.addFamily(columnDesc);
admin.createTable(tableDesc);
try (HTableInterface sysMutexTable = getTable(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES)) {
Put put = new Put(rowToLock);
put.addColumn(PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES, UPGRADE_MUTEX, UPGRADE_MUTEX_UNLOCKED);
sysMutexTable.put(put);
}
} catch (TableExistsException e) {
// Ignore
}
}
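// Atomically transition the mutex cell from UNLOCKED to LOCKED. checkAndPut guarantees that
// only one client can make this transition; every other client gets an
// UpgradeInProgressException.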
try (Table sysMutexTable = getTable(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES)) {
byte[] family = PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES;
byte[] qualifier = UPGRADE_MUTEX;
byte[] oldValue = UPGRADE_MUTEX_UNLOCKED;
byte[] newValue = UPGRADE_MUTEX_LOCKED;
Put put = new Put(rowToLock);
put.addColumn(family, qualifier, newValue);
boolean acquired = sysMutexTable.checkAndPut(rowToLock, family, qualifier, oldValue, put);
if (!acquired) { throw new UpgradeInProgressException(getVersion(currentServerSideTableTimestamp),
getVersion(MIN_SYSTEM_TABLE_TIMESTAMP)); }
return true;
}
}
@VisibleForTesting
public boolean releaseUpgradeMutex(byte[] mutexRowKey) {
boolean released = false;
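// Transition the mutex cell back from LOCKED to UNLOCKED so another client can acquire it.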
try (Table sysMutexTable = getTable(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES)) {
byte[] family = PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES;
byte[] qualifier = UPGRADE_MUTEX;
byte[] expectedValue = UPGRADE_MUTEX_LOCKED;
byte[] newValue = UPGRADE_MUTEX_UNLOCKED;
Put put = new Put(mutexRowKey);
put.addColumn(family, qualifier, newValue);
released = sysMutexTable.checkAndPut(mutexRowKey, family, qualifier, expectedValue, put);
} catch (Exception e) {
logger.warn("Release of upgrade mutex failed", e);
}
return released;
}
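// A minimal usage sketch of the mutex pair above (illustrative only; variable names are
// hypothetical, while the real call site is the initialization path earlier in this class):
//
//   boolean acquired = acquireUpgradeMutex(currentServerSideTableTimestamp, mutexRowKey);
//   try {
//       // ... run the upgrade steps ...
//   } finally {
//       if (acquired) {
//           releaseUpgradeMutex(mutexRowKey);
//       }
//   }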
public static class UpgradeInProgressException extends SQLException {
public UpgradeInProgressException(String upgradeFrom, String upgradeTo) {
super("Cluster is being concurrently upgraded from " + upgradeFrom + " to " + upgradeTo
+ ". Please retry establishing connection.", SQLExceptionCode.CONCURRENT_UPGRADE_IN_PROGRESS
.getSQLState(), SQLExceptionCode.CONCURRENT_UPGRADE_IN_PROGRESS.getErrorCode());
}
}
private String addColumn(String columnsToAddSoFar, String columns) {
if (columnsToAddSoFar == null || columnsToAddSoFar.isEmpty()) {
return columns;
} else {
return columnsToAddSoFar + ", " + columns;
}
}
/**
* Set IMMUTABLE_ROWS to true for all index tables over immutable tables.
* @param oldMetaConnection connection over which to run the upgrade
* @param timestamp SCN at which the upgrade statement is run
* @throws SQLException
*/
private PhoenixConnection setImmutableTableIndexesImmutable(PhoenixConnection oldMetaConnection, long timestamp) throws SQLException {
SQLException sqlE = null;
Properties props = PropertiesUtil.deepCopy(oldMetaConnection.getClientInfo());
props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(timestamp));
PhoenixConnection metaConnection = new PhoenixConnection(oldMetaConnection, this, props);
boolean autoCommit = metaConnection.getAutoCommit();
try {
metaConnection.setAutoCommit(true);
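// LINK_TYPE = 1 rows link a table's header row to one of its indexes, with the index name
// carried in COLUMN_FAMILY. For each index of a table that has IMMUTABLE_ROWS = TRUE,
// upsert an index header row with IMMUTABLE_ROWS = true.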
metaConnection.createStatement().execute(
"UPSERT INTO SYSTEM.CATALOG(TENANT_ID, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, COLUMN_FAMILY, IMMUTABLE_ROWS)\n" +
"SELECT A.TENANT_ID, A.TABLE_SCHEM,B.COLUMN_FAMILY,null,null,true\n" +
"FROM SYSTEM.CATALOG A JOIN SYSTEM.CATALOG B ON (\n" +
" A.TENANT_ID = B.TENANT_ID AND \n" +
" A.TABLE_SCHEM = B.TABLE_SCHEM AND\n" +
" A.TABLE_NAME = B.TABLE_NAME AND\n" +
" A.COLUMN_NAME = B.COLUMN_NAME AND\n" +
" B.LINK_TYPE = 1\n" +
")\n" +
"WHERE A.COLUMN_FAMILY IS NULL AND\n" +
" B.COLUMN_FAMILY IS NOT NULL AND\n" +
" A.IMMUTABLE_ROWS = TRUE");
} catch (SQLException e) {
logger.warn("exception during upgrading stats table:" + e);
sqlE = e;
} finally {
try {
metaConnection.setAutoCommit(autoCommit);
oldMetaConnection.close();
} catch (SQLException e) {
if (sqlE != null) {
sqlE.setNextException(e);
} else {
sqlE = e;
}
}
if (sqlE != null) {
throw sqlE;
}
}
return metaConnection;
}
/**
* Forces an update of SYSTEM.CATALOG by upserting the DISABLE_WAL column of its header row
* with its existing value.
* @param oldMetaConnection connection over which to run the upgrade
* @param timestamp SCN at which the upsert is run
* @return connection opened at the given timestamp
* @throws SQLException
*/
private PhoenixConnection updateSystemCatalogTimestamp(PhoenixConnection oldMetaConnection, long timestamp) throws SQLException {
SQLException sqlE = null;
Properties props = PropertiesUtil.deepCopy(oldMetaConnection.getClientInfo());
props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(timestamp));
PhoenixConnection metaConnection = new PhoenixConnection(oldMetaConnection, this, props);
boolean autoCommit = metaConnection.getAutoCommit();
try {
metaConnection.setAutoCommit(true);
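// Upsert the SYSTEM.CATALOG header row with DISABLE_WAL set to its existing value; the
// write itself is what matters, as it moves the row's timestamp forward.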
metaConnection.createStatement().execute(
"UPSERT INTO SYSTEM.CATALOG(TENANT_ID, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, COLUMN_FAMILY, DISABLE_WAL)\n" +
"VALUES (NULL, '" + QueryConstants.SYSTEM_SCHEMA_NAME + "','" + PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE + "', NULL, NULL, FALSE)");
} catch (SQLException e) {
logger.warn("exception during upgrading stats table:" + e);
sqlE = e;
} finally {
try {
metaConnection.setAutoCommit(autoCommit);
oldMetaConnection.close();
} catch (SQLException e) {
if (sqlE != null) {
sqlE.setNextException(e);
} else {
sqlE = e;
}
}
if (sqlE != null) {
throw sqlE;
}
}
return metaConnection;
}
private PhoenixConnection dropStatsTable(PhoenixConnection oldMetaConnection, long timestamp)
throws SQLException, IOException {
Properties props = PropertiesUtil.deepCopy(oldMetaConnection.getClientInfo());
props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(timestamp));
PhoenixConnection metaConnection = new PhoenixConnection(oldMetaConnection, this, props);
SQLException sqlE = null;
boolean wasCommit = metaConnection.getAutoCommit();
try {
metaConnection.setAutoCommit(true);
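// Delete the SYSTEM.STATS metadata rows from SYSTEM.CATALOG.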
metaConnection.createStatement()
.executeUpdate("DELETE FROM " + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " WHERE "
+ PhoenixDatabaseMetaData.TABLE_NAME + "='" + PhoenixDatabaseMetaData.SYSTEM_STATS_TABLE
+ "' AND " + PhoenixDatabaseMetaData.TABLE_SCHEM + "='"
+ PhoenixDatabaseMetaData.SYSTEM_SCHEMA_NAME + "'");
} catch (SQLException e) {
logger.warn("exception during upgrading stats table:" + e);
sqlE = e;
} finally {
try {
metaConnection.setAutoCommit(wasCommit);
oldMetaConnection.close();
} catch (SQLException e) {
if (sqlE != null) {
sqlE.setNextException(e);
} else {
sqlE = e;
}
}
if (sqlE != null) {
throw sqlE;
}
}
return metaConnection;
}
private void scheduleRenewLeaseTasks() {
if (isRenewingLeasesEnabled()) {
ThreadFactory threadFactory =
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("PHOENIX-SCANNER-RENEW-LEASE" + "-thread-%s").build();
renewLeaseExecutor =
Executors.newScheduledThreadPool(renewLeasePoolSize, threadFactory);
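// Schedule one recurring RenewLeaseTask per connection queue, so each queue is only ever
// drained by its own task.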
for (LinkedBlockingQueue<WeakReference<PhoenixConnection>> q : connectionQueues) {
renewLeaseExecutor.scheduleAtFixedRate(new RenewLeaseTask(q), 0,
renewLeaseTaskFrequency, TimeUnit.MILLISECONDS);
}
}
}
private static int getSaltBuckets(TableAlreadyExistsException e) {
PTable table = e.getTable();
Integer sequenceSaltBuckets = table == null ? null : table.getBucketNum();
return sequenceSaltBuckets == null ? 0 : sequenceSaltBuckets;
}
@Override
public MutationState updateData(MutationPlan plan) throws SQLException {
MutationState state = plan.execute();
plan.getContext().getConnection().commit();
return state;
}
@Override
public int getLowestClusterHBaseVersion() {
return lowestClusterHBaseVersion;
}
@Override
public boolean hasIndexWALCodec() {
return hasIndexWALCodec;
}
/**
* Clears the Phoenix meta data cache on each region server
* @throws SQLException
*/
@Override
public long clearCache() throws SQLException {
try {
SQLException sqlE = null;
HTableInterface htable = this.getTable(SchemaUtil
.getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, this.getProps()).getName());
try {
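// Fan the clearCache() call out to every region of SYSTEM.CATALOG via the MetaData
// coprocessor endpoint and sum the unfreed bytes reported back.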
final Map<byte[], Long> results =
htable.coprocessorService(MetaDataService.class, HConstants.EMPTY_START_ROW,
HConstants.EMPTY_END_ROW, new Batch.Call<MetaDataService, Long>() {
@Override
public Long call(MetaDataService instance) throws IOException {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<ClearCacheResponse> rpcCallback =
new BlockingRpcCallback<ClearCacheResponse>();
ClearCacheRequest.Builder builder = ClearCacheRequest.newBuilder();
builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
instance.clearCache(controller, builder.build(), rpcCallback);
if(controller.getFailedOn() != null) {
throw controller.getFailedOn();
}
return rpcCallback.get().getUnfreedBytes();
}
});
long unfreedBytes = 0;
for (Map.Entry<byte[],Long> result : results.entrySet()) {
if (result.getValue() != null) {
unfreedBytes += result.getValue();
}
}
return unfreedBytes;
} catch (IOException e) {
throw ServerUtil.parseServerException(e);
} catch (Throwable e) {
sqlE = new SQLException(e);
} finally {
try {
tableStatsCache.invalidateAll();
htable.close();
} catch (IOException e) {
if (sqlE == null) {
sqlE = ServerUtil.parseServerException(e);
} else {
sqlE.setNextException(ServerUtil.parseServerException(e));
}
} finally {
if (sqlE != null) {
throw sqlE;
}
}
}
} catch (Exception e) {
throw new SQLException(ServerUtil.parseServerException(e));
}
return 0;
}
private void flushTable(byte[] tableName) throws SQLException {
HBaseAdmin admin = getAdmin();
try {
admin.flush(tableName);
} catch (IOException e) {
throw new PhoenixIOException(e);
} catch (InterruptedException e) {
// restore the interrupt status
Thread.currentThread().interrupt();
throw new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION).setRootCause(e).build()
.buildException();
} finally {
Closeables.closeQuietly(admin);
}
}
@Override
public HBaseAdmin getAdmin() throws SQLException {
try {
return new HBaseAdmin(connection);
} catch (IOException e) {
throw new PhoenixIOException(e);
}
}
@Override
public MetaDataMutationResult updateIndexState(final List<Mutation> tableMetaData, String parentTableName) throws SQLException {
byte[][] rowKeyMetadata = new byte[3][];
SchemaUtil.getVarChars(tableMetaData.get(0).getRow(), rowKeyMetadata);
byte[] tableKey = SchemaUtil.getTableKey(ByteUtil.EMPTY_BYTE_ARRAY, rowKeyMetadata[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX], rowKeyMetadata[PhoenixDatabaseMetaData.TABLE_NAME_INDEX]);
return metaDataCoprocessorExec(tableKey,
new Batch.Call<MetaDataService, MetaDataResponse>() {
@Override
public MetaDataResponse call(MetaDataService instance) throws IOException {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<MetaDataResponse> rpcCallback =
new BlockingRpcCallback<MetaDataResponse>();
UpdateIndexStateRequest.Builder builder = UpdateIndexStateRequest.newBuilder();
for (Mutation m : tableMetaData) {
MutationProto mp = ProtobufUtil.toProto(m);
builder.addTableMetadataMutations(mp.toByteString());
}
builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
instance.updateIndexState(controller, builder.build(), rpcCallback);
if(controller.getFailedOn() != null) {
throw controller.getFailedOn();
}
return rpcCallback.get();
}
});
}
@Override
public long createSequence(String tenantId, String schemaName, String sequenceName,
long startWith, long incrementBy, long cacheSize, long minValue, long maxValue,
boolean cycle, long timestamp) throws SQLException {
SequenceKey sequenceKey = new SequenceKey(tenantId, schemaName, sequenceName, nSequenceSaltBuckets);
Sequence newSequences = new Sequence(sequenceKey);
Sequence sequence = sequenceMap.putIfAbsent(sequenceKey, newSequences);
if (sequence == null) {
sequence = newSequences;
}
try {
sequence.getLock().lock();
// Now that we have the lock we need, create the sequence
Append append = sequence.createSequence(startWith, incrementBy, cacheSize, timestamp, minValue, maxValue, cycle);
HTableInterface htable = this.getTable(SchemaUtil
.getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME_BYTES, this.getProps()).getName());
htable.setAutoFlush(true);
try {
Result result = htable.append(append);
return sequence.createSequence(result, minValue, maxValue, cycle);
} catch (IOException e) {
throw ServerUtil.parseServerException(e);
} finally {
Closeables.closeQuietly(htable);
}
} finally {
sequence.getLock().unlock();
}
}
@Override
public long dropSequence(String tenantId, String schemaName, String sequenceName, long timestamp) throws SQLException {
SequenceKey sequenceKey = new SequenceKey(tenantId, schemaName, sequenceName, nSequenceSaltBuckets);
Sequence newSequences = new Sequence(sequenceKey);
Sequence sequence = sequenceMap.putIfAbsent(sequenceKey, newSequences);
if (sequence == null) {
sequence = newSequences;
}
try {
sequence.getLock().lock();
// Now that we have the lock we need, drop the sequence
Append append = sequence.dropSequence(timestamp);
HTableInterface htable = this.getTable(SchemaUtil
.getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME_BYTES, this.getProps()).getName());
try {
Result result = htable.append(append);
return sequence.dropSequence(result);
} catch (IOException e) {
throw ServerUtil.parseServerException(e);
} finally {
Closeables.closeQuietly(htable);
}
} finally {
sequence.getLock().unlock();
}
}
/**
* Gets the current sequence value
* @throws SQLException if cached sequence cannot be found
*/
@Override
public long currentSequenceValue(SequenceKey sequenceKey, long timestamp) throws SQLException {
Sequence sequence = sequenceMap.get(sequenceKey);
if (sequence == null) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_CALL_CURRENT_BEFORE_NEXT_VALUE)
.setSchemaName(sequenceKey.getSchemaName()).setTableName(sequenceKey.getSequenceName())
.build().buildException();
}
sequence.getLock().lock();
try {
return sequence.currentValue(timestamp);
} catch (EmptySequenceCacheException e) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_CALL_CURRENT_BEFORE_NEXT_VALUE)
.setSchemaName(sequenceKey.getSchemaName()).setTableName(sequenceKey.getSequenceName())
.build().buildException();
} finally {
sequence.getLock().unlock();
}
}
/**
* Verifies that the given sequences exist and, depending on the supplied
* {@link Sequence.ValueOp} action, reserves values for them.
*/
@Override
public void validateSequences(List<SequenceAllocation> sequenceAllocations, long timestamp, long[] values, SQLException[] exceptions, Sequence.ValueOp action) throws SQLException {
incrementSequenceValues(sequenceAllocations, timestamp, values, exceptions, action);
}
/**
* Increment any of the set of sequences that need more values. These are the sequences
* that are asking for the next value within a given statement. Sequences that cannot be
* found (for example, because they were deleted by another client) are reported through
* the exceptions array.
* @param sequenceAllocations sorted list of sequence allocations
* @param timestamp
* @throws SQLException if any of the sequences cannot be found
*
*/
@Override
public void incrementSequences(List<SequenceAllocation> sequenceAllocations, long timestamp, long[] values, SQLException[] exceptions) throws SQLException {
incrementSequenceValues(sequenceAllocations, timestamp, values, exceptions, Sequence.ValueOp.INCREMENT_SEQUENCE);
}
@SuppressWarnings("deprecation")
private void incrementSequenceValues(List<SequenceAllocation> sequenceAllocations, long timestamp, long[] values, SQLException[] exceptions, Sequence.ValueOp op) throws SQLException {
List<Sequence> sequences = Lists.newArrayListWithExpectedSize(sequenceAllocations.size());
for (SequenceAllocation sequenceAllocation : sequenceAllocations) {
SequenceKey key = sequenceAllocation.getSequenceKey();
Sequence newSequences = new Sequence(key);
Sequence sequence = sequenceMap.putIfAbsent(key, newSequences);
if (sequence == null) {
sequence = newSequences;
}
sequences.add(sequence);
}
try {
for (Sequence sequence : sequences) {
sequence.getLock().lock();
}
// Now that we have all the locks we need, increment the sequences
List<Increment> incrementBatch = Lists.newArrayListWithExpectedSize(sequences.size());
List<Sequence> toIncrementList = Lists.newArrayListWithExpectedSize(sequences.size());
int[] indexes = new int[sequences.size()];
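// Try to satisfy each allocation from the locally cached sequence values first. Sequences
// whose cache is empty are collected into a single batch of Increments sent to the server.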
for (int i = 0; i < sequences.size(); i++) {
Sequence sequence = sequences.get(i);
try {
values[i] = sequence.incrementValue(timestamp, op, sequenceAllocations.get(i).getNumAllocations());
} catch (EmptySequenceCacheException e) {
indexes[toIncrementList.size()] = i;
toIncrementList.add(sequence);
Increment inc = sequence.newIncrement(timestamp, op, sequenceAllocations.get(i).getNumAllocations());
incrementBatch.add(inc);
} catch (SQLException e) {
exceptions[i] = e;
}
}
if (toIncrementList.isEmpty()) {
return;
}
HTableInterface hTable = this.getTable(SchemaUtil.getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME_BYTES,this.getProps()).getName());
Object[] resultObjects = null;
SQLException sqlE = null;
try {
resultObjects= hTable.batch(incrementBatch);
} catch (IOException e) {
sqlE = ServerUtil.parseServerException(e);
} catch (InterruptedException e) {
// restore the interrupt status
Thread.currentThread().interrupt();
sqlE = new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION)
.setRootCause(e).build().buildException(); // FIXME ?
} finally {
try {
hTable.close();
} catch (IOException e) {
if (sqlE == null) {
sqlE = ServerUtil.parseServerException(e);
} else {
sqlE.setNextException(ServerUtil.parseServerException(e));
}
}
if (sqlE != null) {
throw sqlE;
}
}
for (int i=0;i<resultObjects.length;i++){
Sequence sequence = toIncrementList.get(i);
Result result = (Result)resultObjects[i];
try {
long numToAllocate = Bytes.toLong(incrementBatch.get(i).getAttribute(SequenceRegionObserver.NUM_TO_ALLOCATE));
values[indexes[i]] = sequence.incrementValue(result, op, numToAllocate);
} catch (SQLException e) {
exceptions[indexes[i]] = e;
}
}
} finally {
for (Sequence sequence : sequences) {
sequence.getLock().unlock();
}
}
}
@Override
public void clearTableFromCache(final byte[] tenantId, final byte[] schemaName, final byte[] tableName,
final long clientTS) throws SQLException {
// clear the meta data cache for the table here
try {
SQLException sqlE = null;
HTableInterface htable = this.getTable(SchemaUtil
.getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, this.getProps()).getName());
try {
htable.coprocessorService(MetaDataService.class, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
new Batch.Call<MetaDataService, ClearTableFromCacheResponse>() {
@Override
public ClearTableFromCacheResponse call(MetaDataService instance) throws IOException {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<ClearTableFromCacheResponse> rpcCallback = new BlockingRpcCallback<ClearTableFromCacheResponse>();
ClearTableFromCacheRequest.Builder builder = ClearTableFromCacheRequest.newBuilder();
builder.setTenantId(ByteStringer.wrap(tenantId));
builder.setTableName(ByteStringer.wrap(tableName));
builder.setSchemaName(ByteStringer.wrap(schemaName));
builder.setClientTimestamp(clientTS);
builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
instance.clearTableFromCache(controller, builder.build(), rpcCallback);
if (controller.getFailedOn() != null) { throw controller.getFailedOn(); }
return rpcCallback.get();
}
});
} catch (IOException e) {
throw ServerUtil.parseServerException(e);
} catch (Throwable e) {
sqlE = new SQLException(e);
} finally {
try {
if (tenantId.length == 0) tableStatsCache.invalidate(new ImmutableBytesPtr(SchemaUtil.getTableNameAsBytes(schemaName, tableName)));
htable.close();
} catch (IOException e) {
if (sqlE == null) {
sqlE = ServerUtil.parseServerException(e);
} else {
sqlE.setNextException(ServerUtil.parseServerException(e));
}
} finally {
if (sqlE != null) { throw sqlE; }
}
}
} catch (Exception e) {
throw new SQLException(ServerUtil.parseServerException(e));
}
}
@SuppressWarnings("deprecation")
@Override
public void returnSequences(List<SequenceKey> keys, long timestamp, SQLException[] exceptions) throws SQLException {
List<Sequence> sequences = Lists.newArrayListWithExpectedSize(keys.size());
for (SequenceKey key : keys) {
Sequence newSequences = new Sequence(key);
Sequence sequence = sequenceMap.putIfAbsent(key, newSequences);
if (sequence == null) {
sequence = newSequences;
}
sequences.add(sequence);
}
try {
for (Sequence sequence : sequences) {
sequence.getLock().lock();
}
// Now that we have all the locks we need, attempt to return the unused sequence values
List<Append> mutations = Lists.newArrayListWithExpectedSize(sequences.size());
List<Sequence> toReturnList = Lists.newArrayListWithExpectedSize(sequences.size());
int[] indexes = new int[sequences.size()];
for (int i = 0; i < sequences.size(); i++) {
Sequence sequence = sequences.get(i);
try {
Append append = sequence.newReturn(timestamp);
toReturnList.add(sequence);
mutations.add(append);
} catch (EmptySequenceCacheException ignore) { // Nothing to return, so ignore
}
}
if (toReturnList.isEmpty()) {
return;
}
HTableInterface hTable = this.getTable(SchemaUtil
.getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME_BYTES, this.getProps()).getName());
Object[] resultObjects = null;
SQLException sqlE = null;
try {
resultObjects= hTable.batch(mutations);
} catch (IOException e){
sqlE = ServerUtil.parseServerException(e);
} catch (InterruptedException e){
// restore the interrupt status
Thread.currentThread().interrupt();
sqlE = new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION)
.setRootCause(e).build().buildException(); // FIXME ?
} finally {
try {
hTable.close();
} catch (IOException e) {
if (sqlE == null) {
sqlE = ServerUtil.parseServerException(e);
} else {
sqlE.setNextException(ServerUtil.parseServerException(e));
}
}
if (sqlE != null) {
throw sqlE;
}
}
for (int i=0;i<resultObjects.length;i++){
Sequence sequence = toReturnList.get(i);
Result result = (Result)resultObjects[i];
try {
sequence.returnValue(result);
} catch (SQLException e) {
exceptions[indexes[i]] = e;
}
}
} finally {
for (Sequence sequence : sequences) {
sequence.getLock().unlock();
}
}
}
// Take no locks, as this only gets run when there are no open connections
// so there's no danger of contention.
@SuppressWarnings("deprecation")
private void returnAllSequences(ConcurrentMap<SequenceKey,Sequence> sequenceMap) throws SQLException {
List<Append> mutations = Lists.newArrayListWithExpectedSize(sequenceMap.size());
for (Sequence sequence : sequenceMap.values()) {
mutations.addAll(sequence.newReturns());
}
if (mutations.isEmpty()) {
return;
}
HTableInterface hTable = this.getTable(
SchemaUtil.getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME_BYTES, this.getProps()).getName());
SQLException sqlE = null;
try {
hTable.batch(mutations);
} catch (IOException e) {
sqlE = ServerUtil.parseServerException(e);
} catch (InterruptedException e) {
// restore the interrupt status
Thread.currentThread().interrupt();
sqlE = new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION)
.setRootCause(e).build().buildException(); // FIXME ?
} finally {
try {
hTable.close();
} catch (IOException e) {
if (sqlE == null) {
sqlE = ServerUtil.parseServerException(e);
} else {
sqlE.setNextException(ServerUtil.parseServerException(e));
}
}
if (sqlE != null) {
throw sqlE;
}
}
}
@Override
public void addConnection(PhoenixConnection connection) throws SQLException {
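// Register the connection (as a weak reference) with one of the renew-lease queues and, if
// sequence values are returned on close, bump the open-connection count.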
connectionQueues.get(getQueueIndex(connection)).add(new WeakReference<PhoenixConnection>(connection));
if (returnSequenceValues) {
synchronized (connectionCountLock) {
connectionCount++;
}
}
}
@Override
public void removeConnection(PhoenixConnection connection) throws SQLException {
if (returnSequenceValues) {
ConcurrentMap<SequenceKey,Sequence> formerSequenceMap = null;
synchronized (connectionCountLock) {
if (--connectionCount == 0) {
if (!this.sequenceMap.isEmpty()) {
formerSequenceMap = this.sequenceMap;
this.sequenceMap = Maps.newConcurrentMap();
}
}
}
// Since we're using the former sequenceMap, we can do this outside
// the lock.
if (formerSequenceMap != null) {
// When there are no more connections, attempt to return any sequences
returnAllSequences(formerSequenceMap);
}
}
}
private int getQueueIndex(PhoenixConnection conn) {
return ThreadLocalRandom.current().nextInt(renewLeasePoolSize);
}
@Override
public KeyValueBuilder getKeyValueBuilder() {
return this.kvBuilder;
}
@Override
public boolean supportsFeature(Feature feature) {
FeatureSupported supported = featureMap.get(feature);
if (supported == null) {
return false;
}
return supported.isSupported(this);
}
@Override
public String getUserName() {
return userName;
}
private void checkClosed() {
if (closed) {
throwConnectionClosedException();
}
}
private void throwConnectionClosedIfNullMetaData() {
if (latestMetaData == null) {
throwConnectionClosedException();
}
}
private void throwConnectionClosedException() {
throw new IllegalStateException("Connection to the cluster is closed");
}
@Override
public PTableStats getTableStats(final byte[] physicalName, final long clientTimeStamp) throws SQLException {
try {
return tableStatsCache.get(new ImmutableBytesPtr(physicalName));
} catch (ExecutionException e) {
throw ServerUtil.parseServerException(e);
}
}
@Override
public int getSequenceSaltBuckets() {
return nSequenceSaltBuckets;
}
@Override
public void addFunction(PFunction function) throws SQLException {
synchronized (latestMetaDataLock) {
try {
throwConnectionClosedIfNullMetaData();
// If existing table isn't older than new table, don't replace
// If a client opens a connection at an earlier timestamp, this can happen
PFunction existingFunction = latestMetaData.getFunction(new PTableKey(function.getTenantId(), function.getFunctionName()));
if (existingFunction.getTimeStamp() >= function.getTimeStamp()) {
return;
}
} catch (FunctionNotFoundException e) {}
latestMetaData.addFunction(function);
latestMetaDataLock.notifyAll();
}
}
@Override
public void removeFunction(PName tenantId, String function, long functionTimeStamp)
throws SQLException {
synchronized (latestMetaDataLock) {
throwConnectionClosedIfNullMetaData();
latestMetaData.removeFunction(tenantId, function, functionTimeStamp);
latestMetaDataLock.notifyAll();
}
}
@Override
public MetaDataMutationResult getFunctions(PName tenantId, final List<Pair<byte[], Long>> functions,
final long clientTimestamp) throws SQLException {
final byte[] tenantIdBytes = tenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY : tenantId.getBytes();
return metaDataCoprocessorExec(tenantIdBytes,
new Batch.Call<MetaDataService, MetaDataResponse>() {
@Override
public MetaDataResponse call(MetaDataService instance) throws IOException {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<MetaDataResponse> rpcCallback =
new BlockingRpcCallback<MetaDataResponse>();
GetFunctionsRequest.Builder builder = GetFunctionsRequest.newBuilder();
builder.setTenantId(ByteStringer.wrap(tenantIdBytes));
for(Pair<byte[], Long> function: functions) {
builder.addFunctionNames(ByteStringer.wrap(function.getFirst()));
builder.addFunctionTimestamps(function.getSecond().longValue());
}
builder.setClientTimestamp(clientTimestamp);
builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
instance.getFunctions(controller, builder.build(), rpcCallback);
if(controller.getFailedOn() != null) {
throw controller.getFailedOn();
}
return rpcCallback.get();
}
}, PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES);
}
@Override
public MetaDataMutationResult getSchema(final String schemaName, final long clientTimestamp) throws SQLException {
return metaDataCoprocessorExec(SchemaUtil.getSchemaKey(schemaName),
new Batch.Call<MetaDataService, MetaDataResponse>() {
@Override
public MetaDataResponse call(MetaDataService instance) throws IOException {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<MetaDataResponse> rpcCallback = new BlockingRpcCallback<MetaDataResponse>();
GetSchemaRequest.Builder builder = GetSchemaRequest.newBuilder();
builder.setSchemaName(schemaName);
builder.setClientTimestamp(clientTimestamp);
builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION,
PHOENIX_PATCH_NUMBER));
instance.getSchema(controller, builder.build(), rpcCallback);
if (controller.getFailedOn() != null) { throw controller.getFailedOn(); }
return rpcCallback.get();
}
});
}
// TODO the mutations should be added to System functions table.
@Override
public MetaDataMutationResult createFunction(final List<Mutation> functionData,
final PFunction function, final boolean temporary) throws SQLException {
byte[][] rowKeyMetadata = new byte[2][];
Mutation m = MetaDataUtil.getPutOnlyTableHeaderRow(functionData);
byte[] key = m.getRow();
SchemaUtil.getVarChars(key, rowKeyMetadata);
byte[] tenantIdBytes = rowKeyMetadata[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
byte[] functionBytes = rowKeyMetadata[PhoenixDatabaseMetaData.FUNTION_NAME_INDEX];
byte[] functionKey = SchemaUtil.getFunctionKey(tenantIdBytes, functionBytes);
MetaDataMutationResult result = metaDataCoprocessorExec(functionKey,
new Batch.Call<MetaDataService, MetaDataResponse>() {
@Override
public MetaDataResponse call(MetaDataService instance) throws IOException {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<MetaDataResponse> rpcCallback =
new BlockingRpcCallback<MetaDataResponse>();
CreateFunctionRequest.Builder builder = CreateFunctionRequest.newBuilder();
for (Mutation m : functionData) {
MutationProto mp = ProtobufUtil.toProto(m);
builder.addTableMetadataMutations(mp.toByteString());
}
builder.setTemporary(temporary);
builder.setReplace(function.isReplace());
builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
instance.createFunction(controller, builder.build(), rpcCallback);
if(controller.getFailedOn() != null) {
throw controller.getFailedOn();
}
return rpcCallback.get();
}
}, PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES);
return result;
}
@VisibleForTesting
static class RenewLeaseTask implements Runnable {
private final LinkedBlockingQueue<WeakReference<PhoenixConnection>> connectionsQueue;
private final Random random = new Random();
private static final int MAX_WAIT_TIME = 1000;
RenewLeaseTask(LinkedBlockingQueue<WeakReference<PhoenixConnection>> queue) {
this.connectionsQueue = queue;
}
private void waitForRandomDuration() throws InterruptedException {
new CountDownLatch(1).await(random.nextInt(MAX_WAIT_TIME), MILLISECONDS);
}
@Override
public void run() {
try {
int numConnections = connectionsQueue.size();
boolean wait = true;
// We keep adding items to the end of the queue. So to stop the loop, iterate only up to
// whatever the current count is.
while (numConnections > 0) {
if (wait) {
// wait for some random duration to prevent all threads from renewing lease at
// the same time.
waitForRandomDuration();
wait = false;
}
// It is guaranteed that this poll won't hang indefinitely because this is the
// only thread that removes items from the queue. Still adding a 1 ms timeout
// for sanity check.
WeakReference<PhoenixConnection> connRef =
connectionsQueue.poll(1, TimeUnit.MILLISECONDS);
if (connRef == null) {
throw new IllegalStateException(
"Connection ref found to be null. This is a bug. Some other thread removed items from the connection queue.");
}
PhoenixConnection conn = connRef.get();
if (conn != null && !conn.isClosed()) {
LinkedBlockingQueue<WeakReference<TableResultIterator>> scannerQueue =
conn.getScanners();
// We keep adding items to the end of the queue. So to stop the loop,
// iterate only up to whatever the current count is.
int numScanners = scannerQueue.size();
int renewed = 0;
long start = System.currentTimeMillis();
while (numScanners > 0) {
// It is guaranteed that this poll won't hang indefinitely because this is the
// only thread that removes items from the queue. Still adding a 1 ms timeout
// for sanity check.
WeakReference<TableResultIterator> ref =
scannerQueue.poll(1, TimeUnit.MILLISECONDS);
if (ref == null) {
throw new IllegalStateException(
"TableResulIterator ref found to be null. This is a bug. Some other thread removed items from the scanner queue.");
}
TableResultIterator scanningItr = ref.get();
if (scanningItr != null) {
RenewLeaseStatus status = scanningItr.renewLease();
switch (status) {
case RENEWED:
renewed++;
// add it back at the tail
scannerQueue.offer(new WeakReference<TableResultIterator>(
scanningItr));
logger.info("Lease renewed for scanner: " + scanningItr);
break;
case UNINITIALIZED:
case THRESHOLD_NOT_REACHED:
// add it back at the tail
scannerQueue.offer(new WeakReference<TableResultIterator>(
scanningItr));
break;
// if lease wasn't renewed or scanner was closed, don't add the
// scanner back to the queue.
case CLOSED:
case NOT_RENEWED:
break;
}
}
numScanners--;
}
if (renewed > 0) {
logger.info("Renewed leases for " + renewed + " scanner/s in "
+ (System.currentTimeMillis() - start) + " ms ");
}
connectionsQueue.offer(connRef);
}
numConnections--;
}
} catch (InterruptedException e1) {
Thread.currentThread().interrupt(); // restore the interrupt status
logger.warn("Thread interrupted when renewing lease ", e1);
throw new RuntimeException(e1);
} catch (Exception e2) {
logger.warn("Exception thrown when renewing lease ", e2);
throw new RuntimeException(e2);
}
}
}
@Override
public long getRenewLeaseThresholdMilliSeconds() {
return renewLeaseThreshold;
}
@Override
public boolean isRenewingLeasesEnabled() {
return supportsFeature(ConnectionQueryServices.Feature.RENEW_LEASE) && renewLeaseEnabled;
}
@Override
public HRegionLocation getTableRegionLocation(byte[] tableName, byte[] row) throws SQLException {
/*
* Use HConnection.getRegionLocation, which uses the cache in HConnection, to get the region
* to which the specified row belongs.
*/
int retryCount = 0, maxRetryCount = 1;
boolean reload = false;
while (true) {
try {
return connection.getRegionLocation(TableName.valueOf(tableName), row, reload);
} catch (org.apache.hadoop.hbase.TableNotFoundException e) {
String fullName = Bytes.toString(tableName);
throw new TableNotFoundException(SchemaUtil.getSchemaNameFromFullName(fullName), SchemaUtil.getTableNameFromFullName(fullName));
} catch (IOException e) {
if (retryCount++ < maxRetryCount) { // One retry, in case split occurs while navigating
reload = true;
continue;
}
throw new SQLExceptionInfo.Builder(SQLExceptionCode.GET_TABLE_REGIONS_FAIL)
.setRootCause(e).build().buildException();
}
}
}
@Override
public MetaDataMutationResult createSchema(final List<Mutation> schemaMutations, final String schemaName)
throws SQLException {
ensureNamespaceCreated(schemaName);
Mutation m = MetaDataUtil.getPutOnlyTableHeaderRow(schemaMutations);
byte[] key = m.getRow();
MetaDataMutationResult result = metaDataCoprocessorExec(key,
new Batch.Call<MetaDataService, MetaDataResponse>() {
@Override
public MetaDataResponse call(MetaDataService instance) throws IOException {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<MetaDataResponse> rpcCallback = new BlockingRpcCallback<MetaDataResponse>();
CreateSchemaRequest.Builder builder = CreateSchemaRequest.newBuilder();
for (Mutation m : schemaMutations) {
MutationProto mp = ProtobufUtil.toProto(m);
builder.addTableMetadataMutations(mp.toByteString());
}
builder.setSchemaName(schemaName);
builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION,
PHOENIX_PATCH_NUMBER));
instance.createSchema(controller, builder.build(), rpcCallback);
if (controller.getFailedOn() != null) { throw controller.getFailedOn(); }
return rpcCallback.get();
}
});
return result;
}
@Override
public void addSchema(PSchema schema) throws SQLException {
latestMetaData.addSchema(schema);
}
@Override
public void removeSchema(PSchema schema, long schemaTimeStamp) {
latestMetaData.removeSchema(schema, schemaTimeStamp);
}
@Override
public MetaDataMutationResult dropSchema(final List<Mutation> schemaMetaData, final String schemaName)
throws SQLException {
final MetaDataMutationResult result = metaDataCoprocessorExec(SchemaUtil.getSchemaKey(schemaName),
new Batch.Call<MetaDataService, MetaDataResponse>() {
@Override
public MetaDataResponse call(MetaDataService instance) throws IOException {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<MetaDataResponse> rpcCallback = new BlockingRpcCallback<MetaDataResponse>();
DropSchemaRequest.Builder builder = DropSchemaRequest.newBuilder();
for (Mutation m : schemaMetaData) {
MutationProto mp = ProtobufUtil.toProto(m);
builder.addSchemaMetadataMutations(mp.toByteString());
}
builder.setSchemaName(schemaName);
builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION,
PHOENIX_PATCH_NUMBER));
instance.dropSchema(controller, builder.build(), rpcCallback);
if (controller.getFailedOn() != null) { throw controller.getFailedOn(); }
return rpcCallback.get();
}
});
final MutationCode code = result.getMutationCode();
switch (code) {
case SCHEMA_ALREADY_EXISTS:
ReadOnlyProps props = this.getProps();
boolean dropMetadata = props.getBoolean(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA);
if (dropMetadata) {
ensureNamespaceDropped(schemaName, result.getMutationTime());
}
break;
default:
break;
}
return result;
}
private void ensureNamespaceDropped(String schemaName, long mutationTime) throws SQLException {
SQLException sqlE = null;
try (HBaseAdmin admin = getAdmin()) {
final String quorum = ZKConfig.getZKQuorumServersString(config);
final String znode = this.props.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
logger.debug("Found quorum: " + quorum + ":" + znode);
boolean nameSpaceExists = true;
try {
admin.getNamespaceDescriptor(schemaName);
} catch (org.apache.hadoop.hbase.NamespaceNotFoundException e) {
nameSpaceExists = false;
}
if (nameSpaceExists) {
admin.deleteNamespace(schemaName);
}
} catch (IOException e) {
sqlE = ServerUtil.parseServerException(e);
} finally {
if (sqlE != null) { throw sqlE; }
}
}
/**
* Manually adds {@link PTableStats} for a table to the client-side cache. Not a
* {@link ConnectionQueryServices} method. Exposed for testing purposes.
*
* @param tableName Table name
* @param stats Stats instance
*/
public void addTableStats(ImmutableBytesPtr tableName, PTableStats stats) {
this.tableStatsCache.put(Objects.requireNonNull(tableName), stats);
}
@Override
public void invalidateStats(ImmutableBytesPtr tableName) {
this.tableStatsCache.invalidate(Objects.requireNonNull(tableName));
}
}