Merge r1293034 through r1293500 from 0.23.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23-PB@1293501 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 99c5111..ea4ffe4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -187,6 +187,11 @@
dfs.client.block.write.replace-datanode-on-failure.enable should be true.
(szetszwo)
+ HDFS-3008. Negative caching of local addrs doesn't work. (eli)
+
+ HDFS-3006. In WebHDFS, when the return body is empty, set the Content-Type
+ to application/octet-stream instead of application/json. (szetszwo)
+
Release 0.23.1 - 2012-02-17
INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index be52b48..cdaf07e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -542,11 +542,12 @@
private static boolean isLocalAddress(InetSocketAddress targetAddr) {
InetAddress addr = targetAddr.getAddress();
Boolean cached = localAddrMap.get(addr.getHostAddress());
- if (cached != null && cached) {
+ if (cached != null) {
if (LOG.isTraceEnabled()) {
- LOG.trace("Address " + targetAddr + " is local");
+ LOG.trace("Address " + targetAddr +
+ (cached ? " is local" : " is not local"));
}
- return true;
+ return cached;
}
// Check if the address is any local or loop back
@@ -561,7 +562,8 @@
}
}
if (LOG.isTraceEnabled()) {
- LOG.trace("Address " + targetAddr + " is local");
+ LOG.trace("Address " + targetAddr +
+ (local ? " is local" : " is not local"));
}
localAddrMap.put(addr.getHostAddress(), local);
return local;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
index 093cd9c..44ef527 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
@@ -117,7 +117,7 @@
@PUT
@Path("/")
@Consumes({"*/*"})
- @Produces({MediaType.APPLICATION_JSON})
+ @Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
public Response putRoot(
final InputStream in,
@Context final UserGroupInformation ugi,
@@ -147,7 +147,7 @@
@PUT
@Path("{" + UriFsPathParam.NAME + ":.*}")
@Consumes({"*/*"})
- @Produces({MediaType.APPLICATION_JSON})
+ @Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
public Response put(
final InputStream in,
@Context final UserGroupInformation ugi,
@@ -209,7 +209,7 @@
final InetSocketAddress nnHttpAddr = NameNode.getHttpAddress(conf);
final URI uri = new URI(WebHdfsFileSystem.SCHEME, null,
nnHttpAddr.getHostName(), nnHttpAddr.getPort(), fullpath, null, null);
- return Response.created(uri).type(MediaType.APPLICATION_JSON).build();
+ return Response.created(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
}
default:
throw new UnsupportedOperationException(op + " is not supported");
@@ -222,7 +222,7 @@
@POST
@Path("/")
@Consumes({"*/*"})
- @Produces({MediaType.APPLICATION_JSON})
+ @Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
public Response postRoot(
final InputStream in,
@Context final UserGroupInformation ugi,
@@ -243,7 +243,7 @@
@POST
@Path("{" + UriFsPathParam.NAME + ":.*}")
@Consumes({"*/*"})
- @Produces({MediaType.APPLICATION_JSON})
+ @Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
public Response post(
final InputStream in,
@Context final UserGroupInformation ugi,
@@ -287,7 +287,7 @@
IOUtils.cleanup(LOG, out);
IOUtils.cleanup(LOG, dfsclient);
}
- return Response.ok().type(MediaType.APPLICATION_JSON).build();
+ return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
default:
throw new UnsupportedOperationException(op + " is not supported");
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
index 44e5e13..e041b03 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
@@ -215,7 +215,7 @@
@PUT
@Path("/")
@Consumes({"*/*"})
- @Produces({MediaType.APPLICATION_JSON})
+ @Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
public Response putRoot(
@Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
@@ -263,7 +263,7 @@
@PUT
@Path("{" + UriFsPathParam.NAME + ":.*}")
@Consumes({"*/*"})
- @Produces({MediaType.APPLICATION_JSON})
+ @Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
public Response put(
@Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
@@ -324,7 +324,7 @@
final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
fullpath, op.getValue(), -1L,
permission, overwrite, bufferSize, replication, blockSize);
- return Response.temporaryRedirect(uri).build();
+ return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case MKDIRS:
{
@@ -336,7 +336,7 @@
{
np.createSymlink(destination.getValue(), fullpath,
PermissionParam.getDefaultFsPermission(), createParent.getValue());
- return Response.ok().type(MediaType.APPLICATION_JSON).build();
+ return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case RENAME:
{
@@ -348,7 +348,7 @@
} else {
np.rename2(fullpath, destination.getValue(),
s.toArray(new Options.Rename[s.size()]));
- return Response.ok().type(MediaType.APPLICATION_JSON).build();
+ return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
}
case SETREPLICATION:
@@ -364,17 +364,17 @@
}
np.setOwner(fullpath, owner.getValue(), group.getValue());
- return Response.ok().type(MediaType.APPLICATION_JSON).build();
+ return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case SETPERMISSION:
{
np.setPermission(fullpath, permission.getFsPermission());
- return Response.ok().type(MediaType.APPLICATION_JSON).build();
+ return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case SETTIMES:
{
np.setTimes(fullpath, modificationTime.getValue(), accessTime.getValue());
- return Response.ok().type(MediaType.APPLICATION_JSON).build();
+ return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case RENEWDELEGATIONTOKEN:
{
@@ -389,7 +389,7 @@
final Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>();
token.decodeFromUrlString(delegationTokenArgument.getValue());
np.cancelDelegationToken(token);
- return Response.ok().type(MediaType.APPLICATION_JSON).build();
+ return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
default:
throw new UnsupportedOperationException(op + " is not supported");
@@ -406,7 +406,7 @@
@POST
@Path("/")
@Consumes({"*/*"})
- @Produces({MediaType.APPLICATION_JSON})
+ @Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
public Response postRoot(
@Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
@@ -427,7 +427,7 @@
@POST
@Path("{" + UriFsPathParam.NAME + ":.*}")
@Consumes({"*/*"})
- @Produces({MediaType.APPLICATION_JSON})
+ @Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
public Response post(
@Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
@@ -459,7 +459,7 @@
{
final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
fullpath, op.getValue(), -1L, bufferSize);
- return Response.temporaryRedirect(uri).build();
+ return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
}
default:
throw new UnsupportedOperationException(op + " is not supported");
@@ -542,7 +542,7 @@
{
final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
fullpath, op.getValue(), offset.getValue(), offset, length, bufferSize);
- return Response.temporaryRedirect(uri).build();
+ return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case GET_BLOCK_LOCATIONS:
{
@@ -578,7 +578,7 @@
{
final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
fullpath, op.getValue(), -1L);
- return Response.temporaryRedirect(uri).build();
+ return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case GETDELEGATIONTOKEN:
{
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java
index 0c2372c..b551dd1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java
@@ -27,6 +27,7 @@
import java.util.Map;
import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.MediaType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
@@ -314,6 +315,8 @@
conn.setRequestMethod(op.getType().toString());
conn.connect();
assertEquals(HttpServletResponse.SC_OK, conn.getResponseCode());
+ assertEquals(0, conn.getContentLength());
+ assertEquals(MediaType.APPLICATION_OCTET_STREAM, conn.getContentType());
assertEquals((short)0755, webhdfs.getFileStatus(dir).getPermission().toShort());
conn.disconnect();
}
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 674f0a8..e22241b 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -36,6 +36,7 @@
NEW FEATURES
IMPROVEMENTS
+
MAPREDUCE-3849. Change TokenCache's reading of the binary token file
(Daryn Sharp via bobby)
@@ -45,9 +46,17 @@
MAPREDUCE-3877 Add a test to formalise the current state transitions
of the yarn lifecycle. (stevel)
+ MAPREDUCE-3866. Fixed the bin/yarn script to not print the command line
+ unnecessarily. (vinodkv)
+
+ MAPREDUCE-3730. Modified RM to allow restarted NMs to be able to join the
+ cluster without waiting for expiry. (Jason Lowe via vinodkv)
+
OPTIMIZATIONS
BUG FIXES
+ MAPREDUCE-3918 proc_historyserver no longer in command line arguments for
+ HistoryServer (Jon Eagles via bobby)
MAPREDUCE-3862. Nodemanager can appear to hang on shutdown due to lingering
DeletionService threads (Jason Lowe via bobby)
@@ -74,6 +83,13 @@
MAPREDUCE-3878. Null user on filtered jobhistory job page (Jonathon Eagles
via tgraves)
+
+ MAPREDUCE-3738. NM can hang during shutdown if AppLogAggregatorImpl thread
+ dies unexpectedly (Jason Lowe via sseth)
+
+ MAPREDUCE-3904 Job history produced with mapreduce.cluster.acls.enabled
+ false can not be viewed with mapreduce.cluster.acls.enabled true
+ (Jonathon Eagles via tgraves)
Release 0.23.1 - 2012-02-17
diff --git a/hadoop-mapreduce-project/bin/mapred b/hadoop-mapreduce-project/bin/mapred
index 898e053..eb13e60 100755
--- a/hadoop-mapreduce-project/bin/mapred
+++ b/hadoop-mapreduce-project/bin/mapred
@@ -136,4 +136,4 @@
HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.security.logger=${HADOOP_SECURITY_LOGGER:-INFO,NullAppender}"
export CLASSPATH
-exec "$JAVA" $JAVA_HEAP_MAX $HADOOP_OPTS $CLASS "$@"
+exec "$JAVA" -Dproc_$COMMAND $JAVA_HEAP_MAX $HADOOP_OPTS $CLASS "$@"
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
index cd357a2..0abccc4 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
@@ -438,6 +438,9 @@
public boolean checkAccess(UserGroupInformation callerUGI,
JobACL jobOperation) {
AccessControlList jobACL = jobACLs.get(jobOperation);
+ if (jobACL == null) { // no ACL was found for the requested operation
+ return true; // allow access rather than handing a null ACL to aclsManager.checkAccess
+ }
return aclsManager.checkAccess(callerUGI, jobOperation, username, jobACL);
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
index 2461760..81387a0 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
@@ -191,5 +191,16 @@
null, null, null, true, null, 0, null);
Assert.assertTrue(job4.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertTrue(job4.checkAccess(ugi2, JobACL.VIEW_JOB));
+
+ // Setup configuration access without security enabled
+ Configuration conf5 = new Configuration();
+ conf5.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
+ conf5.set(MRJobConfig.JOB_ACL_VIEW_JOB, "");
+
+ // Verify access
+ JobImpl job5 = new JobImpl(jobId, null, conf5, null, null, null, null, null,
+ null, null, null, true, null, 0, null);
+ Assert.assertTrue(job5.checkAccess(ugi1, null));
+ Assert.assertTrue(job5.checkAccess(ugi2, null));
}
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
index d0465d3..9584d05 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
@@ -330,6 +330,9 @@
boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation) {
Map<JobACL, AccessControlList> jobACLs = jobInfo.getJobACLs();
AccessControlList jobACL = jobACLs.get(jobOperation);
+ if (jobACL == null) { // the job's ACL map has no entry for the requested operation
+ return true; // allow access rather than handing a null ACL to aclsMgr.checkAccess
+ }
return aclsMgr.checkAccess(callerUGI, jobOperation,
jobInfo.getUsername(), jobACL);
}
diff --git a/hadoop-mapreduce-project/hadoop-yarn/bin/yarn b/hadoop-mapreduce-project/hadoop-yarn/bin/yarn
index d7dae8b..fe80061 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/bin/yarn
+++ b/hadoop-mapreduce-project/hadoop-yarn/bin/yarn
@@ -221,6 +221,5 @@
YARN_OPTS="$YARN_OPTS -Djava.library.path=$JAVA_LIBRARY_PATH"
fi
-echo "$JAVA" -Dproc_$COMMAND $JAVA_HEAP_MAX $YARN_OPTS -classpath "$CLASSPATH" $CLASS "$@"
exec "$JAVA" -Dproc_$COMMAND $JAVA_HEAP_MAX $YARN_OPTS -classpath "$CLASSPATH" $CLASS "$@"
fi
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index fdd4ecb..d226464 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -133,8 +133,16 @@
}
@Override
+ public void run() { // wrapper: the aggregation work itself is in doAppLogAggregation()
+ try {
+ doAppLogAggregation();
+ } finally {
+ this.appAggregationFinished.set(true); // set on every exit path, including when doAppLogAggregation() throws
+ }
+ }
+
@SuppressWarnings("unchecked")
- public void run() {
+ private void doAppLogAggregation() {
ContainerId containerId;
while (!this.appFinishing.get()) {
@@ -189,8 +197,6 @@
this.dispatcher.getEventHandler().handle(
new ApplicationEvent(this.appId,
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED));
-
- this.appAggregationFinished.set(true);
}
private Path getRemoteNodeTmpLogFileForApp() {
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index a1853b3..18b8d9b 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -21,6 +21,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.when;
import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertTrue;
@@ -69,6 +70,7 @@
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
@@ -536,4 +538,31 @@
appAcls.put(ApplicationAccessType.VIEW_APP, "*");
return appAcls;
}
+
+ @Test(timeout=20000)
+ @SuppressWarnings("unchecked")
+ public void testStopAfterError() throws Exception {
+ DeletionService delSrvc = mock(DeletionService.class);
+
+ // get the AppLogAggregatorImpl thread to crash
+ LocalDirsHandlerService mockedDirSvc = mock(LocalDirsHandlerService.class);
+ when(mockedDirSvc.getLogDirs()).thenThrow(new RuntimeException());
+
+ DrainDispatcher dispatcher = createDispatcher();
+ EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
+ dispatcher.register(ApplicationEventType.class, appEventHandler);
+
+ LogAggregationService logAggregationService =
+ new LogAggregationService(dispatcher, this.context, delSrvc,
+ mockedDirSvc);
+ logAggregationService.init(this.conf);
+ logAggregationService.start();
+
+ ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
+ logAggregationService.handle(new LogHandlerAppStartedEvent(
+ application1, this.user, null,
+ ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
+
+ logAggregationService.stop();
+ }
}
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 75c91aa..d762766 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -51,6 +51,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
@@ -177,17 +178,17 @@
RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort,
resolve(host), capability);
- if (this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode) != null) {
- LOG.info("Duplicate registration from the node at: " + host
- + ", Sending SHUTDOWN Signal to the NodeManager");
- regResponse.setNodeAction(NodeAction.SHUTDOWN);
- response.setRegistrationResponse(regResponse);
- return response;
+ RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
+ if (oldNode == null) {
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMNodeEvent(nodeId, RMNodeEventType.STARTED));
+ } else {
+ LOG.info("Reconnect from the node at: " + host);
+ this.nmLivelinessMonitor.unregister(nodeId);
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMNodeReconnectEvent(nodeId, rmNode));
}
- this.rmContext.getDispatcher().getEventHandler().handle(
- new RMNodeEvent(nodeId, RMNodeEventType.STARTED));
-
this.nmLivelinessMonitor.register(nodeId);
LOG.info("NodeManager from node " + host + "(cmPort: " + cmPort
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
index d562836..ef644be 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
@@ -28,6 +28,7 @@
// ResourceTrackerService
STATUS_UPDATE,
REBOOTING,
+ RECONNECTED,
// Source: Application
CLEANUP_APP,
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 9b8892a..f0384da 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -110,9 +110,11 @@
RMNodeEventType,
RMNodeEvent>(RMNodeState.NEW)
- //Transitions from RUNNING state
+ //Transitions from NEW state
.addTransition(RMNodeState.NEW, RMNodeState.RUNNING,
RMNodeEventType.STARTED, new AddNodeTransition())
+
+ //Transitions from RUNNING state
.addTransition(RMNodeState.RUNNING,
EnumSet.of(RMNodeState.RUNNING, RMNodeState.UNHEALTHY),
RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition())
@@ -129,11 +131,15 @@
RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
.addTransition(RMNodeState.RUNNING, RMNodeState.RUNNING,
RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
+ .addTransition(RMNodeState.RUNNING, RMNodeState.RUNNING,
+ RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
//Transitions from UNHEALTHY state
.addTransition(RMNodeState.UNHEALTHY,
EnumSet.of(RMNodeState.UNHEALTHY, RMNodeState.RUNNING),
RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenUnHealthyTransition())
+ .addTransition(RMNodeState.UNHEALTHY, RMNodeState.UNHEALTHY,
+ RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
// create the topology tables
.installTopology();
@@ -372,6 +378,39 @@
}
}
+ public static class ReconnectNodeTransition implements
+ SingleArcTransition<RMNodeImpl, RMNodeEvent> { // transition run for RECONNECTED events
+
+ @Override
+ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
+ // Kill containers since node is rejoining: report the currently tracked node as removed to the scheduler.
+ rmNode.context.getDispatcher().getEventHandler().handle(
+ new NodeRemovedSchedulerEvent(rmNode));
+
+ RMNode newNode = ((RMNodeReconnectEvent)event).getReconnectedNode(); // event must be an RMNodeReconnectEvent
+ if (rmNode.getTotalCapability().equals(newNode.getTotalCapability())
+ && rmNode.getHttpPort() == newNode.getHttpPort()) { // same capability and HTTP port: keep the existing RMNodeImpl
+ // Reset heartbeat ID since node just restarted.
+ rmNode.getLastHeartBeatResponse().setResponseId(0);
+ rmNode.context.getDispatcher().getEventHandler().handle(
+ new NodeAddedSchedulerEvent(rmNode));
+ } else {
+ // Reconnected node differs, so replace old node and start new node; first undo the old node's metric.
+ switch (rmNode.getState()) { // only RUNNING and UNHEALTHY adjust a counter; there is no default branch
+ case RUNNING:
+ ClusterMetrics.getMetrics().decrNumActiveNodes();
+ break;
+ case UNHEALTHY:
+ ClusterMetrics.getMetrics().decrNumUnhealthyNMs();
+ break;
+ }
+ rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode); // store newNode under its own node id
+ rmNode.context.getDispatcher().getEventHandler().handle(
+ new RMNodeEvent(newNode.getNodeID(), RMNodeEventType.STARTED)); // STARTED is dispatched for the new node's id
+ }
+ }
+ }
+
public static class CleanUpAppTransition
implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java
new file mode 100644
index 0000000..b1fa0ad
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java
@@ -0,0 +1,34 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+public class RMNodeReconnectEvent extends RMNodeEvent { // RMNodeEvent of type RECONNECTED that carries an RMNode
+ private final RMNode reconnectedNode; // assigned once in the constructor, never reassigned
+
+ public RMNodeReconnectEvent(NodeId nodeId, RMNode newNode) { // nodeId: target of the event; newNode: node carried as payload
+ super(nodeId, RMNodeEventType.RECONNECTED);
+ reconnectedNode = newNode;
+ }
+
+ public RMNode getReconnectedNode() { // returns the node passed to the constructor
+ return reconnectedNode;
+ }
+}
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index aa7d23e..0f90f6c 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -666,7 +666,10 @@
private synchronized void removeNode(RMNode nodeInfo) {
SchedulerNode node = this.nodes.get(nodeInfo.getNodeID());
- Resources.subtractFrom(clusterResource, nodeInfo.getTotalCapability());
+ if (node == null) {
+ return;
+ }
+ Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability());
root.updateClusterResource(clusterResource);
--numNodeManagers;
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index 6fcf1fe..1526683 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -731,6 +731,9 @@
private synchronized void removeNode(RMNode nodeInfo) {
SchedulerNode node = getNode(nodeInfo.getNodeID());
+ if (node == null) {
+ return;
+ }
// Kill running containers
for(RMContainer container : node.getRunningContainers()) {
containerCompleted(container,
@@ -744,7 +747,7 @@
this.nodes.remove(nodeInfo.getNodeID());
// Update cluster metrics
- Resources.subtractFrom(clusterResource, nodeInfo.getTotalCapability());
+ Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability());
}
@Override
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
index 05b17a3..f30883f 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
@@ -19,23 +19,18 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import java.util.List;
-import java.util.Map;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeState;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeResponse;
import com.google.common.collect.Lists;
@@ -195,8 +190,12 @@
};
private static RMNode buildRMNode(int rack, final Resource perNode, RMNodeState state, String httpAddr) {
+ return buildRMNode(rack, perNode, state, httpAddr, NODE_ID++); // delegates to the five-argument overload, taking the host number from the NODE_ID counter (post-incremented on every call)
+ }
+
+ private static RMNode buildRMNode(int rack, final Resource perNode, RMNodeState state, String httpAddr, int hostnum) {
final String rackName = "rack"+ rack;
- final int nid = NODE_ID++;
+ final int nid = hostnum;
final String hostName = "host"+ nid;
final int port = 123;
final NodeId nodeID = newNodeID(hostName, port);
@@ -219,4 +218,8 @@
public static RMNode newNodeInfo(int rack, final Resource perNode) {
return buildRMNode(rack, perNode, RMNodeState.RUNNING, "localhost:0");
}
+
+ public static RMNode newNodeInfo(int rack, final Resource perNode, int hostnum) { // overload that lets the caller choose the host number instead of drawing it from NODE_ID
+ return buildRMNode(rack, perNode, null, "localhost:0", hostnum); // NOTE(review): state is null here, while the two-argument overload passes RMNodeState.RUNNING; confirm this is intended
+ }
}
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
index 2cca6f0..e995e51 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager;
+import java.util.ArrayList;
import java.util.List;
import junit.framework.Assert;
@@ -27,10 +28,17 @@
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -167,10 +175,37 @@
testMinimumAllocation(conf);
}
+ @Test
+ public void testReconnectedNode() throws Exception { // checks available memory after a node is removed and re-added with a smaller capability
+ CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+ conf.setQueues("default", new String[] {"default"});
+ conf.setCapacity("default", 100);
+ FifoScheduler fs = new FifoScheduler();
+ fs.reinitialize(conf, null, null); // only the configuration is supplied; the other two arguments are null
+
+ RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1); // host number 1, 4 GB
+ RMNode n2 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 2); // host number 2, 2 GB
+
+ fs.handle(new NodeAddedSchedulerEvent(n1));
+ fs.handle(new NodeAddedSchedulerEvent(n2));
+ List<ContainerStatus> emptyList = new ArrayList<ContainerStatus>();
+ fs.handle(new NodeUpdateSchedulerEvent(n1, emptyList, emptyList));
+ Assert.assertEquals(6 * GB, fs.getRootQueueMetrics().getAvailableMB()); // 4 GB + 2 GB
+
+ // reconnect n1 with downgraded memory
+ n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 1); // same host number (1) as the original n1, but only 2 GB
+ fs.handle(new NodeRemovedSchedulerEvent(n1)); // the remove event carries the new 2 GB RMNode, not the 4 GB one that was added
+ fs.handle(new NodeAddedSchedulerEvent(n1));
+ fs.handle(new NodeUpdateSchedulerEvent(n1, emptyList, emptyList));
+
+ Assert.assertEquals(4 * GB, fs.getRootQueueMetrics().getAvailableMB()); // 6 GB - 4 GB (old n1) + 2 GB (new n1)
+ }
+
public static void main(String[] args) throws Exception {
TestFifoScheduler t = new TestFifoScheduler();
t.test();
t.testDefaultMinimumAllocation();
t.testNonDefaultMinimumAllocation();
+ t.testReconnectedNode(); // also run the reconnect test when the class is launched through main()
}
}
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index 8b3f4a0..7826819 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -31,12 +31,17 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.util.Records;
import org.junit.After;
import org.junit.Test;
@@ -189,7 +194,7 @@
conf.set("yarn.resourcemanager.nodes.exclude-path", hostFile
.getAbsolutePath());
- MockRM rm = new MockRM(conf);
+ rm = new MockRM(conf);
rm.start();
MockNM nm1 = rm.registerNode("host1:1234", 5120);
@@ -223,6 +228,61 @@
ClusterMetrics.getMetrics().getUnhealthyNMs());
}
+ /**
+  * Re-registers NodeManagers under addresses the RM has already seen and
+  * checks that each following heartbeat is answered with
+  * {@link NodeAction#NORMAL}, that the active-NM count is unchanged, and that
+  * a re-registration with a different memory capability is reflected in the
+  * root queue's available memory.
+  */
+ @Test
+ public void testReconnectNode() throws Exception {
+ final DrainDispatcher dispatcher = new DrainDispatcher();
+ // Assign the fixture field instead of declaring a shadowing local, matching
+ // the MockRM(conf) setup elsewhere in this class.
+ // NOTE(review): presumably the @After teardown stops this field; confirm.
+ rm = new MockRM() {
+ @Override
+ protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
+ return new SchedulerEventDispatcher(this.scheduler) {
+ @Override
+ public void handle(SchedulerEvent event) {
+ // Hand the event straight to the scheduler in the calling thread.
+ scheduler.handle(event);
+ }
+ };
+ }
+
+ @Override
+ protected Dispatcher createDispatcher() {
+ // Return the test's own dispatcher so the test can call dispatcher.await().
+ return dispatcher;
+ }
+ };
+ rm.start();
+
+ MockNM nm1 = rm.registerNode("host1:1234", 5120);
+ MockNM nm2 = rm.registerNode("host2:5678", 5120);
+ nm1.nodeHeartbeat(true);
+ nm2.nodeHeartbeat(false);
+ checkUnealthyNMCount(rm, nm2, true, 1);
+ final int expectedNMs = ClusterMetrics.getMetrics().getNumActiveNMs();
+ QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();
+ Assert.assertEquals(5120 + 5120, metrics.getAvailableMB());
+
+ // reconnect of healthy node
+ nm1 = rm.registerNode("host1:1234", 5120);
+ HeartbeatResponse response = nm1.nodeHeartbeat(true);
+ Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
+ dispatcher.await();
+ Assert.assertEquals(expectedNMs, ClusterMetrics.getMetrics().getNumActiveNMs());
+ checkUnealthyNMCount(rm, nm2, true, 1);
+
+ // reconnect of unhealthy node
+ nm2 = rm.registerNode("host2:5678", 5120);
+ response = nm2.nodeHeartbeat(false);
+ Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
+ dispatcher.await();
+ Assert.assertEquals(expectedNMs, ClusterMetrics.getMetrics().getNumActiveNMs());
+ checkUnealthyNMCount(rm, nm2, true, 1);
+
+ // reconnect of node with changed capability
+ // Keep the handle returned by re-registering host2 in nm2 and heartbeat
+ // through it, rather than storing it in nm1 and heartbeating on the handle
+ // from the previous registration.
+ nm2 = rm.registerNode("host2:5678", 10240);
+ dispatcher.await();
+ response = nm2.nodeHeartbeat(true);
+ dispatcher.await();
+ Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
+ Assert.assertEquals(5120 + 10240, metrics.getAvailableMB()); // host1 at 5120 plus host2 at its new 10240
+ }
+
private void writeToHostsFile(String... hosts) throws IOException {
if (!hostFile.exists()) {
TEMP_DIR.mkdirs();
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index bcfd09d..dbe21d1 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -34,6 +34,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.Task;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
@@ -41,12 +42,15 @@
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestCapacityScheduler {
private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class);
+ private final int GB = 1024;
private static final String A = CapacitySchedulerConfiguration.ROOT + ".a";
private static final String B = CapacitySchedulerConfiguration.ROOT + ".b";
@@ -97,8 +101,6 @@
LOG.info("--- START: testCapacityScheduler ---");
- final int GB = 1024;
-
// Register node1
String host_0 = "host_0";
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 =
@@ -340,4 +342,27 @@
cs.reinitialize(conf, null, null);
}
+ @Test
+ public void testReconnectedNode() throws Exception { // checks cluster memory after a node is removed and re-added with a smaller capability
+ CapacitySchedulerConfiguration csConf =
+ new CapacitySchedulerConfiguration();
+ setupQueueConfiguration(csConf);
+ CapacityScheduler cs = new CapacityScheduler();
+ cs.reinitialize(csConf, null, null); // only the configuration is supplied; the other two arguments are null
+
+ RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1); // host number 1, 4 GB
+ RMNode n2 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 2); // host number 2, 2 GB
+
+ cs.handle(new NodeAddedSchedulerEvent(n1));
+ cs.handle(new NodeAddedSchedulerEvent(n2));
+
+ Assert.assertEquals(6 * GB, cs.getClusterResources().getMemory()); // 4 GB + 2 GB
+
+ // reconnect n1 with downgraded memory
+ n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 1); // same host number (1) as the original n1, but only 2 GB
+ cs.handle(new NodeRemovedSchedulerEvent(n1)); // the remove event carries the new 2 GB RMNode, not the 4 GB one that was added
+ cs.handle(new NodeAddedSchedulerEvent(n1));
+
+ Assert.assertEquals(4 * GB, cs.getClusterResources().getMemory()); // 6 GB - 4 GB (old n1) + 2 GB (new n1)
+ }
}