blob: 770648ee0d9009cf61b7c2f6c97ca5fba402bdeb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.test.manager;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.ManagerClient;
import org.apache.accumulo.core.clientImpl.TabletLocator;
import org.apache.accumulo.core.conf.ClientProperty;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.TabletLocationState;
import org.apache.accumulo.core.spi.balancer.HostRegexTableLoadBalancer;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.miniclusterImpl.ProcessReference;
import org.apache.accumulo.server.manager.state.MetaDataTableScanner;
import org.apache.accumulo.test.functional.ConfigurableMacBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.SetMultimap;
/**
 * Integration test for tablet suspension: after tablet servers go away, the tablets they hosted
 * are expected to show up as suspended, return to a tablet server restarted on the same port, and
 * eventually all be hosted again.
 */
public class SuspendedTabletsIT extends ConfigurableMacBase {
  private static final Logger log = LoggerFactory.getLogger(SuspendedTabletsIT.class);
  private static final Random RANDOM = new SecureRandom();
  /** Runs metadata scans so that they can be abandoned after a deadline; see {@link #init()}. */
  private static ExecutorService THREAD_POOL;

  /** Number of tablet servers the mini cluster is configured with. */
  public static final int TSERVERS = 3;
  /** Value, in seconds, used for {@link Property#TABLE_SUSPEND_DURATION}. */
  public static final long SUSPEND_DURATION = 80;
  /** Number of tablets in the test table (created with {@code TABLETS - 1} split points). */
  public static final int TABLETS = 30;

  @Override
  protected int defaultTimeoutSeconds() {
    return 5 * 60;
  }

  @Override
  public void configure(MiniAccumuloConfigImpl cfg, Configuration fsConf) {
    cfg.setProperty(Property.TABLE_SUSPEND_DURATION, SUSPEND_DURATION + "s");
    cfg.setClientProperty(ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT, "5s");
    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
    cfg.setNumTservers(TSERVERS);
    // config custom balancer to keep all metadata on one server
    cfg.setProperty(HostRegexTableLoadBalancer.HOST_BALANCER_PREFIX + MetadataTable.NAME, "*");
    cfg.setProperty(HostRegexTableLoadBalancer.HOST_BALANCER_OOB_CHECK_KEY, "7s");
    cfg.setProperty(Property.TABLE_LOAD_BALANCER.getKey(),
        HostRegexTableLoadBalancer.class.getName());
  }

  /**
   * Runs the suspension test body, eliminating tablet servers by killing their processes.
   */
  @Test
  public void crashAndResumeTserver() throws Exception {
    suspensionTestBody((ctx, locs, count) -> {
      List<ProcessReference> procs =
          new ArrayList<>(getCluster().getProcesses().get(ServerType.TABLET_SERVER));
      // Use the shared random source, consistent with the clean-shutdown variant.
      Collections.shuffle(procs, RANDOM);
      for (ProcessReference pr : procs.subList(0, count)) {
        log.info("Crashing {}", pr.getProcess());
        getCluster().killProcess(ServerType.TABLET_SERVER, pr);
      }
    });
  }

  /**
   * Runs the suspension test body, eliminating tablet servers by asking the manager to shut them
   * down. Servers that host metadata for the test table's tablets are never chosen.
   */
  @Test
  public void shutdownAndResumeTserver() throws Exception {
    suspensionTestBody((ctx, locs, count) -> {
      Set<TServerInstance> tserverSet = new HashSet<>();
      Set<TServerInstance> metadataServerSet = new HashSet<>();

      TabletLocator tl = TabletLocator.getLocator(ctx, MetadataTable.ID);
      for (TabletLocationState tls : locs.locationStates.values()) {
        if (tls.current == null) {
          continue;
        }
        // add to set of all servers
        tserverSet.add(tls.current);

        // get server that the current tablets metadata is on
        TabletLocator.TabletLocation tab =
            tl.locateTablet(ctx, tls.extent.toMetaRow(), false, false);
        // add it to the set of servers with metadata
        metadataServerSet
            .add(new TServerInstance(tab.tablet_location, Long.valueOf(tab.tablet_session, 16)));
      }

      // remove servers with metadata on them from the list of servers to be shutdown
      tserverSet.removeAll(metadataServerSet);
      assertEquals("Expecting two tServers in shutdown-list", 2, tserverSet.size());

      List<TServerInstance> tserversList = new ArrayList<>(tserverSet);
      Collections.shuffle(tserversList, RANDOM);

      for (int i = 0; i < count; ++i) {
        final String tserverName = tserversList.get(i).getHostPortSession();
        ManagerClient.executeVoid(ctx, client -> {
          log.info("Sending shutdown command to {} via ManagerClientService", tserverName);
          client.shutdownTabletServer(null, ctx.rpcCreds(), tserverName, false);
        });
      }

      log.info("Waiting for tserver process{} to die", count == 1 ? "" : "es");
      // Number of requested shutdowns not yet observed as dead processes.
      int remaining = count;
      for (int attempt = 0; attempt < 10; ++attempt) {
        List<ProcessReference> deadProcs = new ArrayList<>();
        for (ProcessReference pr : getCluster().getProcesses().get(ServerType.TABLET_SERVER)) {
          if (!pr.getProcess().isAlive()) {
            deadProcs.add(pr);
          }
        }
        for (ProcessReference dead : deadProcs) {
          log.info("Process {} is dead, informing cluster control about this",
              dead.getProcess());
          getCluster().getClusterControl().killProcess(ServerType.TABLET_SERVER, dead);
          --remaining;
        }
        if (remaining == 0) {
          return;
        }
        Thread.sleep(MILLISECONDS.convert(2, SECONDS));
      }
      throw new IllegalStateException("Tablet servers didn't die!");
    });
  }

  /**
   * Main test body for suspension tests.
   *
   * <p>
   * Creates a table with {@link #TABLETS} tablets, waits for hosting and balancing, eliminates two
   * tablet servers through the callback, verifies the suspended tablets match what those servers
   * hosted, restarts one server on its old port and waits for its tablets to return, then waits
   * for every tablet to be hosted again.
   *
   * @param serverStopper
   *          callback which shuts down some tablet servers.
   */
  private void suspensionTestBody(TServerKiller serverStopper) throws Exception {
    try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
      ClientContext ctx = (ClientContext) client;

      String tableName = getUniqueNames(1)[0];

      SortedSet<Text> splitPoints = new TreeSet<>();
      for (int i = 1; i < TABLETS; ++i) {
        splitPoints.add(new Text("" + i));
      }
      log.info("Creating table {}", tableName);
      NewTableConfiguration ntc = new NewTableConfiguration().withSplits(splitPoints);
      ctx.tableOperations().create(tableName, ntc);

      // Wait for all of the tablets to be hosted ...
      log.info("Waiting on hosting and balance");
      TabletLocations ds = TabletLocations.retrieve(ctx, tableName);
      while (ds.hostedCount != TABLETS) {
        Thread.sleep(1000);
        ds = TabletLocations.retrieve(ctx, tableName);
      }

      // ... and balanced.
      ctx.instanceOperations().waitForBalance();
      do {
        // Give at least another 15 seconds for migrations to finish up
        Thread.sleep(15000);
        ds = TabletLocations.retrieve(ctx, tableName);
      } while (ds.hostedCount != TABLETS);

      // The rest of the test relies on every tserver hosting at least one of our tablets.
      assertEquals("Expected every tserver to host at least one tablet", TSERVERS,
          ds.hosted.keySet().size());

      // Kill two tablet servers hosting our tablets. This should put tablets into suspended state,
      // and thus halt balancing.
      TabletLocations beforeDeathState = ds;
      log.info("Eliminating tablet servers");
      serverStopper.eliminateTabletServers(ctx, beforeDeathState, 2);

      // All tablets should be either hosted or suspended. Keep polling until BOTH conditions hold:
      // two servers have suspended tablets and every tablet is accounted for. The assertions
      // below depend on both, so leaving the loop when only one holds would make them flaky.
      log.info("Waiting on suspended tablets");
      do {
        Thread.sleep(1000);
        ds = TabletLocations.retrieve(ctx, tableName);
      } while (ds.suspended.keySet().size() != 2
          || (ds.suspendedCount + ds.hostedCount) != TABLETS);

      SetMultimap<HostAndPort,KeyExtent> deadTabletsByServer = ds.suspended;

      // All suspended tablets should "belong" to the dead tablet servers, and should be in exactly
      // the same place as before any tserver death.
      for (HostAndPort server : deadTabletsByServer.keySet()) {
        // Comparing pre-death, hosted tablets to suspended tablets on a server
        assertEquals(beforeDeathState.hosted.get(server), deadTabletsByServer.get(server));
      }
      assertEquals(TABLETS, ds.hostedCount + ds.suspendedCount);

      // Restart the first tablet server, making sure it ends up on the same port
      HostAndPort restartedServer = deadTabletsByServer.keySet().iterator().next();
      log.info("Restarting {}", restartedServer);
      getCluster().getClusterControl().start(ServerType.TABLET_SERVER,
          Map.of(Property.TSERV_CLIENTPORT.getKey(), "" + restartedServer.getPort(),
              Property.TSERV_PORTSEARCH.getKey(), "false"),
          1);

      // Eventually, the suspended tablets should be reassigned to the newly alive tserver.
      log.info("Awaiting tablet unsuspension for tablets belonging to {}", restartedServer);
      while (ds.suspended.containsKey(restartedServer) || ds.assignedCount != 0) {
        Thread.sleep(1000);
        ds = TabletLocations.retrieve(ctx, tableName);
      }
      assertEquals(deadTabletsByServer.get(restartedServer), ds.hosted.get(restartedServer));

      // Finally, after much longer, remaining suspended tablets should be reassigned.
      log.info("Awaiting tablet reassignment for remaining tablets");
      while (ds.hostedCount != TABLETS) {
        Thread.sleep(1000);
        ds = TabletLocations.retrieve(ctx, tableName);
      }
    }
  }

  /** Strategy for making tablet servers go away during {@link #suspensionTestBody}. */
  @FunctionalInterface
  private interface TServerKiller {
    /**
     * Eliminates tablet servers.
     *
     * @param ctx
     *          client context for the running cluster
     * @param locs
     *          tablet locations observed before any server is eliminated
     * @param count
     *          number of tablet servers to eliminate
     */
    void eliminateTabletServers(ClientContext ctx, TabletLocations locs, int count)
        throws Exception;
  }

  private static final AtomicInteger threadCounter = new AtomicInteger(0);

  /** Creates the thread pool used for deadline-bounded metadata scans. */
  @BeforeClass
  public static void init() {
    THREAD_POOL = Executors.newCachedThreadPool(
        r -> new Thread(r, "Scanning deadline thread #" + threadCounter.incrementAndGet()));
  }

  /** Shuts down the scan thread pool. */
  @AfterClass
  public static void cleanup() {
    THREAD_POOL.shutdownNow();
  }

  /**
   * Snapshot of where one table's tablets are, built from a scan of the metadata table.
   */
  private static class TabletLocations {
    /** Location state of every tablet of the table, keyed by extent. */
    public final Map<KeyExtent,TabletLocationState> locationStates = new HashMap<>();
    /** Tablets with a current location and no suspension, grouped by hosting server. */
    public final SetMultimap<HostAndPort,KeyExtent> hosted = HashMultimap.create();
    /** Suspended tablets, grouped by the server recorded in their suspension. */
    public final SetMultimap<HostAndPort,KeyExtent> suspended = HashMultimap.create();
    public int hostedCount = 0;
    /** Tablets that have only a future location. */
    public int assignedCount = 0;
    public int suspendedCount = 0;

    /**
     * Scans the metadata table for the given table's tablets, retrying on failure.
     *
     * <p>
     * Each attempt runs on {@link #THREAD_POOL} and is abandoned after 5 seconds. Up to 30
     * attempts are made, with a doubling pause between them that is capped at 10 seconds; once
     * they are exhausted the test fails.
     *
     * @param ctx
     *          client context used for the scan
     * @param tableName
     *          name of the table whose tablets are of interest
     * @return the observed tablet locations
     * @throws InterruptedException
     *           if the calling thread is interrupted while waiting for a scan or between attempts
     */
    public static TabletLocations retrieve(final ClientContext ctx, final String tableName)
        throws Exception {
      int sleepTime = 200;
      int remainingAttempts = 30;

      while (true) {
        try {
          FutureTask<TabletLocations> tlsFuture = new FutureTask<>(() -> {
            TabletLocations answer = new TabletLocations();
            answer.scan(ctx, tableName);
            return answer;
          });
          THREAD_POOL.submit(tlsFuture);
          return tlsFuture.get(5, SECONDS);
        } catch (TimeoutException ex) {
          log.debug("Retrieval timed out", ex);
        } catch (InterruptedException ex) {
          // Someone wants this thread to stop; do not treat that as a retryable scan failure.
          throw ex;
        } catch (Exception ex) {
          log.warn("Failed to scan metadata", ex);
        }

        // Give up before sleeping: there is no point in pausing ahead of a failure.
        --remainingAttempts;
        if (remainingAttempts == 0) {
          fail("Scanning of metadata failed, aborting");
        }
        sleepTime = Math.min(2 * sleepTime, 10000);
        Thread.sleep(sleepTime);
      }
    }

    /**
     * Fills this snapshot from a full scan of the metadata table, keeping only the entries whose
     * table id matches {@code tableName}.
     */
    private void scan(ClientContext ctx, String tableName) {
      Map<String,String> idMap = ctx.tableOperations().tableIdMap();
      String tableId =
          Objects.requireNonNull(idMap.get(tableName), "no table id found for " + tableName);
      try (var scanner = new MetaDataTableScanner(ctx, new Range(), MetadataTable.NAME)) {
        while (scanner.hasNext()) {
          TabletLocationState tls = scanner.next();

          if (!tls.extent.tableId().canonical().equals(tableId)) {
            continue;
          }
          locationStates.put(tls.extent, tls);
          // Suspension takes precedence over a current location, which takes precedence over a
          // future one. A tablet with none of the three is unassigned and is not counted.
          if (tls.suspend != null) {
            suspended.put(tls.suspend.server, tls.extent);
            ++suspendedCount;
          } else if (tls.current != null) {
            hosted.put(tls.current.getHostAndPort(), tls.extent);
            ++hostedCount;
          } else if (tls.future != null) {
            ++assignedCount;
          }
        }
      }
    }
  }
}