/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.test.functional;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.DeletesSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.DeletesSection.SkewedKeyValue;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.core.util.ServerServices;
import org.apache.accumulo.core.util.ServerServices.Service;
import org.apache.accumulo.fate.zookeeper.ServiceLock;
import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.fate.zookeeper.ZooUtil;
import org.apache.accumulo.gc.SimpleGarbageCollector;
import org.apache.accumulo.minicluster.MemoryUnit;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.miniclusterImpl.ProcessNotFoundException;
import org.apache.accumulo.miniclusterImpl.ProcessReference;
import org.apache.accumulo.test.TestIngest;
import org.apache.accumulo.test.VerifyIngest;
import org.apache.accumulo.test.VerifyIngest.VerifyParams;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.Text;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.junit.Test;

import com.google.common.collect.Iterators;
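
/**
 * Functional tests for the Accumulo garbage collector: verifies that unreferenced
 * files are collected, that invalid or bogus delete candidates do not destabilize
 * the GC, and that the GC advertises a usable address in ZooKeeper.
 */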
public class GarbageCollectorIT extends ConfigurableMacBase {
private static final String OUR_SECRET = "itsreallysecret";
@Override
public int defaultTimeoutSeconds() {
return 5 * 60;
  }

@Override
public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
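    // fast GC cycles, a random GC port, and a tiny tserver in-memory map so the
    // tests churn through files quickly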
cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
cfg.setProperty(Property.INSTANCE_SECRET, OUR_SECRET);
cfg.setProperty(Property.GC_CYCLE_START, "1");
cfg.setProperty(Property.GC_CYCLE_DELAY, "1");
cfg.setProperty(Property.GC_PORT, "0");
cfg.setProperty(Property.TSERV_MAXMEM, "5K");
cfg.setProperty(Property.TSERV_MAJC_DELAY, "1");
// use raw local file system so walogs sync and flush will work
hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
}
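
  /**
   * Stops the GC process started by the MiniAccumuloCluster and deletes its lock in
   * ZooKeeper, so that a subsequently started GC does not wait for lock expiry.
   */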
private void killMacGc() throws ProcessNotFoundException, InterruptedException, KeeperException {
// kill gc started by MAC
getCluster().killProcess(ServerType.GARBAGE_COLLECTOR,
getCluster().getProcesses().get(ServerType.GARBAGE_COLLECTOR).iterator().next());
    // delete the GC lock in ZooKeeper if present; this lets the next GC start quickly
var path = ServiceLock.path(getServerContext().getZooKeeperRoot() + Constants.ZGC_LOCK);
ZooReaderWriter zk = new ZooReaderWriter(cluster.getZooKeepers(), 30000, OUR_SECRET);
try {
ServiceLock.deleteLock(zk, path);
} catch (IllegalStateException e) {
log.error("Unable to delete ZooLock for mini accumulo-gc", e);
}
assertNull(getCluster().getProcesses().get(ServerType.GARBAGE_COLLECTOR));
}
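
  // Ingest enough data to create many rfiles, compact, wait for the file count to
  // stabilize, then restart the GC and verify that obsolete files get removed.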
@Test
public void gcTest() throws Exception {
killMacGc();
final String table = "test_ingest";
try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) {
c.tableOperations().create(table);
c.tableOperations().setProperty(table, Property.TABLE_SPLIT_THRESHOLD.getKey(), "5K");
VerifyParams params = new VerifyParams(getClientProperties(), table, 10_000);
params.cols = 1;
TestIngest.ingest(c, cluster.getFileSystem(), params);
c.tableOperations().compact(table, null, null, true, true);
int before = countFiles();
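      // wait for ingest and compaction activity to settle: loop until the file
      // count stops growing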
while (true) {
sleepUninterruptibly(1, TimeUnit.SECONDS);
int more = countFiles();
        if (more <= before) {
          break;
        }
before = more;
}
// restart GC
getCluster().start();
sleepUninterruptibly(15, TimeUnit.SECONDS);
int after = countFiles();
VerifyIngest.verifyIngest(c, params);
assertTrue(after < before);
}
}
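
  // Flood the metadata table with delete candidates, then run the GC with a small
  // heap and verify it reports that the candidates exceeded its memory threshold.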
@Test
public void gcLotsOfCandidatesIT() throws Exception {
killMacGc();
log.info("Filling metadata table with bogus delete flags");
try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) {
addEntries(c);
cluster.getConfig().setDefaultMemory(32, MemoryUnit.MEGABYTE);
ProcessInfo gc = cluster.exec(SimpleGarbageCollector.class);
sleepUninterruptibly(20, TimeUnit.SECONDS);
String output = "";
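      // poll the GC's stdout until it reports that the threshold was exceeded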
while (!output.contains("has exceeded the threshold")) {
try {
output = gc.readStdOut();
} catch (UncheckedIOException ex) {
log.error("IO error reading the IT's accumulo-gc STDOUT", ex);
break;
}
}
gc.getProcess().destroy();
assertTrue(output.contains("has exceeded the threshold"));
}
}
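
  // Let the GC run for a while, then kill the tablet servers and verify that log
  // recovery still succeeds, i.e. the GC did not collect needed write-ahead logs.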
@Test
public void dontGCRootLog() throws Exception {
killMacGc();
// dirty metadata
try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) {
String table = getUniqueNames(1)[0];
c.tableOperations().create(table);
// let gc run for a bit
cluster.start();
sleepUninterruptibly(20, TimeUnit.SECONDS);
killMacGc();
// kill tservers
for (ProcessReference ref : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
cluster.killProcess(ServerType.TABLET_SERVER, ref);
}
// run recovery
cluster.start();
// did it recover?
try (Scanner scanner = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
Iterators.size(scanner.iterator());
}
}
}
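
  /**
   * Creates a delete-section mutation for the given path with the given (possibly
   * invalid) column family, qualifier, and value.
   */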
private Mutation createDelMutation(String path, String cf, String cq, String val) {
Text row = new Text(DeletesSection.encodeRow(path));
Mutation delFlag = new Mutation(row);
delFlag.put(cf, cq, val);
return delFlag;
  }

@Test
public void testInvalidDelete() throws Exception {
killMacGc();
try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) {
String table = getUniqueNames(1)[0];
c.tableOperations().create(table);
try (BatchWriter bw = c.createBatchWriter(table)) {
Mutation m1 = new Mutation("r1");
m1.put("cf1", "cq1", "v1");
bw.addMutation(m1);
}
c.tableOperations().flush(table, null, null, true);
      // ensure an invalid delete entry does not cause the GC to go berserk (ACCUMULO-2520)
c.securityOperations().grantTablePermission(c.whoami(), MetadataTable.NAME,
TablePermission.WRITE);
try (BatchWriter bw = c.createBatchWriter(MetadataTable.NAME)) {
bw.addMutation(createDelMutation("", "", "", ""));
bw.addMutation(createDelMutation("", "testDel", "test", "valueTest"));
        // path is invalid, but the value is the expected one - this is the only way the
        // invalid entry makes it through processing and produces the error message that
        // lets the while loop below end
bw.addMutation(createDelMutation("/", "", "", SkewedKeyValue.STR_NAME));
}
ProcessInfo gc = cluster.exec(SimpleGarbageCollector.class);
try {
String output = "";
while (!output.contains("Ignoring invalid deletion candidate")) {
sleepUninterruptibly(250, TimeUnit.MILLISECONDS);
try {
output = gc.readStdOut();
} catch (UncheckedIOException ioe) {
log.error("Could not read all from cluster.", ioe);
}
}
} finally {
gc.getProcess().destroy();
}
try (Scanner scanner = c.createScanner(table, Authorizations.EMPTY)) {
Iterator<Entry<Key,Value>> iter = scanner.iterator();
assertTrue(iter.hasNext());
Entry<Key,Value> entry = iter.next();
assertEquals("r1", entry.getKey().getRow().toString());
assertEquals("cf1", entry.getKey().getColumnFamily().toString());
assertEquals("cq1", entry.getKey().getColumnQualifier().toString());
assertEquals("v1", entry.getValue().toString());
assertFalse(iter.hasNext());
}
}
}
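
  // Verify that the GC lock data in ZooKeeper advertises a concrete host and port,
  // not the 0.0.0.0 bind-all address or the "random" port 0.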
@Test
public void testProperPortAdvertisement() throws Exception {
try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
ZooReaderWriter zk = new ZooReaderWriter(cluster.getZooKeepers(), 30000, OUR_SECRET);
var path = ServiceLock
.path(ZooUtil.getRoot(client.instanceOperations().getInstanceID()) + Constants.ZGC_LOCK);
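      // retry for up to ~25 seconds while the GC starts up and registers its lock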
for (int i = 0; i < 5; i++) {
List<String> locks;
try {
locks = ServiceLock.validateAndSort(path, zk.getChildren(path.toString()));
} catch (NoNodeException e) {
Thread.sleep(5000);
continue;
}
if (locks != null && !locks.isEmpty()) {
String lockPath = path + "/" + locks.get(0);
          String gcLoc = new String(zk.getData(lockPath), UTF_8);
assertTrue("Found unexpected data in zookeeper for GC location: " + gcLoc,
gcLoc.startsWith(Service.GC_CLIENT.name()));
int loc = gcLoc.indexOf(ServerServices.SEPARATOR_CHAR);
assertNotEquals("Could not find split point of GC location for: " + gcLoc, -1, loc);
String addr = gcLoc.substring(loc + 1);
int addrSplit = addr.indexOf(':');
assertNotEquals("Could not find split of GC host:port for: " + addr, -1, addrSplit);
          String host = addr.substring(0, addrSplit);
          String port = addr.substring(addrSplit + 1);
// We shouldn't have the "bindall" address in zk
assertNotEquals("0.0.0.0", host);
// Nor should we have the "random port" in zk
assertNotEquals(0, Integer.parseInt(port));
return;
}
Thread.sleep(5000);
}
fail("Could not find advertised GC address");
}
}
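
  // Counts the rfiles for table id 1 in the MAC's local directory via a glob.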
private int countFiles() throws Exception {
Path path = new Path(cluster.getConfig().getDir() + "/accumulo/tables/1/*/*.rf");
return Iterators.size(Arrays.asList(cluster.getFileSystem().globStatus(path)).iterator());
}
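
  // Writes 100,000 bogus delete candidates with long file paths to the metadata
  // table.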
private void addEntries(AccumuloClient client) throws Exception {
Ample ample = getServerContext().getAmple();
client.securityOperations().grantTablePermission(client.whoami(), MetadataTable.NAME,
TablePermission.WRITE);
try (BatchWriter bw = client.createBatchWriter(MetadataTable.NAME)) {
      String longpath = "aaaaaaaaaabbbbbbbbbbccccccccccddddddddddeeeeeeeeee"
          + "ffffffffffgggggggggghhhhhhhhhhiiiiiiiiiijjjjjjjjjj";
      for (int i = 0; i < 100000; ++i) {
Mutation delFlag = ample.createDeleteMutation(String.format("file:/%020d/%s", i, longpath));
bw.addMutation(delFlag);
}
}
}
}