| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hbase.master.assignment; |
| |
| import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK; |
| import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK; |
| import static org.mockito.ArgumentMatchers.any; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.when; |
| |
| import java.io.IOException; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.CompletableFuture; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hbase.CoordinatedStateManager; |
| import org.apache.hadoop.hbase.ServerMetricsBuilder; |
| import org.apache.hadoop.hbase.ServerName; |
| import org.apache.hadoop.hbase.TableDescriptors; |
| import org.apache.hadoop.hbase.TableName; |
| import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; |
| import org.apache.hadoop.hbase.client.Connection; |
| import org.apache.hadoop.hbase.client.HConnectionTestingUtility; |
| import org.apache.hadoop.hbase.client.TableDescriptor; |
| import org.apache.hadoop.hbase.client.TableDescriptorBuilder; |
| import org.apache.hadoop.hbase.client.TableState; |
| import org.apache.hadoop.hbase.master.DummyRegionServerList; |
| import org.apache.hadoop.hbase.master.LoadBalancer; |
| import org.apache.hadoop.hbase.master.MasterFileSystem; |
| import org.apache.hadoop.hbase.master.MasterServices; |
| import org.apache.hadoop.hbase.master.MasterWalManager; |
| import org.apache.hadoop.hbase.master.MockNoopMasterServices; |
| import org.apache.hadoop.hbase.master.ServerManager; |
| import org.apache.hadoop.hbase.master.SplitWALManager; |
| import org.apache.hadoop.hbase.master.TableStateManager; |
| import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory; |
| import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; |
| import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; |
| import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher; |
| import org.apache.hadoop.hbase.master.region.MasterRegion; |
| import org.apache.hadoop.hbase.master.region.MasterRegionFactory; |
| import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; |
| import org.apache.hadoop.hbase.procedure2.ProcedureEvent; |
| import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; |
| import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; |
| import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore; |
| import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; |
| import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener; |
| import org.apache.hadoop.hbase.replication.ReplicationException; |
| import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; |
| import org.apache.hadoop.hbase.security.Superusers; |
| import org.apache.hadoop.hbase.util.CommonFSUtils; |
| import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; |
| import org.apache.zookeeper.KeeperException; |
| import org.mockito.invocation.InvocationOnMock; |
| import org.mockito.stubbing.Answer; |
| |
| import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; |
| |
| import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; |
| import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; |
| import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; |
| import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; |
| import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; |
| import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; |
| import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; |
| import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; |
| |
| /** |
| * A mocked master services. Tries to fake it. May not always work. |
| */ |
| public class MockMasterServices extends MockNoopMasterServices { |
| private final MasterFileSystem fileSystemManager; |
| private final MasterWalManager walManager; |
| private final SplitWALManager splitWALManager; |
| private final AssignmentManager assignmentManager; |
| private final TableStateManager tableStateManager; |
| private final MasterRegion masterRegion; |
| |
| private MasterProcedureEnv procedureEnv; |
| private ProcedureExecutor<MasterProcedureEnv> procedureExecutor; |
| private ProcedureStore procedureStore; |
| private final Connection connection; |
| private final LoadBalancer balancer; |
| private final ServerManager serverManager; |
| private final ReplicationPeerManager rpm; |
| |
| private final ProcedureEvent<?> initialized = new ProcedureEvent<>("master initialized"); |
| public static final String DEFAULT_COLUMN_FAMILY_NAME = "cf"; |
| public static final ServerName MOCK_MASTER_SERVERNAME = |
| ServerName.valueOf("mockmaster.example.org", 1234, -1L); |
| |
| public MockMasterServices(Configuration conf) throws IOException, ReplicationException { |
| super(conf); |
| Superusers.initialize(conf); |
| this.fileSystemManager = new MasterFileSystem(conf); |
| this.walManager = new MasterWalManager(this); |
| this.splitWALManager = |
| conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK) |
| ? null |
| : new SplitWALManager(this); |
| this.masterRegion = MasterRegionFactory.create(this); |
| // Mock an AM. |
| this.assignmentManager = |
| new AssignmentManager(this, masterRegion, new MockRegionStateStore(this, masterRegion)); |
| this.balancer = LoadBalancerFactory.getLoadBalancer(conf); |
| this.serverManager = new ServerManager(this, new DummyRegionServerList()); |
| this.tableStateManager = mock(TableStateManager.class); |
| when(this.tableStateManager.getTableState(any())).thenReturn(new TableState( |
| TableName.valueOf("AnyTableNameSetInMockMasterServcies"), TableState.State.ENABLED)); |
| |
| // Mock up a Client Interface |
| ClientProtos.ClientService.BlockingInterface ri = |
| mock(ClientProtos.ClientService.BlockingInterface.class); |
| MutateResponse.Builder builder = MutateResponse.newBuilder(); |
| builder.setProcessed(true); |
| try { |
| when(ri.mutate(any(), any())).thenReturn(builder.build()); |
| } catch (ServiceException se) { |
| throw ProtobufUtil.handleRemoteException(se); |
| } |
| try { |
| when(ri.multi(any(), any())).thenAnswer(new Answer<MultiResponse>() { |
| @Override |
| public MultiResponse answer(InvocationOnMock invocation) throws Throwable { |
| return buildMultiResponse(invocation.getArgument(1)); |
| } |
| }); |
| } catch (ServiceException se) { |
| throw ProtobufUtil.getRemoteException(se); |
| } |
| this.connection = HConnectionTestingUtility.getMockedConnection(getConfiguration()); |
| // Set hbase.rootdir into test dir. |
| Path rootdir = CommonFSUtils.getRootDir(getConfiguration()); |
| CommonFSUtils.setRootDir(getConfiguration(), rootdir); |
| this.rpm = mock(ReplicationPeerManager.class); |
| ReplicationQueueStorage rqs = mock(ReplicationQueueStorage.class); |
| when(rqs.listAllQueueIds(any(ServerName.class))).thenReturn(Collections.emptyList()); |
| when(rpm.getQueueStorage()).thenReturn(rqs); |
| } |
| |
| public void start(final int numServes, final RSProcedureDispatcher remoteDispatcher) |
| throws IOException, KeeperException { |
| startProcedureExecutor(remoteDispatcher); |
| this.assignmentManager.start(); |
| for (int i = 0; i < numServes; ++i) { |
| ServerName sn = ServerName.valueOf("localhost", 100 + i, 1); |
| serverManager.regionServerReport(sn, ServerMetricsBuilder.newBuilder(sn) |
| .setLastReportTimestamp(EnvironmentEdgeManager.currentTime()).build()); |
| } |
| this.procedureExecutor.getEnvironment().setEventReady(initialized, true); |
| } |
| |
| /** |
| * Call this restart method only after running MockMasterServices#start() The RSs can be |
| * differentiated by the port number, see ServerName in MockMasterServices#start() method above. |
| * Restart of region server will have new startcode in server name |
| * @param serverName Server name to be restarted |
| */ |
| public void restartRegionServer(ServerName serverName) throws IOException { |
| List<ServerName> onlineServers = serverManager.getOnlineServersList(); |
| long startCode = -1; |
| for (ServerName s : onlineServers) { |
| if (s.getAddress().equals(serverName.getAddress())) { |
| startCode = s.getStartcode() + 1; |
| break; |
| } |
| } |
| if (startCode == -1) { |
| return; |
| } |
| ServerName sn = ServerName.valueOf(serverName.getAddress().toString(), startCode); |
| serverManager.regionServerReport(sn, ServerMetricsBuilder.newBuilder(sn) |
| .setLastReportTimestamp(EnvironmentEdgeManager.currentTime()).build()); |
| } |
| |
| @Override |
| public void stop(String why) { |
| stopProcedureExecutor(); |
| this.assignmentManager.stop(); |
| } |
| |
| private void startProcedureExecutor(final RSProcedureDispatcher remoteDispatcher) |
| throws IOException { |
| final Configuration conf = getConfiguration(); |
| this.procedureStore = new NoopProcedureStore(); |
| this.procedureStore.registerListener(new ProcedureStoreListener() { |
| |
| @Override |
| public void abortProcess() { |
| abort("The Procedure Store lost the lease", null); |
| } |
| }); |
| |
| this.procedureEnv = new MasterProcedureEnv(this, |
| remoteDispatcher != null ? remoteDispatcher : new RSProcedureDispatcher(this)); |
| |
| this.procedureExecutor = new ProcedureExecutor<>(conf, procedureEnv, procedureStore, |
| procedureEnv.getProcedureScheduler()); |
| |
| final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, |
| Math.max(Runtime.getRuntime().availableProcessors(), |
| MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS)); |
| final boolean abortOnCorruption = |
| conf.getBoolean(MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION, |
| MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION); |
| this.procedureStore.start(numThreads); |
| ProcedureTestingUtility.initAndStartWorkers(procedureExecutor, numThreads, abortOnCorruption); |
| this.procedureEnv.getRemoteDispatcher().start(); |
| } |
| |
| private void stopProcedureExecutor() { |
| if (this.procedureEnv != null) { |
| this.procedureEnv.getRemoteDispatcher().stop(); |
| } |
| |
| if (this.procedureExecutor != null) { |
| this.procedureExecutor.stop(); |
| } |
| |
| if (this.procedureStore != null) { |
| this.procedureStore.stop(isAborted()); |
| } |
| } |
| |
| @Override |
| public boolean isInitialized() { |
| return true; |
| } |
| |
| @Override |
| public ProcedureEvent<?> getInitializedEvent() { |
| return this.initialized; |
| } |
| |
| @Override |
| public MasterFileSystem getMasterFileSystem() { |
| return fileSystemManager; |
| } |
| |
| @Override |
| public MasterWalManager getMasterWalManager() { |
| return walManager; |
| } |
| |
| @Override |
| public ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() { |
| return procedureExecutor; |
| } |
| |
| @Override |
| public LoadBalancer getLoadBalancer() { |
| return balancer; |
| } |
| |
| @Override |
| public ServerManager getServerManager() { |
| return serverManager; |
| } |
| |
| @Override |
| public AssignmentManager getAssignmentManager() { |
| return assignmentManager; |
| } |
| |
| @Override |
| public TableStateManager getTableStateManager() { |
| return tableStateManager; |
| } |
| |
| @Override |
| public Connection getConnection() { |
| return this.connection; |
| } |
| |
| @Override |
| public ServerName getServerName() { |
| return MOCK_MASTER_SERVERNAME; |
| } |
| |
| @Override |
| public CoordinatedStateManager getCoordinatedStateManager() { |
| return super.getCoordinatedStateManager(); |
| } |
| |
| private static class MockRegionStateStore extends RegionStateStore { |
| public MockRegionStateStore(MasterServices master, MasterRegion masterRegion) { |
| super(master, masterRegion); |
| } |
| |
| @Override |
| public CompletableFuture<Void> updateRegionLocation(RegionStateNode regionNode) { |
| return CompletableFuture.completedFuture(null); |
| } |
| } |
| |
| @Override |
| public TableDescriptors getTableDescriptors() { |
| return new TableDescriptors() { |
| @Override |
| public TableDescriptor remove(TableName tablename) throws IOException { |
| // noop |
| return null; |
| } |
| |
| @Override |
| public Map<String, TableDescriptor> getAll() throws IOException { |
| // noop |
| return null; |
| } |
| |
| @Override |
| public TableDescriptor get(TableName tablename) throws IOException { |
| TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tablename); |
| builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(DEFAULT_COLUMN_FAMILY_NAME)); |
| return builder.build(); |
| } |
| |
| @Override |
| public Map<String, TableDescriptor> getByNamespace(String name) throws IOException { |
| return null; |
| } |
| |
| @Override |
| public void update(TableDescriptor htd, boolean cacheOnly) throws IOException { |
| // noop |
| } |
| }; |
| } |
| |
| private static MultiResponse buildMultiResponse(MultiRequest req) { |
| MultiResponse.Builder builder = MultiResponse.newBuilder(); |
| RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder(); |
| ResultOrException.Builder roeBuilder = ResultOrException.newBuilder(); |
| for (RegionAction regionAction : req.getRegionActionList()) { |
| regionActionResultBuilder.clear(); |
| for (ClientProtos.Action action : regionAction.getActionList()) { |
| roeBuilder.clear(); |
| roeBuilder.setResult(ClientProtos.Result.getDefaultInstance()); |
| roeBuilder.setIndex(action.getIndex()); |
| regionActionResultBuilder.addResultOrException(roeBuilder.build()); |
| } |
| builder.addRegionActionResult(regionActionResultBuilder.build()); |
| } |
| return builder.build(); |
| } |
| |
| @Override |
| public SplitWALManager getSplitWALManager() { |
| return splitWALManager; |
| } |
| |
| @Override |
| public ReplicationPeerManager getReplicationPeerManager() { |
| return rpm; |
| } |
| } |