| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs.server.common.blockaliasmap.impl; |
| |
| import com.google.common.collect.Lists; |
| import com.google.common.io.Files; |
| import org.apache.commons.io.FileUtils; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hdfs.DFSConfigKeys; |
| import org.apache.hadoop.hdfs.protocol.Block; |
| import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation; |
| import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap; |
| import org.apache.hadoop.hdfs.server.aliasmap.InMemoryLevelDBAliasMapServer; |
| import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap; |
| import org.apache.hadoop.hdfs.server.common.FileRegion; |
| import org.apache.hadoop.test.LambdaTestUtils; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.ExpectedException; |
| |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY; |
| import static org.assertj.core.api.Assertions.assertThat; |
| import static org.junit.Assert.assertArrayEquals; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.List; |
| import java.util.Optional; |
| import java.util.Random; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.stream.Collectors; |
| |
| /** |
| * Tests the {@link InMemoryLevelDBAliasMapClient}. |
| */ |
| public class TestInMemoryLevelDBAliasMapClient { |
| |
| private InMemoryLevelDBAliasMapServer levelDBAliasMapServer; |
| private InMemoryLevelDBAliasMapClient inMemoryLevelDBAliasMapClient; |
| private File tempDir; |
| private Configuration conf; |
| private final static String BPID = "BPID-0"; |
| |
| @Rule |
| public final ExpectedException exception = ExpectedException.none(); |
| |
| @Before |
| public void setUp() throws IOException { |
| conf = new Configuration(); |
| int port = 9876; |
| |
| conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS, |
| "localhost:" + port); |
| tempDir = Files.createTempDir(); |
| File levelDBDir = new File(tempDir, BPID); |
| levelDBDir.mkdirs(); |
| conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR, |
| tempDir.getAbsolutePath()); |
| levelDBAliasMapServer = |
| new InMemoryLevelDBAliasMapServer(InMemoryAliasMap::init, BPID); |
| inMemoryLevelDBAliasMapClient = new InMemoryLevelDBAliasMapClient(); |
| } |
| |
| @After |
| public void tearDown() throws IOException { |
| levelDBAliasMapServer.close(); |
| inMemoryLevelDBAliasMapClient.close(); |
| FileUtils.deleteDirectory(tempDir); |
| } |
| |
| @Test |
| public void writeRead() throws Exception { |
| levelDBAliasMapServer.setConf(conf); |
| levelDBAliasMapServer.start(); |
| inMemoryLevelDBAliasMapClient.setConf(conf); |
| Block block = new Block(42, 43, 44); |
| byte[] nonce = "blackbird".getBytes(); |
| ProvidedStorageLocation providedStorageLocation |
| = new ProvidedStorageLocation(new Path("cuckoo"), |
| 45, 46, nonce); |
| BlockAliasMap.Writer<FileRegion> writer = |
| inMemoryLevelDBAliasMapClient.getWriter(null, BPID); |
| writer.store(new FileRegion(block, providedStorageLocation)); |
| |
| BlockAliasMap.Reader<FileRegion> reader = |
| inMemoryLevelDBAliasMapClient.getReader(null, BPID); |
| Optional<FileRegion> fileRegion = reader.resolve(block); |
| assertEquals(new FileRegion(block, providedStorageLocation), |
| fileRegion.get()); |
| } |
| |
| @Test |
| public void iterateSingleBatch() throws Exception { |
| levelDBAliasMapServer.setConf(conf); |
| levelDBAliasMapServer.start(); |
| inMemoryLevelDBAliasMapClient.setConf(conf); |
| Block block1 = new Block(42, 43, 44); |
| Block block2 = new Block(43, 44, 45); |
| byte[] nonce1 = "blackbird".getBytes(); |
| byte[] nonce2 = "cuckoo".getBytes(); |
| ProvidedStorageLocation providedStorageLocation1 = |
| new ProvidedStorageLocation(new Path("eagle"), |
| 46, 47, nonce1); |
| ProvidedStorageLocation providedStorageLocation2 = |
| new ProvidedStorageLocation(new Path("falcon"), |
| 46, 47, nonce2); |
| BlockAliasMap.Writer<FileRegion> writer1 = |
| inMemoryLevelDBAliasMapClient.getWriter(null, BPID); |
| writer1.store(new FileRegion(block1, providedStorageLocation1)); |
| BlockAliasMap.Writer<FileRegion> writer2 = |
| inMemoryLevelDBAliasMapClient.getWriter(null, BPID); |
| writer2.store(new FileRegion(block2, providedStorageLocation2)); |
| |
| BlockAliasMap.Reader<FileRegion> reader = |
| inMemoryLevelDBAliasMapClient.getReader(null, BPID); |
| List<FileRegion> actualFileRegions = |
| Lists.newArrayListWithCapacity(2); |
| for (FileRegion fileRegion : reader) { |
| actualFileRegions.add(fileRegion); |
| } |
| |
| assertArrayEquals( |
| new FileRegion[] {new FileRegion(block1, providedStorageLocation1), |
| new FileRegion(block2, providedStorageLocation2)}, |
| actualFileRegions.toArray()); |
| } |
| |
| @Test |
| public void iterateThreeBatches() throws Exception { |
| conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_BATCH_SIZE, "2"); |
| levelDBAliasMapServer.setConf(conf); |
| levelDBAliasMapServer.start(); |
| inMemoryLevelDBAliasMapClient.setConf(conf); |
| Block block1 = new Block(42, 43, 44); |
| Block block2 = new Block(43, 44, 45); |
| Block block3 = new Block(44, 45, 46); |
| Block block4 = new Block(47, 48, 49); |
| Block block5 = new Block(50, 51, 52); |
| Block block6 = new Block(53, 54, 55); |
| byte[] nonce1 = "blackbird".getBytes(); |
| byte[] nonce2 = "cuckoo".getBytes(); |
| byte[] nonce3 = "sparrow".getBytes(); |
| byte[] nonce4 = "magpie".getBytes(); |
| byte[] nonce5 = "seagull".getBytes(); |
| byte[] nonce6 = "finch".getBytes(); |
| ProvidedStorageLocation providedStorageLocation1 = |
| new ProvidedStorageLocation(new Path("eagle"), |
| 46, 47, nonce1); |
| ProvidedStorageLocation providedStorageLocation2 = |
| new ProvidedStorageLocation(new Path("falcon"), |
| 48, 49, nonce2); |
| ProvidedStorageLocation providedStorageLocation3 = |
| new ProvidedStorageLocation(new Path("robin"), |
| 50, 51, nonce3); |
| ProvidedStorageLocation providedStorageLocation4 = |
| new ProvidedStorageLocation(new Path("parakeet"), |
| 52, 53, nonce4); |
| ProvidedStorageLocation providedStorageLocation5 = |
| new ProvidedStorageLocation(new Path("heron"), |
| 54, 55, nonce5); |
| ProvidedStorageLocation providedStorageLocation6 = |
| new ProvidedStorageLocation(new Path("duck"), |
| 56, 57, nonce6); |
| inMemoryLevelDBAliasMapClient |
| .getWriter(null, BPID) |
| .store(new FileRegion(block1, providedStorageLocation1)); |
| inMemoryLevelDBAliasMapClient |
| .getWriter(null, BPID) |
| .store(new FileRegion(block2, providedStorageLocation2)); |
| inMemoryLevelDBAliasMapClient |
| .getWriter(null, BPID) |
| .store(new FileRegion(block3, providedStorageLocation3)); |
| inMemoryLevelDBAliasMapClient |
| .getWriter(null, BPID) |
| .store(new FileRegion(block4, providedStorageLocation4)); |
| inMemoryLevelDBAliasMapClient |
| .getWriter(null, BPID) |
| .store(new FileRegion(block5, providedStorageLocation5)); |
| inMemoryLevelDBAliasMapClient |
| .getWriter(null, BPID) |
| .store(new FileRegion(block6, providedStorageLocation6)); |
| |
| BlockAliasMap.Reader<FileRegion> reader = |
| inMemoryLevelDBAliasMapClient.getReader(null, BPID); |
| List<FileRegion> actualFileRegions = |
| Lists.newArrayListWithCapacity(6); |
| for (FileRegion fileRegion : reader) { |
| actualFileRegions.add(fileRegion); |
| } |
| |
| FileRegion[] expectedFileRegions = |
| new FileRegion[] {new FileRegion(block1, providedStorageLocation1), |
| new FileRegion(block2, providedStorageLocation2), |
| new FileRegion(block3, providedStorageLocation3), |
| new FileRegion(block4, providedStorageLocation4), |
| new FileRegion(block5, providedStorageLocation5), |
| new FileRegion(block6, providedStorageLocation6)}; |
| assertArrayEquals(expectedFileRegions, actualFileRegions.toArray()); |
| } |
| |
| |
| class ReadThread implements Runnable { |
| private final Block block; |
| private final BlockAliasMap.Reader<FileRegion> reader; |
| private int delay; |
| private Optional<FileRegion> fileRegionOpt; |
| |
| ReadThread(Block block, BlockAliasMap.Reader<FileRegion> reader, |
| int delay) { |
| this.block = block; |
| this.reader = reader; |
| this.delay = delay; |
| } |
| |
| public Optional<FileRegion> getFileRegion() { |
| return fileRegionOpt; |
| } |
| |
| @Override |
| public void run() { |
| try { |
| Thread.sleep(delay); |
| fileRegionOpt = reader.resolve(block); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } catch (InterruptedException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| } |
| |
| class WriteThread implements Runnable { |
| private final Block block; |
| private final BlockAliasMap.Writer<FileRegion> writer; |
| private final ProvidedStorageLocation providedStorageLocation; |
| private int delay; |
| |
| WriteThread(Block block, ProvidedStorageLocation providedStorageLocation, |
| BlockAliasMap.Writer<FileRegion> writer, int delay) { |
| this.block = block; |
| this.writer = writer; |
| this.providedStorageLocation = providedStorageLocation; |
| this.delay = delay; |
| } |
| |
| @Override |
| public void run() { |
| try { |
| Thread.sleep(delay); |
| writer.store(new FileRegion(block, providedStorageLocation)); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } catch (InterruptedException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| } |
| |
| public FileRegion generateRandomFileRegion(int seed) { |
| Block block = new Block(seed, seed + 1, seed + 2); |
| Path path = new Path("koekoek"); |
| byte[] nonce = new byte[0]; |
| ProvidedStorageLocation providedStorageLocation = |
| new ProvidedStorageLocation(path, seed + 3, seed + 4, nonce); |
| return new FileRegion(block, providedStorageLocation); |
| } |
| |
| @Test |
| public void multipleReads() throws IOException { |
| levelDBAliasMapServer.setConf(conf); |
| levelDBAliasMapServer.start(); |
| inMemoryLevelDBAliasMapClient.setConf(conf); |
| |
| Random r = new Random(); |
| List<FileRegion> expectedFileRegions = r.ints(0, 200) |
| .limit(50) |
| .boxed() |
| .map(i -> generateRandomFileRegion(i)) |
| .collect(Collectors.toList()); |
| |
| |
| BlockAliasMap.Reader<FileRegion> reader = |
| inMemoryLevelDBAliasMapClient.getReader(null, BPID); |
| BlockAliasMap.Writer<FileRegion> writer = |
| inMemoryLevelDBAliasMapClient.getWriter(null, BPID); |
| |
| ExecutorService executor = Executors.newCachedThreadPool(); |
| |
| List<ReadThread> readThreads = expectedFileRegions |
| .stream() |
| .map(fileRegion -> new ReadThread(fileRegion.getBlock(), |
| reader, |
| 4000)) |
| .collect(Collectors.toList()); |
| |
| |
| List<? extends Future<?>> readFutures = |
| readThreads.stream() |
| .map(readThread -> executor.submit(readThread)) |
| .collect(Collectors.toList()); |
| |
| List<? extends Future<?>> writeFutures = expectedFileRegions |
| .stream() |
| .map(fileRegion -> new WriteThread(fileRegion.getBlock(), |
| fileRegion.getProvidedStorageLocation(), |
| writer, |
| 1000)) |
| .map(writeThread -> executor.submit(writeThread)) |
| .collect(Collectors.toList()); |
| |
| readFutures.stream() |
| .map(readFuture -> { |
| try { |
| return readFuture.get(); |
| } catch (InterruptedException e) { |
| throw new RuntimeException(e); |
| } catch (ExecutionException e) { |
| throw new RuntimeException(e); |
| } |
| }) |
| .collect(Collectors.toList()); |
| |
| List<FileRegion> actualFileRegions = readThreads.stream() |
| .map(readThread -> readThread.getFileRegion().get()) |
| .collect(Collectors.toList()); |
| |
| assertThat(actualFileRegions).containsExactlyInAnyOrder( |
| expectedFileRegions.toArray(new FileRegion[0])); |
| } |
| |
| @Test |
| public void testServerBindHost() throws Exception { |
| conf.set(DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY, "0.0.0.0"); |
| writeRead(); |
| } |
| |
| @Test |
| public void testNonExistentBlock() throws Exception { |
| inMemoryLevelDBAliasMapClient.setConf(conf); |
| levelDBAliasMapServer.setConf(conf); |
| levelDBAliasMapServer.start(); |
| Block block1 = new Block(100, 43, 44); |
| ProvidedStorageLocation providedStorageLocation1 = null; |
| BlockAliasMap.Writer<FileRegion> writer1 = |
| inMemoryLevelDBAliasMapClient.getWriter(null, BPID); |
| try { |
| writer1.store(new FileRegion(block1, providedStorageLocation1)); |
| fail("Should fail on writing a region with null ProvidedLocation"); |
| } catch (IOException | IllegalArgumentException e) { |
| assertTrue(e.getMessage().contains("not be null")); |
| } |
| |
| BlockAliasMap.Reader<FileRegion> reader = |
| inMemoryLevelDBAliasMapClient.getReader(null, BPID); |
| LambdaTestUtils.assertOptionalUnset("Expected empty BlockAlias", |
| reader.resolve(block1)); |
| } |
| } |