[FLINK-32223][runtime][security] Add Hive delegation token support
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/security/token/HiveServer2DelegationTokenIdentifier.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/security/token/HiveServer2DelegationTokenIdentifier.java
new file mode 100644
index 0000000..94ce2c9
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/security/token/HiveServer2DelegationTokenIdentifier.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.security.token;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+
+/** Delegation token identifier for HiveServer2. */
+@Internal
+public class HiveServer2DelegationTokenIdentifier extends AbstractDelegationTokenIdentifier {
+ public static final Text HIVE_DELEGATION_KIND = new Text("HIVE_DELEGATION_TOKEN");
+
+ public HiveServer2DelegationTokenIdentifier() {}
+
+ public HiveServer2DelegationTokenIdentifier(Text owner, Text renewer, Text realUser) {
+ super(owner, renewer, realUser);
+ }
+
+ public Text getKind() {
+ return HIVE_DELEGATION_KIND;
+ }
+}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/security/token/HiveServer2DelegationTokenProvider.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/security/token/HiveServer2DelegationTokenProvider.java
new file mode 100644
index 0000000..2988f2a
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/security/token/HiveServer2DelegationTokenProvider.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.security.token;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.security.token.DelegationTokenProvider;
+import org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenConverter;
+import org.apache.flink.runtime.security.token.hadoop.KerberosLoginProvider;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.time.Clock;
+import java.util.Optional;
+
+import static org.apache.flink.runtime.hadoop.HadoopUserUtils.getIssueDate;
+
+/** Delegation token provider for HiveServer2. */
+@Internal
+public class HiveServer2DelegationTokenProvider implements DelegationTokenProvider {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(HiveServer2DelegationTokenProvider.class);
+
+ org.apache.hadoop.conf.Configuration hiveConf;
+
+ private KerberosLoginProvider kerberosLoginProvider;
+
+ private Long tokenRenewalInterval;
+
+ @Override
+ public String serviceName() {
+ return "HiveServer2";
+ }
+
+ @Override
+ public void init(Configuration configuration) throws Exception {
+ hiveConf = getHiveConfiguration(configuration);
+ kerberosLoginProvider = new KerberosLoginProvider(configuration);
+ }
+
+ private org.apache.hadoop.conf.Configuration getHiveConfiguration(Configuration conf) {
+ try {
+ org.apache.hadoop.conf.Configuration hadoopConf =
+ HadoopUtils.getHadoopConfiguration(conf);
+ hiveConf = new HiveConf(hadoopConf, HiveConf.class);
+ } catch (Exception | NoClassDefFoundError e) {
+ LOG.warn("Fail to create HiveServer2 Configuration", e);
+ }
+ return hiveConf;
+ }
+
+ @Override
+ public boolean delegationTokensRequired() throws Exception {
+ /**
+ * The general rule how a provider/receiver must behave is the following: The provider and
+ * the receiver must be added to the classpath together with all the additionally required
+ * dependencies.
+ *
+ * <p>This null check is required because the HiveServer2 provider is always on classpath
+ * but Hive jars are optional. Such case configuration is not able to be loaded. This
+ * construct is intended to be removed when HiveServer2 provider/receiver pair can be
+ * externalized (namely if a provider/receiver throws an exception then workload must be
+ * stopped).
+ */
+ if (hiveConf == null) {
+ LOG.debug(
+ "HiveServer2 is not available (not packaged with this application), hence no "
+ + "hiveServer2 tokens will be acquired.");
+ return false;
+ }
+ try {
+ if (!HadoopUtils.isKerberosSecurityEnabled(UserGroupInformation.getCurrentUser())) {
+ LOG.debug(
+ "Hadoop Kerberos is not enabled,hence no hiveServer2 tokens will be acquired.");
+ return false;
+ }
+ } catch (IOException e) {
+ LOG.debug(
+ "Hadoop Kerberos is not enabled,hence no hiveServer2 tokens will be acquired.",
+ e);
+ return false;
+ }
+
+ if (hiveConf.getTrimmed("hive.metastore.uris", "").isEmpty()) {
+ LOG.debug(
+ "The hive.metastore.uris item is empty,hence no hiveServer2 tokens will be acquired.");
+ return false;
+ }
+
+ if (!kerberosLoginProvider.isLoginPossible(false)) {
+ LOG.debug("Login is NOT possible,hence no hiveServer2 tokens will be acquired.");
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public ObtainedDelegationTokens obtainDelegationTokens() throws Exception {
+ UserGroupInformation freshUGI = kerberosLoginProvider.doLoginAndReturnUGI();
+ return freshUGI.doAs(
+ (PrivilegedExceptionAction<ObtainedDelegationTokens>)
+ () -> {
+ Preconditions.checkNotNull(hiveConf);
+ Hive hive = Hive.get((HiveConf) hiveConf);
+ Clock clock = Clock.systemDefaultZone();
+ try {
+ LOG.info("Obtaining Kerberos security token for HiveServer2");
+
+ String principal =
+ hiveConf.getTrimmed(
+ "hive.metastore.kerberos.principal", "");
+
+ String tokenStr =
+ hive.getDelegationToken(
+ UserGroupInformation.getCurrentUser().getUserName(),
+ principal);
+ Token<HiveServer2DelegationTokenIdentifier> hive2Token =
+ new Token<>();
+ hive2Token.decodeFromUrlString(tokenStr);
+
+ Credentials credentials = new Credentials();
+ credentials.addToken(hive2Token.getKind(), hive2Token);
+
+ HiveServer2DelegationTokenIdentifier tokenIdentifier =
+ hive2Token.decodeIdentifier();
+
+ if (tokenRenewalInterval == null) {
+ tokenRenewalInterval =
+ getTokenRenewalInterval(
+ clock, tokenIdentifier, hive, tokenStr);
+ }
+ Optional<Long> validUntil =
+ getTokenRenewalDate(
+ clock, tokenIdentifier, tokenRenewalInterval);
+
+ return new ObtainedDelegationTokens(
+ HadoopDelegationTokenConverter.serialize(credentials),
+ validUntil);
+
+ } catch (Exception e) {
+ LOG.error(
+ "Failed to obtain delegation token for {}",
+ this.serviceName(),
+ e);
+ throw new FlinkRuntimeException(e);
+ } finally {
+ Hive.closeCurrent();
+ }
+ });
+ }
+
+ @VisibleForTesting
+ Long getTokenRenewalInterval(
+ Clock clock,
+ HiveServer2DelegationTokenIdentifier tokenIdentifier,
+ Hive hive,
+ String tokenStr) {
+ Long interval;
+ LOG.debug("Got Delegation token is {} ", tokenIdentifier);
+ long newExpiration = getNewExpiration(hive, tokenStr);
+ String tokenKind = tokenIdentifier.getKind().toString();
+ interval = newExpiration - getIssueDate(clock, tokenKind, tokenIdentifier);
+ LOG.info("Renewal interval is {} for token {}", interval, tokenKind);
+ return interval;
+ }
+
+ @VisibleForTesting
+ long getNewExpiration(Hive hive, String tokenStr) {
+ try {
+ return hive.getMSC().renewDelegationToken(tokenStr);
+ } catch (Exception e) {
+ LOG.error("renew Delegation Token failed", e);
+ throw new FlinkRuntimeException(e);
+ }
+ }
+
+ @VisibleForTesting
+ Optional<Long> getTokenRenewalDate(
+ Clock clock,
+ HiveServer2DelegationTokenIdentifier tokenIdentifier,
+ Long renewalInterval) {
+ if (renewalInterval < 0) {
+ LOG.debug("Negative renewal interval so no renewal date is calculated");
+ return Optional.empty();
+ }
+
+ try {
+ String tokenKind = tokenIdentifier.getKind().toString();
+ long date = getIssueDate(clock, tokenKind, tokenIdentifier) + renewalInterval;
+ LOG.debug("Renewal date is {} for token {}", date, tokenKind);
+ return Optional.of(date);
+ } catch (Exception e) {
+ throw new FlinkRuntimeException(e);
+ }
+ }
+}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/security/token/HiveServer2DelegationTokenReceiver.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/security/token/HiveServer2DelegationTokenReceiver.java
new file mode 100644
index 0000000..3d35704
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/security/token/HiveServer2DelegationTokenReceiver.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.security.token;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenReceiver;
+
+/** Delegation token receiver for HiveServer2. */
+@Internal
+public class HiveServer2DelegationTokenReceiver extends HadoopDelegationTokenReceiver {
+
+ @Override
+ public String serviceName() {
+ return "HiveServer2";
+ }
+}
diff --git a/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
new file mode 100644
index 0000000..4b5e6b1
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider
diff --git a/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
new file mode 100644
index 0000000..03204e6
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.security.token.HiveServer2DelegationTokenReceiver
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/security/token/HiveServer2DelegationTokenProviderITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/security/token/HiveServer2DelegationTokenProviderITCase.java
new file mode 100644
index 0000000..0bce268
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/security/token/HiveServer2DelegationTokenProviderITCase.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.security.token;
+
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import sun.security.krb5.KrbException;
+
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Clock;
+import java.time.ZoneId;
+import java.util.Optional;
+
+import static java.time.Instant.ofEpochMilli;
+import static org.apache.flink.configuration.SecurityOptions.KERBEROS_LOGIN_KEYTAB;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Test for {@link HiveServer2DelegationTokenProvider}. */
+public class HiveServer2DelegationTokenProviderITCase {
+
+ @BeforeAll
+ public static void setPropertiesToEnableKerberosConfigInit() throws KrbException {
+ System.setProperty("java.security.krb5.realm", "EXAMPLE.COM");
+ System.setProperty("java.security.krb5.kdc", "kdc");
+ System.setProperty("java.security.krb5.conf", "/dev/null");
+ sun.security.krb5.Config.refresh();
+ }
+
+ @AfterAll
+ public static void cleanupHadoopConfigs() {
+ UserGroupInformation.setConfiguration(new Configuration());
+ }
+
+ private class TestHiveServer2DelegationToken extends HiveServer2DelegationTokenIdentifier {
+ @Override
+ public long getIssueDate() {
+ return 1000;
+ }
+ }
+
+ @Test
+ public void delegationTokensRequiredShouldReturnFalseWhenKerberosIsNotEnabled()
+ throws Exception {
+ HiveServer2DelegationTokenProvider provider = new HiveServer2DelegationTokenProvider();
+ provider.init(new org.apache.flink.configuration.Configuration());
+ boolean result = provider.delegationTokensRequired();
+ assertFalse(result);
+ }
+
+ @Test
+ public void delegationTokensRequiredShouldReturnFalseWhenHiveMetastoreUrisIsEmpty()
+ throws Exception {
+ UserGroupInformation.setConfiguration(
+ getHadoopConfigWithAuthMethod(UserGroupInformation.AuthenticationMethod.KERBEROS));
+ UserGroupInformation.getCurrentUser()
+ .setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS);
+ HiveServer2DelegationTokenProvider provider = new HiveServer2DelegationTokenProvider();
+ provider.init(new org.apache.flink.configuration.Configuration());
+ boolean result = provider.delegationTokensRequired();
+ assertFalse(result);
+ }
+
+ @Test
+ public void delegationTokensRequiredShouldReturnTrueWhenAllConditionsIsRight(
+ @TempDir Path tmpDir) throws Exception {
+ URL resource =
+ Thread.currentThread()
+ .getContextClassLoader()
+ .getResource("test-hive-delegation-token/hive-site.xml");
+ HiveConf.setHiveSiteLocation(resource);
+
+ UserGroupInformation.setConfiguration(
+ getHadoopConfigWithAuthMethod(UserGroupInformation.AuthenticationMethod.KERBEROS));
+ UserGroupInformation.getCurrentUser()
+ .setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS);
+ HiveServer2DelegationTokenProvider provider = new HiveServer2DelegationTokenProvider();
+ org.apache.flink.configuration.Configuration configuration =
+ new org.apache.flink.configuration.Configuration();
+
+ final Path keyTab = Files.createFile(tmpDir.resolve("test.keytab"));
+ configuration.setString(KERBEROS_LOGIN_KEYTAB, keyTab.toAbsolutePath().toString());
+ configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, "test@EXAMPLE.COM");
+ provider.init(configuration);
+ boolean result = provider.delegationTokensRequired();
+ assertTrue(result);
+ }
+
+ @Test
+ public void getTokenRenewalIntervalShouldReturnRenewalIntervalWhenNoExceptionIsThrown() {
+ HiveServer2DelegationTokenProvider provider =
+ new HiveServer2DelegationTokenProvider() {
+ @Override
+ long getNewExpiration(Hive hive, String tokenStr) {
+ return 10000;
+ }
+ };
+ Clock constantClock = Clock.fixed(ofEpochMilli(0), ZoneId.systemDefault());
+ TestHiveServer2DelegationToken testDelegationToken = new TestHiveServer2DelegationToken();
+ Long renewalInterval =
+ provider.getTokenRenewalInterval(constantClock, testDelegationToken, null, "test");
+ assertEquals(9000L, renewalInterval);
+ }
+
+ @Test
+ public void getTokenRenewalIntervalShouldThrowExceptionWhenHiveIsNull() {
+ HiveServer2DelegationTokenProvider provider = new HiveServer2DelegationTokenProvider();
+ Clock constantClock = Clock.fixed(ofEpochMilli(0), ZoneId.systemDefault());
+ TestHiveServer2DelegationToken testDelegationToken = new TestHiveServer2DelegationToken();
+ final String msg = "java.lang.NullPointerException";
+ assertThatThrownBy(
+ () ->
+ provider.getTokenRenewalInterval(
+ constantClock, testDelegationToken, null, "test"))
+ .isInstanceOf(FlinkRuntimeException.class)
+ .hasMessageContaining(msg);
+ }
+
+ @Test
+ public void getTokenRenewalDateShouldReturnNoneWhenNegativeRenewalInterval() {
+ HiveServer2DelegationTokenProvider provider = new HiveServer2DelegationTokenProvider();
+ Clock constantClock = Clock.fixed(ofEpochMilli(0), ZoneId.systemDefault());
+ TestHiveServer2DelegationToken testDelegationToken = new TestHiveServer2DelegationToken();
+ assertEquals(
+ Optional.empty(),
+ provider.getTokenRenewalDate(constantClock, testDelegationToken, -1L));
+ }
+
+ @Test
+ public void getTokenRenewalDateShouldReturnRenewalDateWhenNotNegativeRenewalInterval() {
+ HiveServer2DelegationTokenProvider provider = new HiveServer2DelegationTokenProvider();
+ Clock constantClock = Clock.fixed(ofEpochMilli(0), ZoneId.systemDefault());
+ TestHiveServer2DelegationToken testDelegationToken = new TestHiveServer2DelegationToken();
+ assertEquals(
+ Optional.of(10000L),
+ provider.getTokenRenewalDate(constantClock, testDelegationToken, 9000L));
+ }
+
+ private Configuration getHadoopConfigWithAuthMethod(
+ UserGroupInformation.AuthenticationMethod authenticationMethod) {
+ Configuration conf = new Configuration(true);
+ conf.set("hadoop.security.authentication", authenticationMethod.name());
+ return conf;
+ }
+}
diff --git a/flink-connectors/flink-connector-hive/src/test/resources/hive-site.xml b/flink-connectors/flink-connector-hive/src/test/resources/hive-site.xml
index 6dbe4c9..59a8be0 100644
--- a/flink-connectors/flink-connector-hive/src/test/resources/hive-site.xml
+++ b/flink-connectors/flink-connector-hive/src/test/resources/hive-site.xml
@@ -19,7 +19,7 @@
<configuration>
- <!-- allow integral partition filter pushdown to avoid unstable test -->
+ <!-- allow integral partition filter pushdown to avoid unstable test -->
<property>
<name>hive.metastore.integral.jdo.pushdown</name>
<value>true</value>
diff --git a/flink-connectors/flink-connector-hive/src/test/resources/test-hive-delegation-token/hive-site.xml b/flink-connectors/flink-connector-hive/src/test/resources/test-hive-delegation-token/hive-site.xml
new file mode 100644
index 0000000..779a0deb
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/resources/test-hive-delegation-token/hive-site.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<configuration>
+
+ <property>
+ <name>hive.metastore.uris</name>
+ <value>thrift://localhost:9083</value>
+ </property>
+
+</configuration>
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/hadoop/HadoopUserUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/hadoop/HadoopUserUtils.java
index c360b58..d91da12 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/hadoop/HadoopUserUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/hadoop/HadoopUserUtils.java
@@ -19,6 +19,11 @@
package org.apache.flink.runtime.hadoop;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Clock;
/**
* Utility class for working with Hadoop user related classes. This should only be used if Hadoop is
@@ -26,6 +31,8 @@
*/
public class HadoopUserUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(HadoopUserUtils.class);
+
public static boolean isProxyUser(UserGroupInformation ugi) {
return ugi.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.PROXY;
}
@@ -35,4 +42,33 @@
&& ugi.getAuthenticationMethod()
== UserGroupInformation.AuthenticationMethod.KERBEROS;
}
+
+ public static long getIssueDate(
+ Clock clock, String tokenKind, AbstractDelegationTokenIdentifier identifier) {
+ long now = clock.millis();
+ long issueDate = identifier.getIssueDate();
+
+ if (issueDate > now) {
+ LOG.warn(
+ "Token {} has set up issue date later than current time. (provided: "
+ + "{} / current timestamp: {}) Please make sure clocks are in sync between "
+ + "machines. If the issue is not a clock mismatch, consult token implementor to check "
+ + "whether issue date is valid.",
+ tokenKind,
+ issueDate,
+ now);
+ return issueDate;
+ } else if (issueDate > 0) {
+ return issueDate;
+ } else {
+ LOG.warn(
+ "Token {} has not set up issue date properly. (provided: {}) "
+ + "Using current timestamp ({}) as issue date instead. Consult token implementor to fix "
+ + "the behavior.",
+ tokenKind,
+ issueDate,
+ now);
+ return now;
+ }
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java
index 5c3dd48..4a24997 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java
@@ -46,6 +46,8 @@
import java.util.Optional;
import java.util.Set;
+import static org.apache.flink.runtime.hadoop.HadoopUserUtils.getIssueDate;
+
/** Delegation token provider for Hadoop filesystems. */
@Internal
public class HadoopFSDelegationTokenProvider implements DelegationTokenProvider {
@@ -300,33 +302,4 @@
return result;
}
-
- @VisibleForTesting
- long getIssueDate(Clock clock, String tokenKind, AbstractDelegationTokenIdentifier identifier) {
- long now = clock.millis();
- long issueDate = identifier.getIssueDate();
-
- if (issueDate > now) {
- LOG.warn(
- "Token {} has set up issue date later than current time. (provided: "
- + "{} / current timestamp: {}) Please make sure clocks are in sync between "
- + "machines. If the issue is not a clock mismatch, consult token implementor to check "
- + "whether issue date is valid.",
- tokenKind,
- issueDate,
- now);
- return issueDate;
- } else if (issueDate > 0) {
- return issueDate;
- } else {
- LOG.warn(
- "Token {} has not set up issue date properly. (provided: {}) "
- + "Using current timestamp ({}) as issue date instead. Consult token implementor to fix "
- + "the behavior.",
- tokenKind,
- issueDate,
- now);
- return now;
- }
- }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/hadoop/HadoopUserUtilsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/hadoop/HadoopUserUtilsITCase.java
index 3a7368f..bfb6869 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/hadoop/HadoopUserUtilsITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/hadoop/HadoopUserUtilsITCase.java
@@ -18,13 +18,22 @@
package org.apache.flink.runtime.hadoop;
+import org.apache.flink.runtime.security.token.hadoop.TestHadoopDelegationTokenIdentifier;
+
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
+import java.time.Clock;
+import java.time.ZoneId;
+
+import static java.time.Instant.ofEpochMilli;
+import static org.apache.flink.runtime.hadoop.HadoopUserUtils.getIssueDate;
import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.PROXY;
import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.SIMPLE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
@@ -38,6 +47,7 @@
* implementing a reusable test utility around it, consequently had to resort to relying on mockito.
*/
class HadoopUserUtilsITCase {
+ private static final long NOW = 100;
@Test
public void isProxyUserShouldReturnFalseWhenNormalUser() {
@@ -86,4 +96,40 @@
assertTrue(HadoopUserUtils.hasUserKerberosAuthMethod(userGroupInformation));
}
}
+
+ @Test
+ public void getIssueDateShouldReturnIssueDateWithFutureToken() {
+ Clock constantClock = Clock.fixed(ofEpochMilli(NOW), ZoneId.systemDefault());
+ long issueDate = NOW + 1;
+ AbstractDelegationTokenIdentifier tokenIdentifier =
+ new TestHadoopDelegationTokenIdentifier(issueDate);
+
+ assertEquals(
+ issueDate,
+ getIssueDate(constantClock, tokenIdentifier.getKind().toString(), tokenIdentifier));
+ }
+
+ @Test
+ public void getIssueDateShouldReturnIssueDateWithPastToken() {
+ Clock constantClock = Clock.fixed(ofEpochMilli(NOW), ZoneId.systemDefault());
+ long issueDate = NOW - 1;
+ AbstractDelegationTokenIdentifier tokenIdentifier =
+ new TestHadoopDelegationTokenIdentifier(issueDate);
+
+ assertEquals(
+ issueDate,
+ getIssueDate(constantClock, tokenIdentifier.getKind().toString(), tokenIdentifier));
+ }
+
+ @Test
+ public void getIssueDateShouldReturnNowWithInvalidToken() {
+ Clock constantClock = Clock.fixed(ofEpochMilli(NOW), ZoneId.systemDefault());
+ long issueDate = -1;
+ AbstractDelegationTokenIdentifier tokenIdentifier =
+ new TestHadoopDelegationTokenIdentifier(issueDate);
+
+ assertEquals(
+ NOW,
+ getIssueDate(constantClock, tokenIdentifier.getKind().toString(), tokenIdentifier));
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProviderITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProviderITCase.java
index 43ba3ae..731156b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProviderITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProviderITCase.java
@@ -23,7 +23,6 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.junit.jupiter.api.Test;
import java.io.IOException;
@@ -173,51 +172,6 @@
}
@Test
- public void getIssueDateShouldReturnIssueDateWithFutureToken() {
- HadoopFSDelegationTokenProvider provider = new HadoopFSDelegationTokenProvider();
-
- Clock constantClock = Clock.fixed(ofEpochMilli(NOW), ZoneId.systemDefault());
- long issueDate = NOW + 1;
- AbstractDelegationTokenIdentifier tokenIdentifier =
- new TestHadoopDelegationTokenIdentifier(issueDate);
-
- assertEquals(
- issueDate,
- provider.getIssueDate(
- constantClock, tokenIdentifier.getKind().toString(), tokenIdentifier));
- }
-
- @Test
- public void getIssueDateShouldReturnIssueDateWithPastToken() {
- HadoopFSDelegationTokenProvider provider = new HadoopFSDelegationTokenProvider();
-
- Clock constantClock = Clock.fixed(ofEpochMilli(NOW), ZoneId.systemDefault());
- long issueDate = NOW - 1;
- AbstractDelegationTokenIdentifier tokenIdentifier =
- new TestHadoopDelegationTokenIdentifier(issueDate);
-
- assertEquals(
- issueDate,
- provider.getIssueDate(
- constantClock, tokenIdentifier.getKind().toString(), tokenIdentifier));
- }
-
- @Test
- public void getIssueDateShouldReturnNowWithInvalidToken() {
- HadoopFSDelegationTokenProvider provider = new HadoopFSDelegationTokenProvider();
-
- Clock constantClock = Clock.fixed(ofEpochMilli(NOW), ZoneId.systemDefault());
- long issueDate = -1;
- AbstractDelegationTokenIdentifier tokenIdentifier =
- new TestHadoopDelegationTokenIdentifier(issueDate);
-
- assertEquals(
- NOW,
- provider.getIssueDate(
- constantClock, tokenIdentifier.getKind().toString(), tokenIdentifier));
- }
-
- @Test
public void obtainDelegationTokenWithStandaloneDeployment() throws Exception {
HadoopFSDelegationTokenProvider provider = new HadoopFSDelegationTokenProvider();
provider.init(new org.apache.flink.configuration.Configuration());