SLIDER-1153 Code issues - 14 null pointer deferences found
diff --git a/slider-core/src/main/java/org/apache/slider/client/SliderClient.java b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java
index 7ef3168..60c670b 100644
--- a/slider-core/src/main/java/org/apache/slider/client/SliderClient.java
+++ b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java
@@ -3679,23 +3679,24 @@
if (imagePath == null) {
ApplicationReport appReport = findInstance(clusterName);
Path path1 = sliderFileSystem.getTempPathForCluster(clusterName);
-
- Path subPath = new Path(path1, appReport.getApplicationId().toString()
- + "/agent");
- imagePath = subPath.toString();
+ if (appReport != null) {
+ Path subPath = new Path(path1, appReport.getApplicationId()
+ .toString() + "/agent");
+ imagePath = subPath.toString();
+ String pathStr = imagePath + "/" + AGENT_TAR;
+ try {
+ validateHDFSFile(sliderFileSystem, pathStr);
+ log.info("Slider agent package is properly installed at " + pathStr);
+ } catch (FileNotFoundException e) {
+ log.error("can not find agent package: {}", pathStr, e);
+ return;
+ } catch (IOException e) {
+ log.error("can not open agent package: {}", pathStr, e);
+ return;
+ }
+ }
}
- String pathStr = imagePath + "/" + AGENT_TAR;
- try {
- validateHDFSFile(sliderFileSystem, pathStr);
- log.info("Slider agent package is properly installed");
- } catch (FileNotFoundException e) {
- log.error("can not find agent package: {}", pathStr);
- log.debug("can not find agent package: {}", pathStr, e);
- return;
- } catch (IOException e) {
- log.error("can not open agent package: {}", pathStr, e);
- return;
- }
+
String pkgTarballPath = getApplicationDefinitionPath(instanceDefinition
.getAppConfOperations());
try {
diff --git a/slider-core/src/main/java/org/apache/slider/common/tools/SliderUtils.java b/slider-core/src/main/java/org/apache/slider/common/tools/SliderUtils.java
index 746e468..ef62745 100644
--- a/slider-core/src/main/java/org/apache/slider/common/tools/SliderUtils.java
+++ b/slider-core/src/main/java/org/apache/slider/common/tools/SliderUtils.java
@@ -1032,6 +1032,9 @@
* something other than 0.0.0.0
*/
public static boolean isAddressDefined(InetSocketAddress address) {
+ if (address == null || address.getHostString() == null) {
+ return false;
+ }
return !(address.getHostString().equals("0.0.0.0"));
}
@@ -1433,8 +1436,11 @@
if (dir == null) {
return "";
}
- StringBuilder builder = new StringBuilder();
String[] confDirEntries = dir.list();
+ if (confDirEntries == null) {
+ return "";
+ }
+ StringBuilder builder = new StringBuilder();
for (String entry : confDirEntries) {
builder.append(entry).append("\n");
}
diff --git a/slider-core/src/main/java/org/apache/slider/core/launch/SerializedApplicationReport.java b/slider-core/src/main/java/org/apache/slider/core/launch/SerializedApplicationReport.java
index 377c87a..8e0ef5a 100644
--- a/slider-core/src/main/java/org/apache/slider/core/launch/SerializedApplicationReport.java
+++ b/slider-core/src/main/java/org/apache/slider/core/launch/SerializedApplicationReport.java
@@ -18,6 +18,7 @@
package org.apache.slider.core.launch;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.slider.core.persist.ApplicationReportSerDeser;
@@ -67,7 +68,8 @@
public SerializedApplicationReport(ApplicationReport report) {
this.applicationId = report.getApplicationId().toString();
- this.applicationAttemptId = report.getCurrentApplicationAttemptId().toString();
+ ApplicationAttemptId attemptId = report.getCurrentApplicationAttemptId();
+ this.applicationAttemptId = attemptId != null ? attemptId.toString() : "N/A";
this.name = report.getName();
this.applicationType = report.getApplicationType();
this.user = report.getUser();
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/RenewingAction.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/RenewingAction.java
index 1164df9..f3143ea 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/RenewingAction.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/RenewingAction.java
@@ -61,9 +61,6 @@
TimeUnit timeUnit,
int limit) {
super("renewing " + action.name, initialDelay, timeUnit, action.getAttrs());
- // slightly superfluous as the super init above checks these values...retained
- // in case that code is ever changed
- Preconditions.checkArgument(action != null, "null actions");
Preconditions.checkArgument(interval > 0, "invalid interval: " + interval);
this.action = action;
this.interval = interval;
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
index 3213d93..7ff2b4c 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
@@ -1287,7 +1287,12 @@
return createAAContainerRequest(role);
} else {
incrementRequestCount(role);
- return roleHistory.requestContainerForRole(role).getIssuedRequest();
+ OutstandingRequest request = roleHistory.requestContainerForRole(role);
+ if (request != null) {
+ return request.getIssuedRequest();
+ } else {
+ return null;
+ }
}
}
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/security/FsDelegationTokenManager.java b/slider-core/src/main/java/org/apache/slider/server/services/security/FsDelegationTokenManager.java
deleted file mode 100644
index 617fe3c..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/services/security/FsDelegationTokenManager.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.slider.server.services.security;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.Time;
-import org.apache.slider.common.SliderXmlConfKeys;
-import org.apache.slider.common.tools.SliderUtils;
-import org.apache.slider.core.launch.CredentialUtils;
-import org.apache.slider.server.appmaster.SliderAppMaster;
-import org.apache.slider.server.appmaster.actions.AsyncAction;
-import org.apache.slider.server.appmaster.actions.QueueAccess;
-import org.apache.slider.server.appmaster.actions.RenewingAction;
-import org.apache.slider.server.appmaster.state.AppState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.security.PrivilegedExceptionAction;
-import java.text.DateFormat;
-import java.util.Date;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-/**
- *
- */
-public class FsDelegationTokenManager {
- private final QueueAccess queue;
- private RenewingAction<RenewAction> renewingAction;
- private UserGroupInformation remoteUser;
- private UserGroupInformation currentUser;
- private static final Logger
- log = LoggerFactory.getLogger(FsDelegationTokenManager.class);
- private long renewInterval;
- private RenewAction renewAction;
- private String tokenName;
-
- public FsDelegationTokenManager(QueueAccess queue) throws IOException {
- this.queue = queue;
- this.currentUser = UserGroupInformation.getCurrentUser();
- }
-
- private void createRemoteUser(Configuration configuration) throws IOException {
- Configuration loginConfig = new Configuration(configuration);
- loginConfig.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
- "kerberos");
- // using HDFS principal...
- this.remoteUser = UserGroupInformation
- .loginUserFromKeytabAndReturnUGI(
- SecurityUtil.getServerPrincipal(
- loginConfig.get(SliderXmlConfKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY),
- InetAddress.getLocalHost().getCanonicalHostName()),
- loginConfig.get(SliderXmlConfKeys.DFS_NAMENODE_KEYTAB_FILE_KEY));
- log.info("Created remote user {}. UGI reports current user is {}",
- this.remoteUser, UserGroupInformation.getCurrentUser());
- }
-
- public void acquireDelegationToken(Configuration configuration)
- throws IOException, InterruptedException {
- if (remoteUser == null) {
- createRemoteUser(configuration);
- }
- if (SliderUtils.isHadoopClusterSecure(configuration) &&
- renewingAction == null) {
- renewInterval = configuration.getLong(
- SliderXmlConfKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
- SliderXmlConfKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
- // constructor of action will retrieve initial token. One may already be
- // associated with user, but its lifecycle/management is not clear so let's
- // create and manage a token explicitly
- renewAction = new RenewAction("HDFS renew",
- configuration);
- // set retrieved token as the user associated delegation token and
- // start a renewing action to renew
- Token<?> token = renewAction.getToken();
- currentUser.addToken(token.getService(), token);
- log.info("HDFS delegation token {} acquired and set as credential for current user", token);
- renewingAction = new RenewingAction<RenewAction>(renewAction,
- (int) renewInterval,
- (int) renewInterval,
- TimeUnit.MILLISECONDS,
- getRenewingLimit());
- log.info("queuing HDFS delegation token renewal interval of {} milliseconds",
- renewInterval);
- queue(renewingAction);
- }
- }
-
- public void cancelDelegationToken(Configuration configuration)
- throws IOException, InterruptedException {
- queue.removeRenewingAction(getRenewingActionName());
- if (renewAction != null) {
- renewAction.getToken().cancel(configuration);
- }
- log.info("Renewing action {} removed and delegation token renewal "
- + "cancelled", getRenewingActionName());
- }
-
- protected int getRenewingLimit() {
- return 0;
- }
-
- protected void queue(RenewingAction<RenewAction> action) {
- queue.renewing(getRenewingActionName(),
- action);
- }
-
- protected String getRenewingActionName() {
- if (tokenName == null) {
- tokenName = "HDFS renewing token " + UUID.randomUUID();
- }
- return tokenName;
- }
-
- class RenewAction extends AsyncAction {
- Configuration configuration;
- Token<?> token;
- private long tokenExpiryTime;
- private final FileSystem fs;
-
- RenewAction(String name,
- Configuration configuration)
- throws IOException, InterruptedException {
- super(name);
- this.configuration = configuration;
- fs = getFileSystem();
- // get initial token by creating a kerberos authenticated user and
- // invoking token methods as that user
- synchronized (fs) {
- this.token = remoteUser.doAs(new PrivilegedExceptionAction<Token<?>>() {
- @Override
- public Token<?> run() throws Exception {
- log.info("Obtaining HDFS delgation token with user {}",
- remoteUser.getShortUserName());
- Token token = fs.getDelegationToken(
- remoteUser.getShortUserName());
- tokenExpiryTime = CredentialUtils.getTokenExpiryTime(token);
- log.info("Initial delegation token obtained with expiry time of {}",
- getPrintableExpirationTime(tokenExpiryTime));
- return token;
- }
- });
- }
- log.info("Initial request returned delegation token {}", token);
- }
-
- protected FileSystem getFileSystem()
- throws IOException, InterruptedException {
- // return non-cache FS reference
- return remoteUser.doAs(new PrivilegedExceptionAction<FileSystem>() {
- @Override
- public FileSystem run() throws Exception {
- Configuration config = new Configuration(configuration);
- config.setBoolean("fs.hdfs.impl.disable.cache", true);
- return getRemoteFileSystemForRenewal(config);
- }
- });
- }
-
- @Override
- public void execute(SliderAppMaster appMaster, QueueAccess queueService,
- AppState appState)
- throws Exception {
- if (fs != null) {
- synchronized(fs) {
- try {
- long expires = remoteUser.doAs(new PrivilegedExceptionAction<Long>() {
- @Override
- public Long run() throws Exception {
- long expires = token.renew(fs.getConf());
- log.info("HDFS delegation token renewed. Renewal cycle ends at {}",
- getPrintableExpirationTime(expires));
- return expires;
- }
- });
- long calculatedInterval = tokenExpiryTime - Time.now();
- if ( calculatedInterval < renewInterval ) {
- // time to get a new token since the token will expire before
- // next renewal interval. Could modify this to be closer to expiry
- // time if deemed necessary....
- log.info("Interval of {} less than renew interval. Getting new token",
- calculatedInterval);
- getNewToken();
- } else {
- updateRenewalTime(renewInterval);
- }
- } catch (IOException ie) {
- // token has expired. get a new one...
- log.info("Exception raised by renew", ie);
- getNewToken();
- }
- }
- }
- }
-
- private String getPrintableExpirationTime(long expires) {
- Date d = new Date(expires);
- return DateFormat.getDateTimeInstance().format(d);
- }
-
- private void getNewToken()
- throws InterruptedException, IOException {
- try {
- Text service = token.getService();
- Token<?>[] tokens = remoteUser.doAs(new PrivilegedExceptionAction<Token<?>[]>() {
- @Override
- public Token<?>[] run() throws Exception {
- return fs.addDelegationTokens(remoteUser.getShortUserName(), null);
- }
- });
- if (tokens.length == 0) {
- throw new IOException("addDelegationTokens returned no tokens");
- }
- token = findMatchingToken(service, tokens);
- currentUser.addToken(token.getService(), token);
-
- tokenExpiryTime = CredentialUtils.getTokenExpiryTime(token);
-
- log.info("Expired HDFS delegation token replaced and added as credential"
- + " to current user. Token expires at {}",
- getPrintableExpirationTime(tokenExpiryTime));
- updateRenewalTime(renewInterval);
- } catch (IOException ie2) {
- throw new IOException("Can't get new delegation token ", ie2);
- }
- }
-
- private void updateRenewalTime(long interval) {
- long delay = interval - interval/10;
- renewingAction.updateInterval(delay, TimeUnit.MILLISECONDS);
- log.info("Token renewal set for {} ms from now", delay);
- }
-
- private Token<?> findMatchingToken(Text service, Token<?>[] tokens) {
- Token<?> token = null;
- int i = 0;
- while (token == null && i < tokens.length) {
- if (tokens[i].getService().equals(service)) {
- token = tokens[i];
- }
- i++;
- }
-
- return token;
- }
-
- Token<?> getToken() {
- synchronized (fs) {
- return token;
- }
- }
- }
-
- protected FileSystem getRemoteFileSystemForRenewal(Configuration config)
- throws IOException {
- return FileSystem.get(config);
- }
-}
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/services/security/TestFsDelegationTokenManager.groovy b/slider-core/src/test/groovy/org/apache/slider/server/services/security/TestFsDelegationTokenManager.groovy
deleted file mode 100644
index d82a79c..0000000
--- a/slider-core/src/test/groovy/org/apache/slider/server/services/security/TestFsDelegationTokenManager.groovy
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.security
-
-import groovy.util.logging.Slf4j
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic
-import org.apache.hadoop.fs.FileSystem as HadoopFS
-import org.apache.hadoop.fs.RawLocalFileSystem
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
-import org.apache.hadoop.io.Text
-import org.apache.hadoop.security.Credentials
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.security.token.SecretManager
-import org.apache.hadoop.security.token.Token
-import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager
-import org.apache.hadoop.service.ServiceOperations
-import org.apache.hadoop.util.Time
-import org.apache.slider.common.SliderXmlConfKeys
-import org.apache.slider.common.tools.CoreFileSystem
-import org.apache.slider.server.appmaster.actions.ActionStopQueue
-import org.apache.slider.server.appmaster.actions.QueueExecutor
-import org.apache.slider.server.appmaster.actions.QueueService
-import org.junit.After
-import org.junit.Before
-import org.junit.Test
-
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.atomic.AtomicLong
-
-@Slf4j
-//@CompileStatic
-class TestFsDelegationTokenManager {
-
- QueueService queues;
- FsDelegationTokenManager tokenManager;
- Configuration conf;
- UserGroupInformation currentUser;
-
-
- @Before
- void setup() {
- queues = new QueueService();
-
- conf = new Configuration()
- conf.set(
- CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
- "TOKEN")
- conf.setLong(
- SliderXmlConfKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
- 1000)
- queues.init(conf)
- queues.start();
-
- HadoopFS fs = new TestFileSystem()
-
- CoreFileSystem coreFileSystem = new CoreFileSystem(fs, conf)
-
- String[] groups = new String[1];
- groups[0] = 'testGroup1'
-
- currentUser = UserGroupInformation.createUserForTesting("test", groups)
- UserGroupInformation.setLoginUser(currentUser)
-
- tokenManager = new FsDelegationTokenManager(queues) {
- @Override
- protected int getRenewingLimit() {
- return 5
- }
-
- @Override
- protected org.apache.hadoop.fs.FileSystem getRemoteFileSystemForRenewal(Configuration config) throws IOException {
- return new TestFileSystem();
- }
-
- @Override
- protected String getRenewingActionName() {
- return "TEST RENEW"
- }
- }
-
- }
-
- public static class DummySecretManager extends
- AbstractDelegationTokenSecretManager<DelegationTokenIdentifier> {
-
- public DummySecretManager(long delegationKeyUpdateInterval,
- long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
- long delegationTokenRemoverScanInterval) {
- super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
- delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
- }
-
- @Override
- public DelegationTokenIdentifier createIdentifier() {
- return null;
- }
-
- @Override
- public byte[] createPassword(DelegationTokenIdentifier dtId) {
- return new byte[1];
- }
- }
-
- public class TestFileSystem extends RawLocalFileSystem {
- int sequenceNum = 0;
- SecretManager<DelegationTokenIdentifier> mgr =
- new DummySecretManager(0, 0, 0, 0);
- @Override
- Token<DelegationTokenIdentifier> getDelegationToken(String renewer) throws IOException {
- return new TestToken(getIdentifier(), mgr);
- }
-
- @Override
- Token<?>[] addDelegationTokens(String renewer, Credentials credentials) throws IOException {
- Token[] tokens = new Token[1]
- tokens[0] = new TestToken(getIdentifier(), mgr)
- return tokens
- }
-
- private DelegationTokenIdentifier getIdentifier() {
- def user = new Text(currentUser.getUserName())
- def id = new DelegationTokenIdentifier(user, user, user)
- id.setSequenceNumber(sequenceNum++)
- id.setMaxDate(Time.now() + 10000)
-
- return id
- }
- }
-
- public class TestToken extends Token<DelegationTokenIdentifier> {
- static long maxCount = 0;
- private final AtomicLong renewCount = new AtomicLong()
- private final AtomicLong totalCount = new AtomicLong()
- public final AtomicBoolean expired = new AtomicBoolean(false);
- public final AtomicBoolean cancelled = new AtomicBoolean(false);
-
- TestToken(DelegationTokenIdentifier id, SecretManager<DelegationTokenIdentifier> mgr) {
- super(id, mgr)
- }
-
- @Override
- Text getService() {
- return new Text("HDFS")
- }
-
- @Override
- long renew(Configuration conf) throws IOException, InterruptedException {
- totalCount.getAndIncrement();
- if (maxCount > 0 && renewCount.getAndIncrement() > maxCount) {
- renewCount.set(0L)
- expired.set(true)
- throw new IOException("Expired")
- }
-
-
- return Time.now() + 1000;
- }
-
- @Override
- void cancel(Configuration conf) throws IOException, InterruptedException {
- cancelled.set(true)
- }
- }
-
- @After
- void destroyService() {
- ServiceOperations.stop(queues);
- }
-
- public void runQueuesToCompletion() {
- new Thread(queues).start();
- QueueExecutor ex = new QueueExecutor(queues)
- ex.run();
- }
-
- public void runQueuesButNotToCompletion() {
- new Thread(queues).start();
- QueueExecutor ex = new QueueExecutor(queues)
- new Thread(ex).start();
- Thread.sleep(1000)
- tokenManager.cancelDelegationToken(conf)
- }
-
- @Test
- public void testRenew() throws Throwable {
- tokenManager.acquireDelegationToken(conf)
- def stop = new ActionStopQueue(10, TimeUnit.SECONDS)
- queues.schedule(stop);
- runQueuesToCompletion()
-
- TestToken token = (TestToken) currentUser.getTokens()[0]
- assert token.totalCount.get() > 4
- }
-
- @Test
- public void testCancel() throws Throwable {
- tokenManager.acquireDelegationToken(conf)
- def stop = new ActionStopQueue(10, TimeUnit.SECONDS)
- queues.schedule(stop);
- runQueuesButNotToCompletion()
-
- TestToken token = (TestToken) currentUser.getTokens()[0]
- assert token.cancelled.get()
- assert queues.lookupRenewingAction("TEST RENEW") == null
- }
-
-
- @Test
- public void testRenewPastExpiry() throws Throwable {
- try {
- TestToken.maxCount = 3L
- tokenManager.acquireDelegationToken(conf)
- TestToken origToken = currentUser.getTokens()[0]
- def stop = new ActionStopQueue(10, TimeUnit.SECONDS)
- queues.schedule(stop);
- runQueuesToCompletion()
-
- TestToken token = (TestToken) currentUser.getTokens()[0]
- assert token != null
- assert token != origToken
- assert origToken.getService().equals(token.getService())
- assert origToken.totalCount.get() > 4
- assert origToken.expired.get()
- } finally {
- TestToken.maxCount = 0
- }
- }
-}