Merge branch 'develop' into feature/SLIDER-82-pass-3.1
# Conflicts:
# slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
# slider-core/src/test/groovy/org/apache/slider/client/TestClientBadArgs.groovy
diff --git a/app-packages/accumulo/appConfig-default.json b/app-packages/accumulo/appConfig-default.json
index ad644f7..78e63ba 100644
--- a/app-packages/accumulo/appConfig-default.json
+++ b/app-packages/accumulo/appConfig-default.json
@@ -52,7 +52,7 @@
"site.accumulo-site.tserver.walog.max.size": "40M",
"site.accumulo-site.trace.user": "root",
- "site.accumulo-site.trace.zookeeper.path": "${DEFAULT_ZK_PATH}/tracers",
+ "site.accumulo-site.trace.zookeeper.path": "/accumulo/tracers-${USER}-${CLUSTER_NAME}",
"site.accumulo-site.master.port.client": "0",
"site.accumulo-site.trace.port.client": "0",
diff --git a/app-packages/accumulo/appConfig-secured-default.json b/app-packages/accumulo/appConfig-secured-default.json
index 984de61..1c9bfbe 100644
--- a/app-packages/accumulo/appConfig-secured-default.json
+++ b/app-packages/accumulo/appConfig-secured-default.json
@@ -51,7 +51,7 @@
"site.accumulo-site.general.delegation.token.update.interval": "1d",
"site.accumulo-site.trace.user": "${USER_NAME}@EXAMPLE.COM",
- "site.accumulo-site.trace.zookeeper.path": "${DEFAULT_ZK_PATH}/tracers",
+ "site.accumulo-site.trace.zookeeper.path": "/accumulo/tracers-${USER}-${CLUSTER_NAME}",
"site.accumulo-site.trace.token.property.keytab": "${AGENT_WORK_ROOT}/keytabs/${USER_NAME}.ACCUMULO.headless.keytab",
"site.accumulo-site.trace.token.type": "org.apache.accumulo.core.client.security.tokens.KerberosToken",
diff --git a/app-packages/accumulo/appConfig-ssl-default.json b/app-packages/accumulo/appConfig-ssl-default.json
index 9615eff..b6ac499 100644
--- a/app-packages/accumulo/appConfig-ssl-default.json
+++ b/app-packages/accumulo/appConfig-ssl-default.json
@@ -59,7 +59,7 @@
"site.accumulo-site.tserver.walog.max.size": "40M",
"site.accumulo-site.trace.user": "root",
- "site.accumulo-site.trace.zookeeper.path": "${DEFAULT_ZK_PATH}/tracers",
+ "site.accumulo-site.trace.zookeeper.path": "/accumulo/tracers-${USER}-${CLUSTER_NAME}",
"site.accumulo-site.master.port.client": "0",
"site.accumulo-site.trace.port.client": "0",
diff --git a/slider-agent/src/main/python/agent/ActionQueue.py b/slider-agent/src/main/python/agent/ActionQueue.py
index ca68d5d..497d4f4 100644
--- a/slider-agent/src/main/python/agent/ActionQueue.py
+++ b/slider-agent/src/main/python/agent/ActionQueue.py
@@ -185,6 +185,7 @@
# In future we might check status of STOP command and take other measures
# if graceful STOP fails (like force kill the processes)
if command['roleCommand'] == 'STOP':
+ logger.info("Stop command received")
self.controller.appGracefulStopTriggered = True
# dumping results
diff --git a/slider-agent/src/main/python/agent/Controller.py b/slider-agent/src/main/python/agent/Controller.py
index e99b78c..ec3bed7 100644
--- a/slider-agent/src/main/python/agent/Controller.py
+++ b/slider-agent/src/main/python/agent/Controller.py
@@ -214,6 +214,9 @@
if (self.componentActualState == State.FAILED) \
and (self.componentExpectedState == State.STARTED) \
and (self.failureCount >= Controller.MAX_FAILURE_COUNT_TO_STOP):
+ logger.info("Component instance has failed, stopping the agent ...")
+ shouldStopAgent = True
+ if (self.componentActualState == State.STOPPED):
logger.info("Component instance has stopped, stopping the agent ...")
shouldStopAgent = True
if self.terminateAgent:
@@ -272,6 +275,8 @@
try:
if self.appGracefulStopQueued and not self.isAppGracefullyStopped():
# Continue to wait until app is stopped
+ logger.info("Graceful stop in progress..")
+ time.sleep(1)
continue
if self.shouldStopAgent():
ProcessHelper.stopAgent()
@@ -467,9 +472,18 @@
# The STOP command index is stored to be deleted
if command["roleCommand"] == "STOP":
+ logger.info("Got stop command = %s", (command))
self.stopCommand = command
+ '''
+ If app is already running then stopApp() will initiate graceful stop
+ '''
+ self.stopApp()
delete = True
deleteIndex = index
+ if self.componentActualState == State.STARTED:
+ self.componentExpectedState = State.STOPPED
+ self.componentActualState = State.STOPPING
+ self.failureCount = 0
if command["roleCommand"] == "INSTALL":
self.componentExpectedState = State.INSTALLED
diff --git a/slider-agent/src/main/python/agent/main.py b/slider-agent/src/main/python/agent/main.py
index bfd4a27..68f46b7 100644
--- a/slider-agent/src/main/python/agent/main.py
+++ b/slider-agent/src/main/python/agent/main.py
@@ -56,7 +56,9 @@
logger.info('signal received, exiting.')
global controller
if controller is not None and hasattr(controller, 'actionQueue'):
- tmpdir = controller.actionQueue.dockerManager.stop_container()
+ docker_mode = controller.actionQueue.docker_mode
+ if docker_mode:
+ tmpdir = controller.actionQueue.dockerManager.stop_container()
ProcessHelper.stopAgent()
diff --git a/slider-agent/src/test/python/agent/TestController.py b/slider-agent/src/test/python/agent/TestController.py
index 69ed8cc..7eeecb9 100644
--- a/slider-agent/src/test/python/agent/TestController.py
+++ b/slider-agent/src/test/python/agent/TestController.py
@@ -527,7 +527,7 @@
@patch("time.sleep")
@patch("json.loads")
@patch("json.dumps")
- def test_heartbeatWithServerTerminateAgent(self, dumpsMock, loadsMock, sleepMock, event_mock):
+ def test_heartbeatWithServerStopAgent(self, dumpsMock, loadsMock, sleepMock, event_mock):
original_value = self.controller.config
self.controller.config = AgentConfig("", "")
out = StringIO.StringIO()
@@ -626,7 +626,108 @@
self.assertFalse(self.controller.terminateAgent)
assert not self.controller.stopCommand == None
- # now no need to have STOP command in response, just send terminateAgent
+ # The STOP execution command now stops the agent completely, so the
+ # terminateAgent flag test has moved to test_heartbeatWithServerTerminateAgent
+
+ sleepMock.assert_called_with(
+ self.controller.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS)
+
+ sys.stdout = sys.__stdout__
+ self.controller.sendRequest = Controller.Controller.sendRequest
+ self.controller.addToQueue = Controller.Controller.addToQueue
+
+ self.controller.config = original_value
+ pass
+
+ @patch.object(threading._Event, "wait")
+ @patch("time.sleep")
+ @patch("json.loads")
+ @patch("json.dumps")
+ def test_heartbeatWithServerTerminateAgent(self, dumpsMock, loadsMock, sleepMock, event_mock):
+ original_value = self.controller.config
+ self.controller.config = AgentConfig("", "")
+ out = StringIO.StringIO()
+ sys.stdout = out
+
+ hearbeat = MagicMock()
+ self.controller.heartbeat = hearbeat
+
+ dumpsMock.return_value = "data"
+
+ sendRequest = MagicMock(name="sendRequest")
+ self.controller.sendRequest = sendRequest
+
+ self.controller.responseId = 1
+ response = {"responseId":"2", "restartAgent": False}
+ loadsMock.return_value = response
+
+ def one_heartbeat(*args, **kwargs):
+ self.controller.DEBUG_STOP_HEARTBEATING = True
+ return "data"
+
+ sendRequest.side_effect = one_heartbeat
+
+ actionQueue = MagicMock()
+ actionQueue.isIdle.return_value = True
+
+ # one successful request, after stop
+ self.controller.actionQueue = actionQueue
+ self.controller.heartbeatWithServer()
+ self.assertTrue(sendRequest.called)
+
+ calls = []
+ def retry(*args, **kwargs):
+ if len(calls) == 0:
+ calls.append(1)
+ response["responseId"] = "3"
+ raise Exception()
+ if len(calls) > 0:
+ self.controller.DEBUG_STOP_HEARTBEATING = True
+ return "data"
+
+ # exception, retry, successful and stop
+ sendRequest.side_effect = retry
+ self.controller.DEBUG_STOP_HEARTBEATING = False
+ self.controller.heartbeatWithServer()
+
+ self.assertEqual(1, self.controller.DEBUG_SUCCESSFULL_HEARTBEATS)
+
+ original_stopApp = self.controller.stopApp
+
+ # terminateAgent command - test 1
+ self.controller.responseId = 1
+ self.controller.DEBUG_STOP_HEARTBEATING = False
+ response = {"responseId":"2", "terminateAgent": True}
+ loadsMock.return_value = response
+ stopApp = MagicMock(name="stopApp")
+ self.controller.stopApp = stopApp
+ self.controller.heartbeatWithServer()
+ stopApp.assert_called_once_with()
+
+ # reset for next test
+ self.controller.terminateAgent = False
+
+ # terminateAgent command - test 2
+ self.controller.responseId = 1
+ self.controller.DEBUG_STOP_HEARTBEATING = False
+ response = {"responseId":"2", "terminateAgent": True}
+ loadsMock.return_value = response
+ self.controller.stopApp = original_stopApp
+ stopCommand = {"roleCommand": "STOP"}
+ self.controller.stopCommand = stopCommand
+ addToQueue = MagicMock(name="addToQueue")
+ self.controller.addToQueue = addToQueue
+ self.controller.componentActualState = State.STARTED
+ self.controller.heartbeatWithServer()
+ self.assertTrue(self.controller.terminateAgent)
+ self.assertTrue(self.controller.appGracefulStopQueued)
+ addToQueue.assert_has_calls([call([stopCommand])])
+
+ # reset for next test
+ self.controller.terminateAgent = False
+ self.controller.appGracefulStopQueued = False
+
+ # terminateAgent command - test 3
self.controller.responseId = 2
self.controller.DEBUG_STOP_HEARTBEATING = False
response = {"responseId":"3", "terminateAgent": True}
diff --git a/slider-core/src/main/java/org/apache/slider/api/ResourceKeys.java b/slider-core/src/main/java/org/apache/slider/api/ResourceKeys.java
index f481c6a..f92a58d 100644
--- a/slider-core/src/main/java/org/apache/slider/api/ResourceKeys.java
+++ b/slider-core/src/main/java/org/apache/slider/api/ResourceKeys.java
@@ -167,4 +167,16 @@
*/
String YARN_LOG_INCLUDE_PATTERNS = "yarn.log.include.patterns";
String YARN_LOG_EXCLUDE_PATTERNS = "yarn.log.exclude.patterns";
+
+ /**
+ * Window of time, in milliseconds, where the application master's
+ * failure count can be reset to 0.
+ */
+ String YARN_RESOURCEMANAGER_AM_RETRY_COUNT_WINDOW_MS =
+ "yarn.resourcemanager.am.retry-count-window-ms";
+
+ /**
+ * The default window for Slider, in milliseconds (5 minutes).
+ */
+ long DEFAULT_AM_RETRY_COUNT_WINDOW_MS = 300000;
}
diff --git a/slider-core/src/main/java/org/apache/slider/client/SliderClient.java b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java
index ed7d4c7..da94dd4 100644
--- a/slider-core/src/main/java/org/apache/slider/client/SliderClient.java
+++ b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java
@@ -2721,11 +2721,15 @@
@Override
@VisibleForTesting
public int actionFlex(String name, ActionFlexArgs args) throws YarnException, IOException {
- verifyBindingsDefined();
validateClusterName(name);
+ Map<String, String> roleMap = args.getComponentMap();
+ // throw usage exception if no changes proposed
+ if (roleMap.size() == 0) {
+ actionHelp(ACTION_FLEX);
+ }
+ verifyBindingsDefined();
log.debug("actionFlex({})", name);
Map<String, Integer> roleInstances = new HashMap<>();
- Map<String, String> roleMap = args.getComponentMap();
for (Map.Entry<String, String> roleEntry : roleMap.entrySet()) {
String key = roleEntry.getKey();
String val = roleEntry.getValue();
diff --git a/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java b/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java
index 22798e3..1d2d5f8 100644
--- a/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java
+++ b/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java
@@ -76,6 +76,12 @@
String APP_VERSION_UNKNOWN = "awaiting heartbeat...";
/**
+ * Keys for application container specific properties, like release timeout
+ */
+ String APP_CONTAINER_RELEASE_TIMEOUT = "site.global.app_container.release_timeout_secs";
+ int APP_CONTAINER_HEARTBEAT_INTERVAL_SEC = 10; // look for HEARTBEAT_IDDLE_INTERVAL_SEC
+
+ /**
* JVM arg to force IPv4 {@value}
*/
String JVM_ENABLE_ASSERTIONS = "-ea";
diff --git a/slider-core/src/main/java/org/apache/slider/core/conf/MapOperations.java b/slider-core/src/main/java/org/apache/slider/core/conf/MapOperations.java
index 5f7b5f0..e58178c 100644
--- a/slider-core/src/main/java/org/apache/slider/core/conf/MapOperations.java
+++ b/slider-core/src/main/java/org/apache/slider/core/conf/MapOperations.java
@@ -138,6 +138,21 @@
String val = getOption(option, Integer.toString(defVal));
return Integer.decode(val);
}
+
+ /**
+ * Get a long option; use {@link Long#decode(String)} so as to take
+ * hexadecimal and octal values too.
+ *
+ * @param option option name
+ * @param defVal default value
+ * @return parsed value
+ * @throws NumberFormatException if the value cannot be parsed as a long
+ */
+ public long getOptionLong(String option, long defVal) {
+ String val = getOption(option, Long.toString(defVal));
+ return Long.decode(val);
+ }
+
/**
* Get a mandatory integer option; use {@link Integer#decode(String)} so as to take hex
* oct and bin values too.
diff --git a/slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java b/slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java
index 93aff08..22bf328 100644
--- a/slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java
+++ b/slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java
@@ -27,6 +27,7 @@
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
@@ -279,6 +280,30 @@
}
}
+ /**
+ * Extract the value for option
+ * yarn.resourcemanager.am.retry-count-window-ms
+ * and set it on the ApplicationSubmissionContext. Use the default value
+ * if option is not set.
+ *
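+ * @param submissionContext submission context on which the interval is set
+ * @param map options to read the window from; nothing is set when null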
+ */
+ public void extractAmRetryCount(ApplicationSubmissionContext submissionContext,
+ Map<String, String> map) {
+
+ if (map != null) {
+ MapOperations options = new MapOperations("", map);
+ long amRetryCountWindow = options.getOptionLong(ResourceKeys
+ .YARN_RESOURCEMANAGER_AM_RETRY_COUNT_WINDOW_MS,
+ ResourceKeys.DEFAULT_AM_RETRY_COUNT_WINDOW_MS);
+ log.info("Setting {} to {}",
+ ResourceKeys.YARN_RESOURCEMANAGER_AM_RETRY_COUNT_WINDOW_MS,
+ amRetryCountWindow);
+ submissionContext.setAttemptFailuresValidityInterval(amRetryCountWindow);
+ }
+ }
+
public void extractLogAggregationContext(Map<String, String> map) {
if (map != null) {
String logPatternSepStr = "\\|";
@@ -423,7 +448,7 @@
}
/**
- * Suubmit an entire directory
+ * Submit an entire directory
* @param srcDir src path in filesystem
* @param destRelativeDir relative path under destination local dir
* @throws IOException IO problems
diff --git a/slider-core/src/main/java/org/apache/slider/core/launch/AppMasterLauncher.java b/slider-core/src/main/java/org/apache/slider/core/launch/AppMasterLauncher.java
index 06dbfea..c82affa 100644
--- a/slider-core/src/main/java/org/apache/slider/core/launch/AppMasterLauncher.java
+++ b/slider-core/src/main/java/org/apache/slider/core/launch/AppMasterLauncher.java
@@ -106,6 +106,8 @@
submissionContext.setApplicationTags(applicationTags);
}
submissionContext.setNodeLabelExpression(extractLabelExpression(options));
+
+ extractAmRetryCount(submissionContext, resourceGlobalOptions);
extractResourceRequirements(resource, options);
extractLogAggregationContext(resourceGlobalOptions);
}
diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
index e3dc791..7e3e87b 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
@@ -173,6 +173,7 @@
private String clusterName = null;
private boolean isInUpgradeMode;
private Set<String> upgradeContainers = new HashSet<String>();
+ private boolean appStopInitiated;
private final Map<String, ComponentInstanceState> componentStatuses =
new ConcurrentHashMap<String, ComponentInstanceState>();
@@ -879,6 +880,12 @@
componentStatus.getState(), componentStatus.getTargetState());
}
+ if (appStopInitiated && !componentStatus.isStopInitiated()) {
+ log.info("Stop initiated for label {}", label);
+ componentStatus.setTargetState(State.STOPPED);
+ componentStatus.setStopInitiated(true);
+ }
+
publishConfigAndExportGroups(heartBeat, componentStatus, roleName);
CommandResult result = null;
List<CommandReport> reports = heartBeat.getReports();
@@ -1001,6 +1008,8 @@
timeout);
componentStatus.commandIssued(command, true);
} else if (command == Command.STOP) {
+ log.info("Stop command being sent to container with id {}",
+ containerId);
addStopCommand(roleName, containerId, response, scriptPath, timeout,
doUpgrade);
componentStatus.commandIssued(command);
@@ -1276,6 +1285,10 @@
this.upgradeContainers.addAll(upgradeContainers);
}
+ public void setAppStopInitiated(boolean appStopInitiated) {
+ this.appStopInitiated = appStopInitiated;
+ }
+
/**
* Read all default configs
*
diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/ComponentInstanceState.java b/slider-core/src/main/java/org/apache/slider/providers/agent/ComponentInstanceState.java
index c4a694e..55fdba6 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/agent/ComponentInstanceState.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/agent/ComponentInstanceState.java
@@ -51,6 +51,8 @@
private Map<String, State> pkgStatuses;
private String nextPkgToInstall;
+ private boolean stopInitiated;
+
public ComponentInstanceState(String componentName,
ContainerId containerId,
String applicationId) {
@@ -234,7 +236,7 @@
return Command.INSTALL_ADDON;
}
}
- return this.state.getSupportedCommand(isInUpgradeMode);
+ return this.state.getSupportedCommand(isInUpgradeMode, stopInitiated);
}
public State getState() {
@@ -254,6 +256,18 @@
this.targetState = targetState;
}
+ public String getNextPkgToInstall() {
+ return nextPkgToInstall;
+ }
+
+ public boolean isStopInitiated() {
+ return stopInitiated;
+ }
+
+ public void setStopInitiated(boolean stopInitiated) {
+ this.stopInitiated = stopInitiated;
+ }
+
@Override
public int hashCode() {
int hashCode = 1;
@@ -303,8 +317,4 @@
sb.append('}');
return sb.toString();
}
-
- public String getNextPkgToInstall() {
- return nextPkgToInstall;
- }
}
diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/State.java b/slider-core/src/main/java/org/apache/slider/providers/agent/State.java
index 11105fb..5603f8d 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/agent/State.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/agent/State.java
@@ -61,6 +61,11 @@
}
public Command getSupportedCommand(boolean isInUpgradeMode) {
+ return getSupportedCommand(isInUpgradeMode, false);
+ }
+
+ public Command getSupportedCommand(boolean isInUpgradeMode,
+ boolean stopInitiated) {
switch (this) {
case INIT:
case INSTALL_FAILED:
@@ -68,7 +73,8 @@
case INSTALLED:
return Command.START;
case STARTED:
- return isInUpgradeMode ? Command.UPGRADE : Command.NOP;
+ return isInUpgradeMode ? Command.UPGRADE : (stopInitiated) ? Command.STOP
+ : Command.NOP;
case UPGRADED:
return Command.STOP;
case STOPPED:
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
index cc2dc6d..18d5bfa 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
@@ -1825,9 +1825,42 @@
* Shutdown operation: release all containers
*/
private void releaseAllContainers() {
- List<AbstractRMOperation> operations = appState.releaseAllContainers();
- //now apply the operations
- execute(operations);
+ if (providerService instanceof AgentProviderService) {
+ log.info("Setting stopInitiated flag to true");
+ AgentProviderService agentProviderService = (AgentProviderService) providerService;
+ agentProviderService.setAppStopInitiated(true);
+ }
+ // Add the sleep here (before releasing containers) so that applications get
+ // time to perform graceful shutdown
+ try {
+ long timeout = getContainerReleaseTimeout();
+ if (timeout > 0) {
+ Thread.sleep(timeout);
+ }
+ } catch (InterruptedException e) {
+ log.info("Sleep for container release interrupted");
+ } finally {
+ List<AbstractRMOperation> operations = appState.releaseAllContainers();
+ providerRMOperationHandler.execute(operations);
+ // now apply the operations
+ execute(operations);
+ }
+ }
+
+ private long getContainerReleaseTimeout() {
+ // Get container release timeout in millis or 0 if the property is not set.
+ // If non-zero then add the agent heartbeat delay time, since it can take up
+ // to that much time for agents to receive the stop command.
+ int timeout = getInstanceDefinition().getAppConfOperations()
+ .getGlobalOptions()
+ .getOptionInt(SliderKeys.APP_CONTAINER_RELEASE_TIMEOUT, 0);
+ if (timeout > 0) {
+ timeout += SliderKeys.APP_CONTAINER_HEARTBEAT_INTERVAL_SEC;
+ }
+ // convert to millis
+ long timeoutInMillis = timeout * 1000l;
+ log.info("Container release timeout in millis = {}", timeoutInMillis);
+ return timeoutInMillis;
}
/**
@@ -1856,7 +1889,7 @@
if (!outcome.operations.isEmpty()) {
execute(outcome.operations);
}
- // rigger a review if the cluster changed
+ // trigger a review if the cluster changed
if (outcome.clusterChanged) {
reviewRequestAndReleaseNodes("nodes updated");
}
diff --git a/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneAMRestart.groovy b/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneAMRestart.groovy
index bdcf615..6947156 100644
--- a/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneAMRestart.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneAMRestart.groovy
@@ -20,10 +20,13 @@
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
+
import org.apache.hadoop.yarn.api.records.ApplicationReport
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
+import org.apache.hadoop.yarn.api.records.YarnApplicationState
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.slider.agent.AgentMiniClusterTestBase
+import org.apache.slider.api.ResourceKeys
import org.apache.slider.client.SliderClient
import org.apache.slider.common.SliderXmlConfKeys
import org.apache.slider.common.params.ActionAMSuicideArgs
@@ -94,6 +97,125 @@
assert 0 == clusterActionFreeze(sliderClient, clustername, "force", true)
}
+
+ @Test
+ public void testStandaloneAMRestartWithRetryWindow() throws Throwable {
+ describe "kill a Standalone AM and verify that the AM failure count " +
+ "is reset after the AM retry-count-window elapses"
+ // patch the configuration for AM restart
+ YarnConfiguration conf = getRestartableConfiguration(5)
+
+ int restartLimit = 3;
+ int amRetryWindow = 60000;
+ String amRetryWindowStr = amRetryWindow.toString()
+ String clustername = createMiniCluster("", conf, 1, true)
+ ServiceLauncher<SliderClient> launcher =
+ createStandaloneAMWithArgs(clustername,
+ [
+ Arguments.ARG_DEFINE,
+ SliderXmlConfKeys.KEY_AM_RESTART_LIMIT + "=" + restartLimit,
+ Arguments.ARG_RESOURCE_OPT,
+ ResourceKeys.YARN_RESOURCEMANAGER_AM_RETRY_COUNT_WINDOW_MS,
+ amRetryWindowStr
+ ],
+ true,
+ false)
+ SliderClient sliderClient = launcher.service
+ addToTeardown(sliderClient);
+
+ ApplicationReport report = waitForClusterLive(sliderClient)
+ logReport(report)
+ waitUntilClusterLive(sliderClient, 30000)
+
+ def diagnosticArgs = new ActionDiagnosticArgs()
+ diagnosticArgs.client = true
+ diagnosticArgs.yarn = true
+ sliderClient.actionDiagnostic(diagnosticArgs)
+
+ describe "kill AM #1"
+ int iteration = 1;
+ killAMAndWaitForRestart(sliderClient, iteration, clustername)
+
+ describe "kill AM #2"
+ killAMAndWaitForRestart(sliderClient, iteration++, clustername)
+
+ // app should be running here
+ assert 0 == sliderClient.actionExists(clustername, true)
+
+ // make sure the am reset window has elapsed
+ describe "sleeping to ensure reset window elapsed"
+ sleep (amRetryWindow)
+
+ // kill again & expect the app to still be running
+ describe "kill AM #3 after window elapsed"
+ killAMAndWaitForRestart(sliderClient, iteration++, clustername)
+ assert 0 == sliderClient.actionExists(clustername, true)
+
+ report = sliderClient.applicationReport
+ assert report.getYarnApplicationState() == YarnApplicationState.RUNNING
+
+ logReport(report)
+ describe("stopping the cluster")
+ assert 0 == clusterActionFreeze(sliderClient, clustername, "force", true)
+
+ report = sliderClient.applicationReport
+ assert report.finalApplicationStatus == FinalApplicationStatus.KILLED
+ }
+
+
+ @Test
+ public void testStandaloneAMRestartWithDefaultRetryWindow() throws Throwable {
+ describe "kill AM more than the max limit allowed within the AM " +
+ "retry-count-window and expect the app to fail"
+ // patch the configuration for AM restart
+ YarnConfiguration conf = getRestartableConfiguration(5)
+
+ int restartLimit = 3;
+ String clustername = createMiniCluster("", conf, 1, true)
+ ServiceLauncher<SliderClient> launcher =
+ createStandaloneAMWithArgs(clustername,
+ [
+ Arguments.ARG_DEFINE,
+ SliderXmlConfKeys.KEY_AM_RESTART_LIMIT + "=" + restartLimit,
+ ],
+ true,
+ false)
+ SliderClient sliderClient = launcher.service
+ addToTeardown(sliderClient);
+
+ ApplicationReport report = waitForClusterLive(sliderClient)
+ logReport(report)
+ waitUntilClusterLive(sliderClient, 30000)
+
+ def diagnosticArgs = new ActionDiagnosticArgs()
+ diagnosticArgs.client = true
+ diagnosticArgs.yarn = true
+ sliderClient.actionDiagnostic(diagnosticArgs)
+
+ describe "kill AM #1"
+ int iteration = 1;
+ killAMAndWaitForRestart(sliderClient, iteration, clustername)
+
+ describe "kill AM #2"
+ killAMAndWaitForRestart(sliderClient, iteration++, clustername)
+
+ // app should be running here
+ assert 0 == sliderClient.actionExists(clustername, true)
+
+ // kill again & expect the app to fail
+ describe "kill AM #3"
+ killAmAndWaitForDeath(sliderClient, iteration++, clustername)
+ sleep(40000)
+
+ report = sliderClient.applicationReport
+ assert report.finalApplicationStatus == FinalApplicationStatus.FAILED
+
+ logReport(report)
+ describe("stopping the cluster")
+ assert 0 == clusterActionFreeze(sliderClient, clustername, "force", true)
+ }
+
+
/**
* Kill an AM. take an iteration count for the message sent to the
* AM (hence its logs)
diff --git a/slider-core/src/test/groovy/org/apache/slider/client/TestClientBadArgs.groovy b/slider-core/src/test/groovy/org/apache/slider/client/TestClientBadArgs.groovy
index 9b6c7dc..17b176c 100644
--- a/slider-core/src/test/groovy/org/apache/slider/client/TestClientBadArgs.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/client/TestClientBadArgs.groovy
@@ -243,9 +243,20 @@
@Test
public void testNodesMissingFile() throws Throwable {
def exception = launchExpectingException(SliderClient,
- createTestConfig(),
- "after parameter --out",
- [SliderActions.ACTION_NODES, Arguments.ARG_OUTPUT])
+ createTestConfig(),
+ "after parameter --out",
+ [SliderActions.ACTION_NODES, Arguments.ARG_OUTPUT])
assert exception instanceof BadCommandArgumentsException
}
+
+ @Test
+ public void testFlexWithNoCompoents() throws Throwable {
+ def exception = launchExpectingException(SliderClient,
+ new Configuration(),
+ "Usage: slider flex <application>",
+ [SliderActions.ACTION_FLEX,
+ "flex1"])
+ assert exception instanceof UsageException
+ log.info(exception.toString())
+ }
}
diff --git a/slider-core/src/test/java/org/apache/slider/core/launch/TestAppMasterLauncherWithAmReset.java b/slider-core/src/test/java/org/apache/slider/core/launch/TestAppMasterLauncherWithAmReset.java
new file mode 100644
index 0000000..cc64cab
--- /dev/null
+++ b/slider-core/src/test/java/org/apache/slider/core/launch/TestAppMasterLauncherWithAmReset.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.core.launch;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.slider.api.ResourceKeys;
+import org.apache.slider.client.SliderYarnClientImpl;
+import org.apache.slider.common.SliderKeys;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestAppMasterLauncherWithAmReset {
+ SliderYarnClientImpl mockYarnClient;
+ YarnClientApplication yarnClientApp;
+ ApplicationSubmissionContext appSubmissionContext;
+ GetNewApplicationResponse newApp;
+ Set<String> tags = Collections.emptySet();
+ AppMasterLauncher appMasterLauncher = null;
+ boolean isOldApi = true;
+
+ @Before
+ public void initialize() throws Exception {
+ mockYarnClient = EasyMock.createNiceMock(SliderYarnClientImpl.class);
+ yarnClientApp = EasyMock.createNiceMock(YarnClientApplication.class);
+ newApp = EasyMock.createNiceMock(GetNewApplicationResponse.class);
+ EasyMock.expect(mockYarnClient.createApplication())
+ .andReturn(new YarnClientApplication(newApp,
+ Records.newRecord(ApplicationSubmissionContext.class)));
+ }
+
+ @Test
+ public void testExtractYarnResourceManagerAmRetryCountWindowMs() throws
+ Exception {
+ Map<String, String> options = new HashMap<String, String>();
+ final String expectedInterval = Integer.toString (120000);
+ options.put(ResourceKeys.YARN_RESOURCEMANAGER_AM_RETRY_COUNT_WINDOW_MS,
+ expectedInterval);
+ EasyMock.replay(mockYarnClient, yarnClientApp);
+
+ appMasterLauncher = new AppMasterLauncher("am1", SliderKeys.APP_TYPE, null,
+ null, mockYarnClient, false, null, options, tags);
+
+ ApplicationSubmissionContext ctx = appMasterLauncher.application
+ .getApplicationSubmissionContext();
+ String retryIntervalWindow = Long.toString(ctx
+ .getAttemptFailuresValidityInterval());
+ Assert.assertEquals(expectedInterval, retryIntervalWindow);
+ }
+
+ @Test
+ public void testExtractYarnResourceManagerAmRetryCountWindowMsDefaultValue()
+ throws Exception {
+ Map<String, String> options = new HashMap<String, String>();
+ EasyMock.replay(mockYarnClient, yarnClientApp);
+
+ appMasterLauncher = new AppMasterLauncher("am1", SliderKeys.APP_TYPE, null,
+ null, mockYarnClient, false, null, options, tags);
+
+ ApplicationSubmissionContext ctx = appMasterLauncher.application
+ .getApplicationSubmissionContext();
+ long retryIntervalWindow = ctx.getAttemptFailuresValidityInterval();
+ Assert.assertEquals(ResourceKeys.DEFAULT_AM_RETRY_COUNT_WINDOW_MS,
+ retryIntervalWindow);
+ }
+
+}
diff --git a/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java b/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java
index 9e1c135..0f31d73 100644
--- a/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java
+++ b/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java
@@ -1718,6 +1718,93 @@
}
@Test
+ public void testAddStopCommand() throws Exception {
+ AgentProviderService aps = createAgentProviderService(new Configuration());
+ HeartBeatResponse hbr = new HeartBeatResponse();
+ // AM state is a nice mock; the provider is wrapped in a Mockito spy so its lookups can be stubbed.
+ StateAccessForProviders access = createNiceMock(StateAccessForProviders.class);
+ AgentProviderService mockAps = Mockito.spy(aps);
+ doReturn(access).when(mockAps).getAmState();
+ // Application configuration, including ${HBASE_MASTER.ALLOCATED_PORT} placeholders for four ports.
+ AggregateConf aggConf = new AggregateConf();
+ ConfTreeOperations treeOps = aggConf.getAppConfOperations();
+ treeOps.getGlobalOptions().put(AgentKeys.JAVA_HOME, "java_home");
+ treeOps.set(OptionKeys.APPLICATION_NAME, "HBASE");
+ treeOps.set("site.fs.defaultFS", "hdfs://HOST1:8020/");
+ treeOps.set("internal.data.dir.path", "hdfs://HOST1:8020/database");
+ treeOps.set(OptionKeys.ZOOKEEPER_HOSTS, "HOST1");
+ treeOps.getGlobalOptions().put("site.hbase-site.a.port", "${HBASE_MASTER.ALLOCATED_PORT}");
+ treeOps.getGlobalOptions().put("site.hbase-site.b.port", "${HBASE_MASTER.ALLOCATED_PORT}");
+ treeOps.getGlobalOptions().put("site.hbase-site.random.port", "${HBASE_MASTER.ALLOCATED_PORT}{PER_CONTAINER}");
+ treeOps.getGlobalOptions().put("site.hbase-site.random2.port", "${HBASE_MASTER.ALLOCATED_PORT}");
+ // Default hbase-site properties: defaultA carries a value, defaultB is the empty string.
+ Map<String, DefaultConfig> defaultConfigMap = new HashMap<String, DefaultConfig>();
+ DefaultConfig defaultConfig = new DefaultConfig();
+ PropertyInfo propertyInfo1 = new PropertyInfo();
+ propertyInfo1.setName("defaultA");
+ propertyInfo1.setValue("Avalue");
+ defaultConfig.addPropertyInfo(propertyInfo1);
+ propertyInfo1 = new PropertyInfo();
+ propertyInfo1.setName("defaultB");
+ propertyInfo1.setValue("");
+ defaultConfig.addPropertyInfo(propertyInfo1);
+ defaultConfigMap.put("hbase-site", defaultConfig);
+ // The mocked state returns the same config tree for both the app-conf and internals snapshots.
+ expect(access.getAppConfSnapshot()).andReturn(treeOps).anyTimes();
+ expect(access.getInternalsSnapshot()).andReturn(treeOps).anyTimes();
+ expect(access.isApplicationLive()).andReturn(true).anyTimes();
+ // Stub the spy's cluster-info lookup, default configs and configuration-type lists.
+ doReturn("HOST1").when(mockAps).getClusterInfoPropertyValue(anyString());
+ doReturn(defaultConfigMap).when(mockAps).getDefaultConfigs();
+ List<String> configurations = new ArrayList<String>();
+ configurations.add("hbase-site");
+ configurations.add("global");
+ List<String> sysConfigurations = new ArrayList<String>();
+ configurations.add("core-site"); // NOTE(review): goes into configurations; sysConfigurations stays empty -- confirm intended
+ doReturn(configurations).when(mockAps).getApplicationConfigurationTypes();
+ doReturn(sysConfigurations).when(mockAps).getSystemConfigurationsRequested(any(ConfTreeOperations.class));
+ // One HBASE_MASTER container (cid1 on HOST1) and the two stubbed allocated-port maps.
+ Map<String, Map<String, ClusterNode>> roleClusterNodeMap = new HashMap<String, Map<String, ClusterNode>>();
+ Map<String, ClusterNode> container = new HashMap<String, ClusterNode>();
+ ClusterNode cn1 = new ClusterNode(new MockContainerId(1));
+ cn1.host = "HOST1";
+ container.put("cid1", cn1);
+ roleClusterNodeMap.put("HBASE_MASTER", container);
+ doReturn(roleClusterNodeMap).when(mockAps).getRoleClusterNodeMapping();
+ Map<String, String> allocatedPorts = new HashMap<String, String>();
+ allocatedPorts.put("hbase-site.a.port", "10023");
+ allocatedPorts.put("hbase-site.b.port", "10024");
+ doReturn(allocatedPorts).when(mockAps).getAllocatedPorts();
+ Map<String, String> allocatedPorts2 = new HashMap<String, String>();
+ allocatedPorts2.put("hbase-site.random.port", "10025");
+ doReturn(allocatedPorts2).when(mockAps).getAllocatedPorts(anyString());
+
+ replay(access);
+
+ mockAps.addStopCommand("HBASE_MASTER", "cid1", hbr, "/tmp/stop_cmd.sh", 10, false);
+ // Ports found in the stubbed maps must be substituted; random2.port is in neither, so its placeholder stays.
+ ExecutionCommand cmd = hbr.getExecutionCommands().get(0);
+ Assert.assertTrue(cmd.getConfigurations().containsKey("hbase-site"));
+ Assert.assertTrue(cmd.getConfigurations().containsKey("core-site"));
+ Map<String, String> hbaseSiteConf = cmd.getConfigurations().get("hbase-site");
+ Assert.assertTrue(hbaseSiteConf.containsKey("a.port"));
+ Assert.assertEquals("10023", hbaseSiteConf.get("a.port"));
+ Assert.assertEquals("10024", hbaseSiteConf.get("b.port"));
+ Assert.assertEquals("10025", hbaseSiteConf.get("random.port"));
+ Assert.assertEquals("${HBASE_MASTER.ALLOCATED_PORT}", hbaseSiteConf.get("random2.port"));
+ Map<String, String> globalConf = cmd.getConfigurations().get("global");
+ Assert.assertTrue(globalConf.containsKey("app_log_dir"));
+ Assert.assertTrue(globalConf.containsKey("app_pid_dir"));
+ Assert.assertTrue(globalConf.containsKey("app_install_dir"));
+ Assert.assertTrue(globalConf.containsKey("app_input_conf_dir"));
+ Assert.assertTrue(globalConf.containsKey("app_container_id"));
+ Assert.assertTrue(globalConf.containsKey("pid_file"));
+ Assert.assertTrue(globalConf.containsKey("app_root"));
+ Assert.assertTrue(hbaseSiteConf.containsKey("defaultA"));
+ Assert.assertFalse(hbaseSiteConf.containsKey("defaultB"));
+ }
+
+ @Test
public void testParameterParsing() throws IOException {
AgentProviderService aps = createAgentProviderService(new Configuration());
AggregateConf aggConf = new AggregateConf();
diff --git a/slider-core/src/test/java/org/apache/slider/providers/agent/TestState.java b/slider-core/src/test/java/org/apache/slider/providers/agent/TestState.java
new file mode 100644
index 0000000..6a2e5ab
--- /dev/null
+++ b/slider-core/src/test/java/org/apache/slider/providers/agent/TestState.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.slider.providers.agent;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestState {
+ protected static final Logger log = LoggerFactory.getLogger(TestState.class);
+
+ /** State.STARTED must report Command.STOP from getSupportedCommand(false, true). */
+ @Test
+ public void testState() throws Exception {
+ Assert.assertEquals(Command.STOP, State.STARTED.getSupportedCommand(false, true));
+ }
+}