blob: 9fc425aafb287c210ea9a04c08d698d4568d0d07 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.tserver.log;
import static com.google.common.base.Preconditions.checkState;
import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_FINISH;
import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_START;
import static org.apache.accumulo.tserver.logger.LogEvents.DEFINE_TABLET;
import static org.apache.accumulo.tserver.logger.LogEvents.MANY_MUTATIONS;
import static org.apache.accumulo.tserver.logger.LogEvents.MUTATION;
import java.io.IOException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.tserver.logger.LogEvents;
import org.apache.accumulo.tserver.logger.LogFileKey;
import org.apache.accumulo.tserver.logger.LogFileValue;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Collections2;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
/**
* Extract Mutations for a tablet from a set of logs that have been sorted by operation and tablet.
*
*/
public class SortedLogRecovery {
private static final Logger log = LoggerFactory.getLogger(SortedLogRecovery.class);
private VolumeManager fs;
public SortedLogRecovery(VolumeManager fs) {
this.fs = fs;
}
static LogFileKey maxKey(LogEvents event) {
LogFileKey key = new LogFileKey();
key.event = event;
key.tabletId = Integer.MAX_VALUE;
key.seq = Long.MAX_VALUE;
return key;
}
static LogFileKey maxKey(LogEvents event, int tabletId) {
LogFileKey key = maxKey(event);
key.tabletId = tabletId;
return key;
}
static LogFileKey minKey(LogEvents event) {
LogFileKey key = new LogFileKey();
key.event = event;
// see GitHub issue #477. There was a bug that caused -1 to end up in tabletId. If this happens
// want to detect it and fail since recovery is dubious in this situation . Other code should
// fail if the id is actually -1 in data.
key.tabletId = -1;
key.seq = 0;
return key;
}
static LogFileKey minKey(LogEvents event, int tabletId) {
LogFileKey key = minKey(event);
key.tabletId = tabletId;
return key;
}
private int findMaxTabletId(KeyExtent extent, List<Path> recoveryLogs) throws IOException {
int tabletId = -1;
try (RecoveryLogsIterator rli =
new RecoveryLogsIterator(fs, recoveryLogs, minKey(DEFINE_TABLET), maxKey(DEFINE_TABLET))) {
KeyExtent alternative = extent;
if (extent.isRootTablet()) {
alternative = RootTable.OLD_EXTENT;
}
while (rli.hasNext()) {
LogFileKey key = rli.next().getKey();
checkState(key.event == DEFINE_TABLET); // should only fail if bug elsewhere
if (key.tablet.equals(extent) || key.tablet.equals(alternative)) {
checkState(key.tabletId >= 0, "tabletId %s for %s is negative", key.tabletId, extent);
checkState(tabletId == -1 || key.tabletId >= tabletId); // should only fail if bug in
// RecoveryLogsIterator
if (tabletId != key.tabletId) {
tabletId = key.tabletId;
}
}
}
}
return tabletId;
}
/**
* This function opens recovery logs one at a time to see if they define the tablet. This is done
* so that later recovery steps that open all of the logs at once can possibly open a smaller set
* of logs. Opening a recovery log requires holding its index in memory and few key/values from
* it. For a lot of recovery logs this could possibly be a lot of memory.
*
* @return The maximum tablet ID observed AND the list of logs that contained the maximum tablet
* ID.
*/
private Entry<Integer,List<Path>> findLogsThatDefineTablet(KeyExtent extent,
List<Path> recoveryLogs) throws IOException {
Map<Integer,List<Path>> logsThatDefineTablet = new HashMap<>();
for (Path wal : recoveryLogs) {
int tabletId = findMaxTabletId(extent, Collections.singletonList(wal));
if (tabletId == -1) {
log.debug("Did not find tablet {} in recovery log {}", extent, wal.getName());
} else {
logsThatDefineTablet.computeIfAbsent(tabletId, k -> new ArrayList<>()).add(wal);
log.debug("Found tablet {} with id {} in recovery log {}", extent, tabletId, wal.getName());
}
}
if (logsThatDefineTablet.isEmpty()) {
return new AbstractMap.SimpleEntry<>(-1, Collections.<Path>emptyList());
} else {
return Collections.max(logsThatDefineTablet.entrySet(),
(o1, o2) -> Integer.compare(o1.getKey(), o2.getKey()));
}
}
private String getPathSuffix(String pathString) {
Path path = new Path(pathString);
if (path.depth() < 2)
throw new IllegalArgumentException("Bad path " + pathString);
return path.getParent().getName() + "/" + path.getName();
}
static class DeduplicatingIterator implements Iterator<Entry<LogFileKey,LogFileValue>> {
private PeekingIterator<Entry<LogFileKey,LogFileValue>> source;
public DeduplicatingIterator(Iterator<Entry<LogFileKey,LogFileValue>> source) {
this.source = Iterators.peekingIterator(source);
}
@Override
public boolean hasNext() {
return source.hasNext();
}
@Override
public Entry<LogFileKey,LogFileValue> next() {
Entry<LogFileKey,LogFileValue> next = source.next();
while (source.hasNext() && next.getKey().compareTo(source.peek().getKey()) == 0) {
source.next();
}
return next;
}
}
private long findRecoverySeq(List<Path> recoveryLogs, Set<String> tabletFiles, int tabletId)
throws IOException {
HashSet<String> suffixes = new HashSet<>();
for (String path : tabletFiles)
suffixes.add(getPathSuffix(path));
long lastStart = 0;
long lastFinish = 0;
long recoverySeq = 0;
try (RecoveryLogsIterator rli = new RecoveryLogsIterator(fs, recoveryLogs,
minKey(COMPACTION_START, tabletId), maxKey(COMPACTION_START, tabletId))) {
DeduplicatingIterator ddi = new DeduplicatingIterator(rli);
String lastStartFile = null;
LogEvents lastEvent = null;
while (ddi.hasNext()) {
LogFileKey key = ddi.next().getKey();
checkState(key.seq >= 0, "Unexpected negative seq %s for tabletId %s", key.seq, tabletId);
checkState(key.tabletId == tabletId); // should only fail if bug elsewhere
checkState(key.seq >= Math.max(lastFinish, lastStart)); // should only fail if bug elsewhere
switch (key.event) {
case COMPACTION_START:
lastStart = key.seq;
lastStartFile = key.filename;
break;
case COMPACTION_FINISH:
checkState(key.seq > lastStart, "Compaction finish <= start %s %s %s", key.tabletId,
key.seq, lastStart);
checkState(lastEvent != COMPACTION_FINISH,
"Saw consecutive COMPACTION_FINISH events %s %s %s", key.tabletId, lastFinish,
key.seq);
lastFinish = key.seq;
break;
default:
throw new IllegalStateException("Non compaction event seen " + key.event);
}
lastEvent = key.event;
}
if (lastEvent == COMPACTION_START && suffixes.contains(getPathSuffix(lastStartFile))) {
// There was no compaction finish event following this start, however the last compaction
// start event has a file in the metadata table, so the compaction finished.
log.debug("Considering compaction start {} {} finished because file {} in metadata table",
tabletId, lastStart, getPathSuffix(lastStartFile));
recoverySeq = lastStart;
} else {
// Recover everything >= the maximum finish sequence number if its set, otherwise return 0.
recoverySeq = Math.max(0, lastFinish - 1);
}
}
return recoverySeq;
}
private void playbackMutations(List<Path> recoveryLogs, MutationReceiver mr, int tabletId,
long recoverySeq) throws IOException {
LogFileKey start = minKey(MUTATION, tabletId);
start.seq = recoverySeq;
LogFileKey end = maxKey(MUTATION, tabletId);
try (RecoveryLogsIterator rli = new RecoveryLogsIterator(fs, recoveryLogs, start, end)) {
while (rli.hasNext()) {
Entry<LogFileKey,LogFileValue> entry = rli.next();
checkState(entry.getKey().tabletId == tabletId); // should only fail if bug elsewhere
checkState(entry.getKey().seq >= recoverySeq); // should only fail if bug elsewhere
if (entry.getKey().event == MUTATION) {
mr.receive(entry.getValue().mutations.get(0));
} else if (entry.getKey().event == MANY_MUTATIONS) {
for (Mutation m : entry.getValue().mutations) {
mr.receive(m);
}
} else {
throw new IllegalStateException("Non mutation event seen " + entry.getKey().event);
}
}
}
}
Collection<String> asNames(List<Path> recoveryLogs) {
return Collections2.transform(recoveryLogs, Path::getName);
}
public void recover(KeyExtent extent, List<Path> recoveryLogs, Set<String> tabletFiles,
MutationReceiver mr) throws IOException {
Entry<Integer,List<Path>> maxEntry = findLogsThatDefineTablet(extent, recoveryLogs);
// A tablet may leave a tserver and then come back, in which case it would have a different and
// higher tablet id. Only want to consider events in the log related to the last time the tablet
// was loaded.
int tabletId = maxEntry.getKey();
List<Path> logsThatDefineTablet = maxEntry.getValue();
if (tabletId == -1) {
log.info("Tablet {} is not defined in recovery logs {} ", extent, asNames(recoveryLogs));
return;
} else {
log.info("Found {} of {} logs with max id {} for tablet {}", logsThatDefineTablet.size(),
recoveryLogs.size(), tabletId, extent);
}
// Find the seq # for the last compaction that started and finished
long recoverySeq = findRecoverySeq(logsThatDefineTablet, tabletFiles, tabletId);
log.info("Recovering mutations, tablet:{} tabletId:{} seq:{} logs:{}", extent, tabletId,
recoverySeq, asNames(logsThatDefineTablet));
// Replay all mutations that were written after the last successful compaction started.
playbackMutations(logsThatDefineTablet, mr, tabletId, recoverySeq);
}
}