blob: 9241644dcb9c2c7877761c19a5da7d9c93deeebc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.hdfs.server.namenode.FsImageValidation.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.hadoop.hdfs.server.namenode.FsImageValidation.Cli.*;
/**
 * For validating {@link INodeReference} subclasses.
 *
 * Usage: call {@link #start()} to begin tracking references; references are
 * then registered/unregistered through {@link #add(INodeReference, Class)}
 * and {@link #remove(INodeReference, Class)}; finally call
 * {@link #end(AtomicInteger)} to run {@code assertReferences()} on every
 * tracked reference using a thread pool.
 */
public class INodeReferenceValidation {
  public static final Logger LOG = LoggerFactory.getLogger(
      INodeReferenceValidation.class);

  /** The active validation; null when no validation is in progress. */
  private static final AtomicReference<INodeReferenceValidation> INSTANCE
      = new AtomicReference<>();

  /**
   * Start tracking references.
   * If a validation is already active, the existing instance is kept.
   */
  public static void start() {
    INSTANCE.compareAndSet(null, new INodeReferenceValidation());
    println("%s started", INodeReferenceValidation.class.getSimpleName());
  }

  /**
   * Stop tracking and validate all the references collected so far.
   * This is a no-op when no validation is active.
   *
   * @param errorCount the shared error counter passed on to the validation
   *                   tasks; the difference between its value after and
   *                   before the validation is printed.
   */
  public static void end(AtomicInteger errorCount) {
    final INodeReferenceValidation instance = INSTANCE.getAndSet(null);
    if (instance == null) {
      return;
    }

    final int initCount = errorCount.get();
    instance.assertReferences(errorCount);
    println("%s ended successfully: %d error(s) found.",
        INodeReferenceValidation.class.getSimpleName(),
        errorCount.get() - initCount);
  }

  /**
   * Register the given reference with the active validation, if there is one.
   *
   * @param ref the reference to track.
   * @param clazz the reference class used to select the {@link ReferenceSet}.
   * @throws IllegalArgumentException if clazz is not a supported class.
   */
  static <REF extends INodeReference> void add(REF ref, Class<REF> clazz) {
    final INodeReferenceValidation validation = INSTANCE.get();
    if (validation != null) {
      final boolean added = validation.getReferences(clazz).add(ref);
      Preconditions.checkState(added);
      // Build the detail string only when it is actually going to be logged.
      if (LOG.isTraceEnabled()) {
        LOG.trace("add {}: {}", clazz, ref.toDetailString());
      }
    }
  }

  /**
   * Unregister the given reference from the active validation, if there is
   * one.  The reference must have been added previously; otherwise, the
   * {@link Preconditions#checkState(boolean)} check fails.
   *
   * @param ref the reference to stop tracking.
   * @param clazz the reference class used to select the {@link ReferenceSet}.
   * @throws IllegalArgumentException if clazz is not a supported class.
   */
  static <REF extends INodeReference> void remove(REF ref, Class<REF> clazz) {
    final INodeReferenceValidation validation = INSTANCE.get();
    if (validation != null) {
      final boolean removed = validation.getReferences(clazz).remove(ref);
      Preconditions.checkState(removed);
      // Build the detail string only when it is actually going to be logged.
      if (LOG.isTraceEnabled()) {
        LOG.trace("remove {}: {}", clazz, ref.toDetailString());
      }
    }
  }

  /**
   * The tracked references of a single {@link INodeReference} subclass
   * together with the tasks and futures used to validate them.
   */
  static class ReferenceSet<REF extends INodeReference> {
    private final Class<REF> clazz;
    private final List<REF> references = new LinkedList<>();
    /** The validation tasks; null until {@link #submit} is called. */
    private volatile List<Task<REF>> tasks;
    /** The futures of the tasks; null until {@link #submit} returns. */
    private volatile List<Future<Integer>> futures;
    /** The number of tasks that have finished running. */
    private final AtomicInteger taskCompleted = new AtomicInteger();

    ReferenceSet(Class<REF> clazz) {
      this.clazz = clazz;
    }

    /** Append the given reference; duplicates are not checked. */
    boolean add(REF ref) {
      return references.add(ref);
    }

    /**
     * Remove the first occurrence of the given reference,
     * compared by identity ({@code ==}) but not by equals.
     *
     * @return true if the reference was found and removed.
     */
    boolean remove(REF ref) {
      for (final Iterator<REF> i = references.iterator(); i.hasNext();) {
        if (i.next() == ref) {
          i.remove();
          return true;
        }
      }
      return false;
    }

    /**
     * Split the references into tasks and run them with the given service.
     * Since {@link ExecutorService#invokeAll} is used, this method returns
     * only after all the tasks have finished.  Each task is wrapped so that
     * {@link #taskCompleted} is updated as soon as the task finishes;
     * therefore, the progress is observable while this method is blocked.
     *
     * @param errorCount the shared error counter passed to the tasks.
     * @param service the executor running the tasks.
     * @throws InterruptedException if interrupted while waiting.
     */
    void submit(AtomicInteger errorCount, ExecutorService service)
        throws InterruptedException {
      final int size = references.size();
      // createTasks(..) moves the references out of the list into the tasks.
      final List<Task<REF>> created = createTasks(references, errorCount);
      tasks = created;
      println("Submitting %d tasks for validating %s %s(s)",
          created.size(), Util.toCommaSeparatedNumber(size),
          clazz.getSimpleName());

      final List<Callable<Integer>> counted = new LinkedList<>();
      for (final Task<REF> task : created) {
        counted.add(new Callable<Integer>() {
          @Override
          public Integer call() throws Exception {
            try {
              return task.call();
            } finally {
              taskCompleted.incrementAndGet();
            }
          }
        });
      }
      futures = service.invokeAll(counted);
    }

    /**
     * Wait for all the futures and propagate any failure of the tasks.
     * {@link #submit} must be called before this method.
     *
     * @throws Exception if waiting is interrupted or a task has failed.
     */
    void waitForFutures() throws Exception {
      for (Future<Integer> f : futures) {
        f.get();
      }
    }

    /**
     * @return 0 if the tasks are not yet created, 100 if there are no tasks;
     *         otherwise, the percentage of the tasks completed.
     */
    double getTaskCompletedPercent() {
      // Read the volatile field once so that the checks and the division
      // below use the same list.
      final List<Task<REF>> t = tasks;
      if (t == null) {
        return 0;
      }
      if (t.isEmpty()) {
        return 100;
      }
      return taskCompleted.get() * 100.0 / t.size();
    }

    @Override
    public String toString() {
      return String.format("%s %.1f%%",
          clazz.getSimpleName(), getTaskCompletedPercent());
    }
  }

  private final ReferenceSet<INodeReference.WithCount> withCounts
      = new ReferenceSet<>(INodeReference.WithCount.class);
  private final ReferenceSet<INodeReference.WithName> withNames
      = new ReferenceSet<>(INodeReference.WithName.class);
  private final ReferenceSet<INodeReference.DstReference> dstReferences
      = new ReferenceSet<>(INodeReference.DstReference.class);

  /**
   * @param clazz one of the {@link INodeReference.WithCount},
   *              {@link INodeReference.WithName} and
   *              {@link INodeReference.DstReference} classes.
   * @return the {@link ReferenceSet} of the given class.
   * @throws IllegalArgumentException if clazz is none of the classes above.
   */
  // The casts are safe since each branch has just checked that clazz is
  // exactly the element class of the set being returned.
  @SuppressWarnings("unchecked")
  <REF extends INodeReference> ReferenceSet<REF> getReferences(
      Class<REF> clazz) {
    if (clazz == INodeReference.WithCount.class) {
      return (ReferenceSet<REF>) withCounts;
    } else if (clazz == INodeReference.WithName.class) {
      return (ReferenceSet<REF>) withNames;
    } else if (clazz == INodeReference.DstReference.class) {
      return (ReferenceSet<REF>) dstReferences;
    }
    throw new IllegalArgumentException("References not found for " + clazz);
  }

  /**
   * Validate all the tracked references using a fixed thread pool with one
   * thread per available processor, logging the progress every second.
   * Any failure is reported by printError(..) but not rethrown.
   *
   * @param errorCount the shared error counter passed to the tasks.
   */
  private void assertReferences(AtomicInteger errorCount) {
    final int p = Runtime.getRuntime().availableProcessors();
    LOG.info("Available Processors: {}", p);
    final ExecutorService service = Executors.newFixedThreadPool(p);

    final TimerTask checkProgress = new TimerTask() {
      @Override
      public void run() {
        LOG.info("ASSERT_REFERENCES Progress: {}, {}, {}",
            dstReferences, withCounts, withNames);
      }
    };
    // Use a named daemon thread so that the progress timer is easy to
    // identify in a thread dump and can never keep the JVM alive.
    final Timer t = new Timer(
        INodeReferenceValidation.class.getSimpleName() + "-progress", true);
    t.scheduleAtFixedRate(checkProgress, 0, 1_000);

    try {
      dstReferences.submit(errorCount, service);
      withCounts.submit(errorCount, service);
      withNames.submit(errorCount, service);

      dstReferences.waitForFutures();
      withCounts.waitForFutures();
      withNames.waitForFutures();
    } catch (InterruptedException e) {
      // Restore the interrupt status so that the callers can observe it.
      Thread.currentThread().interrupt();
      printError("Failed to assertReferences", e);
    } catch (Throwable e) {
      printError("Failed to assertReferences", e);
    } finally {
      service.shutdown();
      t.cancel();
    }
  }

  /**
   * Partition the given references into tasks.
   * Note that the references are moved: each {@link Task} removes the
   * references it takes, so the given list is empty when this method
   * returns.
   *
   * @param references the references to be validated; emptied by this call.
   * @param errorCount the shared error counter passed to the tasks.
   * @return the tasks, each holding at most {@link Task#BATCH_SIZE}
   *         references.
   */
  static <REF extends INodeReference> List<Task<REF>> createTasks(
      List<REF> references, AtomicInteger errorCount) {
    final List<Task<REF>> tasks = new LinkedList<>();
    // Each Task constructor advances the shared iterator by one batch.
    for (final Iterator<REF> i = references.iterator(); i.hasNext();) {
      tasks.add(new Task<>(i, errorCount));
    }
    return tasks;
  }

  /** A task validating a batch of references. */
  static class Task<REF extends INodeReference> implements Callable<Integer> {
    /** The maximum number of references in a single task. */
    static final int BATCH_SIZE = 100_000;

    private final List<REF> references = new LinkedList<>();
    private final AtomicInteger errorCount;

    /**
     * Take at most {@link #BATCH_SIZE} references from the given iterator
     * and remove them from the underlying collection.
     *
     * @param i the iterator supplying the references.
     * @param errorCount the shared error counter passed to printError(..).
     */
    Task(Iterator<REF> i, AtomicInteger errorCount) {
      for (int n = 0; i.hasNext() && n < BATCH_SIZE; n++) {
        references.add(i.next());
        i.remove();
      }
      this.errorCount = errorCount;
    }

    /**
     * Call {@code assertReferences()} on every reference in this batch.
     * A failure of a reference is reported by printError(..) and does not
     * stop the validation of the remaining references.
     *
     * @return the number of references in this batch.
     */
    @Override
    public Integer call() throws Exception {
      for (final REF ref : references) {
        try {
          ref.assertReferences();
        } catch (Throwable t) {
          printError(errorCount, "%s", t);
        }
      }
      return references.size();
    }
  }
}