HDFS kerberos support refresh ticket
diff --git a/linkis-commons/linkis-hadoop-common/src/main/java/org/apache/linkis/hadoop/common/utils/KerberosUtils.java b/linkis-commons/linkis-hadoop-common/src/main/java/org/apache/linkis/hadoop/common/utils/KerberosUtils.java
index 6c5c125..67fecd0 100644
--- a/linkis-commons/linkis-hadoop-common/src/main/java/org/apache/linkis/hadoop/common/utils/KerberosUtils.java
+++ b/linkis-commons/linkis-hadoop-common/src/main/java/org/apache/linkis/hadoop/common/utils/KerberosUtils.java
@@ -17,12 +17,14 @@
package org.apache.linkis.hadoop.common.utils;
+import org.apache.linkis.common.utils.Utils;
import org.apache.linkis.hadoop.common.conf.HadoopConf;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.IOException;
+import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,6 +35,10 @@
public class KerberosUtils {
private static final Logger LOG = LoggerFactory.getLogger(KerberosUtils.class);
+ private static boolean kerberosRefreshStarted = false;
+
+ private static final Object kerberosRefreshLock = new Object();
+
private KerberosUtils() {}
private static Configuration createKerberosSecurityConfiguration() {
@@ -81,20 +87,20 @@
public static Long getKerberosRefreshInterval() {
long refreshInterval;
- String refreshIntervalString = "86400000";
- // defined in linkis-env.sh, if not initialized then the default value is 86400000 ms (1d).
- if (System.getenv("LINKIS_JDBC_KERBEROS_REFRESH_INTERVAL") != null) {
- refreshIntervalString = System.getenv("LINKIS_JDBC_KERBEROS_REFRESH_INTERVAL");
+ String refreshIntervalString = "43200";
+ // defined in linkis-env.sh, if not initialized then the default value is 43200 s (0.5d).
+ if (System.getenv("LINKIS_KERBEROS_REFRESH_INTERVAL") != null) {
+ refreshIntervalString = System.getenv("LINKIS_KERBEROS_REFRESH_INTERVAL");
}
try {
refreshInterval = Long.parseLong(refreshIntervalString);
} catch (NumberFormatException e) {
LOG.error(
- "Cannot get time in MS for the given string, "
+ "Cannot get time in S for the given string, "
+ refreshIntervalString
- + " defaulting to 86400000 ",
+ + " defaulting to 43200 ",
e);
- refreshInterval = 86400000L;
+ refreshInterval = 43200;
}
return refreshInterval;
}
@@ -102,14 +108,13 @@
public static Integer kinitFailTimesThreshold() {
Integer kinitFailThreshold = 5;
// defined in linkis-env.sh, if not initialized then the default value is 5.
- if (System.getenv("LINKIS_JDBC_KERBEROS_KINIT_FAIL_THRESHOLD") != null) {
+ if (System.getenv("LINKIS_KERBEROS_KINIT_FAIL_THRESHOLD") != null) {
try {
- kinitFailThreshold =
- new Integer(System.getenv("LINKIS_JDBC_KERBEROS_KINIT_FAIL_THRESHOLD"));
+ kinitFailThreshold = new Integer(System.getenv("LINKIS_KERBEROS_KINIT_FAIL_THRESHOLD"));
} catch (Exception e) {
LOG.error(
"Cannot get integer value from the given string, "
- + System.getenv("LINKIS_JDBC_KERBEROS_KINIT_FAIL_THRESHOLD")
+ + System.getenv("LINKIS_KERBEROS_KINIT_FAIL_THRESHOLD")
+ " defaulting to "
+ kinitFailThreshold,
e);
@@ -117,4 +122,70 @@
}
return kinitFailThreshold;
}
+
+ public static void checkStatus() {
+ try {
+ LOG.info("isSecurityEnabled:" + UserGroupInformation.isSecurityEnabled());
+ LOG.info(
+ "userAuthenticationMethod:"
+ + UserGroupInformation.getLoginUser().getAuthenticationMethod());
+ UserGroupInformation loginUsr = UserGroupInformation.getLoginUser();
+ UserGroupInformation curUsr = UserGroupInformation.getCurrentUser();
+ LOG.info("LoginUser: " + loginUsr);
+ LOG.info("CurrentUser: " + curUsr);
+ if (curUsr == null) {
+ LOG.info("CurrentUser is null");
+ } else {
+ LOG.info("CurrentUser is not null");
+ }
+ if (loginUsr.getClass() != curUsr.getClass()) {
+ LOG.info("getClass() is different");
+ } else {
+ LOG.info("getClass() is same");
+ }
+ if (loginUsr.equals(curUsr)) {
+ LOG.info("subject is equal");
+ } else {
+ LOG.info("subject is not equal");
+ }
+ } catch (Exception e) {
+ LOG.error("UGI error: ", e.getMessage());
+ }
+ }
+
+ public static void startKerberosRefreshThread() {
+
+ if (kerberosRefreshStarted || !HadoopConf.KERBEROS_ENABLE()) {
+ LOG.warn(
+ "kerberos refresh thread had start or not kerberos {}", HadoopConf.HDFS_ENABLE_CACHE());
+ return;
+ }
+ synchronized (kerberosRefreshLock) {
+ if (kerberosRefreshStarted) {
+ LOG.warn("kerberos refresh thread had start");
+ return;
+ }
+ kerberosRefreshStarted = true;
+ LOG.info("kerberos Refresh tread started");
+ Utils.defaultScheduler()
+ .scheduleAtFixedRate(
+ () -> {
+ try {
+ checkStatus();
+ if (UserGroupInformation.isLoginKeytabBased()) {
+ LOG.info("Trying re-login from keytab");
+ UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
+ } else if (UserGroupInformation.isLoginTicketBased()) {
+ LOG.info("Trying re-login from ticket cache");
+ UserGroupInformation.getLoginUser().reloginFromTicketCache();
+ }
+ } catch (Exception e) {
+ LOG.error("Unable to re-login", e);
+ }
+ },
+ getKerberosRefreshInterval(),
+ getKerberosRefreshInterval(),
+ TimeUnit.SECONDS);
+ }
+ }
}
diff --git a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/conf/HadoopConf.scala b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/conf/HadoopConf.scala
index 16fb45e..c550b3f 100644
--- a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/conf/HadoopConf.scala
+++ b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/conf/HadoopConf.scala
@@ -23,7 +23,7 @@
val HADOOP_ROOT_USER = CommonVars("wds.linkis.hadoop.root.user", "hadoop")
- val KERBEROS_ENABLE = CommonVars("wds.linkis.keytab.enable", false)
+ val KERBEROS_ENABLE = CommonVars("wds.linkis.keytab.enable", false).getValue
val KERBEROS_ENABLE_MAP =
CommonVars("linkis.keytab.enable.map", "cluster1=false,cluster2=true")
diff --git a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/utils/HDFSUtils.scala b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/utils/HDFSUtils.scala
index 1a69510..5e8d8a6 100644
--- a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/utils/HDFSUtils.scala
+++ b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/utils/HDFSUtils.scala
@@ -66,8 +66,7 @@
)
}
.foreach { hdfsFileSystemContainer =>
- val locker =
- hdfsFileSystemContainer.getUser + JOINT + hdfsFileSystemContainer.getLabel + LOCKER_SUFFIX
+ val locker = hdfsFileSystemContainer.getUser + LOCKER_SUFFIX
locker.intern() synchronized {
if (hdfsFileSystemContainer.canRemove()) {
fileSystemCache.remove(
@@ -248,7 +247,7 @@
def isKerberosEnabled(label: String): Boolean = {
if (label == null) {
- KERBEROS_ENABLE.getValue
+ KERBEROS_ENABLE
} else {
kerberosValueMapParser(KERBEROS_ENABLE_MAP.getValue).get(label).contains("true")
}
diff --git a/linkis-commons/linkis-hadoop-common/src/test/scala/org/apache/linkis/hadoop/common/conf/HadoopConfTest.scala b/linkis-commons/linkis-hadoop-common/src/test/scala/org/apache/linkis/hadoop/common/conf/HadoopConfTest.scala
index 44ca1da..7c2c7b3 100644
--- a/linkis-commons/linkis-hadoop-common/src/test/scala/org/apache/linkis/hadoop/common/conf/HadoopConfTest.scala
+++ b/linkis-commons/linkis-hadoop-common/src/test/scala/org/apache/linkis/hadoop/common/conf/HadoopConfTest.scala
@@ -26,7 +26,7 @@
def constTest(): Unit = {
Assertions.assertEquals("hadoop", HadoopConf.HADOOP_ROOT_USER.getValue)
- Assertions.assertFalse(HadoopConf.KERBEROS_ENABLE.getValue)
+ Assertions.assertFalse(HadoopConf.KERBEROS_ENABLE)
Assertions.assertEquals("/appcom/keytab/", HadoopConf.KEYTAB_FILE.getValue)
Assertions.assertEquals("127.0.0.1", HadoopConf.KEYTAB_HOST.getValue)
Assertions.assertFalse(HadoopConf.KEYTAB_HOST_ENABLED.getValue)