| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hbase.master.assignment; |
| |
| import java.io.IOException; |
| import java.io.InterruptedIOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| import java.util.stream.Stream; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hbase.DoNotRetryIOException; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.ServerName; |
| import org.apache.hadoop.hbase.TableName; |
| import org.apache.hadoop.hbase.UnknownRegionException; |
| import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; |
| import org.apache.hadoop.hbase.client.MasterSwitchType; |
| import org.apache.hadoop.hbase.client.Mutation; |
| import org.apache.hadoop.hbase.client.RegionInfo; |
| import org.apache.hadoop.hbase.client.RegionInfoBuilder; |
| import org.apache.hadoop.hbase.client.TableDescriptor; |
| import org.apache.hadoop.hbase.io.hfile.CacheConfig; |
| import org.apache.hadoop.hbase.master.MasterCoprocessorHost; |
| import org.apache.hadoop.hbase.master.MasterFileSystem; |
| import org.apache.hadoop.hbase.master.RegionState.State; |
| import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan; |
| import org.apache.hadoop.hbase.master.procedure.AbstractStateMachineRegionProcedure; |
| import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; |
| import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil; |
| import org.apache.hadoop.hbase.procedure2.ProcedureMetrics; |
| import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; |
| import org.apache.hadoop.hbase.quotas.QuotaExceededException; |
| import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; |
| import org.apache.hadoop.hbase.regionserver.HStore; |
| import org.apache.hadoop.hbase.regionserver.HStoreFile; |
| import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy; |
| import org.apache.hadoop.hbase.regionserver.StoreFileInfo; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.hadoop.hbase.util.CommonFSUtils; |
| import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; |
| import org.apache.hadoop.hbase.util.FSUtils; |
| import org.apache.hadoop.hbase.util.Pair; |
| import org.apache.hadoop.hbase.util.Threads; |
| import org.apache.hadoop.hbase.wal.WALSplitUtil; |
| import org.apache.hadoop.util.ReflectionUtils; |
| import org.apache.yetus.audience.InterfaceAudience; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; |
| |
| import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; |
| import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; |
| import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; |
| import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SplitTableRegionState; |
| |
| /** |
| * The procedure to split a region in a table. |
| * Takes lock on the parent region. |
| * It holds the lock for the life of the procedure. |
| * <p>Throws an exception on construction if it determines the context is hostile to a split |
| * (cluster going down or master is shutting down or table is disabled).</p> |
| */ |
| @InterfaceAudience.Private |
| public class SplitTableRegionProcedure |
| extends AbstractStateMachineRegionProcedure<SplitTableRegionState> { |
// Class-wide logger.
private static final Logger LOG = LoggerFactory.getLogger(SplitTableRegionProcedure.class);
// First daughter: built from the parent's start key up to the split row.
private RegionInfo daughterOneRI;
// Second daughter: built from the split row up to the parent's end key.
private RegionInfo daughterTwoRI;
// Row the parent is split at; supplied by the caller or obtained from the hosting RS in
// checkSplittable. Not part of the serialized state (see getSplitRow for the persisted value).
private byte[] bestSplitRow;
// Split policy instance created in the constructor when the table declares a policy class;
// otherwise null. Not part of the serialized state.
private RegionSplitPolicy splitPolicy;
| |
| public SplitTableRegionProcedure() { |
| // Required by the Procedure framework to create the procedure on replay |
| } |
| |
/**
 * Creates a procedure that splits {@code regionToSplit}, failing fast when the split cannot
 * be carried out.
 * @param env MasterProcedureEnv
 * @param regionToSplit parent region to split
 * @param splitRow row to split at; when null or empty the hosting RS is asked for one
 * @throws IOException if any of the pre-checks rejects the split
 */
public SplitTableRegionProcedure(final MasterProcedureEnv env,
    final RegionInfo regionToSplit, final byte[] splitRow) throws IOException {
  super(env, regionToSplit);
  preflightChecks(env, true);
  // The prepare step repeats these online checks when the procedure runs, but there a failed
  // check only skips the split with a warning. At construction time we fail fast instead.
  checkOnline(env, regionToSplit);
  this.bestSplitRow = splitRow;
  // May replace bestSplitRow with the row suggested by the RS, so read the field afterwards.
  checkSplittable(env, regionToSplit, bestSplitRow);

  final TableName tableName = regionToSplit.getTable();
  final long daughterRegionId = getDaughterRegionIdTimestamp(regionToSplit);
  this.daughterOneRI = RegionInfoBuilder.newBuilder(tableName)
    .setStartKey(regionToSplit.getStartKey())
    .setEndKey(bestSplitRow)
    .setSplit(false)
    .setRegionId(daughterRegionId)
    .build();
  this.daughterTwoRI = RegionInfoBuilder.newBuilder(tableName)
    .setStartKey(bestSplitRow)
    .setEndKey(regionToSplit.getEndKey())
    .setSplit(false)
    .setRegionId(daughterRegionId)
    .build();

  final TableDescriptor tableDescriptor =
    env.getMasterServices().getTableDescriptors().get(getTableName());
  if (tableDescriptor.getRegionSplitPolicyClassName() == null) {
    return;
  }
  // There is no Region reference on the master, so the split policy is instantiated without
  // one; only methods that do not need a Region may be invoked on it. Instantiating this
  // RegionServer-side class on the master is done for Phoenix Local Indexing.
  // Refer HBASE-12583 for more information.
  final Configuration masterConf = env.getMasterConfiguration();
  final Class<? extends RegionSplitPolicy> policyClass =
    RegionSplitPolicy.getSplitPolicyClass(tableDescriptor, masterConf);
  this.splitPolicy = ReflectionUtils.newInstance(policyClass, masterConf);
}
| |
| @Override |
| protected LockState acquireLock(final MasterProcedureEnv env) { |
| if (env.getProcedureScheduler().waitRegions(this, getTableName(), getParentRegion(), |
| daughterOneRI, daughterTwoRI)) { |
| try { |
| LOG.debug(LockState.LOCK_EVENT_WAIT + " " + env.getProcedureScheduler().dumpLocks()); |
| } catch (IOException e) { |
| // Ignore, just for logging |
| } |
| return LockState.LOCK_EVENT_WAIT; |
| } |
| return LockState.LOCK_ACQUIRED; |
| } |
| |
| @Override |
| protected void releaseLock(final MasterProcedureEnv env) { |
| env.getProcedureScheduler().wakeRegions(this, getTableName(), getParentRegion(), daughterOneRI, |
| daughterTwoRI); |
| } |
| |
| public RegionInfo getDaughterOneRI() { |
| return daughterOneRI; |
| } |
| |
| public RegionInfo getDaughterTwoRI() { |
| return daughterTwoRI; |
| } |
| |
| private boolean hasBestSplitRow() { |
| return bestSplitRow != null && bestSplitRow.length > 0; |
| } |
| |
| /** |
| * Check whether the region is splittable |
| * @param env MasterProcedureEnv |
| * @param regionToSplit parent Region to be split |
| * @param splitRow if splitRow is not specified, will first try to get bestSplitRow from RS |
| */ |
| private void checkSplittable(final MasterProcedureEnv env, |
| final RegionInfo regionToSplit, final byte[] splitRow) throws IOException { |
| // Ask the remote RS if this region is splittable. |
| // If we get an IOE, report it along w/ the failure so can see why we are not splittable at |
| // this time. |
| if(regionToSplit.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { |
| throw new IllegalArgumentException("Can't invoke split on non-default regions directly"); |
| } |
| RegionStateNode node = |
| env.getAssignmentManager().getRegionStates().getRegionStateNode(getParentRegion()); |
| IOException splittableCheckIOE = null; |
| boolean splittable = false; |
| if (node != null) { |
| try { |
| GetRegionInfoResponse response; |
| if (!hasBestSplitRow()) { |
| LOG.info( |
| "{} splitKey isn't explicitly specified, will try to find a best split key from RS {}", |
| node.getRegionInfo().getRegionNameAsString(), node.getRegionLocation()); |
| response = AssignmentManagerUtil.getRegionInfoResponse(env, node.getRegionLocation(), |
| node.getRegionInfo(), true); |
| bestSplitRow = |
| response.hasBestSplitRow() ? response.getBestSplitRow().toByteArray() : null; |
| } else { |
| response = AssignmentManagerUtil.getRegionInfoResponse(env, node.getRegionLocation(), |
| node.getRegionInfo(), false); |
| } |
| splittable = response.hasSplittable() && response.getSplittable(); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Splittable=" + splittable + " " + node.toShortString()); |
| } |
| } catch (IOException e) { |
| splittableCheckIOE = e; |
| } |
| } |
| |
| if (!splittable) { |
| IOException e = |
| new DoNotRetryIOException(regionToSplit.getShortNameToLog() + " NOT splittable"); |
| if (splittableCheckIOE != null) { |
| e.initCause(splittableCheckIOE); |
| } |
| throw e; |
| } |
| |
| if (bestSplitRow == null || bestSplitRow.length == 0) { |
| throw new DoNotRetryIOException("Region not splittable because bestSplitPoint = null, " + |
| "maybe table is too small for auto split. For force split, try specifying split row"); |
| } |
| |
| if (Bytes.equals(regionToSplit.getStartKey(), bestSplitRow)) { |
| throw new DoNotRetryIOException( |
| "Split row is equal to startkey: " + Bytes.toStringBinary(splitRow)); |
| } |
| |
| if (!regionToSplit.containsRow(bestSplitRow)) { |
| throw new DoNotRetryIOException("Split row is not inside region key range splitKey:" + |
| Bytes.toStringBinary(splitRow) + " region: " + regionToSplit); |
| } |
| } |
| |
| /** |
| * Calculate daughter regionid to use. |
| * @param hri Parent {@link RegionInfo} |
| * @return Daughter region id (timestamp) to use. |
| */ |
| private static long getDaughterRegionIdTimestamp(final RegionInfo hri) { |
| long rid = EnvironmentEdgeManager.currentTime(); |
| // Regionid is timestamp. Can't be less than that of parent else will insert |
| // at wrong location in hbase:meta (See HBASE-710). |
| if (rid < hri.getRegionId()) { |
| LOG.warn("Clock skew; parent regions id is " + hri.getRegionId() + |
| " but current time here is " + rid); |
| rid = hri.getRegionId() + 1; |
| } |
| return rid; |
| } |
| |
| private void removeNonDefaultReplicas(MasterProcedureEnv env) throws IOException { |
| AssignmentManagerUtil.removeNonDefaultReplicas(env, Stream.of(getParentRegion()), |
| getRegionReplication(env)); |
| } |
| |
| private void checkClosedRegions(MasterProcedureEnv env) throws IOException { |
| // theoretically this should not happen any more after we use TRSP, but anyway let's add a check |
| // here |
| AssignmentManagerUtil.checkClosedRegion(env, getParentRegion()); |
| } |
| |
| @Override |
| protected Flow executeFromState(MasterProcedureEnv env, SplitTableRegionState state) |
| throws InterruptedException { |
| LOG.trace("{} execute state={}", this, state); |
| |
| try { |
| switch (state) { |
| case SPLIT_TABLE_REGION_PREPARE: |
| if (prepareSplitRegion(env)) { |
| setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_PRE_OPERATION); |
| break; |
| } else { |
| return Flow.NO_MORE_STATE; |
| } |
| case SPLIT_TABLE_REGION_PRE_OPERATION: |
| preSplitRegion(env); |
| setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_CLOSE_PARENT_REGION); |
| break; |
| case SPLIT_TABLE_REGION_CLOSE_PARENT_REGION: |
| addChildProcedure(createUnassignProcedures(env)); |
| setNextState(SplitTableRegionState.SPLIT_TABLE_REGIONS_CHECK_CLOSED_REGIONS); |
| break; |
| case SPLIT_TABLE_REGIONS_CHECK_CLOSED_REGIONS: |
| checkClosedRegions(env); |
| setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_CREATE_DAUGHTER_REGIONS); |
| break; |
| case SPLIT_TABLE_REGION_CREATE_DAUGHTER_REGIONS: |
| removeNonDefaultReplicas(env); |
| createDaughterRegions(env); |
| setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_WRITE_MAX_SEQUENCE_ID_FILE); |
| break; |
| case SPLIT_TABLE_REGION_WRITE_MAX_SEQUENCE_ID_FILE: |
| writeMaxSequenceIdFile(env); |
| setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_PRE_OPERATION_BEFORE_META); |
| break; |
| case SPLIT_TABLE_REGION_PRE_OPERATION_BEFORE_META: |
| preSplitRegionBeforeMETA(env); |
| setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_UPDATE_META); |
| break; |
| case SPLIT_TABLE_REGION_UPDATE_META: |
| updateMeta(env); |
| setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_META); |
| break; |
| case SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_META: |
| preSplitRegionAfterMETA(env); |
| setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_OPEN_CHILD_REGIONS); |
| break; |
| case SPLIT_TABLE_REGION_OPEN_CHILD_REGIONS: |
| addChildProcedure(createAssignProcedures(env)); |
| setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_POST_OPERATION); |
| break; |
| case SPLIT_TABLE_REGION_POST_OPERATION: |
| postSplitRegion(env); |
| return Flow.NO_MORE_STATE; |
| default: |
| throw new UnsupportedOperationException(this + " unhandled state=" + state); |
| } |
| } catch (IOException e) { |
| String msg = "Splitting " + getParentRegion().getEncodedName() + ", " + this; |
| if (!isRollbackSupported(state)) { |
| // We reach a state that cannot be rolled back. We just need to keep retrying. |
| LOG.warn(msg, e); |
| } else { |
| LOG.error(msg, e); |
| setFailure("master-split-regions", e); |
| } |
| } |
| // if split fails, need to call ((HRegion)parent).clearSplit() when it is a force split |
| return Flow.HAS_MORE_STATE; |
| } |
| |
| /** |
| * To rollback {@link SplitTableRegionProcedure}, an AssignProcedure is asynchronously |
| * submitted for parent region to be split (rollback doesn't wait on the completion of the |
| * AssignProcedure) . This can be improved by changing rollback() to support sub-procedures. |
| * See HBASE-19851 for details. |
| */ |
| @Override |
| protected void rollbackState(final MasterProcedureEnv env, final SplitTableRegionState state) |
| throws IOException, InterruptedException { |
| LOG.trace("{} rollback state={}", this, state); |
| |
| try { |
| switch (state) { |
| case SPLIT_TABLE_REGION_POST_OPERATION: |
| case SPLIT_TABLE_REGION_OPEN_CHILD_REGIONS: |
| case SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_META: |
| case SPLIT_TABLE_REGION_UPDATE_META: |
| // PONR |
| throw new UnsupportedOperationException(this + " unhandled state=" + state); |
| case SPLIT_TABLE_REGION_PRE_OPERATION_BEFORE_META: |
| break; |
| case SPLIT_TABLE_REGION_CREATE_DAUGHTER_REGIONS: |
| case SPLIT_TABLE_REGION_WRITE_MAX_SEQUENCE_ID_FILE: |
| // Doing nothing, as re-open parent region would clean up daughter region directories. |
| break; |
| case SPLIT_TABLE_REGIONS_CHECK_CLOSED_REGIONS: |
| // Doing nothing, in SPLIT_TABLE_REGION_CLOSE_PARENT_REGION, |
| // we will bring parent region online |
| break; |
| case SPLIT_TABLE_REGION_CLOSE_PARENT_REGION: |
| openParentRegion(env); |
| break; |
| case SPLIT_TABLE_REGION_PRE_OPERATION: |
| postRollBackSplitRegion(env); |
| break; |
| case SPLIT_TABLE_REGION_PREPARE: |
| break; // nothing to do |
| default: |
| throw new UnsupportedOperationException(this + " unhandled state=" + state); |
| } |
| } catch (IOException e) { |
| // This will be retried. Unless there is a bug in the code, |
| // this should be just a "temporary error" (e.g. network down) |
| LOG.warn("pid=" + getProcId() + " failed rollback attempt step " + state + |
| " for splitting the region " |
| + getParentRegion().getEncodedName() + " in table " + getTableName(), e); |
| throw e; |
| } |
| } |
| |
| /* |
| * Check whether we are in the state that can be rollback |
| */ |
| @Override |
| protected boolean isRollbackSupported(final SplitTableRegionState state) { |
| switch (state) { |
| case SPLIT_TABLE_REGION_POST_OPERATION: |
| case SPLIT_TABLE_REGION_OPEN_CHILD_REGIONS: |
| case SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_META: |
| case SPLIT_TABLE_REGION_UPDATE_META: |
| // It is not safe to rollback if we reach to these states. |
| return false; |
| default: |
| break; |
| } |
| return true; |
| } |
| |
| @Override |
| protected SplitTableRegionState getState(final int stateId) { |
| return SplitTableRegionState.forNumber(stateId); |
| } |
| |
| @Override |
| protected int getStateId(final SplitTableRegionState state) { |
| return state.getNumber(); |
| } |
| |
| @Override |
| protected SplitTableRegionState getInitialState() { |
| return SplitTableRegionState.SPLIT_TABLE_REGION_PREPARE; |
| } |
| |
| @Override |
| protected void serializeStateData(ProcedureStateSerializer serializer) |
| throws IOException { |
| super.serializeStateData(serializer); |
| |
| final MasterProcedureProtos.SplitTableRegionStateData.Builder splitTableRegionMsg = |
| MasterProcedureProtos.SplitTableRegionStateData.newBuilder() |
| .setUserInfo(MasterProcedureUtil.toProtoUserInfo(getUser())) |
| .setParentRegionInfo(ProtobufUtil.toRegionInfo(getRegion())) |
| .addChildRegionInfo(ProtobufUtil.toRegionInfo(daughterOneRI)) |
| .addChildRegionInfo(ProtobufUtil.toRegionInfo(daughterTwoRI)); |
| serializer.serialize(splitTableRegionMsg.build()); |
| } |
| |
| @Override |
| protected void deserializeStateData(ProcedureStateSerializer serializer) |
| throws IOException { |
| super.deserializeStateData(serializer); |
| |
| final MasterProcedureProtos.SplitTableRegionStateData splitTableRegionsMsg = |
| serializer.deserialize(MasterProcedureProtos.SplitTableRegionStateData.class); |
| setUser(MasterProcedureUtil.toUserInfo(splitTableRegionsMsg.getUserInfo())); |
| setRegion(ProtobufUtil.toRegionInfo(splitTableRegionsMsg.getParentRegionInfo())); |
| assert(splitTableRegionsMsg.getChildRegionInfoCount() == 2); |
| daughterOneRI = ProtobufUtil.toRegionInfo(splitTableRegionsMsg.getChildRegionInfo(0)); |
| daughterTwoRI = ProtobufUtil.toRegionInfo(splitTableRegionsMsg.getChildRegionInfo(1)); |
| } |
| |
| @Override |
| public void toStringClassDetails(StringBuilder sb) { |
| sb.append(getClass().getSimpleName()); |
| sb.append(" table="); |
| sb.append(getTableName()); |
| sb.append(", parent="); |
| sb.append(getParentRegion().getShortNameToLog()); |
| sb.append(", daughterA="); |
| sb.append(daughterOneRI.getShortNameToLog()); |
| sb.append(", daughterB="); |
| sb.append(daughterTwoRI.getShortNameToLog()); |
| } |
| |
| private RegionInfo getParentRegion() { |
| return getRegion(); |
| } |
| |
| @Override |
| public TableOperationType getTableOperationType() { |
| return TableOperationType.REGION_SPLIT; |
| } |
| |
| @Override |
| protected ProcedureMetrics getProcedureMetrics(MasterProcedureEnv env) { |
| return env.getAssignmentManager().getAssignmentManagerMetrics().getSplitProcMetrics(); |
| } |
| |
| private byte[] getSplitRow() { |
| return daughterTwoRI.getStartKey(); |
| } |
| |
| private static final State[] EXPECTED_SPLIT_STATES = new State[] { State.OPEN, State.CLOSED }; |
| |
| /** |
| * Prepare to Split region. |
| * @param env MasterProcedureEnv |
| */ |
| public boolean prepareSplitRegion(final MasterProcedureEnv env) throws IOException { |
| // Fail if we are taking snapshot for the given table |
| if (env.getMasterServices().getSnapshotManager() |
| .isTakingSnapshot(getParentRegion().getTable())) { |
| setFailure(new IOException("Skip splitting region " + getParentRegion().getShortNameToLog() + |
| ", because we are taking snapshot for the table " + getParentRegion().getTable())); |
| return false; |
| } |
| // Check whether the region is splittable |
| RegionStateNode node = |
| env.getAssignmentManager().getRegionStates().getRegionStateNode(getParentRegion()); |
| |
| if (node == null) { |
| throw new UnknownRegionException(getParentRegion().getRegionNameAsString()); |
| } |
| |
| RegionInfo parentHRI = node.getRegionInfo(); |
| if (parentHRI == null) { |
| LOG.info("Unsplittable; parent region is null; node={}", node); |
| return false; |
| } |
| // Lookup the parent HRI state from the AM, which has the latest updated info. |
| // Protect against the case where concurrent SPLIT requests came in and succeeded |
| // just before us. |
| if (node.isInState(State.SPLIT)) { |
| LOG.info("Split of " + parentHRI + " skipped; state is already SPLIT"); |
| return false; |
| } |
| if (parentHRI.isSplit() || parentHRI.isOffline()) { |
| LOG.info("Split of " + parentHRI + " skipped because offline/split."); |
| return false; |
| } |
| |
| // expected parent to be online or closed |
| if (!node.isInState(EXPECTED_SPLIT_STATES)) { |
| // We may have SPLIT already? |
| setFailure(new IOException("Split " + parentHRI.getRegionNameAsString() + |
| " FAILED because state=" + node.getState() + "; expected " + |
| Arrays.toString(EXPECTED_SPLIT_STATES))); |
| return false; |
| } |
| |
| // Mostly this check is not used because we already check the switch before submit a split |
| // procedure. Just for safe, check the switch again. This procedure can be rollbacked if |
| // the switch was set to false after submit. |
| if (!env.getMasterServices().isSplitOrMergeEnabled(MasterSwitchType.SPLIT)) { |
| LOG.warn("pid=" + getProcId() + " split switch is off! skip split of " + parentHRI); |
| setFailure(new IOException("Split region " + parentHRI.getRegionNameAsString() + |
| " failed due to split switch off")); |
| return false; |
| } |
| |
| if (!env.getMasterServices().getTableDescriptors().get(getTableName()).isSplitEnabled()) { |
| LOG.warn("pid={}, split is disabled for the table! Skipping split of {}", getProcId(), |
| parentHRI); |
| setFailure(new IOException("Split region " + parentHRI.getRegionNameAsString() |
| + " failed as region split is disabled for the table")); |
| return false; |
| } |
| |
| // set node state as SPLITTING |
| node.setState(State.SPLITTING); |
| |
| // Since we have the lock and the master is coordinating the operation |
| // we are always able to split the region |
| return true; |
| } |
| |
| /** |
| * Action before splitting region in a table. |
| * @param env MasterProcedureEnv |
| */ |
| private void preSplitRegion(final MasterProcedureEnv env) |
| throws IOException, InterruptedException { |
| final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); |
| if (cpHost != null) { |
| cpHost.preSplitRegionAction(getTableName(), getSplitRow(), getUser()); |
| } |
| |
| // TODO: Clean up split and merge. Currently all over the place. |
| // Notify QuotaManager and RegionNormalizer |
| try { |
| env.getMasterServices().getMasterQuotaManager().onRegionSplit(this.getParentRegion()); |
| } catch (QuotaExceededException e) { |
| // TODO: why is this here? split requests can be submitted by actors other than the normalizer |
| env.getMasterServices() |
| .getRegionNormalizerManager() |
| .planSkipped(NormalizationPlan.PlanType.SPLIT); |
| throw e; |
| } |
| } |
| |
| /** |
| * Action after rollback a split table region action. |
| * @param env MasterProcedureEnv |
| */ |
| private void postRollBackSplitRegion(final MasterProcedureEnv env) throws IOException { |
| final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); |
| if (cpHost != null) { |
| cpHost.postRollBackSplitRegionAction(getUser()); |
| } |
| } |
| |
| /** |
| * Rollback close parent region |
| */ |
| private void openParentRegion(MasterProcedureEnv env) throws IOException { |
| AssignmentManagerUtil.reopenRegionsForRollback(env, |
| Collections.singletonList((getParentRegion())), getRegionReplication(env), |
| getParentRegionServerName(env)); |
| } |
| |
| /** |
| * Create daughter regions |
| */ |
| public void createDaughterRegions(final MasterProcedureEnv env) throws IOException { |
| final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem(); |
| final Path tabledir = CommonFSUtils.getTableDir(mfs.getRootDir(), getTableName()); |
| final FileSystem fs = mfs.getFileSystem(); |
| HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem( |
| env.getMasterConfiguration(), fs, tabledir, getParentRegion(), false); |
| regionFs.createSplitsDir(daughterOneRI, daughterTwoRI); |
| |
| Pair<Integer, Integer> expectedReferences = splitStoreFiles(env, regionFs); |
| |
| assertReferenceFileCount(fs, expectedReferences.getFirst(), |
| regionFs.getSplitsDir(daughterOneRI)); |
| //Move the files from the temporary .splits to the final /table/region directory |
| regionFs.commitDaughterRegion(daughterOneRI); |
| assertReferenceFileCount(fs, expectedReferences.getFirst(), |
| new Path(tabledir, daughterOneRI.getEncodedName())); |
| |
| assertReferenceFileCount(fs, expectedReferences.getSecond(), |
| regionFs.getSplitsDir(daughterTwoRI)); |
| regionFs.commitDaughterRegion(daughterTwoRI); |
| assertReferenceFileCount(fs, expectedReferences.getSecond(), |
| new Path(tabledir, daughterTwoRI.getEncodedName())); |
| } |
| |
| /** |
| * Create Split directory |
| * @param env MasterProcedureEnv |
| */ |
| private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env, |
| final HRegionFileSystem regionFs) throws IOException { |
| final Configuration conf = env.getMasterConfiguration(); |
| TableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName()); |
| // The following code sets up a thread pool executor with as many slots as |
| // there's files to split. It then fires up everything, waits for |
| // completion and finally checks for any exception |
| // |
| // Note: splitStoreFiles creates daughter region dirs under the parent splits dir |
| // Nothing to unroll here if failure -- re-run createSplitsDir will |
| // clean this up. |
| int nbFiles = 0; |
| final Map<String, Collection<StoreFileInfo>> files = |
| new HashMap<String, Collection<StoreFileInfo>>(htd.getColumnFamilyCount()); |
| for (ColumnFamilyDescriptor cfd : htd.getColumnFamilies()) { |
| String family = cfd.getNameAsString(); |
| Collection<StoreFileInfo> sfis = regionFs.getStoreFiles(family); |
| if (sfis == null) { |
| continue; |
| } |
| Collection<StoreFileInfo> filteredSfis = null; |
| for (StoreFileInfo sfi : sfis) { |
| // Filter. There is a lag cleaning up compacted reference files. They get cleared |
| // after a delay in case outstanding Scanners still have references. Because of this, |
| // the listing of the Store content may have straggler reference files. Skip these. |
| // It should be safe to skip references at this point because we checked above with |
| // the region if it thinks it is splittable and if we are here, it thinks it is |
| // splitable. |
| if (sfi.isReference()) { |
| LOG.info("Skipping split of " + sfi + "; presuming ready for archiving."); |
| continue; |
| } |
| if (filteredSfis == null) { |
| filteredSfis = new ArrayList<StoreFileInfo>(sfis.size()); |
| files.put(family, filteredSfis); |
| } |
| filteredSfis.add(sfi); |
| nbFiles++; |
| } |
| } |
| if (nbFiles == 0) { |
| // no file needs to be splitted. |
| return new Pair<Integer, Integer>(0, 0); |
| } |
| // Max #threads is the smaller of the number of storefiles or the default max determined above. |
| int maxThreads = Math.min( |
| conf.getInt(HConstants.REGION_SPLIT_THREADS_MAX, |
| conf.getInt(HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT)), |
| nbFiles); |
| LOG.info("pid=" + getProcId() + " splitting " + nbFiles + " storefiles, region=" + |
| getParentRegion().getShortNameToLog() + ", threads=" + maxThreads); |
| final ExecutorService threadPool = Executors.newFixedThreadPool(maxThreads, |
| new ThreadFactoryBuilder().setNameFormat("StoreFileSplitter-pool-%d").setDaemon(true) |
| .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); |
| final List<Future<Pair<Path, Path>>> futures = new ArrayList<Future<Pair<Path, Path>>>(nbFiles); |
| |
| // Split each store file. |
| for (Map.Entry<String, Collection<StoreFileInfo>> e : files.entrySet()) { |
| byte[] familyName = Bytes.toBytes(e.getKey()); |
| final ColumnFamilyDescriptor hcd = htd.getColumnFamily(familyName); |
| final Collection<StoreFileInfo> storeFiles = e.getValue(); |
| if (storeFiles != null && storeFiles.size() > 0) { |
| for (StoreFileInfo storeFileInfo : storeFiles) { |
| // As this procedure is running on master, use CacheConfig.DISABLED means |
| // don't cache any block. |
| StoreFileSplitter sfs = |
| new StoreFileSplitter(regionFs, familyName, new HStoreFile( |
| storeFileInfo, hcd.getBloomFilterType(), CacheConfig.DISABLED)); |
| futures.add(threadPool.submit(sfs)); |
| } |
| } |
| } |
| // Shutdown the pool |
| threadPool.shutdown(); |
| |
| // Wait for all the tasks to finish. |
| // When splits ran on the RegionServer, how-long-to-wait-configuration was named |
| // hbase.regionserver.fileSplitTimeout. If set, use its value. |
| long fileSplitTimeout = conf.getLong("hbase.master.fileSplitTimeout", |
| conf.getLong("hbase.regionserver.fileSplitTimeout", 600000)); |
| try { |
| boolean stillRunning = !threadPool.awaitTermination(fileSplitTimeout, TimeUnit.MILLISECONDS); |
| if (stillRunning) { |
| threadPool.shutdownNow(); |
| // wait for the thread to shutdown completely. |
| while (!threadPool.isTerminated()) { |
| Thread.sleep(50); |
| } |
| throw new IOException( |
| "Took too long to split the" + " files and create the references, aborting split"); |
| } |
| } catch (InterruptedException e) { |
| throw (InterruptedIOException) new InterruptedIOException().initCause(e); |
| } |
| |
| int daughterA = 0; |
| int daughterB = 0; |
| // Look for any exception |
| for (Future<Pair<Path, Path>> future : futures) { |
| try { |
| Pair<Path, Path> p = future.get(); |
| daughterA += p.getFirst() != null ? 1 : 0; |
| daughterB += p.getSecond() != null ? 1 : 0; |
| } catch (InterruptedException e) { |
| throw (InterruptedIOException) new InterruptedIOException().initCause(e); |
| } catch (ExecutionException e) { |
| throw new IOException(e); |
| } |
| } |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("pid=" + getProcId() + " split storefiles for region " + |
| getParentRegion().getShortNameToLog() + " Daughter A: " + daughterA + |
| " storefiles, Daughter B: " + daughterB + " storefiles."); |
| } |
| return new Pair<Integer, Integer>(daughterA, daughterB); |
| } |
| |
| private void assertReferenceFileCount(final FileSystem fs, final int expectedReferenceFileCount, |
| final Path dir) throws IOException { |
| if (expectedReferenceFileCount != 0 && |
| expectedReferenceFileCount != FSUtils.getRegionReferenceFileCount(fs, dir)) { |
| throw new IOException("Failing split. Expected reference file count isn't equal."); |
| } |
| } |
| |
| private Pair<Path, Path> splitStoreFile(HRegionFileSystem regionFs, byte[] family, HStoreFile sf) |
| throws IOException { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("pid=" + getProcId() + " splitting started for store file: " + |
| sf.getPath() + " for region: " + getParentRegion().getShortNameToLog()); |
| } |
| |
| final byte[] splitRow = getSplitRow(); |
| final String familyName = Bytes.toString(family); |
| final Path path_first = regionFs.splitStoreFile(this.daughterOneRI, familyName, sf, splitRow, |
| false, splitPolicy); |
| final Path path_second = regionFs.splitStoreFile(this.daughterTwoRI, familyName, sf, splitRow, |
| true, splitPolicy); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("pid=" + getProcId() + " splitting complete for store file: " + |
| sf.getPath() + " for region: " + getParentRegion().getShortNameToLog()); |
| } |
| return new Pair<Path,Path>(path_first, path_second); |
| } |
| |
| /** |
| * Utility class used to do the file splitting / reference writing |
| * in parallel instead of sequentially. |
| */ |
| private class StoreFileSplitter implements Callable<Pair<Path,Path>> { |
| private final HRegionFileSystem regionFs; |
| private final byte[] family; |
| private final HStoreFile sf; |
| |
| /** |
| * Constructor that takes what it needs to split |
| * @param regionFs the file system |
| * @param family Family that contains the store file |
| * @param sf which file |
| */ |
| public StoreFileSplitter(HRegionFileSystem regionFs, byte[] family, HStoreFile sf) { |
| this.regionFs = regionFs; |
| this.sf = sf; |
| this.family = family; |
| } |
| |
| @Override |
| public Pair<Path,Path> call() throws IOException { |
| return splitStoreFile(regionFs, family, sf); |
| } |
| } |
| |
| /** |
| * Post split region actions before the Point-of-No-Return step |
| * @param env MasterProcedureEnv |
| **/ |
| private void preSplitRegionBeforeMETA(final MasterProcedureEnv env) |
| throws IOException, InterruptedException { |
| final List<Mutation> metaEntries = new ArrayList<Mutation>(); |
| final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); |
| if (cpHost != null) { |
| cpHost.preSplitBeforeMETAAction(getSplitRow(), metaEntries, getUser()); |
| try { |
| for (Mutation p : metaEntries) { |
| RegionInfo.parseRegionName(p.getRow()); |
| } |
| } catch (IOException e) { |
| LOG.error("pid=" + getProcId() + " row key of mutation from coprocessor not parsable as " |
| + "region name." |
| + "Mutations from coprocessor should only for hbase:meta table."); |
| throw e; |
| } |
| } |
| } |
| |
| /** |
| * Add daughter regions to META |
| * @param env MasterProcedureEnv |
| */ |
| private void updateMeta(final MasterProcedureEnv env) throws IOException { |
| env.getAssignmentManager().markRegionAsSplit(getParentRegion(), getParentRegionServerName(env), |
| daughterOneRI, daughterTwoRI); |
| } |
| |
| /** |
| * Pre split region actions after the Point-of-No-Return step |
| * @param env MasterProcedureEnv |
| **/ |
| private void preSplitRegionAfterMETA(final MasterProcedureEnv env) |
| throws IOException, InterruptedException { |
| final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); |
| if (cpHost != null) { |
| cpHost.preSplitAfterMETAAction(getUser()); |
| } |
| } |
| |
| /** |
| * Post split region actions |
| * @param env MasterProcedureEnv |
| **/ |
| private void postSplitRegion(final MasterProcedureEnv env) throws IOException { |
| final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); |
| if (cpHost != null) { |
| cpHost.postCompletedSplitRegionAction(daughterOneRI, daughterTwoRI, getUser()); |
| } |
| } |
| |
| private ServerName getParentRegionServerName(final MasterProcedureEnv env) { |
| return env.getMasterServices().getAssignmentManager().getRegionStates() |
| .getRegionServerOfRegion(getParentRegion()); |
| } |
| |
| private TransitRegionStateProcedure[] createUnassignProcedures(MasterProcedureEnv env) |
| throws IOException { |
| return AssignmentManagerUtil.createUnassignProceduresForSplitOrMerge(env, |
| Stream.of(getParentRegion()), getRegionReplication(env)); |
| } |
| |
| private TransitRegionStateProcedure[] createAssignProcedures(MasterProcedureEnv env) |
| throws IOException { |
| List<RegionInfo> hris = new ArrayList<RegionInfo>(2); |
| hris.add(daughterOneRI); |
| hris.add(daughterTwoRI); |
| return AssignmentManagerUtil.createAssignProceduresForOpeningNewRegions(env, hris, |
| getRegionReplication(env), getParentRegionServerName(env)); |
| } |
| |
| private int getRegionReplication(final MasterProcedureEnv env) throws IOException { |
| final TableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName()); |
| return htd.getRegionReplication(); |
| } |
| |
| private void writeMaxSequenceIdFile(MasterProcedureEnv env) throws IOException { |
| MasterFileSystem fs = env.getMasterFileSystem(); |
| long maxSequenceId = WALSplitUtil.getMaxRegionSequenceId(env.getMasterConfiguration(), |
| getParentRegion(), fs::getFileSystem, fs::getWALFileSystem); |
| if (maxSequenceId > 0) { |
| WALSplitUtil.writeRegionSequenceIdFile(fs.getWALFileSystem(), |
| getWALRegionDir(env, daughterOneRI), maxSequenceId); |
| WALSplitUtil.writeRegionSequenceIdFile(fs.getWALFileSystem(), |
| getWALRegionDir(env, daughterTwoRI), maxSequenceId); |
| } |
| } |
| |
| @Override |
| protected boolean abort(MasterProcedureEnv env) { |
| // Abort means rollback. We can't rollback all steps. HBASE-18018 added abort to all |
| // Procedures. Here is a Procedure that has a PONR and cannot be aborted wants it enters this |
| // range of steps; what do we do for these should an operator want to cancel them? HBASE-20022. |
| return isRollbackSupported(getCurrentState())? super.abort(env): false; |
| } |
| } |