HDFS-13794. [PROVIDED Phase 2] Teach BlockAliasMap.Writer remove method. Contributed by Ewan Higgs
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 1c3a71f..e2f2f38 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -98,7 +98,6 @@
HdfsClientConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS;
public static final String DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS_DEFAULT = "0.0.0.0:50200";
public static final String DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_BIND_HOST = "dfs.provided.aliasmap.inmemory.rpc.bind-host";
-
public static final String DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR = "dfs.provided.aliasmap.inmemory.leveldb.dir";
public static final String DFS_PROVIDED_ALIASMAP_INMEMORY_BATCH_SIZE = "dfs.provided.aliasmap.inmemory.batch-size";
public static final int DFS_PROVIDED_ALIASMAP_INMEMORY_BATCH_SIZE_DEFAULT = 500;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/AliasMapProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/AliasMapProtocolServerSideTranslatorPB.java
index 8d89c40..48da058 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/AliasMapProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/AliasMapProtocolServerSideTranslatorPB.java
@@ -57,6 +57,9 @@
private static final WriteResponseProto VOID_WRITE_RESPONSE =
WriteResponseProto.newBuilder().build();
+ private static final RemoveResponseProto VOID_REMOVE_RESPONSE =
+ RemoveResponseProto.newBuilder().build();
+
@Override
public WriteResponseProto write(RpcController controller,
WriteRequestProto request) throws ServiceException {
@@ -72,6 +75,19 @@
}
@Override
+ public RemoveResponseProto remove(RpcController controller,
+ RemoveRequestProto request) throws ServiceException {
+ try {
+ Block toRemove =
+ PBHelperClient.convert(request.getKey());
+ aliasMap.remove(toRemove);
+ return VOID_REMOVE_RESPONSE;
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ @Override
public ReadResponseProto read(RpcController controller,
ReadRequestProto request) throws ServiceException {
try {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InMemoryAliasMapProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InMemoryAliasMapProtocolClientSideTranslatorPB.java
index d9e984b..dc5bd3b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InMemoryAliasMapProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InMemoryAliasMapProtocolClientSideTranslatorPB.java
@@ -214,6 +214,21 @@
}
@Override
+ public void remove(@Nonnull Block block) throws IOException {
+ RemoveRequestProto request =
+ RemoveRequestProto
+ .newBuilder()
+ .setKey(PBHelperClient.convert(block))
+ .build();
+
+ try {
+ rpcProxy.remove(null, request);
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
public String getBlockPoolId() throws IOException {
try {
BlockPoolResponseProto response = rpcProxy.getBlockPoolId(null,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryAliasMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryAliasMap.java
index 8df27cd..777e673 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryAliasMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryAliasMap.java
@@ -180,6 +180,11 @@
levelDb.put(extendedBlockDbFormat, providedStorageLocationDbFormat);
}
+ public void remove(@Nonnull Block block) throws IOException {
+ byte[] extendedBlockDbFormat = toProtoBufBytes(block);
+ levelDb.delete(extendedBlockDbFormat);
+ }
+
@Override
public String getBlockPoolId() {
return blockPoolID;
@@ -218,8 +223,7 @@
public static byte[] toProtoBufBytes(@Nonnull Block block)
throws IOException {
- BlockProto blockProto =
- PBHelperClient.convert(block);
+ BlockProto blockProto = PBHelperClient.convert(block);
ByteArrayOutputStream blockOutputStream = new ByteArrayOutputStream();
blockProto.writeTo(blockOutputStream);
return blockOutputStream.toByteArray();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryAliasMapProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryAliasMapProtocol.java
index c3824e5..170d1f6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryAliasMapProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryAliasMapProtocol.java
@@ -99,6 +99,15 @@
throws IOException;
/**
+ * Removed the block and it's associated {@link ProvidedStorageLocation}
+ * from the alias map.
+ * As this is for in memory alias map, we do not require the bpid.
+ * @param block
+ * @throws IOException
+ */
+ void remove(@Nonnull Block block) throws IOException;
+
+ /**
* Get the associated block pool id.
* @return the block pool id associated with the Namenode running
* the in-memory alias map.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryLevelDBAliasMapServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryLevelDBAliasMapServer.java
index 6a2fa56..ea6a2d8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryLevelDBAliasMapServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryLevelDBAliasMapServer.java
@@ -122,6 +122,11 @@
}
@Override
+ public void remove(@Nonnull Block block) throws IOException {
+ aliasMap.remove(block);
+ }
+
+ @Override
public String getBlockPoolId() {
return blockPoolId;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/BlockAliasMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/BlockAliasMap.java
index 897aefd..9905039 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/BlockAliasMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/BlockAliasMap.java
@@ -88,8 +88,20 @@
*/
public interface Options { }
- public abstract void store(U token) throws IOException;
+ /**
+ * Store the BlockAlias; this may be a destructive store, replacing old
+ * data.
+ * @param blockAlias
+ * @throws IOException
+ */
+ public abstract void store(U blockAlias) throws IOException;
+ /**
+ * Remove the block from the AliasMap.
+ * @param block
+ * @throws IOException
+ */
+ public abstract void remove(Block block) throws IOException;
}
/**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/InMemoryLevelDBAliasMapClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/InMemoryLevelDBAliasMapClient.java
index cacf8f1..7677258 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/InMemoryLevelDBAliasMapClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/InMemoryLevelDBAliasMapClient.java
@@ -28,6 +28,7 @@
import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
import org.apache.hadoop.hdfs.server.common.FileRegion;
import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -144,11 +145,20 @@
}
@Override
+ public void remove(Block block) throws IOException {
+ aliasMap.remove(block);
+ }
+
+ @Override
public void close() throws IOException {
}
}
InMemoryLevelDBAliasMapClient() {
+ if (UserGroupInformation.isSecurityEnabled()) {
+ throw new UnsupportedOperationException("Unable to start "
+ + "InMemoryLevelDBAliasMapClient as security is enabled");
+ }
aliasMaps = new ArrayList<>();
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/LevelDBFileRegionAliasMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/LevelDBFileRegionAliasMap.java
index 6afe6bb..81756e1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/LevelDBFileRegionAliasMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/LevelDBFileRegionAliasMap.java
@@ -265,6 +265,12 @@
}
@Override
+ public void remove(Block block) throws IOException {
+ byte[] key = toProtoBufBytes(block);
+ db.delete(key);
+ }
+
+ @Override
public void close() throws IOException {
if (db != null) {
db.close();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TextFileRegionAliasMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TextFileRegionAliasMap.java
index 4d65142..fb73b41 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TextFileRegionAliasMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TextFileRegionAliasMap.java
@@ -458,6 +458,12 @@
}
@Override
+ public void remove(Block block) throws IOException {
+ throw new RuntimeException("TextFileWriter does not support " +
+ "block removal");
+ }
+
+ @Override
public void close() throws IOException {
out.close();
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/AliasMapProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/AliasMapProtocol.proto
index 8050f35..16936c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/AliasMapProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/AliasMapProtocol.proto
@@ -36,6 +36,13 @@
message WriteResponseProto {
}
+message RemoveRequestProto {
+ required BlockProto key = 1;
+}
+
+message RemoveResponseProto {
+}
+
message ReadRequestProto {
required BlockProto key = 1;
}
@@ -62,6 +69,7 @@
service AliasMapProtocolService {
rpc write(WriteRequestProto) returns(WriteResponseProto);
+ rpc remove(RemoveRequestProto) returns(RemoveResponseProto);
rpc read(ReadRequestProto) returns(ReadResponseProto);
rpc list(ListRequestProto) returns(ListResponseProto);
rpc getBlockPoolId(BlockPoolRequestProto) returns(BlockPoolResponseProto);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/aliasmap/ITestInMemoryAliasMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/aliasmap/ITestInMemoryAliasMap.java
index 2785f68..a9f5c73 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/aliasmap/ITestInMemoryAliasMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/aliasmap/ITestInMemoryAliasMap.java
@@ -64,7 +64,7 @@
}
@Test
- public void readNotFoundReturnsNothing() throws IOException {
+ public void testReadNotFoundReturnsNothing() throws IOException {
Block block = new Block(42, 43, 44);
Optional<ProvidedStorageLocation> actualProvidedStorageLocationOpt
@@ -74,7 +74,7 @@
}
@Test
- public void readWrite() throws Exception {
+ public void testReadWriteRemove() throws Exception {
Block block = new Block(42, 43, 44);
Path path = new Path("eagle", "mouse");
@@ -92,14 +92,18 @@
Optional<ProvidedStorageLocation> actualProvidedStorageLocationOpt
= aliasMap.read(block);
+
assertTrue(actualProvidedStorageLocationOpt.isPresent());
assertEquals(expectedProvidedStorageLocation,
actualProvidedStorageLocationOpt.get());
+ aliasMap.remove(block);
+ actualProvidedStorageLocationOpt = aliasMap.read(block);
+ assertFalse(actualProvidedStorageLocationOpt.isPresent());
}
@Test
- public void list() throws IOException {
+ public void testIteration() throws IOException {
Block block1 = new Block(42, 43, 44);
Block block2 = new Block(43, 44, 45);
Block block3 = new Block(44, 45, 46);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TestInMemoryLevelDBAliasMapClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TestInMemoryLevelDBAliasMapClient.java
index 1e42a88..c4fea99 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TestInMemoryLevelDBAliasMapClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TestInMemoryLevelDBAliasMapClient.java
@@ -28,7 +28,7 @@
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryLevelDBAliasMapServer;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
import org.apache.hadoop.hdfs.server.common.FileRegion;
-import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.hadoop.net.NetUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@@ -36,6 +36,7 @@
import org.junit.rules.ExpectedException;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY;
+import static org.apache.hadoop.test.LambdaTestUtils.assertOptionalUnset;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -70,7 +71,7 @@
@Before
public void setUp() throws IOException {
conf = new Configuration();
- int port = 9876;
+ int port = NetUtils.getFreeSocketPort();
conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS,
"localhost:" + port);
@@ -92,7 +93,7 @@
}
@Test
- public void writeRead() throws Exception {
+ public void writeReadRemove() throws Exception {
levelDBAliasMapServer.setConf(conf);
levelDBAliasMapServer.start();
inMemoryLevelDBAliasMapClient.setConf(conf);
@@ -110,6 +111,8 @@
Optional<FileRegion> fileRegion = reader.resolve(block);
assertEquals(new FileRegion(block, providedStorageLocation),
fileRegion.get());
+ writer.remove(block);
+ assertOptionalUnset("Block should not exist", reader.resolve(block));
}
@Test
@@ -354,7 +357,7 @@
@Test
public void testServerBindHost() throws Exception {
conf.set(DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY, "0.0.0.0");
- writeRead();
+ writeReadRemove();
}
@Test
@@ -375,7 +378,7 @@
BlockAliasMap.Reader<FileRegion> reader =
inMemoryLevelDBAliasMapClient.getReader(null, BPID);
- LambdaTestUtils.assertOptionalUnset("Expected empty BlockAlias",
+ assertOptionalUnset("Expected empty BlockAlias",
reader.resolve(block1));
}
}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TestLevelDbMockAliasMapClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TestLevelDbMockAliasMapClient.java
index 534bc36..ab1fa63 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TestLevelDbMockAliasMapClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TestLevelDbMockAliasMapClient.java
@@ -26,6 +26,7 @@
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap;
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryLevelDBAliasMapServer;
import org.apache.hadoop.hdfs.server.common.FileRegion;
+import org.apache.hadoop.net.NetUtils;
import org.iq80.leveldb.DBException;
import org.junit.After;
import org.junit.Before;
@@ -56,7 +57,7 @@
levelDBAliasMapServer = new InMemoryLevelDBAliasMapServer(
(config, blockPoolID) -> aliasMapMock, bpid);
conf = new Configuration();
- int port = 9877;
+ int port = NetUtils.getFreeSocketPort();
conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS,
"localhost:" + port);
diff --git a/hadoop-tools/hadoop-fs2img/src/main/java/org/apache/hadoop/hdfs/server/namenode/NullBlockAliasMap.java b/hadoop-tools/hadoop-fs2img/src/main/java/org/apache/hadoop/hdfs/server/namenode/NullBlockAliasMap.java
index 41e202d..e439bab 100644
--- a/hadoop-tools/hadoop-fs2img/src/main/java/org/apache/hadoop/hdfs/server/namenode/NullBlockAliasMap.java
+++ b/hadoop-tools/hadoop-fs2img/src/main/java/org/apache/hadoop/hdfs/server/namenode/NullBlockAliasMap.java
@@ -74,7 +74,12 @@
throws IOException {
return new Writer<FileRegion>() {
@Override
- public void store(FileRegion token) throws IOException {
+ public void store(FileRegion blockAlias) throws IOException {
+ // do nothing
+ }
+
+ @Override
+ public void remove(Block block) throws IOException {
// do nothing
}