| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.phoenix.query; |
| |
| import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE_BYTES; |
| import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone; |
| |
| import java.sql.SQLException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Objects; |
| import java.util.Properties; |
| import java.util.Set; |
| import java.util.concurrent.ExecutionException; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.HRegionLocation; |
| import org.apache.hadoop.hbase.ServerName; |
| import org.apache.hadoop.hbase.TableName; |
| import org.apache.hadoop.hbase.client.Admin; |
| import org.apache.hadoop.hbase.client.Mutation; |
| import org.apache.hadoop.hbase.client.RegionInfo; |
| import org.apache.hadoop.hbase.client.RegionInfoBuilder; |
| import org.apache.hadoop.hbase.client.Table; |
| import org.apache.hadoop.hbase.client.TableDescriptor; |
| import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
| import org.apache.hadoop.hbase.security.User; |
| import org.apache.hadoop.hbase.util.Addressing; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.hadoop.hbase.util.Pair; |
| import org.apache.phoenix.compile.MutationPlan; |
| import org.apache.phoenix.coprocessorclient.MetaDataProtocol; |
| import org.apache.phoenix.coprocessorclient.MetaDataProtocol.MetaDataMutationResult; |
| import org.apache.phoenix.coprocessorclient.MetaDataProtocol.MutationCode; |
| import org.apache.phoenix.exception.SQLExceptionCode; |
| import org.apache.phoenix.exception.SQLExceptionInfo; |
| import org.apache.phoenix.execute.MutationState; |
| import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder; |
| import org.apache.phoenix.hbase.index.util.KeyValueBuilder; |
| import org.apache.phoenix.jdbc.ConnectionInfo; |
| import org.apache.phoenix.jdbc.PhoenixConnection; |
| import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; |
| import org.apache.phoenix.log.QueryLoggerDisruptor; |
| import org.apache.phoenix.parse.PFunction; |
| import org.apache.phoenix.parse.PSchema; |
| import org.apache.phoenix.schema.ConnectionProperty; |
| import org.apache.phoenix.schema.FunctionNotFoundException; |
| import org.apache.phoenix.schema.NewerTableAlreadyExistsException; |
| import org.apache.phoenix.schema.PColumn; |
| import org.apache.phoenix.schema.PIndexState; |
| import org.apache.phoenix.schema.PMetaData; |
| import org.apache.phoenix.schema.PMetaDataImpl; |
| import org.apache.phoenix.schema.PName; |
| import org.apache.phoenix.schema.PNameFactory; |
| import org.apache.phoenix.schema.PTable; |
| import org.apache.phoenix.schema.PTableImpl; |
| import org.apache.phoenix.schema.PTableKey; |
| import org.apache.phoenix.schema.PTableType; |
| import org.apache.phoenix.schema.Sequence; |
| import org.apache.phoenix.schema.SequenceAllocation; |
| import org.apache.phoenix.schema.SequenceAlreadyExistsException; |
| import org.apache.phoenix.schema.SequenceInfo; |
| import org.apache.phoenix.schema.SequenceKey; |
| import org.apache.phoenix.schema.SequenceNotFoundException; |
| import org.apache.phoenix.schema.TableAlreadyExistsException; |
| import org.apache.phoenix.schema.TableNotFoundException; |
| import org.apache.phoenix.schema.stats.GuidePostsInfo; |
| import org.apache.phoenix.schema.stats.GuidePostsKey; |
| import org.apache.phoenix.transaction.PhoenixTransactionClient; |
| import org.apache.phoenix.transaction.TransactionFactory.Provider; |
| import org.apache.phoenix.util.ConfigUtil; |
| import org.apache.phoenix.util.IndexUtil; |
| import org.apache.phoenix.util.JDBCUtil; |
| import org.apache.phoenix.util.MetaDataUtil; |
| import org.apache.phoenix.util.PhoenixRuntime; |
| import org.apache.phoenix.util.PropertiesUtil; |
| import org.apache.phoenix.util.ReadOnlyProps; |
| import org.apache.phoenix.util.SchemaUtil; |
| import org.apache.phoenix.util.SequenceUtil; |
| |
| import org.apache.phoenix.thirdparty.com.google.common.collect.Lists; |
| import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; |
| |
| /** |
| * |
| * Implementation of ConnectionQueryServices used in testing where no connection to |
| * an hbase cluster is necessary. |
| * |
| * |
| * @since 0.1 |
| */ |
| public class ConnectionlessQueryServicesImpl extends DelegateQueryServices implements ConnectionQueryServices { |
| private static ServerName SERVER_NAME = ServerName.parseServerName(HConstants.LOCALHOST + Addressing.HOSTNAME_PORT_SEPARATOR + HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT); |
| private final ReadOnlyProps props; |
| private PMetaData metaData; |
| private final Map<SequenceKey, SequenceInfo> sequenceMap = Maps.newHashMap(); |
| private final String userName; |
| private KeyValueBuilder kvBuilder; |
| private volatile boolean initialized; |
| private volatile SQLException initializationException; |
| private final Map<String, List<HRegionLocation>> tableSplits = Maps.newHashMap(); |
| private GuidePostsCacheWrapper guidePostsCache; |
| private final Configuration config; |
| |
| private User user; |
| |
| public ConnectionlessQueryServicesImpl(QueryServices services, ConnectionInfo connInfo, Properties info) { |
| super(services); |
| userName = connInfo.getPrincipal(); |
| user = connInfo.getUser(); |
| metaData = newEmptyMetaData(); |
| |
| // Use KeyValueBuilder that builds real KeyValues, as our test utils require this |
| this.kvBuilder = GenericKeyValueBuilder.INSTANCE; |
| Configuration config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(); |
| for (Entry<String,String> entry : services.getProps()) { |
| config.set(entry.getKey(), entry.getValue()); |
| } |
| if (info != null) { |
| for (Object key : info.keySet()) { |
| config.set((String) key, info.getProperty((String) key)); |
| } |
| } |
| for (Entry<String,String> entry : connInfo.asProps()) { |
| config.set(entry.getKey(), entry.getValue()); |
| } |
| |
| // Without making a copy of the configuration we cons up, we lose some of our properties |
| // on the server side during testing. |
| this.config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(config); |
| |
| // set replication required parameter |
| ConfigUtil.setReplicationConfigIfAbsent(this.config); |
| this.props = new ReadOnlyProps(this.config.iterator()); |
| } |
| |
| private PMetaData newEmptyMetaData() { |
| long updateCacheFrequency = (Long) ConnectionProperty.UPDATE_CACHE_FREQUENCY.getValue( |
| getProps().get(QueryServices.DEFAULT_UPDATE_CACHE_FREQUENCY_ATRRIB)); |
| // We cannot have zero update cache frequency for connectionless query services as we need to keep the |
| // metadata in the cache (i.e., in memory) |
| if (updateCacheFrequency == 0) { |
| updateCacheFrequency = Long.MAX_VALUE; |
| } |
| return new PMetaDataImpl(INITIAL_META_DATA_TABLE_CAPACITY, updateCacheFrequency, getProps()); |
| } |
| |
| protected String getSystemCatalogTableDDL() { |
| return setSystemDDLProperties(QueryConstants.CREATE_TABLE_METADATA); |
| } |
| |
| protected String getSystemSequenceTableDDL(int nSaltBuckets) { |
| String schema = String.format(setSystemDDLProperties(QueryConstants.CREATE_SEQUENCE_METADATA)); |
| return Sequence.getCreateTableStatement(schema, nSaltBuckets); |
| } |
| |
| protected String getFunctionTableDDL() { |
| return setSystemDDLProperties(QueryConstants.CREATE_FUNCTION_METADATA); |
| } |
| |
| protected String getLogTableDDL() { |
| return setSystemLogDDLProperties(QueryConstants.CREATE_LOG_METADATA); |
| } |
| |
| private String setSystemLogDDLProperties(String ddl) { |
| return String.format(ddl, props.getInt(LOG_SALT_BUCKETS_ATTRIB, QueryServicesOptions.DEFAULT_LOG_SALT_BUCKETS)); |
| |
| } |
| |
| protected String getChildLinkDDL() { |
| return setSystemDDLProperties(QueryConstants.CREATE_CHILD_LINK_METADATA); |
| } |
| |
| protected String getMutexDDL() { |
| return setSystemDDLProperties(QueryConstants.CREATE_MUTEX_METADATA); |
| } |
| |
| protected String getTaskDDL() { |
| return setSystemDDLProperties(QueryConstants.CREATE_TASK_METADATA); |
| } |
| |
| protected String getTransformDDL() { |
| return setSystemDDLProperties(QueryConstants.CREATE_TRANSFORM_METADATA); |
| } |
| |
| private String setSystemDDLProperties(String ddl) { |
| return String.format(ddl, |
| props.getInt(DEFAULT_SYSTEM_MAX_VERSIONS_ATTRIB, QueryServicesOptions.DEFAULT_SYSTEM_MAX_VERSIONS), |
| props.getBoolean(DEFAULT_SYSTEM_KEEP_DELETED_CELLS_ATTRIB, QueryServicesOptions.DEFAULT_SYSTEM_KEEP_DELETED_CELLS)); |
| } |
| |
| @Override |
| public ConnectionQueryServices getChildQueryServices(ImmutableBytesWritable childId) { |
| return this; // Just reuse the same query services |
| } |
| |
| @Override |
| public Table getTable(byte[] tableName) throws SQLException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public Table getTableIfExists(byte[] tableName) { |
| throw new UnsupportedOperationException(); |
| } |
| |
| /** |
| * {@inheritDoc}. |
| */ |
| @Override |
| public List<HRegionLocation> getAllTableRegions(byte[] tableName) throws SQLException { |
| return getTableRegions(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, |
| QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS); |
| } |
| |
| /** |
| * {@inheritDoc}. |
| */ |
| @Override |
| public List<HRegionLocation> getAllTableRegions(byte[] tableName, int queryTimeout) |
| throws SQLException { |
| return getTableRegions(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, |
| queryTimeout); |
| } |
| |
| /** |
| * {@inheritDoc}. |
| */ |
| @Override |
| public List<HRegionLocation> getTableRegions(byte[] tableName, byte[] startRowKey, |
| byte[] endRowKey) throws SQLException { |
| return getTableRegions(tableName, startRowKey, endRowKey, |
| QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS); |
| } |
| |
| /** |
| * {@inheritDoc}. |
| */ |
| @Override |
| public List<HRegionLocation> getTableRegions(byte[] tableName, byte[] startRowKey, |
| byte[] endRowKey, int queryTimeout) |
| throws SQLException { |
| List<HRegionLocation> regions = tableSplits.get(Bytes.toString(tableName)); |
| if (regions != null) { |
| return regions; |
| } |
| RegionInfo hri = |
| RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)) |
| .setStartKey(startRowKey) |
| .setStartKey(endRowKey) |
| .build(); |
| return Collections.singletonList(new HRegionLocation(hri, SERVER_NAME, -1)); |
| } |
| |
| @Override |
| public void addTable(PTable table, long resolvedTime) throws SQLException { |
| metaData.addTable(table, resolvedTime); |
| } |
| |
| @Override |
| public void updateResolvedTimestamp(PTable table, long resolvedTimestamp) throws SQLException { |
| metaData.updateResolvedTimestamp(table, resolvedTimestamp); |
| } |
| |
| @Override |
| public void removeTable(PName tenantId, String tableName, String parentTableName, long tableTimeStamp) |
| throws SQLException { |
| metaData.removeTable(tenantId, tableName, parentTableName, tableTimeStamp); |
| } |
| |
| @Override |
| public void removeColumn(PName tenantId, String tableName, List<PColumn> columnsToRemove, long tableTimeStamp, |
| long tableSeqNum, long resolvedTime) throws SQLException { |
| metaData.removeColumn(tenantId, tableName, columnsToRemove, tableTimeStamp, tableSeqNum, resolvedTime); |
| } |
| |
| |
| @Override |
| public PhoenixConnection connect(String url, Properties info) throws SQLException { |
| return new PhoenixConnection(this, url, info); |
| } |
| |
| @Override |
| public MetaDataMutationResult getTable(PName tenantId, byte[] schemaBytes, byte[] tableBytes, long tableTimestamp, long clientTimestamp) throws SQLException { |
| // Return result that will cause client to use it's own metadata instead of needing |
| // to get anything from the server (since we don't have a connection) |
| try { |
| String fullTableName = SchemaUtil.getTableName(schemaBytes, tableBytes); |
| PTable table = metaData.getTableRef(new PTableKey(tenantId, fullTableName)).getTable(); |
| return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, 0, table); |
| } catch (TableNotFoundException e) { |
| return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, 0, null); |
| } |
| } |
| |
| private static byte[] getTableName(List<Mutation> tableMetaData, byte[] physicalTableName) { |
| if (physicalTableName != null) { |
| return physicalTableName; |
| } |
| byte[][] rowKeyMetadata = new byte[3][]; |
| Mutation m = MetaDataUtil.getTableHeaderRow(tableMetaData); |
| byte[] key = m.getRow(); |
| SchemaUtil.getVarChars(key, rowKeyMetadata); |
| byte[] schemaBytes = rowKeyMetadata[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX]; |
| byte[] tableBytes = rowKeyMetadata[PhoenixDatabaseMetaData.TABLE_NAME_INDEX]; |
| return SchemaUtil.getTableNameAsBytes(schemaBytes, tableBytes); |
| } |
| |
| private static List<HRegionLocation> generateRegionLocations(byte[] physicalName, byte[][] splits) { |
| byte[] startKey = HConstants.EMPTY_START_ROW; |
| List<HRegionLocation> regions = Lists.newArrayListWithExpectedSize(splits.length); |
| for (byte[] split : splits) { |
| regions.add(new HRegionLocation(RegionInfoBuilder |
| .newBuilder(TableName.valueOf(physicalName)).setStartKey(startKey) |
| .setEndKey(split).build(), SERVER_NAME, -1)); |
| startKey = split; |
| } |
| regions.add(new HRegionLocation(RegionInfoBuilder |
| .newBuilder(TableName.valueOf(physicalName)).setStartKey(startKey) |
| .setEndKey(HConstants.EMPTY_END_ROW).build(), SERVER_NAME, -1)); |
| return regions; |
| } |
| |
| @Override |
| public MetaDataMutationResult createTable(List<Mutation> tableMetaData, byte[] physicalName, PTableType tableType, |
| Map<String, Object> tableProps, List<Pair<byte[], Map<String, Object>>> families, byte[][] splits, |
| boolean isNamespaceMapped, boolean allocateIndexId, boolean isDoNotUpgradePropSet, PTable parentTable) throws SQLException { |
| if (tableType == PTableType.INDEX && IndexUtil.isLocalIndexFamily(Bytes.toString(families.iterator().next().getFirst()))) { |
| Object dataTableName = tableProps.get(PhoenixDatabaseMetaData.DATA_TABLE_NAME); |
| List<HRegionLocation> regionLocations = tableSplits.get(dataTableName); |
| byte[] tableName = getTableName(tableMetaData, physicalName); |
| tableSplits.put(Bytes.toString(tableName), regionLocations); |
| } else if (splits != null) { |
| byte[] tableName = getTableName(tableMetaData, physicalName); |
| tableSplits.put(Bytes.toString(tableName), generateRegionLocations(tableName, splits)); |
| } |
| if (!allocateIndexId) { |
| return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, 0, null); |
| } else { |
| return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, 0, null, Long.MIN_VALUE, MetaDataUtil.getViewIndexIdDataType()); |
| } |
| } |
| |
| @Override |
| public MetaDataMutationResult dropTable(List<Mutation> tableMetadata, PTableType tableType, boolean cascade) throws SQLException { |
| byte[] tableName = getTableName(tableMetadata, null); |
| tableSplits.remove(Bytes.toString(tableName)); |
| return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, 0, null); |
| } |
| |
| @Override |
| public MetaDataMutationResult addColumn(List<Mutation> tableMetaData, |
| PTable table, |
| PTable parentTable, |
| PTable transformingNewTable, |
| Map<String, List<Pair<String, Object>>> properties, |
| Set<String> colFamiliesForPColumnsToBeAdded, |
| List<PColumn> columnsToBeAdded) throws SQLException { |
| List<PColumn> columns = Lists.newArrayList(table.getColumns()); |
| columns.addAll(columnsToBeAdded); |
| return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, 0, |
| PTableImpl.builderWithColumns(table, columns).build()); |
| } |
| |
| @Override |
| public MetaDataMutationResult dropColumn(List<Mutation> tableMetadata, |
| PTableType tableType, |
| PTable parentTable) throws SQLException { |
| return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, 0, null); |
| } |
| |
| @Override |
| public void clearTableFromCache(byte[] tenantId, byte[] schemaName, byte[] tableName, long clientTS) |
| throws SQLException {} |
| // TODO: share this with ConnectionQueryServicesImpl |
| @Override |
| public void init(String url, Properties props) throws SQLException { |
| if (initialized) { |
| if (initializationException != null) { |
| throw initializationException; |
| } |
| return; |
| } |
| synchronized (this) { |
| if (initialized) { |
| if (initializationException != null) { |
| throw initializationException; |
| } |
| return; |
| } |
| guidePostsCache = |
| (new GuidePostsCacheProvider()).getGuidePostsCache( |
| props.getProperty(GUIDE_POSTS_CACHE_FACTORY_CLASS, |
| QueryServicesOptions.DEFAULT_GUIDE_POSTS_CACHE_FACTORY_CLASS), |
| null, config); |
| SQLException sqlE = null; |
| PhoenixConnection metaConnection = null; |
| try { |
| Properties scnProps = PropertiesUtil.deepCopy(props); |
| scnProps.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP)); |
| scnProps.remove(PhoenixRuntime.TENANT_ID_ATTRIB); |
| String globalUrl = JDBCUtil.removeProperty(url, PhoenixRuntime.TENANT_ID_ATTRIB); |
| metaConnection = new PhoenixConnection(this, globalUrl, scnProps); |
| metaConnection.setRunningUpgrade(true); |
| try { |
| metaConnection.createStatement().executeUpdate(getSystemCatalogTableDDL()); |
| } catch (TableAlreadyExistsException ignore) { |
| // Ignore, as this will happen if the SYSTEM.TABLE already exists at this fixed timestamp. |
| // A TableAlreadyExistsException is not thrown, since the table only exists *after* this fixed timestamp. |
| } |
| try { |
| int nSaltBuckets = getSequenceSaltBuckets(); |
| String createTableStatement = getSystemSequenceTableDDL(nSaltBuckets); |
| metaConnection.createStatement().executeUpdate(createTableStatement); |
| } catch (NewerTableAlreadyExistsException ignore) { |
| // Ignore, as this will happen if the SYSTEM.SEQUENCE already exists at this fixed timestamp. |
| // A TableAlreadyExistsException is not thrown, since the table only exists *after* this fixed timestamp. |
| } |
| try { |
| metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_STATS_TABLE_METADATA); |
| } catch (NewerTableAlreadyExistsException ignore) { |
| // Ignore, as this will happen if the SYSTEM.SEQUENCE already exists at this fixed |
| // timestamp. |
| // A TableAlreadyExistsException is not thrown, since the table only exists *after* this |
| // fixed timestamp. |
| } |
| |
| try { |
| metaConnection.createStatement().executeUpdate(getFunctionTableDDL()); |
| } catch (NewerTableAlreadyExistsException ignore) { |
| } |
| try { |
| metaConnection.createStatement().executeUpdate(getLogTableDDL()); |
| } catch (NewerTableAlreadyExistsException ignore) {} |
| try { |
| metaConnection.createStatement() |
| .executeUpdate(getChildLinkDDL()); |
| } catch (NewerTableAlreadyExistsException ignore) { |
| } |
| try { |
| metaConnection.createStatement() |
| .executeUpdate(getMutexDDL()); |
| } catch (NewerTableAlreadyExistsException ignore) { |
| } |
| try { |
| metaConnection.createStatement() |
| .executeUpdate(getTaskDDL()); |
| } catch (NewerTableAlreadyExistsException ignore) { |
| } |
| try { |
| metaConnection.createStatement() |
| .executeUpdate(getTransformDDL()); |
| } catch (NewerTableAlreadyExistsException ignore) { |
| } |
| } catch (SQLException e) { |
| sqlE = e; |
| } finally { |
| try { |
| if (metaConnection != null) metaConnection.close(); |
| } catch (SQLException e) { |
| if (sqlE != null) { |
| sqlE.setNextException(e); |
| } else { |
| sqlE = e; |
| } |
| } finally { |
| try { |
| if (sqlE != null) { |
| initializationException = sqlE; |
| throw sqlE; |
| } |
| } finally { |
| initialized = true; |
| } |
| } |
| } |
| } |
| } |
| |
| @Override |
| public MutationState updateData(MutationPlan plan) throws SQLException { |
| return new MutationState(0, 0, plan.getContext().getConnection()); |
| } |
| |
| @Override |
| public int getLowestClusterHBaseVersion() { |
| return Integer.MAX_VALUE; // Allow everything for connectionless |
| } |
| |
| @Override |
| public Admin getAdmin() throws SQLException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public MetaDataMutationResult updateIndexState(List<Mutation> tableMetadata, String parentTableName) throws SQLException { |
| byte[][] rowKeyMetadata = new byte[3][]; |
| SchemaUtil.getVarChars(tableMetadata.get(0).getRow(), rowKeyMetadata); |
| Mutation m = MetaDataUtil.getTableHeaderRow(tableMetadata); |
| ImmutableBytesWritable ptr = new ImmutableBytesWritable(); |
| if (!MetaDataUtil.getMutationValue(m, INDEX_STATE_BYTES, kvBuilder, ptr)) { |
| throw new IllegalStateException(); |
| } |
| PIndexState newState = PIndexState.fromSerializedValue(ptr.get()[ptr.getOffset()]); |
| byte[] tenantIdBytes = rowKeyMetadata[PhoenixDatabaseMetaData.TENANT_ID_INDEX]; |
| String schemaName = Bytes.toString(rowKeyMetadata[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX]); |
| String indexName = Bytes.toString(rowKeyMetadata[PhoenixDatabaseMetaData.TABLE_NAME_INDEX]); |
| String indexTableName = SchemaUtil.getTableName(schemaName, indexName); |
| PName tenantId = tenantIdBytes.length == 0 ? null : PNameFactory.newName(tenantIdBytes); |
| PTable index = metaData.getTableRef(new PTableKey(tenantId, indexTableName)).getTable(); |
| index = PTableImpl.builderWithColumns(index, getColumnsToClone(index)) |
| .setState(newState == PIndexState.USABLE ? PIndexState.ACTIVE : |
| newState == PIndexState.UNUSABLE ? PIndexState.INACTIVE : newState) |
| .build(); |
| return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, 0, index); |
| } |
| |
| @Override |
| public MetaDataMutationResult updateIndexState(List<Mutation> tableMetadata, |
| String parentTableName, Map<String, List<Pair<String, Object>>> stmtProperties, |
| PTable table) throws SQLException { |
| return updateIndexState(tableMetadata,parentTableName); |
| } |
| |
| @Override |
| public TableDescriptor getTableDescriptor(byte[] tableName) throws SQLException { |
| return null; |
| } |
| |
| @Override |
| public void clearTableRegionCache(TableName tableName) throws SQLException { |
| } |
| |
| @Override |
| public boolean hasIndexWALCodec() { |
| return true; |
| } |
| |
| @Override |
| public long createSequence(String tenantId, String schemaName, String sequenceName, |
| long startWith, long incrementBy, long cacheSize, long minValue, long maxValue, |
| boolean cycle, long timestamp) throws SQLException { |
| SequenceKey key = new SequenceKey(tenantId, schemaName, sequenceName, getSequenceSaltBuckets()); |
| if (sequenceMap.get(key) != null) { |
| throw new SequenceAlreadyExistsException(schemaName, sequenceName); |
| } |
| sequenceMap.put(key, new SequenceInfo(startWith, incrementBy, minValue, maxValue, 1l, cycle)) ; |
| return timestamp; |
| } |
| |
| @Override |
| public long dropSequence(String tenantId, String schemaName, String sequenceName, long timestamp) throws SQLException { |
| SequenceKey key = new SequenceKey(tenantId, schemaName, sequenceName, getSequenceSaltBuckets()); |
| if (sequenceMap.remove(key) == null) { |
| throw new SequenceNotFoundException(schemaName, sequenceName); |
| } |
| return timestamp; |
| } |
| |
| @Override |
| public void validateSequences(List<SequenceAllocation> sequenceAllocations, long timestamp, |
| long[] values, SQLException[] exceptions, Sequence.ValueOp action) throws SQLException { |
| int i = 0; |
| for (SequenceAllocation sequenceAllocation : sequenceAllocations) { |
| SequenceInfo info = sequenceMap.get(sequenceAllocation.getSequenceKey()); |
| if (info == null) { |
| exceptions[i] = new SequenceNotFoundException(sequenceAllocation.getSequenceKey().getSchemaName(), sequenceAllocation.getSequenceKey().getSequenceName()); |
| } else { |
| values[i] = info.sequenceValue; |
| } |
| i++; |
| } |
| } |
| |
| @Override |
| public void incrementSequences(List<SequenceAllocation> sequenceAllocations, long timestamp, |
| long[] values, SQLException[] exceptions) throws SQLException { |
| int i = 0; |
| for (SequenceAllocation sequenceAllocation : sequenceAllocations) { |
| SequenceKey key = sequenceAllocation.getSequenceKey(); |
| SequenceInfo info = sequenceMap.get(key); |
| if (info == null) { |
| exceptions[i] = new SequenceNotFoundException( |
| key.getSchemaName(), key.getSequenceName()); |
| } else { |
| boolean increaseSeq = info.incrementBy > 0; |
| if (info.limitReached) { |
| SQLExceptionCode code = increaseSeq ? SQLExceptionCode.SEQUENCE_VAL_REACHED_MAX_VALUE |
| : SQLExceptionCode.SEQUENCE_VAL_REACHED_MIN_VALUE; |
| exceptions[i] = new SQLExceptionInfo.Builder(code).build().buildException(); |
| } else { |
| values[i] = info.sequenceValue; |
| info.sequenceValue += info.incrementBy * info.cacheSize; |
| info.limitReached = SequenceUtil.checkIfLimitReached(info); |
| if (info.limitReached && info.cycle) { |
| info.sequenceValue = increaseSeq ? info.minValue : info.maxValue; |
| info.limitReached = false; |
| } |
| } |
| } |
| i++; |
| } |
| i = 0; |
| for (SQLException e : exceptions) { |
| if (e != null) { |
| sequenceMap.remove(sequenceAllocations.get(i).getSequenceKey()); |
| } |
| i++; |
| } |
| } |
| |
| @Override |
| public long currentSequenceValue(SequenceKey sequenceKey, long timestamp) throws SQLException { |
| SequenceInfo info = sequenceMap.get(sequenceKey); |
| if (info == null) { |
| throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_CALL_CURRENT_BEFORE_NEXT_VALUE) |
| .setSchemaName(sequenceKey.getSchemaName()).setTableName(sequenceKey.getSequenceName()) |
| .build().buildException(); |
| } |
| return info.sequenceValue; |
| } |
| |
| @Override |
| public void returnSequences(List<SequenceKey> sequenceKeys, long timestamp, SQLException[] exceptions) |
| throws SQLException { |
| } |
| |
| @Override |
| public void addConnection(PhoenixConnection connection) throws SQLException { |
| } |
| |
| @Override |
| public void removeConnection(PhoenixConnection connection) throws SQLException { |
| } |
| |
| @Override |
| public KeyValueBuilder getKeyValueBuilder() { |
| return this.kvBuilder; |
| } |
| |
| @Override |
| public boolean supportsFeature(Feature feature) { |
| return true; |
| } |
| |
| @Override |
| public String getUserName() { |
| return userName; |
| } |
| |
| @Override |
| public GuidePostsInfo getTableStats(GuidePostsKey key) { |
| GuidePostsInfo info = null; |
| try { |
| info = guidePostsCache.get(key); |
| } catch(ExecutionException e){ |
| return GuidePostsInfo.NO_GUIDEPOST; |
| } |
| if (null == info) { |
| return GuidePostsInfo.NO_GUIDEPOST; |
| } |
| return info; |
| } |
| |
| @Override |
| public long clearCache() throws SQLException { |
| return 0; |
| } |
| |
| @Override |
| public int getSequenceSaltBuckets() { |
| return getProps().getInt(QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB, |
| QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS); |
| } |
| |
| public MetaDataMutationResult createFunction(List<Mutation> functionData, PFunction function, boolean temporary) |
| throws SQLException { |
| return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_FOUND, 0l, null); |
| } |
| |
| @Override |
| public void addFunction(PFunction function) throws SQLException { |
| this.metaData.addFunction(function); |
| } |
| |
| @Override |
| public void removeFunction(PName tenantId, String function, long functionTimeStamp) |
| throws SQLException { |
| this.metaData.removeFunction(tenantId, function, functionTimeStamp); |
| } |
| |
| @Override |
| public MetaDataMutationResult getFunctions(PName tenantId, |
| List<Pair<byte[], Long>> functionNameAndTimeStampPairs, long clientTimestamp) |
| throws SQLException { |
| List<PFunction> functions = new ArrayList<PFunction>(functionNameAndTimeStampPairs.size()); |
| for(Pair<byte[], Long> functionInfo: functionNameAndTimeStampPairs) { |
| try { |
| PFunction function2 = metaData.getFunction(new PTableKey(tenantId, Bytes.toString(functionInfo.getFirst()))); |
| functions.add(function2); |
| } catch (FunctionNotFoundException e) { |
| return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_FOUND, 0, null); |
| } |
| } |
| if(functions.isEmpty()) { |
| return null; |
| } |
| return new MetaDataMutationResult(MutationCode.FUNCTION_ALREADY_EXISTS, 0, functions, true); |
| } |
| |
| @Override |
| public MetaDataMutationResult dropFunction(List<Mutation> tableMetadata, boolean ifExists) |
| throws SQLException { |
| return new MetaDataMutationResult(MutationCode.FUNCTION_ALREADY_EXISTS, 0, null); |
| } |
| |
| @Override |
| public long getRenewLeaseThresholdMilliSeconds() { |
| return 0; |
| } |
| |
| @Override |
| public boolean isRenewingLeasesEnabled() { |
| return false; |
| } |
| |
| public HRegionLocation getTableRegionLocation(byte[] tableName, byte[] row) throws SQLException { |
| List<HRegionLocation> regions = tableSplits.get(Bytes.toString(tableName)); |
| if (regions != null) { |
| for (HRegionLocation region : regions) { |
| if (Bytes.compareTo(region.getRegion().getStartKey(), row) <= 0 |
| && Bytes.compareTo(region.getRegion().getEndKey(), row) > 0) { |
| return region; |
| } |
| } |
| } |
| return new HRegionLocation(RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)) |
| .setStartKey(HConstants.EMPTY_START_ROW).setEndKey(HConstants.EMPTY_END_ROW) |
| .build(), SERVER_NAME, -1); |
| } |
| |
| @Override |
| public MetaDataMutationResult createSchema(List<Mutation> schemaMutations, String schemaName) { |
| return new MetaDataMutationResult(MutationCode.SCHEMA_NOT_FOUND, 0l, null); |
| } |
| |
| @Override |
| public void addSchema(PSchema schema) throws SQLException { |
| this.metaData.addSchema(schema); |
| } |
| |
| @Override |
| public MetaDataMutationResult getSchema(String schemaName, long clientTimestamp) throws SQLException { |
| return new MetaDataMutationResult(MutationCode.SCHEMA_NOT_FOUND, 0, null); |
| } |
| |
| @Override |
| public void removeSchema(PSchema schema, long schemaTimeStamp) { |
| metaData.removeSchema(schema, schemaTimeStamp); |
| } |
| |
| @Override |
| public MetaDataMutationResult dropSchema(List<Mutation> schemaMetaData, String schemaName) { |
| return new MetaDataMutationResult(MutationCode.SCHEMA_ALREADY_EXISTS, 0, null); |
| } |
| |
| /** |
| * Manually adds {@link GuidePostsInfo} for a table to the client-side cache. Not a |
| * {@link ConnectionQueryServices} method. Exposed for testing purposes. |
| * |
| * @param key |
| * @param info |
| */ |
| public void addTableStats(GuidePostsKey key, GuidePostsInfo info) { |
| this.guidePostsCache.put(Objects.requireNonNull(key), info); |
| } |
| |
| @Override |
| public void invalidateStats(GuidePostsKey key) { |
| this.guidePostsCache.invalidate(Objects.requireNonNull(key)); |
| } |
| |
| @Override |
| public void upgradeSystemTables(String url, Properties props) throws SQLException {} |
| |
| @Override |
| public boolean isUpgradeRequired() { |
| return false; |
| } |
| |
| @Override |
| public void clearUpgradeRequired() {} |
| |
| @Override |
| public Configuration getConfiguration() { |
| return config; |
| } |
| |
| @Override |
| public User getUser() { |
| return user; |
| } |
| |
| @Override |
| public QueryLoggerDisruptor getQueryDisruptor() { |
| return null; |
| } |
| |
| @Override |
| public PhoenixTransactionClient initTransactionClient(Provider provider) { |
| return null; // Client is not necessary |
| } |
| |
| @Override |
| public boolean writeMutexCell(String tenantId, String schemaName, String tableName, |
| String columnName, String familyName) throws SQLException { |
| return true; |
| } |
| |
| @Override |
| public void deleteMutexCell(String tenantId, String schemaName, String tableName, |
| String columnName, String familyName) throws SQLException { |
| } |
| @Override |
| public PMetaData getMetaDataCache() { |
| return metaData; |
| } |
| |
| @Override |
| public int getConnectionCount(boolean isInternal) { |
| return 0; |
| } |
| } |