| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs.server.namenode; |
| |
| import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; |
| import org.apache.hadoop.hdfs.server.namenode.FsImageValidation.Util; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Timer; |
| import java.util.TimerTask; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import static org.apache.hadoop.hdfs.server.namenode.FsImageValidation.Cli.*; |
| |
| /** For validating {@link INodeReference} subclasses. */ |
| public class INodeReferenceValidation { |
| public static final Logger LOG = LoggerFactory.getLogger( |
| INodeReferenceValidation.class); |
| |
| private static final AtomicReference<INodeReferenceValidation> INSTANCE |
| = new AtomicReference<>(); |
| |
| public static void start() { |
| INSTANCE.compareAndSet(null, new INodeReferenceValidation()); |
| println("%s started", INodeReferenceValidation.class.getSimpleName()); |
| } |
| |
| public static void end(AtomicInteger errorCount) { |
| final INodeReferenceValidation instance = INSTANCE.getAndSet(null); |
| if (instance == null) { |
| return; |
| } |
| |
| final int initCount = errorCount.get(); |
| instance.assertReferences(errorCount); |
| println("%s ended successfully: %d error(s) found.", |
| INodeReferenceValidation.class.getSimpleName(), |
| errorCount.get() - initCount); |
| } |
| |
| static <REF extends INodeReference> void add(REF ref, Class<REF> clazz) { |
| final INodeReferenceValidation validation = INSTANCE.get(); |
| if (validation != null) { |
| final boolean added = validation.getReferences(clazz).add(ref); |
| Preconditions.checkState(added); |
| LOG.trace("add {}: {}", clazz, ref.toDetailString()); |
| } |
| } |
| |
| static <REF extends INodeReference> void remove(REF ref, Class<REF> clazz) { |
| final INodeReferenceValidation validation = INSTANCE.get(); |
| if (validation != null) { |
| final boolean removed = validation.getReferences(clazz).remove(ref); |
| Preconditions.checkState(removed); |
| LOG.trace("remove {}: {}", clazz, ref.toDetailString()); |
| } |
| } |
| |
| static class ReferenceSet<REF extends INodeReference> { |
| private final Class<REF> clazz; |
| private final List<REF> references = new LinkedList<>(); |
| private volatile List<Task<REF>> tasks; |
| private volatile List<Future<Integer>> futures; |
| private final AtomicInteger taskCompleted = new AtomicInteger(); |
| |
| ReferenceSet(Class<REF> clazz) { |
| this.clazz = clazz; |
| } |
| |
| boolean add(REF ref) { |
| return references.add(ref); |
| } |
| |
| boolean remove(REF ref) { |
| for(final Iterator<REF> i = references.iterator(); i.hasNext();) { |
| if (i.next() == ref) { |
| i.remove(); |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| void submit(AtomicInteger errorCount, ExecutorService service) |
| throws InterruptedException { |
| final int size = references.size(); |
| tasks = createTasks(references, errorCount); |
| println("Submitting %d tasks for validating %s %s(s)", |
| tasks.size(), Util.toCommaSeparatedNumber(size), |
| clazz.getSimpleName()); |
| futures = service.invokeAll(tasks); |
| } |
| |
| void waitForFutures() throws Exception { |
| for(Future<Integer> f : futures) { |
| f.get(); |
| taskCompleted.incrementAndGet(); |
| } |
| } |
| |
| double getTaskCompletedPercent() { |
| final List<Task<REF>> t = tasks; |
| return t == null? 0 |
| : t.isEmpty()? 100 |
| : taskCompleted.get()*100.0/tasks.size(); |
| } |
| |
| @Override |
| public String toString() { |
| return String.format("%s %.1f%%", |
| clazz.getSimpleName(), getTaskCompletedPercent()); |
| } |
| } |
| |
| private final ReferenceSet<INodeReference.WithCount> withCounts |
| = new ReferenceSet<>(INodeReference.WithCount.class); |
| private final ReferenceSet<INodeReference.WithName> withNames |
| = new ReferenceSet<>(INodeReference.WithName.class); |
| private final ReferenceSet<INodeReference.DstReference> dstReferences |
| = new ReferenceSet<>(INodeReference.DstReference.class); |
| |
| <REF extends INodeReference> ReferenceSet<REF> getReferences( |
| Class<REF> clazz) { |
| if (clazz == INodeReference.WithCount.class) { |
| return (ReferenceSet<REF>) withCounts; |
| } else if (clazz == INodeReference.WithName.class) { |
| return (ReferenceSet<REF>) withNames; |
| } else if (clazz == INodeReference.DstReference.class) { |
| return (ReferenceSet<REF>) dstReferences; |
| } |
| throw new IllegalArgumentException("References not found for " + clazz); |
| } |
| |
| private void assertReferences(AtomicInteger errorCount) { |
| final int p = Runtime.getRuntime().availableProcessors(); |
| LOG.info("Available Processors: {}", p); |
| final ExecutorService service = Executors.newFixedThreadPool(p); |
| |
| final TimerTask checkProgress = new TimerTask() { |
| @Override |
| public void run() { |
| LOG.info("ASSERT_REFERENCES Progress: {}, {}, {}", |
| dstReferences, withCounts, withNames); |
| } |
| }; |
| final Timer t = new Timer(); |
| t.scheduleAtFixedRate(checkProgress, 0, 1_000); |
| |
| try { |
| dstReferences.submit(errorCount, service); |
| withCounts.submit(errorCount, service); |
| withNames.submit(errorCount, service); |
| |
| dstReferences.waitForFutures(); |
| withCounts.waitForFutures(); |
| withNames.waitForFutures(); |
| } catch (Throwable e) { |
| printError("Failed to assertReferences", e); |
| } finally { |
| service.shutdown(); |
| t.cancel(); |
| } |
| } |
| |
| static <REF extends INodeReference> List<Task<REF>> createTasks( |
| List<REF> references, AtomicInteger errorCount) { |
| final List<Task<REF>> tasks = new LinkedList<>(); |
| for (final Iterator<REF> i = references.iterator(); i.hasNext();) { |
| tasks.add(new Task<>(i, errorCount)); |
| } |
| return tasks; |
| } |
| |
| static class Task<REF extends INodeReference> implements Callable<Integer> { |
| static final int BATCH_SIZE = 100_000; |
| |
| private final List<REF> references = new LinkedList<>(); |
| private final AtomicInteger errorCount; |
| |
| Task(Iterator<REF> i, AtomicInteger errorCount) { |
| for(int n = 0; i.hasNext() && n < BATCH_SIZE; n++) { |
| references.add(i.next()); |
| i.remove(); |
| } |
| this.errorCount = errorCount; |
| } |
| |
| @Override |
| public Integer call() throws Exception { |
| for (final REF ref : references) { |
| try { |
| ref.assertReferences(); |
| } catch (Throwable t) { |
| printError(errorCount, "%s", t); |
| } |
| } |
| return references.size(); |
| } |
| } |
| } |