| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hbase; |
| |
| import static org.apache.hadoop.hbase.util.FutureUtils.addListener; |
| |
| import java.io.Closeable; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Optional; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.stream.Collectors; |
| import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer; |
| import org.apache.hadoop.hbase.client.AsyncTable; |
| import org.apache.hadoop.hbase.client.Consistency; |
| import org.apache.hadoop.hbase.client.Get; |
| import org.apache.hadoop.hbase.client.RegionInfo; |
| import org.apache.hadoop.hbase.client.Result; |
| import org.apache.hadoop.hbase.client.Scan; |
| import org.apache.hadoop.hbase.client.Scan.ReadType; |
| import org.apache.hadoop.hbase.client.TableState; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.hadoop.hbase.util.Pair; |
| import org.apache.yetus.audience.InterfaceAudience; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * The (asynchronous) meta table accessor used at client side. Used to read/write region and |
| * assignment information store in <code>hbase:meta</code>. |
| * @since 2.0.0 |
| * @see CatalogFamilyFormat |
| */ |
| @InterfaceAudience.Private |
| public final class ClientMetaTableAccessor { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(ClientMetaTableAccessor.class); |
| |
| private ClientMetaTableAccessor() { |
| } |
| |
| @InterfaceAudience.Private |
| public enum QueryType { |
| ALL(HConstants.TABLE_FAMILY, HConstants.CATALOG_FAMILY), REGION(HConstants.CATALOG_FAMILY), |
| TABLE(HConstants.TABLE_FAMILY), REPLICATION(HConstants.REPLICATION_BARRIER_FAMILY); |
| |
| private final byte[][] families; |
| |
| QueryType(byte[]... families) { |
| this.families = families; |
| } |
| |
| byte[][] getFamilies() { |
| return this.families; |
| } |
| } |
| |
| public static CompletableFuture<Boolean> tableExists(AsyncTable<?> metaTable, |
| TableName tableName) { |
| return getTableState(metaTable, tableName).thenApply(Optional::isPresent); |
| } |
| |
| public static CompletableFuture<Optional<TableState>> getTableState(AsyncTable<?> metaTable, |
| TableName tableName) { |
| CompletableFuture<Optional<TableState>> future = new CompletableFuture<>(); |
| Get get = new Get(tableName.getName()).addColumn(HConstants.TABLE_FAMILY, |
| HConstants.TABLE_STATE_QUALIFIER); |
| addListener(metaTable.get(get), (result, error) -> { |
| if (error != null) { |
| future.completeExceptionally(error); |
| return; |
| } |
| try { |
| future.complete(getTableState(result)); |
| } catch (IOException e) { |
| future.completeExceptionally(e); |
| } |
| }); |
| return future; |
| } |
| |
| /** |
| * Returns the HRegionLocation from meta for the given region |
| * @param metaTable |
| * @param regionName region we're looking for |
| * @return HRegionLocation for the given region |
| */ |
| public static CompletableFuture<Optional<HRegionLocation>> |
| getRegionLocation(AsyncTable<?> metaTable, byte[] regionName) { |
| CompletableFuture<Optional<HRegionLocation>> future = new CompletableFuture<>(); |
| try { |
| RegionInfo parsedRegionInfo = CatalogFamilyFormat.parseRegionInfoFromRegionName(regionName); |
| addListener(metaTable.get(new Get(CatalogFamilyFormat.getMetaKeyForRegion(parsedRegionInfo)) |
| .addFamily(HConstants.CATALOG_FAMILY)), (r, err) -> { |
| if (err != null) { |
| future.completeExceptionally(err); |
| return; |
| } |
| future.complete(getRegionLocations(r) |
| .map(locations -> locations.getRegionLocation(parsedRegionInfo.getReplicaId()))); |
| }); |
| } catch (IOException parseEx) { |
| LOG.warn("Failed to parse the passed region name: " + Bytes.toStringBinary(regionName)); |
| future.completeExceptionally(parseEx); |
| } |
| return future; |
| } |
| |
| /** |
| * Returns the HRegionLocation from meta for the given encoded region name |
| * @param metaTable |
| * @param encodedRegionName region we're looking for |
| * @return HRegionLocation for the given region |
| */ |
| public static CompletableFuture<Optional<HRegionLocation>> |
| getRegionLocationWithEncodedName(AsyncTable<?> metaTable, byte[] encodedRegionName) { |
| CompletableFuture<Optional<HRegionLocation>> future = new CompletableFuture<>(); |
| addListener( |
| metaTable |
| .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)), |
| (results, err) -> { |
| if (err != null) { |
| future.completeExceptionally(err); |
| return; |
| } |
| String encodedRegionNameStr = Bytes.toString(encodedRegionName); |
| results.stream().filter(result -> !result.isEmpty()) |
| .filter(result -> CatalogFamilyFormat.getRegionInfo(result) != null).forEach(result -> { |
| getRegionLocations(result).ifPresent(locations -> { |
| for (HRegionLocation location : locations.getRegionLocations()) { |
| if (location != null && |
| encodedRegionNameStr.equals(location.getRegion().getEncodedName())) { |
| future.complete(Optional.of(location)); |
| return; |
| } |
| } |
| }); |
| }); |
| future.complete(Optional.empty()); |
| }); |
| return future; |
| } |
| |
| private static Optional<TableState> getTableState(Result r) throws IOException { |
| return Optional.ofNullable(CatalogFamilyFormat.getTableState(r)); |
| } |
| |
| /** |
| * Used to get all region locations for the specific table. |
| * @param metaTable |
| * @param tableName table we're looking for, can be null for getting all regions |
| * @return the list of region locations. The return value will be wrapped by a |
| * {@link CompletableFuture}. |
| */ |
| public static CompletableFuture<List<HRegionLocation>> getTableHRegionLocations( |
| AsyncTable<AdvancedScanResultConsumer> metaTable, TableName tableName) { |
| CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>(); |
| addListener(getTableRegionsAndLocations(metaTable, tableName, true), (locations, err) -> { |
| if (err != null) { |
| future.completeExceptionally(err); |
| } else if (locations == null || locations.isEmpty()) { |
| future.complete(Collections.emptyList()); |
| } else { |
| List<HRegionLocation> regionLocations = |
| locations.stream().map(loc -> new HRegionLocation(loc.getFirst(), loc.getSecond())) |
| .collect(Collectors.toList()); |
| future.complete(regionLocations); |
| } |
| }); |
| return future; |
| } |
| |
| /** |
| * Used to get table regions' info and server. |
| * @param metaTable |
| * @param tableName table we're looking for, can be null for getting all regions |
| * @param excludeOfflinedSplitParents don't return split parents |
| * @return the list of regioninfos and server. The return value will be wrapped by a |
| * {@link CompletableFuture}. |
| */ |
| private static CompletableFuture<List<Pair<RegionInfo, ServerName>>> getTableRegionsAndLocations( |
| final AsyncTable<AdvancedScanResultConsumer> metaTable, final TableName tableName, |
| final boolean excludeOfflinedSplitParents) { |
| CompletableFuture<List<Pair<RegionInfo, ServerName>>> future = new CompletableFuture<>(); |
| if (TableName.META_TABLE_NAME.equals(tableName)) { |
| future.completeExceptionally(new IOException( |
| "This method can't be used to locate meta regions;" + " use MetaTableLocator instead")); |
| } |
| |
| // Make a version of CollectingVisitor that collects RegionInfo and ServerAddress |
| CollectRegionLocationsVisitor visitor = |
| new CollectRegionLocationsVisitor(excludeOfflinedSplitParents); |
| |
| addListener(scanMeta(metaTable, tableName, QueryType.REGION, visitor), (v, error) -> { |
| if (error != null) { |
| future.completeExceptionally(error); |
| return; |
| } |
| future.complete(visitor.getResults()); |
| }); |
| return future; |
| } |
| |
| /** |
| * Performs a scan of META table for given table. |
| * @param metaTable |
| * @param tableName table withing we scan |
| * @param type scanned part of meta |
| * @param visitor Visitor invoked against each row |
| */ |
| private static CompletableFuture<Void> scanMeta(AsyncTable<AdvancedScanResultConsumer> metaTable, |
| TableName tableName, QueryType type, final Visitor visitor) { |
| return scanMeta(metaTable, getTableStartRowForMeta(tableName, type), |
| getTableStopRowForMeta(tableName, type), type, Integer.MAX_VALUE, visitor); |
| } |
| |
| /** |
| * Performs a scan of META table for given table. |
| * @param metaTable |
| * @param startRow Where to start the scan |
| * @param stopRow Where to stop the scan |
| * @param type scanned part of meta |
| * @param maxRows maximum rows to return |
| * @param visitor Visitor invoked against each row |
| */ |
| private static CompletableFuture<Void> scanMeta(AsyncTable<AdvancedScanResultConsumer> metaTable, |
| byte[] startRow, byte[] stopRow, QueryType type, int maxRows, final Visitor visitor) { |
| int rowUpperLimit = maxRows > 0 ? maxRows : Integer.MAX_VALUE; |
| Scan scan = getMetaScan(metaTable, rowUpperLimit); |
| for (byte[] family : type.getFamilies()) { |
| scan.addFamily(family); |
| } |
| if (startRow != null) { |
| scan.withStartRow(startRow); |
| } |
| if (stopRow != null) { |
| scan.withStopRow(stopRow); |
| } |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Scanning META" + " starting at row=" + Bytes.toStringBinary(scan.getStartRow()) + |
| " stopping at row=" + Bytes.toStringBinary(scan.getStopRow()) + " for max=" + |
| rowUpperLimit + " with caching=" + scan.getCaching()); |
| } |
| |
| CompletableFuture<Void> future = new CompletableFuture<Void>(); |
| metaTable.scan(scan, new MetaTableScanResultConsumer(rowUpperLimit, visitor, future)); |
| return future; |
| } |
| |
| private static final class MetaTableScanResultConsumer implements AdvancedScanResultConsumer { |
| |
| private int currentRowCount; |
| |
| private final int rowUpperLimit; |
| |
| private final Visitor visitor; |
| |
| private final CompletableFuture<Void> future; |
| |
| MetaTableScanResultConsumer(int rowUpperLimit, Visitor visitor, |
| CompletableFuture<Void> future) { |
| this.rowUpperLimit = rowUpperLimit; |
| this.visitor = visitor; |
| this.future = future; |
| this.currentRowCount = 0; |
| } |
| |
| @Override |
| public void onError(Throwable error) { |
| future.completeExceptionally(error); |
| } |
| |
| @Override |
| @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NONNULL_PARAM_VIOLATION", |
| justification = "https://github.com/findbugsproject/findbugs/issues/79") |
| public void onComplete() { |
| future.complete(null); |
| } |
| |
| @Override |
| public void onNext(Result[] results, ScanController controller) { |
| boolean terminateScan = false; |
| for (Result result : results) { |
| try { |
| if (!visitor.visit(result)) { |
| terminateScan = true; |
| break; |
| } |
| } catch (Exception e) { |
| future.completeExceptionally(e); |
| terminateScan = true; |
| break; |
| } |
| if (++currentRowCount >= rowUpperLimit) { |
| terminateScan = true; |
| break; |
| } |
| } |
| if (terminateScan) { |
| controller.terminate(); |
| } |
| } |
| } |
| |
| /** |
| * Implementations 'visit' a catalog table row. |
| */ |
| public interface Visitor { |
| /** |
| * Visit the catalog table row. |
| * @param r A row from catalog table |
| * @return True if we are to proceed scanning the table, else false if we are to stop now. |
| */ |
| boolean visit(final Result r) throws IOException; |
| } |
| |
| /** |
| * Implementations 'visit' a catalog table row but with close() at the end. |
| */ |
| public interface CloseableVisitor extends Visitor, Closeable { |
| } |
| |
| /** |
| * A {@link Visitor} that collects content out of passed {@link Result}. |
| */ |
| private static abstract class CollectingVisitor<T> implements Visitor { |
| final List<T> results = new ArrayList<>(); |
| |
| @Override |
| public boolean visit(Result r) throws IOException { |
| if (r != null && !r.isEmpty()) { |
| add(r); |
| } |
| return true; |
| } |
| |
| abstract void add(Result r); |
| |
| /** |
| * @return Collected results; wait till visits complete to collect all possible results |
| */ |
| List<T> getResults() { |
| return this.results; |
| } |
| } |
| |
| static class CollectRegionLocationsVisitor |
| extends CollectingVisitor<Pair<RegionInfo, ServerName>> { |
| |
| private final boolean excludeOfflinedSplitParents; |
| |
| private RegionLocations current = null; |
| |
| CollectRegionLocationsVisitor(boolean excludeOfflinedSplitParents) { |
| this.excludeOfflinedSplitParents = excludeOfflinedSplitParents; |
| } |
| |
| @Override |
| public boolean visit(Result r) throws IOException { |
| Optional<RegionLocations> currentRegionLocations = getRegionLocations(r); |
| current = currentRegionLocations.orElse(null); |
| if (current == null || current.getRegionLocation().getRegion() == null) { |
| LOG.warn("No serialized RegionInfo in " + r); |
| return true; |
| } |
| RegionInfo hri = current.getRegionLocation().getRegion(); |
| if (excludeOfflinedSplitParents && hri.isSplitParent()) { |
| return true; |
| } |
| // Else call super and add this Result to the collection. |
| return super.visit(r); |
| } |
| |
| @Override |
| void add(Result r) { |
| if (current == null) { |
| return; |
| } |
| for (HRegionLocation loc : current.getRegionLocations()) { |
| if (loc != null) { |
| this.results.add(new Pair<RegionInfo, ServerName>(loc.getRegion(), loc.getServerName())); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Collects all returned. |
| */ |
| static class CollectAllVisitor extends CollectingVisitor<Result> { |
| @Override |
| void add(Result r) { |
| this.results.add(r); |
| } |
| } |
| |
| private static Scan getMetaScan(AsyncTable<?> metaTable, int rowUpperLimit) { |
| Scan scan = new Scan(); |
| int scannerCaching = metaTable.getConfiguration().getInt(HConstants.HBASE_META_SCANNER_CACHING, |
| HConstants.DEFAULT_HBASE_META_SCANNER_CACHING); |
| if (metaTable.getConfiguration().getBoolean(HConstants.USE_META_REPLICAS, |
| HConstants.DEFAULT_USE_META_REPLICAS)) { |
| scan.setConsistency(Consistency.TIMELINE); |
| } |
| if (rowUpperLimit <= scannerCaching) { |
| scan.setLimit(rowUpperLimit); |
| } |
| int rows = Math.min(rowUpperLimit, scannerCaching); |
| scan.setCaching(rows); |
| return scan; |
| } |
| |
| /** |
| * Returns an HRegionLocationList extracted from the result. |
| * @return an HRegionLocationList containing all locations for the region range or null if we |
| * can't deserialize the result. |
| */ |
| private static Optional<RegionLocations> getRegionLocations(Result r) { |
| return Optional.ofNullable(CatalogFamilyFormat.getRegionLocations(r)); |
| } |
| |
| /** |
| * @param tableName table we're working with |
| * @return start row for scanning META according to query type |
| */ |
| public static byte[] getTableStartRowForMeta(TableName tableName, QueryType type) { |
| if (tableName == null) { |
| return null; |
| } |
| switch (type) { |
| case REGION: |
| case REPLICATION: { |
| byte[] startRow = new byte[tableName.getName().length + 2]; |
| System.arraycopy(tableName.getName(), 0, startRow, 0, tableName.getName().length); |
| startRow[startRow.length - 2] = HConstants.DELIMITER; |
| startRow[startRow.length - 1] = HConstants.DELIMITER; |
| return startRow; |
| } |
| case ALL: |
| case TABLE: |
| default: { |
| return tableName.getName(); |
| } |
| } |
| } |
| |
| /** |
| * @param tableName table we're working with |
| * @return stop row for scanning META according to query type |
| */ |
| public static byte[] getTableStopRowForMeta(TableName tableName, QueryType type) { |
| if (tableName == null) { |
| return null; |
| } |
| final byte[] stopRow; |
| switch (type) { |
| case REGION: |
| case REPLICATION: { |
| stopRow = new byte[tableName.getName().length + 3]; |
| System.arraycopy(tableName.getName(), 0, stopRow, 0, tableName.getName().length); |
| stopRow[stopRow.length - 3] = ' '; |
| stopRow[stopRow.length - 2] = HConstants.DELIMITER; |
| stopRow[stopRow.length - 1] = HConstants.DELIMITER; |
| break; |
| } |
| case ALL: |
| case TABLE: |
| default: { |
| stopRow = new byte[tableName.getName().length + 1]; |
| System.arraycopy(tableName.getName(), 0, stopRow, 0, tableName.getName().length); |
| stopRow[stopRow.length - 1] = ' '; |
| break; |
| } |
| } |
| return stopRow; |
| } |
| } |