/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.test;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.function.Consumer;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.client.admin.DiskUsage;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.conf.ClientProperty;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.init.Initialize;
import org.apache.accumulo.server.log.WalStateManager;
import org.apache.accumulo.server.log.WalStateManager.WalMarkerException;
import org.apache.accumulo.server.log.WalStateManager.WalState;
import org.apache.accumulo.server.util.Admin;
import org.apache.accumulo.test.functional.ConfigurableMacBase;
import org.apache.commons.configuration2.PropertiesConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.Text;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.junit.Test;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
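/**
 * Tests Accumulo's multi-volume support: spreading tablet files across the configured volumes,
 * adding a volume to an existing instance, dropping a volume from the configuration, and
 * replacing volumes via the instance.volumes.replacements property. Each volume is a directory
 * on the local file system managed by the mini cluster.
 */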
public class VolumeIT extends ConfigurableMacBase {
private File volDirBase;
private Path v1, v2, v3;
private List<String> expected = new ArrayList<>();
@Override
protected int defaultTimeoutSeconds() {
return 10 * 60;
}
@Override
public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
File baseDir = cfg.getDir();
volDirBase = new File(baseDir, "volumes");
File v1f = new File(volDirBase, "v1");
File v2f = new File(volDirBase, "v2");
v1 = new Path("file://" + v1f.getAbsolutePath());
v2 = new Path("file://" + v2f.getAbsolutePath());
File v3f = new File(volDirBase, "v3");
v3 = new Path("file://" + v3f.getAbsolutePath());
    // set up the expected rows
for (int i = 0; i < 100; i++) {
String row = String.format("%06d", i * 100 + 3);
expected.add(row + ":cf1:cq1:1");
}
// Run MAC on two locations in the local file system
cfg.setProperty(Property.INSTANCE_VOLUMES, v1 + "," + v2);
cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
cfg.setClientProperty(ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT.getKey(), "15s");
    // use the raw local file system so that walog sync and flush work
hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
super.configure(cfg, hadoopCoreSite);
}
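  // Create a pre-split table, flush it, and verify that its files are spread across both
  // configured volumes and that the reported disk usage covers all of them.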
@Test
public void test() throws Exception {
// create a table
try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
String tableName = getUniqueNames(1)[0];
// create set of splits
SortedSet<Text> partitions = new TreeSet<>();
      for (String s : "d,m,t".split(",")) {
        partitions.add(new Text(s));
      }
// create table with splits
NewTableConfiguration ntc = new NewTableConfiguration().withSplits(partitions);
client.tableOperations().create(tableName, ntc);
// scribble over the splits
VolumeChooserIT.writeDataToTable(client, tableName, VolumeChooserIT.alpha_rows);
// write the data to disk, read it back
client.tableOperations().flush(tableName, null, null, true);
try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) {
int i = 0;
for (Entry<Key,Value> entry : scanner) {
assertEquals(VolumeChooserIT.alpha_rows[i++], entry.getKey().getRow().toString());
}
}
// verify the new files are written to the different volumes
try (Scanner scanner = client.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
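        // the first user table created on a fresh instance is assumed to get table id "1", so
        // this range covers exactly that table's metadata entries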
scanner.setRange(new Range("1", "1<"));
scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
int fileCount = 0;
for (Entry<Key,Value> entry : scanner) {
boolean inV1 = entry.getKey().getColumnQualifier().toString().contains(v1.toString());
boolean inV2 = entry.getKey().getColumnQualifier().toString().contains(v2.toString());
assertTrue(inV1 || inV2);
fileCount++;
}
assertEquals(4, fileCount);
List<DiskUsage> diskUsage =
client.tableOperations().getDiskUsage(Collections.singleton(tableName));
assertEquals(1, diskUsage.size());
long usage = diskUsage.get(0).getUsage();
log.debug("usage {}", usage);
assertTrue(usage > 700 && usage < 900);
}
}
}
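  /**
   * Drains and closes the given scanner, then asserts that its entries, rendered as
   * row:cf:cq:value strings, match the expected list regardless of order.
   */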
  private void verifyData(List<String> expected, Scanner scanner) {
    List<String> actual = new ArrayList<>();
    // ensure the scanner is closed even if iteration throws
    try (scanner) {
      for (Entry<Key,Value> entry : scanner) {
        Key k = entry.getKey();
        actual.add(k.getRow() + ":" + k.getColumnFamily() + ":" + k.getColumnQualifier() + ":"
            + entry.getValue());
      }
    }
    Collections.sort(expected);
    Collections.sort(actual);
    assertEquals(expected, actual);
  }
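  // Add a third volume to a stopped instance: extend instance.volumes, run
  // 'accumulo init --add-volumes' to stamp the instance id onto the new volume, then restart
  // and verify that files for a new table land on all three volumes.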
@Test
public void testAddVolumes() throws Exception {
try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
String[] tableNames = getUniqueNames(2);
String uuid = verifyAndShutdownCluster(client, tableNames[0]);
updateConfig(config -> config.setProperty(Property.INSTANCE_VOLUMES.getKey(),
v1 + "," + v2 + "," + v3));
      // initialize the newly added volume
assertEquals(0, cluster.exec(Initialize.class, "--add-volumes").getProcess().waitFor());
checkVolumesInitialized(Arrays.asList(v1, v2, v3), uuid);
// start cluster and verify that new volume is used
cluster.start();
verifyVolumesUsed(client, tableNames[1], false, v1, v2, v3);
}
}
// grab uuid before shutting down cluster
private String verifyAndShutdownCluster(AccumuloClient c, String tableName) throws Exception {
String uuid = c.instanceOperations().getInstanceID();
verifyVolumesUsed(c, tableName, false, v1, v2);
assertEquals(0, cluster.exec(Admin.class, "stopAll").getProcess().waitFor());
cluster.stop();
return uuid;
}
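  // Drop v1 from instance.volumes without decommissioning it: data already on v1 must remain
  // readable, while files for a new table should only be written to v2 and v3.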
@Test
public void testNonConfiguredVolumes() throws Exception {
String[] tableNames = getUniqueNames(2);
try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
String uuid = verifyAndShutdownCluster(client, tableNames[0]);
updateConfig(config -> config.setProperty(Property.INSTANCE_VOLUMES.getKey(), v2 + "," + v3));
      // initialize the newly added volume
assertEquals(0, cluster.exec(Initialize.class, "--add-volumes").getProcess().waitFor());
checkVolumesInitialized(Arrays.asList(v1, v2, v3), uuid);
// start cluster and verify that new volume is used
cluster.start();
// verify we can still read the tables (tableNames[0] is likely to have a file still on v1)
verifyData(expected, client.createScanner(tableNames[0], Authorizations.EMPTY));
// v1 should not have any data for tableNames[1]
verifyVolumesUsed(client, tableNames[1], false, v2, v3);
}
}
// check that all volumes are initialized
private void checkVolumesInitialized(List<Path> volumes, String uuid) throws Exception {
for (Path volumePath : volumes) {
FileSystem fs = volumePath.getFileSystem(cluster.getServerContext().getHadoopConf());
Path vp = new Path(volumePath, ServerConstants.INSTANCE_ID_DIR);
FileStatus[] iids = fs.listStatus(vp);
assertEquals(1, iids.length);
assertEquals(uuid, iids[0].getPath().getName());
}
}
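  // Create a table with 99 splits and write the 100 rows described by the expected list built
  // in configure(), one row per tablet.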
private void writeData(String tableName, AccumuloClient client) throws AccumuloException,
AccumuloSecurityException, TableExistsException, TableNotFoundException {
TreeSet<Text> splits = new TreeSet<>();
for (int i = 1; i < 100; i++) {
splits.add(new Text(String.format("%06d", i * 100)));
}
NewTableConfiguration ntc = new NewTableConfiguration().withSplits(splits);
client.tableOperations().create(tableName, ntc);
try (BatchWriter bw = client.createBatchWriter(tableName)) {
for (int i = 0; i < 100; i++) {
String row = String.format("%06d", i * 100 + 3);
Mutation m = new Mutation(row);
m.put("cf1", "cq1", "1");
bw.addMutation(m);
}
}
}
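  // Verify that every data file for the table is on one of the given volumes, that all WAL
  // markers in ZooKeeper reference only those volumes, and that each volume holds at least one
  // of the table's 100 files. If the table does not exist yet, it is created and populated
  // first (in which case shouldExist must be false).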
private void verifyVolumesUsed(AccumuloClient client, String tableName, boolean shouldExist,
Path... paths) throws Exception {
if (!client.tableOperations().exists(tableName)) {
assertFalse(shouldExist);
writeData(tableName, client);
verifyData(expected, client.createScanner(tableName, Authorizations.EMPTY));
client.tableOperations().flush(tableName, null, null, true);
}
verifyData(expected, client.createScanner(tableName, Authorizations.EMPTY));
TableId tableId = TableId.of(client.tableOperations().tableIdMap().get(tableName));
try (Scanner metaScanner = client.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
metaScanner.fetchColumnFamily(DataFileColumnFamily.NAME);
metaScanner.setRange(new KeyExtent(tableId, null, null).toMetaRange());
int[] counts = new int[paths.length];
outer: for (Entry<Key,Value> entry : metaScanner) {
String path = entry.getKey().getColumnQualifier().toString();
for (int i = 0; i < paths.length; i++) {
if (path.startsWith(paths[i].toString())) {
counts[i]++;
continue outer;
}
}
fail("Unexpected volume " + path);
}
// keep retrying until WAL state information in ZooKeeper stabilizes or until test times out
retry: while (true) {
WalStateManager wals = new WalStateManager(getServerContext());
try {
outer: for (Entry<Path,WalState> entry : wals.getAllState().entrySet()) {
for (Path path : paths) {
if (entry.getKey().toString().startsWith(path.toString())) {
continue outer;
}
}
log.warn("Unexpected volume " + entry.getKey() + " (" + entry.getValue() + ")");
continue retry;
}
} catch (WalMarkerException e) {
Throwable cause = e.getCause();
if (cause instanceof NoNodeException) {
// ignore WALs being cleaned up
continue retry;
}
throw e;
}
break;
}
      // If a volume is chosen randomly for each tablet, the probability that a given volume is
      // never chosen is ((num_volumes - 1) / num_volumes) ^ num_tablets. For 100 tablets and 3
      // volumes that is about 2.46e-18, so asserting that every volume holds at least one file
      // will not cause spurious failures.
int sum = 0;
for (int count : counts) {
assertTrue(count > 0);
sum += count;
}
assertEquals(100, sum);
}
}
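  // Shrink the instance from two volumes to one and verify that compactions migrate the user
  // table, the root tablet, and a subsequent clone entirely onto the remaining volume.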
@Test
public void testRemoveVolumes() throws Exception {
try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
String[] tableNames = getUniqueNames(2);
verifyVolumesUsed(client, tableNames[0], false, v1, v2);
assertEquals(0, cluster.exec(Admin.class, "stopAll").getProcess().waitFor());
cluster.stop();
updateConfig(config -> config.setProperty(Property.INSTANCE_VOLUMES.getKey(), v2.toString()));
// start cluster and verify that volume was decommissioned
cluster.start();
client.tableOperations().compact(tableNames[0], null, null, true, true);
verifyVolumesUsed(client, tableNames[0], true, v2);
client.tableOperations().compact(RootTable.NAME, new CompactionConfig().setWait(true));
      // check that all of the root tablet's files are on v2
int count = 0;
for (StoredTabletFile file : ((ClientContext) client).getAmple().readTablet(RootTable.EXTENT)
.getFiles()) {
assertTrue(file.getMetaUpdateDelete().startsWith(v2.toString()));
count++;
}
assertTrue(count > 0);
client.tableOperations().clone(tableNames[0], tableNames[1], true, new HashMap<>(),
new HashSet<>());
client.tableOperations().flush(MetadataTable.NAME, null, null, true);
client.tableOperations().flush(RootTable.NAME, null, null, true);
verifyVolumesUsed(client, tableNames[0], true, v2);
verifyVolumesUsed(client, tableNames[1], true, v2);
}
}
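  // Rename both volume directories on disk, point instance.volumes at the new paths, and use
  // instance.volumes.replacements to map the old URIs to the new ones, so that existing file
  // and WAL references resolve to the renamed volumes after restart.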
private void testReplaceVolume(AccumuloClient client, boolean cleanShutdown) throws Exception {
String[] tableNames = getUniqueNames(3);
verifyVolumesUsed(client, tableNames[0], false, v1, v2);
// write to 2nd table, but do not flush data to disk before shutdown
try (AccumuloClient c2 =
cluster.createAccumuloClient("root", new PasswordToken(ROOT_PASSWORD))) {
writeData(tableNames[1], c2);
}
    if (cleanShutdown) {
      assertEquals(0, cluster.exec(Admin.class, "stopAll").getProcess().waitFor());
    }
cluster.stop();
File v1f = new File(v1.toUri());
File v8f = new File(new File(v1.getParent().toUri()), "v8");
assertTrue("Failed to rename " + v1f + " to " + v8f, v1f.renameTo(v8f));
Path v8 = new Path(v8f.toURI());
File v2f = new File(v2.toUri());
File v9f = new File(new File(v2.getParent().toUri()), "v9");
assertTrue("Failed to rename " + v2f + " to " + v9f, v2f.renameTo(v9f));
Path v9 = new Path(v9f.toURI());
updateConfig(config -> {
config.setProperty(Property.INSTANCE_VOLUMES.getKey(), v8 + "," + v9);
config.setProperty(Property.INSTANCE_VOLUMES_REPLACEMENTS.getKey(),
v1 + " " + v8 + "," + v2 + " " + v9);
});
// start cluster and verify that volumes were replaced
cluster.start();
verifyVolumesUsed(client, tableNames[0], true, v8, v9);
verifyVolumesUsed(client, tableNames[1], true, v8, v9);
    // verify that compactions write new files to the replacement volumes
client.tableOperations().compact(tableNames[0], null, null, true, true);
client.tableOperations().compact(tableNames[1], null, null, true, true);
verifyVolumesUsed(client, tableNames[0], true, v8, v9);
verifyVolumesUsed(client, tableNames[1], true, v8, v9);
client.tableOperations().compact(RootTable.NAME, new CompactionConfig().setWait(true));
    // check that all of the root tablet's files are on the replacement volumes
int count = 0;
for (StoredTabletFile file : ((ClientContext) client).getAmple().readTablet(RootTable.EXTENT)
.getFiles()) {
assertTrue(file.getMetaUpdateDelete().startsWith(v8.toString())
|| file.getMetaUpdateDelete().startsWith(v9.toString()));
count++;
}
assertTrue(count > 0);
client.tableOperations().clone(tableNames[1], tableNames[2], true, new HashMap<>(),
new HashSet<>());
client.tableOperations().flush(MetadataTable.NAME, null, null, true);
client.tableOperations().flush(RootTable.NAME, null, null, true);
verifyVolumesUsed(client, tableNames[0], true, v8, v9);
verifyVolumesUsed(client, tableNames[1], true, v8, v9);
verifyVolumesUsed(client, tableNames[2], true, v8, v9);
}
@Test
public void testCleanReplaceVolumes() throws Exception {
try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
testReplaceVolume(client, true);
}
}
@Test
public void testDirtyReplaceVolumes() throws Exception {
try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
testReplaceVolume(client, false);
}
}
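  // Rewrite the mini cluster's accumulo.properties in place; callers stop the cluster before
  // applying an update and restart it afterwards so the new volume configuration takes effect.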
@SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "paths provided by test")
private void updateConfig(Consumer<PropertiesConfiguration> updater) throws Exception {
var file = new File(cluster.getAccumuloPropertiesPath());
var config = new PropertiesConfiguration();
try (FileReader out = new FileReader(file, UTF_8)) {
config.read(out);
}
updater.accept(config);
try (FileWriter out = new FileWriter(file, UTF_8)) {
config.write(out);
}
}
}