[FLINK-32223][runtime][security] Add Hive delegation token support

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/security/token/HiveServer2DelegationTokenIdentifier.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/security/token/HiveServer2DelegationTokenIdentifier.java
new file mode 100644
index 0000000..94ce2c9
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/security/token/HiveServer2DelegationTokenIdentifier.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.security.token;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+
+/** Delegation token identifier for HiveServer2. */
+@Internal
+public class HiveServer2DelegationTokenIdentifier extends AbstractDelegationTokenIdentifier {
+    public static final Text HIVE_DELEGATION_KIND = new Text("HIVE_DELEGATION_TOKEN");
+
+    public HiveServer2DelegationTokenIdentifier() {}
+
+    public HiveServer2DelegationTokenIdentifier(Text owner, Text renewer, Text realUser) {
+        super(owner, renewer, realUser);
+    }
+
+    public Text getKind() {
+        return HIVE_DELEGATION_KIND;
+    }
+}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/security/token/HiveServer2DelegationTokenProvider.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/security/token/HiveServer2DelegationTokenProvider.java
new file mode 100644
index 0000000..2988f2a
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/security/token/HiveServer2DelegationTokenProvider.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.security.token;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.security.token.DelegationTokenProvider;
+import org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenConverter;
+import org.apache.flink.runtime.security.token.hadoop.KerberosLoginProvider;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.time.Clock;
+import java.util.Optional;
+
+import static org.apache.flink.runtime.hadoop.HadoopUserUtils.getIssueDate;
+
+/** Delegation token provider for HiveServer2. */
+@Internal
+public class HiveServer2DelegationTokenProvider implements DelegationTokenProvider {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(HiveServer2DelegationTokenProvider.class);
+
+    org.apache.hadoop.conf.Configuration hiveConf;
+
+    private KerberosLoginProvider kerberosLoginProvider;
+
+    private Long tokenRenewalInterval;
+
+    @Override
+    public String serviceName() {
+        return "HiveServer2";
+    }
+
+    @Override
+    public void init(Configuration configuration) throws Exception {
+        hiveConf = getHiveConfiguration(configuration);
+        kerberosLoginProvider = new KerberosLoginProvider(configuration);
+    }
+
+    private org.apache.hadoop.conf.Configuration getHiveConfiguration(Configuration conf) {
+        try {
+            org.apache.hadoop.conf.Configuration hadoopConf =
+                    HadoopUtils.getHadoopConfiguration(conf);
+            hiveConf = new HiveConf(hadoopConf, HiveConf.class);
+        } catch (Exception | NoClassDefFoundError e) {
+            LOG.warn("Failed to create HiveServer2 Configuration", e);
+        }
+        return hiveConf;
+    }
+
+    @Override
+    public boolean delegationTokensRequired() throws Exception {
+        /**
+         * The general rule for how a provider/receiver must behave is the following: the provider
+         * and the receiver must be added to the classpath together with all additionally required
+         * dependencies.
+         *
+         * <p>This null check is required because the HiveServer2 provider is always on the
+         * classpath but the Hive jars are optional; in that case the configuration cannot be
+         * loaded. This construct is intended to be removed when the HiveServer2 provider/receiver
+         * pair can be externalized (namely, if a provider/receiver throws an exception then the
+         * workload must be stopped).
+         */
+        if (hiveConf == null) {
+            LOG.debug(
+                    "HiveServer2 is not available (not packaged with this application), hence no "
+                            + "hiveServer2 tokens will be acquired.");
+            return false;
+        }
+        try {
+            if (!HadoopUtils.isKerberosSecurityEnabled(UserGroupInformation.getCurrentUser())) {
+                LOG.debug(
+                        "Hadoop Kerberos is not enabled, hence no HiveServer2 tokens will be acquired.");
+                return false;
+            }
+        } catch (IOException e) {
+            LOG.debug(
+                    "Hadoop Kerberos is not enabled, hence no HiveServer2 tokens will be acquired.",
+                    e);
+            return false;
+        }
+
+        if (hiveConf.getTrimmed("hive.metastore.uris", "").isEmpty()) {
+            LOG.debug(
+                    "The hive.metastore.uris item is empty, hence no HiveServer2 tokens will be acquired.");
+            return false;
+        }
+
+        if (!kerberosLoginProvider.isLoginPossible(false)) {
+            LOG.debug("Login is NOT possible, hence no HiveServer2 tokens will be acquired.");
+            return false;
+        }
+
+        return true;
+    }
+
+    @Override
+    public ObtainedDelegationTokens obtainDelegationTokens() throws Exception {
+        UserGroupInformation freshUGI = kerberosLoginProvider.doLoginAndReturnUGI();
+        return freshUGI.doAs(
+                (PrivilegedExceptionAction<ObtainedDelegationTokens>)
+                        () -> {
+                            Preconditions.checkNotNull(hiveConf);
+                            Hive hive = Hive.get((HiveConf) hiveConf);
+                            Clock clock = Clock.systemDefaultZone();
+                            try {
+                                LOG.info("Obtaining Kerberos security token for HiveServer2");
+
+                                String principal =
+                                        hiveConf.getTrimmed(
+                                                "hive.metastore.kerberos.principal", "");
+
+                                String tokenStr =
+                                        hive.getDelegationToken(
+                                                UserGroupInformation.getCurrentUser().getUserName(),
+                                                principal);
+                                Token<HiveServer2DelegationTokenIdentifier> hive2Token =
+                                        new Token<>();
+                                hive2Token.decodeFromUrlString(tokenStr);
+
+                                Credentials credentials = new Credentials();
+                                credentials.addToken(hive2Token.getKind(), hive2Token);
+
+                                HiveServer2DelegationTokenIdentifier tokenIdentifier =
+                                        hive2Token.decodeIdentifier();
+
+                                if (tokenRenewalInterval == null) {
+                                    tokenRenewalInterval =
+                                            getTokenRenewalInterval(
+                                                    clock, tokenIdentifier, hive, tokenStr);
+                                }
+                                Optional<Long> validUntil =
+                                        getTokenRenewalDate(
+                                                clock, tokenIdentifier, tokenRenewalInterval);
+
+                                return new ObtainedDelegationTokens(
+                                        HadoopDelegationTokenConverter.serialize(credentials),
+                                        validUntil);
+
+                            } catch (Exception e) {
+                                LOG.error(
+                                        "Failed to obtain delegation token for {}",
+                                        this.serviceName(),
+                                        e);
+                                throw new FlinkRuntimeException(e);
+                            } finally {
+                                Hive.closeCurrent();
+                            }
+                        });
+    }
+
+    @VisibleForTesting
+    Long getTokenRenewalInterval(
+            Clock clock,
+            HiveServer2DelegationTokenIdentifier tokenIdentifier,
+            Hive hive,
+            String tokenStr) {
+        Long interval;
+        LOG.debug("Obtained delegation token: {}", tokenIdentifier);
+        long newExpiration = getNewExpiration(hive, tokenStr);
+        String tokenKind = tokenIdentifier.getKind().toString();
+        interval = newExpiration - getIssueDate(clock, tokenKind, tokenIdentifier);
+        LOG.info("Renewal interval is {} for token {}", interval, tokenKind);
+        return interval;
+    }
+
+    @VisibleForTesting
+    long getNewExpiration(Hive hive, String tokenStr) {
+        try {
+            return hive.getMSC().renewDelegationToken(tokenStr);
+        } catch (Exception e) {
+            LOG.error("Renewing delegation token failed", e);
+            throw new FlinkRuntimeException(e);
+        }
+    }
+
+    @VisibleForTesting
+    Optional<Long> getTokenRenewalDate(
+            Clock clock,
+            HiveServer2DelegationTokenIdentifier tokenIdentifier,
+            Long renewalInterval) {
+        if (renewalInterval < 0) {
+            LOG.debug("Negative renewal interval so no renewal date is calculated");
+            return Optional.empty();
+        }
+
+        try {
+            String tokenKind = tokenIdentifier.getKind().toString();
+            long date = getIssueDate(clock, tokenKind, tokenIdentifier) + renewalInterval;
+            LOG.debug("Renewal date is {} for token {}", date, tokenKind);
+            return Optional.of(date);
+        } catch (Exception e) {
+            throw new FlinkRuntimeException(e);
+        }
+    }
+}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/security/token/HiveServer2DelegationTokenReceiver.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/security/token/HiveServer2DelegationTokenReceiver.java
new file mode 100644
index 0000000..3d35704
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/security/token/HiveServer2DelegationTokenReceiver.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.security.token;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenReceiver;
+
+/** Delegation token receiver for HiveServer2. */
+@Internal
+public class HiveServer2DelegationTokenReceiver extends HadoopDelegationTokenReceiver {
+
+    @Override
+    public String serviceName() {
+        return "HiveServer2";
+    }
+}
diff --git a/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
new file mode 100644
index 0000000..4b5e6b1
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider
diff --git a/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
new file mode 100644
index 0000000..03204e6
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.security.token.HiveServer2DelegationTokenReceiver
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/security/token/HiveServer2DelegationTokenProviderITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/security/token/HiveServer2DelegationTokenProviderITCase.java
new file mode 100644
index 0000000..0bce268
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/security/token/HiveServer2DelegationTokenProviderITCase.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.security.token;
+
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import sun.security.krb5.KrbException;
+
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Clock;
+import java.time.ZoneId;
+import java.util.Optional;
+
+import static java.time.Instant.ofEpochMilli;
+import static org.apache.flink.configuration.SecurityOptions.KERBEROS_LOGIN_KEYTAB;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Test for {@link HiveServer2DelegationTokenProvider}. */
+public class HiveServer2DelegationTokenProviderITCase {
+
+    @BeforeAll
+    public static void setPropertiesToEnableKerberosConfigInit() throws KrbException {
+        System.setProperty("java.security.krb5.realm", "EXAMPLE.COM");
+        System.setProperty("java.security.krb5.kdc", "kdc");
+        System.setProperty("java.security.krb5.conf", "/dev/null");
+        sun.security.krb5.Config.refresh();
+    }
+
+    @AfterAll
+    public static void cleanupHadoopConfigs() {
+        UserGroupInformation.setConfiguration(new Configuration());
+    }
+
+    private class TestHiveServer2DelegationToken extends HiveServer2DelegationTokenIdentifier {
+        @Override
+        public long getIssueDate() {
+            return 1000;
+        }
+    }
+
+    @Test
+    public void delegationTokensRequiredShouldReturnFalseWhenKerberosIsNotEnabled()
+            throws Exception {
+        HiveServer2DelegationTokenProvider provider = new HiveServer2DelegationTokenProvider();
+        provider.init(new org.apache.flink.configuration.Configuration());
+        boolean result = provider.delegationTokensRequired();
+        assertFalse(result);
+    }
+
+    @Test
+    public void delegationTokensRequiredShouldReturnFalseWhenHiveMetastoreUrisIsEmpty()
+            throws Exception {
+        UserGroupInformation.setConfiguration(
+                getHadoopConfigWithAuthMethod(UserGroupInformation.AuthenticationMethod.KERBEROS));
+        UserGroupInformation.getCurrentUser()
+                .setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS);
+        HiveServer2DelegationTokenProvider provider = new HiveServer2DelegationTokenProvider();
+        provider.init(new org.apache.flink.configuration.Configuration());
+        boolean result = provider.delegationTokensRequired();
+        assertFalse(result);
+    }
+
+    @Test
+    public void delegationTokensRequiredShouldReturnTrueWhenAllConditionsIsRight(
+            @TempDir Path tmpDir) throws Exception {
+        URL resource =
+                Thread.currentThread()
+                        .getContextClassLoader()
+                        .getResource("test-hive-delegation-token/hive-site.xml");
+        HiveConf.setHiveSiteLocation(resource);
+
+        UserGroupInformation.setConfiguration(
+                getHadoopConfigWithAuthMethod(UserGroupInformation.AuthenticationMethod.KERBEROS));
+        UserGroupInformation.getCurrentUser()
+                .setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS);
+        HiveServer2DelegationTokenProvider provider = new HiveServer2DelegationTokenProvider();
+        org.apache.flink.configuration.Configuration configuration =
+                new org.apache.flink.configuration.Configuration();
+
+        final Path keyTab = Files.createFile(tmpDir.resolve("test.keytab"));
+        configuration.setString(KERBEROS_LOGIN_KEYTAB, keyTab.toAbsolutePath().toString());
+        configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, "test@EXAMPLE.COM");
+        provider.init(configuration);
+        boolean result = provider.delegationTokensRequired();
+        assertTrue(result);
+    }
+
+    @Test
+    public void getTokenRenewalIntervalShouldReturnRenewalIntervalWhenNoExceptionIsThrown() {
+        HiveServer2DelegationTokenProvider provider =
+                new HiveServer2DelegationTokenProvider() {
+                    @Override
+                    long getNewExpiration(Hive hive, String tokenStr) {
+                        return 10000;
+                    }
+                };
+        Clock constantClock = Clock.fixed(ofEpochMilli(0), ZoneId.systemDefault());
+        TestHiveServer2DelegationToken testDelegationToken = new TestHiveServer2DelegationToken();
+        Long renewalInterval =
+                provider.getTokenRenewalInterval(constantClock, testDelegationToken, null, "test");
+        assertEquals(9000L, renewalInterval);
+    }
+
+    @Test
+    public void getTokenRenewalIntervalShouldThrowExceptionWhenHiveIsNull() {
+        HiveServer2DelegationTokenProvider provider = new HiveServer2DelegationTokenProvider();
+        Clock constantClock = Clock.fixed(ofEpochMilli(0), ZoneId.systemDefault());
+        TestHiveServer2DelegationToken testDelegationToken = new TestHiveServer2DelegationToken();
+        final String msg = "java.lang.NullPointerException";
+        assertThatThrownBy(
+                        () ->
+                                provider.getTokenRenewalInterval(
+                                        constantClock, testDelegationToken, null, "test"))
+                .isInstanceOf(FlinkRuntimeException.class)
+                .hasMessageContaining(msg);
+    }
+
+    @Test
+    public void getTokenRenewalDateShouldReturnNoneWhenNegativeRenewalInterval() {
+        HiveServer2DelegationTokenProvider provider = new HiveServer2DelegationTokenProvider();
+        Clock constantClock = Clock.fixed(ofEpochMilli(0), ZoneId.systemDefault());
+        TestHiveServer2DelegationToken testDelegationToken = new TestHiveServer2DelegationToken();
+        assertEquals(
+                Optional.empty(),
+                provider.getTokenRenewalDate(constantClock, testDelegationToken, -1L));
+    }
+
+    @Test
+    public void getTokenRenewalDateShouldReturnRenewalDateWhenNotNegativeRenewalInterval() {
+        HiveServer2DelegationTokenProvider provider = new HiveServer2DelegationTokenProvider();
+        Clock constantClock = Clock.fixed(ofEpochMilli(0), ZoneId.systemDefault());
+        TestHiveServer2DelegationToken testDelegationToken = new TestHiveServer2DelegationToken();
+        assertEquals(
+                Optional.of(10000L),
+                provider.getTokenRenewalDate(constantClock, testDelegationToken, 9000L));
+    }
+
+    private Configuration getHadoopConfigWithAuthMethod(
+            UserGroupInformation.AuthenticationMethod authenticationMethod) {
+        Configuration conf = new Configuration(true);
+        conf.set("hadoop.security.authentication", authenticationMethod.name());
+        return conf;
+    }
+}
diff --git a/flink-connectors/flink-connector-hive/src/test/resources/hive-site.xml b/flink-connectors/flink-connector-hive/src/test/resources/hive-site.xml
index 6dbe4c9..59a8be0 100644
--- a/flink-connectors/flink-connector-hive/src/test/resources/hive-site.xml
+++ b/flink-connectors/flink-connector-hive/src/test/resources/hive-site.xml
@@ -19,7 +19,7 @@
 
 <configuration>
 
-    <!-- allow integral partition filter pushdown to avoid unstable test -->
+	<!-- allow integral partition filter pushdown to avoid unstable test -->
 	<property>
 		<name>hive.metastore.integral.jdo.pushdown</name>
 		<value>true</value>
diff --git a/flink-connectors/flink-connector-hive/src/test/resources/test-hive-delegation-token/hive-site.xml b/flink-connectors/flink-connector-hive/src/test/resources/test-hive-delegation-token/hive-site.xml
new file mode 100644
index 0000000..779a0deb
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/resources/test-hive-delegation-token/hive-site.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<configuration>
+
+	<property>
+		<name>hive.metastore.uris</name>
+		<value>thrift://localhost:9083</value>
+	</property>
+
+</configuration>
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/hadoop/HadoopUserUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/hadoop/HadoopUserUtils.java
index c360b58..d91da12 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/hadoop/HadoopUserUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/hadoop/HadoopUserUtils.java
@@ -19,6 +19,11 @@
 package org.apache.flink.runtime.hadoop;
 
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Clock;
 
 /**
  * Utility class for working with Hadoop user related classes. This should only be used if Hadoop is
@@ -26,6 +31,8 @@
  */
 public class HadoopUserUtils {
 
+    private static final Logger LOG = LoggerFactory.getLogger(HadoopUserUtils.class);
+
     public static boolean isProxyUser(UserGroupInformation ugi) {
         return ugi.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.PROXY;
     }
@@ -35,4 +42,33 @@
                 && ugi.getAuthenticationMethod()
                         == UserGroupInformation.AuthenticationMethod.KERBEROS;
     }
+
+    public static long getIssueDate(
+            Clock clock, String tokenKind, AbstractDelegationTokenIdentifier identifier) {
+        long now = clock.millis();
+        long issueDate = identifier.getIssueDate();
+
+        if (issueDate > now) {
+            LOG.warn(
+                    "Token {} has set up issue date later than current time. (provided: "
+                            + "{} / current timestamp: {}) Please make sure clocks are in sync between "
+                            + "machines. If the issue is not a clock mismatch, consult token implementor to check "
+                            + "whether issue date is valid.",
+                    tokenKind,
+                    issueDate,
+                    now);
+            return issueDate;
+        } else if (issueDate > 0) {
+            return issueDate;
+        } else {
+            LOG.warn(
+                    "Token {} has not set up issue date properly. (provided: {}) "
+                            + "Using current timestamp ({}) as issue date instead. Consult token implementor to fix "
+                            + "the behavior.",
+                    tokenKind,
+                    issueDate,
+                    now);
+            return now;
+        }
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java
index 5c3dd48..4a24997 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java
@@ -46,6 +46,8 @@
 import java.util.Optional;
 import java.util.Set;
 
+import static org.apache.flink.runtime.hadoop.HadoopUserUtils.getIssueDate;
+
 /** Delegation token provider for Hadoop filesystems. */
 @Internal
 public class HadoopFSDelegationTokenProvider implements DelegationTokenProvider {
@@ -300,33 +302,4 @@
 
         return result;
     }
-
-    @VisibleForTesting
-    long getIssueDate(Clock clock, String tokenKind, AbstractDelegationTokenIdentifier identifier) {
-        long now = clock.millis();
-        long issueDate = identifier.getIssueDate();
-
-        if (issueDate > now) {
-            LOG.warn(
-                    "Token {} has set up issue date later than current time. (provided: "
-                            + "{} / current timestamp: {}) Please make sure clocks are in sync between "
-                            + "machines. If the issue is not a clock mismatch, consult token implementor to check "
-                            + "whether issue date is valid.",
-                    tokenKind,
-                    issueDate,
-                    now);
-            return issueDate;
-        } else if (issueDate > 0) {
-            return issueDate;
-        } else {
-            LOG.warn(
-                    "Token {} has not set up issue date properly. (provided: {}) "
-                            + "Using current timestamp ({}) as issue date instead. Consult token implementor to fix "
-                            + "the behavior.",
-                    tokenKind,
-                    issueDate,
-                    now);
-            return now;
-        }
-    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/hadoop/HadoopUserUtilsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/hadoop/HadoopUserUtilsITCase.java
index 3a7368f..bfb6869 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/hadoop/HadoopUserUtilsITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/hadoop/HadoopUserUtilsITCase.java
@@ -18,13 +18,22 @@
 
 package org.apache.flink.runtime.hadoop;
 
+import org.apache.flink.runtime.security.token.hadoop.TestHadoopDelegationTokenIdentifier;
+
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
 import org.junit.jupiter.api.Test;
 import org.mockito.MockedStatic;
 
+import java.time.Clock;
+import java.time.ZoneId;
+
+import static java.time.Instant.ofEpochMilli;
+import static org.apache.flink.runtime.hadoop.HadoopUserUtils.getIssueDate;
 import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
 import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.PROXY;
 import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.SIMPLE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -38,6 +47,7 @@
  * implementing a reusable test utility around it, consequently had to resort to relying on mockito.
  */
 class HadoopUserUtilsITCase {
+    private static final long NOW = 100;
 
     @Test
     public void isProxyUserShouldReturnFalseWhenNormalUser() {
@@ -86,4 +96,40 @@
             assertTrue(HadoopUserUtils.hasUserKerberosAuthMethod(userGroupInformation));
         }
     }
+
+    @Test
+    public void getIssueDateShouldReturnIssueDateWithFutureToken() {
+        Clock constantClock = Clock.fixed(ofEpochMilli(NOW), ZoneId.systemDefault());
+        long issueDate = NOW + 1;
+        AbstractDelegationTokenIdentifier tokenIdentifier =
+                new TestHadoopDelegationTokenIdentifier(issueDate);
+
+        assertEquals(
+                issueDate,
+                getIssueDate(constantClock, tokenIdentifier.getKind().toString(), tokenIdentifier));
+    }
+
+    @Test
+    public void getIssueDateShouldReturnIssueDateWithPastToken() {
+        Clock constantClock = Clock.fixed(ofEpochMilli(NOW), ZoneId.systemDefault());
+        long issueDate = NOW - 1;
+        AbstractDelegationTokenIdentifier tokenIdentifier =
+                new TestHadoopDelegationTokenIdentifier(issueDate);
+
+        assertEquals(
+                issueDate,
+                getIssueDate(constantClock, tokenIdentifier.getKind().toString(), tokenIdentifier));
+    }
+
+    @Test
+    public void getIssueDateShouldReturnNowWithInvalidToken() {
+        Clock constantClock = Clock.fixed(ofEpochMilli(NOW), ZoneId.systemDefault());
+        long issueDate = -1;
+        AbstractDelegationTokenIdentifier tokenIdentifier =
+                new TestHadoopDelegationTokenIdentifier(issueDate);
+
+        assertEquals(
+                NOW,
+                getIssueDate(constantClock, tokenIdentifier.getKind().toString(), tokenIdentifier));
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProviderITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProviderITCase.java
index 43ba3ae..731156b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProviderITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProviderITCase.java
@@ -23,7 +23,6 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
@@ -173,51 +172,6 @@
     }
 
     @Test
-    public void getIssueDateShouldReturnIssueDateWithFutureToken() {
-        HadoopFSDelegationTokenProvider provider = new HadoopFSDelegationTokenProvider();
-
-        Clock constantClock = Clock.fixed(ofEpochMilli(NOW), ZoneId.systemDefault());
-        long issueDate = NOW + 1;
-        AbstractDelegationTokenIdentifier tokenIdentifier =
-                new TestHadoopDelegationTokenIdentifier(issueDate);
-
-        assertEquals(
-                issueDate,
-                provider.getIssueDate(
-                        constantClock, tokenIdentifier.getKind().toString(), tokenIdentifier));
-    }
-
-    @Test
-    public void getIssueDateShouldReturnIssueDateWithPastToken() {
-        HadoopFSDelegationTokenProvider provider = new HadoopFSDelegationTokenProvider();
-
-        Clock constantClock = Clock.fixed(ofEpochMilli(NOW), ZoneId.systemDefault());
-        long issueDate = NOW - 1;
-        AbstractDelegationTokenIdentifier tokenIdentifier =
-                new TestHadoopDelegationTokenIdentifier(issueDate);
-
-        assertEquals(
-                issueDate,
-                provider.getIssueDate(
-                        constantClock, tokenIdentifier.getKind().toString(), tokenIdentifier));
-    }
-
-    @Test
-    public void getIssueDateShouldReturnNowWithInvalidToken() {
-        HadoopFSDelegationTokenProvider provider = new HadoopFSDelegationTokenProvider();
-
-        Clock constantClock = Clock.fixed(ofEpochMilli(NOW), ZoneId.systemDefault());
-        long issueDate = -1;
-        AbstractDelegationTokenIdentifier tokenIdentifier =
-                new TestHadoopDelegationTokenIdentifier(issueDate);
-
-        assertEquals(
-                NOW,
-                provider.getIssueDate(
-                        constantClock, tokenIdentifier.getKind().toString(), tokenIdentifier));
-    }
-
-    @Test
     public void obtainDelegationTokenWithStandaloneDeployment() throws Exception {
         HadoopFSDelegationTokenProvider provider = new HadoopFSDelegationTokenProvider();
         provider.init(new org.apache.flink.configuration.Configuration());