| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs.server.datanode; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertTrue; |
| |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.Channels; |
| import java.nio.channels.ReadableByteChannel; |
| import java.util.ArrayList; |
| import java.util.List; |
| |
| import org.apache.commons.io.input.BoundedInputStream; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystemTestHelper; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Tests the implementation of {@link ProvidedReplica}. |
| */ |
| public class TestProvidedReplicaImpl { |
| |
| private static final Logger LOG = |
| LoggerFactory.getLogger(TestProvidedReplicaImpl.class); |
| private static final String BASE_DIR = |
| new FileSystemTestHelper().getTestRootDir(); |
| private static final String FILE_NAME = "provided-test"; |
| // length of the file that is associated with the provided blocks. |
| private static final long FILE_LEN = 128 * 1024 * 10L + 64 * 1024; |
| // length of each provided block. |
| private static final long BLK_LEN = 128 * 1024L; |
| |
| private static List<ProvidedReplica> replicas; |
| |
| private static void createFileIfNotExists(String baseDir) throws IOException { |
| File newFile = new File(baseDir, FILE_NAME); |
| newFile.getParentFile().mkdirs(); |
| if(!newFile.exists()) { |
| newFile.createNewFile(); |
| OutputStream writer = new FileOutputStream(newFile.getAbsolutePath()); |
| byte[] bytes = new byte[1]; |
| bytes[0] = (byte) 0; |
| for(int i=0; i< FILE_LEN; i++) { |
| writer.write(bytes); |
| } |
| writer.flush(); |
| writer.close(); |
| LOG.info("Created provided file " + newFile + |
| " of length " + newFile.length()); |
| } |
| } |
| |
| private static void createProvidedReplicas(Configuration conf) { |
| long numReplicas = (long) Math.ceil((double) FILE_LEN/BLK_LEN); |
| File providedFile = new File(BASE_DIR, FILE_NAME); |
| replicas = new ArrayList<ProvidedReplica>(); |
| |
| LOG.info("Creating " + numReplicas + " provided replicas"); |
| for (int i=0; i<numReplicas; i++) { |
| long currentReplicaLength = |
| FILE_LEN >= (i+1)*BLK_LEN ? BLK_LEN : FILE_LEN - i*BLK_LEN; |
| replicas.add( |
| new FinalizedProvidedReplica(i, providedFile.toURI(), i*BLK_LEN, |
| currentReplicaLength, 0, null, null, conf, null)); |
| } |
| } |
| |
| @Before |
| public void setUp() throws IOException { |
| createFileIfNotExists(new File(BASE_DIR).getAbsolutePath()); |
| createProvidedReplicas(new Configuration()); |
| } |
| |
| /** |
| * Checks if {@code ins} matches the provided file from offset |
| * {@code fileOffset} for length {@ dataLength}. |
| * @param file the local file |
| * @param ins input stream to compare against |
| * @param fileOffset offset |
| * @param dataLength length |
| * @throws IOException |
| */ |
| public static void verifyReplicaContents(File file, |
| InputStream ins, long fileOffset, long dataLength) |
| throws IOException { |
| |
| InputStream fileIns = new FileInputStream(file); |
| fileIns.skip(fileOffset); |
| |
| try (ReadableByteChannel i = |
| Channels.newChannel(new BoundedInputStream(fileIns, dataLength))) { |
| try (ReadableByteChannel j = Channels.newChannel(ins)) { |
| ByteBuffer ib = ByteBuffer.allocate(4096); |
| ByteBuffer jb = ByteBuffer.allocate(4096); |
| while (true) { |
| int il = i.read(ib); |
| int jl = j.read(jb); |
| if (il < 0 || jl < 0) { |
| assertEquals(il, jl); |
| break; |
| } |
| ib.flip(); |
| jb.flip(); |
| int cmp = Math.min(ib.remaining(), jb.remaining()); |
| for (int k = 0; k < cmp; ++k) { |
| assertEquals(ib.get(), jb.get()); |
| } |
| ib.compact(); |
| jb.compact(); |
| } |
| } |
| } |
| } |
| |
| @Test |
| public void testProvidedReplicaRead() throws IOException { |
| |
| File providedFile = new File(BASE_DIR, FILE_NAME); |
| for (int i = 0; i < replicas.size(); i++) { |
| ProvidedReplica replica = replicas.get(i); |
| // block data should exist! |
| assertTrue(replica.blockDataExists()); |
| assertEquals(providedFile.toURI(), replica.getBlockURI()); |
| verifyReplicaContents(providedFile, replica.getDataInputStream(0), |
| BLK_LEN*i, replica.getBlockDataLength()); |
| } |
| LOG.info("All replica contents verified"); |
| |
| providedFile.delete(); |
| // the block data should no longer be found! |
| for(int i=0; i < replicas.size(); i++) { |
| ProvidedReplica replica = replicas.get(i); |
| assertTrue(!replica.blockDataExists()); |
| } |
| } |
| |
| } |