| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.phoenix.query; |
| |
| import static com.google.common.io.Closeables.closeQuietly; |
| import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CYCLE_FLAG; |
| import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG; |
| import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_VALUE; |
| import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_VALUE; |
| import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES; |
| import static org.apache.phoenix.query.QueryServices.DROP_METADATA_ATTRIB; |
| import static org.apache.phoenix.query.QueryServices.HBASE_CLIENT_KEYTAB; |
| import static org.apache.phoenix.query.QueryServices.HBASE_CLIENT_PRINCIPAL; |
| import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA; |
| |
| import java.io.IOException; |
| import java.sql.SQLException; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Properties; |
| import java.util.Set; |
| import java.util.TreeMap; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| |
| import javax.annotation.concurrent.GuardedBy; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hbase.HColumnDescriptor; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.HRegionLocation; |
| import org.apache.hadoop.hbase.HTableDescriptor; |
| import org.apache.hadoop.hbase.TableExistsException; |
| import org.apache.hadoop.hbase.client.Append; |
| import org.apache.hadoop.hbase.client.HBaseAdmin; |
| import org.apache.hadoop.hbase.client.HConnection; |
| import org.apache.hadoop.hbase.client.HTableInterface; |
| import org.apache.hadoop.hbase.client.Increment; |
| import org.apache.hadoop.hbase.client.Mutation; |
| import org.apache.hadoop.hbase.client.Result; |
| import org.apache.hadoop.hbase.client.coprocessor.Batch; |
| import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
| import org.apache.hadoop.hbase.security.User; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.hadoop.hbase.util.Pair; |
| import org.apache.hadoop.hbase.util.VersionInfo; |
| import org.apache.hadoop.hbase.zookeeper.ZKConfig; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.phoenix.compile.MutationPlan; |
| import org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver; |
| import org.apache.phoenix.coprocessor.MetaDataEndpointImpl; |
| import org.apache.phoenix.coprocessor.MetaDataProtocol; |
| import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; |
| import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode; |
| import org.apache.phoenix.coprocessor.MetaDataRegionObserver; |
| import org.apache.phoenix.coprocessor.ScanRegionObserver; |
| import org.apache.phoenix.coprocessor.SequenceRegionObserver; |
| import org.apache.phoenix.coprocessor.ServerCachingEndpointImpl; |
| import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver; |
| import org.apache.phoenix.exception.PhoenixIOException; |
| import org.apache.phoenix.exception.SQLExceptionCode; |
| import org.apache.phoenix.exception.SQLExceptionInfo; |
| import org.apache.phoenix.execute.MutationState; |
| import org.apache.phoenix.hbase.index.Indexer; |
| import org.apache.phoenix.hbase.index.covered.CoveredColumnsIndexBuilder; |
| import org.apache.phoenix.hbase.index.util.KeyValueBuilder; |
| import org.apache.phoenix.index.PhoenixIndexBuilder; |
| import org.apache.phoenix.index.PhoenixIndexCodec; |
| import org.apache.phoenix.jdbc.PhoenixConnection; |
| import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; |
| import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; |
| import org.apache.phoenix.schema.EmptySequenceCacheException; |
| import org.apache.phoenix.schema.MetaDataSplitPolicy; |
| import org.apache.phoenix.schema.NewerTableAlreadyExistsException; |
| import org.apache.phoenix.schema.PColumn; |
| import org.apache.phoenix.schema.PColumnFamily; |
| import org.apache.phoenix.schema.PDataType; |
| import org.apache.phoenix.schema.PMetaData; |
| import org.apache.phoenix.schema.PMetaDataImpl; |
| import org.apache.phoenix.schema.PName; |
| import org.apache.phoenix.schema.PNameFactory; |
| import org.apache.phoenix.schema.PTable; |
| import org.apache.phoenix.schema.PTableKey; |
| import org.apache.phoenix.schema.PTableType; |
| import org.apache.phoenix.schema.ReadOnlyTableException; |
| import org.apache.phoenix.schema.SaltingUtil; |
| import org.apache.phoenix.schema.Sequence; |
| import org.apache.phoenix.schema.SequenceKey; |
| import org.apache.phoenix.schema.TableAlreadyExistsException; |
| import org.apache.phoenix.schema.TableNotFoundException; |
| import org.apache.phoenix.util.ByteUtil; |
| import org.apache.phoenix.util.MetaDataUtil; |
| import org.apache.phoenix.util.PhoenixContextExecutor; |
| import org.apache.phoenix.util.PhoenixRuntime; |
| import org.apache.phoenix.util.PropertiesUtil; |
| import org.apache.phoenix.util.ReadOnlyProps; |
| import org.apache.phoenix.util.SchemaUtil; |
| import org.apache.phoenix.util.ServerUtil; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.base.Throwables; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Sets; |
| import com.google.common.io.Closeables; |
| |
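| /** |
| * Real implementation of {@link ConnectionQueryServices} backed by a live HBase |
| * cluster: it opens a single HConnection, caches the latest table metadata for |
| * future connections, manages per-tenant child services, and installs the Phoenix |
| * coprocessors and system tables. |
| * <p> |
| * A minimal usage sketch (callers normally go through the Phoenix JDBC driver |
| * rather than constructing this directly): |
| * <pre> |
| * ConnectionQueryServices services = |
| *     new ConnectionQueryServicesImpl(queryServices, connectionInfo); |
| * services.init(url, props); |
| * PhoenixConnection conn = services.connect(url, props); |
| * </pre> |
| */ |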
| public class ConnectionQueryServicesImpl extends DelegateQueryServices implements ConnectionQueryServices { |
| private static final Logger logger = LoggerFactory.getLogger(ConnectionQueryServicesImpl.class); |
| private static final int INITIAL_CHILD_SERVICES_CAPACITY = 100; |
| private static final int DEFAULT_OUT_OF_ORDER_MUTATIONS_WAIT_TIME_MS = 1000; |
| protected final Configuration config; |
| // Copy of config.getProps(), but read-only to prevent synchronization that we |
| // don't need. |
| private final ReadOnlyProps props; |
| private final String userName; |
| private final ConcurrentHashMap<ImmutableBytesWritable,ConnectionQueryServices> childServices; |
| private final StatsManager statsManager; |
| |
| // Cache the latest meta data here for future connections |
| @GuardedBy("latestMetaDataLock") |
| private PMetaData latestMetaData; |
| private final Object latestMetaDataLock = new Object(); |
| |
| // Lowest HBase version on the cluster. |
| private int lowestClusterHBaseVersion = Integer.MAX_VALUE; |
| private boolean hasInvalidIndexConfiguration = false; |
| private int connectionCount = 0; |
| |
| private HConnection connection; |
| private volatile boolean initialized; |
| |
| // writes guarded by "this" |
| private volatile boolean closed; |
| |
| private volatile SQLException initializationException; |
| protected ConcurrentMap<SequenceKey,Sequence> sequenceMap = Maps.newConcurrentMap(); |
| private KeyValueBuilder kvBuilder; |
| |
| private PMetaData newEmptyMetaData() { |
| long maxSizeBytes = props.getLong(QueryServices.MAX_CLIENT_METADATA_CACHE_SIZE_ATTRIB, |
| QueryServicesOptions.DEFAULT_MAX_CLIENT_METADATA_CACHE_SIZE); |
| return new PMetaDataImpl(INITIAL_META_DATA_TABLE_CAPACITY, maxSizeBytes); |
| } |
| /** |
| * Construct a ConnectionQueryServicesImpl that represents a connection to an HBase |
| * cluster. |
| * @param services base services from which we derive our default configuration |
| * @param connectionInfo connection information for the HBase cluster |
| */ |
| public ConnectionQueryServicesImpl(QueryServices services, ConnectionInfo connectionInfo) { |
| super(services); |
| Configuration config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(); |
| for (Entry<String,String> entry : services.getProps()) { |
| config.set(entry.getKey(), entry.getValue()); |
| } |
| for (Entry<String,String> entry : connectionInfo.asProps()) { |
| config.set(entry.getKey(), entry.getValue()); |
| } |
| // Without making a copy of the configuration we cons up, we lose some of our properties |
| // on the server side during testing. |
| this.config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(config); |
| this.props = new ReadOnlyProps(this.config.iterator()); |
| this.userName = connectionInfo.getPrincipal(); |
| this.latestMetaData = newEmptyMetaData(); |
| // TODO: should we track connection wide memory usage or just org-wide usage? |
| // If connection-wide, create a MemoryManager here, otherwise just use the one from the delegate |
| this.childServices = new ConcurrentHashMap<ImmutableBytesWritable,ConnectionQueryServices>(INITIAL_CHILD_SERVICES_CAPACITY); |
| int statsUpdateFrequencyMs = this.getProps().getInt(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, QueryServicesOptions.DEFAULT_STATS_UPDATE_FREQ_MS); |
| int maxStatsAgeMs = this.getProps().getInt(QueryServices.MAX_STATS_AGE_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_STATS_AGE_MS); |
| this.statsManager = new StatsManagerImpl(this, statsUpdateFrequencyMs, maxStatsAgeMs); |
| |
| // find the HBase version and use that to determine the KeyValueBuilder that should be used |
| String hbaseVersion = VersionInfo.getVersion(); |
| this.kvBuilder = KeyValueBuilder.get(hbaseVersion); |
| } |
| |
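| // openConnection optionally performs a Kerberos login before creating the shared |
| // HConnection. A minimal sketch of supplying the credentials it looks for; the |
| // literal property keys below are illustrative stand-ins for the values of the |
| // HBASE_CLIENT_KEYTAB and HBASE_CLIENT_PRINCIPAL constants: |
| // |
| //   Properties info = new Properties(); |
| //   info.setProperty("hbase.myclient.keytab", "/etc/security/keytabs/client.keytab"); |
| //   info.setProperty("hbase.myclient.principal", "client@EXAMPLE.COM"); |
| //   DriverManager.getConnection("jdbc:phoenix:zookeeper-quorum", info); |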
| private void openConnection() throws SQLException { |
| try { |
| // check if we need to authenticate with kerberos |
| String clientKeytab = this.getProps().get(HBASE_CLIENT_KEYTAB); |
| String clientPrincipal = this.getProps().get(HBASE_CLIENT_PRINCIPAL); |
| if (clientKeytab != null && clientPrincipal != null) { |
| logger.info("Trying to connect to a secure cluster with keytab:" + clientKeytab); |
| UserGroupInformation.setConfiguration(config); |
| User.login(config, HBASE_CLIENT_KEYTAB, HBASE_CLIENT_PRINCIPAL, null); |
| logger.info("Successfull login to secure cluster!!"); |
| } |
| this.connection = HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config); |
| } catch (IOException e) { |
| throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION) |
| .setRootCause(e).build().buildException(); |
| } |
| if (this.connection.isClosed()) { // TODO: why the heck doesn't this throw above? |
| throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION).build().buildException(); |
| } |
| } |
| |
| @Override |
| public StatsManager getStatsManager() { |
| return this.statsManager; |
| } |
| |
| @Override |
| public HTableInterface getTable(byte[] tableName) throws SQLException { |
| try { |
| return HBaseFactoryProvider.getHTableFactory().getTable(tableName, connection, getExecutor()); |
| } catch (org.apache.hadoop.hbase.TableNotFoundException e) { |
| byte[][] schemaAndTableName = new byte[2][]; |
| SchemaUtil.getVarChars(tableName, schemaAndTableName); |
| throw new TableNotFoundException(Bytes.toString(schemaAndTableName[0]), Bytes.toString(schemaAndTableName[1])); |
| } catch (IOException e) { |
| throw new SQLException(e); |
| } |
| } |
| |
| @Override |
| public HTableDescriptor getTableDescriptor(byte[] tableName) throws SQLException { |
| HTableInterface htable = getTable(tableName); |
| try { |
| return htable.getTableDescriptor(); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } finally { |
| Closeables.closeQuietly(htable); |
| } |
| } |
| |
| @Override |
| public ReadOnlyProps getProps() { |
| return props; |
| } |
| |
| /** |
| * Closes the underlying connection to ZooKeeper. The QueryServices |
| * may not be used after that point. When a Connection is closed, |
| * this is not called, since these instances are pooled by the |
| * Driver. Instead, the Driver should call this if the QueryServices |
| * is ever removed from the pool. |
| */ |
| @Override |
| public void close() throws SQLException { |
| if (closed) { |
| return; |
| } |
| synchronized (this) { |
| if (closed) { |
| return; |
| } |
| closed = true; |
| SQLException sqlE = null; |
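| // Each cleanup step below runs even if an earlier one fails; later failures are |
| // chained onto the first SQLException via setNextException so none are dropped. |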
| try { |
| // Attempt to return any unused sequences. |
| if (connection != null) returnAllSequences(this.sequenceMap); |
| } catch (SQLException e) { |
| sqlE = e; |
| } finally { |
| try { |
| // Clear any client-side caches. |
| statsManager.clearStats(); |
| } catch (SQLException e) { |
| if (sqlE == null) { |
| sqlE = e; |
| } else { |
| sqlE.setNextException(e); |
| } |
| } finally { |
| try { |
| childServices.clear(); |
| synchronized (latestMetaDataLock) { |
| latestMetaData = null; |
| latestMetaDataLock.notifyAll(); |
| } |
| if (connection != null) connection.close(); |
| } catch (IOException e) { |
| if (sqlE == null) { |
| sqlE = ServerUtil.parseServerException(e); |
| } else { |
| sqlE.setNextException(ServerUtil.parseServerException(e)); |
| } |
| } finally { |
| try { |
| super.close(); |
| } catch (SQLException e) { |
| if (sqlE == null) { |
| sqlE = e; |
| } else { |
| sqlE.setNextException(e); |
| } |
| } finally { |
| if (sqlE != null) { |
| throw sqlE; |
| } |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| protected ConnectionQueryServices newChildQueryService() { |
| return new ChildQueryServices(this); |
| } |
| |
| /** |
| * Get (and create if necessary) a child QueryService for a given tenantId. |
| * The QueryService will be cached for the lifetime of the parent QueryService |
| * @param tenantId the tenant ID |
| * @return the child QueryService |
| */ |
| @Override |
| public ConnectionQueryServices getChildQueryServices(ImmutableBytesWritable tenantId) { |
| ConnectionQueryServices childQueryService = childServices.get(tenantId); |
| if (childQueryService == null) { |
| childQueryService = newChildQueryService(); |
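| // Resolve the race with any other thread creating a service for the same tenant: |
| // whichever instance putIfAbsent installs first wins, and everyone shares it. |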
| ConnectionQueryServices prevQueryService = childServices.putIfAbsent(tenantId, childQueryService); |
| return prevQueryService == null ? childQueryService : prevQueryService; |
| } |
| return childQueryService; |
| } |
| |
| @Override |
| public void clearTableRegionCache(byte[] tableName) throws SQLException { |
| connection.clearRegionCache(tableName); |
| } |
| |
| @Override |
| public List<HRegionLocation> getAllTableRegions(byte[] tableName) throws SQLException { |
| /* |
| * Use HConnection.getRegionLocation as it uses the cache in HConnection, while getting |
| * all region locations from the HTable doesn't. |
| */ |
| int retryCount = 0, maxRetryCount = 1; |
| boolean reload = false; |
| while (true) { |
| try { |
| // We could surface the package-private HConnectionImplementation.getNumberOfCachedRegionLocations |
| // to get the sizing info we need, but this would require a new class in the same package and a cast |
| // to this implementation class, so it's probably not worth it. |
| List<HRegionLocation> locations = Lists.newArrayList(); |
| byte[] currentKey = HConstants.EMPTY_START_ROW; |
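| // Walk the regions in row key order: each region's end key is the next region's |
| // start key, stopping once a region reports the empty end key. |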
| do { |
| HRegionLocation regionLocation = connection.getRegionLocation(tableName, currentKey, reload); |
| locations.add(regionLocation); |
| currentKey = regionLocation.getRegionInfo().getEndKey(); |
| } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)); |
| return locations; |
| } catch (org.apache.hadoop.hbase.TableNotFoundException e) { |
| String fullName = Bytes.toString(tableName); |
| throw new TableNotFoundException(SchemaUtil.getSchemaNameFromFullName(fullName), SchemaUtil.getTableNameFromFullName(fullName)); |
| } catch (IOException e) { |
| if (retryCount++ < maxRetryCount) { // One retry, in case split occurs while navigating |
| reload = true; |
| continue; |
| } |
| throw new SQLExceptionInfo.Builder(SQLExceptionCode.GET_TABLE_REGIONS_FAIL) |
| .setRootCause(e).build().buildException(); |
| } |
| } |
| } |
| |
| @Override |
| public PMetaData addTable(PTable table) throws SQLException { |
| synchronized (latestMetaDataLock) { |
| try { |
| throwConnectionClosedIfNullMetaData(); |
| // If existing table isn't older than new table, don't replace |
| // If a client opens a connection at an earlier timestamp, this can happen |
| PTable existingTable = latestMetaData.getTable(new PTableKey(table.getTenantId(), table.getName().getString())); |
| if (existingTable.getTimeStamp() >= table.getTimeStamp()) { |
| return latestMetaData; |
| } |
| } catch (TableNotFoundException e) {} |
| latestMetaData = latestMetaData.addTable(table); |
| latestMetaDataLock.notifyAll(); |
| return latestMetaData; |
| } |
| } |
| |
| private static interface Mutator { |
| PMetaData mutate(PMetaData metaData) throws SQLException; |
| } |
| |
| /** |
| * Ensures that metaData mutations are applied in the correct order, waiting briefly |
| * for out-of-order mutations before evicting the table from the cache. |
| * @param tenantId the tenant ID of the mutated table, or null for a global table |
| */ |
| private PMetaData metaDataMutated(PName tenantId, String tableName, long tableSeqNum, Mutator mutator) throws SQLException { |
| synchronized (latestMetaDataLock) { |
| throwConnectionClosedIfNullMetaData(); |
| PMetaData metaData = latestMetaData; |
| PTable table; |
| long endTime = System.currentTimeMillis() + DEFAULT_OUT_OF_ORDER_MUTATIONS_WAIT_TIME_MS; |
| while (true) { |
| try { |
| try { |
| table = metaData.getTable(new PTableKey(tenantId, tableName)); |
| /* If the table is at the prior sequence number, then we're good to go. |
| * We know that if we've gotten this far, the server validated the mutations, |
| * so we'd just need to wait until the other connection that mutated the same |
| * table is processed. |
| */ |
| if (table.getSequenceNumber() + 1 == tableSeqNum) { |
| // TODO: assert that timeStamp is bigger than table timeStamp? |
| metaData = mutator.mutate(metaData); |
| break; |
| } else if (table.getSequenceNumber() >= tableSeqNum) { |
| logger.warn("Attempt to cache older version of " + tableName + ": current= " + table.getSequenceNumber() + ", new=" + tableSeqNum); |
| break; |
| } |
| } catch (TableNotFoundException e) { |
| } |
| long waitTime = endTime - System.currentTimeMillis(); |
| // We waited long enough - just remove the table from the cache |
| // and the next time it's used it'll be pulled over from the server. |
| if (waitTime <= 0) { |
| logger.warn("Unable to update meta data repo within " + (DEFAULT_OUT_OF_ORDER_MUTATIONS_WAIT_TIME_MS/1000) + " seconds for " + tableName); |
| // There will never be a parentTableName here, as that would only |
| // be non-null for an index, and we never add/remove columns from an index. |
| metaData = metaData.removeTable(tenantId, tableName, null, HConstants.LATEST_TIMESTAMP); |
| break; |
| } |
| latestMetaDataLock.wait(waitTime); |
| } catch (InterruptedException e) { |
| throw new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION) |
| .setRootCause(e).build().buildException(); // FIXME |
| } |
| } |
| latestMetaData = metaData; |
| latestMetaDataLock.notifyAll(); |
| return metaData; |
| } |
| } |
| |
| @Override |
| public PMetaData addColumn(final PName tenantId, final String tableName, final List<PColumn> columns, final long tableTimeStamp, final long tableSeqNum, final boolean isImmutableRows) throws SQLException { |
| return metaDataMutated(tenantId, tableName, tableSeqNum, new Mutator() { |
| @Override |
| public PMetaData mutate(PMetaData metaData) throws SQLException { |
| try { |
| return metaData.addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows); |
| } catch (TableNotFoundException e) { |
| // The DROP TABLE may have been processed first, so just ignore. |
| return metaData; |
| } |
| } |
| }); |
| } |
| |
| @Override |
| public PMetaData removeTable(PName tenantId, final String tableName, String parentTableName, long tableTimeStamp) throws SQLException { |
| synchronized (latestMetaDataLock) { |
| throwConnectionClosedIfNullMetaData(); |
| latestMetaData = latestMetaData.removeTable(tenantId, tableName, parentTableName, tableTimeStamp); |
| latestMetaDataLock.notifyAll(); |
| return latestMetaData; |
| } |
| } |
| |
| @Override |
| public PMetaData removeColumn(final PName tenantId, final String tableName, final String familyName, final String columnName, final long tableTimeStamp, final long tableSeqNum) throws SQLException { |
| return metaDataMutated(tenantId, tableName, tableSeqNum, new Mutator() { |
| @Override |
| public PMetaData mutate(PMetaData metaData) throws SQLException { |
| try { |
| return metaData.removeColumn(tenantId, tableName, familyName, columnName, tableTimeStamp, tableSeqNum); |
| } catch (TableNotFoundException e) { |
| // The DROP TABLE may have been processed first, so just ignore. |
| return metaData; |
| } |
| } |
| }); |
| } |
| |
| |
| @Override |
| public PhoenixConnection connect(String url, Properties info) throws SQLException { |
| checkClosed(); |
| synchronized (latestMetaDataLock) { |
| throwConnectionClosedIfNullMetaData(); |
| latestMetaDataLock.notifyAll(); |
| return new PhoenixConnection(this, url, info, latestMetaData); |
| } |
| } |
| |
| |
| private HColumnDescriptor generateColumnFamilyDescriptor(Pair<byte[],Map<String,Object>> family, PTableType tableType) throws SQLException { |
| HColumnDescriptor columnDesc = new HColumnDescriptor(family.getFirst()); |
| if (tableType != PTableType.VIEW) { |
| columnDesc.setKeepDeletedCells(true); |
| columnDesc.setDataBlockEncoding(SchemaUtil.DEFAULT_DATA_BLOCK_ENCODING); |
| for (Entry<String,Object> entry : family.getSecond().entrySet()) { |
| String key = entry.getKey(); |
| Object value = entry.getValue(); |
| columnDesc.setValue(key, value == null ? null : value.toString()); |
| } |
| } |
| return columnDesc; |
| } |
| |
| private void modifyColumnFamilyDescriptor(HColumnDescriptor hcd, Pair<byte[],Map<String,Object>> family) throws SQLException { |
| for (Entry<String, Object> entry : family.getSecond().entrySet()) { |
| String key = entry.getKey(); |
| Object value = entry.getValue(); |
| hcd.setValue(key, value == null ? null : value.toString()); |
| } |
| } |
| |
| private void addCoprocessors(byte[] tableName, HTableDescriptor descriptor, PTableType tableType) throws SQLException { |
| // The phoenix jar must be available on HBase classpath |
| try { |
| if (!descriptor.hasCoprocessor(ScanRegionObserver.class.getName())) { |
| descriptor.addCoprocessor(ScanRegionObserver.class.getName(), null, 1, null); |
| } |
| if (!descriptor.hasCoprocessor(UngroupedAggregateRegionObserver.class.getName())) { |
| descriptor.addCoprocessor(UngroupedAggregateRegionObserver.class.getName(), null, 1, null); |
| } |
| if (!descriptor.hasCoprocessor(GroupedAggregateRegionObserver.class.getName())) { |
| descriptor.addCoprocessor(GroupedAggregateRegionObserver.class.getName(), null, 1, null); |
| } |
| if (!descriptor.hasCoprocessor(ServerCachingEndpointImpl.class.getName())) { |
| descriptor.addCoprocessor(ServerCachingEndpointImpl.class.getName(), null, 1, null); |
| } |
| |
| // TODO: better encapsulation for this |
| // Since indexes can't have indexes, don't install our indexing coprocessor for indexes. Also, |
| // don't install on the metadata table until we fix the TODO there. |
| if ((tableType != PTableType.INDEX && tableType != PTableType.VIEW) && !SchemaUtil.isMetaTable(tableName) && !descriptor.hasCoprocessor(Indexer.class.getName())) { |
| Map<String, String> opts = Maps.newHashMapWithExpectedSize(1); |
| opts.put(CoveredColumnsIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName()); |
| Indexer.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts); |
| } |
| |
| // Setup split policy on Phoenix metadata table to ensure that the key values of a Phoenix table |
| // stay on the same region. |
| if (SchemaUtil.isMetaTable(tableName)) { |
| if (!descriptor.hasCoprocessor(MetaDataEndpointImpl.class.getName())) { |
| descriptor.addCoprocessor(MetaDataEndpointImpl.class.getName(), null, 1, null); |
| } |
| if (!descriptor.hasCoprocessor(MetaDataRegionObserver.class.getName())) { |
| descriptor.addCoprocessor(MetaDataRegionObserver.class.getName(), null, 2, null); |
| } |
| } else if (SchemaUtil.isSequenceTable(tableName)) { |
| if (!descriptor.hasCoprocessor(SequenceRegionObserver.class.getName())) { |
| descriptor.addCoprocessor(SequenceRegionObserver.class.getName(), null, 1, null); |
| } |
| } |
| } catch (IOException e) { |
| throw ServerUtil.parseServerException(e); |
| } |
| } |
| |
| private HTableDescriptor generateTableDescriptor(byte[] tableName, HTableDescriptor existingDesc, PTableType tableType, Map<String,Object> tableProps, List<Pair<byte[],Map<String,Object>>> families, byte[][] splits) throws SQLException { |
| String defaultFamilyName = (String)tableProps.remove(PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME); |
| HTableDescriptor descriptor = (existingDesc != null) ? new HTableDescriptor(existingDesc) : new HTableDescriptor(tableName); |
| for (Entry<String,Object> entry : tableProps.entrySet()) { |
| String key = entry.getKey(); |
| Object value = entry.getValue(); |
| descriptor.setValue(key, value == null ? null : value.toString()); |
| } |
| if (families.isEmpty()) { |
| if (tableType != PTableType.VIEW) { |
| byte[] defaultFamilyBytes = defaultFamilyName == null ? QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES : Bytes.toBytes(defaultFamilyName); |
| // Add a dummy column family so we have key values for tables that declare no column families. |
| HColumnDescriptor columnDescriptor = generateColumnFamilyDescriptor(new Pair<byte[],Map<String,Object>>(defaultFamilyBytes,Collections.<String,Object>emptyMap()), tableType); |
| descriptor.addFamily(columnDescriptor); |
| } |
| } else { |
| for (Pair<byte[],Map<String,Object>> family : families) { |
| // If the family exists only in the Phoenix description, add it; otherwise, modify its properties accordingly. |
| byte[] familyByte = family.getFirst(); |
| if (descriptor.getFamily(familyByte) == null) { |
| if (tableType == PTableType.VIEW) { |
| String fullTableName = Bytes.toString(tableName); |
| throw new ReadOnlyTableException( |
| "The HBase column families for a read-only table must already exist", |
| SchemaUtil.getSchemaNameFromFullName(fullTableName), |
| SchemaUtil.getTableNameFromFullName(fullTableName), |
| Bytes.toString(familyByte)); |
| } |
| HColumnDescriptor columnDescriptor = generateColumnFamilyDescriptor(family, tableType); |
| descriptor.addFamily(columnDescriptor); |
| } else { |
| if (tableType != PTableType.VIEW) { |
| modifyColumnFamilyDescriptor(descriptor.getFamily(familyByte), family); |
| } |
| } |
| } |
| } |
| addCoprocessors(tableName, descriptor, tableType); |
| return descriptor; |
| } |
| |
| private void ensureFamilyCreated(byte[] tableName, PTableType tableType, Pair<byte[],Map<String,Object>> family) throws SQLException { |
| HBaseAdmin admin = null; |
| SQLException sqlE = null; |
| try { |
| admin = new HBaseAdmin(config); |
| try { |
| HTableDescriptor existingDesc = admin.getTableDescriptor(tableName); |
| HColumnDescriptor oldDescriptor = existingDesc.getFamily(family.getFirst()); |
| HColumnDescriptor columnDescriptor = null; |
| |
| if (oldDescriptor == null) { |
| if (tableType == PTableType.VIEW) { |
| String fullTableName = Bytes.toString(tableName); |
| throw new ReadOnlyTableException( |
| "The HBase column families for a VIEW must already exist", |
| SchemaUtil.getSchemaNameFromFullName(fullTableName), |
| SchemaUtil.getTableNameFromFullName(fullTableName), |
| Bytes.toString(family.getFirst())); |
| } |
| columnDescriptor = generateColumnFamilyDescriptor(family, tableType); |
| } else { |
| columnDescriptor = new HColumnDescriptor(oldDescriptor); |
| // Don't attempt to make any metadata changes for a VIEW |
| if (tableType == PTableType.VIEW) { |
| return; |
| } |
| modifyColumnFamilyDescriptor(columnDescriptor, family); |
| } |
| |
| if (columnDescriptor.equals(oldDescriptor)) { |
| // Table already has family and it's the same. |
| return; |
| } |
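| // Take the table offline while adding or modifying the column family, then |
| // bring it back online once the change is applied. |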
| admin.disableTable(tableName); |
| if (oldDescriptor == null) { |
| admin.addColumn(tableName, columnDescriptor); |
| } else { |
| admin.modifyColumn(tableName, columnDescriptor); |
| } |
| admin.enableTable(tableName); |
| } catch (org.apache.hadoop.hbase.TableNotFoundException e) { |
| sqlE = new SQLExceptionInfo.Builder(SQLExceptionCode.TABLE_UNDEFINED).setRootCause(e).build().buildException(); |
| } |
| } catch (IOException e) { |
| sqlE = ServerUtil.parseServerException(e); |
| } finally { |
| try { |
| if (admin != null) { |
| admin.close(); |
| } |
| } catch (IOException e) { |
| if (sqlE == null) { |
| sqlE = ServerUtil.parseServerException(e); |
| } else { |
| sqlE.setNextException(ServerUtil.parseServerException(e)); |
| } |
| } finally { |
| if (sqlE != null) { |
| throw sqlE; |
| } |
| } |
| } |
| } |
| |
| /** |
| * Ensures that the HBase table backing the given Phoenix table exists, creating it |
| * or modifying its metadata as needed. |
| * @param tableName the physical HBase table name |
| * @param tableType the Phoenix table type |
| * @param props table-level properties |
| * @param families column families and their properties |
| * @param splits pre-split points to use if the table must be created |
| * @param modifyExistingMetaData whether an existing table's metadata should be modified |
| * @return the existing or modified table descriptor, or null if the table was newly created |
| * @throws SQLException |
| */ |
| private HTableDescriptor ensureTableCreated(byte[] tableName, PTableType tableType, Map<String,Object> props, List<Pair<byte[],Map<String,Object>>> families, byte[][] splits, boolean modifyExistingMetaData) throws SQLException { |
| HBaseAdmin admin = null; |
| SQLException sqlE = null; |
| HTableDescriptor existingDesc = null; |
| boolean isMetaTable = SchemaUtil.isMetaTable(tableName); |
| boolean tableExist = true; |
| try { |
| logger.info("Found quorum: " + ZKConfig.getZKQuorumServersString(config)); |
| admin = new HBaseAdmin(config); |
| try { |
| existingDesc = admin.getTableDescriptor(tableName); |
| } catch (org.apache.hadoop.hbase.TableNotFoundException e) { |
| tableExist = false; |
| if (tableType == PTableType.VIEW) { |
| String fullTableName = Bytes.toString(tableName); |
| throw new ReadOnlyTableException( |
| "An HBase table for a VIEW must already exist", |
| SchemaUtil.getSchemaNameFromFullName(fullTableName), |
| SchemaUtil.getTableNameFromFullName(fullTableName)); |
| } |
| } |
| |
| HTableDescriptor newDesc = generateTableDescriptor(tableName, existingDesc, tableType, props, families, splits); |
| |
| if (!tableExist) { |
| /* |
| * Remove the splitPolicy attribute due to an HBase bug (see below) |
| */ |
| if (isMetaTable) { |
| newDesc.remove(HTableDescriptor.SPLIT_POLICY); |
| } |
| try { |
| if (splits == null) { |
| admin.createTable(newDesc); |
| } else { |
| admin.createTable(newDesc, splits); |
| } |
| } catch (TableExistsException e) { |
| // We can ignore this, as it just means that another client beat us |
| // to creating the HBase metadata. |
| return null; |
| } |
| if (isMetaTable) { |
| checkClientServerCompatibility(); |
| /* |
| * Now we modify the table to add the split policy, since we know that the client and |
| * server are compatible. This works around a nasty, known HBase bug where if a split |
| * policy class cannot be found on the server, the HBase table is left in a horrible |
| * "ghost" state where it can't be used and can't be deleted without bouncing the master. |
| */ |
| newDesc.setValue(HTableDescriptor.SPLIT_POLICY, MetaDataSplitPolicy.class.getName()); |
| admin.disableTable(tableName); |
| admin.modifyTable(tableName, newDesc); |
| admin.enableTable(tableName); |
| } |
| return null; |
| } else { |
| if (!modifyExistingMetaData || existingDesc.equals(newDesc)) { |
| // Table is already created. Note that the presplits are ignored in this case |
| if (isMetaTable) { |
| checkClientServerCompatibility(); |
| } |
| return existingDesc; |
| } |
| |
| if (isMetaTable) { |
| checkClientServerCompatibility(); |
| } |
| |
| // TODO: Take advantage of online schema change ability by setting "hbase.online.schema.update.enable" to true |
| admin.disableTable(tableName); |
| admin.modifyTable(tableName, newDesc); |
| admin.enableTable(tableName); |
| |
| return newDesc; |
| } |
| |
| } catch (IOException e) { |
| sqlE = ServerUtil.parseServerException(e); |
| } finally { |
| try { |
| if (admin != null) { |
| admin.close(); |
| } |
| } catch (IOException e) { |
| if (sqlE == null) { |
| sqlE = ServerUtil.parseServerException(e); |
| } else { |
| sqlE.setNextException(ServerUtil.parseServerException(e)); |
| } |
| } finally { |
| if (sqlE != null) { |
| throw sqlE; |
| } |
| } |
| } |
| return null; // will never make it here |
| } |
| |
| private static boolean isInvalidMutableIndexConfig(Long serverVersion) { |
| if (serverVersion == null) { |
| return false; |
| } |
| return !MetaDataUtil.decodeMutableIndexConfiguredProperly(serverVersion); |
| } |
| |
| private static boolean isCompatible(Long serverVersion) { |
| if (serverVersion == null) { |
| return false; |
| } |
| return MetaDataUtil.areClientAndServerCompatible(serverVersion); |
| } |
| |
| private void checkClientServerCompatibility() throws SQLException { |
| StringBuilder buf = new StringBuilder("The following servers require an updated " + QueryConstants.DEFAULT_COPROCESS_PATH + " to be put in the classpath of HBase: "); |
| boolean isIncompatible = false; |
| int minHBaseVersion = Integer.MAX_VALUE; |
| try { |
| List<HRegionLocation> locations = this.getAllTableRegions(SYSTEM_CATALOG_NAME_BYTES); |
| Set<HRegionLocation> serverMap = Sets.newHashSetWithExpectedSize(locations.size()); |
| TreeMap<byte[], HRegionLocation> regionMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); |
| List<byte[]> regionKeys = Lists.newArrayListWithExpectedSize(locations.size()); |
| for (HRegionLocation entry : locations) { |
| if (!serverMap.contains(entry)) { |
| regionKeys.add(entry.getRegionInfo().getStartKey()); |
| regionMap.put(entry.getRegionInfo().getRegionName(), entry); |
| serverMap.add(entry); |
| } |
| } |
| final TreeMap<byte[],Long> results = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); |
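| // Each region server returns one encoded long from which the compatibility |
| // checks below decode both the Phoenix and HBase version information. |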
| connection.processExecs(MetaDataProtocol.class, regionKeys, |
| PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, this.getDelegate().getExecutor(), new Batch.Call<MetaDataProtocol,Long>() { |
| @Override |
| public Long call(MetaDataProtocol instance) throws IOException { |
| return instance.getVersion(); |
| } |
| }, |
| new Batch.Callback<Long>(){ |
| @Override |
| public void update(byte[] region, byte[] row, Long value) { |
| results.put(region, value); |
| } |
| }); |
| for (Map.Entry<byte[],Long> result : results.entrySet()) { |
| // This is the "phoenix.jar" is in-place, but server is out-of-sync with client case. |
| if (!isCompatible(result.getValue())) { |
| isIncompatible = true; |
| HRegionLocation name = regionMap.get(result.getKey()); |
| buf.append(name); |
| buf.append(';'); |
| } |
| hasInvalidIndexConfiguration |= isInvalidMutableIndexConfig(result.getValue()); |
| if (minHBaseVersion > MetaDataUtil.decodeHBaseVersion(result.getValue())) { |
| minHBaseVersion = MetaDataUtil.decodeHBaseVersion(result.getValue()); |
| } |
| } |
| lowestClusterHBaseVersion = minHBaseVersion; |
| } catch (Throwable t) { |
| // This is the case if the "phoenix.jar" is not on the classpath of HBase on the region server |
| throw new SQLExceptionInfo.Builder(SQLExceptionCode.INCOMPATIBLE_CLIENT_SERVER_JAR).setRootCause(t) |
| .setMessage("Ensure that " + QueryConstants.DEFAULT_COPROCESS_PATH + " is put on the classpath of HBase in every region server: " + t.getMessage()) |
| .build().buildException(); |
| } |
| if (isIncompatible) { |
| buf.setLength(buf.length()-1); |
| throw new SQLExceptionInfo.Builder(SQLExceptionCode.OUTDATED_JARS).setMessage(buf.toString()).build().buildException(); |
| } |
| } |
| |
| /** |
| * Invoke meta data coprocessor with one retry if the key was found to not be in the regions |
| * (due to a table split) |
| */ |
| private MetaDataMutationResult metaDataCoprocessorExec(byte[] tableKey, Batch.Call<MetaDataProtocol, MetaDataMutationResult> callable) throws SQLException { |
| try { |
| boolean retried = false; |
| while (true) { |
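| // On the retry, use relocateRegion to force a region cache refresh in case the |
| // row moved to a different region after a split. |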
| HRegionLocation regionLocation = retried ? connection.relocateRegion(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, tableKey) : connection.locateRegion(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, tableKey); |
| List<byte[]> regionKeys = Collections.singletonList(regionLocation.getRegionInfo().getStartKey()); |
| final Map<byte[],MetaDataMutationResult> results = Maps.newHashMapWithExpectedSize(1); |
| connection.processExecs(MetaDataProtocol.class, regionKeys, |
| PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, this.getDelegate().getExecutor(), callable, |
| new Batch.Callback<MetaDataMutationResult>(){ |
| @Override |
| public void update(byte[] region, byte[] row, MetaDataMutationResult value) { |
| results.put(region, value); |
| } |
| }); |
| assert(results.size() == 1); |
| MetaDataMutationResult result = results.values().iterator().next(); |
| if (result.getMutationCode() == MutationCode.TABLE_NOT_IN_REGION) { |
| if (retried) return result; |
| retried = true; |
| continue; |
| } |
| return result; |
| } |
| } catch (IOException e) { |
| throw ServerUtil.parseServerException(e); |
| } catch (Throwable t) { |
| throw new SQLException(t); |
| } |
| } |
| |
| // Our property values are translated using toString, so we need to "string-ify" this. |
| private static final String TRUE_BYTES_AS_STRING = Bytes.toString(PDataType.TRUE_BYTES); |
| |
| private void ensureViewIndexTableCreated(byte[] physicalTableName, Map<String,Object> tableProps, List<Pair<byte[],Map<String,Object>>> families, byte[][] splits, long timestamp) throws SQLException { |
| Long maxFileSize = (Long)tableProps.get(HTableDescriptor.MAX_FILESIZE); |
| if (maxFileSize == null) { |
| maxFileSize = this.config.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE); |
| } |
| byte[] physicalIndexName = MetaDataUtil.getViewIndexPhysicalName(physicalTableName); |
| |
| int indexMaxFileSizePerc; |
| // Get percentage to use from table props first and then fallback to config |
| Integer indexMaxFileSizePercProp = (Integer)tableProps.remove(QueryServices.INDEX_MAX_FILESIZE_PERC_ATTRIB); |
| if (indexMaxFileSizePercProp == null) { |
| indexMaxFileSizePerc = config.getInt(QueryServices.INDEX_MAX_FILESIZE_PERC_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_MAX_FILESIZE_PERC); |
| } else { |
| indexMaxFileSizePerc = indexMaxFileSizePercProp; |
| } |
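| // Size the shared view index table's regions as a percentage of the data table's |
| // MAX_FILESIZE, so its regions split roughly in step with the data table's. |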
| long indexMaxFileSize = maxFileSize * indexMaxFileSizePerc / 100; |
| tableProps.put(HTableDescriptor.MAX_FILESIZE, indexMaxFileSize); |
| tableProps.put(MetaDataUtil.IS_VIEW_INDEX_TABLE_PROP_NAME, TRUE_BYTES_AS_STRING); |
| HTableDescriptor desc = ensureTableCreated(physicalIndexName, PTableType.TABLE, tableProps, families, splits, false); |
| if (desc != null) { |
| if (!Boolean.TRUE.equals(PDataType.BOOLEAN.toObject(desc.getValue(MetaDataUtil.IS_VIEW_INDEX_TABLE_PROP_BYTES)))) { |
| String fullTableName = Bytes.toString(physicalIndexName); |
| throw new TableAlreadyExistsException( |
| "Unable to create shared physical table for indexes on views.", |
| SchemaUtil.getSchemaNameFromFullName(fullTableName), |
| SchemaUtil.getTableNameFromFullName(fullTableName)); |
| } |
| } |
| } |
| |
| |
| private boolean ensureViewIndexTableDropped(byte[] physicalTableName, long timestamp) throws SQLException { |
| byte[] physicalIndexName = MetaDataUtil.getViewIndexPhysicalName(physicalTableName); |
| HTableDescriptor desc = null; |
| HBaseAdmin admin = null; |
| boolean wasDeleted = false; |
| try { |
| admin = new HBaseAdmin(config); |
| try { |
| desc = admin.getTableDescriptor(physicalIndexName); |
| if (Boolean.TRUE.equals(PDataType.BOOLEAN.toObject(desc.getValue(MetaDataUtil.IS_VIEW_INDEX_TABLE_PROP_BYTES)))) { |
| final ReadOnlyProps props = this.getProps(); |
| final boolean dropMetadata = props.getBoolean(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA); |
| if (dropMetadata) { |
| admin.disableTable(physicalIndexName); |
| admin.deleteTable(physicalIndexName); |
| clearTableRegionCache(physicalIndexName); |
| wasDeleted = true; |
| } |
| } |
| } catch (org.apache.hadoop.hbase.TableNotFoundException ignore) { |
| // Ignore, as we may never have created a view index table |
| } |
| } catch (IOException e) { |
| throw ServerUtil.parseServerException(e); |
| } finally { |
| try { |
| if (admin != null) admin.close(); |
| } catch (IOException e) { |
| logger.warn("",e); |
| } |
| } |
| return wasDeleted; |
| } |
| |
| @Override |
| public MetaDataMutationResult createTable(final List<Mutation> tableMetaData, byte[] physicalTableName, PTableType tableType, |
| Map<String,Object> tableProps, final List<Pair<byte[],Map<String,Object>>> families, byte[][] splits) throws SQLException { |
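| // The SYSTEM.CATALOG row key is the tenant ID, schema name, and table name as |
| // null-byte-separated VARCHARs; split it apart to identify the table being created. |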
| byte[][] rowKeyMetadata = new byte[3][]; |
| Mutation m = MetaDataUtil.getPutOnlyTableHeaderRow(tableMetaData); |
| byte[] key = m.getRow(); |
| SchemaUtil.getVarChars(key, rowKeyMetadata); |
| byte[] tenantIdBytes = rowKeyMetadata[PhoenixDatabaseMetaData.TENANT_ID_INDEX]; |
| byte[] schemaBytes = rowKeyMetadata[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX]; |
| byte[] tableBytes = rowKeyMetadata[PhoenixDatabaseMetaData.TABLE_NAME_INDEX]; |
| byte[] tableName = physicalTableName != null ? physicalTableName : SchemaUtil.getTableNameAsBytes(schemaBytes, tableBytes); |
| if ((tableType == PTableType.VIEW && physicalTableName != null) || (tableType != PTableType.VIEW && physicalTableName == null)) { |
| // For views this will ensure that metadata already exists |
| ensureTableCreated(tableName, tableType, tableProps, families, splits, true); |
| } |
| ImmutableBytesWritable ptr = new ImmutableBytesWritable(); |
| if (tableType == PTableType.INDEX && physicalTableName != null) { // Index on view |
| // Physical index table created up front for multi tenant |
| // TODO: if viewIndexId is Short.MIN_VALUE, then we don't need to attempt to create it |
| if (!MetaDataUtil.isMultiTenant(m, kvBuilder, ptr)) { |
| ensureViewIndexTableCreated(tenantIdBytes.length == 0 ? null : PNameFactory.newName(tenantIdBytes), physicalTableName, MetaDataUtil.getClientTimeStamp(m)); |
| } |
| } else if (tableType == PTableType.TABLE && MetaDataUtil.isMultiTenant(m, kvBuilder, ptr)) { // Create view index table up front for multi tenant tables |
| ptr.set(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES); |
| MetaDataUtil.getMutationValue(m, PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME_BYTES, kvBuilder, ptr); |
| List<Pair<byte[],Map<String,Object>>> familiesPlusDefault = null; |
| for (Pair<byte[],Map<String,Object>> family : families) { |
| byte[] cf = family.getFirst(); |
| if (Bytes.compareTo(cf, 0, cf.length, ptr.get(), ptr.getOffset(),ptr.getLength()) == 0) { |
| familiesPlusDefault = families; |
| break; |
| } |
| } |
| // Don't override if default family already present |
| if (familiesPlusDefault == null) { |
| byte[] defaultCF = ByteUtil.copyKeyBytesIfNecessary(ptr); |
| // Only use splits if table is salted, otherwise it may not be applicable |
| // Always add default column family, as we don't know in advance if we'll need it |
| familiesPlusDefault = Lists.newArrayList(families); |
| familiesPlusDefault.add(new Pair<byte[],Map<String,Object>>(defaultCF,Collections.<String,Object>emptyMap())); |
| } |
| ensureViewIndexTableCreated(tableName, tableProps, familiesPlusDefault, MetaDataUtil.isSalted(m, kvBuilder, ptr) ? splits : null, MetaDataUtil.getClientTimeStamp(m)); |
| } |
| |
| byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaBytes, tableBytes); |
| MetaDataMutationResult result = metaDataCoprocessorExec(tableKey, |
| new Batch.Call<MetaDataProtocol, MetaDataMutationResult>() { |
| @Override |
| public MetaDataMutationResult call(MetaDataProtocol instance) throws IOException { |
| return instance.createTable(tableMetaData); |
| } |
| }); |
| return result; |
| } |
| |
| @Override |
| public MetaDataMutationResult getTable(final PName tenantId, final byte[] schemaBytes, final byte[] tableBytes, |
| final long tableTimestamp, final long clientTimestamp) throws SQLException { |
| final byte[] tenantIdBytes = tenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY : tenantId.getBytes(); |
| byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaBytes, tableBytes); |
| return metaDataCoprocessorExec(tableKey, |
| new Batch.Call<MetaDataProtocol, MetaDataMutationResult>() { |
| @Override |
| public MetaDataMutationResult call(MetaDataProtocol instance) throws IOException { |
| return instance.getTable(tenantIdBytes, schemaBytes, tableBytes, tableTimestamp, clientTimestamp); |
| } |
| }); |
| } |
| |
| @Override |
| public MetaDataMutationResult dropTable(final List<Mutation> tableMetaData, final PTableType tableType) throws SQLException { |
| byte[][] rowKeyMetadata = new byte[3][]; |
| SchemaUtil.getVarChars(tableMetaData.get(0).getRow(), rowKeyMetadata); |
| byte[] tenantIdBytes = rowKeyMetadata[PhoenixDatabaseMetaData.TENANT_ID_INDEX]; |
| byte[] schemaBytes = rowKeyMetadata[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX]; |
| byte[] tableBytes = rowKeyMetadata[PhoenixDatabaseMetaData.TABLE_NAME_INDEX]; |
| byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes == null ? ByteUtil.EMPTY_BYTE_ARRAY : tenantIdBytes, schemaBytes, tableBytes); |
| final MetaDataMutationResult result = metaDataCoprocessorExec(tableKey, |
| new Batch.Call<MetaDataProtocol, MetaDataMutationResult>() { |
| @Override |
| public MetaDataMutationResult call(MetaDataProtocol instance) throws IOException { |
| return instance.dropTable(tableMetaData, tableType.getSerializedValue()); |
| } |
| }); |
| |
| final MutationCode code = result.getMutationCode(); |
| switch(code) { |
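| // Counterintuitively, TABLE_ALREADY_EXISTS from dropTable means the table was |
| // found and dropped, so the physical HBase tables can now be cleaned up. |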
| case TABLE_ALREADY_EXISTS: |
| ReadOnlyProps props = this.getProps(); |
| boolean dropMetadata = props.getBoolean(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA); |
| if (dropMetadata) { |
| dropTables(result.getTableNamesToDelete()); |
| } |
| if (tableType == PTableType.TABLE) { |
| byte[] physicalTableName = SchemaUtil.getTableNameAsBytes(schemaBytes, tableBytes); |
| long timestamp = MetaDataUtil.getClientTimeStamp(tableMetaData); |
| ensureViewIndexTableDropped(physicalTableName, timestamp); |
| } |
| break; |
| default: |
| break; |
| } |
| return result; |
| } |
| |
| private void dropTables(final List<byte[]> tableNamesToDelete) throws SQLException { |
| HBaseAdmin admin = null; |
| SQLException sqlE = null; |
| try { |
| admin = new HBaseAdmin(config); |
| if (tableNamesToDelete != null) { |
| for (byte[] tableName : tableNamesToDelete) { |
| if (admin.tableExists(tableName)) { |
| admin.disableTable(tableName); |
| admin.deleteTable(tableName); |
| clearTableRegionCache(tableName); |
| } |
| } |
| } |
| |
| } catch (IOException e) { |
| sqlE = ServerUtil.parseServerException(e); |
| } finally { |
| try { |
| if (admin != null) { |
| admin.close(); |
| } |
| } catch (IOException e) { |
| if (sqlE == null) { |
| sqlE = ServerUtil.parseServerException(e); |
| } else { |
| sqlE.setNextException(ServerUtil.parseServerException(e)); |
| } |
| } finally { |
| if (sqlE != null) { |
| throw sqlE; |
| } |
| } |
| } |
| } |
| |
| private static Map<String,Object> createPropertiesMap(Map<ImmutableBytesWritable,ImmutableBytesWritable> htableProps) { |
| Map<String,Object> props = Maps.newHashMapWithExpectedSize(htableProps.size()); |
| for (Map.Entry<ImmutableBytesWritable,ImmutableBytesWritable> entry : htableProps.entrySet()) { |
| ImmutableBytesWritable key = entry.getKey(); |
| ImmutableBytesWritable value = entry.getValue(); |
| props.put(Bytes.toString(key.get(), key.getOffset(), key.getLength()), Bytes.toString(value.get(), value.getOffset(), value.getLength())); |
| } |
| return props; |
| } |
| |
| private void ensureViewIndexTableCreated(PName tenantId, byte[] physicalIndexTableName, long timestamp) throws SQLException { |
| PTable table; |
| String name = Bytes.toString( |
| physicalIndexTableName, |
| MetaDataUtil.VIEW_INDEX_TABLE_PREFIX_BYTES.length, |
| physicalIndexTableName.length-MetaDataUtil.VIEW_INDEX_TABLE_PREFIX_BYTES.length); |
| try { |
| synchronized (latestMetaDataLock) { |
| throwConnectionClosedIfNullMetaData(); |
| table = latestMetaData.getTable(new PTableKey(tenantId, name)); |
| latestMetaDataLock.notifyAll(); |
| } |
| if (table.getTimeStamp() >= timestamp) { // Table in cache is newer than client timestamp which shouldn't be the case |
| throw new TableNotFoundException(table.getSchemaName().getString(), table.getTableName().getString()); |
| } |
| } catch (TableNotFoundException e) { |
| byte[] schemaName = Bytes.toBytes(SchemaUtil.getSchemaNameFromFullName(name)); |
| byte[] tableName = Bytes.toBytes(SchemaUtil.getTableNameFromFullName(name)); |
| MetaDataMutationResult result = this.getTable(null, schemaName, tableName, HConstants.LATEST_TIMESTAMP, timestamp); |
| table = result.getTable(); |
| if (table == null) { |
| throw e; |
| } |
| } |
| ensureViewIndexTableCreated(table, timestamp); |
| } |
| |
| private void ensureViewIndexTableCreated(PTable table, long timestamp) throws SQLException { |
| byte[] physicalTableName = table.getPhysicalName().getBytes(); |
| HTableDescriptor htableDesc = this.getTableDescriptor(physicalTableName); |
| Map<String,Object> tableProps = createPropertiesMap(htableDesc.getValues()); |
| List<Pair<byte[],Map<String,Object>>> families = Lists.newArrayListWithExpectedSize(Math.max(1, table.getColumnFamilies().size()+1)); |
| if (families.isEmpty()) { |
| byte[] familyName = SchemaUtil.getEmptyColumnFamily(table); |
| Map<String,Object> familyProps = createPropertiesMap(htableDesc.getFamily(familyName).getValues()); |
| families.add(new Pair<byte[],Map<String,Object>>(familyName, familyProps)); |
| } else { |
| for (PColumnFamily family : table.getColumnFamilies()) { |
| byte[] familyName = family.getName().getBytes(); |
| Map<String,Object> familyProps = createPropertiesMap(htableDesc.getFamily(familyName).getValues()); |
| families.add(new Pair<byte[],Map<String,Object>>(familyName, familyProps)); |
| } |
| // Always create default column family, because we don't know in advance if we'll |
| // need it for an index with no covered columns. |
| families.add(new Pair<byte[],Map<String,Object>>(table.getDefaultFamilyName().getBytes(), Collections.<String,Object>emptyMap())); |
| } |
| byte[][] splits = null; |
| if (table.getBucketNum() != null) { |
| splits = SaltingUtil.getSalteByteSplitPoints(table.getBucketNum()); |
| } |
| |
| ensureViewIndexTableCreated(physicalTableName, tableProps, families, splits, timestamp); |
| } |
| |
| @Override |
| public MetaDataMutationResult addColumn(final List<Mutation> tableMetaData, List<Pair<byte[],Map<String,Object>>> families, PTable table) throws SQLException { |
| byte[][] rowKeyMetaData = new byte[3][]; |
| PTableType tableType = table.getType(); |
| |
| Mutation m = tableMetaData.get(0); |
| byte[] rowKey = m.getRow(); |
| SchemaUtil.getVarChars(rowKey, rowKeyMetaData); |
| byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX]; |
| byte[] schemaBytes = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX]; |
| byte[] tableBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX]; |
| byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaBytes, tableBytes); |
| for (Pair<byte[],Map<String,Object>> family : families) { |
| ensureFamilyCreated(table.getPhysicalName().getBytes(), tableType, family); |
| } |
| // Special case for call during drop table to ensure that the empty column family exists. |
| // In this case, we only include the table header row, as until we add schemaBytes and tableBytes |
| // as args to this function, we have no way of getting them in this case. |
| // TODO: change to if (tableMetaData.isEmpty()) once we pass through schemaBytes and tableBytes |
| // Also, could be used to update property values on ALTER TABLE t SET prop=xxx |
| if (tableMetaData.size() == 1 && tableMetaData.get(0).isEmpty()) { |
| return null; |
| } |
| ImmutableBytesWritable ptr = new ImmutableBytesWritable(); |
| MetaDataMutationResult result = metaDataCoprocessorExec(tableKey, |
| new Batch.Call<MetaDataProtocol, MetaDataMutationResult>() { |
| @Override |
| public MetaDataMutationResult call(MetaDataProtocol instance) throws IOException { |
| return instance.addColumn(tableMetaData); |
| } |
| }); |
| |
| if (result.getMutationCode() == MutationCode.COLUMN_NOT_FOUND) { // Success |
| // Flush the table if transitioning DISABLE_WAL from TRUE to FALSE |
| if (MetaDataUtil.getMutationValue(m, PhoenixDatabaseMetaData.DISABLE_WAL_BYTES, kvBuilder, ptr) |
| && Boolean.FALSE.equals(PDataType.BOOLEAN.toObject(ptr))) { |
| flushTable(table.getPhysicalName().getBytes()); |
| } |
| |
| if (tableType == PTableType.TABLE) { |
| // If we're changing MULTI_TENANT to true or false, create or drop the view index table |
| if (MetaDataUtil.getMutationValue(m, PhoenixDatabaseMetaData.MULTI_TENANT_BYTES, kvBuilder, ptr)){ |
| long timestamp = MetaDataUtil.getClientTimeStamp(m); |
| if (Boolean.TRUE.equals(PDataType.BOOLEAN.toObject(ptr.get(), ptr.getOffset(), ptr.getLength()))) { |
| this.ensureViewIndexTableCreated(table, timestamp); |
| } else { |
| this.ensureViewIndexTableDropped(table.getPhysicalName().getBytes(), timestamp); |
| } |
| } |
| } |
| } |
| return result; |
| } |
| |
| @Override |
| public MetaDataMutationResult dropColumn(final List<Mutation> tableMetaData, PTableType tableType) throws SQLException { |
| byte[][] rowKeyMetadata = new byte[3][]; |
| SchemaUtil.getVarChars(tableMetaData.get(0).getRow(), rowKeyMetadata); |
| byte[] tenantIdBytes = rowKeyMetadata[PhoenixDatabaseMetaData.TENANT_ID_INDEX]; |
| byte[] schemaBytes = rowKeyMetadata[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX]; |
| byte[] tableBytes = rowKeyMetadata[PhoenixDatabaseMetaData.TABLE_NAME_INDEX]; |
| byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaBytes, tableBytes); |
| MetaDataMutationResult result = metaDataCoprocessorExec(tableKey, |
| new Batch.Call<MetaDataProtocol, MetaDataMutationResult>() { |
| @Override |
| public MetaDataMutationResult call(MetaDataProtocol instance) throws IOException { |
| return instance.dropColumn(tableMetaData); |
| } |
| }); |
| final MutationCode code = result.getMutationCode(); |
| switch(code) { |
| case TABLE_ALREADY_EXISTS: |
| final ReadOnlyProps props = this.getProps(); |
| final boolean dropMetadata = props.getBoolean(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA); |
| if (dropMetadata) { |
| dropTables(result.getTableNamesToDelete()); |
| } |
| break; |
| default: |
| break; |
| } |
| return result; |
| |
| } |
| |
| // Keeping this to use for further upgrades |
| protected PhoenixConnection addColumnsIfNotExists(PhoenixConnection oldMetaConnection, |
| String tableName, long timestamp, String columns) throws SQLException { |
| |
| Properties props = new Properties(oldMetaConnection.getClientInfo()); |
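| // Connect at the fixed system table SCN so the ALTER TABLE below is applied at |
| // the same timestamp at which the system tables are created. |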
| props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(timestamp)); |
| // Cannot go through DriverManager or you end up in an infinite loop because it'll call init again |
| PhoenixConnection metaConnection = new PhoenixConnection(this, oldMetaConnection.getURL(), props, oldMetaConnection.getMetaDataCache()); |
| SQLException sqlE = null; |
| try { |
| metaConnection.createStatement().executeUpdate("ALTER TABLE " + tableName + " ADD IF NOT EXISTS " + columns ); |
| } catch (SQLException e) { |
| logger.warn("addColumnsIfNotExists failed due to:" + e); |
| sqlE = e; |
| } finally { |
| try { |
| oldMetaConnection.close(); |
| } catch (SQLException e) { |
| if (sqlE != null) { |
| sqlE.setNextException(e); |
| } else { |
| sqlE = e; |
| } |
| } |
| if (sqlE != null) { |
| throw sqlE; |
| } |
| } |
| return metaConnection; |
| } |
| |
| @Override |
| public void init(final String url, final Properties props) throws SQLException { |
| try { |
| PhoenixContextExecutor.call(new Callable<Void>() { |
| @Override |
| public Void call() throws Exception { |
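| // Double-checked locking: an unsynchronized read of the volatile 'initialized' |
| // flag first, then a synchronized re-check before one-time initialization. |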
| if (initialized) { |
| if (initializationException != null) { |
| // Throw previous initialization exception, as we won't reuse this instance |
| throw initializationException; |
| } |
| return null; |
| } |
| synchronized (this) { |
| if (initialized) { |
| if (initializationException != null) { |
| // Throw previous initialization exception, as we won't reuse this instance |
| throw initializationException; |
| } |
| return null; |
| } |
| checkClosed(); |
| SQLException sqlE = null; |
| PhoenixConnection metaConnection = null; |
| try { |
| openConnection(); |
| Properties scnProps = PropertiesUtil.deepCopy(props); |
| scnProps.setProperty( |
| PhoenixRuntime.CURRENT_SCN_ATTRIB, |
| Long.toString(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP)); |
| scnProps.remove(PhoenixRuntime.TENANT_ID_ATTRIB); |
| metaConnection = new PhoenixConnection( |
| ConnectionQueryServicesImpl.this, url, scnProps, newEmptyMetaData()); |
| try { |
| metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_TABLE_METADATA); |
| } catch (NewerTableAlreadyExistsException ignore) { |
| // Ignore, as this will happen if the SYSTEM.CATALOG already exists at this fixed timestamp. |
| // A TableAlreadyExistsException is not thrown, since the table only exists *after* this fixed timestamp. |
| } catch (TableAlreadyExistsException ignore) { |
| // This will occur if we have an older SYSTEM.CATALOG and we need to update it to include |
| // any new columns we've added. |
| metaConnection = addColumnsIfNotExists(metaConnection, |
| PhoenixDatabaseMetaData.SYSTEM_CATALOG, |
| MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, |
| PhoenixDatabaseMetaData.INDEX_TYPE + " " + PDataType.UNSIGNED_TINYINT.getSqlTypeName() + |
| ", " + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " " + PDataType.LONG.getSqlTypeName()); |
| } |
| try { |
| metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_SEQUENCE_METADATA); |
| } catch (NewerTableAlreadyExistsException ignore) { |
| // Ignore, as this will happen if the SYSTEM.SEQUENCE already exists at this fixed timestamp. |
| // A TableAlreadyExistsException is not thrown, since the table only exists *after* this fixed timestamp. |
| } catch (TableAlreadyExistsException ignore) { |
| // This will occur if we have an older SYSTEM.SEQUENCE, so we need to update it to include |
| // any new columns we've added. |
| String newColumns = |
| MIN_VALUE + " " + PDataType.LONG.getSqlTypeName() + ", " |
| + MAX_VALUE + " " + PDataType.LONG.getSqlTypeName() + ", " |
| + CYCLE_FLAG + " " + PDataType.BOOLEAN.getSqlTypeName() + ", " |
| + LIMIT_REACHED_FLAG + " " + PDataType.BOOLEAN.getSqlTypeName(); |
| metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME, |
| MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, newColumns); |
| } |
| } catch (SQLException e) { |
| sqlE = e; |
| } finally { |
| try { |
| if (metaConnection != null) metaConnection.close(); |
| } catch (SQLException e) { |
| if (sqlE != null) { |
| sqlE.setNextException(e); |
| } else { |
| sqlE = e; |
| } |
| } finally { |
| try { |
| if (sqlE != null) { |
| initializationException = sqlE; |
| throw sqlE; |
| } |
| } finally { |
| initialized = true; |
| } |
| } |
| } |
| } |
| return null; |
| } |
| }); |
| } catch (Exception e) { |
| Throwables.propagateIfInstanceOf(e, SQLException.class); |
| throw Throwables.propagate(e); |
| } |
| } |
| |
| @Override |
| public MutationState updateData(MutationPlan plan) throws SQLException { |
| return plan.execute(); |
| } |
| |
| @Override |
| public int getLowestClusterHBaseVersion() { |
| return lowestClusterHBaseVersion; |
| } |
| |
| @Override |
| public boolean hasInvalidIndexConfiguration() { |
| return hasInvalidIndexConfiguration; |
| } |
| |
| /** |
| * Clears the Phoenix meta data cache on each region server |
| * @throws SQLException if the cache cannot be cleared on one or more region servers |
| */ |
| protected void clearCache() throws SQLException { |
| try { |
| SQLException sqlE = null; |
| HTableInterface htable = this.getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); |
| try { |
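| // An empty start/stop row fans the coprocessor call out to every region of |
| // SYSTEM.CATALOG, so each region server hosting metadata clears its cache. |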
| htable.coprocessorExec(MetaDataProtocol.class, HConstants.EMPTY_START_ROW, |
| HConstants.EMPTY_END_ROW, new Batch.Call<MetaDataProtocol, Void>() { |
| @Override |
| public Void call(MetaDataProtocol instance) throws IOException { |
| instance.clearCache(); |
| return null; |
| } |
| }); |
| } catch (IOException e) { |
| throw ServerUtil.parseServerException(e); |
| } catch (Throwable e) { |
| sqlE = new SQLException(e); |
| } finally { |
| try { |
| htable.close(); |
| } catch (IOException e) { |
| if (sqlE == null) { |
| sqlE = ServerUtil.parseServerException(e); |
| } else { |
| sqlE.setNextException(ServerUtil.parseServerException(e)); |
| } |
| } finally { |
| if (sqlE != null) { |
| throw sqlE; |
| } |
| } |
| } |
| } catch (Exception e) { |
| throw new SQLException(ServerUtil.parseServerException(e)); |
| } |
| } |
| |
| private void flushTable(byte[] tableName) throws SQLException { |
| HBaseAdmin admin = getAdmin(); |
| try { |
| admin.flush(tableName); |
| } catch (IOException e) { |
| throw new PhoenixIOException(e); |
| } catch (InterruptedException e) { |
| throw new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION).setRootCause(e).build() |
| .buildException(); |
| } finally { |
| Closeables.closeQuietly(admin); |
| } |
| } |
| |
| @Override |
| public HBaseAdmin getAdmin() throws SQLException { |
| try { |
| return new HBaseAdmin(config); |
| } catch (IOException e) { |
| throw new PhoenixIOException(e); |
| } |
| } |
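| |
| /* |
| * Note for callers: each call constructs a fresh HBaseAdmin which the caller |
| * owns and must close. An illustrative pattern, mirroring flushTable() above: |
| * |
| * HBaseAdmin admin = getAdmin(); |
| * try { |
| * // ... admin operations ... |
| * } finally { |
| * Closeables.closeQuietly(admin); |
| * } |
| */ |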
| |
| @Override |
| public MetaDataMutationResult updateIndexState(final List<Mutation> tableMetaData, String parentTableName) throws SQLException { |
| byte[][] rowKeyMetadata = new byte[3][]; |
| SchemaUtil.getVarChars(tableMetaData.get(0).getRow(), rowKeyMetadata); |
| byte[] tableKey = SchemaUtil.getTableKey(ByteUtil.EMPTY_BYTE_ARRAY, rowKeyMetadata[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX], rowKeyMetadata[PhoenixDatabaseMetaData.TABLE_NAME_INDEX]); |
| return metaDataCoprocessorExec(tableKey, |
| new Batch.Call<MetaDataProtocol, MetaDataMutationResult>() { |
| @Override |
| public MetaDataMutationResult call(MetaDataProtocol instance) throws IOException { |
| return instance.updateIndexState(tableMetaData); |
| } |
| }); |
| } |
| |
| @Override |
| public long createSequence(String tenantId, String schemaName, String sequenceName, |
| long startWith, long incrementBy, long cacheSize, long minValue, long maxValue, |
| boolean cycle, long timestamp) throws SQLException { |
| SequenceKey sequenceKey = new SequenceKey(tenantId, schemaName, sequenceName); |
| Sequence newSequences = new Sequence(sequenceKey); |
| Sequence sequence = sequenceMap.putIfAbsent(sequenceKey, newSequences); |
| if (sequence == null) { |
| sequence = newSequences; |
| } |
| try { |
| sequence.getLock().lock(); |
| // Now that we have the lock we need, create the sequence |
| Append append = sequence.createSequence(startWith, incrementBy, cacheSize, timestamp, minValue, maxValue, cycle); |
| HTableInterface htable = |
| this.getTable(PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME_BYTES); |
| try { |
| Result result = htable.append(append); |
| return sequence.createSequence(result, minValue, maxValue, cycle); |
| } catch (IOException e) { |
| throw ServerUtil.parseServerException(e); |
| } finally { |
| closeQuietly(htable); |
| } |
| } finally { |
| sequence.getLock().unlock(); |
| } |
| } |
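| |
| /* |
| * For reference, this path is exercised by DDL such as (names illustrative): |
| * |
| * CREATE SEQUENCE my_schema.my_sequence |
| * START WITH 100 INCREMENT BY 2 MINVALUE 1 MAXVALUE 1000000 |
| * CYCLE CACHE 10; |
| */ |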
| |
| @Override |
| public long dropSequence(String tenantId, String schemaName, String sequenceName, long timestamp) throws SQLException { |
| SequenceKey sequenceKey = new SequenceKey(tenantId, schemaName, sequenceName); |
| Sequence newSequences = new Sequence(sequenceKey); |
| Sequence sequence = sequenceMap.putIfAbsent(sequenceKey, newSequences); |
| if (sequence == null) { |
| sequence = newSequences; |
| } |
| try { |
| sequence.getLock().lock(); |
| // Now that we have the lock we need, drop the sequence |
| Append append = sequence.dropSequence(timestamp); |
| HTableInterface htable = this.getTable(PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME_BYTES); |
| try { |
| Result result = htable.append(append); |
| return sequence.dropSequence(result); |
| } catch (IOException e) { |
| throw ServerUtil.parseServerException(e); |
| } finally { |
| closeQuietly(htable); |
| } |
| } finally { |
| sequence.getLock().unlock(); |
| } |
| } |
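| |
| /* |
| * For reference, this path is exercised by DDL such as (name illustrative): |
| * |
| * DROP SEQUENCE my_schema.my_sequence; |
| */ |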
| |
| /** |
| * Gets the current value of a sequence from the client-side cache |
| * @param sequenceKey key identifying the tenant, schema, and sequence name |
| * @param timestamp the SCN at which the value is read |
| * @throws SQLException if the cached sequence cannot be found |
| */ |
| @Override |
| public long currentSequenceValue(SequenceKey sequenceKey, long timestamp) throws SQLException { |
| Sequence sequence = sequenceMap.get(sequenceKey); |
| if (sequence == null) { |
| throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_CALL_CURRENT_BEFORE_NEXT_VALUE) |
| .setSchemaName(sequenceKey.getSchemaName()).setTableName(sequenceKey.getSequenceName()) |
| .build().buildException(); |
| } |
| sequence.getLock().lock(); |
| try { |
| return sequence.currentValue(timestamp); |
| } catch (EmptySequenceCacheException e) { |
| throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_CALL_CURRENT_BEFORE_NEXT_VALUE) |
| .setSchemaName(sequenceKey.getSchemaName()).setTableName(sequenceKey.getSequenceName()) |
| .build().buildException(); |
| } finally { |
| sequence.getLock().unlock(); |
| } |
| } |
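| |
| /* |
| * For reference (table and sequence names illustrative): CURRENT VALUE FOR is |
| * only legal once NEXT VALUE FOR has populated the client-side cache: |
| * |
| * SELECT NEXT VALUE FOR my_schema.my_sequence FROM my_table; |
| * SELECT CURRENT VALUE FOR my_schema.my_sequence FROM my_table; |
| */ |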
| |
| /** |
| * Verifies that the given sequences exist, reserving values for them when the action requires it |
| */ |
| @Override |
| public void validateSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values, SQLException[] exceptions, Sequence.ValueOp action) throws SQLException { |
| incrementSequenceValues(sequenceKeys, timestamp, values, exceptions, 0, action); |
| } |
| |
| /** |
| * Increments each of the given sequences that needs more values, i.e. those |
| * asking for their next value within the current statement whose client-side |
| * cache is exhausted. Sequences that were deleted by another client are |
| * reported through the exceptions array. |
| * @param sequenceKeys sorted list of sequence keys |
| * @param timestamp the SCN at which the increments are performed |
| * @param values array filled in with the next value of each sequence |
| * @param exceptions array filled in with a SQLException for each sequence that cannot be found |
| * @throws SQLException if any of the sequences cannot be found |
| */ |
| @Override |
| public void incrementSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values, SQLException[] exceptions) throws SQLException { |
| incrementSequenceValues(sequenceKeys, timestamp, values, exceptions, 1, Sequence.ValueOp.RESERVE_SEQUENCE); |
| } |
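| |
| /* |
| * For reference, this is the path behind NEXT VALUE FOR expressions such as |
| * (names illustrative): |
| * |
| * SELECT NEXT VALUE FOR my_schema.my_sequence FROM my_table; |
| */ |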
| |
| private void incrementSequenceValues(List<SequenceKey> keys, long timestamp, long[] values, SQLException[] exceptions, int factor, Sequence.ValueOp action) throws SQLException { |
| List<Sequence> sequences = Lists.newArrayListWithExpectedSize(keys.size()); |
| for (SequenceKey key : keys) { |
| Sequence newSequences = new Sequence(key); |
| Sequence sequence = sequenceMap.putIfAbsent(key, newSequences); |
| if (sequence == null) { |
| sequence = newSequences; |
| } |
| sequences.add(sequence); |
| } |
| try { |
| for (Sequence sequence : sequences) { |
| sequence.getLock().lock(); |
| } |
| // Now that we have all the locks we need, increment the sequences |
| List<Increment> incrementBatch = Lists.newArrayListWithExpectedSize(sequences.size()); |
| List<Sequence> toIncrementList = Lists.newArrayListWithExpectedSize(sequences.size()); |
| int[] indexes = new int[sequences.size()]; |
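| // indexes[j] maps the j-th cache-miss back to its slot in the caller's |
| // values/exceptions arrays, so batch results land at the right positions. |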
| for (int i = 0; i < sequences.size(); i++) { |
| Sequence sequence = sequences.get(i); |
| try { |
| values[i] = sequence.incrementValue(timestamp, factor, action); |
| } catch (EmptySequenceCacheException e) { |
| indexes[toIncrementList.size()] = i; |
| toIncrementList.add(sequence); |
| Increment inc = sequence.newIncrement(timestamp, action); |
| incrementBatch.add(inc); |
| } catch (SQLException e) { |
| exceptions[i] = e; |
| } |
| } |
| if (toIncrementList.isEmpty()) { |
| return; |
| } |
| HTableInterface hTable = this.getTable(PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME_BYTES); |
| Object[] resultObjects = null; |
| SQLException sqlE = null; |
| try { |
| resultObjects = hTable.batch(incrementBatch); |
| } catch (IOException e){ |
| sqlE = ServerUtil.parseServerException(e); |
| } catch (InterruptedException e){ |
| sqlE = new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION) |
| .setRootCause(e).build().buildException(); // FIXME ? |
| } finally { |
| try { |
| hTable.close(); |
| } catch (IOException e) { |
| if (sqlE == null) { |
| sqlE = ServerUtil.parseServerException(e); |
| } else { |
| sqlE.setNextException(ServerUtil.parseServerException(e)); |
| } |
| } |
| if (sqlE != null) { |
| throw sqlE; |
| } |
| } |
| for (int i = 0; i < resultObjects.length; i++) { |
| Sequence sequence = toIncrementList.get(i); |
| Result result = (Result)resultObjects[i]; |
| try { |
| values[indexes[i]] = sequence.incrementValue(result, factor); |
| } catch (SQLException e) { |
| exceptions[indexes[i]] = e; |
| } |
| } |
| } finally { |
| for (Sequence sequence : sequences) { |
| sequence.getLock().unlock(); |
| } |
| } |
| } |
| |
| @Override |
| public void returnSequences(List<SequenceKey> keys, long timestamp, SQLException[] exceptions) throws SQLException { |
| List<Sequence> sequences = Lists.newArrayListWithExpectedSize(keys.size()); |
| for (SequenceKey key : keys) { |
| Sequence newSequences = new Sequence(key); |
| Sequence sequence = sequenceMap.putIfAbsent(key, newSequences); |
| if (sequence == null) { |
| sequence = newSequences; |
| } |
| sequences.add(sequence); |
| } |
| try { |
| for (Sequence sequence : sequences) { |
| sequence.getLock().lock(); |
| } |
| // Now that we have all the locks we need, attempt to return the unused sequence values |
| List<Append> mutations = Lists.newArrayListWithExpectedSize(sequences.size()); |
| List<Sequence> toReturnList = Lists.newArrayListWithExpectedSize(sequences.size()); |
| int[] indexes = new int[sequences.size()]; |
| for (int i = 0; i < sequences.size(); i++) { |
| Sequence sequence = sequences.get(i); |
| try { |
| Append append = sequence.newReturn(timestamp); |
| // Record which caller slot this sequence occupies so that any exception |
| // from the batch below is reported at the matching index. |
| indexes[toReturnList.size()] = i; |
| toReturnList.add(sequence); |
| mutations.add(append); |
| } catch (EmptySequenceCacheException ignore) { // Nothing to return, so ignore |
| } |
| } |
| if (toReturnList.isEmpty()) { |
| return; |
| } |
| HTableInterface hTable = this.getTable(PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME_BYTES); |
| Object[] resultObjects = null; |
| SQLException sqlE = null; |
| try { |
| resultObjects = hTable.batch(mutations); |
| } catch (IOException e){ |
| sqlE = ServerUtil.parseServerException(e); |
| } catch (InterruptedException e){ |
| sqlE = new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION) |
| .setRootCause(e).build().buildException(); // FIXME ? |
| } finally { |
| try { |
| hTable.close(); |
| } catch (IOException e) { |
| if (sqlE == null) { |
| sqlE = ServerUtil.parseServerException(e); |
| } else { |
| sqlE.setNextException(ServerUtil.parseServerException(e)); |
| } |
| } |
| if (sqlE != null) { |
| throw sqlE; |
| } |
| } |
| for (int i = 0; i < resultObjects.length; i++) { |
| Sequence sequence = toReturnList.get(i); |
| Result result = (Result)resultObjects[i]; |
| try { |
| sequence.returnValue(result); |
| } catch (SQLException e) { |
| exceptions[indexes[i]] = e; |
| } |
| } |
| } finally { |
| for (Sequence sequence : sequences) { |
| sequence.getLock().unlock(); |
| } |
| } |
| } |
| |
| // Take no locks, as this only gets run when there are no open connections |
| // so there's no danger of contention. |
| private void returnAllSequences(ConcurrentMap<SequenceKey,Sequence> sequenceMap) throws SQLException { |
| List<Append> mutations = Lists.newArrayListWithExpectedSize(sequenceMap.size()); |
| for (Sequence sequence : sequenceMap.values()) { |
| mutations.addAll(sequence.newReturns()); |
| } |
| if (mutations.isEmpty()) { |
| return; |
| } |
| HTableInterface hTable = this.getTable(PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME_BYTES); |
| SQLException sqlE = null; |
| try { |
| hTable.batch(mutations); |
| } catch (IOException e){ |
| sqlE = ServerUtil.parseServerException(e); |
| } catch (InterruptedException e){ |
| sqlE = new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION) |
| .setRootCause(e).build().buildException(); // FIXME ? |
| } finally { |
| try { |
| hTable.close(); |
| } catch (IOException e) { |
| if (sqlE == null) { |
| sqlE = ServerUtil.parseServerException(e); |
| } else { |
| sqlE.setNextException(ServerUtil.parseServerException(e)); |
| } |
| } |
| if (sqlE != null) { |
| throw sqlE; |
| } |
| } |
| } |
| |
| @Override |
| public synchronized void addConnection(PhoenixConnection connection) throws SQLException { |
| connectionCount++; |
| } |
| |
| @Override |
| public void removeConnection(PhoenixConnection connection) throws SQLException { |
| ConcurrentMap<SequenceKey,Sequence> formerSequenceMap = null; |
| synchronized(this) { |
| if (--connectionCount == 0) { |
| if (!this.sequenceMap.isEmpty()) { |
| formerSequenceMap = this.sequenceMap; |
| this.sequenceMap = Maps.newConcurrentMap(); |
| } |
| } |
| } |
| // Since we're using the former sequenceMap, we can do this outside |
| // the lock. |
| if (formerSequenceMap != null) { |
| // When there are no more connections, attempt to return any sequences |
| returnAllSequences(formerSequenceMap); |
| } |
| } |
| |
| @Override |
| public KeyValueBuilder getKeyValueBuilder() { |
| return this.kvBuilder; |
| } |
| |
| @Override |
| public boolean supportsFeature(Feature feature) { |
| // TODO: Keep map of Feature -> min HBase version |
| // For now, the only Feature is REVERSE_SCAN, and it is not yet supported in any HBase version |
| return false; |
| } |
| |
| @Override |
| public String getUserName() { |
| return userName; |
| } |
| |
| private void checkClosed() { |
| if (closed) { |
| throwConnectionClosedException(); |
| } |
| } |
| |
| private void throwConnectionClosedIfNullMetaData() { |
| if (latestMetaData == null) { |
| throwConnectionClosedException(); |
| } |
| } |
| |
| private void throwConnectionClosedException() { |
| throw new IllegalStateException("Connection to the cluster is closed"); |
| } |
| |
| } |