blob: bd308c019115cfeebdebd535268165e5cd232f7d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.index;
import static org.apache.phoenix.query.QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB;
import java.sql.SQLException;
import java.util.List;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.cache.ServerCacheClient;
import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.join.MaxServerCacheSizeExceededException;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.ScanUtil;
public class IndexMetaDataCacheClient {
private final ServerCacheClient serverCache;
private PTable cacheUsingTable;
/**
* Construct client used to send index metadata to each region server
* for caching during batched put for secondary index maintenance.
* @param connection the client connection
* @param cacheUsingTableRef table ref to table that will use the cache during its scan
*/
public IndexMetaDataCacheClient(PhoenixConnection connection, PTable cacheUsingTable) {
serverCache = new ServerCacheClient(connection);
this.cacheUsingTable = cacheUsingTable;
}
/**
* Determines whether or not to use the IndexMetaDataCache to send the index metadata
* to the region servers. The alternative is to just set the index metadata as an attribute on
* the mutations.
* @param connection
* @param mutations the list of mutations that will be sent in a batch to server
* @param indexMetaDataByteLength length in bytes of the index metadata cache
*/
public static boolean useIndexMetadataCache(PhoenixConnection connection, List<? extends Mutation> mutations, int indexMetaDataByteLength) {
ReadOnlyProps props = connection.getQueryServices().getProps();
int threshold = props.getInt(INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_MUTATE_BATCH_SIZE_THRESHOLD);
return (indexMetaDataByteLength > ServerCacheClient.UUID_LENGTH && mutations.size() > threshold);
}
/**
* Send the index metadata cahce to all region servers for regions that will handle the mutations.
* @return client-side {@link ServerCache} representing the added index metadata cache
* @throws SQLException
* @throws MaxServerCacheSizeExceededException if size of hash cache exceeds max allowed
* size
*/
public ServerCache addIndexMetadataCache(List<? extends Mutation> mutations, ImmutableBytesWritable ptr, byte[] txState) throws SQLException {
/**
* Serialize and compress hashCacheTable
*/
return serverCache.addServerCache(ScanUtil.newScanRanges(mutations), ptr, txState, new IndexMetaDataCacheFactory(), cacheUsingTable);
}
/**
* Send the index metadata cahce to all region servers for regions that will handle the mutations.
* @param txState TODO
* @return client-side {@link ServerCache} representing the added index metadata cache
* @throws SQLException
* @throws MaxServerCacheSizeExceededException if size of hash cache exceeds max allowed
* size
*/
public ServerCache addIndexMetadataCache(ScanRanges ranges, ImmutableBytesWritable ptr, byte[] txState) throws SQLException {
/**
* Serialize and compress hashCacheTable
*/
return serverCache.addServerCache(ranges, ptr, txState, new IndexMetaDataCacheFactory(), cacheUsingTable);
}
public static ServerCache setMetaDataOnMutations(PhoenixConnection connection, PTable table, List<? extends Mutation> mutations,
ImmutableBytesWritable indexMetaDataPtr) throws SQLException {
final byte[] tenantIdBytes;
if (table.isMultiTenant()) {
tenantIdBytes = connection.getTenantId() == null ? null : ScanUtil.getTenantIdBytes(
table.getRowKeySchema(), table.getBucketNum() != null, connection.getTenantId(),
table.getViewIndexId() != null);
} else {
tenantIdBytes = connection.getTenantId() == null ? null : connection.getTenantId().getBytes();
}
ServerCache cache = null;
byte[] attribValue = null;
byte[] uuidValue = null;
byte[] txState = ByteUtil.EMPTY_BYTE_ARRAY;
if (table.isTransactional()) {
txState = connection.getMutationState().encodeTransaction();
}
boolean hasIndexMetaData = indexMetaDataPtr.getLength() > 0;
if (hasIndexMetaData) {
if (useIndexMetadataCache(connection, mutations, indexMetaDataPtr.getLength() + txState.length)) {
IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, table);
cache = client.addIndexMetadataCache(mutations, indexMetaDataPtr, txState);
uuidValue = cache.getId();
} else {
attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
uuidValue = ServerCacheClient.generateId();
}
} else if (txState.length == 0) { return null; }
// Either set the UUID to be able to access the index metadata from the cache
// or set the index metadata directly on the Mutation
for (Mutation mutation : mutations) {
if (connection.getTenantId() != null) {
mutation.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, tenantIdBytes);
}
mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
if (attribValue != null) {
mutation.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue);
mutation.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION,
Bytes.toBytes(MetaDataProtocol.PHOENIX_VERSION));
if (txState.length > 0) {
mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
}
} else if (!hasIndexMetaData && txState.length > 0) {
mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
}
}
return cache;
}
}