blob: a90b5aace3b93bb217f0a527f102e79ef11ea8dc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.test.functional;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.IsolatedScanner;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.Locations;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.TabletHostingGoal;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.ClientTabletCache;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.manager.state.TabletManagement;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
import org.apache.accumulo.core.metadata.schema.TabletOperationId;
import org.apache.accumulo.core.metadata.schema.TabletOperationType;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.core.spi.ondemand.DefaultOnDemandTabletUnloader;
import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.harness.SharedMiniClusterBase;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.server.manager.state.TabletManagementScanner;
import org.apache.accumulo.test.util.Wait;
import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import com.google.common.collect.Iterables;
import com.google.common.net.HostAndPort;
public class ManagerAssignmentIT extends SharedMiniClusterBase {
/**
 * Overrides the timeout supplied by the base class with a fixed two-minute duration.
 *
 * @return a {@link Duration} of two minutes
 */
@Override
protected Duration defaultTimeout() {
  final Duration twoMinutes = Duration.ofMinutes(2);
  return twoMinutes;
}
/**
 * Starts the shared mini cluster once for the whole class with a single default tablet server
 * and explicit values for the assignment, thread pool, tablet group watcher and on-demand
 * unloader settings.
 */
@BeforeAll
public static void beforeAll() throws Exception {
  SharedMiniClusterBase.startMiniClusterWithConfig((miniCfg, coreSite) -> {
    // cluster shape: exactly one default tablet server
    miniCfg.getClusterServerConfiguration().setNumDefaultTabletServers(1);

    // assignment concurrency and general thread pool size
    miniCfg.setProperty(Property.TSERV_ASSIGNMENT_MAXCONCURRENT, "10");
    miniCfg.setProperty(Property.GENERAL_THREADPOOL_SIZE, "10");

    // how often the manager's tablet group watcher runs
    miniCfg.setProperty(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL, "5s");

    // on-demand tablet unloading: check interval and inactivity threshold
    miniCfg.setProperty(Property.TSERV_ONDEMAND_UNLOADER_INTERVAL, "10s");
    miniCfg.setProperty(DefaultOnDemandTabletUnloader.INACTIVITY_THRESHOLD, "15");
  });
}
/**
 * Runs before each test and restarts the tablet servers when the instance reports that none are
 * currently registered.
 */
@BeforeEach
public void before() throws Exception {
  try (AccumuloClient accumulo = Accumulo.newClient().from(getClientProps()).build()) {
    final boolean tserversPresent = !accumulo.instanceOperations().getTabletServers().isEmpty();
    if (tserversPresent) {
      return;
    }
    // A couple of tests in this class kill the tablet servers without clearing the
    // process list kept for them. stopAllServers should clear that list, after which
    // the tablet servers can be started again.
    getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
    getCluster().getClusterControl().start(ServerType.TABLET_SERVER);
  }
}
/**
 * Walks a freshly created table through write, flush, offline, online and each hosting goal
 * (ALWAYS, NEVER, ONDEMAND), checking the tablet metadata returned by
 * {@link #getManagerTabletInfo} after every step.
 */
@Test
public void test() throws Exception {
  try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {

    // Every tablet of the root table, then of the metadata table, must report a location
    for (String systemTable : List.of(RootTable.NAME, MetadataTable.NAME)) {
      Locations systemLocations = client.tableOperations().locate(systemTable,
          Collections.singletonList(new Range()));
      for (var tabletId : systemLocations.groupByTablet().keySet()) {
        assertNotNull(systemLocations.getTabletLocation(tabletId));
      }
    }

    final String tableName = super.getUniqueNames(1)[0];
    client.tableOperations().create(tableName);
    final String tableId = client.tableOperations().tableIdMap().get(tableName);

    // Wait until the tablet's extent can be read from the metadata table. Nothing has
    // asked for the tablet to be hosted yet, so it should have no location of any kind.
    Wait.waitFor(() -> soleTabletMetadata(client, tableId).getExtent() != null, 10000, 250);
    TabletMetadata created = soleTabletMetadata(client, tableId);
    assertNotNull(created.getExtent());
    assertFalse(created.hasCurrent());
    assertNull(created.getLast());
    assertNull(created.getLocation());
    assertEquals(TabletHostingGoal.ONDEMAND, created.getHostingGoal());

    // Writing through a batch writer causes the tablet to be hosted
    try (BatchWriter writer = client.createBatchWriter(tableName)) {
      Mutation row = new Mutation("a");
      row.put("b", "c", "d");
      writer.addMutation(row);
    }

    // Flushing gives the tablet a last location
    client.tableOperations().flush(tableName, null, null, true);
    TabletMetadata flushed = soleTabletMetadata(client, tableId);
    assertTrue(flushed.hasCurrent());
    assertNotNull(flushed.getLocation());
    // host:port the tablet was hosted at when flushed; later steps compare against it
    final var hostedAt = flushed.getLocation().getHostPort();
    assertEquals(hostedAt, flushed.getLast().getHostPort());
    final var flushedLocationType = flushed.getLocation().getType();
    assertFalse(flushedLocationType.equals(LocationType.FUTURE));
    assertEquals(TabletHostingGoal.ONDEMAND, flushed.getHostingGoal());

    // Offline: no current location, last location unchanged
    client.tableOperations().offline(tableName, true);
    TabletMetadata offlined = soleTabletMetadata(client, tableId);
    assertFalse(offlined.hasCurrent());
    assertNull(offlined.getLocation());
    assertEquals(hostedAt, offlined.getLast().getHostPort());
    assertEquals(TabletHostingGoal.ONDEMAND, offlined.getHostingGoal());

    // Online again: current and last location agree
    client.tableOperations().online(tableName, true);
    TabletMetadata onlined = soleTabletMetadata(client, tableId);
    assertTrue(onlined.hasCurrent());
    assertNotNull(onlined.getLocation());
    assertEquals(onlined.getLocation().getHostPort(), onlined.getLast().getHostPort());
    assertEquals(TabletHostingGoal.ONDEMAND, onlined.getHostingGoal());

    // Hosting goal ALWAYS: wait until the goal is recorded and the tablet is hosted
    client.tableOperations().setTabletHostingGoal(tableName, new Range(),
        TabletHostingGoal.ALWAYS);
    Predicate<TabletMetadata> alwaysAndHosted =
        tm -> tm.getHostingGoal() == TabletHostingGoal.ALWAYS && tm.hasCurrent();
    Wait.waitFor(() -> alwaysAndHosted.test(soleTabletMetadata(client, tableId)), 60000, 250);
    final TabletMetadata always = soleTabletMetadata(client, tableId);
    assertTrue(alwaysAndHosted.test(always));
    assertTrue(always.hasCurrent());
    assertEquals(hostedAt, always.getLast().getHostPort());
    assertEquals(TabletHostingGoal.ALWAYS, always.getHostingGoal());

    // Hosting goal NEVER: wait until the goal is recorded and the tablet is unhosted
    client.tableOperations().setTabletHostingGoal(tableName, new Range(),
        TabletHostingGoal.NEVER);
    Predicate<TabletMetadata> neverAndUnhosted =
        tm -> tm.getHostingGoal() == TabletHostingGoal.NEVER && !tm.hasCurrent();
    Wait.waitFor(() -> neverAndUnhosted.test(soleTabletMetadata(client, tableId)), 60000, 250);
    final TabletMetadata never = soleTabletMetadata(client, tableId);
    assertTrue(neverAndUnhosted.test(never));
    assertNull(never.getLocation());
    assertEquals(hostedAt, never.getLast().getHostPort());
    assertEquals(TabletHostingGoal.NEVER, never.getHostingGoal());

    // Hosting goal ONDEMAND: only the goal is waited for; the tablet is expected to
    // remain without a location
    client.tableOperations().setTabletHostingGoal(tableName, new Range(),
        TabletHostingGoal.ONDEMAND);
    Predicate<TabletMetadata> goalIsOnDemand =
        tm -> tm.getHostingGoal() == TabletHostingGoal.ONDEMAND;
    Wait.waitFor(() -> goalIsOnDemand.test(soleTabletMetadata(client, tableId)), 60000, 250);
    final TabletMetadata onDemand = soleTabletMetadata(client, tableId);
    assertTrue(goalIsOnDemand.test(onDemand));
    assertNull(onDemand.getLocation());
    assertEquals(hostedAt, onDemand.getLast().getHostPort());
    assertEquals(TabletHostingGoal.ONDEMAND, onDemand.getHostingGoal());
  }
}

/** Reads the tablet metadata that getManagerTabletInfo returns for the table with a null end row. */
private static TabletMetadata soleTabletMetadata(AccumuloClient client, String tableId) {
  return getManagerTabletInfo(client, tableId, null).getTabletMetadata();
}
/**
 * Creates a table with splits at "f", "m" and "t", loads it via {@link #loadDataForScan}, and
 * then cycles it offline and online so that no tablets are reported by
 * {@link #getTabletStats} when the method returns.
 *
 * @param c client used for all table operations
 * @param tableName name of the table to create
 * @return the id of the created table
 */
private String prepTableForScanTest(AccumuloClient c, String tableName) throws Exception {
  final TreeSet<Text> splitPoints = new TreeSet<>();
  for (String row : List.of("f", "m", "t")) {
    splitPoints.add(new Text(row));
  }
  final NewTableConfiguration tableConfig = new NewTableConfiguration();
  tableConfig.withSplits(splitPoints);
  c.tableOperations().create(tableName, tableConfig);

  final String tableId = c.tableOperations().tableIdMap().get(tableName);
  final ClientContext ctx = (ClientContext) c;
  final TableId tid = TableId.of(tableId);

  // Nothing has touched the new tablets yet, so none should be assigned and the
  // client should not have requested hosting for any of them
  Wait.waitFor(() -> getTabletStats(c, tableId).isEmpty(), 60000, 50);
  assertEquals(0, ClientTabletCache.getInstance(ctx, tid).getTabletHostingRequestCount());

  // Writing data forces the tablets to be hosted
  loadDataForScan(c, tableName);
  assertTrue(ClientTabletCache.getInstance(ctx, tid).getTabletHostingRequestCount() > 0);
  Wait.waitFor(() -> getTabletStats(c, tableId).size() == 4, 60000, 50);

  // Taking the table offline unassigns the tablets without waiting for the tablet unloader
  c.tableOperations().offline(tableName, true);
  c.tableOperations().clearLocatorCache(tableName);

  // After bringing the table back online there should still be no tablets hosted
  c.tableOperations().online(tableName, true);
  Wait.waitFor(() -> getTabletStats(c, tableId).isEmpty(), 60000, 50);
  assertEquals(0, ClientTabletCache.getInstance(ctx, tid).getTabletHostingRequestCount());

  return tableId;
}
/**
 * Scans the range "a" to "c" of the prepared table with a {@link Scanner} and verifies that
 * three keys come back, that exactly one tablet is then reported by {@link #getTabletStats},
 * and that the client issued at least one tablet hosting request.
 */
@Test
public void testScannerAssignsOneOnDemandTablets() throws Exception {
  try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
    String tableName = super.getUniqueNames(1)[0];
    String tableId = prepTableForScanTest(c, tableName);
    // Close the scanner once its results have been consumed, as the other scan tests in
    // this class do, instead of leaving it open for the remainder of the test.
    try (Scanner s = c.createScanner(tableName)) {
      s.setRange(new Range("a", "c"));
      // Should return keys for a, b, c
      assertEquals(3, Iterables.size(s));
    }
    List<TabletStats> stats = getTabletStats(c, tableId);
    // There should be one tablet online
    assertEquals(1, stats.size());
    assertTrue(ClientTabletCache.getInstance((ClientContext) c, TableId.of(tableId))
        .getTabletHostingRequestCount() > 0);
  }
}
/**
 * Scans "a" to "s" with a {@link Scanner}, expecting 19 keys and three tablets reported by
 * {@link #getTabletStats}, then scans "a" to "t" and expects 20 keys with neither the tablet
 * count nor the client's hosting request count changing.
 */
@Test
public void testScannerAssignsMultipleOnDemandTablets() throws Exception {
  try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
    final String tableName = super.getUniqueNames(1)[0];
    final String tableId = prepTableForScanTest(client, tableName);
    final ClientContext ctx = (ClientContext) client;
    final TableId tid = TableId.of(tableId);

    try (Scanner firstScan = client.createScanner(tableName)) {
      firstScan.setRange(new Range("a", "s"));
      assertEquals(19, Iterables.size(firstScan));
    }
    assertEquals(3, getTabletStats(client, tableId).size());

    final long requestsAfterFirstScan =
        ClientTabletCache.getInstance(ctx, tid).getTabletHostingRequestCount();
    assertTrue(requestsAfterFirstScan > 0);

    // Extend the range to include row "t". The same three tablets are expected to
    // serve this scan.
    try (Scanner secondScan = client.createScanner(tableName)) {
      secondScan.setRange(new Range("a", "t"));
      assertEquals(20, Iterables.size(secondScan));
    }
    assertEquals(3, getTabletStats(client, tableId).size());

    // The second scan should not have requested hosting for any further tablets
    assertEquals(requestsAfterFirstScan,
        ClientTabletCache.getInstance(ctx, tid).getTabletHostingRequestCount());
  }
}
/**
 * Scans the range "a" to "c" of the prepared table with a {@link BatchScanner} and verifies
 * that three keys come back, that exactly one tablet is then reported by
 * {@link #getTabletStats}, and that the client issued at least one tablet hosting request.
 */
@Test
public void testBatchScannerAssignsOneOnDemandTablets() throws Exception {
  try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
    final String tableName = super.getUniqueNames(1)[0];
    final String tableId = prepTableForScanTest(client, tableName);

    try (BatchScanner batchScan = client.createBatchScanner(tableName)) {
      batchScan.setRanges(List.of(new Range("a", "c")));
      // rows a, b and c are expected
      assertEquals(3, Iterables.size(batchScan));
    }

    // Only one tablet should be online after the scan
    final List<TabletStats> onlineTablets = getTabletStats(client, tableId);
    assertEquals(1, onlineTablets.size());

    final ClientTabletCache cache =
        ClientTabletCache.getInstance((ClientContext) client, TableId.of(tableId));
    assertTrue(cache.getTabletHostingRequestCount() > 0);
  }
}
/**
 * Scans "a" to "s" with a {@link BatchScanner}, expecting 19 keys and three tablets reported
 * by {@link #getTabletStats}, then scans "a" to "t" and expects 20 keys with neither the
 * tablet count nor the client's hosting request count changing.
 */
@Test
public void testBatchScannerAssignsMultipleOnDemandTablets() throws Exception {
  try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
    final String tableName = super.getUniqueNames(1)[0];
    final String tableId = prepTableForScanTest(client, tableName);
    final ClientContext ctx = (ClientContext) client;
    final TableId tid = TableId.of(tableId);

    try (BatchScanner firstScan = client.createBatchScanner(tableName)) {
      firstScan.setRanges(List.of(new Range("a", "s")));
      assertEquals(19, Iterables.size(firstScan));
    }
    assertEquals(3, getTabletStats(client, tableId).size());

    final long requestsAfterFirstScan =
        ClientTabletCache.getInstance(ctx, tid).getTabletHostingRequestCount();
    assertTrue(requestsAfterFirstScan > 0);

    // A second scan that also covers row "t"; the needed tablets should already be loaded
    try (BatchScanner secondScan = client.createBatchScanner(tableName)) {
      secondScan.setRanges(List.of(new Range("a", "t")));
      assertEquals(20, Iterables.size(secondScan));
    }
    assertEquals(3, getTabletStats(client, tableId).size());

    // The second scan should not have requested hosting for any further tablets
    assertEquals(requestsAfterFirstScan,
        ClientTabletCache.getInstance(ctx, tid).getTabletHostingRequestCount());
  }
}
/**
 * Runs {@link #prepTableForScanTest} on its own; that helper loads data through a batch writer
 * and contains the assertions about tablets becoming hosted.
 */
@Test
public void testBatchWriterAssignsTablets() throws Exception {
  try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
    final String[] uniqueNames = super.getUniqueNames(1);
    prepTableForScanTest(client, uniqueNames[0]);
  }
}
/**
 * Writes a SPLITTING operation id onto the (f, m] tablet's metadata row and checks that this
 * tablet has no location while the other three do, then deletes the operation id and checks
 * that all four tablets have a location. The set/delete cycle is performed twice: once before
 * the tablet was ever hosted under the ALWAYS goal and once while it is hosted.
 */
@Test
public void testOpidPreventsAssignment() throws Exception {
  try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
    final String tableName = super.getUniqueNames(1)[0];
    final var tableId = TableId.of(prepTableForScanTest(c, tableName));
    assertEquals(0, countTabletsWithLocation(c, tableId));

    final Set<String> actualSplits = c.tableOperations().listSplits(tableName).stream()
        .map(Text::toString).collect(Collectors.toSet());
    assertEquals(Set.of("f", "m", "t"), actualSplits);

    // the test writes directly to the metadata table, which needs WRITE permission
    c.securityOperations().grantTablePermission(getPrincipal(), MetadataTable.NAME,
        TablePermission.WRITE);

    // the tablet covering (f, m] is the one whose operation id is manipulated
    final KeyExtent middleTablet = new KeyExtent(tableId, new Text("m"), new Text("f"));

    // With an operation id present the tablet should not be assigned
    putSplittingOpid(c, middleTablet);

    // Ask for every tablet to be hosted; only three of the four should get a location
    c.tableOperations().setTabletHostingGoal(tableName, new Range(), TabletHostingGoal.ALWAYS);
    Wait.waitFor(() -> countTabletsWithLocation(c, tableId) == 3);
    final var ample = getAmple(c);
    assertNull(ample.readTablet(middleTablet).getLocation());

    // Once the operation id is gone the tablet should be assigned
    deleteOpid(c, middleTablet);
    Wait.waitFor(() -> countTabletsWithLocation(c, tableId) == 4);

    // Setting the operation id on the now-hosted tablet should cause it to be unhosted,
    // leaving three of the four tablets with a location
    putSplittingOpid(c, middleTablet);
    Wait.waitFor(() -> countTabletsWithLocation(c, tableId) == 3);
    assertNull(ample.readTablet(middleTablet).getLocation());

    // Deleting the operation id again should lead to the tablet being assigned again
    deleteOpid(c, middleTablet);
    Wait.waitFor(() -> countTabletsWithLocation(c, tableId) == 4);
  }
}

/** Writes a SPLITTING operation id (with id 42) into the metadata row of the given extent. */
private static void putSplittingOpid(AccumuloClient client, KeyExtent extent) throws Exception {
  try (var writer = client.createBatchWriter(MetadataTable.NAME)) {
    var opid = TabletOperationId.from(TabletOperationType.SPLITTING, 42L);
    Mutation m = new Mutation(extent.toMetaRow());
    TabletsSection.ServerColumnFamily.OPID_COLUMN.put(m, new Value(opid.canonical()));
    writer.addMutation(m);
  }
}

/** Deletes the operation id column from the metadata row of the given extent. */
private static void deleteOpid(AccumuloClient client, KeyExtent extent) throws Exception {
  try (var writer = client.createBatchWriter(MetadataTable.NAME)) {
    Mutation m = new Mutation(extent.toMetaRow());
    TabletsSection.ServerColumnFamily.OPID_COLUMN.putDelete(m);
    writer.addMutation(m);
  }
}
/**
 * Writes one mutation per single-letter row "a" through "y" (25 rows) to the given table. Each
 * mutation has an empty column family, qualifier and value. A rejected mutation is reported
 * through {@code fail}.
 *
 * @param c client used to create the batch writer
 * @param tableName table to write to
 */
public static void loadDataForScan(AccumuloClient c, String tableName)
    throws MutationsRejectedException, TableNotFoundException {
  final byte[] noBytes = new byte[0];
  try (BatchWriter writer = c.createBatchWriter(tableName)) {
    // 'a' is 97 and 'z' is 122; the upper bound is exclusive, so the last row is "y"
    for (char row = 'a'; row < 'z'; row++) {
      Mutation mutation = new Mutation(String.valueOf(row));
      mutation.put(noBytes, noBytes, noBytes);
      try {
        writer.addMutation(mutation);
      } catch (MutationsRejectedException e) {
        fail("Error inserting data", e);
      }
    }
  }
}
/**
 * Returns the {@link Ample} instance of the given client.
 *
 * @param c a client that must be a {@link ClientContext}; it is cast without a check
 * @return the result of {@code getAmple()} on that context
 */
public static Ample getAmple(AccumuloClient c) {
  final ClientContext context = (ClientContext) c;
  return context.getAmple();
}
/**
 * Counts the tablets of a table whose metadata currently has a non-null location.
 *
 * <p>
 * Only the LOCATION column is fetched. The tablets metadata object is closed before returning
 * because this method is polled repeatedly from wait loops in this class.
 *
 * @param c client used to read the metadata
 * @param tableId id of the table whose tablets are inspected
 * @return number of tablets for which {@code getLocation()} is not null
 */
public static long countTabletsWithLocation(AccumuloClient c, TableId tableId) {
  try (var tablets = getAmple(c).readTablets().forTable(tableId)
      .fetch(TabletMetadata.ColumnType.LOCATION).build()) {
    return tablets.stream().filter(tabletMetadata -> tabletMetadata.getLocation() != null)
        .count();
  }
}
/**
 * Fetches the tablet stats for a table through the TABLET_SERVER thrift client type.
 *
 * @param c a client that must be a {@link ClientContext}; it is cast without a check
 * @param tableId id of the table to ask about
 * @return the list returned by the {@code getTabletStats} RPC
 */
public static List<TabletStats> getTabletStats(AccumuloClient c, String tableId)
    throws AccumuloException, AccumuloSecurityException {
  final ClientContext context = (ClientContext) c;
  return ThriftClientTypes.TABLET_SERVER.execute(context,
      tserver -> tserver.getTabletStats(TraceUtil.traceInfo(), context.rpcCreds(), tableId));
}
/**
 * Loads a user table with one million rows, starts ten threads that scan it continuously, and
 * then asks the manager to shut down every tablet server that hosts metadata for the table.
 * The test passes once the instance reports zero tablet servers.
 */
@Test
public void testShutdownOnlyTServerWithUserTable() throws Exception {
  // Restart the tablet servers so this test begins with freshly started ones, then wait
  // below until exactly one is registered.
  getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
  getCluster().getClusterControl().start(ServerType.TABLET_SERVER, Collections.emptyMap(), 1);
  String tableName = getUniqueNames(1)[0];
  try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
    Wait.waitFor(() -> client.instanceOperations().getTabletServers().size() == 1,
        SECONDS.toMillis(60), SECONDS.toMillis(2));
    client.tableOperations().create(tableName);
    TableId tid = TableId.of(client.tableOperations().tableIdMap().get(tableName));
    // wait for everything to be hosted and balanced
    client.instanceOperations().waitForBalance();
    try (var writer = client.createBatchWriter(tableName)) {
      for (int i = 0; i < 1000000; i++) {
        Mutation m = new Mutation(String.format("%08d", i));
        m.put("", "", "");
        writer.addMutation(m);
      }
    }
    client.tableOperations().flush(tableName, null, null, true);
    final CountDownLatch latch = new CountDownLatch(10);
    // Each task rescans the table in a loop until a scan throws
    Runnable task = () -> {
      while (true) {
        try (var scanner = new IsolatedScanner(client.createScanner(tableName))) {
          // TODO maybe do not close scanner? Each pass reads the whole table; closing the
          // scanner is intended to avoid leaving a scan session active on the tserver
          AtomicInteger count = new AtomicInteger(0);
          scanner.forEach(e -> {
            // let the test thread know that this thread has read some data
            if (count.incrementAndGet() == 1_000) {
              latch.countDown();
            }
          });
        } catch (Exception e) {
          e.printStackTrace();
          break;
        }
      }
    };
    ExecutorService service = Executors.newFixedThreadPool(10);
    try {
      for (int i = 0; i < 10; i++) {
        service.execute(task);
      }
      // Wait until all threads are reading some data
      latch.await();
      // getClusterControl().stopAllServers(ServerType.TABLET_SERVER)
      // could potentially send a kill -9 to the process. Shut the tablet
      // servers down in a more graceful way.
      final Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>();
      ClientTabletCache.getInstance((ClientContext) client, tid).binRanges((ClientContext) client,
          Collections.singletonList(TabletsSection.getRange()), binnedRanges);
      binnedRanges.keySet().forEach((location) -> {
        HostAndPort address = HostAndPort.fromString(location);
        String addressWithSession = address.toString();
        var zLockPath = ServiceLock.path(getCluster().getServerContext().getZooKeeperRoot()
            + Constants.ZTSERVERS + "/" + address);
        long sessionId =
            ServiceLock.getSessionId(getCluster().getServerContext().getZooCache(), zLockPath);
        // append the lock session id in hex when one is found
        if (sessionId != 0) {
          addressWithSession = address + "[" + Long.toHexString(sessionId) + "]";
        }
        final String finalAddress = addressWithSession;
        System.out.println("Attempting to shutdown TabletServer at: " + address);
        try {
          ThriftClientTypes.MANAGER.executeVoid((ClientContext) client,
              c -> c.shutdownTabletServer(TraceUtil.traceInfo(),
                  getCluster().getServerContext().rpcCreds(), finalAddress, false));
        } catch (AccumuloException | AccumuloSecurityException e) {
          fail("Error shutting down TabletServer", e);
        }
      });
      Wait.waitFor(() -> client.instanceOperations().getTabletServers().size() == 0);
    } finally {
      // The scan tasks only stop when a scan throws, so interrupt them and release the
      // pool's threads instead of leaving them running after this test finishes.
      service.shutdownNow();
    }
  }
}
/**
 * Without creating any user table, asks the manager to shut down the tablet server at each
 * location returned for the root table over the tablets-section range, then waits until the
 * instance reports zero tablet servers.
 */
@Test
public void testShutdownOnlyTServerWithoutUserTable() throws Exception {
  // 2 TabletServers started for this test, shut them down so we only have 1.
  getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
  getCluster().getClusterControl().start(ServerType.TABLET_SERVER, Collections.emptyMap(), 1);

  try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
    Wait.waitFor(() -> client.instanceOperations().getTabletServers().size() == 1,
        SECONDS.toMillis(60), SECONDS.toMillis(2));
    client.instanceOperations().waitForBalance();

    // stopAllServers(ServerType.TABLET_SERVER) on the cluster control could potentially
    // kill -9 the process, so request a graceful shutdown through the manager instead.
    final Locations rootLocations = client.tableOperations().locate(RootTable.NAME,
        Collections.singletonList(TabletsSection.getRange()));

    for (var tablet : rootLocations.groupByTablet().keySet()) {
      final String location = rootLocations.getTabletLocation(tablet);
      final HostAndPort hostPort = HostAndPort.fromString(location);

      // look up the session id of the tablet server's lock in ZooKeeper
      final var lockPath = ServiceLock.path(getCluster().getServerContext().getZooKeeperRoot()
          + Constants.ZTSERVERS + "/" + hostPort);
      final long session =
          ServiceLock.getSessionId(getCluster().getServerContext().getZooCache(), lockPath);

      // host:port, followed by [hex session id] when a session id was found
      final String shutdownTarget = session != 0
          ? hostPort + "[" + Long.toHexString(session) + "]" : hostPort.toString();

      System.out.println("Attempting to shutdown TabletServer at: " + hostPort);
      try {
        ThriftClientTypes.MANAGER.executeVoid((ClientContext) client,
            manager -> manager.shutdownTabletServer(TraceUtil.traceInfo(),
                getCluster().getServerContext().rpcCreds(), shutdownTarget, false));
      } catch (AccumuloException | AccumuloSecurityException e) {
        fail("Error shutting down TabletServer", e);
      }
    }

    Wait.waitFor(() -> client.instanceOperations().getTabletServers().size() == 0);
  }
}
/**
 * Reads the first {@link TabletManagement} entry a {@link TabletManagementScanner} returns from
 * the metadata table for the row encoded from the given table id and end row.
 *
 * @param c a client that must be a {@link ClientContext}; it is cast without a check
 * @param tableId id of the table the tablet belongs to
 * @param endRow end row of the tablet; callers in this class pass null
 * @return the result of the scanner's first {@code next()} call
 */
public static TabletManagement getManagerTabletInfo(AccumuloClient c, String tableId,
    Text endRow) {
  final ClientContext context = (ClientContext) c;
  final var metadataRow = TabletsSection.encodeRow(TableId.of(tableId), endRow);
  try (TabletManagementScanner scanner =
      new TabletManagementScanner(context, new Range(metadataRow), MetadataTable.NAME)) {
    return scanner.next();
  }
}
}