blob: e21ef04c5c719bf7410cb42b27e66ceee5f3906f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.coprocessor;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static org.apache.hadoop.hbase.KeyValueUtil.createFirstOnRow;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.APPEND_ONLY_SCHEMA_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARRAY_SIZE_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.AUTO_PARTITION_SEQ_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLASS_NAME_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_COUNT_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME_INDEX;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER_COUNTER_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_VALUE_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DISABLE_WAL_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODING_SCHEME_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FAMILY_NAME_INDEX;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_ROWS_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_TYPE_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ARRAY_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_CONSTANT_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_NAMESPACE_MAPPED_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ROW_TIMESTAMP_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_VIEW_REFERENCED_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.JAR_PATH_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LINK_TYPE_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_VALUE_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_VALUE_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MULTI_TENANT_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NULLABLE_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NUM_ARGS_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PK_NAME_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.RETURN_TYPE_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SORT_ORDER_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORAGE_SCHEME_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORE_NULLS_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME_INDEX;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID_INDEX;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTIONAL_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE_BYTES;
import static org.apache.phoenix.query.QueryConstants.DIVERGED_VIEW_BASE_COLUMN_COUNT;
import static org.apache.phoenix.query.QueryConstants.SEPARATOR_BYTE_ARRAY;
import static org.apache.phoenix.schema.PTableType.INDEX;
import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
import static org.apache.phoenix.util.SchemaUtil.getVarCharLength;
import static org.apache.phoenix.util.SchemaUtil.getVarChars;
import java.io.IOException;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Region.RowLock;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.phoenix.cache.GlobalCache;
import org.apache.phoenix.cache.GlobalCache.FunctionBytesPtr;
import org.apache.phoenix.compile.ColumnResolver;
import org.apache.phoenix.compile.FromCompiler;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.compile.WhereCompiler;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.AddColumnRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearTableFromCacheRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearTableFromCacheResponse;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.CreateFunctionRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.CreateSchemaRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.CreateTableRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropColumnRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropFunctionRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropSchemaRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropTableRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetFunctionsRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetSchemaRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetTableRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionResponse;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.KeyValueColumnExpression;
import org.apache.phoenix.expression.LiteralExpression;
import org.apache.phoenix.expression.ProjectedColumnExpression;
import org.apache.phoenix.expression.RowKeyColumnExpression;
import org.apache.phoenix.expression.visitor.StatelessTraverseAllExpressionVisitor;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.metrics.Metrics;
import org.apache.phoenix.parse.LiteralParseNode;
import org.apache.phoenix.parse.PFunction;
import org.apache.phoenix.parse.PFunction.FunctionArgument;
import org.apache.phoenix.parse.PSchema;
import org.apache.phoenix.parse.ParseNode;
import org.apache.phoenix.parse.SQLParser;
import org.apache.phoenix.protobuf.ProtobufUtil;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
import org.apache.phoenix.schema.ColumnNotFoundException;
import org.apache.phoenix.schema.ColumnRef;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PColumnFamily;
import org.apache.phoenix.schema.PColumnImpl;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PMetaDataEntity;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PNameFactory;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.EncodedCQCounter;
import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.PTable.LinkType;
import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
import org.apache.phoenix.schema.PTable.ViewType;
import org.apache.phoenix.schema.PTableImpl;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.SequenceAllocation;
import org.apache.phoenix.schema.SequenceAlreadyExistsException;
import org.apache.phoenix.schema.SequenceKey;
import org.apache.phoenix.schema.SequenceNotFoundException;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.TableProperty;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.tuple.ResultTuple;
import org.apache.phoenix.schema.types.PBinary;
import org.apache.phoenix.schema.types.PBoolean;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.schema.types.PSmallint;
import org.apache.phoenix.schema.types.PTinyint;
import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.schema.types.PVarchar;
import org.apache.phoenix.trace.util.Tracing;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.KeyValueUtil;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.ServerUtil;
import org.apache.phoenix.util.UpgradeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.cache.Cache;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
/**
*
* Endpoint co-processor through which all Phoenix metadata mutations flow.
* We only allow mutations to the latest version of a Phoenix table (i.e. the
* timeStamp must be increasing).
* For adding/dropping columns use a sequence number on the table to ensure that
* the client has the latest version.
* The timeStamp on the table correlates with the timeStamp on the data row.
* TODO: we should enforce that a metadata mutation uses a timeStamp bigger than
* any in use on the data table, b/c otherwise we can end up with data rows that
* are not valid against a schema row.
*
*
* @since 0.1
*/
@SuppressWarnings("deprecation")
public class MetaDataEndpointImpl extends MetaDataProtocol implements CoprocessorService, Coprocessor {
private static final Logger logger = LoggerFactory.getLogger(MetaDataEndpointImpl.class);
// Column to track tables that have been upgraded based on PHOENIX-2067
public static final String ROW_KEY_ORDER_OPTIMIZABLE = "ROW_KEY_ORDER_OPTIMIZABLE";
public static final byte[] ROW_KEY_ORDER_OPTIMIZABLE_BYTES = Bytes.toBytes(ROW_KEY_ORDER_OPTIMIZABLE);
// KeyValues for Table
private static final KeyValue TABLE_TYPE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, TABLE_TYPE_BYTES);
private static final KeyValue TABLE_SEQ_NUM_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
private static final KeyValue COLUMN_COUNT_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_COUNT_BYTES);
private static final KeyValue SALT_BUCKETS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, SALT_BUCKETS_BYTES);
private static final KeyValue PK_NAME_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, PK_NAME_BYTES);
private static final KeyValue DATA_TABLE_NAME_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DATA_TABLE_NAME_BYTES);
private static final KeyValue INDEX_STATE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, INDEX_STATE_BYTES);
private static final KeyValue IMMUTABLE_ROWS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IMMUTABLE_ROWS_BYTES);
private static final KeyValue VIEW_EXPRESSION_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_STATEMENT_BYTES);
private static final KeyValue DEFAULT_COLUMN_FAMILY_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DEFAULT_COLUMN_FAMILY_NAME_BYTES);
private static final KeyValue DISABLE_WAL_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DISABLE_WAL_BYTES);
private static final KeyValue MULTI_TENANT_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, MULTI_TENANT_BYTES);
private static final KeyValue VIEW_TYPE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_TYPE_BYTES);
private static final KeyValue VIEW_INDEX_ID_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_INDEX_ID_BYTES);
private static final KeyValue INDEX_TYPE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, INDEX_TYPE_BYTES);
private static final KeyValue INDEX_DISABLE_TIMESTAMP_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES);
private static final KeyValue STORE_NULLS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, STORE_NULLS_BYTES);
private static final KeyValue EMPTY_KEYVALUE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES);
private static final KeyValue BASE_COLUMN_COUNT_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.BASE_COLUMN_COUNT_BYTES);
private static final KeyValue ROW_KEY_ORDER_OPTIMIZABLE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, ROW_KEY_ORDER_OPTIMIZABLE_BYTES);
private static final KeyValue TRANSACTIONAL_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, TRANSACTIONAL_BYTES);
private static final KeyValue UPDATE_CACHE_FREQUENCY_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, UPDATE_CACHE_FREQUENCY_BYTES);
private static final KeyValue IS_NAMESPACE_MAPPED_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY,
TABLE_FAMILY_BYTES, IS_NAMESPACE_MAPPED_BYTES);
private static final KeyValue AUTO_PARTITION_SEQ_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, AUTO_PARTITION_SEQ_BYTES);
private static final KeyValue APPEND_ONLY_SCHEMA_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, APPEND_ONLY_SCHEMA_BYTES);
private static final KeyValue STORAGE_SCHEME_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, STORAGE_SCHEME_BYTES);
private static final KeyValue ENCODING_SCHEME_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, ENCODING_SCHEME_BYTES);
private static final List<KeyValue> TABLE_KV_COLUMNS = Arrays.<KeyValue>asList(
EMPTY_KEYVALUE_KV,
TABLE_TYPE_KV,
TABLE_SEQ_NUM_KV,
COLUMN_COUNT_KV,
SALT_BUCKETS_KV,
PK_NAME_KV,
DATA_TABLE_NAME_KV,
INDEX_STATE_KV,
IMMUTABLE_ROWS_KV,
VIEW_EXPRESSION_KV,
DEFAULT_COLUMN_FAMILY_KV,
DISABLE_WAL_KV,
MULTI_TENANT_KV,
VIEW_TYPE_KV,
VIEW_INDEX_ID_KV,
INDEX_TYPE_KV,
INDEX_DISABLE_TIMESTAMP_KV,
STORE_NULLS_KV,
BASE_COLUMN_COUNT_KV,
ROW_KEY_ORDER_OPTIMIZABLE_KV,
TRANSACTIONAL_KV,
UPDATE_CACHE_FREQUENCY_KV,
IS_NAMESPACE_MAPPED_KV,
AUTO_PARTITION_SEQ_KV,
APPEND_ONLY_SCHEMA_KV,
STORAGE_SCHEME_KV,
ENCODING_SCHEME_KV
);
static {
Collections.sort(TABLE_KV_COLUMNS, KeyValue.COMPARATOR);
}
private static final int TABLE_TYPE_INDEX = TABLE_KV_COLUMNS.indexOf(TABLE_TYPE_KV);
private static final int TABLE_SEQ_NUM_INDEX = TABLE_KV_COLUMNS.indexOf(TABLE_SEQ_NUM_KV);
private static final int COLUMN_COUNT_INDEX = TABLE_KV_COLUMNS.indexOf(COLUMN_COUNT_KV);
private static final int SALT_BUCKETS_INDEX = TABLE_KV_COLUMNS.indexOf(SALT_BUCKETS_KV);
private static final int PK_NAME_INDEX = TABLE_KV_COLUMNS.indexOf(PK_NAME_KV);
private static final int DATA_TABLE_NAME_INDEX = TABLE_KV_COLUMNS.indexOf(DATA_TABLE_NAME_KV);
private static final int INDEX_STATE_INDEX = TABLE_KV_COLUMNS.indexOf(INDEX_STATE_KV);
private static final int IMMUTABLE_ROWS_INDEX = TABLE_KV_COLUMNS.indexOf(IMMUTABLE_ROWS_KV);
private static final int VIEW_STATEMENT_INDEX = TABLE_KV_COLUMNS.indexOf(VIEW_EXPRESSION_KV);
private static final int DEFAULT_COLUMN_FAMILY_INDEX = TABLE_KV_COLUMNS.indexOf(DEFAULT_COLUMN_FAMILY_KV);
private static final int DISABLE_WAL_INDEX = TABLE_KV_COLUMNS.indexOf(DISABLE_WAL_KV);
private static final int MULTI_TENANT_INDEX = TABLE_KV_COLUMNS.indexOf(MULTI_TENANT_KV);
private static final int VIEW_TYPE_INDEX = TABLE_KV_COLUMNS.indexOf(VIEW_TYPE_KV);
private static final int VIEW_INDEX_ID_INDEX = TABLE_KV_COLUMNS.indexOf(VIEW_INDEX_ID_KV);
private static final int INDEX_TYPE_INDEX = TABLE_KV_COLUMNS.indexOf(INDEX_TYPE_KV);
private static final int STORE_NULLS_INDEX = TABLE_KV_COLUMNS.indexOf(STORE_NULLS_KV);
private static final int BASE_COLUMN_COUNT_INDEX = TABLE_KV_COLUMNS.indexOf(BASE_COLUMN_COUNT_KV);
private static final int ROW_KEY_ORDER_OPTIMIZABLE_INDEX = TABLE_KV_COLUMNS.indexOf(ROW_KEY_ORDER_OPTIMIZABLE_KV);
private static final int TRANSACTIONAL_INDEX = TABLE_KV_COLUMNS.indexOf(TRANSACTIONAL_KV);
private static final int UPDATE_CACHE_FREQUENCY_INDEX = TABLE_KV_COLUMNS.indexOf(UPDATE_CACHE_FREQUENCY_KV);
private static final int INDEX_DISABLE_TIMESTAMP = TABLE_KV_COLUMNS.indexOf(INDEX_DISABLE_TIMESTAMP_KV);
private static final int IS_NAMESPACE_MAPPED_INDEX = TABLE_KV_COLUMNS.indexOf(IS_NAMESPACE_MAPPED_KV);
private static final int AUTO_PARTITION_SEQ_INDEX = TABLE_KV_COLUMNS.indexOf(AUTO_PARTITION_SEQ_KV);
private static final int APPEND_ONLY_SCHEMA_INDEX = TABLE_KV_COLUMNS.indexOf(APPEND_ONLY_SCHEMA_KV);
private static final int STORAGE_SCHEME_INDEX = TABLE_KV_COLUMNS.indexOf(STORAGE_SCHEME_KV);
private static final int QUALIFIER_ENCODING_SCHEME_INDEX = TABLE_KV_COLUMNS.indexOf(ENCODING_SCHEME_KV);
// KeyValues for Column
private static final KeyValue DECIMAL_DIGITS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DECIMAL_DIGITS_BYTES);
private static final KeyValue COLUMN_SIZE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_SIZE_BYTES);
private static final KeyValue NULLABLE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, NULLABLE_BYTES);
private static final KeyValue DATA_TYPE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DATA_TYPE_BYTES);
private static final KeyValue ORDINAL_POSITION_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, ORDINAL_POSITION_BYTES);
private static final KeyValue SORT_ORDER_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, SORT_ORDER_BYTES);
private static final KeyValue ARRAY_SIZE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, ARRAY_SIZE_BYTES);
private static final KeyValue VIEW_CONSTANT_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_CONSTANT_BYTES);
private static final KeyValue IS_VIEW_REFERENCED_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IS_VIEW_REFERENCED_BYTES);
private static final KeyValue COLUMN_DEF_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_DEF_BYTES);
private static final KeyValue IS_ROW_TIMESTAMP_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IS_ROW_TIMESTAMP_BYTES);
private static final KeyValue COLUMN_QUALIFIER_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_QUALIFIER_BYTES);
private static final List<KeyValue> COLUMN_KV_COLUMNS = Arrays.<KeyValue>asList(
DECIMAL_DIGITS_KV,
COLUMN_SIZE_KV,
NULLABLE_KV,
DATA_TYPE_KV,
ORDINAL_POSITION_KV,
SORT_ORDER_KV,
DATA_TABLE_NAME_KV, // included in both column and table row for metadata APIs
ARRAY_SIZE_KV,
VIEW_CONSTANT_KV,
IS_VIEW_REFERENCED_KV,
COLUMN_DEF_KV,
IS_ROW_TIMESTAMP_KV,
COLUMN_QUALIFIER_KV
);
static {
Collections.sort(COLUMN_KV_COLUMNS, KeyValue.COMPARATOR);
}
private static final KeyValue QUALIFIER_COUNTER_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_QUALIFIER_COUNTER_BYTES);
private static final int DECIMAL_DIGITS_INDEX = COLUMN_KV_COLUMNS.indexOf(DECIMAL_DIGITS_KV);
private static final int COLUMN_SIZE_INDEX = COLUMN_KV_COLUMNS.indexOf(COLUMN_SIZE_KV);
private static final int NULLABLE_INDEX = COLUMN_KV_COLUMNS.indexOf(NULLABLE_KV);
private static final int DATA_TYPE_INDEX = COLUMN_KV_COLUMNS.indexOf(DATA_TYPE_KV);
private static final int ORDINAL_POSITION_INDEX = COLUMN_KV_COLUMNS.indexOf(ORDINAL_POSITION_KV);
private static final int SORT_ORDER_INDEX = COLUMN_KV_COLUMNS.indexOf(SORT_ORDER_KV);
private static final int ARRAY_SIZE_INDEX = COLUMN_KV_COLUMNS.indexOf(ARRAY_SIZE_KV);
private static final int VIEW_CONSTANT_INDEX = COLUMN_KV_COLUMNS.indexOf(VIEW_CONSTANT_KV);
private static final int IS_VIEW_REFERENCED_INDEX = COLUMN_KV_COLUMNS.indexOf(IS_VIEW_REFERENCED_KV);
private static final int COLUMN_DEF_INDEX = COLUMN_KV_COLUMNS.indexOf(COLUMN_DEF_KV);
private static final int IS_ROW_TIMESTAMP_INDEX = COLUMN_KV_COLUMNS.indexOf(IS_ROW_TIMESTAMP_KV);
private static final int COLUMN_QUALIFIER_INDEX = COLUMN_KV_COLUMNS.indexOf(COLUMN_QUALIFIER_KV);
private static final int LINK_TYPE_INDEX = 0;
private static final KeyValue CLASS_NAME_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, CLASS_NAME_BYTES);
private static final KeyValue JAR_PATH_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, JAR_PATH_BYTES);
private static final KeyValue RETURN_TYPE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, RETURN_TYPE_BYTES);
private static final KeyValue NUM_ARGS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, NUM_ARGS_BYTES);
private static final KeyValue TYPE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, TYPE_BYTES);
private static final KeyValue IS_CONSTANT_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IS_CONSTANT_BYTES);
private static final KeyValue DEFAULT_VALUE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DEFAULT_VALUE_BYTES);
private static final KeyValue MIN_VALUE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, MIN_VALUE_BYTES);
private static final KeyValue MAX_VALUE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, MAX_VALUE_BYTES);
private static final KeyValue IS_ARRAY_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IS_ARRAY_BYTES);
private static final List<KeyValue> FUNCTION_KV_COLUMNS = Arrays.<KeyValue>asList(
EMPTY_KEYVALUE_KV,
CLASS_NAME_KV,
JAR_PATH_KV,
RETURN_TYPE_KV,
NUM_ARGS_KV
);
static {
Collections.sort(FUNCTION_KV_COLUMNS, KeyValue.COMPARATOR);
}
private static final int CLASS_NAME_INDEX = FUNCTION_KV_COLUMNS.indexOf(CLASS_NAME_KV);
private static final int JAR_PATH_INDEX = FUNCTION_KV_COLUMNS.indexOf(JAR_PATH_KV);
private static final int RETURN_TYPE_INDEX = FUNCTION_KV_COLUMNS.indexOf(RETURN_TYPE_KV);
private static final int NUM_ARGS_INDEX = FUNCTION_KV_COLUMNS.indexOf(NUM_ARGS_KV);
private static final List<KeyValue> FUNCTION_ARG_KV_COLUMNS = Arrays.<KeyValue>asList(
TYPE_KV,
IS_ARRAY_KV,
IS_CONSTANT_KV,
DEFAULT_VALUE_KV,
MIN_VALUE_KV,
MAX_VALUE_KV
);
static {
Collections.sort(FUNCTION_ARG_KV_COLUMNS, KeyValue.COMPARATOR);
}
private static final int IS_ARRAY_INDEX = FUNCTION_ARG_KV_COLUMNS.indexOf(IS_ARRAY_KV);
private static final int IS_CONSTANT_INDEX = FUNCTION_ARG_KV_COLUMNS.indexOf(IS_CONSTANT_KV);
private static final int DEFAULT_VALUE_INDEX = FUNCTION_ARG_KV_COLUMNS.indexOf(DEFAULT_VALUE_KV);
private static final int MIN_VALUE_INDEX = FUNCTION_ARG_KV_COLUMNS.indexOf(MIN_VALUE_KV);
private static final int MAX_VALUE_INDEX = FUNCTION_ARG_KV_COLUMNS.indexOf(MAX_VALUE_KV);
private static PName newPName(byte[] keyBuffer, int keyOffset, int keyLength) {
if (keyLength <= 0) {
return null;
}
int length = getVarCharLength(keyBuffer, keyOffset, keyLength);
return PNameFactory.newName(keyBuffer, keyOffset, length);
}
private RegionCoprocessorEnvironment env;
/**
* Stores a reference to the coprocessor environment provided by the
* {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this
* coprocessor is loaded. Since this is a coprocessor endpoint, it always expects to be loaded
* on a table region, so always expects this to be an instance of
* {@link RegionCoprocessorEnvironment}.
* @param env the environment provided by the coprocessor host
* @throws IOException if the provided environment is not an instance of
* {@code RegionCoprocessorEnvironment}
*/
@Override
public void start(CoprocessorEnvironment env) throws IOException {
if (env instanceof RegionCoprocessorEnvironment) {
this.env = (RegionCoprocessorEnvironment) env;
} else {
throw new CoprocessorException("Must be loaded on a table region!");
}
logger.info("Starting Tracing-Metrics Systems");
// Start the phoenix trace collection
Tracing.addTraceMetricsSource();
Metrics.ensureConfigured();
}
@Override
public void stop(CoprocessorEnvironment env) throws IOException {
// nothing to do
}
@Override
public Service getService() {
return this;
}
@Override
public void getTable(RpcController controller, GetTableRequest request,
RpcCallback<MetaDataResponse> done) {
MetaDataResponse.Builder builder = MetaDataResponse.newBuilder();
byte[] tenantId = request.getTenantId().toByteArray();
byte[] schemaName = request.getSchemaName().toByteArray();
byte[] tableName = request.getTableName().toByteArray();
byte[] key = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
long tableTimeStamp = request.getTableTimestamp();
try {
// TODO: check that key is within region.getStartKey() and region.getEndKey()
// and return special code to force client to lookup region from meta.
Region region = env.getRegion();
MetaDataMutationResult result = checkTableKeyInRegion(key, region);
if (result != null) {
done.run(MetaDataMutationResult.toProto(result));
return;
}
long currentTime = EnvironmentEdgeManager.currentTimeMillis();
PTable table = doGetTable(key, request.getClientTimestamp());
if (table == null) {
builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_NOT_FOUND);
builder.setMutationTime(currentTime);
done.run(builder.build());
return;
}
builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_ALREADY_EXISTS);
long disableIndexTimestamp = table.getIndexDisableTimestamp();
long minNonZerodisableIndexTimestamp = disableIndexTimestamp > 0 ? disableIndexTimestamp : Long.MAX_VALUE;
for (PTable index : table.getIndexes()) {
disableIndexTimestamp = index.getIndexDisableTimestamp();
if (disableIndexTimestamp > 0 && index.getIndexState() == PIndexState.ACTIVE && disableIndexTimestamp < minNonZerodisableIndexTimestamp) {
minNonZerodisableIndexTimestamp = disableIndexTimestamp;
}
}
// Freeze time for table at min non-zero value of INDEX_DISABLE_TIMESTAMP
// This will keep the table consistent with index as the table has had one more
// batch applied to it.
if (minNonZerodisableIndexTimestamp == Long.MAX_VALUE) {
builder.setMutationTime(currentTime);
} else {
// Subtract one because we add one due to timestamp granularity in Windows
builder.setMutationTime(minNonZerodisableIndexTimestamp - 1);
}
if (table.getTimeStamp() != tableTimeStamp) {
builder.setTable(PTableImpl.toProto(table));
}
done.run(builder.build());
return;
} catch (Throwable t) {
logger.error("getTable failed", t);
ProtobufUtil.setControllerException(controller,
ServerUtil.createIOException(SchemaUtil.getTableName(schemaName, tableName), t));
}
}
private PTable buildTable(byte[] key, ImmutableBytesPtr cacheKey, Region region,
long clientTimeStamp) throws IOException, SQLException {
Scan scan = MetaDataUtil.newTableRowsScan(key, MIN_TABLE_TIMESTAMP, clientTimeStamp);
RegionScanner scanner = region.getScanner(scan);
Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
try {
PTable oldTable = (PTable)metaDataCache.getIfPresent(cacheKey);
long tableTimeStamp = oldTable == null ? MIN_TABLE_TIMESTAMP-1 : oldTable.getTimeStamp();
PTable newTable;
boolean blockWriteRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE,
QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE);
newTable = getTable(scanner, clientTimeStamp, tableTimeStamp);
if (newTable == null) {
return null;
}
if (oldTable == null || tableTimeStamp < newTable.getTimeStamp()
|| (blockWriteRebuildIndex && newTable.getIndexDisableTimestamp() > 0)) {
if (logger.isDebugEnabled()) {
logger.debug("Caching table "
+ Bytes.toStringBinary(cacheKey.get(), cacheKey.getOffset(),
cacheKey.getLength()) + " at seqNum "
+ newTable.getSequenceNumber() + " with newer timestamp "
+ newTable.getTimeStamp() + " versus " + tableTimeStamp);
}
metaDataCache.put(cacheKey, newTable);
}
return newTable;
} finally {
scanner.close();
}
}
private List<PFunction> buildFunctions(List<byte[]> keys, Region region,
long clientTimeStamp, boolean isReplace, List<Mutation> deleteMutationsForReplace) throws IOException, SQLException {
List<KeyRange> keyRanges = Lists.newArrayListWithExpectedSize(keys.size());
for (byte[] key : keys) {
byte[] stopKey = ByteUtil.concat(key, QueryConstants.SEPARATOR_BYTE_ARRAY);
ByteUtil.nextKey(stopKey, stopKey.length);
keyRanges.add(PVarbinary.INSTANCE.getKeyRange(key, true, stopKey, false));
}
Scan scan = new Scan();
scan.setTimeRange(MIN_TABLE_TIMESTAMP, clientTimeStamp);
ScanRanges scanRanges = ScanRanges.createPointLookup(keyRanges);
scanRanges.initializeScan(scan);
scan.setFilter(scanRanges.getSkipScanFilter());
RegionScanner scanner = region.getScanner(scan);
Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
List<PFunction> functions = new ArrayList<PFunction>();
PFunction function = null;
try {
for(int i = 0; i< keys.size(); i++) {
function = null;
function =
getFunction(scanner, isReplace, clientTimeStamp, deleteMutationsForReplace);
if (function == null) {
return null;
}
byte[] functionKey =
SchemaUtil.getFunctionKey(
function.getTenantId() == null ? ByteUtil.EMPTY_BYTE_ARRAY : function
.getTenantId().getBytes(), Bytes.toBytes(function
.getFunctionName()));
metaDataCache.put(new FunctionBytesPtr(functionKey), function);
functions.add(function);
}
return functions;
} finally {
scanner.close();
}
}
private List<PSchema> buildSchemas(List<byte[]> keys, Region region, long clientTimeStamp,
ImmutableBytesPtr cacheKey) throws IOException, SQLException {
List<KeyRange> keyRanges = Lists.newArrayListWithExpectedSize(keys.size());
for (byte[] key : keys) {
byte[] stopKey = ByteUtil.concat(key, QueryConstants.SEPARATOR_BYTE_ARRAY);
ByteUtil.nextKey(stopKey, stopKey.length);
keyRanges.add(PVarbinary.INSTANCE.getKeyRange(key, true, stopKey, false));
}
Scan scan = new Scan();
scan.setTimeRange(MIN_TABLE_TIMESTAMP, clientTimeStamp);
ScanRanges scanRanges = ScanRanges.createPointLookup(keyRanges);
scanRanges.initializeScan(scan);
scan.setFilter(scanRanges.getSkipScanFilter());
RegionScanner scanner = region.getScanner(scan);
Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
List<PSchema> schemas = new ArrayList<PSchema>();
PSchema schema = null;
try {
for (int i = 0; i < keys.size(); i++) {
schema = null;
schema = getSchema(scanner, clientTimeStamp);
if (schema == null) { return null; }
metaDataCache.put(cacheKey, schema);
schemas.add(schema);
}
return schemas;
} finally {
scanner.close();
}
}
private void addIndexToTable(PName tenantId, PName schemaName, PName indexName, PName tableName, long clientTimeStamp, List<PTable> indexes) throws IOException, SQLException {
byte[] key = SchemaUtil.getTableKey(tenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY : tenantId.getBytes(), schemaName.getBytes(), indexName.getBytes());
PTable indexTable = doGetTable(key, clientTimeStamp);
if (indexTable == null) {
ServerUtil.throwIOException("Index not found", new TableNotFoundException(schemaName.getString(), indexName.getString()));
return;
}
indexes.add(indexTable);
}
private void addColumnToTable(List<Cell> results, PName colName, PName famName,
Cell[] colKeyValues, List<PColumn> columns, boolean isSalted) {
int i = 0;
int j = 0;
while (i < results.size() && j < COLUMN_KV_COLUMNS.size()) {
Cell kv = results.get(i);
Cell searchKv = COLUMN_KV_COLUMNS.get(j);
int cmp =
Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(),
kv.getQualifierLength(), searchKv.getQualifierArray(),
searchKv.getQualifierOffset(), searchKv.getQualifierLength());
if (cmp == 0) {
colKeyValues[j++] = kv;
i++;
} else if (cmp > 0) {
colKeyValues[j++] = null;
} else {
i++; // shouldn't happen - means unexpected KV in system table column row
}
}
if (colKeyValues[DATA_TYPE_INDEX] == null || colKeyValues[NULLABLE_INDEX] == null
|| colKeyValues[ORDINAL_POSITION_INDEX] == null) {
throw new IllegalStateException("Didn't find all required key values in '"
+ colName.getString() + "' column metadata row");
}
Cell columnSizeKv = colKeyValues[COLUMN_SIZE_INDEX];
Integer maxLength =
columnSizeKv == null ? null : PInteger.INSTANCE.getCodec().decodeInt(
columnSizeKv.getValueArray(), columnSizeKv.getValueOffset(), SortOrder.getDefault());
Cell decimalDigitKv = colKeyValues[DECIMAL_DIGITS_INDEX];
Integer scale =
decimalDigitKv == null ? null : PInteger.INSTANCE.getCodec().decodeInt(
decimalDigitKv.getValueArray(), decimalDigitKv.getValueOffset(), SortOrder.getDefault());
Cell ordinalPositionKv = colKeyValues[ORDINAL_POSITION_INDEX];
int position =
PInteger.INSTANCE.getCodec().decodeInt(ordinalPositionKv.getValueArray(),
ordinalPositionKv.getValueOffset(), SortOrder.getDefault()) + (isSalted ? 1 : 0);
Cell nullableKv = colKeyValues[NULLABLE_INDEX];
boolean isNullable =
PInteger.INSTANCE.getCodec().decodeInt(nullableKv.getValueArray(),
nullableKv.getValueOffset(), SortOrder.getDefault()) != ResultSetMetaData.columnNoNulls;
Cell dataTypeKv = colKeyValues[DATA_TYPE_INDEX];
PDataType dataType =
PDataType.fromTypeId(PInteger.INSTANCE.getCodec().decodeInt(
dataTypeKv.getValueArray(), dataTypeKv.getValueOffset(), SortOrder.getDefault()));
if (maxLength == null && dataType == PBinary.INSTANCE) dataType = PVarbinary.INSTANCE; // For
// backward
// compatibility.
Cell sortOrderKv = colKeyValues[SORT_ORDER_INDEX];
SortOrder sortOrder =
sortOrderKv == null ? SortOrder.getDefault() : SortOrder.fromSystemValue(PInteger.INSTANCE
.getCodec().decodeInt(sortOrderKv.getValueArray(),
sortOrderKv.getValueOffset(), SortOrder.getDefault()));
Cell arraySizeKv = colKeyValues[ARRAY_SIZE_INDEX];
Integer arraySize = arraySizeKv == null ? null :
PInteger.INSTANCE.getCodec().decodeInt(arraySizeKv.getValueArray(), arraySizeKv.getValueOffset(), SortOrder.getDefault());
Cell viewConstantKv = colKeyValues[VIEW_CONSTANT_INDEX];
byte[] viewConstant = viewConstantKv == null ? null : viewConstantKv.getValue();
Cell isViewReferencedKv = colKeyValues[IS_VIEW_REFERENCED_INDEX];
boolean isViewReferenced = isViewReferencedKv != null && Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(isViewReferencedKv.getValueArray(), isViewReferencedKv.getValueOffset(), isViewReferencedKv.getValueLength()));
Cell columnDefKv = colKeyValues[COLUMN_DEF_INDEX];
String expressionStr = columnDefKv==null ? null : (String)PVarchar.INSTANCE.toObject(columnDefKv.getValueArray(), columnDefKv.getValueOffset(), columnDefKv.getValueLength());
Cell isRowTimestampKV = colKeyValues[IS_ROW_TIMESTAMP_INDEX];
boolean isRowTimestamp =
isRowTimestampKV == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(
isRowTimestampKV.getValueArray(), isRowTimestampKV.getValueOffset(),
isRowTimestampKV.getValueLength()));
boolean isPkColumn = famName == null || famName.getString() == null;
Cell columnQualifierKV = colKeyValues[COLUMN_QUALIFIER_INDEX];
// Older tables won't have column qualifier metadata present. To make things simpler, just set the
// column qualifier bytes by using the column name.
byte[] columnQualifierBytes = columnQualifierKV != null ?
Arrays.copyOfRange(columnQualifierKV.getValueArray(),
columnQualifierKV.getValueOffset(), columnQualifierKV.getValueOffset()
+ columnQualifierKV.getValueLength()) : (isPkColumn ? null : colName.getBytes());
PColumn column = new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable, position-1, sortOrder, arraySize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp, false, columnQualifierBytes);
columns.add(column);
}
private void addArgumentToFunction(List<Cell> results, PName functionName, PName type,
Cell[] functionKeyValues, List<FunctionArgument> arguments, short argPosition) throws SQLException {
int i = 0;
int j = 0;
while (i < results.size() && j < FUNCTION_ARG_KV_COLUMNS.size()) {
Cell kv = results.get(i);
Cell searchKv = FUNCTION_ARG_KV_COLUMNS.get(j);
int cmp =
Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(),
kv.getQualifierLength(), searchKv.getQualifierArray(),
searchKv.getQualifierOffset(), searchKv.getQualifierLength());
if (cmp == 0) {
functionKeyValues[j++] = kv;
i++;
} else if (cmp > 0) {
functionKeyValues[j++] = null;
} else {
i++; // shouldn't happen - means unexpected KV in system table column row
}
}
Cell isArrayKv = functionKeyValues[IS_ARRAY_INDEX];
boolean isArrayType =
isArrayKv == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(
isArrayKv.getValueArray(), isArrayKv.getValueOffset(),
isArrayKv.getValueLength()));
Cell isConstantKv = functionKeyValues[IS_CONSTANT_INDEX];
boolean isConstant =
isConstantKv == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(
isConstantKv.getValueArray(), isConstantKv.getValueOffset(),
isConstantKv.getValueLength()));
Cell defaultValueKv = functionKeyValues[DEFAULT_VALUE_INDEX];
String defaultValue =
defaultValueKv == null ? null : (String) PVarchar.INSTANCE.toObject(
defaultValueKv.getValueArray(), defaultValueKv.getValueOffset(),
defaultValueKv.getValueLength());
Cell minValueKv = functionKeyValues[MIN_VALUE_INDEX];
String minValue =
minValueKv == null ? null : (String) PVarchar.INSTANCE.toObject(
minValueKv.getValueArray(), minValueKv.getValueOffset(),
minValueKv.getValueLength());
Cell maxValueKv = functionKeyValues[MAX_VALUE_INDEX];
String maxValue =
maxValueKv == null ? null : (String) PVarchar.INSTANCE.toObject(
maxValueKv.getValueArray(), maxValueKv.getValueOffset(),
maxValueKv.getValueLength());
FunctionArgument arg =
new FunctionArgument(type.getString(), isArrayType, isConstant,
defaultValue == null ? null : LiteralExpression.newConstant((new LiteralParseNode(defaultValue)).getValue()),
minValue == null ? null : LiteralExpression.newConstant((new LiteralParseNode(minValue)).getValue()),
maxValue == null ? null : LiteralExpression.newConstant((new LiteralParseNode(maxValue)).getValue()),
argPosition);
arguments.add(arg);
}
private PTable getTable(RegionScanner scanner, long clientTimeStamp, long tableTimeStamp)
throws IOException, SQLException {
List<Cell> results = Lists.newArrayList();
scanner.next(results);
if (results.isEmpty()) {
return null;
}
Cell[] tableKeyValues = new Cell[TABLE_KV_COLUMNS.size()];
Cell[] colKeyValues = new Cell[COLUMN_KV_COLUMNS.size()];
// Create PTable based on KeyValues from scan
Cell keyValue = results.get(0);
byte[] keyBuffer = keyValue.getRowArray();
int keyLength = keyValue.getRowLength();
int keyOffset = keyValue.getRowOffset();
PName tenantId = newPName(keyBuffer, keyOffset, keyLength);
int tenantIdLength = (tenantId == null) ? 0 : tenantId.getBytes().length;
if (tenantIdLength == 0) {
tenantId = null;
}
PName schemaName = newPName(keyBuffer, keyOffset+tenantIdLength+1, keyLength);
int schemaNameLength = schemaName.getBytes().length;
int tableNameLength = keyLength - schemaNameLength - 1 - tenantIdLength - 1;
byte[] tableNameBytes = new byte[tableNameLength];
System.arraycopy(keyBuffer, keyOffset + schemaNameLength + 1 + tenantIdLength + 1,
tableNameBytes, 0, tableNameLength);
PName tableName = PNameFactory.newName(tableNameBytes);
int offset = tenantIdLength + schemaNameLength + tableNameLength + 3;
// This will prevent the client from continually looking for the current
// table when we know that there will never be one since we disallow updates
// unless the table is the latest
// If we already have a table newer than the one we just found and
// the client timestamp is less that the existing table time stamp,
// bump up the timeStamp to right before the client time stamp, since
// we know it can't possibly change.
long timeStamp = keyValue.getTimestamp();
// long timeStamp = tableTimeStamp > keyValue.getTimestamp() &&
// clientTimeStamp < tableTimeStamp
// ? clientTimeStamp-1
// : keyValue.getTimestamp();
int i = 0;
int j = 0;
while (i < results.size() && j < TABLE_KV_COLUMNS.size()) {
Cell kv = results.get(i);
Cell searchKv = TABLE_KV_COLUMNS.get(j);
int cmp =
Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(),
kv.getQualifierLength(), searchKv.getQualifierArray(),
searchKv.getQualifierOffset(), searchKv.getQualifierLength());
if (cmp == 0) {
timeStamp = Math.max(timeStamp, kv.getTimestamp()); // Find max timestamp of table
// header row
tableKeyValues[j++] = kv;
i++;
} else if (cmp > 0) {
timeStamp = Math.max(timeStamp, kv.getTimestamp());
tableKeyValues[j++] = null;
} else {
i++; // shouldn't happen - means unexpected KV in system table header row
}
}
// TABLE_TYPE, TABLE_SEQ_NUM and COLUMN_COUNT are required.
if (tableKeyValues[TABLE_TYPE_INDEX] == null || tableKeyValues[TABLE_SEQ_NUM_INDEX] == null
|| tableKeyValues[COLUMN_COUNT_INDEX] == null) {
throw new IllegalStateException(
"Didn't find expected key values for table row in metadata row");
}
Cell tableTypeKv = tableKeyValues[TABLE_TYPE_INDEX];
PTableType tableType =
PTableType
.fromSerializedValue(tableTypeKv.getValueArray()[tableTypeKv.getValueOffset()]);
Cell tableSeqNumKv = tableKeyValues[TABLE_SEQ_NUM_INDEX];
long tableSeqNum =
PLong.INSTANCE.getCodec().decodeLong(tableSeqNumKv.getValueArray(),
tableSeqNumKv.getValueOffset(), SortOrder.getDefault());
Cell columnCountKv = tableKeyValues[COLUMN_COUNT_INDEX];
int columnCount =
PInteger.INSTANCE.getCodec().decodeInt(columnCountKv.getValueArray(),
columnCountKv.getValueOffset(), SortOrder.getDefault());
Cell pkNameKv = tableKeyValues[PK_NAME_INDEX];
PName pkName =
pkNameKv != null ? newPName(pkNameKv.getValueArray(), pkNameKv.getValueOffset(),
pkNameKv.getValueLength()) : null;
Cell saltBucketNumKv = tableKeyValues[SALT_BUCKETS_INDEX];
Integer saltBucketNum =
saltBucketNumKv != null ? (Integer) PInteger.INSTANCE.getCodec().decodeInt(
saltBucketNumKv.getValueArray(), saltBucketNumKv.getValueOffset(), SortOrder.getDefault()) : null;
if (saltBucketNum != null && saltBucketNum.intValue() == 0) {
saltBucketNum = null; // Zero salt buckets means not salted
}
Cell dataTableNameKv = tableKeyValues[DATA_TABLE_NAME_INDEX];
PName dataTableName =
dataTableNameKv != null ? newPName(dataTableNameKv.getValueArray(),
dataTableNameKv.getValueOffset(), dataTableNameKv.getValueLength()) : null;
Cell indexStateKv = tableKeyValues[INDEX_STATE_INDEX];
PIndexState indexState =
indexStateKv == null ? null : PIndexState.fromSerializedValue(indexStateKv
.getValueArray()[indexStateKv.getValueOffset()]);
Cell immutableRowsKv = tableKeyValues[IMMUTABLE_ROWS_INDEX];
boolean isImmutableRows =
immutableRowsKv == null ? false : (Boolean) PBoolean.INSTANCE.toObject(
immutableRowsKv.getValueArray(), immutableRowsKv.getValueOffset(),
immutableRowsKv.getValueLength());
Cell defaultFamilyNameKv = tableKeyValues[DEFAULT_COLUMN_FAMILY_INDEX];
PName defaultFamilyName = defaultFamilyNameKv != null ? newPName(defaultFamilyNameKv.getValueArray(), defaultFamilyNameKv.getValueOffset(), defaultFamilyNameKv.getValueLength()) : null;
Cell viewStatementKv = tableKeyValues[VIEW_STATEMENT_INDEX];
String viewStatement = viewStatementKv != null ? (String) PVarchar.INSTANCE.toObject(viewStatementKv.getValueArray(), viewStatementKv.getValueOffset(),
viewStatementKv.getValueLength()) : null;
Cell disableWALKv = tableKeyValues[DISABLE_WAL_INDEX];
boolean disableWAL = disableWALKv == null ? PTable.DEFAULT_DISABLE_WAL : Boolean.TRUE.equals(
PBoolean.INSTANCE.toObject(disableWALKv.getValueArray(), disableWALKv.getValueOffset(), disableWALKv.getValueLength()));
Cell multiTenantKv = tableKeyValues[MULTI_TENANT_INDEX];
boolean multiTenant = multiTenantKv == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(multiTenantKv.getValueArray(), multiTenantKv.getValueOffset(), multiTenantKv.getValueLength()));
Cell storeNullsKv = tableKeyValues[STORE_NULLS_INDEX];
boolean storeNulls = storeNullsKv == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(storeNullsKv.getValueArray(), storeNullsKv.getValueOffset(), storeNullsKv.getValueLength()));
Cell transactionalKv = tableKeyValues[TRANSACTIONAL_INDEX];
boolean transactional = transactionalKv == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(transactionalKv.getValueArray(), transactionalKv.getValueOffset(), transactionalKv.getValueLength()));
Cell viewTypeKv = tableKeyValues[VIEW_TYPE_INDEX];
ViewType viewType = viewTypeKv == null ? null : ViewType.fromSerializedValue(viewTypeKv.getValueArray()[viewTypeKv.getValueOffset()]);
Cell viewIndexIdKv = tableKeyValues[VIEW_INDEX_ID_INDEX];
Short viewIndexId = viewIndexIdKv == null ? null : (Short)MetaDataUtil.getViewIndexIdDataType().getCodec().decodeShort(viewIndexIdKv.getValueArray(), viewIndexIdKv.getValueOffset(), SortOrder.getDefault());
Cell indexTypeKv = tableKeyValues[INDEX_TYPE_INDEX];
IndexType indexType = indexTypeKv == null ? null : IndexType.fromSerializedValue(indexTypeKv.getValueArray()[indexTypeKv.getValueOffset()]);
Cell baseColumnCountKv = tableKeyValues[BASE_COLUMN_COUNT_INDEX];
int baseColumnCount = baseColumnCountKv == null ? 0 : PInteger.INSTANCE.getCodec().decodeInt(baseColumnCountKv.getValueArray(),
baseColumnCountKv.getValueOffset(), SortOrder.getDefault());
Cell rowKeyOrderOptimizableKv = tableKeyValues[ROW_KEY_ORDER_OPTIMIZABLE_INDEX];
boolean rowKeyOrderOptimizable = rowKeyOrderOptimizableKv == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(rowKeyOrderOptimizableKv.getValueArray(), rowKeyOrderOptimizableKv.getValueOffset(), rowKeyOrderOptimizableKv.getValueLength()));
Cell updateCacheFrequencyKv = tableKeyValues[UPDATE_CACHE_FREQUENCY_INDEX];
long updateCacheFrequency = updateCacheFrequencyKv == null ? 0 :
PLong.INSTANCE.getCodec().decodeLong(updateCacheFrequencyKv.getValueArray(),
updateCacheFrequencyKv.getValueOffset(), SortOrder.getDefault());
Cell indexDisableTimestampKv = tableKeyValues[INDEX_DISABLE_TIMESTAMP];
long indexDisableTimestamp = indexDisableTimestampKv == null ? 0L : PLong.INSTANCE.getCodec().decodeLong(indexDisableTimestampKv.getValueArray(),
indexDisableTimestampKv.getValueOffset(), SortOrder.getDefault());
Cell isNamespaceMappedKv = tableKeyValues[IS_NAMESPACE_MAPPED_INDEX];
boolean isNamespaceMapped = isNamespaceMappedKv == null ? false
: Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(isNamespaceMappedKv.getValueArray(),
isNamespaceMappedKv.getValueOffset(), isNamespaceMappedKv.getValueLength()));
Cell autoPartitionSeqKv = tableKeyValues[AUTO_PARTITION_SEQ_INDEX];
String autoPartitionSeq = autoPartitionSeqKv != null ? (String) PVarchar.INSTANCE.toObject(autoPartitionSeqKv.getValueArray(), autoPartitionSeqKv.getValueOffset(),
autoPartitionSeqKv.getValueLength()) : null;
Cell isAppendOnlySchemaKv = tableKeyValues[APPEND_ONLY_SCHEMA_INDEX];
boolean isAppendOnlySchema = isAppendOnlySchemaKv == null ? false
: Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(isAppendOnlySchemaKv.getValueArray(),
isAppendOnlySchemaKv.getValueOffset(), isAppendOnlySchemaKv.getValueLength()));
Cell storageSchemeKv = tableKeyValues[STORAGE_SCHEME_INDEX];
//TODO: change this once we start having other values for storage schemes
ImmutableStorageScheme storageScheme = storageSchemeKv == null ? ImmutableStorageScheme.ONE_CELL_PER_COLUMN : ImmutableStorageScheme
.fromSerializedValue((byte)PTinyint.INSTANCE.toObject(storageSchemeKv.getValueArray(),
storageSchemeKv.getValueOffset(), storageSchemeKv.getValueLength()));
Cell encodingSchemeKv = tableKeyValues[QUALIFIER_ENCODING_SCHEME_INDEX];
QualifierEncodingScheme encodingScheme = encodingSchemeKv == null ? QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : QualifierEncodingScheme
.fromSerializedValue((byte)PTinyint.INSTANCE.toObject(encodingSchemeKv.getValueArray(),
encodingSchemeKv.getValueOffset(), encodingSchemeKv.getValueLength()));
List<PColumn> columns = Lists.newArrayListWithExpectedSize(columnCount);
List<PTable> indexes = Lists.newArrayList();
List<PName> physicalTables = Lists.newArrayList();
PName parentTableName = tableType == INDEX ? dataTableName : null;
PName parentSchemaName = tableType == INDEX ? schemaName : null;
EncodedCQCounter cqCounter =
(!EncodedColumnsUtil.usesEncodedColumnNames(encodingScheme) || tableType == PTableType.VIEW) ? PTable.EncodedCQCounter.NULL_COUNTER
: new EncodedCQCounter();
while (true) {
results.clear();
scanner.next(results);
if (results.isEmpty()) {
break;
}
Cell colKv = results.get(LINK_TYPE_INDEX);
if (colKv != null) {
int colKeyLength = colKv.getRowLength();
PName colName = newPName(colKv.getRowArray(), colKv.getRowOffset() + offset, colKeyLength-offset);
int colKeyOffset = offset + colName.getBytes().length + 1;
PName famName = newPName(colKv.getRowArray(), colKv.getRowOffset() + colKeyOffset, colKeyLength-colKeyOffset);
if (isQualifierCounterKV(colKv)) {
Integer value = PInteger.INSTANCE.getCodec().decodeInt(colKv.getValueArray(), colKv.getValueOffset(), SortOrder.ASC);
cqCounter.setValue(famName.getString(), value);
} else {
if (colName.getString().isEmpty() && famName != null) {
LinkType linkType = LinkType.fromSerializedValue(colKv.getValueArray()[colKv.getValueOffset()]);
if (linkType == LinkType.INDEX_TABLE) {
addIndexToTable(tenantId, schemaName, famName, tableName, clientTimeStamp, indexes);
} else if (linkType == LinkType.PHYSICAL_TABLE) {
physicalTables.add(famName);
} else if (linkType == LinkType.PARENT_TABLE) {
parentTableName = PNameFactory.newName(SchemaUtil.getTableNameFromFullName(famName.getBytes()));
parentSchemaName = PNameFactory.newName(SchemaUtil.getSchemaNameFromFullName(famName.getBytes()));
}
} else {
addColumnToTable(results, colName, famName, colKeyValues, columns, saltBucketNum != null);
}
}
}
}
// Avoid querying the stats table because we're holding the rowLock here. Issuing an RPC to a remote
// server while holding this lock is a bad idea and likely to cause contention.
return PTableImpl.makePTable(tenantId, schemaName, tableName, tableType, indexState, timeStamp, tableSeqNum,
pkName, saltBucketNum, columns, parentSchemaName, parentTableName, indexes, isImmutableRows, physicalTables, defaultFamilyName,
viewStatement, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType,
rowKeyOrderOptimizable, transactional, updateCacheFrequency, baseColumnCount,
indexDisableTimestamp, isNamespaceMapped, autoPartitionSeq, isAppendOnlySchema, storageScheme, encodingScheme, cqCounter);
}
private boolean isQualifierCounterKV(Cell kv) {
int cmp =
Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(),
kv.getQualifierLength(), QUALIFIER_COUNTER_KV.getQualifierArray(),
QUALIFIER_COUNTER_KV.getQualifierOffset(), QUALIFIER_COUNTER_KV.getQualifierLength());
return cmp == 0;
}
private PSchema getSchema(RegionScanner scanner, long clientTimeStamp) throws IOException, SQLException {
List<Cell> results = Lists.newArrayList();
scanner.next(results);
if (results.isEmpty()) { return null; }
Cell keyValue = results.get(0);
byte[] keyBuffer = keyValue.getRowArray();
int keyLength = keyValue.getRowLength();
int keyOffset = keyValue.getRowOffset();
PName tenantId = newPName(keyBuffer, keyOffset, keyLength);
int tenantIdLength = (tenantId == null) ? 0 : tenantId.getBytes().length;
if (tenantIdLength == 0) {
tenantId = null;
}
PName schemaName = newPName(keyBuffer, keyOffset + tenantIdLength + 1, keyLength - tenantIdLength - 1);
long timeStamp = keyValue.getTimestamp();
return new PSchema(schemaName.getString(), timeStamp);
}
private PFunction getFunction(RegionScanner scanner, final boolean isReplace, long clientTimeStamp, List<Mutation> deleteMutationsForReplace)
throws IOException, SQLException {
List<Cell> results = Lists.newArrayList();
scanner.next(results);
if (results.isEmpty()) {
return null;
}
Cell[] functionKeyValues = new Cell[FUNCTION_KV_COLUMNS.size()];
Cell[] functionArgKeyValues = new Cell[FUNCTION_ARG_KV_COLUMNS.size()];
// Create PFunction based on KeyValues from scan
Cell keyValue = results.get(0);
byte[] keyBuffer = keyValue.getRowArray();
int keyLength = keyValue.getRowLength();
int keyOffset = keyValue.getRowOffset();
long currentTimeMillis = EnvironmentEdgeManager.currentTimeMillis();
if(isReplace) {
long deleteTimeStamp =
clientTimeStamp == HConstants.LATEST_TIMESTAMP ? currentTimeMillis - 1
: (keyValue.getTimestamp() < clientTimeStamp ? clientTimeStamp - 1
: keyValue.getTimestamp());
deleteMutationsForReplace.add(new Delete(keyBuffer, keyOffset, keyLength, deleteTimeStamp));
}
PName tenantId = newPName(keyBuffer, keyOffset, keyLength);
int tenantIdLength = (tenantId == null) ? 0 : tenantId.getBytes().length;
if (tenantIdLength == 0) {
tenantId = null;
}
PName functionName =
newPName(keyBuffer, keyOffset + tenantIdLength + 1, keyLength - tenantIdLength - 1);
int functionNameLength = functionName.getBytes().length+1;
int offset = tenantIdLength + functionNameLength + 1;
long timeStamp = keyValue.getTimestamp();
int i = 0;
int j = 0;
while (i < results.size() && j < FUNCTION_KV_COLUMNS.size()) {
Cell kv = results.get(i);
Cell searchKv = FUNCTION_KV_COLUMNS.get(j);
int cmp =
Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(),
kv.getQualifierLength(), searchKv.getQualifierArray(),
searchKv.getQualifierOffset(), searchKv.getQualifierLength());
if (cmp == 0) {
timeStamp = Math.max(timeStamp, kv.getTimestamp()); // Find max timestamp of table
// header row
functionKeyValues[j++] = kv;
i++;
} else if (cmp > 0) {
timeStamp = Math.max(timeStamp, kv.getTimestamp());
functionKeyValues[j++] = null;
} else {
i++; // shouldn't happen - means unexpected KV in system table header row
}
}
// CLASS_NAME,NUM_ARGS and JAR_PATH are required.
if (functionKeyValues[CLASS_NAME_INDEX] == null || functionKeyValues[NUM_ARGS_INDEX] == null) {
throw new IllegalStateException(
"Didn't find expected key values for function row in metadata row");
}
Cell classNameKv = functionKeyValues[CLASS_NAME_INDEX];
PName className = newPName(classNameKv.getValueArray(), classNameKv.getValueOffset(),
classNameKv.getValueLength());
Cell jarPathKv = functionKeyValues[JAR_PATH_INDEX];
PName jarPath = null;
if(jarPathKv != null) {
jarPath = newPName(jarPathKv.getValueArray(), jarPathKv.getValueOffset(),
jarPathKv.getValueLength());
}
Cell numArgsKv = functionKeyValues[NUM_ARGS_INDEX];
int numArgs =
PInteger.INSTANCE.getCodec().decodeInt(numArgsKv.getValueArray(),
numArgsKv.getValueOffset(), SortOrder.getDefault());
Cell returnTypeKv = functionKeyValues[RETURN_TYPE_INDEX];
PName returnType =
returnTypeKv == null ? null : newPName(returnTypeKv.getValueArray(),
returnTypeKv.getValueOffset(), returnTypeKv.getValueLength());
List<FunctionArgument> arguments = Lists.newArrayListWithExpectedSize(numArgs);
for (int k = 0; k < numArgs; k++) {
results.clear();
scanner.next(results);
if (results.isEmpty()) {
break;
}
Cell typeKv = results.get(0);
if(isReplace) {
long deleteTimeStamp =
clientTimeStamp == HConstants.LATEST_TIMESTAMP ? currentTimeMillis - 1
: (typeKv.getTimestamp() < clientTimeStamp ? clientTimeStamp - 1
: typeKv.getTimestamp());
deleteMutationsForReplace.add(new Delete(typeKv.getRowArray(), typeKv
.getRowOffset(), typeKv.getRowLength(), deleteTimeStamp));
}
int typeKeyLength = typeKv.getRowLength();
PName typeName =
newPName(typeKv.getRowArray(), typeKv.getRowOffset() + offset, typeKeyLength
- offset - 3);
int argPositionOffset = offset + typeName.getBytes().length + 1;
short argPosition = Bytes.toShort(typeKv.getRowArray(), typeKv.getRowOffset() + argPositionOffset, typeKeyLength
- argPositionOffset);
addArgumentToFunction(results, functionName, typeName, functionArgKeyValues, arguments, argPosition);
}
Collections.sort(arguments, new Comparator<FunctionArgument>() {
@Override
public int compare(FunctionArgument o1, FunctionArgument o2) {
return o1.getArgPosition() - o2.getArgPosition();
}
});
return new PFunction(tenantId, functionName.getString(), arguments, returnType.getString(),
className.getString(), jarPath == null ? null : jarPath.getString(), timeStamp);
}
private PTable buildDeletedTable(byte[] key, ImmutableBytesPtr cacheKey, Region region,
long clientTimeStamp) throws IOException {
if (clientTimeStamp == HConstants.LATEST_TIMESTAMP) {
return null;
}
Scan scan = MetaDataUtil.newTableRowsScan(key, clientTimeStamp, HConstants.LATEST_TIMESTAMP);
scan.setFilter(new FirstKeyOnlyFilter());
scan.setRaw(true);
List<Cell> results = Lists.<Cell> newArrayList();
try (RegionScanner scanner = region.getScanner(scan)) {
scanner.next(results);
}
for (Cell kv : results) {
KeyValue.Type type = Type.codeToType(kv.getTypeByte());
if (type == Type.DeleteFamily) { // Row was deleted
Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
GlobalCache.getInstance(this.env).getMetaDataCache();
PTable table = newDeletedTableMarker(kv.getTimestamp());
metaDataCache.put(cacheKey, table);
return table;
}
}
return null;
}
private PFunction buildDeletedFunction(byte[] key, ImmutableBytesPtr cacheKey, Region region,
long clientTimeStamp) throws IOException {
if (clientTimeStamp == HConstants.LATEST_TIMESTAMP) {
return null;
}
Scan scan = MetaDataUtil.newTableRowsScan(key, clientTimeStamp, HConstants.LATEST_TIMESTAMP);
scan.setFilter(new FirstKeyOnlyFilter());
scan.setRaw(true);
List<Cell> results = Lists.<Cell> newArrayList();
try (RegionScanner scanner = region.getScanner(scan);) {
scanner.next(results);
}
// HBase ignores the time range on a raw scan (HBASE-7362)
if (!results.isEmpty() && results.get(0).getTimestamp() > clientTimeStamp) {
Cell kv = results.get(0);
if (kv.getTypeByte() == Type.Delete.getCode()) {
Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
GlobalCache.getInstance(this.env).getMetaDataCache();
PFunction function = newDeletedFunctionMarker(kv.getTimestamp());
metaDataCache.put(cacheKey, function);
return function;
}
}
return null;
}
private PSchema buildDeletedSchema(byte[] key, ImmutableBytesPtr cacheKey, Region region, long clientTimeStamp)
throws IOException {
if (clientTimeStamp == HConstants.LATEST_TIMESTAMP) { return null; }
Scan scan = MetaDataUtil.newTableRowsScan(key, clientTimeStamp, HConstants.LATEST_TIMESTAMP);
scan.setFilter(new FirstKeyOnlyFilter());
scan.setRaw(true);
List<Cell> results = Lists.<Cell> newArrayList();
try (RegionScanner scanner = region.getScanner(scan);) {
scanner.next(results);
}
// HBase ignores the time range on a raw scan (HBASE-7362)
if (!results.isEmpty() && results.get(0).getTimestamp() > clientTimeStamp) {
Cell kv = results.get(0);
if (kv.getTypeByte() == Type.Delete.getCode()) {
Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env)
.getMetaDataCache();
PSchema schema = newDeletedSchemaMarker(kv.getTimestamp());
metaDataCache.put(cacheKey, schema);
return schema;
}
}
return null;
}
private static PTable newDeletedTableMarker(long timestamp) {
return new PTableImpl(timestamp);
}
private static PFunction newDeletedFunctionMarker(long timestamp) {
return new PFunction(timestamp);
}
private static PSchema newDeletedSchemaMarker(long timestamp) {
return new PSchema(timestamp);
}
private static boolean isTableDeleted(PTable table) {
return table.getName() == null;
}
private static boolean isSchemaDeleted(PSchema schema) {
return schema.getSchemaName() == null;
}
private static boolean isFunctionDeleted(PFunction function) {
return function.getFunctionName() == null;
}
private PTable loadTable(RegionCoprocessorEnvironment env, byte[] key,
ImmutableBytesPtr cacheKey, long clientTimeStamp, long asOfTimeStamp)
throws IOException, SQLException {
Region region = env.getRegion();
Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
PTable table = (PTable)metaDataCache.getIfPresent(cacheKey);
// We always cache the latest version - fault in if not in cache
if (table != null || (table = buildTable(key, cacheKey, region, asOfTimeStamp)) != null) {
return table;
}
// if not found then check if newer table already exists and add delete marker for timestamp
// found
if (table == null
&& (table = buildDeletedTable(key, cacheKey, region, clientTimeStamp)) != null) {
return table;
}
return null;
}
private PFunction loadFunction(RegionCoprocessorEnvironment env, byte[] key,
ImmutableBytesPtr cacheKey, long clientTimeStamp, long asOfTimeStamp, boolean isReplace, List<Mutation> deleteMutationsForReplace)
throws IOException, SQLException {
Region region = env.getRegion();
Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
PFunction function = (PFunction)metaDataCache.getIfPresent(cacheKey);
// We always cache the latest version - fault in if not in cache
if (function != null && !isReplace) {
return function;
}
ArrayList<byte[]> arrayList = new ArrayList<byte[]>(1);
arrayList.add(key);
List<PFunction> functions = buildFunctions(arrayList, region, asOfTimeStamp, isReplace, deleteMutationsForReplace);
if(functions != null) return functions.get(0);
// if not found then check if newer table already exists and add delete marker for timestamp
// found
if (function == null
&& (function = buildDeletedFunction(key, cacheKey, region, clientTimeStamp)) != null) {
return function;
}
return null;
}
private PSchema loadSchema(RegionCoprocessorEnvironment env, byte[] key, ImmutableBytesPtr cacheKey,
long clientTimeStamp, long asOfTimeStamp) throws IOException, SQLException {
Region region = env.getRegion();
Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
PSchema schema = (PSchema)metaDataCache.getIfPresent(cacheKey);
// We always cache the latest version - fault in if not in cache
if (schema != null) { return schema; }
ArrayList<byte[]> arrayList = new ArrayList<byte[]>(1);
arrayList.add(key);
List<PSchema> schemas = buildSchemas(arrayList, region, asOfTimeStamp, cacheKey);
if (schemas != null) return schemas.get(0);
// if not found then check if newer schema already exists and add delete marker for timestamp
// found
if (schema == null
&& (schema = buildDeletedSchema(key, cacheKey, region, clientTimeStamp)) != null) { return schema; }
return null;
}
/**
*
* @return null if the physical table row information is not present.
*
*/
private static Mutation getPhysicalTableForView(List<Mutation> tableMetadata, byte[][] parentSchemaTableNames) {
int size = tableMetadata.size();
byte[][] rowKeyMetaData = new byte[3][];
MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, rowKeyMetaData);
Mutation physicalTableRow = null;
boolean physicalTableLinkFound = false;
if (size >= 2) {
int i = size - 1;
while (i >= 1) {
Mutation m = tableMetadata.get(i);
if (m instanceof Put) {
LinkType linkType = MetaDataUtil.getLinkType(m);
if (linkType == LinkType.PHYSICAL_TABLE) {
physicalTableRow = m;
physicalTableLinkFound = true;
break;
}
}
i--;
}
}
if (!physicalTableLinkFound) {
parentSchemaTableNames[0] = null;
parentSchemaTableNames[1] = null;
return null;
}
rowKeyMetaData = new byte[5][];
getVarChars(physicalTableRow.getRow(), 5, rowKeyMetaData);
byte[] colBytes = rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX];
byte[] famBytes = rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX];
if ((colBytes == null || colBytes.length == 0) && (famBytes != null && famBytes.length > 0)) {
byte[] sName = SchemaUtil.getSchemaNameFromFullName(famBytes).getBytes();
byte[] tName = SchemaUtil.getTableNameFromFullName(famBytes).getBytes();
parentSchemaTableNames[0] = sName;
parentSchemaTableNames[1] = tName;
}
return physicalTableRow;
}
@Override
public void createTable(RpcController controller, CreateTableRequest request,
RpcCallback<MetaDataResponse> done) {
MetaDataResponse.Builder builder = MetaDataResponse.newBuilder();
byte[][] rowKeyMetaData = new byte[3][];
byte[] schemaName = null;
byte[] tableName = null;
try {
List<Mutation> tableMetadata = ProtobufUtil.getMutations(request);
MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, rowKeyMetaData);
byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
byte[] parentSchemaName = null;
byte[] parentTableName = null;
PTableType tableType = MetaDataUtil.getTableType(tableMetadata, GenericKeyValueBuilder.INSTANCE, new ImmutableBytesWritable());
byte[] parentTableKey = null;
Mutation viewPhysicalTableRow = null;
if (tableType == PTableType.VIEW) {
byte[][] parentSchemaTableNames = new byte[2][];
/*
* For a view, we lock the base physical table row. For a mapped view, there is
* no link present to the physical table. So the viewPhysicalTableRow is null
* in that case.
*/
viewPhysicalTableRow = getPhysicalTableForView(tableMetadata, parentSchemaTableNames);
parentSchemaName = parentSchemaTableNames[0];
parentTableName = parentSchemaTableNames[1];
if (parentTableName != null) {
parentTableKey = SchemaUtil.getTableKey(ByteUtil.EMPTY_BYTE_ARRAY, parentSchemaName, parentTableName);
}
} else if (tableType == PTableType.INDEX) {
parentSchemaName = schemaName;
/*
* For an index we lock the parent table's row which could be a physical table or a view.
* If the parent table is a physical table, then the tenantIdBytes is empty because
* we allow creating an index with a tenant connection only if the parent table is a view.
*/
parentTableName = MetaDataUtil.getParentTableName(tableMetadata);
parentTableKey = SchemaUtil.getTableKey(tenantIdBytes, parentSchemaName, parentTableName);
}
Region region = env.getRegion();
List<RowLock> locks = Lists.newArrayList();
// Place a lock using key for the table to be created
byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaName, tableName);
try {
acquireLock(region, tableKey, locks);
// If the table key resides outside the region, return without doing anything
MetaDataMutationResult result = checkTableKeyInRegion(tableKey, region);
if (result != null) {
done.run(MetaDataMutationResult.toProto(result));
return;
}
long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
ImmutableBytesPtr parentCacheKey = null;
PTable parentTable = null;
if (parentTableName != null) {
// Check if the parent table resides in the same region. If not, don't worry about locking the parent table row
// or loading the parent table. For a view, the parent table that needs to be locked is the base physical table.
// For an index on view, the view header row needs to be locked.
result = checkTableKeyInRegion(parentTableKey, region);
if (result == null) {
acquireLock(region, parentTableKey, locks);
parentCacheKey = new ImmutableBytesPtr(parentTableKey);
parentTable = loadTable(env, parentTableKey, parentCacheKey, clientTimeStamp,
clientTimeStamp);
if (parentTable == null || isTableDeleted(parentTable)) {
builder.setReturnCode(MetaDataProtos.MutationCode.PARENT_TABLE_NOT_FOUND);
builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
done.run(builder.build());
return;
}
long parentTableSeqNumber;
if (tableType == PTableType.VIEW && viewPhysicalTableRow != null && request.hasClientVersion()) {
// Starting 4.5, the client passes the sequence number of the physical table in the table metadata.
parentTableSeqNumber = MetaDataUtil.getSequenceNumber(viewPhysicalTableRow);
} else if (tableType == PTableType.VIEW && !request.hasClientVersion()) {
// Before 4.5, due to a bug, the parent table key wasn't available.
// So don't do anything and prevent the exception from being thrown.
parentTableSeqNumber = parentTable.getSequenceNumber();
} else {
parentTableSeqNumber = MetaDataUtil.getParentSequenceNumber(tableMetadata);
}
// If parent table isn't at the expected sequence number, then return
if (parentTable.getSequenceNumber() != parentTableSeqNumber) {
builder.setReturnCode(MetaDataProtos.MutationCode.CONCURRENT_TABLE_MUTATION);
builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
builder.setTable(PTableImpl.toProto(parentTable));
done.run(builder.build());
return;
}
}
}
// Load child table next
ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(tableKey);
// Get as of latest timestamp so we can detect if we have a newer table that already
// exists without making an additional query
PTable table =
loadTable(env, tableKey, cacheKey, clientTimeStamp, HConstants.LATEST_TIMESTAMP);
if (table != null) {
if (table.getTimeStamp() < clientTimeStamp) {
// If the table is older than the client time stamp and it's deleted,
// continue
if (!isTableDeleted(table)) {
builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_ALREADY_EXISTS);
builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
builder.setTable(PTableImpl.toProto(table));
done.run(builder.build());
return;
}
} else {
builder.setReturnCode(MetaDataProtos.MutationCode.NEWER_TABLE_FOUND);
builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
builder.setTable(PTableImpl.toProto(table));
done.run(builder.build());
return;
}
}
// Add cell for ROW_KEY_ORDER_OPTIMIZABLE = true, as we know that new tables
// conform the correct row key. The exception is for a VIEW, which the client
// sends over depending on its base physical table.
if (tableType != PTableType.VIEW) {
UpgradeUtil.addRowKeyOrderOptimizableCell(tableMetadata, tableKey, clientTimeStamp);
}
// If the parent table of the view has the auto partition sequence name attribute, modify the
// tableMetadata and set the view statement and partition column correctly
if (parentTable!=null && parentTable.getAutoPartitionSeqName()!=null) {
long autoPartitionNum = 1;
try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class);
Statement stmt = connection.createStatement()) {
String seqName = parentTable.getAutoPartitionSeqName();
// Not going through the standard route of using statement.execute() as that code path
// is blocked if the metadata hasn't been been upgraded to the new minor release.
String seqNextValueSql = String.format("SELECT NEXT VALUE FOR %s", seqName);
PhoenixStatement ps = stmt.unwrap(PhoenixStatement.class);
QueryPlan plan = ps.compileQuery(seqNextValueSql);
ResultIterator resultIterator = plan.iterator();
PhoenixResultSet rs = ps.newResultSet(resultIterator, plan.getProjector(), plan.getContext());
rs.next();
autoPartitionNum = rs.getLong(1);
}
catch (SequenceNotFoundException e) {
builder.setReturnCode(MetaDataProtos.MutationCode.AUTO_PARTITION_SEQUENCE_NOT_FOUND);
builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
done.run(builder.build());
return;
}
PColumn autoPartitionCol = parentTable.getPKColumns().get(MetaDataUtil.getAutoPartitionColIndex(parentTable));
if (!PLong.INSTANCE.isCoercibleTo(autoPartitionCol.getDataType(), autoPartitionNum)) {
builder.setReturnCode(MetaDataProtos.MutationCode.CANNOT_COERCE_AUTO_PARTITION_ID);
builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
done.run(builder.build());
return;
}
builder.setAutoPartitionNum(autoPartitionNum);
// set the VIEW STATEMENT column of the header row
Put tableHeaderPut = MetaDataUtil.getPutOnlyTableHeaderRow(tableMetadata);
NavigableMap<byte[], List<Cell>> familyCellMap = tableHeaderPut.getFamilyCellMap();
List<Cell> cells = familyCellMap.get(TABLE_FAMILY_BYTES);
Cell cell = cells.get(0);
String autoPartitionWhere = QueryUtil.getViewPartitionClause(MetaDataUtil.getAutoPartitionColumnName(parentTable), autoPartitionNum);
String hbaseVersion = VersionInfo.getVersion();
ImmutableBytesPtr ptr = new ImmutableBytesPtr();
KeyValueBuilder kvBuilder = KeyValueBuilder.get(hbaseVersion);
MetaDataUtil.getMutationValue(tableHeaderPut, VIEW_STATEMENT_BYTES, kvBuilder, ptr);
byte[] value = ptr.copyBytesIfNecessary();
byte[] viewStatement = null;
// if we have an existing where clause add the auto partition where clause to it
if (!Bytes.equals(value, QueryConstants.EMPTY_COLUMN_VALUE_BYTES)) {
viewStatement = Bytes.add(value, Bytes.toBytes(" AND "), Bytes.toBytes(autoPartitionWhere));
}
else {
viewStatement = Bytes.toBytes(QueryUtil.getViewStatement(parentTable.getSchemaName().getString(), parentTable.getTableName().getString(), autoPartitionWhere));
}
Cell viewStatementCell = new KeyValue(cell.getRow(), cell.getFamily(), VIEW_STATEMENT_BYTES,
cell.getTimestamp(), Type.codeToType(cell.getTypeByte()), viewStatement);
cells.add(viewStatementCell);
// set the IS_VIEW_REFERENCED column of the auto partition column row
Put autoPartitionPut = MetaDataUtil.getPutOnlyAutoPartitionColumn(parentTable, tableMetadata);
familyCellMap = autoPartitionPut.getFamilyCellMap();
cells = familyCellMap.get(TABLE_FAMILY_BYTES);
cell = cells.get(0);
PDataType dataType = autoPartitionCol.getDataType();
Object val = dataType.toObject(autoPartitionNum, PLong.INSTANCE);
byte[] bytes = new byte [dataType.getByteSize() + 1];
dataType.toBytes(val, bytes, 0);
Cell viewConstantCell = new KeyValue(cell.getRow(), cell.getFamily(), VIEW_CONSTANT_BYTES,
cell.getTimestamp(), Type.codeToType(cell.getTypeByte()), bytes);
cells.add(viewConstantCell);
}
Short indexId = null;
if (request.hasAllocateIndexId() && request.getAllocateIndexId()) {
String tenantIdStr = tenantIdBytes.length == 0 ? null : Bytes.toString(tenantIdBytes);
try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class)) {
PName physicalName = parentTable.getPhysicalName();
int nSequenceSaltBuckets = connection.getQueryServices().getSequenceSaltBuckets();
SequenceKey key = MetaDataUtil.getViewIndexSequenceKey(tenantIdStr, physicalName,
nSequenceSaltBuckets, parentTable.isNamespaceMapped() );
// TODO Review Earlier sequence was created at (SCN-1/LATEST_TIMESTAMP) and incremented at the client max(SCN,dataTable.getTimestamp), but it seems we should
// use always LATEST_TIMESTAMP to avoid seeing wrong sequence values by different connection having SCN
// or not.
long sequenceTimestamp = HConstants.LATEST_TIMESTAMP;
try {
connection.getQueryServices().createSequence(key.getTenantId(), key.getSchemaName(), key.getSequenceName(),
Short.MIN_VALUE, 1, 1, Long.MIN_VALUE, Long.MAX_VALUE, false, sequenceTimestamp);
} catch (SequenceAlreadyExistsException e) {
}
long[] seqValues = new long[1];
SQLException[] sqlExceptions = new SQLException[1];
connection.getQueryServices().incrementSequences(Collections.singletonList(new SequenceAllocation(key, 1)),
HConstants.LATEST_TIMESTAMP, seqValues, sqlExceptions);
if (sqlExceptions[0] != null) {
throw sqlExceptions[0];
}
long seqValue = seqValues[0];
if (seqValue > Short.MAX_VALUE) {
builder.setReturnCode(MetaDataProtos.MutationCode.TOO_MANY_INDEXES);
builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
done.run(builder.build());
return;
}
Put tableHeaderPut = MetaDataUtil.getPutOnlyTableHeaderRow(tableMetadata);
NavigableMap<byte[], List<Cell>> familyCellMap = tableHeaderPut.getFamilyCellMap();
List<Cell> cells = familyCellMap.get(TABLE_FAMILY_BYTES);
Cell cell = cells.get(0);
PDataType dataType = MetaDataUtil.getViewIndexIdDataType();
Object val = dataType.toObject(seqValue, PLong.INSTANCE);
byte[] bytes = new byte [dataType.getByteSize() + 1];
dataType.toBytes(val, bytes, 0);
Cell indexIdCell = new KeyValue(cell.getRow(), cell.getFamily(), VIEW_INDEX_ID_BYTES,
cell.getTimestamp(), Type.codeToType(cell.getTypeByte()), bytes);
cells.add(indexIdCell);
indexId = (short) seqValue;
}
}
// TODO: Switch this to HRegion#batchMutate when we want to support indexes on the
// system table. Basically, we get all the locks that we don't already hold for all the
// tableMetadata rows. This ensures we don't have deadlock situations (ensuring
// primary and then index table locks are held, in that order). For now, we just don't support
// indexing on the system table. This is an issue because of the way we manage batch mutation
// in the Indexer.
region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE);
// Invalidate the cache - the next getTable call will add it
// TODO: consider loading the table that was just created here, patching up the parent table, and updating the cache
Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
if (parentCacheKey != null) {
metaDataCache.invalidate(parentCacheKey);
}
metaDataCache.invalidate(cacheKey);
// Get timeStamp from mutations - the above method sets it if it's unset
long currentTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_NOT_FOUND);
if (indexId != null) {
builder.setViewIndexId(indexId);
}
builder.setMutationTime(currentTimeStamp);
done.run(builder.build());
return;
} finally {
region.releaseRowLocks(locks);
}
} catch (Throwable t) {
logger.error("createTable failed", t);
ProtobufUtil.setControllerException(controller,
ServerUtil.createIOException(SchemaUtil.getTableName(schemaName, tableName), t));
}
}
private static RowLock acquireLock(Region region, byte[] key, List<RowLock> locks)
throws IOException {
RowLock rowLock = region.getRowLock(key, false);
if (rowLock == null) {
throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key));
}
locks.add(rowLock);
return rowLock;
}
private static final byte[] PHYSICAL_TABLE_BYTES = new byte[] {PTable.LinkType.PHYSICAL_TABLE.getSerializedValue()};
/**
* @param tableName parent table's name
* Looks for whether child views exist for the table specified by table.
* TODO: should we pass a timestamp here?
*/
private TableViewFinderResult findChildViews(Region region, byte[] tenantId, PTable table, byte[] linkTypeBytes) throws IOException {
byte[] schemaName = table.getSchemaName().getBytes();
byte[] tableName = table.getTableName().getBytes();
boolean isMultiTenant = table.isMultiTenant();
Scan scan = new Scan();
// If the table is multi-tenant, we need to check across all tenant_ids,
// so we can't constrain the row key. Otherwise, any views would have
// the same tenantId.
if (!isMultiTenant) {
byte[] startRow = ByteUtil.concat(tenantId, QueryConstants.SEPARATOR_BYTE_ARRAY);
byte[] stopRow = ByteUtil.nextKey(startRow);
scan.setStartRow(startRow);
scan.setStopRow(stopRow);
}
SingleColumnValueFilter linkFilter = new SingleColumnValueFilter(TABLE_FAMILY_BYTES, LINK_TYPE_BYTES, CompareOp.EQUAL, linkTypeBytes);
SingleColumnValueFilter tableTypeFilter = new SingleColumnValueFilter(TABLE_FAMILY_BYTES, TABLE_TYPE_BYTES,
CompareOp.EQUAL, PTableType.VIEW.getSerializedValue().getBytes());
tableTypeFilter.setFilterIfMissing(false);
linkFilter.setFilterIfMissing(true);
byte[] suffix = ByteUtil.concat(QueryConstants.SEPARATOR_BYTE_ARRAY, SchemaUtil
.getPhysicalTableName(SchemaUtil.getTableNameAsBytes(schemaName, tableName), table.isNamespaceMapped())
.getName());
SuffixFilter rowFilter = new SuffixFilter(suffix);
FilterList filter = new FilterList(linkFilter,tableTypeFilter,rowFilter);
scan.setFilter(filter);
scan.addColumn(TABLE_FAMILY_BYTES, LINK_TYPE_BYTES);
scan.addColumn(TABLE_FAMILY_BYTES, TABLE_TYPE_BYTES);
scan.addColumn(TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
// Original region-only scanner modified due to PHOENIX-1208
// RegionScanner scanner = region.getScanner(scan);
// The following *should* work, but doesn't due to HBASE-11837
// TableName systemCatalogTableName = region.getTableDesc().getTableName();
// HTableInterface hTable = env.getTable(systemCatalogTableName);
// These deprecated calls work around the issue
HTableInterface hTable = ServerUtil.getHTableForCoprocessorScan(env,
region.getTableDesc().getTableName().getName());
try {
boolean allViewsInCurrentRegion = true;
int numOfChildViews = 0;
List<Result> results = Lists.newArrayList();
ResultScanner scanner = hTable.getScanner(scan);
try {
for (Result result = scanner.next(); (result != null); result = scanner.next()) {
numOfChildViews++;
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
ResultTuple resultTuple = new ResultTuple(result);
resultTuple.getKey(ptr);
byte[] key = ptr.copyBytes();
if (checkTableKeyInRegion(key, region) != null) {
allViewsInCurrentRegion = false;
}
results.add(result);
}
TableViewFinderResult tableViewFinderResult = new TableViewFinderResult(results, table);
if (numOfChildViews > 0 && !allViewsInCurrentRegion) {
tableViewFinderResult.setAllViewsNotInSingleRegion();
}
return tableViewFinderResult;
} finally {
scanner.close();
}
} finally {
hTable.close();
}
}
@Override
public void dropTable(RpcController controller, DropTableRequest request,
RpcCallback<MetaDataResponse> done) {
MetaDataResponse.Builder builder = MetaDataResponse.newBuilder();
boolean isCascade = request.getCascade();
byte[][] rowKeyMetaData = new byte[3][];
String tableType = request.getTableType();
byte[] schemaName = null;
byte[] tableName = null;
try {
List<Mutation> tableMetadata = ProtobufUtil.getMutations(request);
MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, rowKeyMetaData);
byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
// Disallow deletion of a system table
if (tableType.equals(PTableType.SYSTEM.getSerializedValue())) {
builder.setReturnCode(MetaDataProtos.MutationCode.UNALLOWED_TABLE_MUTATION);
builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
done.run(builder.build());
return;
}
List<byte[]> tableNamesToDelete = Lists.newArrayList();
List<SharedTableState> sharedTablesToDelete = Lists.newArrayList();
byte[] parentTableName = MetaDataUtil.getParentTableName(tableMetadata);
byte[] lockTableName = parentTableName == null ? tableName : parentTableName;
byte[] lockKey = SchemaUtil.getTableKey(tenantIdBytes, schemaName, lockTableName);
byte[] key =
parentTableName == null ? lockKey : SchemaUtil.getTableKey(tenantIdBytes,
schemaName, tableName);
Region region = env.getRegion();
MetaDataMutationResult result = checkTableKeyInRegion(key, region);
if (result != null) {
done.run(MetaDataMutationResult.toProto(result));
return;
}
List<RowLock> locks = Lists.newArrayList();
try {
acquireLock(region, lockKey, locks);
if (key != lockKey) {
acquireLock(region, key, locks);
}
List<ImmutableBytesPtr> invalidateList = new ArrayList<ImmutableBytesPtr>();
result =
doDropTable(key, tenantIdBytes, schemaName, tableName, parentTableName,
PTableType.fromSerializedValue(tableType), tableMetadata,
invalidateList, locks, tableNamesToDelete, sharedTablesToDelete, isCascade);
if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
done.run(MetaDataMutationResult.toProto(result));
return;
}
Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
// Commit the list of deletion.
region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE,
HConstants.NO_NONCE);
long currentTime = MetaDataUtil.getClientTimeStamp(tableMetadata);
for (ImmutableBytesPtr ckey : invalidateList) {
metaDataCache.put(ckey, newDeletedTableMarker(currentTime));
}
if (parentTableName != null) {
ImmutableBytesPtr parentCacheKey = new ImmutableBytesPtr(lockKey);
metaDataCache.invalidate(parentCacheKey);
}
done.run(MetaDataMutationResult.toProto(result));
return;
} finally {
region.releaseRowLocks(locks);
}
} catch (Throwable t) {
logger.error("dropTable failed", t);
ProtobufUtil.setControllerException(controller,
ServerUtil.createIOException(SchemaUtil.getTableName(schemaName, tableName), t));
}
}
private MetaDataMutationResult doDropTable(byte[] key, byte[] tenantId, byte[] schemaName,
byte[] tableName, byte[] parentTableName, PTableType tableType, List<Mutation> rowsToDelete,
List<ImmutableBytesPtr> invalidateList, List<RowLock> locks,
List<byte[]> tableNamesToDelete, List<SharedTableState> sharedTablesToDelete, boolean isCascade) throws IOException, SQLException {
long clientTimeStamp = MetaDataUtil.getClientTimeStamp(rowsToDelete);
Region region = env.getRegion();
ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
PTable table = (PTable)metaDataCache.getIfPresent(cacheKey);
// We always cache the latest version - fault in if not in cache
if (table != null
|| (table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP)) != null) {
if (table.getTimeStamp() < clientTimeStamp) {
if (isTableDeleted(table) || tableType != table.getType()) {
return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
}
} else {
return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND,
EnvironmentEdgeManager.currentTimeMillis(), null);
}
}
// We didn't find a table at the latest timestamp, so either there is no table or
// there was a table, but it's been deleted. In either case we want to return.
if (table == null) {
if (buildDeletedTable(key, cacheKey, region, clientTimeStamp) != null) {
return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
}
return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
}
// Make sure we're not deleting the "wrong" child
if (parentTableName!=null && table.getParentTableName() != null && !Arrays.equals(parentTableName, table.getParentTableName().getBytes())) {
return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
}
// Since we don't allow back in time DDL, we know if we have a table it's the one
// we want to delete. FIXME: we shouldn't need a scan here, but should be able to
// use the table to generate the Delete markers.
Scan scan = MetaDataUtil.newTableRowsScan(key, MIN_TABLE_TIMESTAMP, clientTimeStamp);
List<byte[]> indexNames = Lists.newArrayList();
List<Cell> results = Lists.newArrayList();
try (RegionScanner scanner = region.getScanner(scan);) {
scanner.next(results);
if (results.isEmpty()) { // Should not be possible
return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND,
EnvironmentEdgeManager.currentTimeMillis(), null);
}
// Only tables may have views, so prevent the running of this potentially
// expensive full table scan over the SYSTEM.CATALOG table unless it's needed.
if (tableType == PTableType.TABLE || tableType == PTableType.SYSTEM) {
// Handle any child views that exist
TableViewFinderResult tableViewFinderResult = findChildViews(region, tenantId, table,
PHYSICAL_TABLE_BYTES);
if (tableViewFinderResult.hasViews()) {
if (isCascade) {
if (tableViewFinderResult.allViewsInMultipleRegions()) {
// We don't yet support deleting a table with views where SYSTEM.CATALOG has split and the
// view metadata spans multiple regions
return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION,
EnvironmentEdgeManager.currentTimeMillis(), null);
} else if (tableViewFinderResult.allViewsInSingleRegion()) {
// Recursively delete views - safe as all the views as all in the same region
for (Result viewResult : tableViewFinderResult.getResults()) {
byte[][] rowKeyMetaData = new byte[3][];
getVarChars(viewResult.getRow(), 3, rowKeyMetaData);
byte[] viewTenantId = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
byte[] viewSchemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
byte[] viewName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
byte[] viewKey = SchemaUtil.getTableKey(viewTenantId, viewSchemaName, viewName);
Delete delete = new Delete(viewKey, clientTimeStamp);
rowsToDelete.add(delete);
acquireLock(region, viewKey, locks);
MetaDataMutationResult result = doDropTable(viewKey, viewTenantId, viewSchemaName,
viewName, null, PTableType.VIEW, rowsToDelete, invalidateList, locks,
tableNamesToDelete, sharedTablesToDelete, false);
if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) { return result; }
}
}
} else {
// DROP without CASCADE on tables with child views is not permitted
return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION,
EnvironmentEdgeManager.currentTimeMillis(), null);
}
}
}
// Add to list of HTables to delete, unless it's a view or its a shared index
if (tableType != PTableType.VIEW && table.getViewIndexId()==null) {
tableNamesToDelete.add(table.getPhysicalName().getBytes());
}
else {
sharedTablesToDelete.add(new SharedTableState(table));
}
invalidateList.add(cacheKey);
byte[][] rowKeyMetaData = new byte[5][];
do {
Cell kv = results.get(LINK_TYPE_INDEX);
int nColumns = getVarChars(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), 0, rowKeyMetaData);
if (nColumns == 5
&& rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length == 0
&& rowKeyMetaData[PhoenixDatabaseMetaData.INDEX_NAME_INDEX].length > 0
&& Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength(),
LINK_TYPE_BYTES, 0, LINK_TYPE_BYTES.length) == 0
&& LinkType.fromSerializedValue(kv.getValueArray()[kv.getValueOffset()]) == LinkType.INDEX_TABLE) {
indexNames.add(rowKeyMetaData[PhoenixDatabaseMetaData.INDEX_NAME_INDEX]);
}
// FIXME: Remove when unintentionally deprecated method is fixed (HBASE-7870).
// FIXME: the version of the Delete constructor without the lock args was introduced
// in 0.94.4, thus if we try to use it here we can no longer use the 0.94.2 version
// of the client.
Delete delete = new Delete(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), clientTimeStamp);
rowsToDelete.add(delete);
results.clear();
scanner.next(results);
} while (!results.isEmpty());
}
// Recursively delete indexes
for (byte[] indexName : indexNames) {
byte[] indexKey = SchemaUtil.getTableKey(tenantId, schemaName, indexName);
// FIXME: Remove when unintentionally deprecated method is fixed (HBASE-7870).
// FIXME: the version of the Delete constructor without the lock args was introduced
// in 0.94.4, thus if we try to use it here we can no longer use the 0.94.2 version
// of the client.
Delete delete = new Delete(indexKey, clientTimeStamp);
rowsToDelete.add(delete);
acquireLock(region, indexKey, locks);
MetaDataMutationResult result =
doDropTable(indexKey, tenantId, schemaName, indexName, tableName, PTableType.INDEX,
rowsToDelete, invalidateList, locks, tableNamesToDelete, sharedTablesToDelete, false);
if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
return result;
}
}
return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS,
EnvironmentEdgeManager.currentTimeMillis(), table, tableNamesToDelete);
}
private static interface ColumnMutator {
MetaDataMutationResult updateMutation(PTable table, byte[][] rowKeyMetaData,
List<Mutation> tableMetadata, Region region,
List<ImmutableBytesPtr> invalidateList, List<RowLock> locks, long clientTimeStamp) throws IOException,
SQLException;
}
private MetaDataMutationResult
mutateColumn(List<Mutation> tableMetadata, ColumnMutator mutator) throws IOException {
byte[][] rowKeyMetaData = new byte[5][];
MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, rowKeyMetaData);
byte[] tenantId = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
byte[] schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
byte[] tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
byte[] key = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
try {
Region region = env.getRegion();
MetaDataMutationResult result = checkTableKeyInRegion(key, region);
if (result != null) {
return result;
}
List<RowLock> locks = Lists.newArrayList();
try {
acquireLock(region, key, locks);
ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
List<ImmutableBytesPtr> invalidateList = new ArrayList<ImmutableBytesPtr>();
invalidateList.add(cacheKey);
Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
PTable table = (PTable)metaDataCache.getIfPresent(cacheKey);
if (logger.isDebugEnabled()) {
if (table == null) {
logger.debug("Table " + Bytes.toStringBinary(key)
+ " not found in cache. Will build through scan");
} else {
logger.debug("Table " + Bytes.toStringBinary(key)
+ " found in cache with timestamp " + table.getTimeStamp()
+ " seqNum " + table.getSequenceNumber());
}
}
// Get client timeStamp from mutations
long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
if (table == null
&& (table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP)) == null) {
// if not found then call newerTableExists and add delete marker for timestamp
// found
table = buildDeletedTable(key, cacheKey, region, clientTimeStamp);
if (table != null) {
logger.info("Found newer table deleted as of " + table.getTimeStamp() + " versus client timestamp of " + clientTimeStamp);
return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND,
EnvironmentEdgeManager.currentTimeMillis(), null);
}
return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND,
EnvironmentEdgeManager.currentTimeMillis(), null);
}
if (table.getTimeStamp() >= clientTimeStamp) {
logger.info("Found newer table as of " + table.getTimeStamp() + " versus client timestamp of " + clientTimeStamp);
return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND,
EnvironmentEdgeManager.currentTimeMillis(), table);
} else if (isTableDeleted(table)) {
return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND,
EnvironmentEdgeManager.currentTimeMillis(), null);
}
long expectedSeqNum = MetaDataUtil.getSequenceNumber(tableMetadata) - 1; // lookup
// TABLE_SEQ_NUM
// in
// tableMetaData
if (logger.isDebugEnabled()) {
logger.debug("For table " + Bytes.toStringBinary(key) + " expecting seqNum "
+ expectedSeqNum + " and found seqNum " + table.getSequenceNumber()
+ " with " + table.getColumns().size() + " columns: "
+ table.getColumns());
}
if (expectedSeqNum != table.getSequenceNumber()) {
if (logger.isDebugEnabled()) {
logger.debug("For table " + Bytes.toStringBinary(key)
+ " returning CONCURRENT_TABLE_MUTATION due to unexpected seqNum");
}
return new MetaDataMutationResult(MutationCode.CONCURRENT_TABLE_MUTATION,
EnvironmentEdgeManager.currentTimeMillis(), table);
}
PTableType type = table.getType();
if (type == PTableType.INDEX) {
// Disallow mutation of an index table
return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION,
EnvironmentEdgeManager.currentTimeMillis(), null);
} else {
// server-side, except for indexing, we always expect the keyvalues to be standard KeyValues
PTableType expectedType = MetaDataUtil.getTableType(tableMetadata, GenericKeyValueBuilder.INSTANCE,
new ImmutableBytesWritable());
// We said to drop a table, but found a view or visa versa
if (type != expectedType) { return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND,
EnvironmentEdgeManager.currentTimeMillis(), null); }
}
result = mutator.updateMutation(table, rowKeyMetaData, tableMetadata, region,
invalidateList, locks, clientTimeStamp);
// if the update mutation caused tables to be deleted, the mutation code returned will be MutationCode.TABLE_ALREADY_EXISTS
if (result != null && result.getMutationCode()!=MutationCode.TABLE_ALREADY_EXISTS) {
return result;
}
region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE);
// Invalidate from cache
for (ImmutableBytesPtr invalidateKey : invalidateList) {
metaDataCache.invalidate(invalidateKey);
}
// Get client timeStamp from mutations, since it may get updated by the
// mutateRowsWithLocks call
long currentTime = MetaDataUtil.getClientTimeStamp(tableMetadata);
// if the update mutation caused tables to be deleted just return the result which will contain the table to be deleted
if (result !=null) {
return result;
} else {
table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP);
return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, currentTime, table);
}
} finally {
region.releaseRowLocks(locks);
}
} catch (Throwable t) {
ServerUtil.throwIOException(SchemaUtil.getTableName(schemaName, tableName), t);
return null; // impossible
}
}
private final class PutWithOrdinalPosition implements Comparable<PutWithOrdinalPosition>{
private final Put put;
private final int ordinalPosition;
public PutWithOrdinalPosition(Put put, int ordinalPos) {
this.put = put;
this.ordinalPosition = ordinalPos;
}
@Override
public int compareTo(PutWithOrdinalPosition o) {
return (this.ordinalPosition < o.ordinalPosition ? -1 : this.ordinalPosition > o.ordinalPosition ? 1 : 0);
}
}
private static int getOrdinalPosition(PTable table, PColumn col) {
return table.getBucketNum() == null ? col.getPosition() + 1 : col.getPosition();
}
private static boolean isDivergedView(PTable view) {
return view.getBaseColumnCount() == QueryConstants.DIVERGED_VIEW_BASE_COLUMN_COUNT;
}
/**
*
* Class to keep track of columns and their ordinal positions as we
* process through the list of columns to be added.
*
*/
private static class ColumnOrdinalPositionUpdateList {
final List<byte[]> columnKeys = new ArrayList<>(10);
int offset;
int size() {
return columnKeys.size();
}
private void setOffset(int lowestOrdinalPos) {
this.offset = lowestOrdinalPos;
}
private void addColumn(byte[] columnKey) {
columnKeys.add(columnKey);
}
private void dropColumn(byte[] columnKey) {
// Check if an entry for this column key exists
int index = -1;
for (int i = 0; i < columnKeys.size(); i++) {
if (Bytes.equals(columnKeys.get(i), columnKey)) {
index = i;
break;
}
}
if (index != -1) {
columnKeys.remove(index);
}
}
private void addColumn(byte[] columnKey, int position) {
checkArgument(position >= this.offset);
int index = position - offset;
int size = columnKeys.size();
checkState(index <= size);
if (size == 0) {
columnKeys.add(columnKey);
return;
}
int stopIndex = size;
// Check if an entry for this column key is already there.
for (int i = 0; i < size; i++) {
if (Bytes.equals(columnKeys.get(i), columnKey)) {
stopIndex = i;
break;
}
}
if (stopIndex == size) {
/*
* The column key is not present in the list. So add it at the specified index
* and right shift the elements at this index and beyond.
*/
columnKeys.add(index, columnKey);
} else {
/*
* The column key is already present in the list.
* Move the elements of the list to the left up to the stop index
*/
for (int i = stopIndex; i > index; i--) {
columnKeys.set(i, columnKeys.get(i - 1));
}
columnKeys.set(index, columnKey);
}
}
private int getOrdinalPositionFromListIdx(int listIndex) {
checkArgument(listIndex < columnKeys.size());
return listIndex + offset;
}
/**
* @param columnKey
* @return if present - the ordinal position of the column in this list.
* If not present - -1.
*/
private int getOrdinalPositionOfColumn(byte[] columnKey) {
int i = 0;
for (byte[] key : columnKeys) {
if (Bytes.equals(key, columnKey)) {
return i + offset;
}
i++;
}
return -1;
}
}
private static byte[] getColumnKey(byte[] viewKey, PColumn column) {
return getColumnKey(viewKey, column.getName().getString(), column.getFamilyName() != null ? column.getFamilyName().getString() : null);
}
private static byte[] getColumnKey(byte[] viewKey, String columnName, String columnFamily) {
byte[] columnKey = ByteUtil.concat(viewKey, QueryConstants.SEPARATOR_BYTE_ARRAY,
Bytes.toBytes(columnName));
if (columnFamily != null) {
columnKey = ByteUtil.concat(columnKey, QueryConstants.SEPARATOR_BYTE_ARRAY,
Bytes.toBytes(columnFamily));
}
return columnKey;
}
private boolean switchAttribute(PTable table, boolean currAttribute, List<Mutation> tableMetaData, byte[] attrQualifier) {
for (Mutation m : tableMetaData) {
if (m instanceof Put) {
Put p = (Put)m;
List<Cell> cells = p.get(TABLE_FAMILY_BYTES, attrQualifier);
if (cells != null && cells.size() > 0) {
Cell cell = cells.get(0);
boolean newAttribute = (boolean)PBoolean.INSTANCE.toObject(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
return currAttribute != newAttribute;
}
}
}
return false;
}
private MetaDataMutationResult addColumnsAndTablePropertiesToChildViews(PTable basePhysicalTable, List<Mutation> tableMetadata, List<Mutation> mutationsForAddingColumnsToViews, byte[] schemaName, byte[] tableName,
List<ImmutableBytesPtr> invalidateList, long clientTimeStamp, TableViewFinderResult childViewsResult,
Region region, List<RowLock> locks) throws IOException, SQLException {
List<PutWithOrdinalPosition> columnPutsForBaseTable = Lists.newArrayListWithExpectedSize(tableMetadata.size());
Map<TableProperty, Cell> tablePropertyCellMap = Maps.newHashMapWithExpectedSize(tableMetadata.size());
// Isolate the puts relevant to adding columns. Also figure out what kind of columns are being added.
for (Mutation m : tableMetadata) {
if (m instanceof Put) {
byte[][] rkmd = new byte[5][];
int pkCount = getVarChars(m.getRow(), rkmd);
// check if this put is for adding a column
if (pkCount > COLUMN_NAME_INDEX
&& rkmd[COLUMN_NAME_INDEX] != null && rkmd[COLUMN_NAME_INDEX].length > 0
&& Bytes.compareTo(schemaName, rkmd[SCHEMA_NAME_INDEX]) == 0
&& Bytes.compareTo(tableName, rkmd[TABLE_NAME_INDEX]) == 0) {
columnPutsForBaseTable.add(new PutWithOrdinalPosition((Put)m, getInteger((Put)m, TABLE_FAMILY_BYTES, ORDINAL_POSITION_BYTES)));
}
// check if the put is for a table property
else if (pkCount <= COLUMN_NAME_INDEX
&& Bytes.compareTo(schemaName, rkmd[SCHEMA_NAME_INDEX]) == 0
&& Bytes.compareTo(tableName, rkmd[TABLE_NAME_INDEX]) == 0) {
for (Cell cell : m.getFamilyCellMap().get(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES)) {
for (TableProperty tableProp : TableProperty.values()) {
byte[] propNameBytes = Bytes.toBytes(tableProp.getPropertyName());
if (Bytes.compareTo(propNameBytes, 0, propNameBytes.length, cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength())==0
&& tableProp.isValidOnView() && tableProp.isMutable()) {
Cell tablePropCell = CellUtil.createCell(cell.getRow(), CellUtil.cloneFamily(cell),
CellUtil.cloneQualifier(cell), cell.getTimestamp(), cell.getTypeByte(),
CellUtil.cloneValue(cell));
tablePropertyCellMap.put(tableProp, tablePropCell);
}
}
}
}
}
}
// Sort the puts by ordinal position
Collections.sort(columnPutsForBaseTable);
for (Result viewResult : childViewsResult.getResults()) {
short deltaNumPkColsSoFar = 0;
short columnsAddedToView = 0;
short columnsAddedToBaseTable = 0;
byte[][] rowViewKeyMetaData = new byte[3][];
getVarChars(viewResult.getRow(), 3, rowViewKeyMetaData);
byte[] viewKey = SchemaUtil.getTableKey(rowViewKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX],
rowViewKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX],
rowViewKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX]);
// lock the rows corresponding to views so that no other thread can modify the view meta-data
RowLock viewRowLock = acquireLock(region, viewKey, locks);
PTable view = doGetTable(viewKey, clientTimeStamp, viewRowLock);
ColumnOrdinalPositionUpdateList ordinalPositionList = new ColumnOrdinalPositionUpdateList();
List<PColumn> viewPkCols = new ArrayList<>(view.getPKColumns());
boolean addingExistingPkCol = false;
int numCols = view.getColumns().size();
// add the new columns to the child view
for (PutWithOrdinalPosition p : columnPutsForBaseTable) {
Put baseTableColumnPut = p.put;
PColumn existingViewColumn = null;
byte[][] rkmd = new byte[5][];
getVarChars(baseTableColumnPut.getRow(), rkmd);
String columnName = Bytes.toString(rkmd[COLUMN_NAME_INDEX]);
String columnFamily = rkmd[FAMILY_NAME_INDEX] == null ? null : Bytes.toString(rkmd[FAMILY_NAME_INDEX]);
try {
existingViewColumn = columnFamily == null ? view.getColumnForColumnName(columnName) : view.getColumnFamily(
columnFamily).getPColumnForColumnName(columnName);
} catch (ColumnFamilyNotFoundException e) {
// ignore since it means that the column family is not present for the column to be added.
} catch (ColumnNotFoundException e) {
// ignore since it means the column is not present in the view
}
boolean isPkCol = columnFamily == null;
byte[] columnKey = getColumnKey(viewKey, columnName, columnFamily);
if (existingViewColumn != null) {
MetaDataMutationResult result = validateColumnForAddToBaseTable(existingViewColumn, baseTableColumnPut, basePhysicalTable, isPkCol, view);
if (result != null) {
return result;
}
if (isPkCol) {
viewPkCols.remove(existingViewColumn);
addingExistingPkCol = true;
}
/*
* For views that are not diverged, we need to make sure that the existing columns
* have the same ordinal position as in the base table. This is important because
* we rely on the ordinal position of the column to figure out whether dropping a
* column from the view will end up diverging the view from the base table.
*
* For already diverged views, we don't care about the ordinal position of the existing column.
*/
if (!isDivergedView(view)) {
int newOrdinalPosition = p.ordinalPosition;
// Check if the ordinal position of the column was getting updated from previous add column
// mutations.
int existingOrdinalPos = ordinalPositionList.getOrdinalPositionOfColumn(columnKey);
if (ordinalPositionList.size() == 0) {
/*
* No ordinal positions to be updated are in the list. In that case, check whether the
* existing ordinal position of the column is different from its new ordinal position.
* If yes, then initialize the ordinal position list with this column's ordinal position
* as the offset.
*/
existingOrdinalPos = getOrdinalPosition(view, existingViewColumn);
if (existingOrdinalPos != newOrdinalPosition) {
ordinalPositionList.setOffset(newOrdinalPosition);
ordinalPositionList.addColumn(columnKey, newOrdinalPosition);
for (PColumn col : view.getColumns()) {
int ordinalPos = getOrdinalPosition(view, col);
if (ordinalPos >= newOrdinalPosition) {
if (ordinalPos == existingOrdinalPos) {
/*
* No need to update ordinal positions of columns beyond the existing column's
* old ordinal position.
*/
break;
}
// increment ordinal position of columns occurring after this column by 1
int updatedPos = ordinalPos + 1;
ordinalPositionList.addColumn(getColumnKey(viewKey, col), updatedPos);
}
}
}
} else {
if (existingOrdinalPos != newOrdinalPosition) {
ordinalPositionList.addColumn(columnKey, newOrdinalPosition);
}
}
columnsAddedToBaseTable++;
}
} else {
// The column doesn't exist in the view.
Put viewColumnPut = new Put(columnKey, clientTimeStamp);
for (Cell cell : baseTableColumnPut.getFamilyCellMap().values().iterator().next()) {
viewColumnPut.add(CellUtil.createCell(columnKey, CellUtil.cloneFamily(cell),
CellUtil.cloneQualifier(cell), cell.getTimestamp(), cell.getTypeByte(),
CellUtil.cloneValue(cell)));
}
if (isDivergedView(view)) {
if (isPkCol) {
/*
* Only pk cols of the base table are added to the diverged views. These pk
* cols are added at the end.
*/
int lastOrdinalPos = getOrdinalPosition(view, view.getColumns().get(numCols - 1));
int newPosition = ++lastOrdinalPos;
byte[] ptr = new byte[PInteger.INSTANCE.getByteSize()];
PInteger.INSTANCE.getCodec().encodeInt(newPosition, ptr, 0);
viewColumnPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.ORDINAL_POSITION_BYTES, clientTimeStamp, ptr);
mutationsForAddingColumnsToViews.add(viewColumnPut);
} else {
continue; // move on to the next column
}
} else {
int newOrdinalPosition = p.ordinalPosition;
/*
* For a non-diverged view, we need to make sure that the base table column
* is added at the right position.
*/
if (ordinalPositionList.size() == 0) {
ordinalPositionList.setOffset(newOrdinalPosition);
ordinalPositionList.addColumn(columnKey, newOrdinalPosition);
for (PColumn col : view.getColumns()) {
int ordinalPos = getOrdinalPosition(view, col);
if (ordinalPos >= newOrdinalPosition) {
// increment ordinal position of columns by 1
int updatedPos = ordinalPos + 1;
ordinalPositionList.addColumn(getColumnKey(viewKey, col), updatedPos);
}
}
} else {
ordinalPositionList.addColumn(columnKey, newOrdinalPosition);
}
mutationsForAddingColumnsToViews.add(viewColumnPut);
}
if (isPkCol) {
deltaNumPkColsSoFar++;
// Set the key sequence for the pk column to be added
short currentKeySeq = SchemaUtil.getMaxKeySeq(view);
short newKeySeq = (short)(currentKeySeq + deltaNumPkColsSoFar);
byte[] keySeqBytes = new byte[PSmallint.INSTANCE.getByteSize()];
PSmallint.INSTANCE.getCodec().encodeShort(newKeySeq, keySeqBytes, 0);
viewColumnPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.KEY_SEQ_BYTES, keySeqBytes);
addMutationsForAddingPkColsToViewIndexes(mutationsForAddingColumnsToViews, clientTimeStamp, view,
deltaNumPkColsSoFar, columnName, viewColumnPut);
}
columnsAddedToView++;
columnsAddedToBaseTable++;
}
}
/*
* Allow adding a pk columns to base table : 1. if all the view pk columns are exactly the same as the base
* table pk columns 2. if we are adding all the existing view pk columns to the base table
*/
if (addingExistingPkCol && !viewPkCols.equals(basePhysicalTable.getPKColumns())) {
return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), basePhysicalTable);
}
addViewIndexesHeaderRowMutations(mutationsForAddingColumnsToViews, invalidateList, clientTimeStamp, view,
deltaNumPkColsSoFar);
// set table properties in child view
if (!tablePropertyCellMap.isEmpty()) {
Put viewHeaderRowPut = new Put(viewKey, clientTimeStamp);
for (TableProperty tableProp : TableProperty.values()) {
Cell tablePropertyCell = tablePropertyCellMap.get(tableProp);
if ( tablePropertyCell != null) {
// set this table property on the view if it is not mutable on a view (which means the property is always the same as the base table)
// or if it is mutable on a view and the property value is the same as the base table property (which means it wasn't changed on the view)
if (!tableProp.isMutableOnView() || tableProp.getPTableValue(view).equals(tableProp.getPTableValue(basePhysicalTable))) {
viewHeaderRowPut.add(CellUtil.createCell(viewKey, CellUtil.cloneFamily(tablePropertyCell),
CellUtil.cloneQualifier(tablePropertyCell), clientTimeStamp, tablePropertyCell.getTypeByte(),
CellUtil.cloneValue(tablePropertyCell)));
}
}
}
byte[] viewSequencePtr = new byte[PLong.INSTANCE.getByteSize()];
PLong.INSTANCE.getCodec().encodeLong(view.getSequenceNumber() + 1, viewSequencePtr, 0);
viewHeaderRowPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTES, clientTimeStamp, viewSequencePtr);
// invalidate the view so that it is removed from the cache
invalidateList.add(new ImmutableBytesPtr(viewKey));
mutationsForAddingColumnsToViews.add(viewHeaderRowPut);
}
/*
* Increment the sequence number by 1 if:
* 1) For a diverged view, there were columns (pk columns) added to the view.
* 2) For a non-diverged view if the base column count changed.
*/
boolean changeSequenceNumber = (isDivergedView(view) && columnsAddedToView > 0)
|| (!isDivergedView(view) && columnsAddedToBaseTable > 0);
updateViewHeaderRow(basePhysicalTable, tableMetadata, mutationsForAddingColumnsToViews,
invalidateList, clientTimeStamp, columnsAddedToView, columnsAddedToBaseTable,
viewKey, view, ordinalPositionList, numCols, changeSequenceNumber);
}
return null;
}
/**
* Updates the base table column count, column count, sequence number and ordinal postions of
* columns of the view based on the columns being added or dropped.
*/
private void updateViewHeaderRow(PTable basePhysicalTable, List<Mutation> tableMetadata,
List<Mutation> mutationsForAddingColumnsToViews,
List<ImmutableBytesPtr> invalidateList, long clientTimeStamp, short viewColumnDelta,
short baseTableColumnDelta, byte[] viewKey, PTable view,
ColumnOrdinalPositionUpdateList ordinalPositionList, int numCols, boolean changeSequenceNumber) {
// Update the view header rows with new column counts.
Put viewHeaderRowPut = new Put(viewKey, clientTimeStamp);
if (!isDivergedView(view) && baseTableColumnDelta != 0) {
// Base column count should only be updated for diverged views.
int oldBaseColumnCount = view.getBaseColumnCount();
byte[] baseColumnCountPtr = new byte[PInteger.INSTANCE.getByteSize()];
PInteger.INSTANCE.getCodec().encodeInt(oldBaseColumnCount + baseTableColumnDelta, baseColumnCountPtr, 0);
viewHeaderRowPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.BASE_COLUMN_COUNT_BYTES, clientTimeStamp, baseColumnCountPtr);
}
if (viewColumnDelta != 0) {
byte[] columnCountPtr = new byte[PInteger.INSTANCE.getByteSize()];
PInteger.INSTANCE.getCodec().encodeInt(numCols + viewColumnDelta, columnCountPtr, 0);
viewHeaderRowPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.COLUMN_COUNT_BYTES, clientTimeStamp, columnCountPtr);
}
if (changeSequenceNumber) {
byte[] viewSequencePtr = new byte[PLong.INSTANCE.getByteSize()];
PLong.INSTANCE.getCodec().encodeLong(view.getSequenceNumber() + 1, viewSequencePtr, 0);
viewHeaderRowPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTES, clientTimeStamp, viewSequencePtr);
mutationsForAddingColumnsToViews.add(viewHeaderRowPut);
// only invalidate if the sequence number is about to change
invalidateList.add(new ImmutableBytesPtr(viewKey));
// Update the ordinal positions. The list would be non-empty only if the sequence
// number will change.
int i = 0;
for (byte[] columnKey : ordinalPositionList.columnKeys) {
int ordinalPosition = ordinalPositionList.getOrdinalPositionFromListIdx(i);
Put positionUpdatePut = new Put(columnKey, clientTimeStamp);
byte[] ptr = new byte[PInteger.INSTANCE.getByteSize()];
PInteger.INSTANCE.getCodec().encodeInt(ordinalPosition, ptr, 0);
positionUpdatePut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.ORDINAL_POSITION_BYTES, clientTimeStamp, ptr);
mutationsForAddingColumnsToViews.add(positionUpdatePut);
i++;
}
}
if (view.rowKeyOrderOptimizable()) {
UpgradeUtil.addRowKeyOrderOptimizableCell(mutationsForAddingColumnsToViews, viewKey, clientTimeStamp);
}
// if switching from from non tx to tx
if (!basePhysicalTable.isTransactional() && switchAttribute(basePhysicalTable, basePhysicalTable.isTransactional(), tableMetadata, TRANSACTIONAL_BYTES)) {
invalidateList.add(new ImmutableBytesPtr(viewKey));
Put put = new Put(viewKey);
put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
TRANSACTIONAL_BYTES, clientTimeStamp, PBoolean.INSTANCE.toBytes(true));
mutationsForAddingColumnsToViews.add(put);
}
}
private class ColumnFinder extends StatelessTraverseAllExpressionVisitor<Void> {
private boolean columnFound;
private final Expression columnExpression;
public ColumnFinder(Expression columnExpression) {
this.columnExpression = columnExpression;
columnFound = false;
}
private Void process(Expression expression) {
if (expression.equals(columnExpression)) {
columnFound = true;
}
return null;
}
@Override
public Void visit(KeyValueColumnExpression expression) {
return process(expression);
}
@Override
public Void visit(RowKeyColumnExpression expression) {
return process(expression);
}
@Override
public Void visit(ProjectedColumnExpression expression) {
return process(expression);
}
public boolean getColumnFound() {
return columnFound;
}
}
private MetaDataMutationResult dropColumnsFromChildViews(Region region,
PTable basePhysicalTable, List<RowLock> locks, List<Mutation> tableMetadata,
List<Mutation> mutationsForAddingColumnsToViews, byte[] schemaName, byte[] tableName,
List<ImmutableBytesPtr> invalidateList, long clientTimeStamp,
TableViewFinderResult childViewsResult, List<byte[]> tableNamesToDelete, List<SharedTableState> sharedTablesToDelete)
throws IOException, SQLException {
List<Delete> columnDeletesForBaseTable = new ArrayList<>(tableMetadata.size());
// Isolate the deletes relevant to dropping columns. Also figure out what kind of columns
// are being added.
for (Mutation m : tableMetadata) {
if (m instanceof Delete) {
byte[][] rkmd = new byte[5][];
int pkCount = getVarChars(m.getRow(), rkmd);
if (pkCount > COLUMN_NAME_INDEX
&& Bytes.compareTo(schemaName, rkmd[SCHEMA_NAME_INDEX]) == 0
&& Bytes.compareTo(tableName, rkmd[TABLE_NAME_INDEX]) == 0) {
columnDeletesForBaseTable.add((Delete) m);
}
}
}
for (Result viewResult : childViewsResult.getResults()) {
short numColsDeleted = 0;
byte[][] rowViewKeyMetaData = new byte[3][];
getVarChars(viewResult.getRow(), 3, rowViewKeyMetaData);
byte[] viewKey =
SchemaUtil.getTableKey(
rowViewKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX],
rowViewKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX],
rowViewKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX]);
// lock the rows corresponding to views so that no other thread can modify the view
// meta-data
RowLock viewRowLock = acquireLock(region, viewKey, locks);
PTable view = doGetTable(viewKey, clientTimeStamp, viewRowLock);
ColumnOrdinalPositionUpdateList ordinalPositionList =
new ColumnOrdinalPositionUpdateList();
int numCols = view.getColumns().size();
int minDroppedColOrdinalPos = Integer.MAX_VALUE;
for (Delete columnDeleteForBaseTable : columnDeletesForBaseTable) {
PColumn existingViewColumn = null;
byte[][] rkmd = new byte[5][];
getVarChars(columnDeleteForBaseTable.getRow(), rkmd);
String columnName = Bytes.toString(rkmd[COLUMN_NAME_INDEX]);
String columnFamily =
rkmd[FAMILY_NAME_INDEX] == null ? null : Bytes
.toString(rkmd[FAMILY_NAME_INDEX]);
byte[] columnKey = getColumnKey(viewKey, columnName, columnFamily);
try {
existingViewColumn =
columnFamily == null ? view.getColumnForColumnName(columnName) : view
.getColumnFamily(columnFamily).getPColumnForColumnName(columnName);
} catch (ColumnFamilyNotFoundException e) {
// ignore since it means that the column family is not present for the column to
// be added.
} catch (ColumnNotFoundException e) {
// ignore since it means the column is not present in the view
}
// check if the view where expression contains the column being dropped and prevent
// it
if (existingViewColumn != null && view.getViewStatement() != null) {
ParseNode viewWhere =
new SQLParser(view.getViewStatement()).parseQuery().getWhere();
PhoenixConnection conn=null;
try {
conn = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(
PhoenixConnection.class);
} catch (ClassNotFoundException e) {
}
PhoenixStatement statement = new PhoenixStatement(conn);
TableRef baseTableRef = new TableRef(basePhysicalTable);
ColumnResolver columnResolver = FromCompiler.getResolver(baseTableRef);
StatementContext context = new StatementContext(statement, columnResolver);
Expression whereExpression = WhereCompiler.compile(context, viewWhere);
Expression colExpression =
new ColumnRef(baseTableRef, existingViewColumn.getPosition())
.newColumnExpression();
ColumnFinder columnFinder = new ColumnFinder(colExpression);
whereExpression.accept(columnFinder);
if (columnFinder.getColumnFound()) {
return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION,
EnvironmentEdgeManager.currentTimeMillis(), basePhysicalTable);
}
}
minDroppedColOrdinalPos =
Math.min(getOrdinalPosition(view, existingViewColumn),
minDroppedColOrdinalPos);
if (existingViewColumn != null) {
--numColsDeleted;
if (ordinalPositionList.size() == 0) {
ordinalPositionList.setOffset(view.getBucketNum() == null ? 1 : 0);
for (PColumn col : view.getColumns()) {
ordinalPositionList.addColumn(getColumnKey(viewKey, col));
}
}
ordinalPositionList.dropColumn(columnKey);
Delete viewColumnDelete = new Delete(columnKey, clientTimeStamp);
mutationsForAddingColumnsToViews.add(viewColumnDelete);
// drop any view indexes that need this column
dropIndexes(view, region, invalidateList, locks, clientTimeStamp,
schemaName, view.getName().getBytes(),
mutationsForAddingColumnsToViews, existingViewColumn,
tableNamesToDelete, sharedTablesToDelete);
}
}
updateViewHeaderRow(basePhysicalTable, tableMetadata, mutationsForAddingColumnsToViews,
invalidateList, clientTimeStamp, numColsDeleted, numColsDeleted, viewKey, view,
ordinalPositionList, numCols, true);
}
return null;
}
private MetaDataMutationResult validateColumnForAddToBaseTable(PColumn existingViewColumn, Put columnToBeAdded, PTable basePhysicalTable, boolean isColumnToBeAddPkCol, PTable view) {
if (existingViewColumn != null) {
if (EncodedColumnsUtil.usesEncodedColumnNames(basePhysicalTable) && !SchemaUtil.isPKColumn(existingViewColumn)) {
/*
* If the column already exists in a view, then we cannot add the column to the base
* table. The reason is subtle and is as follows: consider the case where a table
* has two views where both the views have the same key value column KV. Now, we
* dole out encoded column qualifiers for key value columns in views by using the
* counters stored in the base physical table. So the KV column can have different
* column qualifiers for the two views. For example, 11 for VIEW1 and 12 for VIEW2.
* This naturally extends to rows being inserted using the two views having
* different column qualifiers for the column named KV. Now, when an attempt is made
* to add column KV to the base table, we cannot decide which column qualifier
* should that column be assigned. It cannot be a number different than 11 or 12
* since a query like SELECT KV FROM BASETABLE would return null for KV which is
* incorrect since column KV is present in rows inserted from the two views. We
* cannot use 11 or 12 either because we will then incorrectly return value of KV
* column inserted using only one view.
*/
return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), basePhysicalTable);
}
// Validate data type is same
int baseColumnDataType = getInteger(columnToBeAdded, TABLE_FAMILY_BYTES, DATA_TYPE_BYTES);
if (baseColumnDataType != existingViewColumn.getDataType().getSqlType()) {
return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), basePhysicalTable);
}
// Validate max length is same
int maxLength = getInteger(columnToBeAdded, TABLE_FAMILY_BYTES, COLUMN_SIZE_BYTES);
int existingMaxLength = existingViewColumn.getMaxLength() == null ? 0 : existingViewColumn.getMaxLength();
if (maxLength != existingMaxLength) {
return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), basePhysicalTable);
}
// Validate scale is same
int scale = getInteger(columnToBeAdded, TABLE_FAMILY_BYTES, DECIMAL_DIGITS_BYTES);
int existingScale = existingViewColumn.getScale() == null ? 0 : existingViewColumn.getScale();
if (scale != existingScale) {
return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), basePhysicalTable);
}
// Validate sort order is same
int sortOrder = getInteger(columnToBeAdded, TABLE_FAMILY_BYTES, SORT_ORDER_BYTES);
if (sortOrder != existingViewColumn.getSortOrder().getSystemValue()) {
return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), basePhysicalTable);
}
// if the column to be added to the base table is a pk column, then we need to validate that the key slot position is the same
if (isColumnToBeAddPkCol) {
List<Cell> keySeqCells = columnToBeAdded.get(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.KEY_SEQ_BYTES);
if (keySeqCells != null && keySeqCells.size() > 0) {
Cell cell = keySeqCells.get(0);
int keySeq = PSmallint.INSTANCE.getCodec().decodeInt(cell.getValueArray(), cell.getValueOffset(),
SortOrder.getDefault());
int pkPosition = SchemaUtil.getPKPosition(view, existingViewColumn) + 1;
if (pkPosition != keySeq) { return new MetaDataMutationResult(
MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), basePhysicalTable); }
}
}
}
return null;
}
private int getInteger(Put p, byte[] family, byte[] qualifier) {
List<Cell> cells = p.get(family, qualifier);
if (cells != null && cells.size() > 0) {
Cell cell = cells.get(0);
return (Integer)PInteger.INSTANCE.toObject(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
}
return 0;
}
private void addViewIndexesHeaderRowMutations(List<Mutation> mutationsForAddingColumnsToViews,
List<ImmutableBytesPtr> invalidateList, long clientTimeStamp, PTable view, short deltaNumPkColsSoFar) {
if (deltaNumPkColsSoFar > 0) {
for (PTable index : view.getIndexes()) {
byte[] indexHeaderRowKey = getViewIndexHeaderRowKey(index);
Put indexHeaderRowMutation = new Put(indexHeaderRowKey);
// increment sequence number
long newSequenceNumber = index.getSequenceNumber() + 1;
byte[] newSequenceNumberPtr = new byte[PLong.INSTANCE.getByteSize()];
PLong.INSTANCE.getCodec().encodeLong(newSequenceNumber, newSequenceNumberPtr, 0);
indexHeaderRowMutation.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTES, newSequenceNumberPtr);
// increase the column count
int newColumnCount = index.getColumns().size() + deltaNumPkColsSoFar;
byte[] newColumnCountPtr = new byte[PInteger.INSTANCE.getByteSize()];
PInteger.INSTANCE.getCodec().encodeInt(newColumnCount, newColumnCountPtr, 0);
indexHeaderRowMutation.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.COLUMN_COUNT_BYTES, newColumnCountPtr);
// add index row header key to the invalidate list to force clients to fetch the latest meta-data
invalidateList.add(new ImmutableBytesPtr(indexHeaderRowKey));
if (index.rowKeyOrderOptimizable()) {
UpgradeUtil.addRowKeyOrderOptimizableCell(mutationsForAddingColumnsToViews, indexHeaderRowKey, clientTimeStamp);
}
mutationsForAddingColumnsToViews.add(indexHeaderRowMutation);
}
}
}
private void addMutationsForAddingPkColsToViewIndexes(List<Mutation> mutationsForAddingColumnsToViews, long clientTimeStamp,
PTable view, short deltaNumPkColsSoFar, String viewPkColumnName, Put viewColumnDefinitionPut) {
for (PTable index : view.getIndexes()) {
int oldNumberOfColsInIndex = index.getColumns().size();
byte[] indexColumnKey = ByteUtil.concat(getViewIndexHeaderRowKey(index), QueryConstants.SEPARATOR_BYTE_ARRAY, Bytes.toBytes(IndexUtil.getIndexColumnName(null, viewPkColumnName)));
Put indexColumnDefinitionPut = new Put(indexColumnKey, clientTimeStamp);
// Set the index specific data type for the column
int viewPkColumnDataType = getInteger(viewColumnDefinitionPut, TABLE_FAMILY_BYTES, DATA_TYPE_BYTES);
byte[] indexColumnDataTypeBytes = new byte[PInteger.INSTANCE.getByteSize()];
int indexColumnDataType = IndexUtil.getIndexColumnDataType(true,
PDataType.fromTypeId(viewPkColumnDataType)).getSqlType();
PInteger.INSTANCE.getCodec().encodeInt(indexColumnDataType, indexColumnDataTypeBytes, 0);
indexColumnDefinitionPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.DATA_TYPE_BYTES, indexColumnDataTypeBytes);
// Set precision
List<Cell> decimalDigits = viewColumnDefinitionPut.get(
PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.DECIMAL_DIGITS_BYTES);
if (decimalDigits != null && decimalDigits.size() > 0) {
Cell decimalDigit = decimalDigits.get(0);
indexColumnDefinitionPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.DECIMAL_DIGITS_BYTES, decimalDigit.getValueArray());
}
// Set size
List<Cell> columnSizes = viewColumnDefinitionPut.get(
PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.COLUMN_SIZE_BYTES);
if (columnSizes != null && columnSizes.size() > 0) {
Cell columnSize = columnSizes.get(0);
indexColumnDefinitionPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.COLUMN_SIZE_BYTES, columnSize.getValueArray());
}
// Set sort order
List<Cell> sortOrders = viewColumnDefinitionPut.get(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.SORT_ORDER_BYTES);
if (sortOrders != null && sortOrders.size() > 0) {
Cell sortOrder = sortOrders.get(0);
indexColumnDefinitionPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.SORT_ORDER_BYTES, sortOrder.getValueArray());
}
// Set data table name
List<Cell> dataTableNames = viewColumnDefinitionPut.get(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES);
if (dataTableNames != null && dataTableNames.size() > 0) {
Cell dataTableName = dataTableNames.get(0);
indexColumnDefinitionPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES, dataTableName.getValueArray());
}
// Set the ordinal position of the new column.
byte[] ordinalPositionBytes = new byte[PInteger.INSTANCE.getByteSize()];
int ordinalPositionOfNewCol = oldNumberOfColsInIndex + deltaNumPkColsSoFar;
PInteger.INSTANCE.getCodec().encodeInt(ordinalPositionOfNewCol, ordinalPositionBytes, 0);
indexColumnDefinitionPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.ORDINAL_POSITION_BYTES, ordinalPositionBytes);
// New PK columns have to be nullable after the first DDL
byte[] isNullableBytes = PBoolean.INSTANCE.toBytes(true);
indexColumnDefinitionPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.NULLABLE_BYTES, isNullableBytes);
// Set the key sequence for the pk column to be added
short currentKeySeq = SchemaUtil.getMaxKeySeq(index);
short newKeySeq = (short)(currentKeySeq + deltaNumPkColsSoFar);
byte[] keySeqBytes = new byte[PSmallint.INSTANCE.getByteSize()];
PSmallint.INSTANCE.getCodec().encodeShort(newKeySeq, keySeqBytes, 0);
indexColumnDefinitionPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.KEY_SEQ_BYTES, keySeqBytes);
mutationsForAddingColumnsToViews.add(indexColumnDefinitionPut);
}
}
private byte[] getViewIndexHeaderRowKey(PTable index) {
byte[] tenantIdBytes = index.getKey().getTenantId() != null ? index.getKey().getTenantId().getBytes() : EMPTY_BYTE_ARRAY;
byte[] schemaNameBytes = index.getSchemaName() != null ? index.getSchemaName().getBytes() : EMPTY_BYTE_ARRAY;
byte[] tableNameBytes = index.getTableName().getBytes();
return ByteUtil.concat(tenantIdBytes, SEPARATOR_BYTE_ARRAY, schemaNameBytes, SEPARATOR_BYTE_ARRAY, tableNameBytes);
}
@Override
public void addColumn(RpcController controller, final AddColumnRequest request,
RpcCallback<MetaDataResponse> done) {
try {
List<Mutation> tableMetaData = ProtobufUtil.getMutations(request);
MetaDataMutationResult result = mutateColumn(tableMetaData, new ColumnMutator() {
@Override
public MetaDataMutationResult updateMutation(PTable table, byte[][] rowKeyMetaData,
List<Mutation> tableMetaData, Region region, List<ImmutableBytesPtr> invalidateList,
List<RowLock> locks, long clientTimeStamp) throws IOException, SQLException {
byte[] tenantId = rowKeyMetaData[TENANT_ID_INDEX];
byte[] schemaName = rowKeyMetaData[SCHEMA_NAME_INDEX];
byte[] tableName = rowKeyMetaData[TABLE_NAME_INDEX];
PTableType type = table.getType();
byte[] tableHeaderRowKey = SchemaUtil.getTableKey(tenantId,
schemaName, tableName);
// Size for worst case - all new columns are PK column
List<Mutation> mutationsForAddingColumnsToViews = Lists.newArrayListWithExpectedSize(tableMetaData.size() * ( 1 + table.getIndexes().size()));
/*
* If adding a column to a view, we don't want to propagate those meta-data changes to the child
* view hierarchy. This is because our check of finding child views is expensive and we want making
* meta-data changes to views to be light-weight. The side-effect of this change is that a child
* won't have it's parent views columns i.e. it would have diverged itself from the parent view. See
* https://issues.apache.org/jira/browse/PHOENIX-2051 for a proper way to fix the performance issue
* and https://issues.apache.org/jira/browse/PHOENIX-2054 for enabling meta-data changes to a view
* to be propagated to its view hierarchy.
*/
if (type == PTableType.TABLE || type == PTableType.SYSTEM) {
TableViewFinderResult childViewsResult = findChildViews(region, tenantId, table, PHYSICAL_TABLE_BYTES);
if (childViewsResult.hasViews()) {
/*
* Dis-allow if:
* 1) The meta-data for child view/s spans over
* more than one region (since the changes cannot be made in a transactional fashion)
*
* 2) The base column count is 0 which means that the metadata hasn't been upgraded yet or
* the upgrade is currently in progress.
*
* 3) If the request is from a client that is older than 4.5 version of phoenix.
* Starting from 4.5, metadata requests have the client version included in them.
* We don't want to allow clients before 4.5 to add a column to the base table if it has views.
*
* 4) Trying to swtich tenancy of a table that has views
*/
if (!childViewsResult.allViewsInSingleRegion()
|| table.getBaseColumnCount() == 0
|| !request.hasClientVersion()
|| switchAttribute(table, table.isMultiTenant(), tableMetaData, MULTI_TENANT_BYTES)) {
return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION,
EnvironmentEdgeManager.currentTimeMillis(), null);
} else {
mutationsForAddingColumnsToViews = new ArrayList<>(childViewsResult.getResults().size() * tableMetaData.size());
MetaDataMutationResult mutationResult = addColumnsAndTablePropertiesToChildViews(table, tableMetaData, mutationsForAddingColumnsToViews, schemaName, tableName, invalidateList, clientTimeStamp,
childViewsResult, region, locks);
// return if we were not able to add the column successfully
if (mutationResult!=null)
return mutationResult;
}
}
} else if (type == PTableType.VIEW
&& EncodedColumnsUtil.usesEncodedColumnNames(table)) {
/*
* When adding a column to a view that uses encoded column name scheme, we
* need to modify the CQ counters stored in the view's physical table. So to
* make sure clients get the latest PTable, we need to invalidate the cache
* entry.
*/
invalidateList.add(new ImmutableBytesPtr(MetaDataUtil
.getPhysicalTableRowForView(table)));
}
for (Mutation m : tableMetaData) {
byte[] key = m.getRow();
boolean addingPKColumn = false;
int pkCount = getVarChars(key, rowKeyMetaData);
if (pkCount > COLUMN_NAME_INDEX
&& Bytes.compareTo(schemaName, rowKeyMetaData[SCHEMA_NAME_INDEX]) == 0
&& Bytes.compareTo(tableName, rowKeyMetaData[TABLE_NAME_INDEX]) == 0) {
try {
if (pkCount > FAMILY_NAME_INDEX
&& rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX].length > 0) {
PColumnFamily family =
table.getColumnFamily(rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]);
family.getPColumnForColumnNameBytes(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]);
} else if (pkCount > COLUMN_NAME_INDEX
&& rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length > 0) {
addingPKColumn = true;
table.getPKColumn(new String(
rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]));
} else {
continue;
}
return new MetaDataMutationResult(
MutationCode.COLUMN_ALREADY_EXISTS, EnvironmentEdgeManager
.currentTimeMillis(), table);
} catch (ColumnFamilyNotFoundException e) {
continue;
} catch (ColumnNotFoundException e) {
if (addingPKColumn) {
// We may be adding a DESC column, so if table is already
// able to be rowKeyOptimized, it should continue to be so.
if (table.rowKeyOrderOptimizable()) {
UpgradeUtil.addRowKeyOrderOptimizableCell(mutationsForAddingColumnsToViews, tableHeaderRowKey, clientTimeStamp);
} else if (table.getType() == PTableType.VIEW){
// Don't allow view PK to diverge from table PK as our upgrade code
// does not handle this.
return new MetaDataMutationResult(
MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager
.currentTimeMillis(), null);
}
// Add all indexes to invalidate list, as they will all be
// adding the same PK column. No need to lock them, as we
// have the parent table lock at this point.
for (PTable index : table.getIndexes()) {
invalidateList.add(new ImmutableBytesPtr(SchemaUtil
.getTableKey(tenantId, index.getSchemaName()
.getBytes(), index.getTableName()
.getBytes())));
// We may be adding a DESC column, so if index is already
// able to be rowKeyOptimized, it should continue to be so.
if (index.rowKeyOrderOptimizable()) {
byte[] indexHeaderRowKey = SchemaUtil.getTableKey(index.getTenantId() == null ? ByteUtil.EMPTY_BYTE_ARRAY : index.getTenantId().getBytes(),
index.getSchemaName().getBytes(), index.getTableName().getBytes());
UpgradeUtil.addRowKeyOrderOptimizableCell(mutationsForAddingColumnsToViews, indexHeaderRowKey, clientTimeStamp);
}
}
}
continue;
}
} else if (pkCount == COLUMN_NAME_INDEX &&
! (Bytes.compareTo(schemaName, rowKeyMetaData[SCHEMA_NAME_INDEX]) == 0 &&
Bytes.compareTo(tableName, rowKeyMetaData[TABLE_NAME_INDEX]) == 0 ) ) {
// Invalidate any table with mutations
// TODO: this likely means we don't need the above logic that
// loops through the indexes if adding a PK column, since we'd
// always have header rows for those.
invalidateList.add(new ImmutableBytesPtr(SchemaUtil
.getTableKey(tenantId,
rowKeyMetaData[SCHEMA_NAME_INDEX],
rowKeyMetaData[TABLE_NAME_INDEX])));
}
}
tableMetaData.addAll(mutationsForAddingColumnsToViews);
return null;
}
});
if (result != null) {
done.run(MetaDataMutationResult.toProto(result));
}
} catch (IOException ioe) {
ProtobufUtil.setControllerException(controller, ioe);
}
}
private PTable doGetTable(byte[] key, long clientTimeStamp) throws IOException, SQLException {
return doGetTable(key, clientTimeStamp, null);
}
private PTable doGetTable(byte[] key, long clientTimeStamp, RowLock rowLock) throws IOException, SQLException {
ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
GlobalCache.getInstance(this.env).getMetaDataCache();
// Ask Lars about the expense of this call - if we don't take the lock, we still won't get
// partial results
// get the co-processor environment
// TODO: check that key is within region.getStartKey() and region.getEndKey()
// and return special code to force client to lookup region from meta.
Region region = env.getRegion();
/*
* Lock directly on key, though it may be an index table. This will just prevent a table
* from getting rebuilt too often.
*/
final boolean wasLocked = (rowLock != null);
boolean blockWriteRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE,
QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE);
if (!wasLocked) {
rowLock = region.getRowLock(key, false);
if (rowLock == null) {
throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key));
}
}
try {
PTable table = (PTable)metaDataCache.getIfPresent(cacheKey);
// We only cache the latest, so we'll end up building the table with every call if the
// client connection has specified an SCN.
// TODO: If we indicate to the client that we're returning an older version, but there's a
// newer version available, the client
// can safely not call this, since we only allow modifications to the latest.
if (table != null && table.getTimeStamp() < clientTimeStamp) {
// Table on client is up-to-date with table on server, so just return
if (isTableDeleted(table)) {
return null;
}
return table;
}
// Try cache again in case we were waiting on a lock
table = (PTable)metaDataCache.getIfPresent(cacheKey);
// We only cache the latest, so we'll end up building the table with every call if the
// client connection has specified an SCN.
// TODO: If we indicate to the client that we're returning an older version, but there's
// a newer version available, the client
// can safely not call this, since we only allow modifications to the latest.
if (table != null && table.getTimeStamp() < clientTimeStamp) {
// Table on client is up-to-date with table on server, so just return
if (isTableDeleted(table)) {
return null;
}
return table;
}
// Query for the latest table first, since it's not cached
table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP);
if ((table != null && table.getTimeStamp() < clientTimeStamp) ||
(blockWriteRebuildIndex && table.getIndexDisableTimestamp() > 0)) {
return table;
}
// Otherwise, query for an older version of the table - it won't be cached
return buildTable(key, cacheKey, region, clientTimeStamp);
} finally {
if (!wasLocked) rowLock.release();
}
}
private List<PFunction> doGetFunctions(List<byte[]> keys, long clientTimeStamp) throws IOException, SQLException {
Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
GlobalCache.getInstance(this.env).getMetaDataCache();
Region region = env.getRegion();
Collections.sort(keys, new Comparator<byte[]>() {
@Override
public int compare(byte[] o1, byte[] o2) {
return Bytes.compareTo(o1, o2);
}
});
/*
* Lock directly on key, though it may be an index table. This will just prevent a table
* from getting rebuilt too often.
*/
List<RowLock> rowLocks = new ArrayList<Region.RowLock>(keys.size());;
try {
rowLocks = new ArrayList<Region.RowLock>(keys.size());
for (int i = 0; i < keys.size(); i++) {
Region.RowLock rowLock = region.getRowLock(keys.get(i), false);
if (rowLock == null) {
throw new IOException("Failed to acquire lock on "
+ Bytes.toStringBinary(keys.get(i)));
}
rowLocks.add(rowLock);
}
List<PFunction> functionsAvailable = new ArrayList<PFunction>(keys.size());
int numFunctions = keys.size();
Iterator<byte[]> iterator = keys.iterator();
while(iterator.hasNext()) {
byte[] key = iterator.next();
PFunction function = (PFunction)metaDataCache.getIfPresent(new FunctionBytesPtr(key));
if (function != null && function.getTimeStamp() < clientTimeStamp) {
if (isFunctionDeleted(function)) {
return null;
}
functionsAvailable.add(function);
iterator.remove();
}
}
if(functionsAvailable.size() == numFunctions) return functionsAvailable;
// Query for the latest table first, since it's not cached
List<PFunction> buildFunctions =
buildFunctions(keys, region, clientTimeStamp, false,
Collections.<Mutation> emptyList());
if(buildFunctions == null || buildFunctions.isEmpty()) {
return null;
}
functionsAvailable.addAll(buildFunctions);
if(functionsAvailable.size() == numFunctions) return functionsAvailable;
return null;
} finally {
for (Region.RowLock lock : rowLocks) {
lock.release();
}
rowLocks.clear();
}
}
@Override
public void dropColumn(RpcController controller, DropColumnRequest request,
RpcCallback<MetaDataResponse> done) {
List<Mutation> tableMetaData = null;
final List<byte[]> tableNamesToDelete = Lists.newArrayList();
final List<SharedTableState> sharedTablesToDelete = Lists.newArrayList();
try {
tableMetaData = ProtobufUtil.getMutations(request);
MetaDataMutationResult result = mutateColumn(tableMetaData, new ColumnMutator() {
@Override
public MetaDataMutationResult updateMutation(PTable table, byte[][] rowKeyMetaData,
List<Mutation> tableMetaData, Region region,
List<ImmutableBytesPtr> invalidateList, List<RowLock> locks, long clientTimeStamp)
throws IOException, SQLException {
byte[] tenantId = rowKeyMetaData[TENANT_ID_INDEX];
byte[] schemaName = rowKeyMetaData[SCHEMA_NAME_INDEX];
byte[] tableName = rowKeyMetaData[TABLE_NAME_INDEX];
boolean deletePKColumn = false;
List<Mutation> additionalTableMetaData = Lists.newArrayList();
PTableType type = table.getType();
// Only tables may have views, so prevent the running of this potentially
// expensive full table scan over the SYSTEM.CATALOG table unless it's needed.
// In the case of a view, we allow a column to be dropped without checking for
// child views, but in the future we'll allow it and propagate it as necessary.
if (type == PTableType.TABLE || type == PTableType.SYSTEM) {
TableViewFinderResult childViewsResult =
findChildViews(region, tenantId, table, PHYSICAL_TABLE_BYTES);
if (childViewsResult.hasViews()) {
MetaDataMutationResult mutationResult =
dropColumnsFromChildViews(region, table,
locks, tableMetaData, additionalTableMetaData,
schemaName, tableName, invalidateList,
clientTimeStamp, childViewsResult, tableNamesToDelete, sharedTablesToDelete);
// return if we were not able to drop the column successfully
if (mutationResult != null) return mutationResult;
}
}
for (Mutation m : tableMetaData) {
if (m instanceof Delete) {
byte[] key = m.getRow();
int pkCount = getVarChars(key, rowKeyMetaData);
if (pkCount > COLUMN_NAME_INDEX
&& Bytes.compareTo(schemaName,
rowKeyMetaData[SCHEMA_NAME_INDEX]) == 0
&& Bytes.compareTo(tableName, rowKeyMetaData[TABLE_NAME_INDEX]) == 0) {
PColumn columnToDelete = null;
try {
if (pkCount > FAMILY_NAME_INDEX
&& rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX].length > 0) {
PColumnFamily family =
table.getColumnFamily(rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]);
columnToDelete =
family.getPColumnForColumnNameBytes(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]);
} else if (pkCount > COLUMN_NAME_INDEX
&& rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length > 0) {
deletePKColumn = true;
columnToDelete = table.getPKColumn(new String(
rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]));
} else {
continue;
}
if (table.getType() == PTableType.VIEW) {
if (table.getBaseColumnCount() != DIVERGED_VIEW_BASE_COLUMN_COUNT
&& columnToDelete.getPosition() < table.getBaseColumnCount()) {
/*
* If the column being dropped is inherited from the base table, then the
* view is about to diverge itself from the base table. The consequence of
* this divergence is that that any further meta-data changes made to the
* base table will not be propagated to the hierarchy of views where this
* view is the root.
*/
byte[] viewKey = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
Put updateBaseColumnCountPut = new Put(viewKey);
byte[] baseColumnCountPtr = new byte[PInteger.INSTANCE.getByteSize()];
PInteger.INSTANCE.getCodec().encodeInt(DIVERGED_VIEW_BASE_COLUMN_COUNT,
baseColumnCountPtr, 0);
updateBaseColumnCountPut.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.BASE_COLUMN_COUNT_BYTES, clientTimeStamp,
baseColumnCountPtr);
additionalTableMetaData.add(updateBaseColumnCountPut);
}
}
if (columnToDelete.isViewReferenced()) { // Disallow deletion of column referenced in WHERE clause of view
return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), table, columnToDelete);
}
// drop any indexes that need the column that is going to be dropped
dropIndexes(table, region, invalidateList, locks,
clientTimeStamp, schemaName, tableName,
additionalTableMetaData, columnToDelete,
tableNamesToDelete, sharedTablesToDelete);
} catch (ColumnFamilyNotFoundException e) {
return new MetaDataMutationResult(
MutationCode.COLUMN_NOT_FOUND, EnvironmentEdgeManager
.currentTimeMillis(), table, columnToDelete);
} catch (ColumnNotFoundException e) {
return new MetaDataMutationResult(
MutationCode.COLUMN_NOT_FOUND, EnvironmentEdgeManager
.currentTimeMillis(), table, columnToDelete);
}
}
}
}
if (deletePKColumn) {
if (table.getPKColumns().size() == 1) {
return new MetaDataMutationResult(MutationCode.NO_PK_COLUMNS,
EnvironmentEdgeManager.currentTimeMillis(), null);
}
}
tableMetaData.addAll(additionalTableMetaData);
long currentTime = MetaDataUtil.getClientTimeStamp(tableMetaData);
return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, currentTime, null, tableNamesToDelete, sharedTablesToDelete);
}
});
if (result != null) {
done.run(MetaDataMutationResult.toProto(result));
}
} catch (IOException ioe) {
ProtobufUtil.setControllerException(controller, ioe);
}
}
private void dropIndexes(PTable table, Region region, List<ImmutableBytesPtr> invalidateList,
List<RowLock> locks, long clientTimeStamp, byte[] schemaName,
byte[] tableName, List<Mutation> additionalTableMetaData, PColumn columnToDelete,
List<byte[]> tableNamesToDelete, List<SharedTableState> sharedTablesToDelete)
throws IOException, SQLException {
// Look for columnToDelete in any indexes. If found as PK column, get lock and drop the
// index and then invalidate it
// Covered columns are deleted from the index by the client
PhoenixConnection connection = null;
try {
connection = table.getIndexes().isEmpty() ? null : QueryUtil.getConnectionOnServer(
env.getConfiguration()).unwrap(PhoenixConnection.class);
} catch (ClassNotFoundException e) {
}
for (PTable index : table.getIndexes()) {
byte[] tenantId = index.getTenantId() == null ? ByteUtil.EMPTY_BYTE_ARRAY : index.getTenantId().getBytes();
IndexMaintainer indexMaintainer = index.getIndexMaintainer(table, connection);
byte[] indexKey =
SchemaUtil.getTableKey(tenantId, index.getSchemaName().getBytes(), index
.getTableName().getBytes());
Pair<String, String> columnToDeleteInfo = new Pair<>(columnToDelete.getFamilyName().getString(), columnToDelete.getName().getString());
ColumnReference colDropRef = new ColumnReference(columnToDelete.getFamilyName().getBytes(), columnToDelete.getColumnQualifierBytes());
boolean isColumnIndexed = indexMaintainer.getIndexedColumnInfo().contains(columnToDeleteInfo);
boolean isCoveredColumn = indexMaintainer.getCoveredColumns().contains(colDropRef);
// If index requires this column for its pk, then drop it
if (isColumnIndexed) {
// Since we're dropping the index, lock it to ensure
// that a change in index state doesn't
// occur while we're dropping it.
acquireLock(region, indexKey, locks);
// Drop the index table. The doDropTable will expand
// this to all of the table rows and invalidate the
// index table
additionalTableMetaData.add(new Delete(indexKey, clientTimeStamp));
byte[] linkKey =
MetaDataUtil.getParentLinkKey(tenantId, schemaName, tableName, index
.getTableName().getBytes());
// Drop the link between the data table and the
// index table
additionalTableMetaData.add(new Delete(linkKey, clientTimeStamp));
doDropTable(indexKey, tenantId, index.getSchemaName().getBytes(), index
.getTableName().getBytes(), tableName, index.getType(),
additionalTableMetaData, invalidateList, locks, tableNamesToDelete, sharedTablesToDelete, false);
invalidateList.add(new ImmutableBytesPtr(indexKey));
}
// If the dropped column is a covered index column, invalidate the index
else if (isCoveredColumn){
invalidateList.add(new ImmutableBytesPtr(indexKey));
}
}
}
@Override
public void clearCache(RpcController controller, ClearCacheRequest request,
RpcCallback<ClearCacheResponse> done) {
GlobalCache cache = GlobalCache.getInstance(this.env);
Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
GlobalCache.getInstance(this.env).getMetaDataCache();
metaDataCache.invalidateAll();
long unfreedBytes = cache.clearTenantCache();
ClearCacheResponse.Builder builder = ClearCacheResponse.newBuilder();
builder.setUnfreedBytes(unfreedBytes);
done.run(builder.build());
}
@Override
public void getVersion(RpcController controller, GetVersionRequest request, RpcCallback<GetVersionResponse> done) {
GetVersionResponse.Builder builder = GetVersionResponse.newBuilder();
Configuration config = env.getConfiguration();
boolean isTablesMappingEnabled = SchemaUtil.isNamespaceMappingEnabled(PTableType.TABLE,
new ReadOnlyProps(config.iterator()));
if (isTablesMappingEnabled
&& PhoenixDatabaseMetaData.MIN_NAMESPACE_MAPPED_PHOENIX_VERSION > request.getClientVersion()) {
logger.error("Old client is not compatible when" + " system tables are upgraded to map to namespace");
ProtobufUtil.setControllerException(controller,
ServerUtil.createIOException(
SchemaUtil.getPhysicalHBaseTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME,
isTablesMappingEnabled, PTableType.SYSTEM).getString(),
new DoNotRetryIOException(
"Old client is not compatible when" + " system tables are upgraded to map to namespace")));
}
long version = MetaDataUtil.encodeVersion(env.getHBaseVersion(), config);
builder.setVersion(version);
done.run(builder.build());
}
@Override
public void updateIndexState(RpcController controller, UpdateIndexStateRequest request,
RpcCallback<MetaDataResponse> done) {
MetaDataResponse.Builder builder = MetaDataResponse.newBuilder();
byte[] schemaName = null;
byte[] tableName = null;
try {
byte[][] rowKeyMetaData = new byte[3][];
List<Mutation> tableMetadata = ProtobufUtil.getMutations(request);
MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, rowKeyMetaData);
byte[] tenantId = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
final byte[] key = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
Region region = env.getRegion();
MetaDataMutationResult result = checkTableKeyInRegion(key, region);
if (result != null) {
done.run(MetaDataMutationResult.toProto(result));
return;
}
long timeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
List<Cell> newKVs = tableMetadata.get(0).getFamilyCellMap().get(TABLE_FAMILY_BYTES);
Cell newKV = null;
int disableTimeStampKVIndex = -1;
int indexStateKVIndex = 0;
int index = 0;
for(Cell cell : newKVs){
if(Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
INDEX_STATE_BYTES, 0, INDEX_STATE_BYTES.length) == 0){
newKV = cell;
indexStateKVIndex = index;
} else if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
INDEX_DISABLE_TIMESTAMP_BYTES, 0, INDEX_DISABLE_TIMESTAMP_BYTES.length) == 0){
disableTimeStampKVIndex = index;
}
index++;
}
PIndexState newState =
PIndexState.fromSerializedValue(newKV.getValueArray()[newKV.getValueOffset()]);
RowLock rowLock = region.getRowLock(key, false);
if (rowLock == null) {
throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key));
}
try {
Get get = new Get(key);
get.setTimeRange(PTable.INITIAL_SEQ_NUM, timeStamp);
get.addColumn(TABLE_FAMILY_BYTES, DATA_TABLE_NAME_BYTES);
get.addColumn(TABLE_FAMILY_BYTES, INDEX_STATE_BYTES);
get.addColumn(TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES);
get.addColumn(TABLE_FAMILY_BYTES, ROW_KEY_ORDER_OPTIMIZABLE_BYTES);
Result currentResult = region.get(get);
if (currentResult.rawCells().length == 0) {
builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_NOT_FOUND);
builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
done.run(builder.build());
return;
}
Cell dataTableKV = currentResult.getColumnLatestCell(TABLE_FAMILY_BYTES, DATA_TABLE_NAME_BYTES);
Cell currentStateKV = currentResult.getColumnLatestCell(TABLE_FAMILY_BYTES, INDEX_STATE_BYTES);
Cell currentDisableTimeStamp = currentResult.getColumnLatestCell(TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES);
boolean rowKeyOrderOptimizable = currentResult.getColumnLatestCell(TABLE_FAMILY_BYTES, ROW_KEY_ORDER_OPTIMIZABLE_BYTES) != null;
PIndexState currentState =
PIndexState.fromSerializedValue(currentStateKV.getValueArray()[currentStateKV
.getValueOffset()]);
if ((currentDisableTimeStamp != null && currentDisableTimeStamp.getValueLength() > 0) &&
(disableTimeStampKVIndex >= 0)) {
long curTimeStampVal = (Long) PLong.INSTANCE.toObject(currentDisableTimeStamp.getValueArray(),
currentDisableTimeStamp.getValueOffset(), currentDisableTimeStamp.getValueLength());
// new DisableTimeStamp is passed in
Cell newDisableTimeStampCell = newKVs.get(disableTimeStampKVIndex);
long newDisableTimeStamp = (Long) PLong.INSTANCE.toObject(newDisableTimeStampCell.getValueArray(),
newDisableTimeStampCell.getValueOffset(), newDisableTimeStampCell.getValueLength());
if(curTimeStampVal > 0 && curTimeStampVal < newDisableTimeStamp){
// not reset disable timestamp
newKVs.remove(disableTimeStampKVIndex);
disableTimeStampKVIndex = -1;
}
}
// Detect invalid transitions
if (currentState == PIndexState.BUILDING) {
if (newState == PIndexState.USABLE) {
builder.setReturnCode(MetaDataProtos.MutationCode.UNALLOWED_TABLE_MUTATION);
builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
done.run(builder.build());
return;
}
} else if (currentState == PIndexState.DISABLE) {
if (newState != PIndexState.BUILDING && newState != PIndexState.DISABLE &&
newState != PIndexState.INACTIVE) {
builder.setReturnCode(MetaDataProtos.MutationCode.UNALLOWED_TABLE_MUTATION);
builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
done.run(builder.build());
return;
}
// Done building, but was disable before that, so that in disabled state
if (newState == PIndexState.ACTIVE) {
newState = PIndexState.DISABLE;
}
}
if (currentState == PIndexState.BUILDING && newState != PIndexState.ACTIVE) {
timeStamp = currentStateKV.getTimestamp();
}
if ((currentState == PIndexState.UNUSABLE && newState == PIndexState.ACTIVE)
|| (currentState == PIndexState.ACTIVE && newState == PIndexState.UNUSABLE)) {
newState = PIndexState.INACTIVE;
newKVs.set(indexStateKVIndex, KeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES,
INDEX_STATE_BYTES, timeStamp, Bytes.toBytes(newState.getSerializedValue())));
} else if (currentState == PIndexState.INACTIVE && newState == PIndexState.USABLE) {
newState = PIndexState.ACTIVE;
newKVs.set(indexStateKVIndex, KeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES,
INDEX_STATE_BYTES, timeStamp, Bytes.toBytes(newState.getSerializedValue())));
}
PTable returnTable = null;
if (currentState != newState || disableTimeStampKVIndex != -1) {
byte[] dataTableKey = null;
if(dataTableKV != null) {
dataTableKey = SchemaUtil.getTableKey(tenantId, schemaName, dataTableKV.getValue());
}
if(dataTableKey != null) {
// make a copy of tableMetadata
tableMetadata = new ArrayList<Mutation>(tableMetadata);
// insert an empty KV to trigger time stamp update on data table row
Put p = new Put(dataTableKey);
p.add(TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, timeStamp, QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
tableMetadata.add(p);
}
boolean setRowKeyOrderOptimizableCell = newState == PIndexState.BUILDING && !rowKeyOrderOptimizable;
// We're starting a rebuild of the index, so add our rowKeyOrderOptimizable cell
// so that the row keys get generated using the new row key format
if (setRowKeyOrderOptimizableCell) {
UpgradeUtil.addRowKeyOrderOptimizableCell(tableMetadata, key, timeStamp);
}
region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE,
HConstants.NO_NONCE);
// Invalidate from cache
Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
metaDataCache.invalidate(cacheKey);
if(dataTableKey != null) {
metaDataCache.invalidate(new ImmutableBytesPtr(dataTableKey));
}
if (setRowKeyOrderOptimizableCell || disableTimeStampKVIndex != -1
|| currentState == PIndexState.DISABLE || newState == PIndexState.BUILDING) {
returnTable = doGetTable(key, HConstants.LATEST_TIMESTAMP, rowLock);
}
}
// Get client timeStamp from mutations, since it may get updated by the
// mutateRowsWithLocks call
long currentTime = MetaDataUtil.getClientTimeStamp(tableMetadata);
builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_ALREADY_EXISTS);
builder.setMutationTime(currentTime);
if (returnTable != null) {
builder.setTable(PTableImpl.toProto(returnTable));
}
done.run(builder.build());
return;
} finally {
rowLock.release();
}
} catch (Throwable t) {
logger.error("updateIndexState failed", t);
ProtobufUtil.setControllerException(controller,
ServerUtil.createIOException(SchemaUtil.getTableName(schemaName, tableName), t));
}
}
private static MetaDataMutationResult checkKeyInRegion(byte[] key, Region region, MutationCode code) {
byte[] startKey = region.getRegionInfo().getStartKey();
byte[] endKey = region.getRegionInfo().getEndKey();
if (Bytes.compareTo(startKey, key) <= 0
&& (Bytes.compareTo(HConstants.LAST_ROW, endKey) == 0 || Bytes.compareTo(key,
endKey) < 0)) {
return null; // normal case;
}
return new MetaDataMutationResult(code, EnvironmentEdgeManager.currentTimeMillis(), null);
}
private static MetaDataMutationResult checkTableKeyInRegion(byte[] key, Region region) {
return checkKeyInRegion(key, region, MutationCode.TABLE_NOT_IN_REGION);
}
private static MetaDataMutationResult checkFunctionKeyInRegion(byte[] key, Region region) {
return checkKeyInRegion(key, region, MutationCode.FUNCTION_NOT_IN_REGION);
}
private static MetaDataMutationResult checkSchemaKeyInRegion(byte[] key, Region region) {
return checkKeyInRegion(key, region, MutationCode.SCHEMA_NOT_IN_REGION);
}
/**
* Certain operations, such as DROP TABLE are not allowed if there a table has child views. This class wraps the
* Results of a scanning the Phoenix Metadata for child views for a specific table and stores an additional flag for
* whether whether SYSTEM.CATALOG has split across multiple regions.
*/
private static class TableViewFinderResult {
private List<Result> results = Lists.newArrayList();
private boolean allViewsNotInSingleRegion = false;
private PTable table;
private TableViewFinderResult(List<Result> results, PTable table) {
this.results = results;
this.table = table;
}
public boolean hasViews() {
int localIndexesCount = 0;
for(PTable index : table.getIndexes()) {
if(index.getIndexType().equals(IndexType.LOCAL)) {
localIndexesCount++;
}
}
return results.size()-localIndexesCount > 0;
}
private void setAllViewsNotInSingleRegion() {
allViewsNotInSingleRegion = true;
}
private List<Result> getResults() {
return results;
}
/**
* Returns true is the table has views and they are all in the same HBase region.
*/
private boolean allViewsInSingleRegion() {
return results.size() > 0 && !allViewsNotInSingleRegion;
}
/**
* Returns true is the table has views and they are all NOT in the same HBase region.
*/
private boolean allViewsInMultipleRegions() {
return results.size() > 0 && allViewsNotInSingleRegion;
}
}
@Override
public void clearTableFromCache(RpcController controller, ClearTableFromCacheRequest request,
RpcCallback<ClearTableFromCacheResponse> done) {
byte[] schemaName = request.getSchemaName().toByteArray();
byte[] tableName = request.getTableName().toByteArray();
try {
byte[] tenantId = request.getTenantId().toByteArray();
byte[] key = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
GlobalCache.getInstance(this.env).getMetaDataCache();
metaDataCache.invalidate(cacheKey);
} catch (Throwable t) {
logger.error("incrementTableTimeStamp failed", t);
ProtobufUtil.setControllerException(controller,
ServerUtil.createIOException(SchemaUtil.getTableName(schemaName, tableName), t));
}
}
@Override
public void getSchema(RpcController controller, GetSchemaRequest request, RpcCallback<MetaDataResponse> done) {
MetaDataResponse.Builder builder = MetaDataResponse.newBuilder();
Region region = env.getRegion();
String schemaName = request.getSchemaName();
byte[] lockKey = SchemaUtil.getSchemaKey(schemaName);
MetaDataMutationResult result = checkSchemaKeyInRegion(lockKey, region);
if (result != null) {
done.run(MetaDataMutationResult.toProto(result));
return;
}
long clientTimeStamp = request.getClientTimestamp();
List<RowLock> locks = Lists.newArrayList();
try {
acquireLock(region, lockKey, locks);
// Get as of latest timestamp so we can detect if we have a
// newer schema that already
// exists without making an additional query
ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(lockKey);
PSchema schema = loadSchema(env, lockKey, cacheKey, clientTimeStamp, clientTimeStamp);
if (schema != null) {
if (schema.getTimeStamp() < clientTimeStamp) {
if (!isSchemaDeleted(schema)) {
builder.setReturnCode(MetaDataProtos.MutationCode.SCHEMA_ALREADY_EXISTS);
builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
builder.setSchema(PSchema.toProto(schema));
done.run(builder.build());
return;
} else {
builder.setReturnCode(MetaDataProtos.MutationCode.NEWER_SCHEMA_FOUND);
builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
builder.setSchema(PSchema.toProto(schema));
done.run(builder.build());
return;
}
}
}
} catch (Exception e) {
long currentTime = EnvironmentEdgeManager.currentTimeMillis();
builder.setReturnCode(MetaDataProtos.MutationCode.SCHEMA_NOT_FOUND);
builder.setMutationTime(currentTime);
done.run(builder.build());
return;
} finally {
region.releaseRowLocks(locks);
}
}
@Override
public void getFunctions(RpcController controller, GetFunctionsRequest request,
RpcCallback<MetaDataResponse> done) {
MetaDataResponse.Builder builder = MetaDataResponse.newBuilder();
byte[] tenantId = request.getTenantId().toByteArray();
List<String> functionNames = new ArrayList<>(request.getFunctionNamesCount());
try {
Region region = env.getRegion();
List<ByteString> functionNamesList = request.getFunctionNamesList();
List<Long> functionTimestampsList = request.getFunctionTimestampsList();
List<byte[]> keys = new ArrayList<byte[]>(request.getFunctionNamesCount());
List<Pair<byte[], Long>> functions = new ArrayList<Pair<byte[], Long>>(request.getFunctionNamesCount());
for(int i = 0; i< functionNamesList.size();i++) {
byte[] functionName = functionNamesList.get(i).toByteArray();
functionNames.add(Bytes.toString(functionName));
byte[] key = SchemaUtil.getFunctionKey(tenantId, functionName);
MetaDataMutationResult result = checkFunctionKeyInRegion(key, region);
if (result != null) {
done.run(MetaDataMutationResult.toProto(result));
return;
}
functions.add(new Pair<byte[], Long>(functionName,functionTimestampsList.get(i)));
keys.add(key);
}
long currentTime = EnvironmentEdgeManager.currentTimeMillis();
List<PFunction> functionsAvailable = doGetFunctions(keys, request.getClientTimestamp());
if (functionsAvailable == null) {
builder.setReturnCode(MetaDataProtos.MutationCode.FUNCTION_NOT_FOUND);
builder.setMutationTime(currentTime);
done.run(builder.build());
return;
}
builder.setReturnCode(MetaDataProtos.MutationCode.FUNCTION_ALREADY_EXISTS);
builder.setMutationTime(currentTime);
for (PFunction function : functionsAvailable) {
builder.addFunction(PFunction.toProto(function));
}
done.run(builder.build());
return;
} catch (Throwable t) {
logger.error("getFunctions failed", t);
ProtobufUtil.setControllerException(controller,
ServerUtil.createIOException(functionNames.toString(), t));
}
}
@Override
public void createFunction(RpcController controller, CreateFunctionRequest request,
RpcCallback<MetaDataResponse> done) {
MetaDataResponse.Builder builder = MetaDataResponse.newBuilder();
byte[][] rowKeyMetaData = new byte[2][];
byte[] functionName = null;
try {
List<Mutation> functionMetaData = ProtobufUtil.getMutations(request);
boolean temporaryFunction = request.getTemporary();
MetaDataUtil.getTenantIdAndFunctionName(functionMetaData, rowKeyMetaData);
byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
functionName = rowKeyMetaData[PhoenixDatabaseMetaData.FUNTION_NAME_INDEX];
byte[] lockKey = SchemaUtil.getFunctionKey(tenantIdBytes, functionName);
Region region = env.getRegion();
MetaDataMutationResult result = checkFunctionKeyInRegion(lockKey, region);
if (result != null) {
done.run(MetaDataMutationResult.toProto(result));
return;
}
List<RowLock> locks = Lists.newArrayList();
long clientTimeStamp = MetaDataUtil.getClientTimeStamp(functionMetaData);
try {
acquireLock(region, lockKey, locks);
// Get as of latest timestamp so we can detect if we have a newer function that already
// exists without making an additional query
ImmutableBytesPtr cacheKey = new FunctionBytesPtr(lockKey);
PFunction function =
loadFunction(env, lockKey, cacheKey, clientTimeStamp, clientTimeStamp, request.getReplace(), functionMetaData);
if (function != null) {
if (function.getTimeStamp() < clientTimeStamp) {
// If the function is older than the client time stamp and it's deleted,
// continue
if (!isFunctionDeleted(function)) {
builder.setReturnCode(MetaDataProtos.MutationCode.FUNCTION_ALREADY_EXISTS);
builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
builder.addFunction(PFunction.toProto(function));
done.run(builder.build());
if(!request.getReplace()) {
return;
}
}
} else {
builder.setReturnCode(MetaDataProtos.MutationCode.NEWER_FUNCTION_FOUND);
builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
builder.addFunction(PFunction.toProto(function));
done.run(builder.build());
return;
}
}
// Don't store function info for temporary functions.
if(!temporaryFunction) {
region.mutateRowsWithLocks(functionMetaData, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE);
}
// Invalidate the cache - the next getFunction call will add it
// TODO: consider loading the function that was just created here, patching up the parent function, and updating the cache
Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
metaDataCache.invalidate(cacheKey);
// Get timeStamp from mutations - the above method sets it if it's unset
long currentTimeStamp = MetaDataUtil.getClientTimeStamp(functionMetaData);
builder.setReturnCode(MetaDataProtos.MutationCode.FUNCTION_NOT_FOUND);
builder.setMutationTime(currentTimeStamp);
done.run(builder.build());
return;
} finally {
region.releaseRowLocks(locks);
}
} catch (Throwable t) {
logger.error("createFunction failed", t);
ProtobufUtil.setControllerException(controller,
ServerUtil.createIOException(Bytes.toString(functionName), t));
}
}
@Override
public void dropFunction(RpcController controller, DropFunctionRequest request,
RpcCallback<MetaDataResponse> done) {
byte[][] rowKeyMetaData = new byte[2][];
byte[] functionName = null;
try {
List<Mutation> functionMetaData = ProtobufUtil.getMutations(request);
MetaDataUtil.getTenantIdAndFunctionName(functionMetaData, rowKeyMetaData);
byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
functionName = rowKeyMetaData[PhoenixDatabaseMetaData.FUNTION_NAME_INDEX];
byte[] lockKey = SchemaUtil.getFunctionKey(tenantIdBytes, functionName);
Region region = env.getRegion();
MetaDataMutationResult result = checkFunctionKeyInRegion(lockKey, region);
if (result != null) {
done.run(MetaDataMutationResult.toProto(result));
return;
}
List<RowLock> locks = Lists.newArrayList();
long clientTimeStamp = MetaDataUtil.getClientTimeStamp(functionMetaData);
try {
acquireLock(region, lockKey, locks);
List<byte[]> keys = new ArrayList<byte[]>(1);
keys.add(lockKey);
List<ImmutableBytesPtr> invalidateList = new ArrayList<ImmutableBytesPtr>();
result = doDropFunction(clientTimeStamp, keys, functionMetaData, invalidateList);
if (result.getMutationCode() != MutationCode.FUNCTION_ALREADY_EXISTS) {
done.run(MetaDataMutationResult.toProto(result));
return;
}
region.mutateRowsWithLocks(functionMetaData, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE);
Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
long currentTime = MetaDataUtil.getClientTimeStamp(functionMetaData);
for(ImmutableBytesPtr ptr: invalidateList) {
metaDataCache.invalidate(ptr);
metaDataCache.put(ptr, newDeletedFunctionMarker(currentTime));
}
done.run(MetaDataMutationResult.toProto(result));
return;
} finally {
region.releaseRowLocks(locks);
}
} catch (Throwable t) {
logger.error("dropFunction failed", t);
ProtobufUtil.setControllerException(controller,
ServerUtil.createIOException(Bytes.toString(functionName), t));
}
}
private MetaDataMutationResult doDropFunction(long clientTimeStamp, List<byte[]> keys, List<Mutation> functionMetaData, List<ImmutableBytesPtr> invalidateList)
throws IOException, SQLException {
List<byte[]> keysClone = new ArrayList<byte[]>(keys);
List<PFunction> functions = doGetFunctions(keysClone, clientTimeStamp);
// We didn't find a table at the latest timestamp, so either there is no table or
// there was a table, but it's been deleted. In either case we want to return.
if (functions == null || functions.isEmpty()) {
if (buildDeletedFunction(keys.get(0), new FunctionBytesPtr(keys.get(0)), env.getRegion(), clientTimeStamp) != null) {
return new MetaDataMutationResult(MutationCode.FUNCTION_ALREADY_EXISTS, EnvironmentEdgeManager.currentTimeMillis(), null);
}
return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
}
if (functions != null && !functions.isEmpty()) {
if (functions.get(0).getTimeStamp() < clientTimeStamp) {
// If the function is older than the client time stamp and it's deleted,
// continue
if (isFunctionDeleted(functions.get(0))) {
return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_FOUND,
EnvironmentEdgeManager.currentTimeMillis(), null);
}
invalidateList.add(new FunctionBytesPtr(keys.get(0)));
Region region = env.getRegion();
Scan scan = MetaDataUtil.newTableRowsScan(keys.get(0), MIN_TABLE_TIMESTAMP, clientTimeStamp);
List<Cell> results = Lists.newArrayList();
try (RegionScanner scanner = region.getScanner(scan);) {
scanner.next(results);
if (results.isEmpty()) { // Should not be possible
return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
}
do {
Cell kv = results.get(0);
Delete delete = new Delete(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), clientTimeStamp);
functionMetaData.add(delete);
results.clear();
scanner.next(results);
} while (!results.isEmpty());
}
return new MetaDataMutationResult(MutationCode.FUNCTION_ALREADY_EXISTS,
EnvironmentEdgeManager.currentTimeMillis(), functions, true);
}
}
return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_FOUND,
EnvironmentEdgeManager.currentTimeMillis(), null);
}
@Override
public void createSchema(RpcController controller, CreateSchemaRequest request,
RpcCallback<MetaDataResponse> done) {
MetaDataResponse.Builder builder = MetaDataResponse.newBuilder();
String schemaName = null;
try {
List<Mutation> schemaMutations = ProtobufUtil.getMutations(request);
schemaName = request.getSchemaName();
Mutation m = MetaDataUtil.getPutOnlyTableHeaderRow(schemaMutations);
byte[] lockKey = m.getRow();
Region region = env.getRegion();
MetaDataMutationResult result = checkSchemaKeyInRegion(lockKey, region);
if (result != null) {
done.run(MetaDataMutationResult.toProto(result));
return;
}
List<RowLock> locks = Lists.newArrayList();
long clientTimeStamp = MetaDataUtil.getClientTimeStamp(schemaMutations);
try {
acquireLock(region, lockKey, locks);
// Get as of latest timestamp so we can detect if we have a newer schema that already exists without
// making an additional query
ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(lockKey);
PSchema schema = loadSchema(env, lockKey, cacheKey, clientTimeStamp, clientTimeStamp);
if (schema != null) {
if (schema.getTimeStamp() < clientTimeStamp) {
if (!isSchemaDeleted(schema)) {
builder.setReturnCode(MetaDataProtos.MutationCode.SCHEMA_ALREADY_EXISTS);
builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
builder.setSchema(PSchema.toProto(schema));
done.run(builder.build());
return;
}
} else {
builder.setReturnCode(MetaDataProtos.MutationCode.NEWER_SCHEMA_FOUND);
builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
builder.setSchema(PSchema.toProto(schema));
done.run(builder.build());
return;
}
}
region.mutateRowsWithLocks(schemaMutations, Collections.<byte[]> emptySet(), HConstants.NO_NONCE,
HConstants.NO_NONCE);
// Invalidate the cache - the next getSchema call will add it
Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env)
.getMetaDataCache();
if (cacheKey != null) {
metaDataCache.invalidate(cacheKey);
}
// Get timeStamp from mutations - the above method sets it if
// it's unset
long currentTimeStamp = MetaDataUtil.getClientTimeStamp(schemaMutations);
builder.setReturnCode(MetaDataProtos.MutationCode.SCHEMA_NOT_FOUND);
builder.setMutationTime(currentTimeStamp);
done.run(builder.build());
return;
} finally {
region.releaseRowLocks(locks);
}
} catch (Throwable t) {
logger.error("Creating the schema" + schemaName + "failed", t);
ProtobufUtil.setControllerException(controller, ServerUtil.createIOException(schemaName, t));
}
}
@Override
public void dropSchema(RpcController controller, DropSchemaRequest request, RpcCallback<MetaDataResponse> done) {
String schemaName = null;
try {
List<Mutation> schemaMetaData = ProtobufUtil.getMutations(request);
schemaName = request.getSchemaName();
byte[] lockKey = SchemaUtil.getSchemaKey(schemaName);
Region region = env.getRegion();
MetaDataMutationResult result = checkSchemaKeyInRegion(lockKey, region);
if (result != null) {
done.run(MetaDataMutationResult.toProto(result));
return;
}
List<RowLock> locks = Lists.newArrayList();
long clientTimeStamp = MetaDataUtil.getClientTimeStamp(schemaMetaData);
try {
acquireLock(region, lockKey, locks);
List<ImmutableBytesPtr> invalidateList = new ArrayList<ImmutableBytesPtr>(1);
result = doDropSchema(clientTimeStamp, schemaName, lockKey, schemaMetaData, invalidateList);
if (result.getMutationCode() != MutationCode.SCHEMA_ALREADY_EXISTS) {
done.run(MetaDataMutationResult.toProto(result));
return;
}
region.mutateRowsWithLocks(schemaMetaData, Collections.<byte[]> emptySet(), HConstants.NO_NONCE,
HConstants.NO_NONCE);
Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env)
.getMetaDataCache();
long currentTime = MetaDataUtil.getClientTimeStamp(schemaMetaData);
for (ImmutableBytesPtr ptr : invalidateList) {
metaDataCache.invalidate(ptr);
metaDataCache.put(ptr, newDeletedSchemaMarker(currentTime));
}
done.run(MetaDataMutationResult.toProto(result));
return;
} finally {
region.releaseRowLocks(locks);
}
} catch (Throwable t) {
logger.error("drop schema failed:", t);
ProtobufUtil.setControllerException(controller, ServerUtil.createIOException(schemaName, t));
}
}
private MetaDataMutationResult doDropSchema(long clientTimeStamp, String schemaName, byte[] key,
List<Mutation> schemaMutations, List<ImmutableBytesPtr> invalidateList) throws IOException, SQLException {
PSchema schema = loadSchema(env, key, new ImmutableBytesPtr(key), clientTimeStamp, clientTimeStamp);
boolean areTablesExists = false;
if (schema == null) { return new MetaDataMutationResult(MutationCode.SCHEMA_NOT_FOUND,
EnvironmentEdgeManager.currentTimeMillis(), null); }
if (schema.getTimeStamp() < clientTimeStamp) {
Region region = env.getRegion();
Scan scan = MetaDataUtil.newTableRowsScan(SchemaUtil.getKeyForSchema(null, schemaName), MIN_TABLE_TIMESTAMP,
clientTimeStamp);
List<Cell> results = Lists.newArrayList();
try (RegionScanner scanner = region.getScanner(scan);) {
scanner.next(results);
if (results.isEmpty()) { // Should not be possible
return new MetaDataMutationResult(MutationCode.SCHEMA_NOT_FOUND,
EnvironmentEdgeManager.currentTimeMillis(), null);
}
do {
Cell kv = results.get(0);
if (Bytes.compareTo(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), key, 0,
key.length) != 0) {
areTablesExists = true;
break;
}
results.clear();
scanner.next(results);
} while (!results.isEmpty());
}
if (areTablesExists) { return new MetaDataMutationResult(MutationCode.TABLES_EXIST_ON_SCHEMA, schema,
EnvironmentEdgeManager.currentTimeMillis()); }
invalidateList.add(new ImmutableBytesPtr(key));
return new MetaDataMutationResult(MutationCode.SCHEMA_ALREADY_EXISTS, schema,
EnvironmentEdgeManager.currentTimeMillis());
}
return new MetaDataMutationResult(MutationCode.SCHEMA_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(),
null);
}
}