| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.hdfs; |
| |
| import java.io.File; |
| import java.io.FileFilter; |
| import java.io.FileInputStream; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.RandomAccessFile; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ThreadLocalRandom; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import org.apache.commons.io.FileUtils; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileUtil; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure; |
| import org.apache.hadoop.hdfs.server.namenode.NameNode; |
| import org.apache.hadoop.hdfs.server.namenode.TestFileTruncate; |
| import org.apache.hadoop.test.GenericTestUtils; |
| import org.apache.hadoop.util.Preconditions; |
| import org.junit.AfterClass; |
| import org.junit.Assert; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.slf4j.event.Level; |
| |
| /** |
| * Test randomly mixing append, snapshot and truncate operations. |
|  * Use the local file system to simulate each operation and verify |
|  * its correctness. |
| */ |
| public class TestAppendSnapshotTruncate { |
| static { |
| GenericTestUtils.setLogLevel(NameNode.stateChangeLog, Level.TRACE); |
| } |
| private static final Logger LOG = |
| LoggerFactory.getLogger(TestAppendSnapshotTruncate.class); |
| private static final int BLOCK_SIZE = 1024; |
| private static final int DATANODE_NUM = 4; |
| private static final short REPLICATION = 3; |
| private static final int FILE_WORKER_NUM = 10; |
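|   // The randomized workload runs for TEST_TIME_SECOND; the timeout adds |
|   // a one-minute margin for cluster setup and the final verification. |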
| private static final long TEST_TIME_SECOND = 20; |
| private static final long TEST_TIMEOUT_SECOND = TEST_TIME_SECOND + 60; |
| |
| static final int SHORT_HEARTBEAT = 1; |
| static final String[] EMPTY_STRINGS = {}; |
| |
| static Configuration conf; |
| static MiniDFSCluster cluster; |
| static DistributedFileSystem dfs; |
| |
| @BeforeClass |
| public static void startUp() throws IOException { |
| conf = new HdfsConfiguration(); |
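|     // Use a small block size and short heartbeat/timeout intervals so |
|     // that block-level events happen quickly within the short test run. |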
| conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, BLOCK_SIZE); |
| conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, BLOCK_SIZE); |
| conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, SHORT_HEARTBEAT); |
| conf.setLong( |
| DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, 1); |
| conf.setBoolean(ReplaceDatanodeOnFailure.BEST_EFFORT_KEY, true); |
| cluster = new MiniDFSCluster.Builder(conf) |
| .format(true) |
| .numDataNodes(DATANODE_NUM) |
| .waitSafeMode(true) |
| .build(); |
| dfs = cluster.getFileSystem(); |
| } |
| |
| @AfterClass |
| public static void tearDown() throws IOException { |
| if(dfs != null) { |
| dfs.close(); |
| } |
| if(cluster != null) { |
| cluster.shutdown(); |
| } |
| } |
|  |
| /** Test randomly mixing append, snapshot and truncate operations. */ |
| @Test(timeout=TEST_TIMEOUT_SECOND*1000) |
| public void testAST() throws Exception { |
| final String dirPathString = "/dir"; |
| final Path dir = new Path(dirPathString); |
| dfs.mkdirs(dir); |
| dfs.allowSnapshot(dir); |
| |
| final File localDir = GenericTestUtils.getTestDir(dirPathString); |
| if (localDir.exists()) { |
| FileUtil.fullyDelete(localDir); |
| } |
| localDir.mkdirs(); |
| |
| final DirWorker w = new DirWorker(dir, localDir, FILE_WORKER_NUM); |
| w.startAllFiles(); |
| w.start(); |
| Worker.sleep(TEST_TIME_SECOND * 1000); |
| w.stop(); |
| w.stopAllFiles(); |
| w.checkEverything(); |
| } |
| |
| static final FileFilter FILE_ONLY = new FileFilter() { |
| @Override |
| public boolean accept(File f) { |
| return f.isFile(); |
| } |
| }; |
| |
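|   /** |
|    * Drives a directory: randomly creates, checks and deletes snapshots |
|    * of it, and owns one {@link FileWorker} per file underneath it. |
|    * Every snapshot is mirrored locally by copying the directory's files |
|    * into a sub-directory of the same name. |
|    */ |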
| static class DirWorker extends Worker { |
| final Path dir; |
| final File localDir; |
| |
| final FileWorker[] files; |
| |
|     private final Map<String, Path> snapshotPaths = new HashMap<>(); |
|     private final AtomicInteger snapshotCount = new AtomicInteger(); |
| |
| DirWorker(Path dir, File localDir, int nFiles) throws IOException { |
| super(dir.getName()); |
| this.dir = dir; |
| this.localDir = localDir; |
| |
| this.files = new FileWorker[nFiles]; |
| for(int i = 0; i < files.length; i++) { |
| files[i] = new FileWorker(dir, localDir, String.format("file%02d", i)); |
| } |
| } |
| |
| static String getSnapshotName(int n) { |
| return String.format("s%02d", n); |
| } |
| |
| String createSnapshot(String snapshot) throws IOException { |
| final StringBuilder b = new StringBuilder("createSnapshot: ") |
| .append(snapshot).append(" for ").append(dir); |
| |
| { |
|         // copy all local files to a sub-directory to simulate the snapshot. |
| final File subDir = new File(localDir, snapshot); |
| Assert.assertFalse(subDir.exists()); |
| subDir.mkdir(); |
| |
| for(File f : localDir.listFiles(FILE_ONLY)) { |
| FileUtils.copyFile(f, new File(subDir, f.getName())); |
| } |
| } |
| |
| final Path p = dfs.createSnapshot(dir, snapshot); |
| snapshotPaths.put(snapshot, p); |
| return b.toString(); |
| } |
| |
| String checkSnapshot(String snapshot) throws IOException { |
| final StringBuilder b = new StringBuilder("checkSnapshot: ") |
| .append(snapshot); |
| |
| final File subDir = new File(localDir, snapshot); |
| Assert.assertTrue(subDir.exists()); |
| |
| final File[] localFiles = subDir.listFiles(FILE_ONLY); |
| final Path p = snapshotPaths.get(snapshot); |
| final FileStatus[] statuses = dfs.listStatus(p); |
| Assert.assertEquals(localFiles.length, statuses.length); |
| b.append(p).append(" vs ").append(subDir).append(", ") |
| .append(statuses.length).append(" entries"); |
| |
| Arrays.sort(localFiles); |
| Arrays.sort(statuses); |
| for(int i = 0; i < statuses.length; i++) { |
| FileWorker.checkFullFile(statuses[i].getPath(), localFiles[i]); |
| } |
| return b.toString(); |
| } |
| |
| String deleteSnapshot(String snapshot) throws IOException { |
| final StringBuilder b = new StringBuilder("deleteSnapshot: ") |
| .append(snapshot).append(" from ").append(dir); |
| FileUtil.fullyDelete(new File(localDir, snapshot)); |
| dfs.deleteSnapshot(dir, snapshot); |
| snapshotPaths.remove(snapshot); |
| return b.toString(); |
| } |
|  |
| @Override |
| public String call() throws Exception { |
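|       // Pick one of six equally likely ops: 0-1 create a snapshot, |
|       // 2 check then delete a random snapshot, 3 check a random |
|       // snapshot, 4-5 do nothing. |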
| final int op = ThreadLocalRandom.current().nextInt(6); |
| if (op <= 1) { |
| pauseAllFiles(); |
| try { |
| final String snapshot = getSnapshotName(snapshotCount.getAndIncrement()); |
| return createSnapshot(snapshot); |
| } finally { |
| startAllFiles(); |
| } |
| } else if (op <= 3) { |
| final String[] keys = snapshotPaths.keySet().toArray(EMPTY_STRINGS); |
| if (keys.length == 0) { |
| return "NO-OP"; |
| } |
| final String snapshot = keys[ThreadLocalRandom.current() |
| .nextInt(keys.length)]; |
| final String s = checkSnapshot(snapshot); |
| |
| if (op == 2) { |
| return deleteSnapshot(snapshot); |
| } |
| return s; |
| } else { |
| return "NO-OP"; |
| } |
| } |
| |
| void pauseAllFiles() { |
| for(FileWorker f : files) { |
| f.pause(); |
| } |
| |
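|       // Busy-wait until every file worker has finished its current |
|       // call(); isPaused() also surfaces any failure a worker has hit. |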
| for(int i = 0; i < files.length; ) { |
| sleep(100); |
| for(; i < files.length && files[i].isPaused(); i++); |
| } |
| } |
| |
| void startAllFiles() { |
| for(FileWorker f : files) { |
| f.start(); |
| } |
| } |
| |
| void stopAllFiles() throws InterruptedException { |
| for(FileWorker f : files) { |
| f.stop(); |
| } |
| } |
| |
| void checkEverything() throws IOException { |
| LOG.info("checkEverything"); |
| for(FileWorker f : files) { |
| f.checkFullFile(); |
| f.checkErrorState(); |
| } |
| for(String snapshot : snapshotPaths.keySet()) { |
| checkSnapshot(snapshot); |
| } |
| checkErrorState(); |
| } |
| } |
| |
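|   /** |
|    * Drives a single file: randomly appends to, truncates and verifies |
|    * it, applying every mutation to a local copy as well so that the |
|    * HDFS file and the local file can be compared at any time. |
|    */ |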
| static class FileWorker extends Worker { |
| final Path file; |
| final File localFile; |
| |
| FileWorker(Path dir, File localDir, String filename) throws IOException { |
| super(filename); |
| this.file = new Path(dir, filename); |
| this.localFile = new File(localDir, filename); |
| |
| localFile.createNewFile(); |
| dfs.create(file, false, 4096, REPLICATION, BLOCK_SIZE).close(); |
| } |
| |
| @Override |
| public String call() throws IOException { |
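|       // Pick one of nine equally likely ops: 0 verify the full file, |
|       // 1-4 append, 5-6 truncate at an arbitrary offset, |
|       // 7-8 truncate to a block boundary. |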
| final int op = ThreadLocalRandom.current().nextInt(9); |
| if (op == 0) { |
| return checkFullFile(); |
| } else { |
| final int nBlocks = ThreadLocalRandom.current().nextInt(4) + 1; |
| final int lastBlockSize = ThreadLocalRandom.current() |
| .nextInt(BLOCK_SIZE) + 1; |
| final int nBytes = nBlocks*BLOCK_SIZE + lastBlockSize; |
| |
| if (op <= 4) { |
| return append(nBytes); |
| } else if (op <= 6) { |
| return truncateArbitrarily(nBytes); |
| } else { |
| return truncateToBlockBoundary(nBlocks); |
| } |
| } |
| } |
| |
| String append(int n) throws IOException { |
| final StringBuilder b = new StringBuilder("append ") |
| .append(n).append(" bytes to ").append(file.getName()); |
| |
| final byte[] bytes = new byte[n]; |
| ThreadLocalRandom.current().nextBytes(bytes); |
| |
| { // write to local file |
| final FileOutputStream out = new FileOutputStream(localFile, true); |
| out.write(bytes, 0, bytes.length); |
| out.close(); |
| } |
| |
| { |
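|         // append the same bytes to the file in HDFS |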
| final FSDataOutputStream out = dfs.append(file); |
| out.write(bytes, 0, bytes.length); |
| out.close(); |
| } |
| return b.toString(); |
| } |
| |
| String truncateArbitrarily(int nBytes) throws IOException { |
| Preconditions.checkArgument(nBytes > 0); |
| final int length = checkLength(); |
| final StringBuilder b = new StringBuilder("truncateArbitrarily: ") |
| .append(nBytes).append(" bytes from ").append(file.getName()) |
| .append(", length=" + length); |
| |
| truncate(length > nBytes? length - nBytes: 0, b); |
| return b.toString(); |
| } |
| |
| String truncateToBlockBoundary(int nBlocks) throws IOException { |
| Preconditions.checkArgument(nBlocks > 0); |
| final int length = checkLength(); |
| final StringBuilder b = new StringBuilder("truncateToBlockBoundary: ") |
| .append(nBlocks).append(" blocks from ").append(file.getName()) |
| .append(", length=" + length); |
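|       // Remove the partial last block plus (nBlocks - 1) full blocks so |
|       // that the new length lands exactly on a block boundary. |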
| final int n = (nBlocks - 1)*BLOCK_SIZE + (length%BLOCK_SIZE); |
| Preconditions.checkState(truncate(length > n? length - n: 0, b), b); |
| return b.toString(); |
| } |
| |
| private boolean truncate(long newLength, StringBuilder b) throws IOException { |
| final RandomAccessFile raf = new RandomAccessFile(localFile, "rw"); |
| raf.setLength(newLength); |
| raf.close(); |
| |
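|       // truncate() returns true iff the new length takes effect |
|       // immediately; false means the last block needs recovery first. |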
| final boolean isReady = dfs.truncate(file, newLength); |
| b.append(", newLength=").append(newLength) |
| .append(", isReady=").append(isReady); |
| if (!isReady) { |
| TestFileTruncate.checkBlockRecovery(file, dfs, 100, 300L); |
| } |
| return isReady; |
| } |
| |
| int checkLength() throws IOException { |
| return checkLength(file, localFile); |
| } |
| |
| static int checkLength(Path file, File localFile) throws IOException { |
| final long length = dfs.getFileStatus(file).getLen(); |
| Assert.assertEquals(localFile.length(), length); |
| Assert.assertTrue(length <= Integer.MAX_VALUE); |
| return (int)length; |
| } |
| |
| String checkFullFile() throws IOException { |
| return checkFullFile(file, localFile); |
| } |
| |
| static String checkFullFile(Path file, File localFile) throws IOException { |
| final StringBuilder b = new StringBuilder("checkFullFile: ") |
| .append(file.getName()).append(" vs ").append(localFile); |
| final byte[] bytes = new byte[checkLength(file, localFile)]; |
| b.append(", length=").append(bytes.length); |
| |
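|       // read the entire local file; read() may return fewer bytes than |
|       // requested, so loop until the buffer is full |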
| final FileInputStream in = new FileInputStream(localFile); |
| for(int n = 0; n < bytes.length; ) { |
| n += in.read(bytes, n, bytes.length - n); |
| } |
| in.close(); |
| |
| AppendTestUtil.checkFullFile(dfs, file, bytes.length, bytes, |
| "File content mismatch: " + b, false); |
| return b.toString(); |
| } |
| } |
| |
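|   /** |
|    * A restartable worker thread: while RUNNING it repeatedly invokes |
|    * {@link #call()}; it can be paused back to IDLE, stopped for good |
|    * (STOPPED), and records the first failure it encounters (ERROR). |
|    */ |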
|   abstract static class Worker implements Callable<String> { |
| enum State { |
| IDLE(false), RUNNING(false), STOPPED(true), ERROR(true); |
| |
| final boolean isTerminated; |
| |
| State(boolean isTerminated) { |
| this.isTerminated = isTerminated; |
| } |
|     } |
| |
| final String name; |
| final AtomicReference<State> state = new AtomicReference<State>(State.IDLE); |
| final AtomicBoolean isCalling = new AtomicBoolean(); |
| final AtomicReference<Thread> thread = new AtomicReference<Thread>(); |
| |
| private Throwable thrown = null; |
| |
| Worker(String name) { |
| this.name = name; |
| } |
| |
| State checkErrorState() { |
| final State s = state.get(); |
| if (s == State.ERROR) { |
| throw new IllegalStateException(name + " has " + s, thrown); |
| } |
| return s; |
| } |
| |
| void setErrorState(Throwable t) { |
| checkErrorState(); |
| |
| LOG.error("Worker " + name + " failed.", t); |
| state.set(State.ERROR); |
| thrown = t; |
| } |
| |
| void start() { |
| Preconditions.checkState(state.compareAndSet(State.IDLE, State.RUNNING)); |
| |
| if (thread.get() == null) { |
| final Thread t = new Thread(null, new Runnable() { |
| @Override |
| public void run() { |
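|           // loop until the worker terminates (STOPPED or ERROR), |
|           // invoking call() only while in the RUNNING state |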
| for(State s; !(s = checkErrorState()).isTerminated;) { |
| if (s == State.RUNNING) { |
| isCalling.set(true); |
| try { |
| LOG.info(call()); |
| } catch(Throwable t) { |
| setErrorState(t); |
| return; |
| } |
| isCalling.set(false); |
| } |
| sleep(ThreadLocalRandom.current().nextInt(100) + 50); |
| } |
| } |
| }, name); |
| Preconditions.checkState(thread.compareAndSet(null, t)); |
| t.start(); |
| } |
| } |
| |
| boolean isPaused() { |
| final State s = checkErrorState(); |
| if (s == State.STOPPED) { |
| throw new IllegalStateException(name + " is " + s); |
| } |
| return s == State.IDLE && !isCalling.get(); |
| } |
| |
| void pause() { |
| checkErrorState(); |
| Preconditions.checkState(state.compareAndSet(State.RUNNING, State.IDLE), |
| "%s: state=%s != %s", name, state.get(), State.RUNNING); |
| } |
| |
| void stop() throws InterruptedException { |
| checkErrorState(); |
| |
| state.set(State.STOPPED); |
| thread.get().join(); |
| } |
| |
| static void sleep(final long sleepTimeMs) { |
| try { |
| Thread.sleep(sleepTimeMs); |
|       } catch (InterruptedException e) { |
|         // restore the interrupt status before propagating |
|         Thread.currentThread().interrupt(); |
|         throw new RuntimeException(e); |
|       } |
| } |
| } |
| } |