/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.compat.hbase.CompatUtil;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.RepairUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
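
/**
 * RegionObserver/RegionCoprocessor that Phoenix installs on tables with local indexes.
 * After a region split or merge, the new region initially serves its local index
 * column families through reference files that still point at the parent region's
 * HFiles, whose index row keys are encoded against the parent's boundaries. This
 * observer intercepts store file reader creation and wraps such references in an
 * {@link IndexHalfStoreFileReader} so keys are translated into the new region's key
 * space, and during major compactions it can replace the compaction scanner with one
 * that rebuilds inconsistent local index data from the data column families.
 *
 * <p>A minimal sketch of how an observer like this could be registered by hand,
 * assuming a {@code tableName} variable (Phoenix normally wires this up itself):
 * <pre>
 * TableDescriptorBuilder.newBuilder(tableName)
 *     .setCoprocessor(IndexHalfStoreFileReaderGenerator.class.getName())
 *     .build();
 * </pre>
 */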
public class IndexHalfStoreFileReaderGenerator implements RegionObserver, RegionCoprocessor {

    private static final String LOCAL_INDEX_AUTOMATIC_REPAIR = "local.index.automatic.repair";
    public static final Logger LOGGER =
            LoggerFactory.getLogger(IndexHalfStoreFileReaderGenerator.class);

@Override
public Optional<RegionObserver> getRegionObserver() {
return Optional.of(this);
}
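
    /**
     * Called before a store file reader is opened. When the file is a reference to a
     * local index HFile of a split or merged parent region, returns an
     * {@link IndexHalfStoreFileReader} that rewrites index row keys on the fly;
     * otherwise the passed-in reader is returned unchanged.
     */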
@Override
public StoreFileReader preStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment> ctx,
FileSystem fs, Path p, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf,
Reference r, StoreFileReader reader) throws IOException {
TableName tableName = ctx.getEnvironment().getRegion().getTableDescriptor().getTableName();
Region region = ctx.getEnvironment().getRegion();
RegionInfo childRegion = region.getRegionInfo();
byte[] splitKey = null;
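        // Only intervene when no reader has been created yet and the file being opened
        // is a reference, i.e. this region was just produced by a split or a merge.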
if (reader == null && r != null) {
            if (!p.toString().contains(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) {
return reader;
}
PhoenixConnection conn = null;
Table metaTable = null;
byte[] regionStartKeyInHFile = null;
try (Connection hbaseConn =
ConnectionFactory.createConnection(ctx.getEnvironment().getConfiguration())) {
Scan scan = MetaTableAccessor.getScanForTableName(hbaseConn, tableName);
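                // Locate the split parent in hbase:meta: the parent row lists this region
                // under SPLITB when we hold the top half reference, under SPLITA otherwise.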
                byte[] daughterQualifier = Reference.isTopFileRegion(r.getFileRegion())
                        ? HConstants.SPLITB_QUALIFIER
                        : HConstants.SPLITA_QUALIFIER;
                SingleColumnValueFilter scvf = new SingleColumnValueFilter(
                        HConstants.CATALOG_FAMILY, daughterQualifier, CompareOperator.EQUAL,
                        RegionInfoUtil.toByteArray(region.getRegionInfo()));
                scvf.setFilterIfMissing(true);
                scan.setFilter(scvf);
metaTable = hbaseConn.getTable(TableName.META_TABLE_NAME);
Result result = null;
try (ResultScanner scanner = metaTable.getScanner(scan)) {
result = scanner.next();
}
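                // No split parent found, so the reference must come from a merge; work
                // out which merge parent's key space this HFile was written against.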
if (result == null || result.isEmpty()) {
List<RegionInfo> mergeRegions =
CompatUtil.getMergeRegions(ctx.getEnvironment().getConnection(),
region.getRegionInfo().getRegionName());
if (mergeRegions == null || mergeRegions.isEmpty()){
return reader;
}
byte[] splitRow =
CellUtil.cloneRow(KeyValueUtil.createKeyValueFromKey(r.getSplitKey()));
                    // Nothing in the first merge parent's data needs to change because its
                    // start key equals the merged region's start key, so the same reader
                    // can be returned.
if (Bytes.compareTo(mergeRegions.get(0).getStartKey(), splitRow) == 0) {
if (mergeRegions.get(0).getStartKey().length == 0
&& region.getRegionInfo().getEndKey().length
!= mergeRegions.get(0).getEndKey().length) {
childRegion = mergeRegions.get(0);
regionStartKeyInHFile =
mergeRegions.get(0).getStartKey().length == 0
? new byte[mergeRegions.get(0).getEndKey().length]
: mergeRegions.get(0).getStartKey();
} else {
return reader;
}
} else {
for (RegionInfo mergeRegion :
mergeRegions.subList(1, mergeRegions.size())) {
if (Bytes.compareTo(mergeRegion.getStartKey(), splitRow) == 0) {
childRegion = mergeRegion;
regionStartKeyInHFile = mergeRegion.getStartKey();
break;
}
}
}
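                    // First possible key of the current (merged) region; the half store
                    // file reader uses it as the boundary when translating index row keys.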
splitKey = KeyValueUtil.createFirstOnRow(region.getRegionInfo().getStartKey().length == 0 ?
new byte[region.getRegionInfo().getEndKey().length] :
region.getRegionInfo().getStartKey()).getKey();
} else {
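                    // Split case: the parent found in hbase:meta supplies the start key
                    // against which the index rows in the referenced HFile were encoded.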
RegionInfo parentRegion = MetaTableAccessor.getRegionInfo(result);
regionStartKeyInHFile =
parentRegion.getStartKey().length == 0 ? new byte[parentRegion
.getEndKey().length] : parentRegion.getStartKey();
}
} finally {
if (metaTable != null) metaTable.close();
}
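            // Build an IndexMaintainer for every local index on the data table, keyed by
            // view index id; these drive the row key translation in the half reader.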
try {
conn = QueryUtil.getConnectionOnServer(ctx.getEnvironment().getConfiguration()).unwrap(
PhoenixConnection.class);
PTable dataTable =
IndexUtil.getPDataTable(conn, ctx.getEnvironment().getRegion()
.getTableDescriptor());
List<PTable> indexes = dataTable.getIndexes();
Map<ImmutableBytesWritable, IndexMaintainer> indexMaintainers =
new HashMap<ImmutableBytesWritable, IndexMaintainer>();
for (PTable index : indexes) {
if (index.getIndexType() == IndexType.LOCAL) {
IndexMaintainer indexMaintainer = index.getIndexMaintainer(dataTable, conn);
                        byte[] viewIndexId =
                                index.getviewIndexIdType().toBytes(index.getViewIndexId());
                        indexMaintainers.put(new ImmutableBytesWritable(viewIndexId),
                                indexMaintainer);
}
}
                if (indexMaintainers.isEmpty()) {
                    return reader;
                }
byte[][] viewConstants = getViewConstants(dataTable);
return new IndexHalfStoreFileReader(fs, p, cacheConf, in, size, r, ctx
.getEnvironment().getConfiguration(), indexMaintainers, viewConstants,
childRegion, regionStartKeyInHFile, splitKey,
childRegion.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID,
new AtomicInteger(0), region.getRegionInfo());
} catch (SQLException e) {
throw new IOException(e);
} finally {
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
throw new IOException(e);
}
}
}
}
return reader;
}
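
    /**
     * Called before compaction. On a major compaction of a local index store whose
     * files are inconsistent with the data families, optionally swaps in a scanner
     * that rebuilds the index rows from the data table, controlled by the
     * {@code local.index.automatic.repair} property (default {@code true}).
     */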
@Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner s, ScanType scanType, CompactionLifeCycleTracker tracker,
CompactionRequest request) throws IOException {
        if (!IndexUtil.isLocalIndexStore(store)) {
            return s;
        }
        if (!store.hasReferences()) {
            InternalScanner repairScanner = null;
            if (request.isMajor()
                    && !RepairUtil.isLocalIndexStoreFilesConsistent(c.getEnvironment(), store)) {
                LOGGER.info("Found inconsistent data for local index for region: "
                        + c.getEnvironment().getRegion().getRegionInfo());
                if (c.getEnvironment().getConfiguration()
                        .getBoolean(LOCAL_INDEX_AUTOMATIC_REPAIR, true)) {
                    LOGGER.info("Starting automatic repair of local index for region: "
                            + c.getEnvironment().getRegion().getRegionInfo());
                    repairScanner = getRepairScanner(c.getEnvironment(), store);
                }
            }
            if (repairScanner != null) {
                if (s != null) {
                    s.close();
                }
                return repairScanner;
            }
        }
        return s;
}
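
    /**
     * Collects the view constant values from the primary key columns of a VIEW-typed
     * data table, or returns null when the table is not a view or declares no view
     * constants. The values are passed through to the {@link IndexHalfStoreFileReader}.
     */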
private byte[][] getViewConstants(PTable dataTable) {
int dataPosOffset = (dataTable.getBucketNum() != null ? 1 : 0) + (dataTable.isMultiTenant() ? 1 : 0);
byte[][] viewConstants = null;
int nViewConstants = 0;
if (dataTable.getType() == PTableType.VIEW) {
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
List<PColumn> dataPkColumns = dataTable.getPKColumns();
for (int i = dataPosOffset; i < dataPkColumns.size(); i++) {
PColumn dataPKColumn = dataPkColumns.get(i);
if (dataPKColumn.getViewConstant() != null) {
nViewConstants++;
}
}
if (nViewConstants > 0) {
viewConstants = new byte[nViewConstants][];
int j = 0;
for (int i = dataPosOffset; i < dataPkColumns.size(); i++) {
PColumn dataPkColumn = dataPkColumns.get(i);
if (dataPkColumn.getViewConstant() != null) {
if (IndexUtil.getViewConstantValue(dataPkColumn, ptr)) {
viewConstants[j++] = ByteUtil.copyKeyBytesIfNecessary(ptr);
                        } else {
                            // A view constant column must always yield a value; failing to
                            // read one indicates corrupt table metadata.
                            throw new IllegalStateException(
                                    "Unable to get view constant value for column "
                                            + dataPkColumn);
                        }
}
}
}
}
return viewConstants;
}

    /**
     * Builds a scanner that regenerates the local index data of the passed store from
     * the region's data column families.
     * @param env the region coprocessor environment
     * @param store the local index store being repaired
     * @return a scanner producing the rebuilt local index data for the passed store
     * @throws IOException if the region scanner or Phoenix metadata cannot be obtained
     */
private InternalScanner getRepairScanner(RegionCoprocessorEnvironment env, Store store) throws IOException {
Scan scan = new Scan();
scan.readVersions(store.getColumnFamilyDescriptor().getMaxVersions());
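        // Scan only the data column families; the local index families are the ones
        // being rebuilt, so their current (inconsistent) contents are ignored.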
for (Store s : env.getRegion().getStores()) {
if (!IndexUtil.isLocalIndexStore(s)) {
scan.addFamily(s.getColumnFamilyDescriptor().getName());
}
}
try {
PhoenixConnection conn = QueryUtil.getConnectionOnServer(env.getConfiguration())
.unwrap(PhoenixConnection.class);
PTable dataPTable = IndexUtil.getPDataTable(conn, env.getRegion().getTableDescriptor());
final List<IndexMaintainer> maintainers = Lists
.newArrayListWithExpectedSize(dataPTable.getIndexes().size());
for (PTable index : dataPTable.getIndexes()) {
if (index.getIndexType() == IndexType.LOCAL) {
maintainers.add(index.getIndexMaintainer(dataPTable, conn));
}
}
            return new DataTableLocalIndexRegionScanner(env.getRegion().getScanner(scan),
                    env.getRegion(), maintainers, store.getColumnFamilyDescriptor().getName(),
                    env.getConfiguration());
} catch (SQLException e) {
throw new IOException(e);
}
}
}