| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.phoenix.query; |
| |
| import static java.util.concurrent.TimeUnit.MILLISECONDS; |
| import static org.apache.hadoop.hbase.HColumnDescriptor.TTL; |
| import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MAJOR_VERSION; |
| import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MINOR_VERSION; |
| import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_PATCH_NUMBER; |
| import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES; |
| import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA; |
| import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_ENABLED; |
| import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE; |
| import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_THRESHOLD_MILLISECONDS; |
| import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS; |
| import static org.apache.phoenix.util.UpgradeUtil.upgradeTo4_5_0; |
| |
| import java.io.IOException; |
| import java.lang.ref.WeakReference; |
| import java.sql.SQLException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Properties; |
| import java.util.Random; |
| import java.util.Set; |
| import java.util.TreeMap; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ThreadFactory; |
| import java.util.concurrent.ThreadLocalRandom; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| |
| import javax.annotation.concurrent.GuardedBy; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hbase.HColumnDescriptor; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.HRegionLocation; |
| import org.apache.hadoop.hbase.HTableDescriptor; |
| import org.apache.hadoop.hbase.TableExistsException; |
| import org.apache.hadoop.hbase.TableName; |
| import org.apache.hadoop.hbase.client.Append; |
| import org.apache.hadoop.hbase.client.HBaseAdmin; |
| import org.apache.hadoop.hbase.client.HConnection; |
| import org.apache.hadoop.hbase.client.HTableInterface; |
| import org.apache.hadoop.hbase.client.Increment; |
| import org.apache.hadoop.hbase.client.Mutation; |
| import org.apache.hadoop.hbase.client.Result; |
| import org.apache.hadoop.hbase.client.coprocessor.Batch; |
| import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; |
| import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
| import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; |
| import org.apache.hadoop.hbase.ipc.ServerRpcController; |
| import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; |
| import org.apache.hadoop.hbase.regionserver.IndexHalfStoreFileReaderGenerator; |
| import org.apache.hadoop.hbase.regionserver.LocalIndexSplitter; |
| import org.apache.hadoop.hbase.security.User; |
| import org.apache.hadoop.hbase.util.ByteStringer; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.hadoop.hbase.util.Pair; |
| import org.apache.hadoop.hbase.util.VersionInfo; |
| import org.apache.hadoop.hbase.zookeeper.ZKConfig; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.phoenix.compile.MutationPlan; |
| import org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver; |
| import org.apache.phoenix.coprocessor.MetaDataEndpointImpl; |
| import org.apache.phoenix.coprocessor.MetaDataProtocol; |
| import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; |
| import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode; |
| import org.apache.phoenix.coprocessor.MetaDataRegionObserver; |
| import org.apache.phoenix.coprocessor.PhoenixTransactionalProcessor; |
| import org.apache.phoenix.coprocessor.ScanRegionObserver; |
| import org.apache.phoenix.coprocessor.SequenceRegionObserver; |
| import org.apache.phoenix.coprocessor.ServerCachingEndpointImpl; |
| import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver; |
| import org.apache.phoenix.coprocessor.generated.MetaDataProtos; |
| import org.apache.phoenix.coprocessor.generated.MetaDataProtos.AddColumnRequest; |
| import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheRequest; |
| import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse; |
| import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearTableFromCacheRequest; |
| import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearTableFromCacheResponse; |
| import org.apache.phoenix.coprocessor.generated.MetaDataProtos.CreateFunctionRequest; |
| import org.apache.phoenix.coprocessor.generated.MetaDataProtos.CreateTableRequest; |
| import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropColumnRequest; |
| import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropFunctionRequest; |
| import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropTableRequest; |
| import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetFunctionsRequest; |
| import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetTableRequest; |
| import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionRequest; |
| import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionResponse; |
| import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse; |
| import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService; |
| import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest; |
| import org.apache.phoenix.exception.PhoenixIOException; |
| import org.apache.phoenix.exception.SQLExceptionCode; |
| import org.apache.phoenix.exception.SQLExceptionInfo; |
| import org.apache.phoenix.execute.MutationState; |
| import org.apache.phoenix.hbase.index.IndexRegionSplitPolicy; |
| import org.apache.phoenix.hbase.index.Indexer; |
| import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder; |
| import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; |
| import org.apache.phoenix.hbase.index.util.KeyValueBuilder; |
| import org.apache.phoenix.hbase.index.util.VersionUtil; |
| import org.apache.phoenix.index.PhoenixIndexBuilder; |
| import org.apache.phoenix.index.PhoenixIndexCodec; |
| import org.apache.phoenix.index.PhoenixTransactionalIndexer; |
| import org.apache.phoenix.iterate.TableResultIterator; |
| import org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus; |
| import org.apache.phoenix.jdbc.PhoenixConnection; |
| import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; |
| import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; |
| import org.apache.phoenix.parse.PFunction; |
| import org.apache.phoenix.protobuf.ProtobufUtil; |
| import org.apache.phoenix.schema.ColumnAlreadyExistsException; |
| import org.apache.phoenix.schema.ColumnFamilyNotFoundException; |
| import org.apache.phoenix.schema.EmptySequenceCacheException; |
| import org.apache.phoenix.schema.FunctionNotFoundException; |
| import org.apache.phoenix.schema.MetaDataSplitPolicy; |
| import org.apache.phoenix.schema.NewerTableAlreadyExistsException; |
| import org.apache.phoenix.schema.PColumn; |
| import org.apache.phoenix.schema.PColumnFamily; |
| import org.apache.phoenix.schema.PMetaData; |
| import org.apache.phoenix.schema.PMetaDataImpl; |
| import org.apache.phoenix.schema.PName; |
| import org.apache.phoenix.schema.PNameFactory; |
| import org.apache.phoenix.schema.PTable; |
| import org.apache.phoenix.schema.PTableKey; |
| import org.apache.phoenix.schema.PTableType; |
| import org.apache.phoenix.schema.ReadOnlyTableException; |
| import org.apache.phoenix.schema.SaltingUtil; |
| import org.apache.phoenix.schema.Sequence; |
| import org.apache.phoenix.schema.SequenceAllocation; |
| import org.apache.phoenix.schema.SequenceKey; |
| import org.apache.phoenix.schema.TableAlreadyExistsException; |
| import org.apache.phoenix.schema.TableNotFoundException; |
| import org.apache.phoenix.schema.TableProperty; |
| import org.apache.phoenix.schema.stats.PTableStats; |
| import org.apache.phoenix.schema.stats.StatisticsUtil; |
| import org.apache.phoenix.schema.types.PBoolean; |
| import org.apache.phoenix.schema.types.PDataType; |
| import org.apache.phoenix.schema.types.PInteger; |
| import org.apache.phoenix.schema.types.PLong; |
| import org.apache.phoenix.schema.types.PUnsignedTinyint; |
| import org.apache.phoenix.util.ByteUtil; |
| import org.apache.phoenix.util.Closeables; |
| import org.apache.phoenix.util.ConfigUtil; |
| import org.apache.phoenix.util.JDBCUtil; |
| import org.apache.phoenix.util.MetaDataUtil; |
| import org.apache.phoenix.util.PhoenixContextExecutor; |
| import org.apache.phoenix.util.PhoenixRuntime; |
| import org.apache.phoenix.util.PhoenixStopWatch; |
| import org.apache.phoenix.util.PropertiesUtil; |
| import org.apache.phoenix.util.ReadOnlyProps; |
| import org.apache.phoenix.util.SchemaUtil; |
| import org.apache.phoenix.util.ServerUtil; |
| import org.apache.phoenix.util.UpgradeUtil; |
| import org.apache.twill.discovery.ZKDiscoveryService; |
| import org.apache.twill.zookeeper.RetryStrategies; |
| import org.apache.twill.zookeeper.ZKClientService; |
| import org.apache.twill.zookeeper.ZKClientServices; |
| import org.apache.twill.zookeeper.ZKClients; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Joiner; |
| import com.google.common.base.Throwables; |
| import com.google.common.cache.Cache; |
| import com.google.common.cache.CacheBuilder; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Sets; |
| import com.google.common.util.concurrent.ThreadFactoryBuilder; |
| |
| import co.cask.tephra.TransactionSystemClient; |
| import co.cask.tephra.TxConstants; |
| import co.cask.tephra.distributed.PooledClientProvider; |
| import co.cask.tephra.distributed.TransactionServiceClient; |
| |
| |
| public class ConnectionQueryServicesImpl extends DelegateQueryServices implements ConnectionQueryServices { |
| private static final Logger logger = LoggerFactory.getLogger(ConnectionQueryServicesImpl.class); |
| private static final int INITIAL_CHILD_SERVICES_CAPACITY = 100; |
| private static final int DEFAULT_OUT_OF_ORDER_MUTATIONS_WAIT_TIME_MS = 1000; |
| // Max number of cached table stats for view or shared index physical tables |
| private static final int MAX_TABLE_STATS_CACHE_ENTRIES = 512; |
| protected final Configuration config; |
| private final ConnectionInfo connectionInfo; |
| // Copy of config.getProps(), but read-only to prevent synchronization that we |
| // don't need. |
| private final ReadOnlyProps props; |
| private final String userName; |
| private final ConcurrentHashMap<ImmutableBytesWritable,ConnectionQueryServices> childServices; |
| private final Cache<ImmutableBytesPtr, PTableStats> tableStatsCache; |
| |
| // Cache the latest meta data here for future connections |
| // writes guarded by "latestMetaDataLock" |
| private volatile PMetaData latestMetaData; |
| private final Object latestMetaDataLock = new Object(); |
| |
| // Lowest HBase version on the cluster. |
| private int lowestClusterHBaseVersion = Integer.MAX_VALUE; |
| private boolean hasIndexWALCodec = true; |
| |
| @GuardedBy("connectionCountLock") |
| private int connectionCount = 0; |
| private final Object connectionCountLock = new Object(); |
| private final boolean returnSequenceValues ; |
| |
| private HConnection connection; |
| private TransactionServiceClient txServiceClient; |
| private volatile boolean initialized; |
| private volatile int nSequenceSaltBuckets; |
| |
| // writes guarded by "this" |
| private volatile boolean closed; |
| |
| private volatile SQLException initializationException; |
| // setting this member variable guarded by "connectionCountLock" |
| private volatile ConcurrentMap<SequenceKey,Sequence> sequenceMap = Maps.newConcurrentMap(); |
| private KeyValueBuilder kvBuilder; |
| private final int renewLeaseTaskFrequency; |
| private final int renewLeasePoolSize; |
| private final int renewLeaseThreshold; |
| // List of queues instead of a single queue to provide reduced contention via lock striping |
| private final List<LinkedBlockingQueue<WeakReference<PhoenixConnection>>> connectionQueues; |
| private ScheduledExecutorService renewLeaseExecutor; |
| private final boolean renewLeaseEnabled; |
| |
| private static interface FeatureSupported { |
| boolean isSupported(ConnectionQueryServices services); |
| } |
| |
| private final Map<Feature, FeatureSupported> featureMap = ImmutableMap.<Feature, FeatureSupported>of( |
| Feature.LOCAL_INDEX, new FeatureSupported() { |
| @Override |
| public boolean isSupported(ConnectionQueryServices services) { |
| int hbaseVersion = services.getLowestClusterHBaseVersion(); |
| return hbaseVersion < PhoenixDatabaseMetaData.MIN_LOCAL_SI_VERSION_DISALLOW || hbaseVersion > PhoenixDatabaseMetaData.MAX_LOCAL_SI_VERSION_DISALLOW; |
| } |
| }, |
| Feature.RENEW_LEASE, new FeatureSupported() { |
| @Override |
| public boolean isSupported(ConnectionQueryServices services) { |
| int hbaseVersion = services.getLowestClusterHBaseVersion(); |
| return hbaseVersion >= PhoenixDatabaseMetaData.MIN_RENEW_LEASE_VERSION; |
| } |
| }); |
| |
| private PMetaData newEmptyMetaData() { |
| long maxSizeBytes = props.getLong(QueryServices.MAX_CLIENT_METADATA_CACHE_SIZE_ATTRIB, |
| QueryServicesOptions.DEFAULT_MAX_CLIENT_METADATA_CACHE_SIZE); |
| return new PMetaDataImpl(INITIAL_META_DATA_TABLE_CAPACITY, maxSizeBytes); |
| } |
| |
| /** |
| * Construct a ConnectionQueryServicesImpl that represents a connection to an HBase |
| * cluster. |
| * @param services base services from where we derive our default configuration |
| * @param connectionInfo to provide connection information |
| * @param info hbase configuration properties |
| * @throws SQLException |
| */ |
| public ConnectionQueryServicesImpl(QueryServices services, ConnectionInfo connectionInfo, Properties info) { |
| super(services); |
| Configuration config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(); |
| for (Entry<String,String> entry : services.getProps()) { |
| config.set(entry.getKey(), entry.getValue()); |
| } |
| if (info != null) { |
| for (Object key : info.keySet()) { |
| config.set((String) key, info.getProperty((String) key)); |
| } |
| } |
| for (Entry<String,String> entry : connectionInfo.asProps()) { |
| config.set(entry.getKey(), entry.getValue()); |
| } |
| this.connectionInfo = connectionInfo; |
| |
| // Without making a copy of the configuration we cons up, we lose some of our properties |
| // on the server side during testing. |
| this.config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(config); |
| // set replication required parameter |
| ConfigUtil.setReplicationConfigIfAbsent(this.config); |
| this.props = new ReadOnlyProps(this.config.iterator()); |
| this.userName = connectionInfo.getPrincipal(); |
| this.latestMetaData = newEmptyMetaData(); |
| // TODO: should we track connection wide memory usage or just org-wide usage? |
| // If connection-wide, create a MemoryManager here, otherwise just use the one from the delegate |
| this.childServices = new ConcurrentHashMap<ImmutableBytesWritable,ConnectionQueryServices>(INITIAL_CHILD_SERVICES_CAPACITY); |
| // find the HBase version and use that to determine the KeyValueBuilder that should be used |
| String hbaseVersion = VersionInfo.getVersion(); |
| this.kvBuilder = KeyValueBuilder.get(hbaseVersion); |
| long halfStatsUpdateFreq = config.getLong( |
| QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, |
| QueryServicesOptions.DEFAULT_STATS_UPDATE_FREQ_MS) / 2; |
| tableStatsCache = CacheBuilder.newBuilder() |
| .maximumSize(MAX_TABLE_STATS_CACHE_ENTRIES) |
| .expireAfterWrite(halfStatsUpdateFreq, TimeUnit.MILLISECONDS) |
| .build(); |
| this.returnSequenceValues = props.getBoolean(QueryServices.RETURN_SEQUENCE_VALUES_ATTRIB, QueryServicesOptions.DEFAULT_RETURN_SEQUENCE_VALUES); |
| this.renewLeaseEnabled = config.getBoolean(RENEW_LEASE_ENABLED, DEFAULT_RENEW_LEASE_ENABLED); |
| this.renewLeasePoolSize = config.getInt(RENEW_LEASE_THREAD_POOL_SIZE, DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE); |
| this.renewLeaseThreshold = config.getInt(RENEW_LEASE_THRESHOLD_MILLISECONDS, DEFAULT_RENEW_LEASE_THRESHOLD_MILLISECONDS); |
| this.renewLeaseTaskFrequency = config.getInt(RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS, DEFAULT_RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS); |
| List<LinkedBlockingQueue<WeakReference<PhoenixConnection>>> list = Lists.newArrayListWithCapacity(renewLeasePoolSize); |
| for (int i = 0; i < renewLeasePoolSize; i++) { |
| LinkedBlockingQueue<WeakReference<PhoenixConnection>> queue = new LinkedBlockingQueue<WeakReference<PhoenixConnection>>(); |
| list.add(queue); |
| } |
| connectionQueues = ImmutableList.copyOf(list); |
| } |
| |
| @Override |
| public TransactionSystemClient getTransactionSystemClient() { |
| return txServiceClient; |
| } |
| |
| private void initTxServiceClient() { |
| String zkQuorumServersString = this.getProps().get(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM); |
| if (zkQuorumServersString==null) { |
| zkQuorumServersString = connectionInfo.getZookeeperQuorum()+":"+connectionInfo.getPort(); |
| } |
| ZKClientService zkClientService = ZKClientServices.delegate( |
| ZKClients.reWatchOnExpire( |
| ZKClients.retryOnFailure( |
| ZKClientService.Builder.of(zkQuorumServersString) |
| .setSessionTimeout(props.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT)) |
| .build(), |
| RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS) |
| ) |
| ) |
| ); |
| zkClientService.startAndWait(); |
| ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(zkClientService); |
| PooledClientProvider pooledClientProvider = new PooledClientProvider( |
| config, zkDiscoveryService); |
| this.txServiceClient = new TransactionServiceClient(config,pooledClientProvider); |
| } |
| |
| private void openConnection() throws SQLException { |
| try { |
| // check if we need to authenticate with kerberos |
| String clientKeytab = this.getProps().get(HBASE_CLIENT_KEYTAB); |
| String clientPrincipal = this.getProps().get(HBASE_CLIENT_PRINCIPAL); |
| if (clientKeytab != null && clientPrincipal != null) { |
| logger.info("Trying to connect to a secure cluster with keytab:" + clientKeytab); |
| UserGroupInformation.setConfiguration(config); |
| User.login(config, HBASE_CLIENT_KEYTAB, HBASE_CLIENT_PRINCIPAL, null); |
| logger.info("Successfull login to secure cluster!!"); |
| } |
| boolean transactionsEnabled = props.getBoolean( |
| QueryServices.TRANSACTIONS_ENABLED, |
| QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED); |
| // only initialize the tx service client if needed |
| if (transactionsEnabled) { |
| initTxServiceClient(); |
| } |
| this.connection = HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config); |
| } catch (IOException e) { |
| throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION) |
| .setRootCause(e).build().buildException(); |
| } |
| if (this.connection.isClosed()) { // TODO: why the heck doesn't this throw above? |
| throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION).build().buildException(); |
| } |
| } |
| |
| @Override |
| public HTableInterface getTable(byte[] tableName) throws SQLException { |
| try { |
| return HBaseFactoryProvider.getHTableFactory().getTable(tableName, connection, null); |
| } catch (org.apache.hadoop.hbase.TableNotFoundException e) { |
| throw new TableNotFoundException(SchemaUtil.getSchemaNameFromFullName(tableName), SchemaUtil.getTableNameFromFullName(tableName)); |
| } catch (IOException e) { |
| throw new SQLException(e); |
| } |
| } |
| |
| @Override |
| public HTableDescriptor getTableDescriptor(byte[] tableName) throws SQLException { |
| HTableInterface htable = getTable(tableName); |
| try { |
| return htable.getTableDescriptor(); |
| } catch (IOException e) { |
| if(e instanceof org.apache.hadoop.hbase.TableNotFoundException || |
| e.getCause() instanceof org.apache.hadoop.hbase.TableNotFoundException) { |
| byte[][] schemaAndTableName = new byte[2][]; |
| SchemaUtil.getVarChars(tableName, schemaAndTableName); |
| throw new TableNotFoundException(Bytes.toString(schemaAndTableName[0]), Bytes.toString(schemaAndTableName[1])); |
| } |
| throw new RuntimeException(e); |
| } finally { |
| Closeables.closeQuietly(htable); |
| } |
| } |
| |
| @Override |
| public ReadOnlyProps getProps() { |
| return props; |
| } |
| |
| /** |
| * Closes the underlying connection to zookeeper. The QueryServices |
| * may not be used after that point. When a Connection is closed, |
| * this is not called, since these instances are pooled by the |
| * Driver. Instead, the Driver should call this if the QueryServices |
| * is ever removed from the pool |
| */ |
| @Override |
| public void close() throws SQLException { |
| if (closed) { |
| return; |
| } |
| synchronized (this) { |
| if (closed) { |
| return; |
| } |
| closed = true; |
| SQLException sqlE = null; |
| try { |
| // Attempt to return any unused sequences. |
| if (connection != null) returnAllSequences(this.sequenceMap); |
| } catch (SQLException e) { |
| sqlE = e; |
| } finally { |
| try { |
| childServices.clear(); |
| if (renewLeaseExecutor != null) { |
| renewLeaseExecutor.shutdownNow(); |
| } |
| synchronized (latestMetaDataLock) { |
| latestMetaData = null; |
| latestMetaDataLock.notifyAll(); |
| } |
| if (connection != null) connection.close(); |
| } catch (IOException e) { |
| if (sqlE == null) { |
| sqlE = ServerUtil.parseServerException(e); |
| } else { |
| sqlE.setNextException(ServerUtil.parseServerException(e)); |
| } |
| } finally { |
| try { |
| tableStatsCache.invalidateAll(); |
| super.close(); |
| } catch (SQLException e) { |
| if (sqlE == null) { |
| sqlE = e; |
| } else { |
| sqlE.setNextException(e); |
| } |
| } finally { |
| if (sqlE != null) { throw sqlE; } |
| } |
| } |
| } |
| } |
| } |
| |
| protected ConnectionQueryServices newChildQueryService() { |
| return new ChildQueryServices(this); |
| } |
| |
| /** |
| * Get (and create if necessary) a child QueryService for a given tenantId. |
| * The QueryService will be cached for the lifetime of the parent QueryService |
| * @param tenantId the tenant ID |
| * @return the child QueryService |
| */ |
| @Override |
| public ConnectionQueryServices getChildQueryServices(ImmutableBytesWritable tenantId) { |
| ConnectionQueryServices childQueryService = childServices.get(tenantId); |
| if (childQueryService == null) { |
| childQueryService = newChildQueryService(); |
| ConnectionQueryServices prevQueryService = childServices.putIfAbsent(tenantId, childQueryService); |
| return prevQueryService == null ? childQueryService : prevQueryService; |
| } |
| return childQueryService; |
| } |
| |
| @Override |
| public void clearTableRegionCache(byte[] tableName) throws SQLException { |
| connection.clearRegionCache(TableName.valueOf(tableName)); |
| } |
| |
| @Override |
| public List<HRegionLocation> getAllTableRegions(byte[] tableName) throws SQLException { |
| /* |
| * Use HConnection.getRegionLocation as it uses the cache in HConnection, while getting |
| * all region locations from the HTable doesn't. |
| */ |
| int retryCount = 0, maxRetryCount = 1; |
| boolean reload =false; |
| while (true) { |
| try { |
| // We could surface the package projected HConnectionImplementation.getNumberOfCachedRegionLocations |
| // to get the sizing info we need, but this would require a new class in the same package and a cast |
| // to this implementation class, so it's probably not worth it. |
| List<HRegionLocation> locations = Lists.newArrayList(); |
| byte[] currentKey = HConstants.EMPTY_START_ROW; |
| do { |
| HRegionLocation regionLocation = connection.getRegionLocation( |
| TableName.valueOf(tableName), currentKey, reload); |
| locations.add(regionLocation); |
| currentKey = regionLocation.getRegionInfo().getEndKey(); |
| } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)); |
| return locations; |
| } catch (org.apache.hadoop.hbase.TableNotFoundException e) { |
| String fullName = Bytes.toString(tableName); |
| throw new TableNotFoundException(SchemaUtil.getSchemaNameFromFullName(fullName), SchemaUtil.getTableNameFromFullName(fullName)); |
| } catch (IOException e) { |
| if (retryCount++ < maxRetryCount) { // One retry, in case split occurs while navigating |
| reload = true; |
| continue; |
| } |
| throw new SQLExceptionInfo.Builder(SQLExceptionCode.GET_TABLE_REGIONS_FAIL) |
| .setRootCause(e).build().buildException(); |
| } |
| } |
| } |
| |
| @Override |
| public PMetaData addTable(PTable table, long resolvedTime) throws SQLException { |
| synchronized (latestMetaDataLock) { |
| try { |
| throwConnectionClosedIfNullMetaData(); |
| // If existing table isn't older than new table, don't replace |
| // If a client opens a connection at an earlier timestamp, this can happen |
| PTable existingTable = latestMetaData.getTableRef(new PTableKey(table.getTenantId(), table.getName().getString())).getTable(); |
| if (existingTable.getTimeStamp() >= table.getTimeStamp()) { |
| return latestMetaData; |
| } |
| } catch (TableNotFoundException e) {} |
| latestMetaData = latestMetaData.addTable(table, resolvedTime); |
| latestMetaDataLock.notifyAll(); |
| return latestMetaData; |
| } |
| } |
| |
| public PMetaData updateResolvedTimestamp(PTable table, long resolvedTime) throws SQLException { |
| synchronized (latestMetaDataLock) { |
| throwConnectionClosedIfNullMetaData(); |
| latestMetaData = latestMetaData.updateResolvedTimestamp(table, resolvedTime); |
| latestMetaDataLock.notifyAll(); |
| return latestMetaData; |
| } |
| } |
| |
| private static interface Mutator { |
| PMetaData mutate(PMetaData metaData) throws SQLException; |
| } |
| |
| /** |
| * Ensures that metaData mutations are handled in the correct order |
| */ |
| private PMetaData metaDataMutated(PName tenantId, String tableName, long tableSeqNum, Mutator mutator) throws SQLException { |
| synchronized (latestMetaDataLock) { |
| throwConnectionClosedIfNullMetaData(); |
| PMetaData metaData = latestMetaData; |
| PTable table; |
| long endTime = System.currentTimeMillis() + DEFAULT_OUT_OF_ORDER_MUTATIONS_WAIT_TIME_MS; |
| while (true) { |
| try { |
| try { |
| table = metaData.getTableRef(new PTableKey(tenantId, tableName)).getTable(); |
| /* If the table is at the prior sequence number, then we're good to go. |
| * We know if we've got this far, that the server validated the mutations, |
| * so we'd just need to wait until the other connection that mutated the same |
| * table is processed. |
| */ |
| if (table.getSequenceNumber() + 1 == tableSeqNum) { |
| // TODO: assert that timeStamp is bigger that table timeStamp? |
| metaData = mutator.mutate(metaData); |
| break; |
| } else if (table.getSequenceNumber() >= tableSeqNum) { |
| logger.warn("Attempt to cache older version of " + tableName + ": current= " + table.getSequenceNumber() + ", new=" + tableSeqNum); |
| break; |
| } |
| } catch (TableNotFoundException e) { |
| } |
| long waitTime = endTime - System.currentTimeMillis(); |
| // We waited long enough - just remove the table from the cache |
| // and the next time it's used it'll be pulled over from the server. |
| if (waitTime <= 0) { |
| logger.warn("Unable to update meta data repo within " + (DEFAULT_OUT_OF_ORDER_MUTATIONS_WAIT_TIME_MS/1000) + " seconds for " + tableName); |
| // There will never be a parentTableName here, as that would only |
| // be non null for an index an we never add/remove columns from an index. |
| metaData = metaData.removeTable(tenantId, tableName, null, HConstants.LATEST_TIMESTAMP); |
| break; |
| } |
| latestMetaDataLock.wait(waitTime); |
| } catch (InterruptedException e) { |
| // restore the interrupt status |
| Thread.currentThread().interrupt(); |
| throw new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION) |
| .setRootCause(e).build().buildException(); // FIXME |
| } |
| } |
| latestMetaData = metaData; |
| latestMetaDataLock.notifyAll(); |
| return metaData; |
| } |
| } |
| |
| @Override |
| public PMetaData addColumn(final PName tenantId, final String tableName, final List<PColumn> columns, final long tableTimeStamp, |
| final long tableSeqNum, final boolean isImmutableRows, final boolean isWalDisabled, final boolean isMultitenant, |
| final boolean storeNulls, final boolean isTransactional, final long updateCacheFrequency, final long resolvedTime) throws SQLException { |
| return metaDataMutated(tenantId, tableName, tableSeqNum, new Mutator() { |
| @Override |
| public PMetaData mutate(PMetaData metaData) throws SQLException { |
| try { |
| return metaData.addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows, isWalDisabled, isMultitenant, storeNulls, isTransactional, updateCacheFrequency, resolvedTime); |
| } catch (TableNotFoundException e) { |
| // The DROP TABLE may have been processed first, so just ignore. |
| return metaData; |
| } |
| } |
| }); |
| } |
| |
| @Override |
| public PMetaData removeTable(PName tenantId, final String tableName, String parentTableName, long tableTimeStamp) throws SQLException { |
| synchronized (latestMetaDataLock) { |
| throwConnectionClosedIfNullMetaData(); |
| latestMetaData = latestMetaData.removeTable(tenantId, tableName, parentTableName, tableTimeStamp); |
| latestMetaDataLock.notifyAll(); |
| return latestMetaData; |
| } |
| } |
| |
| @Override |
| public PMetaData removeColumn(final PName tenantId, final String tableName, final List<PColumn> columnsToRemove, final long tableTimeStamp, final long tableSeqNum, final long resolvedTime) throws SQLException { |
| return metaDataMutated(tenantId, tableName, tableSeqNum, new Mutator() { |
| @Override |
| public PMetaData mutate(PMetaData metaData) throws SQLException { |
| try { |
| return metaData.removeColumn(tenantId, tableName, columnsToRemove, tableTimeStamp, tableSeqNum, resolvedTime); |
| } catch (TableNotFoundException e) { |
| // The DROP TABLE may have been processed first, so just ignore. |
| return metaData; |
| } |
| } |
| }); |
| } |
| |
| |
| |
| @Override |
| public PhoenixConnection connect(String url, Properties info) throws SQLException { |
| checkClosed(); |
| PMetaData metadata = latestMetaData; |
| if (metadata == null) { |
| throwConnectionClosedException(); |
| } |
| |
| return new PhoenixConnection(this, url, info, metadata); |
| } |
| |
| |
| private HColumnDescriptor generateColumnFamilyDescriptor(Pair<byte[],Map<String,Object>> family, PTableType tableType) throws SQLException { |
| HColumnDescriptor columnDesc = new HColumnDescriptor(family.getFirst()); |
| if (tableType != PTableType.VIEW) { |
| if(props.get(QueryServices.DEFAULT_KEEP_DELETED_CELLS_ATTRIB) != null){ |
| columnDesc.setKeepDeletedCells(props.getBoolean( |
| QueryServices.DEFAULT_KEEP_DELETED_CELLS_ATTRIB, QueryServicesOptions.DEFAULT_KEEP_DELETED_CELLS)); |
| } |
| columnDesc.setDataBlockEncoding(SchemaUtil.DEFAULT_DATA_BLOCK_ENCODING); |
| for (Entry<String,Object> entry : family.getSecond().entrySet()) { |
| String key = entry.getKey(); |
| Object value = entry.getValue(); |
| setHColumnDescriptorValue(columnDesc, key, value); |
| } |
| } |
| return columnDesc; |
| } |
| |
| // Workaround HBASE-14737 |
| private static void setHColumnDescriptorValue(HColumnDescriptor columnDesc, String key, Object value) { |
| if (HConstants.VERSIONS.equals(key)) { |
| columnDesc.setMaxVersions(getMaxVersion(value)); |
| } else { |
| columnDesc.setValue(key, value == null ? null : value.toString()); |
| } |
| } |
| |
| private static int getMaxVersion(Object value) { |
| if (value == null) { |
| return -1; // HColumnDescriptor.UNINITIALIZED is private |
| } |
| if (value instanceof Number) { |
| return ((Number)value).intValue(); |
| } |
| String stringValue = value.toString(); |
| if (stringValue.isEmpty()) { |
| return -1; |
| } |
| return Integer.parseInt(stringValue); |
| } |
| |
| private void modifyColumnFamilyDescriptor(HColumnDescriptor hcd, Map<String,Object> props) throws SQLException { |
| for (Entry<String, Object> entry : props.entrySet()) { |
| String propName = entry.getKey(); |
| Object value = entry.getValue(); |
| setHColumnDescriptorValue(hcd, propName, value); |
| } |
| } |
| |
| private HTableDescriptor generateTableDescriptor(byte[] tableName, HTableDescriptor existingDesc, PTableType tableType, Map<String,Object> tableProps, List<Pair<byte[],Map<String,Object>>> families, byte[][] splits) throws SQLException { |
| String defaultFamilyName = (String)tableProps.remove(PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME); |
| HTableDescriptor tableDescriptor = (existingDesc != null) ? new HTableDescriptor(existingDesc) : |
| new HTableDescriptor(TableName.valueOf(tableName)); |
| for (Entry<String,Object> entry : tableProps.entrySet()) { |
| String key = entry.getKey(); |
| if (!TableProperty.isPhoenixTableProperty(key)) { |
| Object value = entry.getValue(); |
| tableDescriptor.setValue(key, value == null ? null : value.toString()); |
| } |
| } |
| if (families.isEmpty()) { |
| if (tableType != PTableType.VIEW) { |
| byte[] defaultFamilyByes = defaultFamilyName == null ? QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES : Bytes.toBytes(defaultFamilyName); |
| // Add dummy column family so we have key values for tables that |
| HColumnDescriptor columnDescriptor = generateColumnFamilyDescriptor(new Pair<byte[],Map<String,Object>>(defaultFamilyByes,Collections.<String,Object>emptyMap()), tableType); |
| tableDescriptor.addFamily(columnDescriptor); |
| } |
| } else { |
| for (Pair<byte[],Map<String,Object>> family : families) { |
| // If family is only in phoenix description, add it. otherwise, modify its property accordingly. |
| byte[] familyByte = family.getFirst(); |
| if (tableDescriptor.getFamily(familyByte) == null) { |
| if (tableType == PTableType.VIEW) { |
| String fullTableName = Bytes.toString(tableName); |
| throw new ReadOnlyTableException( |
| "The HBase column families for a read-only table must already exist", |
| SchemaUtil.getSchemaNameFromFullName(fullTableName), |
| SchemaUtil.getTableNameFromFullName(fullTableName), |
| Bytes.toString(familyByte)); |
| } |
| HColumnDescriptor columnDescriptor = generateColumnFamilyDescriptor(family, tableType); |
| tableDescriptor.addFamily(columnDescriptor); |
| } else { |
| if (tableType != PTableType.VIEW) { |
| HColumnDescriptor columnDescriptor = tableDescriptor.getFamily(familyByte); |
| if (columnDescriptor == null) { |
| throw new IllegalArgumentException("Unable to find column descriptor with family name " + Bytes.toString(family.getFirst())); |
| } |
| modifyColumnFamilyDescriptor(columnDescriptor, family.getSecond()); |
| } |
| } |
| } |
| } |
| addCoprocessors(tableName, tableDescriptor, tableType, tableProps); |
| return tableDescriptor; |
| } |
| |
| private void addCoprocessors(byte[] tableName, HTableDescriptor descriptor, PTableType tableType, Map<String,Object> tableProps) throws SQLException { |
| // The phoenix jar must be available on HBase classpath |
| int priority = props.getInt(QueryServices.COPROCESSOR_PRIORITY_ATTRIB, QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY); |
| try { |
| if (!descriptor.hasCoprocessor(ScanRegionObserver.class.getName())) { |
| descriptor.addCoprocessor(ScanRegionObserver.class.getName(), null, priority, null); |
| } |
| if (!descriptor.hasCoprocessor(UngroupedAggregateRegionObserver.class.getName())) { |
| descriptor.addCoprocessor(UngroupedAggregateRegionObserver.class.getName(), null, priority, null); |
| } |
| if (!descriptor.hasCoprocessor(GroupedAggregateRegionObserver.class.getName())) { |
| descriptor.addCoprocessor(GroupedAggregateRegionObserver.class.getName(), null, priority, null); |
| } |
| if (!descriptor.hasCoprocessor(ServerCachingEndpointImpl.class.getName())) { |
| descriptor.addCoprocessor(ServerCachingEndpointImpl.class.getName(), null, priority, null); |
| } |
| boolean isTransactional = |
| Boolean.TRUE.equals(tableProps.get(TableProperty.TRANSACTIONAL.name())) || |
| Boolean.TRUE.equals(tableProps.get(TxConstants.READ_NON_TX_DATA)); // For ALTER TABLE |
| // TODO: better encapsulation for this |
| // Since indexes can't have indexes, don't install our indexing coprocessor for indexes. |
| // Also don't install on the SYSTEM.CATALOG and SYSTEM.STATS table because we use |
| // all-or-none mutate class which break when this coprocessor is installed (PHOENIX-1318). |
| if ((tableType != PTableType.INDEX && tableType != PTableType.VIEW) |
| && !SchemaUtil.isMetaTable(tableName) |
| && !SchemaUtil.isStatsTable(tableName)) { |
| if (isTransactional) { |
| if (!descriptor.hasCoprocessor(PhoenixTransactionalIndexer.class.getName())) { |
| descriptor.addCoprocessor(PhoenixTransactionalIndexer.class.getName(), null, priority, null); |
| } |
| // For alter table, remove non transactional index coprocessor |
| if (descriptor.hasCoprocessor(Indexer.class.getName())) { |
| descriptor.removeCoprocessor(Indexer.class.getName()); |
| } |
| } else { |
| if (!descriptor.hasCoprocessor(Indexer.class.getName())) { |
| // If exception on alter table to transition back to non transactional |
| if (descriptor.hasCoprocessor(PhoenixTransactionalIndexer.class.getName())) { |
| descriptor.removeCoprocessor(PhoenixTransactionalIndexer.class.getName()); |
| } |
| Map<String, String> opts = Maps.newHashMapWithExpectedSize(1); |
| opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName()); |
| Indexer.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts, priority); |
| } |
| } |
| } |
| if (SchemaUtil.isStatsTable(tableName) && !descriptor.hasCoprocessor(MultiRowMutationEndpoint.class.getName())) { |
| descriptor.addCoprocessor(MultiRowMutationEndpoint.class.getName(), |
| null, priority, null); |
| } |
| |
| if (descriptor.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES) != null |
| && Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(descriptor |
| .getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES)))) { |
| if (!descriptor.hasCoprocessor(IndexHalfStoreFileReaderGenerator.class.getName())) { |
| descriptor.addCoprocessor(IndexHalfStoreFileReaderGenerator.class.getName(), |
| null, priority, null); |
| } |
| } else { |
| if (!descriptor.hasCoprocessor(LocalIndexSplitter.class.getName()) |
| && !SchemaUtil.isMetaTable(tableName) |
| && !SchemaUtil.isSequenceTable(tableName)) { |
| descriptor.addCoprocessor(LocalIndexSplitter.class.getName(), null, priority, null); |
| } |
| } |
| |
| // Setup split policy on Phoenix metadata table to ensure that the key values of a Phoenix table |
| // stay on the same region. |
| if (SchemaUtil.isMetaTable(tableName) || SchemaUtil.isFunctionTable(tableName)) { |
| if (!descriptor.hasCoprocessor(MetaDataEndpointImpl.class.getName())) { |
| descriptor.addCoprocessor(MetaDataEndpointImpl.class.getName(), null, priority, null); |
| } |
| if(SchemaUtil.isMetaTable(tableName) ) { |
| if (!descriptor.hasCoprocessor(MetaDataRegionObserver.class.getName())) { |
| descriptor.addCoprocessor(MetaDataRegionObserver.class.getName(), null, priority + 1, null); |
| } |
| } |
| } else if (SchemaUtil.isSequenceTable(tableName)) { |
| if (!descriptor.hasCoprocessor(SequenceRegionObserver.class.getName())) { |
| descriptor.addCoprocessor(SequenceRegionObserver.class.getName(), null, priority, null); |
| } |
| } |
| |
| if (isTransactional) { |
| if (!descriptor.hasCoprocessor(PhoenixTransactionalProcessor.class.getName())) { |
| descriptor.addCoprocessor(PhoenixTransactionalProcessor.class.getName(), null, priority - 10, null); |
| } |
| } else { |
| // If exception on alter table to transition back to non transactional |
| if (descriptor.hasCoprocessor(PhoenixTransactionalProcessor.class.getName())) { |
| descriptor.removeCoprocessor(PhoenixTransactionalProcessor.class.getName()); |
| } |
| } |
| } catch (IOException e) { |
| throw ServerUtil.parseServerException(e); |
| } |
| } |
| |
| private static interface RetriableOperation { |
| boolean checkForCompletion() throws TimeoutException, IOException; |
| String getOperatioName(); |
| } |
| |
| private void pollForUpdatedTableDescriptor(final HBaseAdmin admin, final HTableDescriptor newTableDescriptor, |
| final byte[] tableName) throws InterruptedException, TimeoutException { |
| checkAndRetry(new RetriableOperation() { |
| |
| @Override |
| public String getOperatioName() { |
| return "UpdateOrNewTableDescriptor"; |
| } |
| |
| @Override |
| public boolean checkForCompletion() throws TimeoutException, IOException { |
| HTableDescriptor tableDesc = admin.getTableDescriptor(tableName); |
| return newTableDescriptor.equals(tableDesc); |
| } |
| }); |
| } |
| |
| private void checkAndRetry(RetriableOperation op) throws InterruptedException, TimeoutException { |
| int maxRetries = ConnectionQueryServicesImpl.this.props.getInt( |
| QueryServices.NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK, |
| QueryServicesOptions.DEFAULT_RETRIES_FOR_SCHEMA_UPDATE_CHECK); |
| long sleepInterval = ConnectionQueryServicesImpl.this.props |
| .getLong(QueryServices.DELAY_FOR_SCHEMA_UPDATE_CHECK, |
| QueryServicesOptions.DEFAULT_DELAY_FOR_SCHEMA_UPDATE_CHECK); |
| boolean success = false; |
| int numTries = 1; |
| PhoenixStopWatch watch = new PhoenixStopWatch(); |
| watch.start(); |
| do { |
| try { |
| success = op.checkForCompletion(); |
| } catch (Exception ex) { |
| // If we encounter any exception on the first or last try, propagate the exception and fail. |
| // Else, we swallow the exception and retry till we reach maxRetries. |
| if (numTries == 1 || numTries == maxRetries) { |
| watch.stop(); |
| TimeoutException toThrow = new TimeoutException("Operation " + op.getOperatioName() |
| + " didn't complete because of exception. Time elapsed: " + watch.elapsedMillis()); |
| toThrow.initCause(ex); |
| throw toThrow; |
| } |
| } |
| numTries++; |
| Thread.sleep(sleepInterval); |
| } while (numTries < maxRetries && !success); |
| |
| watch.stop(); |
| |
| if (!success) { |
| throw new TimeoutException("Operation " + op.getOperatioName() + " didn't complete within " |
| + watch.elapsedMillis() + " ms " |
| + (numTries > 1 ? ("after trying " + numTries + (numTries > 1 ? "times." : "time.")) : "")); |
| } else { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Operation " |
| + op.getOperatioName() |
| + " completed within " |
| + watch.elapsedMillis() |
| + "ms " |
| + (numTries > 1 ? ("after trying " + numTries + (numTries > 1 ? "times." : "time.")) : "")); |
| } |
| } |
| } |
| |
| private boolean allowOnlineTableSchemaUpdate() { |
| return props.getBoolean( |
| QueryServices.ALLOW_ONLINE_TABLE_SCHEMA_UPDATE, |
| QueryServicesOptions.DEFAULT_ALLOW_ONLINE_TABLE_SCHEMA_UPDATE); |
| } |
| |
| /** |
| * |
| * @param tableName |
| * @param splits |
| * @param modifyExistingMetaData TODO |
| * @return true if table was created and false if it already exists |
| * @throws SQLException |
| */ |
| private HTableDescriptor ensureTableCreated(byte[] tableName, PTableType tableType , Map<String,Object> props, List<Pair<byte[],Map<String,Object>>> families, byte[][] splits, boolean modifyExistingMetaData) throws SQLException { |
| SQLException sqlE = null; |
| HTableDescriptor existingDesc = null; |
| boolean isMetaTable = SchemaUtil.isMetaTable(tableName); |
| boolean tableExist = true; |
| try (HBaseAdmin admin = getAdmin()) { |
| final String quorum = ZKConfig.getZKQuorumServersString(config); |
| final String znode = this.props.get(HConstants.ZOOKEEPER_ZNODE_PARENT); |
| logger.debug("Found quorum: " + quorum + ":" + znode); |
| try { |
| existingDesc = admin.getTableDescriptor(tableName); |
| } catch (org.apache.hadoop.hbase.TableNotFoundException e) { |
| tableExist = false; |
| if (tableType == PTableType.VIEW) { |
| String fullTableName = Bytes.toString(tableName); |
| throw new ReadOnlyTableException( |
| "An HBase table for a VIEW must already exist", |
| SchemaUtil.getSchemaNameFromFullName(fullTableName), |
| SchemaUtil.getTableNameFromFullName(fullTableName)); |
| } |
| } |
| |
| HTableDescriptor newDesc = generateTableDescriptor(tableName, existingDesc, tableType , props, families, splits); |
| |
| if (!tableExist) { |
| if (newDesc.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES) != null && Boolean.TRUE.equals( |
| PBoolean.INSTANCE.toObject(newDesc.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES)))) { |
| newDesc.setValue(HTableDescriptor.SPLIT_POLICY, IndexRegionSplitPolicy.class.getName()); |
| } |
| // Remove the splitPolicy attribute to prevent HBASE-12570 |
| if (isMetaTable) { |
| newDesc.remove(HTableDescriptor.SPLIT_POLICY); |
| } |
| try { |
| if (splits == null) { |
| admin.createTable(newDesc); |
| } else { |
| admin.createTable(newDesc, splits); |
| } |
| } catch (TableExistsException e) { |
| // We can ignore this, as it just means that another client beat us |
| // to creating the HBase metadata. |
| return null; |
| } |
| if (isMetaTable) { |
| checkClientServerCompatibility(); |
| /* |
| * Now we modify the table to add the split policy, since we know that the client and |
| * server and compatible. This works around HBASE-12570 which causes the cluster to be |
| * brought down. |
| */ |
| newDesc.setValue(HTableDescriptor.SPLIT_POLICY, MetaDataSplitPolicy.class.getName()); |
| if (allowOnlineTableSchemaUpdate()) { |
| // No need to wait/poll for this update |
| admin.modifyTable(tableName, newDesc); |
| } else { |
| admin.disableTable(tableName); |
| admin.modifyTable(tableName, newDesc); |
| admin.enableTable(tableName); |
| } |
| } |
| return null; |
| } else { |
| if (isMetaTable) { |
| checkClientServerCompatibility(); |
| } |
| |
| if (!modifyExistingMetaData) { |
| return existingDesc; // Caller already knows that no metadata was changed |
| } |
| boolean willBeTx = Boolean.TRUE.equals(props.get(TableProperty.TRANSACTIONAL.name())); |
| // If mapping an existing table as transactional, set property so that existing |
| // data is correctly read. |
| if (willBeTx) { |
| newDesc.setValue(TxConstants.READ_NON_TX_DATA, Boolean.TRUE.toString()); |
| } else { |
| // If we think we're creating a non transactional table when it's already |
| // transactional, don't allow. |
| if (existingDesc.hasCoprocessor(PhoenixTransactionalProcessor.class.getName())) { |
| throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_MAY_NOT_SWITCH_TO_NON_TX) |
| .setSchemaName(SchemaUtil.getSchemaNameFromFullName(tableName)) |
| .setTableName(SchemaUtil.getTableNameFromFullName(tableName)).build().buildException(); |
| } |
| newDesc.remove(TxConstants.READ_NON_TX_DATA); |
| } |
| if (existingDesc.equals(newDesc)) { |
| return null; // Indicate that no metadata was changed |
| } |
| |
| modifyTable(tableName, newDesc, true); |
| return newDesc; |
| } |
| |
| } catch (IOException e) { |
| sqlE = ServerUtil.parseServerException(e); |
| } catch (InterruptedException e) { |
| // restore the interrupt status |
| Thread.currentThread().interrupt(); |
| sqlE = new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION).setRootCause(e).build().buildException(); |
| } catch (TimeoutException e) { |
| sqlE = new SQLExceptionInfo.Builder(SQLExceptionCode.OPERATION_TIMED_OUT).setRootCause(e.getCause() != null ? e.getCause() : e).build().buildException(); |
| } finally { |
| if (sqlE != null) { |
| throw sqlE; |
| } |
| } |
| return null; // will never make it here |
| } |
| |
| private void modifyTable(byte[] tableName, HTableDescriptor newDesc, boolean shouldPoll) throws IOException, |
| InterruptedException, TimeoutException, SQLException { |
| try (HBaseAdmin admin = getAdmin()) { |
| if (!allowOnlineTableSchemaUpdate()) { |
| admin.disableTable(tableName); |
| admin.modifyTable(tableName, newDesc); |
| admin.enableTable(tableName); |
| } else { |
| admin.modifyTable(tableName, newDesc); |
| if (shouldPoll) { |
| pollForUpdatedTableDescriptor(admin, newDesc, tableName); |
| } |
| } |
| } |
| } |
| |
| private static boolean hasIndexWALCodec(Long serverVersion) { |
| if (serverVersion == null) { |
| return true; |
| } |
| return MetaDataUtil.decodeHasIndexWALCodec(serverVersion); |
| } |
| |
| private static boolean isCompatible(Long serverVersion) { |
| if (serverVersion == null) { |
| return false; |
| } |
| return MetaDataUtil.areClientAndServerCompatible(serverVersion); |
| } |
| |
| private void checkClientServerCompatibility() throws SQLException { |
| StringBuilder buf = new StringBuilder("The following servers require an updated " + QueryConstants.DEFAULT_COPROCESS_PATH + " to be put in the classpath of HBase: "); |
| boolean isIncompatible = false; |
| int minHBaseVersion = Integer.MAX_VALUE; |
| try { |
| List<HRegionLocation> locations = this.getAllTableRegions(SYSTEM_CATALOG_NAME_BYTES); |
| Set<HRegionLocation> serverMap = Sets.newHashSetWithExpectedSize(locations.size()); |
| TreeMap<byte[], HRegionLocation> regionMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); |
| List<byte[]> regionKeys = Lists.newArrayListWithExpectedSize(locations.size()); |
| for (HRegionLocation entry : locations) { |
| if (!serverMap.contains(entry)) { |
| regionKeys.add(entry.getRegionInfo().getStartKey()); |
| regionMap.put(entry.getRegionInfo().getRegionName(), entry); |
| serverMap.add(entry); |
| } |
| } |
| |
| HTableInterface ht = this.getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); |
| final Map<byte[], Long> results = |
| ht.coprocessorService(MetaDataService.class, null, null, new Batch.Call<MetaDataService,Long>() { |
| @Override |
| public Long call(MetaDataService instance) throws IOException { |
| ServerRpcController controller = new ServerRpcController(); |
| BlockingRpcCallback<GetVersionResponse> rpcCallback = |
| new BlockingRpcCallback<GetVersionResponse>(); |
| GetVersionRequest.Builder builder = GetVersionRequest.newBuilder(); |
| builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER)); |
| instance.getVersion(controller, builder.build(), rpcCallback); |
| if(controller.getFailedOn() != null) { |
| throw controller.getFailedOn(); |
| } |
| return rpcCallback.get().getVersion(); |
| } |
| }); |
| for (Map.Entry<byte[],Long> result : results.entrySet()) { |
| // This is the "phoenix.jar" is in-place, but server is out-of-sync with client case. |
| if (!isCompatible(result.getValue())) { |
| isIncompatible = true; |
| HRegionLocation name = regionMap.get(result.getKey()); |
| buf.append(name); |
| buf.append(';'); |
| } |
| hasIndexWALCodec &= hasIndexWALCodec(result.getValue()); |
| if (minHBaseVersion > MetaDataUtil.decodeHBaseVersion(result.getValue())) { |
| minHBaseVersion = MetaDataUtil.decodeHBaseVersion(result.getValue()); |
| } |
| } |
| lowestClusterHBaseVersion = minHBaseVersion; |
| } catch (SQLException e) { |
| throw e; |
| } catch (Throwable t) { |
| // This is the case if the "phoenix.jar" is not on the classpath of HBase on the region server |
| throw new SQLExceptionInfo.Builder(SQLExceptionCode.INCOMPATIBLE_CLIENT_SERVER_JAR).setRootCause(t) |
| .setMessage("Ensure that " + QueryConstants.DEFAULT_COPROCESS_PATH + " is put on the classpath of HBase in every region server: " + t.getMessage()) |
| .build().buildException(); |
| } |
| if (isIncompatible) { |
| buf.setLength(buf.length()-1); |
| throw new SQLExceptionInfo.Builder(SQLExceptionCode.OUTDATED_JARS).setMessage(buf.toString()).build().buildException(); |
| } |
| } |
| |
| /** |
| * Invoke meta data coprocessor with one retry if the key was found to not be in the regions |
| * (due to a table split) |
| */ |
| private MetaDataMutationResult metaDataCoprocessorExec(byte[] tableKey, |
| Batch.Call<MetaDataService, MetaDataResponse> callable) throws SQLException { |
| return metaDataCoprocessorExec(tableKey, callable, PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); |
| } |
| /** |
| * Invoke meta data coprocessor with one retry if the key was found to not be in the regions |
| * (due to a table split) |
| */ |
| private MetaDataMutationResult metaDataCoprocessorExec(byte[] tableKey, |
| Batch.Call<MetaDataService, MetaDataResponse> callable, byte[] tableName) throws SQLException { |
| |
| try { |
| boolean retried = false; |
| while (true) { |
| if (retried) { |
| connection.relocateRegion( |
| TableName.valueOf(tableName), |
| tableKey); |
| } |
| |
| HTableInterface ht = this.getTable(tableName); |
| try { |
| final Map<byte[], MetaDataResponse> results = |
| ht.coprocessorService(MetaDataService.class, tableKey, tableKey, callable); |
| |
| assert(results.size() == 1); |
| MetaDataResponse result = results.values().iterator().next(); |
| if (result.getReturnCode() == MetaDataProtos.MutationCode.TABLE_NOT_IN_REGION |
| || result.getReturnCode() == MetaDataProtos.MutationCode.FUNCTION_NOT_IN_REGION) { |
| if (retried) return MetaDataMutationResult.constructFromProto(result); |
| retried = true; |
| continue; |
| } |
| return MetaDataMutationResult.constructFromProto(result); |
| } finally { |
| Closeables.closeQuietly(ht); |
| } |
| } |
| } catch (IOException e) { |
| throw ServerUtil.parseServerException(e); |
| } catch (Throwable t) { |
| throw new SQLException(t); |
| } |
| } |
| |
| // Our property values are translated using toString, so we need to "string-ify" this. |
| private static final String TRUE_BYTES_AS_STRING = Bytes.toString(PDataType.TRUE_BYTES); |
| |
| private void ensureViewIndexTableCreated(byte[] physicalTableName, Map<String,Object> tableProps, List<Pair<byte[],Map<String,Object>>> families, byte[][] splits, long timestamp) throws SQLException { |
| Long maxFileSize = (Long)tableProps.get(HTableDescriptor.MAX_FILESIZE); |
| if (maxFileSize == null) { |
| maxFileSize = this.props.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE); |
| } |
| byte[] physicalIndexName = MetaDataUtil.getViewIndexPhysicalName(physicalTableName); |
| |
| int indexMaxFileSizePerc; |
| // Get percentage to use from table props first and then fallback to config |
| Integer indexMaxFileSizePercProp = (Integer)tableProps.remove(QueryServices.INDEX_MAX_FILESIZE_PERC_ATTRIB); |
| if (indexMaxFileSizePercProp == null) { |
| indexMaxFileSizePerc = this.props.getInt(QueryServices.INDEX_MAX_FILESIZE_PERC_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_MAX_FILESIZE_PERC); |
| } else { |
| indexMaxFileSizePerc = indexMaxFileSizePercProp; |
| } |
| long indexMaxFileSize = maxFileSize * indexMaxFileSizePerc / 100; |
| tableProps.put(HTableDescriptor.MAX_FILESIZE, indexMaxFileSize); |
| tableProps.put(MetaDataUtil.IS_VIEW_INDEX_TABLE_PROP_NAME, TRUE_BYTES_AS_STRING); |
| HTableDescriptor desc = ensureTableCreated(physicalIndexName, PTableType.TABLE, tableProps, families, splits, false); |
| if (desc != null) { |
| if (!Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(desc.getValue(MetaDataUtil.IS_VIEW_INDEX_TABLE_PROP_BYTES)))) { |
| String fullTableName = Bytes.toString(physicalIndexName); |
| throw new TableAlreadyExistsException( |
| "Unable to create shared physical table for indexes on views.", |
| SchemaUtil.getSchemaNameFromFullName(fullTableName), |
| SchemaUtil.getTableNameFromFullName(fullTableName)); |
| } |
| } |
| } |
| |
| private void ensureLocalIndexTableCreated(byte[] physicalTableName, Map<String,Object> tableProps, List<Pair<byte[],Map<String,Object>>> families, byte[][] splits, long timestamp) throws SQLException { |
| PTable table; |
| String parentTableName = Bytes.toString(physicalTableName, MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX_BYTES.length, |
| physicalTableName.length - MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX_BYTES.length); |
| try { |
| synchronized (latestMetaDataLock) { |
| throwConnectionClosedIfNullMetaData(); |
| table = latestMetaData.getTableRef(new PTableKey(PName.EMPTY_NAME, parentTableName)).getTable(); |
| latestMetaDataLock.notifyAll(); |
| } |
| if (table.getTimeStamp() >= timestamp) { // Table in cache is newer than client timestamp which shouldn't be the case |
| throw new TableNotFoundException(table.getSchemaName().getString(), table.getTableName().getString()); |
| } |
| } catch (TableNotFoundException e) { |
| byte[] schemaName = Bytes.toBytes(SchemaUtil.getSchemaNameFromFullName(parentTableName)); |
| byte[] tableName = Bytes.toBytes(SchemaUtil.getTableNameFromFullName(parentTableName)); |
| MetaDataMutationResult result = this.getTable(null, schemaName, tableName, HConstants.LATEST_TIMESTAMP, timestamp); |
| table = result.getTable(); |
| if (table == null) { |
| throw e; |
| } |
| } |
| ensureLocalIndexTableCreated(physicalTableName, tableProps, families, splits); |
| } |
| |
| private void ensureLocalIndexTableCreated(byte[] physicalTableName, Map<String, Object> tableProps, List<Pair<byte[], Map<String, Object>>> families, byte[][] splits) throws SQLException, TableAlreadyExistsException { |
| |
| // If we're not allowing local indexes or the hbase version is too low, |
| // don't create the local index table |
| if ( !this.getProps().getBoolean(QueryServices.ALLOW_LOCAL_INDEX_ATTRIB, QueryServicesOptions.DEFAULT_ALLOW_LOCAL_INDEX) |
| || !this.supportsFeature(Feature.LOCAL_INDEX)) { |
| return; |
| } |
| |
| tableProps.put(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_NAME, TRUE_BYTES_AS_STRING); |
| HTableDescriptor desc = ensureTableCreated(physicalTableName, PTableType.TABLE, tableProps, families, splits, true); |
| if (desc != null) { |
| if (!Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(desc.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES)))) { |
| String fullTableName = Bytes.toString(physicalTableName); |
| throw new TableAlreadyExistsException( |
| "Unable to create shared physical table for local indexes.", |
| SchemaUtil.getSchemaNameFromFullName(fullTableName), |
| SchemaUtil.getTableNameFromFullName(fullTableName)); |
| } |
| } |
| } |
| |
| private boolean ensureViewIndexTableDropped(byte[] physicalTableName, long timestamp) throws SQLException { |
| byte[] physicalIndexName = MetaDataUtil.getViewIndexPhysicalName(physicalTableName); |
| HTableDescriptor desc = null; |
| boolean wasDeleted = false; |
| try (HBaseAdmin admin= getAdmin()) { |
| try { |
| desc = admin.getTableDescriptor(physicalIndexName); |
| if (Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(desc.getValue(MetaDataUtil.IS_VIEW_INDEX_TABLE_PROP_BYTES)))) { |
| this.tableStatsCache.invalidate(new ImmutableBytesPtr(physicalIndexName)); |
| final ReadOnlyProps props = this.getProps(); |
| final boolean dropMetadata = props.getBoolean(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA); |
| if (dropMetadata) { |
| admin.disableTable(physicalIndexName); |
| admin.deleteTable(physicalIndexName); |
| clearTableRegionCache(physicalIndexName); |
| wasDeleted = true; |
| } |
| } |
| } catch (org.apache.hadoop.hbase.TableNotFoundException ignore) { |
| // Ignore, as we may never have created a view index table |
| } |
| } catch (IOException e) { |
| throw ServerUtil.parseServerException(e); |
| } |
| return wasDeleted; |
| } |
| |
| private boolean ensureLocalIndexTableDropped(byte[] physicalTableName, long timestamp) throws SQLException { |
| byte[] physicalIndexName = MetaDataUtil.getLocalIndexPhysicalName(physicalTableName); |
| HTableDescriptor desc = null; |
| boolean wasDeleted = false; |
| try (HBaseAdmin admin = getAdmin()) { |
| try { |
| desc = admin.getTableDescriptor(physicalIndexName); |
| if (Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(desc.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES)))) { |
| this.tableStatsCache.invalidate(new ImmutableBytesPtr(physicalIndexName)); |
| final ReadOnlyProps props = this.getProps(); |
| final boolean dropMetadata = props.getBoolean(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA); |
| if (dropMetadata) { |
| admin.disableTable(physicalIndexName); |
| admin.deleteTable(physicalIndexName); |
| clearTableRegionCache(physicalIndexName); |
| wasDeleted = true; |
| } |
| } |
| } catch (org.apache.hadoop.hbase.TableNotFoundException ignore) { |
| // Ignore, as we may never have created a view index table |
| } |
| } catch (IOException e) { |
| throw ServerUtil.parseServerException(e); |
| } |
| return wasDeleted; |
| } |
| |
| @Override |
| public MetaDataMutationResult createTable(final List<Mutation> tableMetaData, byte[] physicalTableName, PTableType tableType, |
| Map<String,Object> tableProps, final List<Pair<byte[],Map<String,Object>>> families, byte[][] splits) throws SQLException { |
| byte[][] rowKeyMetadata = new byte[3][]; |
| Mutation m = MetaDataUtil.getPutOnlyTableHeaderRow(tableMetaData); |
| byte[] key = m.getRow(); |
| SchemaUtil.getVarChars(key, rowKeyMetadata); |
| byte[] tenantIdBytes = rowKeyMetadata[PhoenixDatabaseMetaData.TENANT_ID_INDEX]; |
| byte[] schemaBytes = rowKeyMetadata[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX]; |
| byte[] tableBytes = rowKeyMetadata[PhoenixDatabaseMetaData.TABLE_NAME_INDEX]; |
| byte[] tableName = physicalTableName != null ? physicalTableName : SchemaUtil.getTableNameAsBytes(schemaBytes, tableBytes); |
| boolean localIndexTable = Boolean.TRUE.equals(tableProps.remove(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_NAME)); |
| |
| if ((tableType == PTableType.VIEW && physicalTableName != null) || (tableType != PTableType.VIEW && physicalTableName == null)) { |
| // For views this will ensure that metadata already exists |
| // For tables and indexes, this will create the metadata if it doesn't already exist |
| ensureTableCreated(tableName, tableType, tableProps, families, splits, true); |
| } |
| ImmutableBytesWritable ptr = new ImmutableBytesWritable(); |
| if (tableType == PTableType.INDEX) { // Index on view |
| // Physical index table created up front for multi tenant |
| // TODO: if viewIndexId is Short.MIN_VALUE, then we don't need to attempt to create it |
| if (physicalTableName != null) { |
| if (localIndexTable) { |
| ensureLocalIndexTableCreated(tableName, tableProps, families, splits, MetaDataUtil.getClientTimeStamp(m)); |
| } else if (!MetaDataUtil.isMultiTenant(m, kvBuilder, ptr)) { |
| ensureViewIndexTableCreated(tenantIdBytes.length == 0 ? null : PNameFactory.newName(tenantIdBytes), physicalTableName, MetaDataUtil.getClientTimeStamp(m)); |
| } |
| } |
| } else if (tableType == PTableType.TABLE && MetaDataUtil.isMultiTenant(m, kvBuilder, ptr)) { // Create view index table up front for multi tenant tables |
| ptr.set(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES); |
| MetaDataUtil.getMutationValue(m, PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME_BYTES, kvBuilder, ptr); |
| List<Pair<byte[],Map<String,Object>>> familiesPlusDefault = null; |
| for (Pair<byte[],Map<String,Object>> family : families) { |
| byte[] cf = family.getFirst(); |
| if (Bytes.compareTo(cf, 0, cf.length, ptr.get(), ptr.getOffset(),ptr.getLength()) == 0) { |
| familiesPlusDefault = families; |
| break; |
| } |
| } |
| // Don't override if default family already present |
| if (familiesPlusDefault == null) { |
| byte[] defaultCF = ByteUtil.copyKeyBytesIfNecessary(ptr); |
| // Only use splits if table is salted, otherwise it may not be applicable |
| // Always add default column family, as we don't know in advance if we'll need it |
| familiesPlusDefault = Lists.newArrayList(families); |
| familiesPlusDefault.add(new Pair<byte[],Map<String,Object>>(defaultCF,Collections.<String,Object>emptyMap())); |
| } |
| ensureViewIndexTableCreated(tableName, tableProps, familiesPlusDefault, MetaDataUtil.isSalted(m, kvBuilder, ptr) ? splits : null, MetaDataUtil.getClientTimeStamp(m)); |
| } |
| |
| byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaBytes, tableBytes); |
| MetaDataMutationResult result = metaDataCoprocessorExec(tableKey, |
| new Batch.Call<MetaDataService, MetaDataResponse>() { |
| @Override |
| public MetaDataResponse call(MetaDataService instance) throws IOException { |
| ServerRpcController controller = new ServerRpcController(); |
| BlockingRpcCallback<MetaDataResponse> rpcCallback = |
| new BlockingRpcCallback<MetaDataResponse>(); |
| CreateTableRequest.Builder builder = CreateTableRequest.newBuilder(); |
| for (Mutation m : tableMetaData) { |
| MutationProto mp = ProtobufUtil.toProto(m); |
| builder.addTableMetadataMutations(mp.toByteString()); |
| } |
| builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER)); |
| CreateTableRequest build = builder.build(); |
| instance.createTable(controller, build, rpcCallback); |
| if(controller.getFailedOn() != null) { |
| throw controller.getFailedOn(); |
| } |
| return rpcCallback.get(); |
| } |
| }); |
| return result; |
| } |
| |
| @Override |
| public MetaDataMutationResult getTable(final PName tenantId, final byte[] schemaBytes, final byte[] tableBytes, |
| final long tableTimestamp, final long clientTimestamp) throws SQLException { |
| final byte[] tenantIdBytes = tenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY : tenantId.getBytes(); |
| byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaBytes, tableBytes); |
| return metaDataCoprocessorExec(tableKey, |
| new Batch.Call<MetaDataService, MetaDataResponse>() { |
| @Override |
| public MetaDataResponse call(MetaDataService instance) throws IOException { |
| ServerRpcController controller = new ServerRpcController(); |
| BlockingRpcCallback<MetaDataResponse> rpcCallback = |
| new BlockingRpcCallback<MetaDataResponse>(); |
| GetTableRequest.Builder builder = GetTableRequest.newBuilder(); |
| builder.setTenantId(ByteStringer.wrap(tenantIdBytes)); |
| builder.setSchemaName(ByteStringer.wrap(schemaBytes)); |
| builder.setTableName(ByteStringer.wrap(tableBytes)); |
| builder.setTableTimestamp(tableTimestamp); |
| builder.setClientTimestamp(clientTimestamp); |
| builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER)); |
| instance.getTable(controller, builder.build(), rpcCallback); |
| if(controller.getFailedOn() != null) { |
| throw controller.getFailedOn(); |
| } |
| return rpcCallback.get(); |
| } |
| }); |
| } |
| |
| @Override |
| public MetaDataMutationResult dropTable(final List<Mutation> tableMetaData, final PTableType tableType, final boolean cascade) throws SQLException { |
| byte[][] rowKeyMetadata = new byte[3][]; |
| SchemaUtil.getVarChars(tableMetaData.get(0).getRow(), rowKeyMetadata); |
| byte[] tenantIdBytes = rowKeyMetadata[PhoenixDatabaseMetaData.TENANT_ID_INDEX]; |
| byte[] schemaBytes = rowKeyMetadata[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX]; |
| byte[] tableBytes = rowKeyMetadata[PhoenixDatabaseMetaData.TABLE_NAME_INDEX]; |
| byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes == null ? ByteUtil.EMPTY_BYTE_ARRAY : tenantIdBytes, schemaBytes, tableBytes); |
| final MetaDataMutationResult result = metaDataCoprocessorExec(tableKey, |
| new Batch.Call<MetaDataService, MetaDataResponse>() { |
| @Override |
| public MetaDataResponse call(MetaDataService instance) throws IOException { |
| ServerRpcController controller = new ServerRpcController(); |
| BlockingRpcCallback<MetaDataResponse> rpcCallback = |
| new BlockingRpcCallback<MetaDataResponse>(); |
| DropTableRequest.Builder builder = DropTableRequest.newBuilder(); |
| for (Mutation m : tableMetaData) { |
| MutationProto mp = ProtobufUtil.toProto(m); |
| builder.addTableMetadataMutations(mp.toByteString()); |
| } |
| builder.setTableType(tableType.getSerializedValue()); |
| builder.setCascade(cascade); |
| builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER)); |
| instance.dropTable(controller, builder.build(), rpcCallback); |
| if(controller.getFailedOn() != null) { |
| throw controller.getFailedOn(); |
| } |
| return rpcCallback.get(); |
| } |
| }); |
| |
| final MutationCode code = result.getMutationCode(); |
| switch(code) { |
| case TABLE_ALREADY_EXISTS: |
| ReadOnlyProps props = this.getProps(); |
| boolean dropMetadata = props.getBoolean(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA); |
| if (dropMetadata) { |
| dropTables(result.getTableNamesToDelete()); |
| } |
| invalidateTables(result.getTableNamesToDelete()); |
| if (tableType == PTableType.TABLE) { |
| byte[] physicalName = SchemaUtil.getTableNameAsBytes(schemaBytes, tableBytes); |
| long timestamp = MetaDataUtil.getClientTimeStamp(tableMetaData); |
| ensureViewIndexTableDropped(physicalName, timestamp); |
| ensureLocalIndexTableDropped(physicalName, timestamp); |
| tableStatsCache.invalidate(new ImmutableBytesPtr(physicalName)); |
| } |
| break; |
| default: |
| break; |
| } |
| return result; |
| } |
| |
| @Override |
| public MetaDataMutationResult dropFunction(final List<Mutation> functionData, final boolean ifExists) throws SQLException { |
| byte[][] rowKeyMetadata = new byte[2][]; |
| byte[] key = functionData.get(0).getRow(); |
| SchemaUtil.getVarChars(key, rowKeyMetadata); |
| byte[] tenantIdBytes = rowKeyMetadata[PhoenixDatabaseMetaData.TENANT_ID_INDEX]; |
| byte[] functionBytes = rowKeyMetadata[PhoenixDatabaseMetaData.FUNTION_NAME_INDEX]; |
| byte[] functionKey = SchemaUtil.getFunctionKey(tenantIdBytes, functionBytes); |
| |
| final MetaDataMutationResult result = metaDataCoprocessorExec(functionKey, |
| new Batch.Call<MetaDataService, MetaDataResponse>() { |
| @Override |
| public MetaDataResponse call(MetaDataService instance) throws IOException { |
| ServerRpcController controller = new ServerRpcController(); |
| BlockingRpcCallback<MetaDataResponse> rpcCallback = |
| new BlockingRpcCallback<MetaDataResponse>(); |
| DropFunctionRequest.Builder builder = DropFunctionRequest.newBuilder(); |
| for (Mutation m : functionData) { |
| MutationProto mp = ProtobufUtil.toProto(m); |
| builder.addTableMetadataMutations(mp.toByteString()); |
| } |
| builder.setIfExists(ifExists); |
| builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER)); |
| instance.dropFunction(controller, builder.build(), rpcCallback); |
| if(controller.getFailedOn() != null) { |
| throw controller.getFailedOn(); |
| } |
| return rpcCallback.get(); |
| } |
| }, PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES); |
| return result; |
| } |
| private void invalidateTables(final List<byte[]> tableNamesToDelete) { |
| if (tableNamesToDelete != null) { |
| for ( byte[] tableName : tableNamesToDelete ) { |
| tableStatsCache.invalidate(new ImmutableBytesPtr(tableName)); |
| } |
| } |
| } |
| |
| private void dropTables(final List<byte[]> tableNamesToDelete) throws SQLException { |
| try (HBaseAdmin admin = getAdmin()){ |
| if (tableNamesToDelete != null){ |
| for ( byte[] tableName : tableNamesToDelete ) { |
| if ( admin.tableExists(tableName) ) { |
| admin.disableTable(tableName); |
| admin.deleteTable(tableName); |
| clearTableRegionCache(tableName); |
| } |
| } |
| } |
| |
| } catch (IOException e) { |
| throw ServerUtil.parseServerException(e); |
| } |
| } |
| |
| private static Map<String,Object> createPropertiesMap(Map<ImmutableBytesWritable,ImmutableBytesWritable> htableProps) { |
| Map<String,Object> props = Maps.newHashMapWithExpectedSize(htableProps.size()); |
| for (Map.Entry<ImmutableBytesWritable,ImmutableBytesWritable> entry : htableProps.entrySet()) { |
| ImmutableBytesWritable key = entry.getKey(); |
| ImmutableBytesWritable value = entry.getValue(); |
| props.put(Bytes.toString(key.get(), key.getOffset(), key.getLength()), Bytes.toString(value.get(), value.getOffset(), value.getLength())); |
| } |
| return props; |
| } |
| |
| private void ensureViewIndexTableCreated(PName tenantId, byte[] physicalIndexTableName, long timestamp) throws SQLException { |
| PTable table; |
| String name = Bytes.toString( |
| physicalIndexTableName, |
| MetaDataUtil.VIEW_INDEX_TABLE_PREFIX_BYTES.length, |
| physicalIndexTableName.length-MetaDataUtil.VIEW_INDEX_TABLE_PREFIX_BYTES.length); |
| try { |
| PMetaData metadata = latestMetaData; |
| if (metadata == null) { |
| throwConnectionClosedException(); |
| } |
| table = metadata.getTableRef(new PTableKey(tenantId, name)).getTable(); |
| if (table.getTimeStamp() >= timestamp) { // Table in cache is newer than client timestamp which shouldn't be the case |
| throw new TableNotFoundException(table.getSchemaName().getString(), table.getTableName().getString()); |
| } |
| } catch (TableNotFoundException e) { |
| byte[] schemaName = Bytes.toBytes(SchemaUtil.getSchemaNameFromFullName(name)); |
| byte[] tableName = Bytes.toBytes(SchemaUtil.getTableNameFromFullName(name)); |
| MetaDataMutationResult result = this.getTable(null, schemaName, tableName, HConstants.LATEST_TIMESTAMP, timestamp); |
| table = result.getTable(); |
| if (table == null) { |
| throw e; |
| } |
| } |
| ensureViewIndexTableCreated(table, timestamp); |
| } |
| |
| private void ensureViewIndexTableCreated(PTable table, long timestamp) throws SQLException { |
| byte[] physicalTableName = table.getPhysicalName().getBytes(); |
| HTableDescriptor htableDesc = this.getTableDescriptor(physicalTableName); |
| Map<String,Object> tableProps = createPropertiesMap(htableDesc.getValues()); |
| List<Pair<byte[],Map<String,Object>>> families = Lists.newArrayListWithExpectedSize(Math.max(1, table.getColumnFamilies().size()+1)); |
| if (families.isEmpty()) { |
| byte[] familyName = SchemaUtil.getEmptyColumnFamily(table); |
| Map<String,Object> familyProps = createPropertiesMap(htableDesc.getFamily(familyName).getValues()); |
| families.add(new Pair<byte[],Map<String,Object>>(familyName, familyProps)); |
| } else { |
| for (PColumnFamily family : table.getColumnFamilies()) { |
| byte[] familyName = family.getName().getBytes(); |
| Map<String,Object> familyProps = createPropertiesMap(htableDesc.getFamily(familyName).getValues()); |
| families.add(new Pair<byte[],Map<String,Object>>(familyName, familyProps)); |
| } |
| // Always create default column family, because we don't know in advance if we'll |
| // need it for an index with no covered columns. |
| families.add(new Pair<byte[],Map<String,Object>>(table.getDefaultFamilyName().getBytes(), Collections.<String,Object>emptyMap())); |
| } |
| byte[][] splits = null; |
| if (table.getBucketNum() != null) { |
| splits = SaltingUtil.getSalteByteSplitPoints(table.getBucketNum()); |
| } |
| |
| ensureViewIndexTableCreated(physicalTableName, tableProps, families, splits, timestamp); |
| } |
| |
| @Override |
| public MetaDataMutationResult addColumn(final List<Mutation> tableMetaData, PTable table, Map<String, List<Pair<String,Object>>> stmtProperties, Set<String> colFamiliesForPColumnsToBeAdded) throws SQLException { |
| List<Pair<byte[], Map<String, Object>>> families = new ArrayList<>(stmtProperties.size()); |
| Map<String, Object> tableProps = new HashMap<String, Object>(); |
| Set<HTableDescriptor> tableDescriptors = Collections.emptySet(); |
| Set<HTableDescriptor> origTableDescriptors = Collections.emptySet(); |
| boolean nonTxToTx = false; |
| Pair<HTableDescriptor,HTableDescriptor> tableDescriptorPair = separateAndValidateProperties(table, stmtProperties, colFamiliesForPColumnsToBeAdded, families, tableProps); |
| HTableDescriptor tableDescriptor = tableDescriptorPair.getSecond(); |
| HTableDescriptor origTableDescriptor = tableDescriptorPair.getFirst(); |
| if (tableDescriptor != null) { |
| tableDescriptors = Sets.newHashSetWithExpectedSize(3 + table.getIndexes().size()); |
| origTableDescriptors = Sets.newHashSetWithExpectedSize(3 + table.getIndexes().size()); |
| tableDescriptors.add(tableDescriptor); |
| origTableDescriptors.add(origTableDescriptor); |
| nonTxToTx = Boolean.TRUE.equals(tableProps.get(TxConstants.READ_NON_TX_DATA)); |
| /* |
| * If the table was transitioned from non transactional to transactional, we need |
| * to also transition the index tables. |
| */ |
| if (nonTxToTx) { |
| updateDescriptorForTx(table, tableProps, tableDescriptor, Boolean.TRUE.toString(), tableDescriptors, origTableDescriptors); |
| } |
| } |
| |
| boolean success = false; |
| boolean metaDataUpdated = !tableDescriptors.isEmpty(); |
| boolean pollingNeeded = !(!tableProps.isEmpty() && families.isEmpty() && colFamiliesForPColumnsToBeAdded.isEmpty()); |
| MetaDataMutationResult result = null; |
| try { |
| boolean modifyHTable = true; |
| if (table.getType() == PTableType.VIEW) { |
| boolean canViewsAddNewCF = props.getBoolean(QueryServices.ALLOW_VIEWS_ADD_NEW_CF_BASE_TABLE, |
| QueryServicesOptions.DEFAULT_ALLOW_VIEWS_ADD_NEW_CF_BASE_TABLE); |
| // When adding a column to a view, base physical table should only be modified when new column families are being added. |
| modifyHTable = canViewsAddNewCF && !existingColumnFamiliesForBaseTable(table.getPhysicalName()).containsAll(colFamiliesForPColumnsToBeAdded); |
| } |
| if (modifyHTable) { |
| sendHBaseMetaData(tableDescriptors, pollingNeeded); |
| } |
| |
| // Special case for call during drop table to ensure that the empty column family exists. |
| // In this, case we only include the table header row, as until we add schemaBytes and tableBytes |
| // as args to this function, we have no way of getting them in this case. |
| // TODO: change to if (tableMetaData.isEmpty()) once we pass through schemaBytes and tableBytes |
| // Also, could be used to update property values on ALTER TABLE t SET prop=xxx |
| if ((tableMetaData.isEmpty()) || (tableMetaData.size() == 1 && tableMetaData.get(0).isEmpty())) { |
| return new MetaDataMutationResult(MutationCode.NO_OP, System.currentTimeMillis(), table); |
| } |
| byte[][] rowKeyMetaData = new byte[3][]; |
| PTableType tableType = table.getType(); |
| |
| Mutation m = tableMetaData.get(0); |
| byte[] rowKey = m.getRow(); |
| SchemaUtil.getVarChars(rowKey, rowKeyMetaData); |
| byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX]; |
| byte[] schemaBytes = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX]; |
| byte[] tableBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX]; |
| byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaBytes, tableBytes); |
| |
| ImmutableBytesWritable ptr = new ImmutableBytesWritable(); |
| result = metaDataCoprocessorExec(tableKey, |
| new Batch.Call<MetaDataService, MetaDataResponse>() { |
| @Override |
| public MetaDataResponse call(MetaDataService instance) throws IOException { |
| ServerRpcController controller = new ServerRpcController(); |
| BlockingRpcCallback<MetaDataResponse> rpcCallback = |
| new BlockingRpcCallback<MetaDataResponse>(); |
| AddColumnRequest.Builder builder = AddColumnRequest.newBuilder(); |
| for (Mutation m : tableMetaData) { |
| MutationProto mp = ProtobufUtil.toProto(m); |
| builder.addTableMetadataMutations(mp.toByteString()); |
| } |
| builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER)); |
| instance.addColumn(controller, builder.build(), rpcCallback); |
| if(controller.getFailedOn() != null) { |
| throw controller.getFailedOn(); |
| } |
| return rpcCallback.get(); |
| } |
| }); |
| |
| if (result.getMutationCode() == MutationCode.COLUMN_NOT_FOUND || result.getMutationCode() == MutationCode.TABLE_ALREADY_EXISTS) { // Success |
| success = true; |
| // Flush the table if transitioning DISABLE_WAL from TRUE to FALSE |
| if ( MetaDataUtil.getMutationValue(m,PhoenixDatabaseMetaData.DISABLE_WAL_BYTES, kvBuilder, ptr) |
| && Boolean.FALSE.equals(PBoolean.INSTANCE.toObject(ptr))) { |
| flushTable(table.getPhysicalName().getBytes()); |
| } |
| |
| if (tableType == PTableType.TABLE) { |
| // If we're changing MULTI_TENANT to true or false, create or drop the view index table |
| if (MetaDataUtil.getMutationValue(m, PhoenixDatabaseMetaData.MULTI_TENANT_BYTES, kvBuilder, ptr)){ |
| long timestamp = MetaDataUtil.getClientTimeStamp(m); |
| if (Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(ptr.get(), ptr.getOffset(), ptr.getLength()))) { |
| this.ensureViewIndexTableCreated(table, timestamp); |
| } else { |
| this.ensureViewIndexTableDropped(table.getPhysicalName().getBytes(), timestamp); |
| } |
| } |
| } |
| } |
| } finally { |
| // If we weren't successful with our metadata update |
| // and we've already pushed the HBase metadata changes to the server |
| // and we've tried to go from non transactional to transactional |
| // then we must undo the metadata change otherwise the table will |
| // no longer function correctly. |
| // Note that if this fails, we're in a corrupt state. |
| if (!success && metaDataUpdated && nonTxToTx) { |
| sendHBaseMetaData(origTableDescriptors, pollingNeeded); |
| } |
| } |
| return result; |
| } |
| private void updateDescriptorForTx(PTable table, Map<String, Object> tableProps, HTableDescriptor tableDescriptor, |
| String txValue, Set<HTableDescriptor> descriptorsToUpdate, Set<HTableDescriptor> origDescriptors) throws SQLException { |
| byte[] physicalTableName = table.getPhysicalName().getBytes(); |
| try (HBaseAdmin admin = getAdmin()) { |
| setTransactional(tableDescriptor, table.getType(), txValue, tableProps); |
| Map<String, Object> indexTableProps; |
| if (txValue == null) { |
| indexTableProps = Collections.<String,Object>emptyMap(); |
| } else { |
| indexTableProps = Maps.newHashMapWithExpectedSize(1); |
| indexTableProps.put(TxConstants.READ_NON_TX_DATA, Boolean.valueOf(txValue)); |
| } |
| for (PTable index : table.getIndexes()) { |
| HTableDescriptor indexDescriptor = admin.getTableDescriptor(index.getPhysicalName().getBytes()); |
| origDescriptors.add(indexDescriptor); |
| indexDescriptor = new HTableDescriptor(indexDescriptor); |
| descriptorsToUpdate.add(indexDescriptor); |
| if (index.getColumnFamilies().isEmpty()) { |
| byte[] dataFamilyName = SchemaUtil.getEmptyColumnFamily(table); |
| byte[] indexFamilyName = SchemaUtil.getEmptyColumnFamily(index); |
| HColumnDescriptor indexColDescriptor = indexDescriptor.getFamily(indexFamilyName); |
| HColumnDescriptor tableColDescriptor = tableDescriptor.getFamily(dataFamilyName); |
| indexColDescriptor.setMaxVersions(tableColDescriptor.getMaxVersions()); |
| indexColDescriptor.setValue(TxConstants.PROPERTY_TTL, tableColDescriptor.getValue(TxConstants.PROPERTY_TTL)); |
| } else { |
| for (PColumnFamily family : index.getColumnFamilies()) { |
| byte[] familyName = family.getName().getBytes(); |
| indexDescriptor.getFamily(familyName).setMaxVersions(tableDescriptor.getFamily(familyName).getMaxVersions()); |
| HColumnDescriptor indexColDescriptor = indexDescriptor.getFamily(familyName); |
| HColumnDescriptor tableColDescriptor = tableDescriptor.getFamily(familyName); |
| indexColDescriptor.setMaxVersions(tableColDescriptor.getMaxVersions()); |
| indexColDescriptor.setValue(TxConstants.PROPERTY_TTL, tableColDescriptor.getValue(TxConstants.PROPERTY_TTL)); |
| } |
| } |
| setTransactional(indexDescriptor, index.getType(), txValue, indexTableProps); |
| } |
| try { |
| HTableDescriptor indexDescriptor = admin.getTableDescriptor(MetaDataUtil.getViewIndexPhysicalName(physicalTableName)); |
| origDescriptors.add(indexDescriptor); |
| indexDescriptor = new HTableDescriptor(indexDescriptor); |
| descriptorsToUpdate.add(indexDescriptor); |
| setSharedIndexMaxVersion(table, tableDescriptor, indexDescriptor); |
| setTransactional(indexDescriptor, PTableType.INDEX, txValue, indexTableProps); |
| } catch (org.apache.hadoop.hbase.TableNotFoundException ignore) { |
| // Ignore, as we may never have created a view index table |
| } |
| try { |
| HTableDescriptor indexDescriptor = admin.getTableDescriptor(MetaDataUtil.getLocalIndexPhysicalName(physicalTableName)); |
| origDescriptors.add(indexDescriptor); |
| indexDescriptor = new HTableDescriptor(indexDescriptor); |
| descriptorsToUpdate.add(indexDescriptor); |
| setSharedIndexMaxVersion(table, tableDescriptor, indexDescriptor); |
| setTransactional(indexDescriptor, PTableType.INDEX, txValue, indexTableProps); |
| } catch (org.apache.hadoop.hbase.TableNotFoundException ignore) { |
| // Ignore, as we may never have created a view index table |
| } |
| } catch (IOException e) { |
| throw ServerUtil.parseServerException(e); |
| } |
| } |
| private void setSharedIndexMaxVersion(PTable table, HTableDescriptor tableDescriptor, |
| HTableDescriptor indexDescriptor) { |
| if (table.getColumnFamilies().isEmpty()) { |
| byte[] familyName = SchemaUtil.getEmptyColumnFamily(table); |
| HColumnDescriptor indexColDescriptor = indexDescriptor.getFamily(familyName); |
| HColumnDescriptor tableColDescriptor = tableDescriptor.getFamily(familyName); |
| indexColDescriptor.setMaxVersions(tableColDescriptor.getMaxVersions()); |
| indexColDescriptor.setValue(TxConstants.PROPERTY_TTL, tableColDescriptor.getValue(TxConstants.PROPERTY_TTL)); |
| } else { |
| for (PColumnFamily family : table.getColumnFamilies()) { |
| byte[] familyName = family.getName().getBytes(); |
| HColumnDescriptor indexColDescriptor = indexDescriptor.getFamily(familyName); |
| if (indexColDescriptor != null) { |
| HColumnDescriptor tableColDescriptor = tableDescriptor.getFamily(familyName); |
| indexColDescriptor.setMaxVersions(tableColDescriptor.getMaxVersions()); |
| indexColDescriptor.setValue(TxConstants.PROPERTY_TTL, tableColDescriptor.getValue(TxConstants.PROPERTY_TTL)); |
| } |
| } |
| } |
| } |
| |
| private void sendHBaseMetaData(Set<HTableDescriptor> tableDescriptors, boolean pollingNeeded) throws SQLException { |
| SQLException sqlE = null; |
| for (HTableDescriptor descriptor : tableDescriptors) { |
| try { |
| modifyTable(descriptor.getName(), descriptor, pollingNeeded); |
| } catch (IOException e) { |
| sqlE = ServerUtil.parseServerException(e); |
| } catch (InterruptedException e) { |
| // restore the interrupt status |
| Thread.currentThread().interrupt(); |
| sqlE = new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION).setRootCause(e).build().buildException(); |
| } catch (TimeoutException e) { |
| sqlE = new SQLExceptionInfo.Builder(SQLExceptionCode.OPERATION_TIMED_OUT).setRootCause(e.getCause() != null ? e.getCause() : e).build().buildException(); |
| } finally { |
| if (sqlE != null) { |
| throw sqlE; |
| } |
| } |
| } |
| } |
| private void setTransactional(HTableDescriptor tableDescriptor, PTableType tableType, String txValue, Map<String, Object> tableProps) throws SQLException { |
| if (txValue == null) { |
| tableDescriptor.remove(TxConstants.READ_NON_TX_DATA); |
| } else { |
| tableDescriptor.setValue(TxConstants.READ_NON_TX_DATA, txValue); |
| } |
| this.addCoprocessors(tableDescriptor.getName(), tableDescriptor, tableType, tableProps); |
| } |
| |
| private Pair<HTableDescriptor,HTableDescriptor> separateAndValidateProperties(PTable table, Map<String, List<Pair<String, Object>>> properties, Set<String> colFamiliesForPColumnsToBeAdded, List<Pair<byte[], Map<String, Object>>> families, Map<String, Object> tableProps) throws SQLException { |
| Map<String, Map<String, Object>> stmtFamiliesPropsMap = new HashMap<>(properties.size()); |
| Map<String,Object> commonFamilyProps = new HashMap<>(); |
| boolean addingColumns = colFamiliesForPColumnsToBeAdded != null && !colFamiliesForPColumnsToBeAdded.isEmpty(); |
| HashSet<String> existingColumnFamilies = existingColumnFamilies(table); |
| Map<String, Map<String, Object>> allFamiliesProps = new HashMap<>(existingColumnFamilies.size()); |
| boolean isTransactional = table.isTransactional(); |
| boolean willBeTransactional = false; |
| boolean isOrWillBeTransactional = isTransactional; |
| Integer newTTL = null; |
| for (String family : properties.keySet()) { |
| List<Pair<String, Object>> propsList = properties.get(family); |
| if (propsList != null && propsList.size() > 0) { |
| Map<String, Object> colFamilyPropsMap = new HashMap<String, Object>(propsList.size()); |
| for (Pair<String, Object> prop : propsList) { |
| String propName = prop.getFirst(); |
| Object propValue = prop.getSecond(); |
| if ((isHTableProperty(propName) || TableProperty.isPhoenixTableProperty(propName)) && addingColumns) { |
| // setting HTable and PhoenixTable properties while adding a column is not allowed. |
| throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SET_TABLE_PROPERTY_ADD_COLUMN) |
| .setMessage("Property: " + propName).build() |
| .buildException(); |
| } |
| if (isHTableProperty(propName)) { |
| // Can't have a column family name for a property that's an HTableProperty |
| if (!family.equals(QueryConstants.ALL_FAMILY_PROPERTIES_KEY)) { |
| throw new SQLExceptionInfo.Builder(SQLExceptionCode.COLUMN_FAMILY_NOT_ALLOWED_TABLE_PROPERTY) |
| .setMessage("Column Family: " + family + ", Property: " + propName).build() |
| .buildException(); |
| } |
| tableProps.put(propName, propValue); |
| } else { |
| if (TableProperty.isPhoenixTableProperty(propName)) { |
| TableProperty.valueOf(propName).validate(true, !family.equals(QueryConstants.ALL_FAMILY_PROPERTIES_KEY), table.getType()); |
| if (propName.equals(TTL)) { |
| newTTL = ((Number)prop.getSecond()).intValue(); |
| // Even though TTL is really a HColumnProperty we treat it specially. |
| // We enforce that all column families have the same TTL. |
| commonFamilyProps.put(propName, prop.getSecond()); |
| } else if (propName.equals(PhoenixDatabaseMetaData.TRANSACTIONAL) && Boolean.TRUE.equals(propValue)) { |
| willBeTransactional = isOrWillBeTransactional = true; |
| tableProps.put(TxConstants.READ_NON_TX_DATA, propValue); |
| } |
| } else { |
| if (isHColumnProperty(propName)) { |
| if (family.equals(QueryConstants.ALL_FAMILY_PROPERTIES_KEY)) { |
| commonFamilyProps.put(propName, propValue); |
| } else { |
| colFamilyPropsMap.put(propName, propValue); |
| } |
| } else { |
| // invalid property - neither of HTableProp, HColumnProp or PhoenixTableProp |
| // FIXME: This isn't getting triggered as currently a property gets evaluated |
| // as HTableProp if its neither HColumnProp or PhoenixTableProp. |
| throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ALTER_PROPERTY) |
| .setMessage("Column Family: " + family + ", Property: " + propName).build() |
| .buildException(); |
| } |
| } |
| } |
| } |
| if (!colFamilyPropsMap.isEmpty()) { |
| stmtFamiliesPropsMap.put(family, colFamilyPropsMap); |
| } |
| |
| } |
| } |
| commonFamilyProps = Collections.unmodifiableMap(commonFamilyProps); |
| boolean isAddingPkColOnly = colFamiliesForPColumnsToBeAdded.size() == 1 && colFamiliesForPColumnsToBeAdded.contains(null); |
| if (!commonFamilyProps.isEmpty()) { |
| if (!addingColumns) { |
| // Add the common family props to all existing column families |
| for (String existingColFamily : existingColumnFamilies) { |
| Map<String, Object> m = new HashMap<String, Object>(commonFamilyProps.size()); |
| m.putAll(commonFamilyProps); |
| allFamiliesProps.put(existingColFamily, m); |
| } |
| } else { |
| // Add the common family props to the column families of the columns being added |
| for (String colFamily : colFamiliesForPColumnsToBeAdded) { |
| if (colFamily != null) { |
| // only set properties for key value columns |
| Map<String, Object> m = new HashMap<String, Object>(commonFamilyProps.size()); |
| m.putAll(commonFamilyProps); |
| allFamiliesProps.put(colFamily, m); |
| } else if (isAddingPkColOnly) { |
| // Setting HColumnProperty for a pk column is invalid |
| // because it will be part of the row key and not a key value column family. |
| // However, if both pk cols as well as key value columns are getting added |
| // together, then its allowed. The above if block will make sure that we add properties |
| // only for the kv cols and not pk cols. |
| throw new SQLExceptionInfo.Builder(SQLExceptionCode.SET_UNSUPPORTED_PROP_ON_ALTER_TABLE) |
| .build().buildException(); |
| } |
| } |
| } |
| } |
| |
| // Now go through the column family properties specified in the statement |
| // and merge them with the common family properties. |
| for (String f : stmtFamiliesPropsMap.keySet()) { |
| if (!addingColumns && !existingColumnFamilies.contains(f)) { |
| throw new ColumnFamilyNotFoundException(f); |
| } |
| if (addingColumns && !colFamiliesForPColumnsToBeAdded.contains(f)) { |
| throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SET_PROPERTY_FOR_COLUMN_NOT_ADDED).build().buildException(); |
| } |
| Map<String, Object> commonProps = allFamiliesProps.get(f); |
| Map<String, Object> stmtProps = stmtFamiliesPropsMap.get(f); |
| if (commonProps != null) { |
| if (stmtProps != null) { |
| // merge common props with statement props for the family |
| commonProps.putAll(stmtProps); |
| } |
| } else { |
| // if no common props were specified, then assign family specific props |
| if (stmtProps != null) { |
| allFamiliesProps.put(f, stmtProps); |
| } |
| } |
| } |
| |
| // case when there is a column family being added but there are no props |
| // For ex - in DROP COLUMN when a new empty CF needs to be added since all |
| // the columns of the existing empty CF are getting dropped. Or the case |
| // when one is just adding a column for a column family like this: |
| // ALTER TABLE ADD CF.COL |
| for (String cf : colFamiliesForPColumnsToBeAdded) { |
| if (cf != null && allFamiliesProps.get(cf) == null) { |
| allFamiliesProps.put(cf, new HashMap<String, Object>()); |
| } |
| } |
| |
| if (table.getColumnFamilies().isEmpty() && !addingColumns && !commonFamilyProps.isEmpty()) { |
| allFamiliesProps.put(Bytes.toString(table.getDefaultFamilyName() == null ? QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES : table.getDefaultFamilyName().getBytes() ), commonFamilyProps); |
| } |
| |
| // Views are not allowed to have any of these properties. |
| if (table.getType() == PTableType.VIEW && (!stmtFamiliesPropsMap.isEmpty() || !commonFamilyProps.isEmpty() || !tableProps.isEmpty())) { |
| throw new SQLExceptionInfo.Builder(SQLExceptionCode.VIEW_WITH_PROPERTIES).build() |
| .buildException(); |
| } |
| |
| HTableDescriptor newTableDescriptor = null; |
| HTableDescriptor origTableDescriptor = null; |
| if (!allFamiliesProps.isEmpty() || !tableProps.isEmpty()) { |
| byte[] tableNameBytes = Bytes.toBytes(table.getPhysicalName().getString()); |
| HTableDescriptor existingTableDescriptor = origTableDescriptor = getTableDescriptor(tableNameBytes); |
| newTableDescriptor = new HTableDescriptor(existingTableDescriptor); |
| if (!tableProps.isEmpty()) { |
| // add all the table properties to the existing table descriptor |
| for (Entry<String, Object> entry : tableProps.entrySet()) { |
| newTableDescriptor.setValue(entry.getKey(), entry.getValue() != null ? entry.getValue().toString() : null); |
| } |
| } |
| if (addingColumns) { |
| // Make sure that all the CFs of the table have the same TTL as the empty CF. |
| setTTLForNewCFs(allFamiliesProps, table, newTableDescriptor, newTTL); |
| } |
| // Set TTL on all table column families, even if they're not referenced here |
| if (newTTL != null) { |
| for (PColumnFamily family : table.getColumnFamilies()) { |
| if (!allFamiliesProps.containsKey(family.getName().getString())) { |
| Map<String,Object> familyProps = Maps.newHashMapWithExpectedSize(1); |
| familyProps.put(TTL, newTTL); |
| allFamiliesProps.put(family.getName().getString(), familyProps); |
| } |
| } |
| } |
| Integer defaultTxMaxVersions = null; |
| if (isOrWillBeTransactional) { |
| // Calculate default for max versions |
| Map<String, Object> emptyFamilyProps = allFamiliesProps.get(SchemaUtil.getEmptyColumnFamilyAsString(table)); |
| if (emptyFamilyProps != null) { |
| defaultTxMaxVersions = (Integer)emptyFamilyProps.get(HConstants.VERSIONS); |
| } |
| if (defaultTxMaxVersions == null) { |
| if (isTransactional) { |
| defaultTxMaxVersions = newTableDescriptor.getFamily(SchemaUtil.getEmptyColumnFamily(table)).getMaxVersions(); |
| } else { |
| defaultTxMaxVersions = |
| this.getProps().getInt( |
| QueryServices.MAX_VERSIONS_TRANSACTIONAL_ATTRIB, |
| QueryServicesOptions.DEFAULT_MAX_VERSIONS_TRANSACTIONAL); |
| } |
| } |
| if (willBeTransactional) { |
| // Set VERSIONS for all column families when transitioning to transactional |
| for (PColumnFamily family : table.getColumnFamilies()) { |
| if (!allFamiliesProps.containsKey(family.getName().getString())) { |
| Map<String,Object> familyProps = Maps.newHashMapWithExpectedSize(1); |
| familyProps.put(HConstants.VERSIONS, defaultTxMaxVersions); |
| allFamiliesProps.put(family.getName().getString(), familyProps); |
| } |
| } |
| } |
| } |
| // Set Tephra's TTL property based on HBase property if we're |
| // transitioning to become transactional or setting TTL on |
| // an already transactional table. |
| if (isOrWillBeTransactional) { |
| int ttl = getTTL(table, newTableDescriptor, newTTL); |
| if (ttl != HColumnDescriptor.DEFAULT_TTL) { |
| for (Map.Entry<String, Map<String, Object>> entry : allFamiliesProps.entrySet()) { |
| Map<String, Object> props = entry.getValue(); |
| if (props == null) { |
| props = new HashMap<String, Object>(); |
| } |
| props.put(TxConstants.PROPERTY_TTL, ttl); |
| // Remove HBase TTL if we're not transitioning an existing table to become transactional |
| // or if the existing transactional table wasn't originally non transactional. |
| if (!willBeTransactional && !Boolean.valueOf(newTableDescriptor.getValue(TxConstants.READ_NON_TX_DATA))) { |
| props.remove(TTL); |
| } |
| } |
| } |
| } |
| for (Entry<String, Map<String, Object>> entry : allFamiliesProps.entrySet()) { |
| Map<String,Object> familyProps = entry.getValue(); |
| if (isOrWillBeTransactional) { |
| if (!familyProps.containsKey(HConstants.VERSIONS)) { |
| familyProps.put(HConstants.VERSIONS, defaultTxMaxVersions); |
| } |
| } |
| byte[] cf = Bytes.toBytes(entry.getKey()); |
| HColumnDescriptor colDescriptor = newTableDescriptor.getFamily(cf); |
| if (colDescriptor == null) { |
| // new column family |
| colDescriptor = generateColumnFamilyDescriptor(new Pair<>(cf, familyProps), table.getType()); |
| newTableDescriptor.addFamily(colDescriptor); |
| } else { |
| modifyColumnFamilyDescriptor(colDescriptor, familyProps); |
| } |
| if (isOrWillBeTransactional) { |
| checkTransactionalVersionsValue(colDescriptor); |
| } |
| } |
| } |
| return new Pair<>(origTableDescriptor, newTableDescriptor); |
| } |
| |
| private void checkTransactionalVersionsValue(HColumnDescriptor colDescriptor) throws SQLException { |
| int maxVersions = colDescriptor.getMaxVersions(); |
| if (maxVersions <= 1) { |
| throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_MAX_VERSIONS_MUST_BE_GREATER_THAN_ONE) |
| .setFamilyName(colDescriptor.getNameAsString()) |
| .build().buildException(); |
| } |
| } |
| |
| private boolean isHColumnProperty(String propName) { |
| return HColumnDescriptor.getDefaultValues().containsKey(propName); |
| } |
| |
| private boolean isHTableProperty(String propName) { |
| return !isHColumnProperty(propName) && !TableProperty.isPhoenixTableProperty(propName); |
| } |
| |
| private HashSet<String> existingColumnFamiliesForBaseTable(PName baseTableName) throws TableNotFoundException { |
| synchronized (latestMetaDataLock) { |
| throwConnectionClosedIfNullMetaData(); |
| PTable table = latestMetaData.getTableRef(new PTableKey(null, baseTableName.getString())).getTable(); |
| latestMetaDataLock.notifyAll(); |
| return existingColumnFamilies(table); |
| } |
| } |
| |
| private HashSet<String> existingColumnFamilies(PTable table) { |
| List<PColumnFamily> cfs = table.getColumnFamilies(); |
| HashSet<String> cfNames = new HashSet<>(cfs.size()); |
| for (PColumnFamily cf : table.getColumnFamilies()) { |
| cfNames.add(cf.getName().getString()); |
| } |
| return cfNames; |
| } |
| |
| private static int getTTL(PTable table, HTableDescriptor tableDesc, Integer newTTL) throws SQLException { |
| // If we're setting TTL now, then use that value. Otherwise, use empty column family value |
| int ttl = newTTL != null ? newTTL |
| : tableDesc.getFamily(SchemaUtil.getEmptyColumnFamily(table)).getTimeToLive(); |
| return ttl; |
| } |
| |
| private static void setTTLForNewCFs(Map<String, Map<String, Object>> familyProps, PTable table, |
| HTableDescriptor tableDesc, Integer newTTL) throws SQLException { |
| if (!familyProps.isEmpty()) { |
| int ttl = getTTL(table, tableDesc, newTTL); |
| for (Map.Entry<String, Map<String, Object>> entry : familyProps.entrySet()) { |
| Map<String, Object> props = entry.getValue(); |
| if (props == null) { |
| props = new HashMap<String, Object>(); |
| } |
| props.put(TTL, ttl); |
| } |
| } |
| } |
| |
| @Override |
| public MetaDataMutationResult dropColumn(final List<Mutation> tableMetaData, PTableType tableType) throws SQLException { |
| byte[][] rowKeyMetadata = new byte[3][]; |
| SchemaUtil.getVarChars(tableMetaData.get(0).getRow(), rowKeyMetadata); |
| byte[] tenantIdBytes = rowKeyMetadata[PhoenixDatabaseMetaData.TENANT_ID_INDEX]; |
| byte[] schemaBytes = rowKeyMetadata[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX]; |
| byte[] tableBytes = rowKeyMetadata[PhoenixDatabaseMetaData.TABLE_NAME_INDEX]; |
| byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaBytes, tableBytes); |
| MetaDataMutationResult result = metaDataCoprocessorExec(tableKey, |
| new Batch.Call<MetaDataService, MetaDataResponse>() { |
| @Override |
| public MetaDataResponse call(MetaDataService instance) throws IOException { |
| ServerRpcController controller = new ServerRpcController(); |
| BlockingRpcCallback<MetaDataResponse> rpcCallback = |
| new BlockingRpcCallback<MetaDataResponse>(); |
| DropColumnRequest.Builder builder = DropColumnRequest.newBuilder(); |
| for (Mutation m : tableMetaData) { |
| MutationProto mp = ProtobufUtil.toProto(m); |
| builder.addTableMetadataMutations(mp.toByteString()); |
| } |
| builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER)); |
| instance.dropColumn(controller, builder.build(), rpcCallback); |
| if(controller.getFailedOn() != null) { |
| throw controller.getFailedOn(); |
| } |
| return rpcCallback.get(); |
| } |
| }); |
| final MutationCode code = result.getMutationCode(); |
| switch(code) { |
| case TABLE_ALREADY_EXISTS: |
| final ReadOnlyProps props = this.getProps(); |
| final boolean dropMetadata = props.getBoolean(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA); |
| if (dropMetadata) { |
| dropTables(result.getTableNamesToDelete()); |
| } |
| invalidateTables(result.getTableNamesToDelete()); |
| break; |
| default: |
| break; |
| } |
| return result; |
| |
| } |
| |
| /** |
| * This closes the passed connection. |
| */ |
| private PhoenixConnection addColumn(PhoenixConnection oldMetaConnection, String tableName, long timestamp, String columns, boolean addIfNotExists) throws SQLException { |
| Properties props = PropertiesUtil.deepCopy(oldMetaConnection.getClientInfo()); |
| props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(timestamp)); |
| // Cannot go through DriverManager or you end up in an infinite loop because it'll call init again |
| PhoenixConnection metaConnection = new PhoenixConnection(oldMetaConnection, this, props); |
| SQLException sqlE = null; |
| try { |
| metaConnection.createStatement().executeUpdate("ALTER TABLE " + tableName + " ADD " + (addIfNotExists ? " IF NOT EXISTS " : "") + columns ); |
| } catch (NewerTableAlreadyExistsException e) { |
| logger.warn("Table already modified at this timestamp, so assuming add of these columns already done: " + columns); |
| } catch (SQLException e) { |
| logger.warn("Add column failed due to:" + e); |
| sqlE = e; |
| } finally { |
| try { |
| oldMetaConnection.close(); |
| } catch (SQLException e) { |
| if (sqlE != null) { |
| sqlE.setNextException(e); |
| } else { |
| sqlE = e; |
| } |
| } |
| if (sqlE != null) { |
| throw sqlE; |
| } |
| } |
| return metaConnection; |
| } |
| |
| /** |
| * Keeping this to use for further upgrades. This method closes the oldMetaConnection. |
| */ |
| private PhoenixConnection addColumnsIfNotExists(PhoenixConnection oldMetaConnection, |
| String tableName, long timestamp, String columns) throws SQLException { |
| return addColumn(oldMetaConnection, tableName, timestamp, columns, true); |
| } |
| |
| @Override |
| public void init(final String url, final Properties props) throws SQLException { |
| try { |
| PhoenixContextExecutor.call(new Callable<Void>() { |
| @Override |
| public Void call() throws Exception { |
| if (initialized) { |
| if (initializationException != null) { |
| // Throw previous initialization exception, as we won't resuse this instance |
| throw initializationException; |
| } |
| return null; |
| } |
| synchronized (ConnectionQueryServicesImpl.this) { |
| if (initialized) { |
| if (initializationException != null) { |
| // Throw previous initialization exception, as we won't resuse this instance |
| throw initializationException; |
| } |
| return null; |
| } |
| checkClosed(); |
| PhoenixConnection metaConnection = null; |
| try { |
| openConnection(); |
| Properties scnProps = PropertiesUtil.deepCopy(props); |
| scnProps.setProperty( |
| PhoenixRuntime.CURRENT_SCN_ATTRIB, |
| Long.toString(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP)); |
| scnProps.remove(PhoenixRuntime.TENANT_ID_ATTRIB); |
| String globalUrl = JDBCUtil.removeProperty(url, PhoenixRuntime.TENANT_ID_ATTRIB); |
| metaConnection = new PhoenixConnection( |
| ConnectionQueryServicesImpl.this, globalUrl, scnProps, newEmptyMetaData()); |
| try { |
| metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_TABLE_METADATA); |
| } catch (NewerTableAlreadyExistsException ignore) { |
| // Ignore, as this will happen if the SYSTEM.CATALOG already exists at this fixed timestamp. |
| // A TableAlreadyExistsException is not thrown, since the table only exists *after* this fixed timestamp. |
| } catch (TableAlreadyExistsException e) { |
| // This will occur if we have an older SYSTEM.CATALOG and we need to update it to include |
| // any new columns we've added. |
| long currentServerSideTableTimeStamp = e.getTable().getTimeStamp(); |
| |
| String columnsToAdd = ""; |
| if(currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0) { |
| // We know that we always need to add the STORE_NULLS column for 4.3 release |
| columnsToAdd += "," + PhoenixDatabaseMetaData.STORE_NULLS + " " + PBoolean.INSTANCE.getSqlTypeName(); |
| try (HBaseAdmin admin = getAdmin()) { |
| HTableDescriptor[] localIndexTables = admin.listTables(MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX+".*"); |
| for (HTableDescriptor table : localIndexTables) { |
| if (table.getValue(MetaDataUtil.PARENT_TABLE_KEY) == null |
| && table.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_NAME) != null) { |
| table.setValue(MetaDataUtil.PARENT_TABLE_KEY, |
| MetaDataUtil.getUserTableName(table |
| .getNameAsString())); |
| // Explicitly disable, modify and enable the table to ensure co-location of data |
| // and index regions. If we just modify the table descriptor when online schema |
| // change enabled may reopen the region in same region server instead of following data region. |
| admin.disableTable(table.getTableName()); |
| admin.modifyTable(table.getTableName(), table); |
| admin.enableTable(table.getTableName()); |
| } |
| } |
| } |
| } |
| |
| // If the server side schema is before MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0 then |
| // we need to add INDEX_TYPE and INDEX_DISABLE_TIMESTAMP columns too. |
| // TODO: Once https://issues.apache.org/jira/browse/PHOENIX-1614 is fixed, |
| // we should just have a ALTER TABLE ADD IF NOT EXISTS statement with all |
| // the column names that have been added to SYSTEM.CATALOG since 4.0. |
| if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0) { |
| columnsToAdd += ", " + PhoenixDatabaseMetaData.INDEX_TYPE + " " + PUnsignedTinyint.INSTANCE.getSqlTypeName() |
| + ", " + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " " + PLong.INSTANCE.getSqlTypeName(); |
| } |
| |
| // If we have some new columns from 4.1-4.3 to add, add them now. |
| if (!columnsToAdd.isEmpty()) { |
| // Ugh..need to assign to another local variable to keep eclipse happy. |
| PhoenixConnection newMetaConnection = addColumnsIfNotExists(metaConnection, |
| PhoenixDatabaseMetaData.SYSTEM_CATALOG, |
| MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0, columnsToAdd); |
| metaConnection = newMetaConnection; |
| } |
| |
| if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0) { |
| columnsToAdd = PhoenixDatabaseMetaData.BASE_COLUMN_COUNT + " " |
| + PInteger.INSTANCE.getSqlTypeName(); |
| try { |
| metaConnection = addColumn(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG, |
| MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0, columnsToAdd, false); |
| upgradeTo4_5_0(metaConnection); |
| } catch (ColumnAlreadyExistsException ignored) { |
| /* |
| * Upgrade to 4.5 is a slightly special case. We use the fact that the column |
| * BASE_COLUMN_COUNT is already part of the meta-data schema as the signal that |
| * the server side upgrade has finished or is in progress. |
| */ |
| logger.debug("No need to run 4.5 upgrade"); |
| } |
| Properties props = PropertiesUtil.deepCopy(metaConnection.getClientInfo()); |
| props.remove(PhoenixRuntime.CURRENT_SCN_ATTRIB); |
| props.remove(PhoenixRuntime.TENANT_ID_ATTRIB); |
| PhoenixConnection conn = new PhoenixConnection(ConnectionQueryServicesImpl.this, metaConnection.getURL(), props, metaConnection.getMetaDataCache()); |
| try { |
| List<String> tablesNeedingUpgrade = UpgradeUtil.getPhysicalTablesWithDescRowKey(conn); |
| if (!tablesNeedingUpgrade.isEmpty()) { |
| logger.warn("The following tables require upgrade due to a bug causing the row key to be incorrect for descending columns and ascending BINARY columns (PHOENIX-2067 and PHOENIX-2120):\n" + Joiner.on(' ').join(tablesNeedingUpgrade) + "\nTo upgrade issue the \"bin/psql.py -u\" command."); |
| } |
| List<String> unsupportedTables = UpgradeUtil.getPhysicalTablesWithDescVarbinaryRowKey(conn); |
| if (!unsupportedTables.isEmpty()) { |
| logger.warn("The following tables use an unsupported VARBINARY DESC construct and need to be changed:\n" + Joiner.on(' ').join(unsupportedTables)); |
| } |
| } catch (Exception ex) { |
| logger.error("Unable to determine tables requiring upgrade due to PHOENIX-2067", ex); |
| } finally { |
| conn.close(); |
| } |
| } |
| if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0) { |
| columnsToAdd = PhoenixDatabaseMetaData.IS_ROW_TIMESTAMP + " " + PBoolean.INSTANCE.getSqlTypeName(); |
| metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG, |
| MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0, columnsToAdd); |
| } |
| if(currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0) { |
| // Add these columns one at a time, each with different timestamps so that if folks have |
| // run the upgrade code already for a snapshot, we'll still enter this block (and do the |
| // parts we haven't yet done). |
| metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG, MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 2, |
| PhoenixDatabaseMetaData.TRANSACTIONAL + " " + PBoolean.INSTANCE.getSqlTypeName()); |
| // Drop old stats table so that new stats table is created |
| metaConnection = dropStatsTable(metaConnection, |
| MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 1); |
| metaConnection = addColumnsIfNotExists(metaConnection, |
| PhoenixDatabaseMetaData.SYSTEM_CATALOG, |
| MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0, |
| PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY + " " |
| + PLong.INSTANCE.getSqlTypeName()); |
| setImmutableTableIndexesImmutable(metaConnection); |
| // that already have cached data. |
| clearCache(); |
| } |
| |
| } |
| int nSaltBuckets = ConnectionQueryServicesImpl.this.props.getInt(QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB, |
| QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS); |
| try { |
| String createSequenceTable = Sequence.getCreateTableStatement(nSaltBuckets); |
| metaConnection.createStatement().executeUpdate(createSequenceTable); |
| nSequenceSaltBuckets = nSaltBuckets; |
| } catch (NewerTableAlreadyExistsException e) { |
| // Ignore, as this will happen if the SYSTEM.SEQUENCE already exists at this fixed timestamp. |
| // A TableAlreadyExistsException is not thrown, since the table only exists *after* this fixed timestamp. |
| nSequenceSaltBuckets = getSaltBuckets(e); |
| } catch (TableAlreadyExistsException e) { |
| // This will occur if we have an older SYSTEM.SEQUENCE and we need to update it to include |
| // any new columns we've added. |
| long currentServerSideTableTimeStamp = e.getTable().getTimeStamp(); |
| if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0) { |
| // If the table time stamp is before 4.1.0 then we need to add below columns |
| // to the SYSTEM.SEQUENCE table. |
| String columnsToAdd = PhoenixDatabaseMetaData.MIN_VALUE + " " + PLong.INSTANCE.getSqlTypeName() |
| + ", " + PhoenixDatabaseMetaData.MAX_VALUE + " " + PLong.INSTANCE.getSqlTypeName() |
| + ", " + PhoenixDatabaseMetaData.CYCLE_FLAG + " " + PBoolean.INSTANCE.getSqlTypeName() |
| + ", " + PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG + " " + PBoolean.INSTANCE.getSqlTypeName(); |
| addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG, |
| MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, columnsToAdd); |
| } |
| // If the table timestamp is before 4.2.1 then run the upgrade script |
| if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_2_1) { |
| if (UpgradeUtil.upgradeSequenceTable(metaConnection, nSaltBuckets, e.getTable())) { |
| metaConnection.removeTable(null, |
| PhoenixDatabaseMetaData.SEQUENCE_SCHEMA_NAME, |
| PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME, |
| MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP); |
| clearTableFromCache(ByteUtil.EMPTY_BYTE_ARRAY, |
| PhoenixDatabaseMetaData.SEQUENCE_SCHEMA_NAME_BYTES, |
| PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME_BYTES, |
| MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP); |
| clearTableRegionCache(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES); |
| } |
| nSequenceSaltBuckets = nSaltBuckets; |
| } else { |
| nSequenceSaltBuckets = getSaltBuckets(e); |
| } |
| |
| } |
| try { |
| metaConnection.createStatement().executeUpdate( |
| QueryConstants.CREATE_STATS_TABLE_METADATA); |
| } catch (NewerTableAlreadyExistsException ignore) { |
| } catch(TableAlreadyExistsException ignore) { |
| metaConnection = addColumnsIfNotExists( |
| metaConnection, |
| PhoenixDatabaseMetaData.SYSTEM_STATS_NAME, |
| MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, |
| PhoenixDatabaseMetaData.GUIDE_POSTS_ROW_COUNT + " " |
| + PLong.INSTANCE.getSqlTypeName()); |
| } |
| try { |
| metaConnection.createStatement().executeUpdate( |
| QueryConstants.CREATE_FUNCTION_METADATA); |
| } catch (NewerTableAlreadyExistsException e) { |
| } catch (TableAlreadyExistsException e) { |
| } |
| scheduleRenewLeaseTasks(); |
| } catch (Exception e) { |
| if (e instanceof SQLException) { |
| initializationException = (SQLException)e; |
| } else { |
| // wrap every other exception into a SQLException |
| initializationException = new SQLException(e); |
| } |
| } finally { |
| try { |
| if (metaConnection != null) metaConnection.close(); |
| } catch (SQLException e) { |
| if (initializationException != null) { |
| initializationException.setNextException(e); |
| } else { |
| initializationException = e; |
| } |
| } finally { |
| try { |
| if (initializationException != null) { |
| throw initializationException; |
| } |
| } finally { |
| initialized = true; |
| } |
| } |
| } |
| } |
| return null; |
| } |
| }); |
| } catch (Exception e) { |
| Throwables.propagateIfInstanceOf(e, SQLException.class); |
| throw Throwables.propagate(e); |
| } |
| } |
| |
| private void scheduleRenewLeaseTasks() { |
| if (isRenewingLeasesEnabled()) { |
| ThreadFactory threadFactory = |
| new ThreadFactoryBuilder().setDaemon(true) |
| .setNameFormat("PHOENIX-SCANNER-RENEW-LEASE" + "-thread-%s").build(); |
| renewLeaseExecutor = |
| Executors.newScheduledThreadPool(renewLeasePoolSize, threadFactory); |
| for (LinkedBlockingQueue<WeakReference<PhoenixConnection>> q : connectionQueues) { |
| renewLeaseExecutor.scheduleAtFixedRate(new RenewLeaseTask(q), 0, |
| renewLeaseTaskFrequency, TimeUnit.MILLISECONDS); |
| } |
| } |
| } |
| |
| /** |
| * Set IMMUTABLE_ROWS to true for all index tables over immutable tables. |
| * @param metaConnection connection over which to run the upgrade |
| * @throws SQLException |
| */ |
| private static void setImmutableTableIndexesImmutable(PhoenixConnection metaConnection) throws SQLException { |
| boolean autoCommit = metaConnection.getAutoCommit(); |
| try { |
| metaConnection.setAutoCommit(true); |
| metaConnection.createStatement().execute( |
| "UPSERT INTO SYSTEM.CATALOG(TENANT_ID, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, COLUMN_FAMILY, IMMUTABLE_ROWS)\n" + |
| "SELECT A.TENANT_ID, A.TABLE_SCHEM,B.COLUMN_FAMILY,null,null,true\n" + |
| "FROM SYSTEM.CATALOG A JOIN SYSTEM.CATALOG B ON (\n" + |
| " A.TENANT_ID = B.TENANT_ID AND \n" + |
| " A.TABLE_SCHEM = B.TABLE_SCHEM AND\n" + |
| " A.TABLE_NAME = B.TABLE_NAME AND\n" + |
| " A.COLUMN_NAME = B.COLUMN_NAME AND\n" + |
| " B.LINK_TYPE = 1\n" + |
| ")\n" + |
| "WHERE A.COLUMN_FAMILY IS NULL AND\n" + |
| " B.COLUMN_FAMILY IS NOT NULL AND\n" + |
| " A.IMMUTABLE_ROWS = TRUE"); |
| } finally { |
| metaConnection.setAutoCommit(autoCommit); |
| } |
| } |
| |
| private PhoenixConnection dropStatsTable(PhoenixConnection oldMetaConnection, long timestamp) |
| throws SQLException, IOException { |
| Properties props = PropertiesUtil.deepCopy(oldMetaConnection.getClientInfo()); |
| props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(timestamp)); |
| PhoenixConnection metaConnection = new PhoenixConnection(oldMetaConnection, this, props); |
| SQLException sqlE = null; |
| boolean wasCommit = metaConnection.getAutoCommit(); |
| try { |
| metaConnection.setAutoCommit(true); |
| metaConnection.createStatement() |
| .executeUpdate("DELETE FROM " + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " WHERE " |
| + PhoenixDatabaseMetaData.TABLE_NAME + "='" + PhoenixDatabaseMetaData.SYSTEM_STATS_TABLE |
| + "' AND " + PhoenixDatabaseMetaData.TABLE_SCHEM + "='" |
| + PhoenixDatabaseMetaData.SYSTEM_SCHEMA_NAME + "'"); |
| } catch (SQLException e) { |
| logger.warn("exception during upgrading stats table:" + e); |
| sqlE = e; |
| } finally { |
| try { |
| metaConnection.setAutoCommit(wasCommit); |
| oldMetaConnection.close(); |
| } catch (SQLException e) { |
| if (sqlE != null) { |
| sqlE.setNextException(e); |
| } else { |
| sqlE = e; |
| } |
| } |
| if (sqlE != null) { |
| throw sqlE; |
| } |
| } |
| return metaConnection; |
| } |
| |
| private static int getSaltBuckets(TableAlreadyExistsException e) { |
| PTable table = e.getTable(); |
| Integer sequenceSaltBuckets = table == null ? null : table.getBucketNum(); |
| return sequenceSaltBuckets == null ? 0 : sequenceSaltBuckets; |
| } |
| |
| @Override |
| public MutationState updateData(MutationPlan plan) throws SQLException { |
| MutationState state = plan.execute(); |
| plan.getContext().getConnection().commit(); |
| return state; |
| } |
| |
| @Override |
| public int getLowestClusterHBaseVersion() { |
| return lowestClusterHBaseVersion; |
| } |
| |
| @Override |
| public boolean hasIndexWALCodec() { |
| return hasIndexWALCodec; |
| } |
| |
| /** |
| * Clears the Phoenix meta data cache on each region server |
| * @throws SQLException |
| */ |
| @Override |
| public long clearCache() throws SQLException { |
| try { |
| SQLException sqlE = null; |
| HTableInterface htable = this.getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); |
| try { |
| final Map<byte[], Long> results = |
| htable.coprocessorService(MetaDataService.class, HConstants.EMPTY_START_ROW, |
| HConstants.EMPTY_END_ROW, new Batch.Call<MetaDataService, Long>() { |
| @Override |
| public Long call(MetaDataService instance) throws IOException { |
| ServerRpcController controller = new ServerRpcController(); |
| BlockingRpcCallback<ClearCacheResponse> rpcCallback = |
| new BlockingRpcCallback<ClearCacheResponse>(); |
| ClearCacheRequest.Builder builder = ClearCacheRequest.newBuilder(); |
| builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER)); |
| instance.clearCache(controller, builder.build(), rpcCallback); |
| if(controller.getFailedOn() != null) { |
| throw controller.getFailedOn(); |
| } |
| return rpcCallback.get().getUnfreedBytes(); |
| } |
| }); |
| long unfreedBytes = 0; |
| for (Map.Entry<byte[],Long> result : results.entrySet()) { |
| if (result.getValue() != null) { |
| unfreedBytes += result.getValue(); |
| } |
| } |
| return unfreedBytes; |
| } catch (IOException e) { |
| throw ServerUtil.parseServerException(e); |
| } catch (Throwable e) { |
| sqlE = new SQLException(e); |
| } finally { |
| try { |
| tableStatsCache.invalidateAll(); |
| htable.close(); |
| } catch (IOException e) { |
| if (sqlE == null) { |
| sqlE = ServerUtil.parseServerException(e); |
| } else { |
| sqlE.setNextException(ServerUtil.parseServerException(e)); |
| } |
| } finally { |
| if (sqlE != null) { |
| throw sqlE; |
| } |
| } |
| } |
| } catch (Exception e) { |
| throw new SQLException(ServerUtil.parseServerException(e)); |
| } |
| return 0; |
| } |
| |
| private void flushTable(byte[] tableName) throws SQLException { |
| HBaseAdmin admin = getAdmin(); |
| try { |
| admin.flush(tableName); |
| } catch (IOException e) { |
| throw new PhoenixIOException(e); |
| } catch (InterruptedException e) { |
| // restore the interrupt status |
| Thread.currentThread().interrupt(); |
| throw new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION).setRootCause(e).build() |
| .buildException(); |
| } finally { |
| Closeables.closeQuietly(admin); |
| } |
| } |
| |
| @Override |
| public HBaseAdmin getAdmin() throws SQLException { |
| try { |
| // do not use the HBaseAdmin(this.config) constructor |
| // since it always establishes a new HConnection which is expensive. |
| return new HBaseAdmin(connection); |
| } catch (IOException e) { |
| throw new PhoenixIOException(e); |
| } |
| } |
| |
| @Override |
| public MetaDataMutationResult updateIndexState(final List<Mutation> tableMetaData, String parentTableName) throws SQLException { |
| byte[][] rowKeyMetadata = new byte[3][]; |
| SchemaUtil.getVarChars(tableMetaData.get(0).getRow(), rowKeyMetadata); |
| byte[] tableKey = SchemaUtil.getTableKey(ByteUtil.EMPTY_BYTE_ARRAY, rowKeyMetadata[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX], rowKeyMetadata[PhoenixDatabaseMetaData.TABLE_NAME_INDEX]); |
| return metaDataCoprocessorExec(tableKey, |
| new Batch.Call<MetaDataService, MetaDataResponse>() { |
| @Override |
| public MetaDataResponse call(MetaDataService instance) throws IOException { |
| ServerRpcController controller = new ServerRpcController(); |
| BlockingRpcCallback<MetaDataResponse> rpcCallback = |
| new BlockingRpcCallback<MetaDataResponse>(); |
| UpdateIndexStateRequest.Builder builder = UpdateIndexStateRequest.newBuilder(); |
| for (Mutation m : tableMetaData) { |
| MutationProto mp = ProtobufUtil.toProto(m); |
| builder.addTableMetadataMutations(mp.toByteString()); |
| } |
| builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER)); |
| instance.updateIndexState(controller, builder.build(), rpcCallback); |
| if(controller.getFailedOn() != null) { |
| throw controller.getFailedOn(); |
| } |
| return rpcCallback.get(); |
| } |
| }); |
| } |
| |
| @Override |
| public long createSequence(String tenantId, String schemaName, String sequenceName, |
| long startWith, long incrementBy, long cacheSize, long minValue, long maxValue, |
| boolean cycle, long timestamp) throws SQLException { |
| SequenceKey sequenceKey = new SequenceKey(tenantId, schemaName, sequenceName, nSequenceSaltBuckets); |
| Sequence newSequences = new Sequence(sequenceKey); |
| Sequence sequence = sequenceMap.putIfAbsent(sequenceKey, newSequences); |
| if (sequence == null) { |
| sequence = newSequences; |
| } |
| try { |
| sequence.getLock().lock(); |
| // Now that we have the lock we need, create the sequence |
| Append append = sequence.createSequence(startWith, incrementBy, cacheSize, timestamp, minValue, maxValue, cycle); |
| HTableInterface htable = |
| this.getTable(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES); |
| htable.setAutoFlush(true); |
| try { |
| Result result = htable.append(append); |
| return sequence.createSequence(result, minValue, maxValue, cycle); |
| } catch (IOException e) { |
| throw ServerUtil.parseServerException(e); |
| } finally { |
| Closeables.closeQuietly(htable); |
| } |
| } finally { |
| sequence.getLock().unlock(); |
| } |
| } |
| |
| @Override |
| public long dropSequence(String tenantId, String schemaName, String sequenceName, long timestamp) throws SQLException { |
| SequenceKey sequenceKey = new SequenceKey(tenantId, schemaName, sequenceName, nSequenceSaltBuckets); |
| Sequence newSequences = new Sequence(sequenceKey); |
| Sequence sequence = sequenceMap.putIfAbsent(sequenceKey, newSequences); |
| if (sequence == null) { |
| sequence = newSequences; |
| } |
| try { |
| sequence.getLock().lock(); |
| // Now that we have the lock we need, create the sequence |
| Append append = sequence.dropSequence(timestamp); |
| HTableInterface htable = this.getTable(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES); |
| try { |
| Result result = htable.append(append); |
| return sequence.dropSequence(result); |
| } catch (IOException e) { |
| throw ServerUtil.parseServerException(e); |
| } finally { |
| Closeables.closeQuietly(htable); |
| } |
| } finally { |
| sequence.getLock().unlock(); |
| } |
| } |
| |
| /** |
| * Gets the current sequence value |
| * @throws SQLException if cached sequence cannot be found |
| */ |
| @Override |
| public long currentSequenceValue(SequenceKey sequenceKey, long timestamp) throws SQLException { |
| Sequence sequence = sequenceMap.get(sequenceKey); |
| if (sequence == null) { |
| throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_CALL_CURRENT_BEFORE_NEXT_VALUE) |
| .setSchemaName(sequenceKey.getSchemaName()).setTableName(sequenceKey.getSequenceName()) |
| .build().buildException(); |
| } |
| sequence.getLock().lock(); |
| try { |
| return sequence.currentValue(timestamp); |
| } catch (EmptySequenceCacheException e) { |
| throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_CALL_CURRENT_BEFORE_NEXT_VALUE) |
| .setSchemaName(sequenceKey.getSchemaName()).setTableName(sequenceKey.getSequenceName()) |
| .build().buildException(); |
| } finally { |
| sequence.getLock().unlock(); |
| } |
| } |
| |
| /** |
| * Verifies that sequences exist and reserves values for them if reserveValues is true |
| */ |
| @Override |
| public void validateSequences(List<SequenceAllocation> sequenceAllocations, long timestamp, long[] values, SQLException[] exceptions, Sequence.ValueOp action) throws SQLException { |
| incrementSequenceValues(sequenceAllocations, timestamp, values, exceptions, action); |
| } |
| |
| /** |
| * Increment any of the set of sequences that need more values. These are the sequences |
| * that are asking for the next value within a given statement. The returned sequences |
| * are the ones that were not found because they were deleted by another client. |
| * @param sequenceKeys sorted list of sequence kyes |
| * @param timestamp |
| * @throws SQLException if any of the sequences cannot be found |
| * |
| */ |
| @Override |
| public void incrementSequences(List<SequenceAllocation> sequenceAllocations, long timestamp, long[] values, SQLException[] exceptions) throws SQLException { |
| incrementSequenceValues(sequenceAllocations, timestamp, values, exceptions, Sequence.ValueOp.INCREMENT_SEQUENCE); |
| } |
| |
| @SuppressWarnings("deprecation") |
| private void incrementSequenceValues(List<SequenceAllocation> sequenceAllocations, long timestamp, long[] values, SQLException[] exceptions, Sequence.ValueOp op) throws SQLException { |
| List<Sequence> sequences = Lists.newArrayListWithExpectedSize(sequenceAllocations.size()); |
| for (SequenceAllocation sequenceAllocation : sequenceAllocations) { |
| SequenceKey key = sequenceAllocation.getSequenceKey(); |
| Sequence newSequences = new Sequence(key); |
| Sequence sequence = sequenceMap.putIfAbsent(key, newSequences); |
| if (sequence == null) { |
| sequence = newSequences; |
| } |
| sequences.add(sequence); |
| } |
| try { |
| for (Sequence sequence : sequences) { |
| sequence.getLock().lock(); |
| } |
| // Now that we have all the locks we need, increment the sequences |
| List<Increment> incrementBatch = Lists.newArrayListWithExpectedSize(sequences.size()); |
| List<Sequence> toIncrementList = Lists.newArrayListWithExpectedSize(sequences.size()); |
| int[] indexes = new int[sequences.size()]; |
| for (int i = 0; i < sequences.size(); i++) { |
| Sequence sequence = sequences.get(i); |
| try { |
| values[i] = sequence.incrementValue(timestamp, op, sequenceAllocations.get(i).getNumAllocations()); |
| } catch (EmptySequenceCacheException e) { |
| indexes[toIncrementList.size()] = i; |
| toIncrementList.add(sequence); |
| Increment inc = sequence.newIncrement(timestamp, op, sequenceAllocations.get(i).getNumAllocations()); |
| incrementBatch.add(inc); |
| } catch (SQLException e) { |
| exceptions[i] = e; |
| } |
| } |
| if (toIncrementList.isEmpty()) { |
| return; |
| } |
| HTableInterface hTable = this.getTable(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES); |
| Object[] resultObjects = null; |
| SQLException sqlE = null; |
| try { |
| resultObjects= hTable.batch(incrementBatch); |
| } catch (IOException e) { |
| sqlE = ServerUtil.parseServerException(e); |
| } catch (InterruptedException e) { |
| // restore the interrupt status |
| Thread.currentThread().interrupt(); |
| sqlE = new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION) |
| .setRootCause(e).build().buildException(); // FIXME ? |
| } finally { |
| try { |
| hTable.close(); |
| } catch (IOException e) { |
| if (sqlE == null) { |
| sqlE = ServerUtil.parseServerException(e); |
| } else { |
| sqlE.setNextException(ServerUtil.parseServerException(e)); |
| } |
| } |
| if (sqlE != null) { |
| throw sqlE; |
| } |
| } |
| for (int i=0;i<resultObjects.length;i++){ |
| Sequence sequence = toIncrementList.get(i); |
| Result result = (Result)resultObjects[i]; |
| try { |
| long numToAllocate = Bytes.toLong(incrementBatch.get(i).getAttribute(SequenceRegionObserver.NUM_TO_ALLOCATE)); |
| values[indexes[i]] = sequence.incrementValue(result, op, numToAllocate); |
| } catch (SQLException e) { |
| exceptions[indexes[i]] = e; |
| } |
| } |
| } finally { |
| for (Sequence sequence : sequences) { |
| sequence.getLock().unlock(); |
| } |
| } |
| } |
| |
| @Override |
| public void clearTableFromCache(final byte[] tenantId, final byte[] schemaName, final byte[] tableName, |
| final long clientTS) throws SQLException { |
| // clear the meta data cache for the table here |
| try { |
| SQLException sqlE = null; |
| HTableInterface htable = this.getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); |
| try { |
| htable.coprocessorService(MetaDataService.class, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, |
| new Batch.Call<MetaDataService, ClearTableFromCacheResponse>() { |
| @Override |
| public ClearTableFromCacheResponse call(MetaDataService instance) throws IOException { |
| ServerRpcController controller = new ServerRpcController(); |
| BlockingRpcCallback<ClearTableFromCacheResponse> rpcCallback = new BlockingRpcCallback<ClearTableFromCacheResponse>(); |
| ClearTableFromCacheRequest.Builder builder = ClearTableFromCacheRequest.newBuilder(); |
| builder.setTenantId(ByteStringer.wrap(tenantId)); |
| builder.setTableName(ByteStringer.wrap(tableName)); |
| builder.setSchemaName(ByteStringer.wrap(schemaName)); |
| builder.setClientTimestamp(clientTS); |
| builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER)); |
| instance.clearTableFromCache(controller, builder.build(), rpcCallback); |
| if (controller.getFailedOn() != null) { throw controller.getFailedOn(); } |
| return rpcCallback.get(); |
| } |
| }); |
| } catch (IOException e) { |
| throw ServerUtil.parseServerException(e); |
| } catch (Throwable e) { |
| sqlE = new SQLException(e); |
| } finally { |
| try { |
| if (tenantId.length == 0) tableStatsCache.invalidate(new ImmutableBytesPtr(SchemaUtil.getTableNameAsBytes(schemaName, tableName))); |
| htable.close(); |
| } catch (IOException e) { |
| if (sqlE == null) { |
| sqlE = ServerUtil.parseServerException(e); |
| } else { |
| sqlE.setNextException(ServerUtil.parseServerException(e)); |
| } |
| } finally { |
| if (sqlE != null) { throw sqlE; } |
| } |
| } |
| } catch (Exception e) { |
| throw new SQLException(ServerUtil.parseServerException(e)); |
| } |
| } |
| |
| @SuppressWarnings("deprecation") |
| @Override |
| public void returnSequences(List<SequenceKey> keys, long timestamp, SQLException[] exceptions) throws SQLException { |
| List<Sequence> sequences = Lists.newArrayListWithExpectedSize(keys.size()); |
| for (SequenceKey key : keys) { |
| Sequence newSequences = new Sequence(key); |
| Sequence sequence = sequenceMap.putIfAbsent(key, newSequences); |
| if (sequence == null) { |
| sequence = newSequences; |
| } |
| sequences.add(sequence); |
| } |
| try { |
| for (Sequence sequence : sequences) { |
| sequence.getLock().lock(); |
| } |
| // Now that we have all the locks we need, attempt to return the unused sequence values |
| List<Append> mutations = Lists.newArrayListWithExpectedSize(sequences.size()); |
| List<Sequence> toReturnList = Lists.newArrayListWithExpectedSize(sequences.size()); |
| int[] indexes = new int[sequences.size()]; |
| for (int i = 0; i < sequences.size(); i++) { |
| Sequence sequence = sequences.get(i); |
| try { |
| Append append = sequence.newReturn(timestamp); |
| toReturnList.add(sequence); |
| mutations.add(append); |
| } catch (EmptySequenceCacheException ignore) { // Nothing to return, so ignore |
| } |
| } |
| if (toReturnList.isEmpty()) { |
| return; |
| } |
| HTableInterface hTable = this.getTable(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES); |
| Object[] resultObjects = null; |
| SQLException sqlE = null; |
| try { |
| resultObjects= hTable.batch(mutations); |
| } catch (IOException e){ |
| sqlE = ServerUtil.parseServerException(e); |
| } catch (InterruptedException e){ |
| // restore the interrupt status |
| Thread.currentThread().interrupt(); |
| sqlE = new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION) |
| .setRootCause(e).build().buildException(); // FIXME ? |
| } finally { |
| try { |
| hTable.close(); |
| } catch (IOException e) { |
| if (sqlE == null) { |
| sqlE = ServerUtil.parseServerException(e); |
| } else { |
| sqlE.setNextException(ServerUtil.parseServerException(e)); |
| } |
| } |
| if (sqlE != null) { |
| throw sqlE; |
| } |
| } |
| for (int i=0;i<resultObjects.length;i++){ |
| Sequence sequence = toReturnList.get(i); |
| Result result = (Result)resultObjects[i]; |
| try { |
| sequence.returnValue(result); |
| } catch (SQLException e) { |
| exceptions[indexes[i]] = e; |
| } |
| } |
| } finally { |
| for (Sequence sequence : sequences) { |
| sequence.getLock().unlock(); |
| } |
| } |
| } |
| |
| // Take no locks, as this only gets run when there are no open connections |
| // so there's no danger of contention. |
| @SuppressWarnings("deprecation") |
| private void returnAllSequences(ConcurrentMap<SequenceKey,Sequence> sequenceMap) throws SQLException { |
| List<Append> mutations = Lists.newArrayListWithExpectedSize(sequenceMap.size()); |
| for (Sequence sequence : sequenceMap.values()) { |
| mutations.addAll(sequence.newReturns()); |
| } |
| if (mutations.isEmpty()) { |
| return; |
| } |
| HTableInterface hTable = this.getTable(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES); |
| SQLException sqlE = null; |
| try { |
| hTable.batch(mutations); |
| } catch (IOException e) { |
| sqlE = ServerUtil.parseServerException(e); |
| } catch (InterruptedException e) { |
| // restore the interrupt status |
| Thread.currentThread().interrupt(); |
| sqlE = new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION) |
| .setRootCause(e).build().buildException(); // FIXME ? |
| } finally { |
| try { |
| hTable.close(); |
| } catch (IOException e) { |
| if (sqlE == null) { |
| sqlE = ServerUtil.parseServerException(e); |
| } else { |
| sqlE.setNextException(ServerUtil.parseServerException(e)); |
| } |
| } |
| if (sqlE != null) { |
| throw sqlE; |
| } |
| } |
| } |
| |
| @Override |
| public void addConnection(PhoenixConnection connection) throws SQLException { |
| connectionQueues.get(getQueueIndex(connection)).add(new WeakReference<PhoenixConnection>(connection)); |
| if (returnSequenceValues) { |
| synchronized (connectionCountLock) { |
| connectionCount++; |
| } |
| } |
| } |
| |
| @Override |
| public void removeConnection(PhoenixConnection connection) throws SQLException { |
| if (returnSequenceValues) { |
| ConcurrentMap<SequenceKey,Sequence> formerSequenceMap = null; |
| synchronized (connectionCountLock) { |
| if (--connectionCount == 0) { |
| if (!this.sequenceMap.isEmpty()) { |
| formerSequenceMap = this.sequenceMap; |
| this.sequenceMap = Maps.newConcurrentMap(); |
| } |
| } |
| } |
| // Since we're using the former sequenceMap, we can do this outside |
| // the lock. |
| if (formerSequenceMap != null) { |
| // When there are no more connections, attempt to return any sequences |
| returnAllSequences(formerSequenceMap); |
| } |
| } |
| } |
| |
| private int getQueueIndex(PhoenixConnection conn) { |
| return ThreadLocalRandom.current().nextInt(renewLeasePoolSize); |
| } |
| |
| @Override |
| public KeyValueBuilder getKeyValueBuilder() { |
| return this.kvBuilder; |
| } |
| |
| @Override |
| public boolean supportsFeature(Feature feature) { |
| FeatureSupported supported = featureMap.get(feature); |
| if (supported == null) { |
| return false; |
| } |
| return supported.isSupported(this); |
| } |
| |
| @Override |
| public String getUserName() { |
| return userName; |
| } |
| |
| private void checkClosed() { |
| if (closed) { |
| throwConnectionClosedException(); |
| } |
| } |
| |
| private void throwConnectionClosedIfNullMetaData() { |
| if (latestMetaData == null) { |
| throwConnectionClosedException(); |
| } |
| } |
| |
| private void throwConnectionClosedException() { |
| throw new IllegalStateException("Connection to the cluster is closed"); |
| } |
| |
| @Override |
| public PTableStats getTableStats(final byte[] physicalName, final long clientTimeStamp) throws SQLException { |
| try { |
| return tableStatsCache.get(new ImmutableBytesPtr(physicalName), new Callable<PTableStats>() { |
| @Override |
| public PTableStats call() throws Exception { |
| /* |
| * The shared view index case is tricky, because we don't have |
| * table metadata for it, only an HBase table. We do have stats, |
| * though, so we'll query them directly here and cache them so |
| * we don't keep querying for them. |
| */ |
| HTableInterface statsHTable = ConnectionQueryServicesImpl.this.getTable(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES); |
| try { |
| return StatisticsUtil.readStatistics(statsHTable, physicalName, clientTimeStamp); |
| } catch (IOException e) { |
| logger.warn("Unable to read from stats table", e); |
| // Just cache empty stats. We'll try again after some time anyway. |
| return PTableStats.EMPTY_STATS; |
| } finally { |
| try { |
| statsHTable.close(); |
| } catch (IOException e) { |
| // Log, but continue. We have our stats anyway now. |
| logger.warn("Unable to close stats table", e); |
| } |
| } |
| } |
| |
| }); |
| } catch (ExecutionException e) { |
| throw ServerUtil.parseServerException(e); |
| } |
| } |
| |
| @Override |
| public int getSequenceSaltBuckets() { |
| return nSequenceSaltBuckets; |
| } |
| |
| @Override |
| public PMetaData addFunction(PFunction function) throws SQLException { |
| synchronized (latestMetaDataLock) { |
| try { |
| throwConnectionClosedIfNullMetaData(); |
| // If existing table isn't older than new table, don't replace |
| // If a client opens a connection at an earlier timestamp, this can happen |
| PFunction existingFunction = latestMetaData.getFunction(new PTableKey(function.getTenantId(), function.getFunctionName())); |
| if (existingFunction.getTimeStamp() >= function.getTimeStamp()) { |
| return latestMetaData; |
| } |
| } catch (FunctionNotFoundException e) {} |
| latestMetaData = latestMetaData.addFunction(function); |
| latestMetaDataLock.notifyAll(); |
| return latestMetaData; |
| } |
| } |
| |
| @Override |
| public PMetaData removeFunction(PName tenantId, String function, long functionTimeStamp) |
| throws SQLException { |
| synchronized (latestMetaDataLock) { |
| throwConnectionClosedIfNullMetaData(); |
| latestMetaData = latestMetaData.removeFunction(tenantId, function, functionTimeStamp); |
| latestMetaDataLock.notifyAll(); |
| return latestMetaData; |
| } |
| } |
| |
| @Override |
| public MetaDataMutationResult getFunctions(PName tenantId, final List<Pair<byte[], Long>> functions, |
| final long clientTimestamp) throws SQLException { |
| final byte[] tenantIdBytes = tenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY : tenantId.getBytes(); |
| return metaDataCoprocessorExec(tenantIdBytes, |
| new Batch.Call<MetaDataService, MetaDataResponse>() { |
| @Override |
| public MetaDataResponse call(MetaDataService instance) throws IOException { |
| ServerRpcController controller = new ServerRpcController(); |
| BlockingRpcCallback<MetaDataResponse> rpcCallback = |
| new BlockingRpcCallback<MetaDataResponse>(); |
| GetFunctionsRequest.Builder builder = GetFunctionsRequest.newBuilder(); |
| builder.setTenantId(ByteStringer.wrap(tenantIdBytes)); |
| for(Pair<byte[], Long> function: functions) { |
| builder.addFunctionNames(ByteStringer.wrap(function.getFirst())); |
| builder.addFunctionTimestamps(function.getSecond().longValue()); |
| } |
| builder.setClientTimestamp(clientTimestamp); |
| builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER)); |
| instance.getFunctions(controller, builder.build(), rpcCallback); |
| if(controller.getFailedOn() != null) { |
| throw controller.getFailedOn(); |
| } |
| return rpcCallback.get(); |
| } |
| }, PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES); |
| |
| } |
| |
| // TODO the mutations should be added to System functions table. |
| @Override |
| public MetaDataMutationResult createFunction(final List<Mutation> functionData, |
| final PFunction function, final boolean temporary) throws SQLException { |
| byte[][] rowKeyMetadata = new byte[2][]; |
| Mutation m = MetaDataUtil.getPutOnlyTableHeaderRow(functionData); |
| byte[] key = m.getRow(); |
| SchemaUtil.getVarChars(key, rowKeyMetadata); |
| byte[] tenantIdBytes = rowKeyMetadata[PhoenixDatabaseMetaData.TENANT_ID_INDEX]; |
| byte[] functionBytes = rowKeyMetadata[PhoenixDatabaseMetaData.FUNTION_NAME_INDEX]; |
| byte[] functionKey = SchemaUtil.getFunctionKey(tenantIdBytes, functionBytes); |
| MetaDataMutationResult result = metaDataCoprocessorExec(functionKey, |
| new Batch.Call<MetaDataService, MetaDataResponse>() { |
| @Override |
| public MetaDataResponse call(MetaDataService instance) throws IOException { |
| ServerRpcController controller = new ServerRpcController(); |
| BlockingRpcCallback<MetaDataResponse> rpcCallback = |
| new BlockingRpcCallback<MetaDataResponse>(); |
| CreateFunctionRequest.Builder builder = CreateFunctionRequest.newBuilder(); |
| for (Mutation m : functionData) { |
| MutationProto mp = ProtobufUtil.toProto(m); |
| builder.addTableMetadataMutations(mp.toByteString()); |
| } |
| builder.setTemporary(temporary); |
| builder.setReplace(function.isReplace()); |
| builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER)); |
| instance.createFunction(controller, builder.build(), rpcCallback); |
| if(controller.getFailedOn() != null) { |
| throw controller.getFailedOn(); |
| } |
| return rpcCallback.get(); |
| } |
| }, PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES); |
| return result; |
| } |
| |
| @VisibleForTesting |
| static class RenewLeaseTask implements Runnable { |
| |
| private final LinkedBlockingQueue<WeakReference<PhoenixConnection>> connectionsQueue; |
| private final Random random = new Random(); |
| private static final int MAX_WAIT_TIME = 1000; |
| |
| RenewLeaseTask(LinkedBlockingQueue<WeakReference<PhoenixConnection>> queue) { |
| this.connectionsQueue = queue; |
| } |
| |
| private void waitForRandomDuration() throws InterruptedException { |
| new CountDownLatch(1).await(random.nextInt(MAX_WAIT_TIME), MILLISECONDS); |
| } |
| |
| @Override |
| public void run() { |
| try { |
| int numConnections = connectionsQueue.size(); |
| boolean wait = true; |
| // We keep adding items to the end of the queue. So to stop the loop, iterate only up to |
| // whatever the current count is. |
| while (numConnections > 0) { |
| if (wait) { |
| // wait for some random duration to prevent all threads from renewing lease at |
| // the same time. |
| waitForRandomDuration(); |
| wait = false; |
| } |
| // It is guaranteed that this poll won't hang indefinitely because this is the |
| // only thread that removes items from the queue. Still adding a 1 ms timeout |
| // for sanity check. |
| WeakReference<PhoenixConnection> connRef = |
| connectionsQueue.poll(1, TimeUnit.MILLISECONDS); |
| if (connRef == null) { |
| throw new IllegalStateException( |
| "Connection ref found to be null. This is a bug. Some other thread removed items from the connection queue."); |
| } |
| PhoenixConnection conn = connRef.get(); |
| if (conn != null && !conn.isClosed()) { |
| LinkedBlockingQueue<WeakReference<TableResultIterator>> scannerQueue = |
| conn.getScanners(); |
| // We keep adding items to the end of the queue. So to stop the loop, |
| // iterate only up to whatever the current count is. |
| int numScanners = scannerQueue.size(); |
| int renewed = 0; |
| long start = System.currentTimeMillis(); |
| while (numScanners > 0) { |
| // It is guaranteed that this poll won't hang indefinitely because this is the |
| // only thread that removes items from the queue. Still adding a 1 ms timeout |
| // for sanity check. |
| WeakReference<TableResultIterator> ref = |
| scannerQueue.poll(1, TimeUnit.MILLISECONDS); |
| if (ref == null) { |
| throw new IllegalStateException( |
| "TableResulIterator ref found to be null. This is a bug. Some other thread removed items from the scanner queue."); |
| } |
| TableResultIterator scanningItr = ref.get(); |
| if (scanningItr != null) { |
| RenewLeaseStatus status = scanningItr.renewLease(); |
| switch (status) { |
| case RENEWED: |
| renewed++; |
| // add it back at the tail |
| scannerQueue.offer(new WeakReference<TableResultIterator>( |
| scanningItr)); |
| logger.info("Lease renewed for scanner: " + scanningItr); |
| break; |
| case UNINITIALIZED: |
| case THRESHOLD_NOT_REACHED: |
| // add it back at the tail |
| scannerQueue.offer(new WeakReference<TableResultIterator>( |
| scanningItr)); |
| break; |
| // if lease wasn't renewed or scanner was closed, don't add the |
| // scanner back to the queue. |
| case CLOSED: |
| case NOT_RENEWED: |
| break; |
| } |
| } |
| numScanners--; |
| } |
| if (renewed > 0) { |
| logger.info("Renewed leases for " + renewed + " scanner/s in " |
| + (System.currentTimeMillis() - start) + " ms "); |
| } |
| connectionsQueue.offer(connRef); |
| } |
| numConnections--; |
| } |
| } catch (InterruptedException e1) { |
| Thread.currentThread().interrupt(); // restore the interrupt status |
| logger.warn("Thread interrupted when renewing lease ", e1); |
| throw new RuntimeException(e1); |
| } catch (Exception e2) { |
| logger.warn("Exception thrown when renewing lease ", e2); |
| throw new RuntimeException(e2); |
| } |
| } |
| } |
| |
| @Override |
| public long getRenewLeaseThresholdMilliSeconds() { |
| return renewLeaseThreshold; |
| } |
| |
| @Override |
| public boolean isRenewingLeasesEnabled() { |
| return supportsFeature(ConnectionQueryServices.Feature.RENEW_LEASE) && renewLeaseEnabled; |
| } |
| |
| |
| @Override |
| public HRegionLocation getTableRegionLocation(byte[] tableName, byte[] row) throws SQLException { |
| /* |
| * Use HConnection.getRegionLocation as it uses the cache in HConnection, to get the region |
| * to which specified row belongs to. |
| */ |
| int retryCount = 0, maxRetryCount = 1; |
| boolean reload =false; |
| while (true) { |
| try { |
| return connection.getRegionLocation(TableName.valueOf(tableName), row, reload); |
| } catch (org.apache.hadoop.hbase.TableNotFoundException e) { |
| String fullName = Bytes.toString(tableName); |
| throw new TableNotFoundException(SchemaUtil.getSchemaNameFromFullName(fullName), SchemaUtil.getTableNameFromFullName(fullName)); |
| } catch (IOException e) { |
| if (retryCount++ < maxRetryCount) { // One retry, in case split occurs while navigating |
| reload = true; |
| continue; |
| } |
| throw new SQLExceptionInfo.Builder(SQLExceptionCode.GET_TABLE_REGIONS_FAIL) |
| .setRootCause(e).build().buildException(); |
| } |
| } |
| } |
| |
| } |