blob: 8316c42c0cdb9ba5d9955c712f225d49e37d8f9e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.test.functional;
import static org.apache.accumulo.core.conf.Property.GC_CYCLE_DELAY;
import static org.apache.accumulo.core.conf.Property.GC_CYCLE_START;
import static org.apache.accumulo.core.conf.Property.INSTANCE_ZK_TIMEOUT;
import static org.apache.accumulo.core.conf.Property.TSERV_WALOG_MAX_SIZE;
import static org.apache.accumulo.core.conf.Property.TSERV_WAL_REPLICATION;
import static org.apache.accumulo.core.security.Authorizations.EMPTY;
import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
import static org.apache.accumulo.minicluster.ServerType.GARBAGE_COLLECTOR;
import static org.apache.accumulo.minicluster.ServerType.TABLET_SERVER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
import org.apache.accumulo.manager.state.SetGoalState;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterControl;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.log.WalStateManager;
import org.apache.accumulo.server.log.WalStateManager.WalMarkerException;
import org.apache.accumulo.server.log.WalStateManager.WalState;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.Text;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.junit.Test;
import com.google.common.collect.Iterators;
public class WALSunnyDayIT extends ConfigurableMacBase {
private static final Text CF = new Text(new byte[0]);
@Override
protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
cfg.setProperty(GC_CYCLE_DELAY, "1s");
cfg.setProperty(GC_CYCLE_START, "0s");
cfg.setProperty(TSERV_WALOG_MAX_SIZE, "1M");
cfg.setProperty(TSERV_WAL_REPLICATION, "1");
cfg.setProperty(INSTANCE_ZK_TIMEOUT, "15s");
cfg.setNumTservers(1);
hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
}
int countInUse(Collection<WalState> bools) {
int result = 0;
for (WalState b : bools) {
if (b != WalState.UNREFERENCED)
result++;
}
return result;
}
@Test
public void test() throws Exception {
MiniAccumuloClusterImpl mac = getCluster();
MiniAccumuloClusterControl control = mac.getClusterControl();
control.stop(GARBAGE_COLLECTOR);
ServerContext context = getServerContext();
try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) {
String tableName = getUniqueNames(1)[0];
c.tableOperations().create(tableName);
writeSomeData(c, tableName, 1, 1);
// wal markers are added lazily
Map<String,WalState> wals = getWALsAndAssertCount(context, 2);
assertEquals("all WALs should be in use", 2, countInUse(wals.values()));
// roll log, get a new next
writeSomeData(c, tableName, 1001, 50);
Map<String,WalState> walsAfterRoll = getWALsAndAssertCount(context, 3);
assertTrue("new WALs should be a superset of the old WALs",
walsAfterRoll.keySet().containsAll(wals.keySet()));
assertEquals("all WALs should be in use", 3, countInUse(walsAfterRoll.values()));
// flush the tables
for (String table : new String[] {tableName, MetadataTable.NAME, RootTable.NAME}) {
c.tableOperations().flush(table, null, null, true);
}
sleepUninterruptibly(1, TimeUnit.SECONDS);
// rolled WAL is no longer in use, but needs to be GC'd
Map<String,WalState> walsAfterflush = getWALsAndAssertCount(context, 3);
assertEquals("inUse should be 2", 2, countInUse(walsAfterflush.values()));
// let the GC run for a little bit
control.start(GARBAGE_COLLECTOR);
sleepUninterruptibly(5, TimeUnit.SECONDS);
// make sure the unused WAL goes away
getWALsAndAssertCount(context, 2);
control.stop(GARBAGE_COLLECTOR);
// restart the tserver, but don't run recovery on all tablets
control.stop(TABLET_SERVER);
// this delays recovery on the normal tables
assertEquals(0, cluster.exec(SetGoalState.class, "SAFE_MODE").getProcess().waitFor());
control.start(TABLET_SERVER);
// wait for the metadata table to go back online
getRecoveryMarkers(c);
// allow a little time for the manager to notice ASSIGNED_TO_DEAD_SERVER tablets
sleepUninterruptibly(5, TimeUnit.SECONDS);
Map<KeyExtent,List<String>> markers = getRecoveryMarkers(c);
// log.debug("markers " + markers);
assertEquals("one tablet should have markers", 1, markers.size());
assertEquals("tableId of the keyExtent should be 1", "1",
markers.keySet().iterator().next().tableId().canonical());
// put some data in the WAL
assertEquals(0, cluster.exec(SetGoalState.class, "NORMAL").getProcess().waitFor());
verifySomeData(c, tableName, 1001 * 50 + 1);
writeSomeData(c, tableName, 100, 100);
Map<String,WalState> walsAfterRestart = getWALsAndAssertCount(context, 4);
// log.debug("wals after " + walsAfterRestart);
assertEquals("used WALs after restart should be 4", 4, countInUse(walsAfterRestart.values()));
control.start(GARBAGE_COLLECTOR);
sleepUninterruptibly(5, TimeUnit.SECONDS);
Map<String,WalState> walsAfterRestartAndGC = getWALsAndAssertCount(context, 2);
assertEquals("logs in use should be 2", 2, countInUse(walsAfterRestartAndGC.values()));
}
}
private void verifySomeData(AccumuloClient c, String tableName, int expected) throws Exception {
try (Scanner scan = c.createScanner(tableName, EMPTY)) {
int result = Iterators.size(scan.iterator());
assertEquals(expected, result);
}
}
private void writeSomeData(AccumuloClient client, String tableName, int row, int col)
throws Exception {
Random rand = new SecureRandom();
try (BatchWriter bw = client.createBatchWriter(tableName)) {
byte[] rowData = new byte[10];
byte[] cq = new byte[10];
byte[] value = new byte[10];
for (int r = 0; r < row; r++) {
rand.nextBytes(rowData);
Mutation m = new Mutation(rowData);
for (int c = 0; c < col; c++) {
rand.nextBytes(cq);
rand.nextBytes(value);
m.put(CF, new Text(cq), new Value(value));
}
bw.addMutation(m);
if (r % 100 == 0) {
bw.flush();
}
}
}
}
private Map<KeyExtent,List<String>> getRecoveryMarkers(AccumuloClient c) throws Exception {
Map<KeyExtent,List<String>> result = new HashMap<>();
try (Scanner root = c.createScanner(RootTable.NAME, EMPTY);
Scanner meta = c.createScanner(MetadataTable.NAME, EMPTY)) {
root.setRange(TabletsSection.getRange());
root.fetchColumnFamily(LogColumnFamily.NAME);
TabletColumnFamily.PREV_ROW_COLUMN.fetch(root);
meta.setRange(TabletsSection.getRange());
meta.fetchColumnFamily(LogColumnFamily.NAME);
TabletColumnFamily.PREV_ROW_COLUMN.fetch(meta);
List<String> logs = new ArrayList<>();
Iterator<Entry<Key,Value>> both = Iterators.concat(root.iterator(), meta.iterator());
while (both.hasNext()) {
Entry<Key,Value> entry = both.next();
Key key = entry.getKey();
if (key.getColumnFamily().equals(LogColumnFamily.NAME)) {
logs.add(key.getColumnQualifier().toString());
}
if (TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key) && !logs.isEmpty()) {
KeyExtent extent = KeyExtent.fromMetaPrevRow(entry);
result.put(extent, logs);
logs = new ArrayList<>();
}
}
}
return result;
}
private final int TIMES_TO_COUNT = 20;
private final int PAUSE_BETWEEN_COUNTS = 100;
private Map<String,WalState> getWALsAndAssertCount(ServerContext c, int expectedCount)
throws Exception {
// see https://issues.apache.org/jira/browse/ACCUMULO-4110. Sometimes this test counts the logs
// before
// the new standby log is actually ready. So let's try a few times before failing, returning the
// last
// wals variable with the the correct count.
Map<String,WalState> wals = _getWals(c);
if (wals.size() == expectedCount) {
return wals;
}
int waitLonger = getWaitFactor();
for (int i = 1; i <= TIMES_TO_COUNT; i++) {
Thread.sleep(i * PAUSE_BETWEEN_COUNTS * waitLonger);
wals = _getWals(c);
if (wals.size() == expectedCount) {
return wals;
}
}
fail(
"Unable to get the correct number of WALs, expected " + expectedCount + " but got " + wals);
return new HashMap<>();
}
private int getWaitFactor() {
int waitLonger = 1;
String timeoutString = System.getProperty("timeout.factor");
if (timeoutString != null && !timeoutString.isEmpty()) {
int timeout = Integer.parseInt(timeoutString);
if (timeout > 1) {
waitLonger = timeout;
}
}
return waitLonger;
}
static Map<String,WalState> _getWals(ServerContext c) throws Exception {
while (true) {
try {
Map<String,WalState> result = new HashMap<>();
WalStateManager wals = new WalStateManager(c);
for (Entry<Path,WalState> entry : wals.getAllState().entrySet()) {
// WALs are in use if they are not unreferenced
result.put(entry.getKey().toString(), entry.getValue());
}
return result;
} catch (WalMarkerException wme) {
if (wme.getCause() instanceof NoNodeException) {
log.debug("WALs changed while reading, retrying", wme);
} else {
throw wme;
}
}
}
}
}