HAWQ-1622. Cache UGI objects and clean them periodically
Co-authored-by: Lav Jain <ljain@pivotal.io>
Co-authored-by: Ben Christel <bchristel@pivotal.io>
Co-authored-by: Alex Denissov <adenissov@pivotal.io>
Co-authored-by: Shivram Mani <smani@pivotal.io>
Co-authored-by: Francisco Guerrero <aguerrero@pivotal.io>
Co-authored-by: Divya Bhargov <dbhargov@pivotal.io>
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/SessionId.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/SessionId.java
new file mode 100644
index 0000000..172014b
--- /dev/null
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/SessionId.java
@@ -0,0 +1,91 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * For the purposes of pxf-server, a session is the set of requests processed on a specific segment
+ * on behalf of a particular user and transaction. Grouping requests together into a session allows
+ * us to re-use the UserGroupInformation object (which is expensive to destroy) for each session.
+ * <p>
+ * SessionId is used as the cache key to look up the UserGroupInformation for a request. See {@link
+ * UGICache}.
+ */
+public class SessionId {
+
+ private final String user;
+ private final Integer segmentId;
+ private final String sessionId;
+
+ /**
+ * Create a sessionId
+ *
+ * @param segmentId the calling segment
+ * @param transactionId the identifier for the transaction
+ * @param gpdbUser the GPDB username
+ */
+ public SessionId(Integer segmentId, String transactionId, String gpdbUser) {
+ this.segmentId = segmentId;
+ this.user = gpdbUser;
+ this.sessionId = gpdbUser + ":" + transactionId + ":" + segmentId;
+ }
+
+ /**
+ * @return the segment id
+ */
+ public Integer getSegmentId() {
+ return segmentId;
+ }
+
+ /**
+ * @return the gpdb user name
+ */
+ public String getUser() {
+ return user;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public int hashCode() {
+ return sessionId.hashCode();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null) return false;
+ if (obj == this) return true;
+ if (obj.getClass() != getClass()) return false;
+
+ SessionId that = (SessionId) obj;
+ return this.sessionId.equals(that.sessionId);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String toString() {
+ return "Session = " + sessionId;
+ }
+}
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java
new file mode 100644
index 0000000..febb3ed
--- /dev/null
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java
@@ -0,0 +1,349 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Ticker;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Stores UserGroupInformation instances for each active session. The UGIs are cleaned up if they
+ * have not been accessed for UGI_CACHE_EXPIRY milliseconds.
+ * <p>
+ * The motivation for caching is that destroying UGIs is slow. The alternative, creating and
+ * destroying a UGI per-request, is wasteful.
+ */
+public class UGICache {
+
+ private static final Log LOG = LogFactory.getLog(UGICache.class);
+ private final Map<SessionId, Entry> cache = new ConcurrentHashMap<>();
+ // There is a separate DelayQueue for each segment (also being used for locking)
+ private final Map<Integer, DelayQueue<Entry>> expirationQueueMap = new HashMap<>();
+ private final UGIProvider ugiProvider;
+ private final Ticker ticker;
+ final static long UGI_CACHE_EXPIRY = 15 * 60 * 1000L; // 15 Minutes
+
+ /**
+ * Create a UGICache with the given {@link Ticker} and {@link UGIProvider}. Intended for use by
+ * tests which need to mock UGI creation/destruction and the current time.
+ */
+ UGICache(UGIProvider provider, Ticker ticker) {
+ this.ticker = ticker;
+ this.ugiProvider = provider;
+ }
+
+ /**
+ * Create a UGICache. Automatically creates a {@link UGIProvider} that this cache will use to
+ * create and destroy UserGroupInformation instances.
+ */
+ public UGICache() {
+ this(new UGIProvider(), Ticker.systemTicker());
+ }
+
+ /**
+ * If a UGI for the given session exists in the cache, returns it. Otherwise, creates a new
+ * proxy UGI. In either case this method increments the reference count of the UGI. This method
+ * also destroys expired, unreferenced UGIs for the same segmentId as the given session.
+ *
+ * @param session The user from the session is impersonated by the proxy UGI.
+ * @return the proxy UGI for the given session.
+ */
+ public UserGroupInformation getUserGroupInformation(SessionId session) throws IOException {
+ Integer segmentId = session.getSegmentId();
+ String user = session.getUser();
+ DelayQueue<Entry> delayQueue = getExpirationQueue(segmentId);
+ synchronized (delayQueue) {
+ // Use the opportunity to cleanup any expired entries
+ cleanup(delayQueue);
+ Entry entry = cache.get(session);
+ if (entry == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(session + " Creating proxy user = " + user);
+ }
+ entry = new Entry(ticker, ugiProvider.createProxyUGI(user), session);
+ delayQueue.offer(entry);
+ cache.put(session, entry);
+ }
+ entry.incrementRefCount();
+ return entry.getUGI();
+ }
+ }
+
+ /**
+ * Decrement reference count for the given session's UGI. Resets the time at which the UGI will
+ * expire to UGI_CACHE_EXPIRY milliseconds in the future.
+ *
+ * @param session the session for which we want to release the UGI.
+ * @param cleanImmediatelyIfNoRefs if true, destroys the UGI for the given session (only if it
+ * is now unreferenced).
+ */
+ public void release(SessionId session, boolean cleanImmediatelyIfNoRefs) {
+
+ Entry entry = cache.get(session);
+
+ if (entry == null) {
+ throw new IllegalStateException("Cannot release UGI for this session; it is not cached: " + session);
+ }
+
+ DelayQueue<Entry> expirationQueue = getExpirationQueue(session.getSegmentId());
+
+ synchronized (expirationQueue) {
+ entry.decrementRefCount();
+ expirationQueue.remove(entry);
+ if (cleanImmediatelyIfNoRefs && entry.isNotInUse()) {
+ closeUGI(entry);
+ } else {
+ // Reset expiration time and put it back in the queue
+ // only when we don't close the UGI
+ entry.resetTime();
+ expirationQueue.offer(entry);
+ }
+ }
+ }
+
+ /**
+ * @return the size of the cache
+ */
+ int size() {
+ return cache.size();
+ }
+
+ /**
+ * This method is not thread-safe, and is intended to be called in tests.
+ *
+ * @return the sum of the sizes of the internal queues
+ */
+ int allQueuesSize() {
+ int count = 0;
+ for (DelayQueue queue : expirationQueueMap.values()) {
+ count += queue.size();
+ }
+ return count;
+ }
+
+ /**
+ * This method is O(n) in the number of cache entries and should only be called in tests.
+ *
+ * @param session
+ * @return determine whether the session is in the internal cache
+ */
+ boolean contains(SessionId session) {
+ DelayQueue<Entry> expirationQueue = getExpirationQueue(session.getSegmentId());
+ synchronized (expirationQueue) {
+ Entry entry = cache.get(session);
+ return entry != null && expirationQueue.contains(entry);
+ }
+ }
+
+ /**
+ * Get the queue of cache entries associated with a segment, creating it if it doesn't yet
+ * exist. This lets us lazily populate the expirationQueueMap.
+ *
+ * @param segmentId
+ * @return the {@link DelayQueue} associated to the segment.
+ */
+ private DelayQueue<Entry> getExpirationQueue(Integer segmentId) {
+ DelayQueue<Entry> queue = expirationQueueMap.get(segmentId);
+ if (queue == null) {
+ synchronized (expirationQueueMap) {
+ queue = expirationQueueMap.get(segmentId);
+ if (queue == null) {
+ queue = new DelayQueue<>();
+ expirationQueueMap.put(segmentId, queue);
+ }
+ }
+ }
+ return queue;
+ }
+
+ /**
+ * Iterate through all the entries in the queue and close expired {@link UserGroupInformation},
+ * otherwise it resets the timer for every non-expired entry.
+ *
+ * @param expirationQueue
+ */
+ private void cleanup(DelayQueue<Entry> expirationQueue) {
+
+ Entry expiredUGI;
+ while ((expiredUGI = expirationQueue.poll()) != null) {
+ if (expiredUGI.isNotInUse()) {
+ closeUGI(expiredUGI);
+ } else {
+ // The UGI object is still being used by another thread
+ String fsMsg = "FileSystem for proxy user = " + expiredUGI.getSession().getUser();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(expiredUGI.getSession().toString() + " Skipping close of " + fsMsg);
+ }
+ // Place it back in the queue if still in use and was not closed
+ expiredUGI.resetTime();
+ expirationQueue.offer(expiredUGI);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Delay Queue Size for segment " +
+ expiredUGI.getSession().getSegmentId() + " = " + expirationQueue.size());
+ }
+ }
+ }
+
+ /**
+ * This method must be called from a synchronized block for the delayQueue for the given
+ * session.getSegmentId(). Removes the cachedUGI from the internal cache and then passes it to
+ * {@link UGIProvider} to destroy the UGI.
+ *
+ * @param expiredUGI
+ */
+ private void closeUGI(Entry expiredUGI) {
+ SessionId session = expiredUGI.getSession();
+ String fsMsg = "FileSystem for proxy user = " + session.getUser();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(session.toString() + " Closing " + fsMsg +
+ " (Cache Size = " + cache.size() + ")");
+ }
+
+ try {
+ // Remove it from cache, as cache now has an
+ // expired entry which is not in progress
+ cache.remove(session);
+ ugiProvider.destroy(expiredUGI.getUGI());
+
+ } catch (Throwable t) {
+ LOG.warn(session.toString() + " Error closing " + fsMsg, t);
+ }
+ }
+
+ /**
+ * Stores a {@link UserGroupInformation}, and determines when to expire the UGI.
+ */
+ private static class Entry implements Delayed {
+
+ private volatile long startTime;
+ private final SessionId session;
+ private final UserGroupInformation proxyUGI;
+ private final AtomicInteger referenceCount = new AtomicInteger();
+ private final Ticker ticker;
+
+ /**
+ * Creates a new UGICache Entry.
+ *
+ * @param ticker
+ * @param proxyUGI
+ * @param session
+ */
+ Entry(Ticker ticker, UserGroupInformation proxyUGI, SessionId session) {
+ this.ticker = ticker;
+ this.proxyUGI = proxyUGI;
+ this.session = session;
+ }
+
+ /**
+ * @return the Cached {@link UserGroupInformation}.
+ */
+ public UserGroupInformation getUGI() {
+ return proxyUGI;
+ }
+
+
+ /**
+ * @return the session associated to the {@link UserGroupInformation}.
+ */
+ public SessionId getSession() {
+ return session;
+ }
+
+ /**
+ * @return true if the UGI is being referenced by a session, false otherwise
+ */
+ private boolean isNotInUse() {
+ return referenceCount.get() <= 0;
+ }
+
+ /**
+ * Increments the number of references accessing the {@link UserGroupInformation}.
+ */
+ void incrementRefCount() {
+ referenceCount.incrementAndGet();
+ }
+
+ /**
+ * Decrements the number of references accessing the {@link UserGroupInformation}.
+ */
+ void decrementRefCount() {
+ int count = referenceCount.decrementAndGet();
+ if (count < 0) {
+ throw new IllegalStateException("UGICache.Entry referenceCount may not be decremented past 0.");
+ }
+ }
+
+ /**
+ * Resets the timer for removing this Entry from the cache.
+ */
+ void resetTime() {
+ startTime = currentTimeMillis();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public long getDelay(TimeUnit unit) {
+ return unit.convert(getDelayMillis(), TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Compare the expiry time of this cache entry to another cache entry's expiry time.
+ *
+ * @param other a UGICache.Entry (passing any other kind of Delayed produces an error)
+ * @see java.lang.Comparable<>#compareTo(java.lang.Comparable<>)
+ */
+ @Override
+ public int compareTo(Delayed other) {
+ if (!(other instanceof Entry)) return 1;
+
+ Entry that = (Entry) other;
+ return Long.compare(this.getDelayMillis(), that.getDelayMillis());
+ }
+
+ /**
+ * @return the number of milliseconds remaining before this cache entry expires.
+ */
+ private long getDelayMillis() {
+ return (startTime + UGI_CACHE_EXPIRY) - currentTimeMillis();
+ }
+
+ /**
+ * @return the current Unix timestamp in milliseconds (equivalent to {@link
+ * System}.currentTimeMillis)
+ */
+ private long currentTimeMillis() {
+ return ticker.read() / 1000;
+ }
+ }
+}
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGIProvider.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGIProvider.java
new file mode 100644
index 0000000..38c60a0
--- /dev/null
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGIProvider.java
@@ -0,0 +1,54 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+
+/**
+ * Thin wrapper around {@link UserGroupInformation} create and destroy methods. We mock this class
+ * in tests to be able to detect when a UGI is created/destroyed, and to isolate our tests from
+ * creating/destroying real UGI instances.
+ */
+class UGIProvider {
+
+ /**
+ * Wrapper for {@link UserGroupInformation} creation
+ *
+ * @param effectiveUser the name of the user that we want to impersonate
+ * @return a {@link UserGroupInformation} for impersonation.
+ * @throws IOException
+ */
+ UserGroupInformation createProxyUGI(String effectiveUser) throws IOException {
+ return UserGroupInformation.createProxyUser(
+ effectiveUser, UserGroupInformation.getLoginUser());
+ }
+
+ /**
+ * Wrapper for {@link FileSystem}.closeAllForUGI method.
+ * @param ugi the {@link UserGroupInformation} whose filesystem resources we want to free.
+ * @throws IOException
+ */
+ void destroy(UserGroupInformation ugi) throws IOException {
+ FileSystem.closeAllForUGI(ugi);
+ }
+}
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilter.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilter.java
index 4cf3d03..0c48cca 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilter.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilter.java
@@ -19,22 +19,25 @@
* under the License.
*/
-
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.security.PrivilegedExceptionAction;
-import javax.servlet.*;
+import javax.servlet.Filter;
+import javax.servlet.FilterChain;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hawq.pxf.service.SessionId;
+import org.apache.hawq.pxf.service.UGICache;
import org.apache.hawq.pxf.service.utilities.SecureLogin;
-
/**
* Listener on lifecycle events of our webapp
*/
@@ -42,8 +45,12 @@
private static final Log LOG = LogFactory.getLog(SecurityServletFilter.class);
private static final String USER_HEADER = "X-GP-USER";
- private static final String MISSING_HEADER_ERROR = String.format("Header %s is missing in the request", USER_HEADER);
- private static final String EMPTY_HEADER_ERROR = String.format("Header %s is empty in the request", USER_HEADER);
+ private static final String SEGMENT_ID_HEADER = "X-GP-SEGMENT-ID";
+ private static final String TRANSACTION_ID_HEADER = "X-GP-XID";
+ private static final String LAST_FRAGMENT_HEADER = "X-GP-LAST-FRAGMENT";
+ private static final String MISSING_HEADER_ERROR = "Header %s is missing in the request";
+ private static final String EMPTY_HEADER_ERROR = "Header %s is empty in the request";
+ UGICache proxyUGICache;
/**
* Initializes the filter.
@@ -51,7 +58,8 @@
* @param filterConfig filter configuration
*/
@Override
- public void init(FilterConfig filterConfig) throws ServletException {
+ public void init(FilterConfig filterConfig) {
+ proxyUGICache = new UGICache();
}
/**
@@ -59,54 +67,62 @@
* and create a proxy user to execute further request chain. Responds with an HTTP error if the header is missing
* or the chain processing throws an exception.
*
- * @param request http request
+ * @param request http request
* @param response http response
- * @param chain filter chain
+ * @param chain filter chain
*/
@Override
- public void doFilter(final ServletRequest request, final ServletResponse response, final FilterChain chain) throws IOException, ServletException {
+ public void doFilter(final ServletRequest request, final ServletResponse response, final FilterChain chain)
+ throws IOException, ServletException {
if (SecureLogin.isUserImpersonationEnabled()) {
// retrieve user header and make sure header is present and is not empty
- final String user = ((HttpServletRequest) request).getHeader(USER_HEADER);
- if (user == null) {
- throw new IllegalArgumentException(MISSING_HEADER_ERROR);
- } else if (user.trim().isEmpty()) {
- throw new IllegalArgumentException(EMPTY_HEADER_ERROR);
- }
+ final String gpdbUser = getHeaderValue(request, USER_HEADER, true);
+ final String transactionId = getHeaderValue(request, TRANSACTION_ID_HEADER, true);
+ final Integer segmentId = getHeaderValueInt(request, SEGMENT_ID_HEADER, true);
+ final boolean lastCallForSegment = getHeaderValueBoolean(request, LAST_FRAGMENT_HEADER, false);
+
+ SessionId session = new SessionId(segmentId, transactionId, gpdbUser);
// TODO refresh Kerberos token when security is enabled
- // prepare pivileged action to run on behalf of proxy user
+ // prepare privileged action to run on behalf of proxy user
PrivilegedExceptionAction<Boolean> action = new PrivilegedExceptionAction<Boolean>() {
@Override
public Boolean run() throws IOException, ServletException {
- LOG.debug("Performing request chain call for proxy user = " + user);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Performing request chain call for proxy user = " + gpdbUser);
+ }
chain.doFilter(request, response);
return true;
}
};
- // create proxy user UGI from the UGI of the logged in user and execute the servlet chain as that user
- UserGroupInformation proxyUGI = null;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Retrieving proxy user for session: " + session);
+ }
try {
- LOG.debug("Creating proxy user = " + user);
- proxyUGI = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
- proxyUGI.doAs(action);
+ // Retrieve proxy user UGI from the UGI of the logged in user
+ // and execute the servlet chain as that user
+ proxyUGICache
+ .getUserGroupInformation(session)
+ .doAs(action);
} catch (UndeclaredThrowableException ute) {
// unwrap the real exception thrown by the action
throw new ServletException(ute.getCause());
} catch (InterruptedException ie) {
throw new ServletException(ie);
} finally {
+ // Optimization to cleanup the cache if it is the last fragment
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Releasing proxy user for session: " + session +
+ (lastCallForSegment ? " Last fragment call." : ""));
+ }
try {
- if (proxyUGI != null) {
- LOG.debug("Closing FileSystem for proxy user = " + proxyUGI.getUserName());
- FileSystem.closeAllForUGI(proxyUGI);
- }
+ proxyUGICache.release(session, lastCallForSegment);
} catch (Throwable t) {
- LOG.warn("Error closing FileSystem for proxy user = " + proxyUGI.getUserName());
+ LOG.error("Error releasing UGICache for session: " + session, t);
}
}
} else {
@@ -122,4 +138,25 @@
public void destroy() {
}
+ private Integer getHeaderValueInt(ServletRequest request, String headerKey, boolean required)
+ throws IllegalArgumentException {
+ String value = getHeaderValue(request, headerKey, required);
+ return value != null ? Integer.valueOf(value) : null;
+ }
+
+ private String getHeaderValue(ServletRequest request, String headerKey, boolean required)
+ throws IllegalArgumentException {
+ String value = ((HttpServletRequest) request).getHeader(headerKey);
+ if (required && value == null) {
+ throw new IllegalArgumentException(String.format(MISSING_HEADER_ERROR, headerKey));
+ } else if (required && value.trim().isEmpty()) {
+ throw new IllegalArgumentException(String.format(EMPTY_HEADER_ERROR, headerKey));
+ }
+ return value;
+ }
+
+ private boolean getHeaderValueBoolean(ServletRequest request, String headerKey, boolean required) {
+ return StringUtils.equals("true", getHeaderValue(request, headerKey, required));
+ }
+
}
diff --git a/pxf/pxf-service/src/main/resources/pxf-log4j.properties b/pxf/pxf-service/src/main/resources/pxf-log4j.properties
index f6bb0ea..3b64e8a 100644
--- a/pxf/pxf-service/src/main/resources/pxf-log4j.properties
+++ b/pxf/pxf-service/src/main/resources/pxf-log4j.properties
@@ -5,9 +5,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
diff --git a/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/UGICacheMultiThreadTest.java b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/UGICacheMultiThreadTest.java
new file mode 100644
index 0000000..ea76b17
--- /dev/null
+++ b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/UGICacheMultiThreadTest.java
@@ -0,0 +1,131 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import io.netty.util.internal.ConcurrentSet;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.security.SecureRandom;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+public class UGICacheMultiThreadTest {
+ private FakeUgiProvider provider = null;
+ private static final int numberOfSegments = 3;
+ private static final int numberOfUsers = 3;
+ private static final int numberOfTxns = 3;
+ private SessionId[] sessions = new SessionId[numberOfSegments * numberOfUsers * numberOfTxns];
+ private UGICache cache = null;
+ private UGICacheTest.FakeTicker fakeTicker;
+
+ class FakeUgiProvider extends UGIProvider {
+ Set<UserGroupInformation> ugis = new ConcurrentSet<>();
+
+ @Override
+ UserGroupInformation createProxyUGI(String effectiveUser) {
+ UserGroupInformation ugi = mock(UserGroupInformation.class);
+ ugis.add(ugi);
+ return ugi;
+ }
+
+ @Override
+ void destroy(UserGroupInformation ugi) {
+ if (!ugis.remove(ugi)) {
+ throw new IllegalStateException("Tried to destroy UGI that does not exist");
+ }
+ }
+
+ int countUgisInUse() {
+ return ugis.size();
+ }
+ }
+
+ @Before
+ public void setUp() {
+ provider = new FakeUgiProvider();
+
+ int l = 0;
+ for (int i = 0; i < numberOfSegments; i++) {
+ for (int j = 0; j < numberOfUsers; j++) {
+ for (int k = 0; k < numberOfTxns; k++) {
+ sessions[l++] = new SessionId(i, "txn-id-" + k, "the-user-" + j);
+ }
+ }
+ }
+ fakeTicker = new UGICacheTest.FakeTicker();
+ cache = new UGICache(provider, fakeTicker);
+ }
+
+ @Test
+ public void multiThreadedTest() throws Exception {
+ final Random rnd = new SecureRandom();
+ final AtomicInteger finishedCount = new AtomicInteger();
+
+ int threadCount = 500;
+ Thread[] threads = new Thread[threadCount];
+
+ for (int i = 0; i < threads.length; i++) {
+ threads[i] = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ for (int i = 0; i < 100; i++) {
+ for (SessionId session : sessions) {
+ cache.getUserGroupInformation(session);
+ }
+ Thread.sleep(0, rnd.nextInt(1000));
+ for (SessionId session : sessions) {
+ cache.release(session, false);
+ }
+ }
+
+ for (SessionId session : sessions) {
+ cache.getUserGroupInformation(session);
+ cache.release(session, true);
+ }
+
+ finishedCount.incrementAndGet();
+ } catch (IOException | InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ threads[i].start();
+ }
+
+ for (Thread thread : threads) {
+ thread.join();
+ }
+
+ assertEquals(threadCount, finishedCount.intValue());
+ assertEquals(0, provider.countUgisInUse());
+ // after the test has completed, the internal cache
+ // should be 0
+ assertEquals(0, cache.size());
+ assertEquals(0, cache.allQueuesSize());
+ }
+}
diff --git a/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/UGICacheTest.java b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/UGICacheTest.java
new file mode 100644
index 0000000..44c3f9a
--- /dev/null
+++ b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/UGICacheTest.java
@@ -0,0 +1,330 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.base.Ticker;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+public class UGICacheTest {
+ private static final long MINUTES = 60 * 1000L;
+ private UGIProvider provider = null;
+ private SessionId session = null;
+ private UGICache cache = null;
+ private FakeTicker fakeTicker;
+
+ static class FakeTicker extends Ticker {
+ private final AtomicLong nanos = new AtomicLong();
+
+ @Override
+ public long read() {
+ return nanos.get();
+ }
+
+ long advanceTime(long milliseconds) {
+ return nanos.addAndGet(milliseconds * 1000) / 1000;
+ }
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ provider = mock(UGIProvider.class);
+ when(provider.createProxyUGI(any(String.class))).thenAnswer(new Answer<UserGroupInformation>() {
+ @Override
+ public UserGroupInformation answer(InvocationOnMock invocation) {
+ return mock(UserGroupInformation.class);
+ }
+ });
+
+ session = new SessionId(0, "txn-id", "the-user");
+ fakeTicker = new FakeTicker();
+ cache = new UGICache(provider, fakeTicker);
+ }
+
+ @Test
+ public void getUGIFromEmptyCache() throws Exception {
+ UserGroupInformation ugi = cache.getUserGroupInformation(session);
+ assertNotNull(ugi);
+ verify(provider).createProxyUGI("the-user");
+ }
+
+ @Test
+ public void getSameUGITwiceUsesCache() throws Exception {
+ UserGroupInformation ugi1 = cache.getUserGroupInformation(session);
+ UserGroupInformation ugi2 = cache.getUserGroupInformation(session);
+ assertEquals(ugi1, ugi2);
+ verify(provider, times(1)).createProxyUGI("the-user");
+ assertCacheSize(1);
+ }
+
+ @Test
+ public void getUGIWithEquivalentSessionsReturnsTheSameInstance() throws Exception {
+ SessionId session2 = new SessionId(0, "txn-id", "the-user");
+ UserGroupInformation ugi1 = cache.getUserGroupInformation(session);
+ UserGroupInformation ugi2 = cache.getUserGroupInformation(session2);
+ assertEquals(ugi1, ugi2);
+ }
+
+ @Test
+ public void getTwoUGIsWithDifferentTransactionsForSameUser() throws Exception {
+ SessionId otherSession = new SessionId(0, "txn-id-2", "the-user");
+ UserGroupInformation proxyUGI1 = cache.getUserGroupInformation(session);
+ UserGroupInformation proxyUGI2 = cache.getUserGroupInformation(otherSession);
+ assertNotEquals(proxyUGI1, proxyUGI2);
+ verify(provider, times(2)).createProxyUGI("the-user");
+ assertCacheSize(2);
+ // getting a new UGI instance for each transaction ID is not strictly necessary, but allows
+ // us to expire UGIs for transactions that have finished. If we reused one UGI per user,
+ // it might never get to expire from the cache, and eventually Kerberos might invalidate
+ // the UGI on its end.
+ }
+
+ @Test
+ public void getTwoUGIsWithDifferentUsers() throws Exception {
+ SessionId otherSession = new SessionId(0, "txn-id", "different-user");
+ UserGroupInformation proxyUGI1 = cache.getUserGroupInformation(session);
+ UserGroupInformation proxyUGI2 = cache.getUserGroupInformation(otherSession);
+ assertNotEquals(proxyUGI1, proxyUGI2);
+ verify(provider, times(1)).createProxyUGI("the-user");
+ verify(provider, times(1)).createProxyUGI("different-user");
+ assertCacheSize(2);
+ assertStillInCache(session, proxyUGI1);
+ assertStillInCache(otherSession, proxyUGI2);
+ }
+
+ @Test
+ public void anySegmentIdIsValid() throws Exception {
+ int crazySegId = Integer.MAX_VALUE;
+ session = new SessionId(crazySegId, "txn-id", "the-user");
+ UserGroupInformation proxyUGI1 = cache.getUserGroupInformation(session);
+ assertNotNull(proxyUGI1);
+ assertStillInCache(session, proxyUGI1);
+ }
+
+ @Test
+ public void ensureCleanUpAfterExpiration() throws Exception {
+ UserGroupInformation ugi1 = cache.getUserGroupInformation(session);
+ cache.release(session, false);
+ fakeTicker.advanceTime(UGICache.UGI_CACHE_EXPIRY + 1000);
+
+ SessionId session2 = new SessionId(0, "txn-id", "the-user-2");
+ cache.getUserGroupInformation(session2); // this triggers cleanup of ugi1
+ assertNoLongerInCache(session, ugi1);
+ cache.release(session2, true);
+ assertCacheSize(0);
+ }
+
+ @Test
+ public void ensureExpiredUGIIsNotCleanedUpIfItIsStillReferenced() throws Exception {
+ SessionId session2 = new SessionId(0, "txn-id", "the-user-2");
+ UserGroupInformation stillInUse = cache.getUserGroupInformation(session);
+ fakeTicker.advanceTime(UGICache.UGI_CACHE_EXPIRY + 1000);
+
+ // at this point, stillInUse is expired but still in use
+ cache.getUserGroupInformation(session2); // trigger cleanup
+ assertStillInCache(session, stillInUse);
+ cache.release(session, false);
+ fakeTicker.advanceTime(UGICache.UGI_CACHE_EXPIRY + 1000);
+
+ cache.getUserGroupInformation(session2);
+
+ verify(provider, times(1)).destroy(stillInUse);
+ }
+
+ @Test
+ public void putsItemsBackInTheQueueWhenResettingExpirationDate() throws Exception {
+ SessionId session2 = new SessionId(0, "txn-id", "the-user-2");
+ SessionId session3 = new SessionId(0, "txn-id", "the-user-3");
+
+ UserGroupInformation ugi1 = cache.getUserGroupInformation(session);
+ fakeTicker.advanceTime(UGICache.UGI_CACHE_EXPIRY - 1000);
+ UserGroupInformation ugi2 = cache.getUserGroupInformation(session2);
+ cache.release(session2, false);
+ fakeTicker.advanceTime(UGICache.UGI_CACHE_EXPIRY - 1000);
+ cache.release(session, false);
+ fakeTicker.advanceTime(2 * MINUTES);
+ cache.getUserGroupInformation(session3);
+
+ assertStillInCache(session, ugi1);
+ assertNoLongerInCache(session2, ugi2);
+ }
+
+ @Test
+ public void releaseWithoutImmediateCleanup() throws Exception {
+ UserGroupInformation ugi1 = cache.getUserGroupInformation(session);
+
+ cache.release(session, false);
+ assertStillInCache(session, ugi1);
+ }
+
+ @Test
+ public void releaseWithImmediateCleanup() throws Exception {
+ UserGroupInformation ugi1 = cache.getUserGroupInformation(session);
+
+ cache.release(session, true);
+ assertNoLongerInCache(session, ugi1);
+ }
+
+ @Test
+ public void releaseWithImmediateCleanupOnlyCleansUGIsForThatSegment() throws Exception {
+ UserGroupInformation ugi1 = cache.getUserGroupInformation(session);
+
+ SessionId differentSeg = new SessionId(999, "txn-id", "user");
+ UserGroupInformation ugi2 = cache.getUserGroupInformation(differentSeg);
+
+ cache.release(differentSeg, false); // ugi2 is now unreferenced
+ cache.release(session, true);
+ assertNoLongerInCache(session, ugi1);
+ assertStillInCache(differentSeg, ugi2);
+ assertCacheSize(1);
+ }
+
+ @Test
+ public void releaseResetsTheExpirationTime() throws Exception {
+ UserGroupInformation reference1 = cache.getUserGroupInformation(session);
+ cache.getUserGroupInformation(session);
+
+ cache.release(session, true);
+ fakeTicker.advanceTime(UGICache.UGI_CACHE_EXPIRY + 1000);
+ cache.release(session, false);
+ fakeTicker.advanceTime(UGICache.UGI_CACHE_EXPIRY - 1000);
+
+ assertStillInCache(session, reference1);
+ }
+
+ @Test
+ public void releaseAnExpiredUGIResetsTheTimer() throws Exception {
+ UserGroupInformation ugi1 = cache.getUserGroupInformation(session);
+ fakeTicker.advanceTime(UGICache.UGI_CACHE_EXPIRY + 1000);
+ cache.release(session, false);
+
+ assertStillInCache(session, ugi1);
+
+ fakeTicker.advanceTime(UGICache.UGI_CACHE_EXPIRY - 1000);
+ SessionId session2 = new SessionId(0, "txn-id", "the-user-2");
+ cache.getUserGroupInformation(session2); // triggers cleanup
+ assertStillInCache(session, ugi1);
+ }
+
+ @Test
+ public void releaseAndReacquireDoesNotFreeResources() throws Exception {
+ cache.getUserGroupInformation(session);
+ cache.release(session, false);
+ UserGroupInformation ugi2 = cache.getUserGroupInformation(session);
+ fakeTicker.advanceTime(UGICache.UGI_CACHE_EXPIRY + 1000);
+ UserGroupInformation ugi3 = cache.getUserGroupInformation(session);
+ // this does not clean up any UGIs because our ugi is still in use.
+ assertEquals(ugi3, ugi2);
+ verify(provider, times(1)).createProxyUGI("the-user");
+ verify(provider, never()).destroy(any(UserGroupInformation.class));
+ assertStillInCache(session, ugi2);
+ }
+
+ @Test
+ public void releaseAndAcquireAfterTimeoutFreesResources() throws Exception {
+ UserGroupInformation ugi1 = cache.getUserGroupInformation(session);
+
+ cache.release(session, false);
+ fakeTicker.advanceTime(UGICache.UGI_CACHE_EXPIRY + 1000);
+ assertStillInCache(session, ugi1);
+ UserGroupInformation ugi2 = cache.getUserGroupInformation(session);
+ verify(provider).destroy(ugi1);
+ assertNotEquals(ugi2, ugi1);
+ assertStillInCache(session, ugi2);
+ }
+
+ @Test
+ public void releaseDoesNotFreeResourcesIfUGIIsUsedElsewhere() throws Exception {
+ UserGroupInformation ugi1 = cache.getUserGroupInformation(session);
+ cache.getUserGroupInformation(session); // increments ref count to 2
+
+ cache.release(session, true);
+ fakeTicker.advanceTime(60 * MINUTES);
+ // UGI was not cleaned up because we are still holding a reference
+ assertStillInCache(session, ugi1);
+ }
+
+ @Test
+ public void releasingAllReferencesFreesResources() throws Exception {
+ UserGroupInformation ugi1 = cache.getUserGroupInformation(session);
+ UserGroupInformation ugi2 = cache.getUserGroupInformation(session);
+
+ assertEquals(ugi1, ugi2);
+
+ cache.release(session, true);
+ assertStillInCache(session, ugi1);
+ cache.release(session, true);
+ // at this point, the initial UGI has been freed.
+ assertNoLongerInCache(session, ugi1);
+ }
+
+ @Test(expected = IOException.class)
+ public void errorsThrownByCreatingAUgiAreNotCaught() throws Exception {
+ when(provider.createProxyUGI("the-user")).thenThrow(new IOException("test exception"));
+ cache.getUserGroupInformation(session);
+ }
+
+ @Test
+ public void errorsThrownByDestroyingAUgiAreCaught() throws Exception {
+ UserGroupInformation ugi1 = cache.getUserGroupInformation(session);
+ doThrow(new IOException("test exception")).when(provider).destroy(ugi1);
+ cache.release(session, true); // does not throw
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void releaseAnEntryNotInTheCache() {
+ // this could happen if some caller of the cache calls release twice.
+ cache.release(session, false);
+ }
+
+ private void assertStillInCache(SessionId session, UserGroupInformation ugi) throws Exception {
+ assertTrue(cache.contains(session));
+ verify(provider, never()).destroy(ugi);
+ }
+
+ private void assertNoLongerInCache(SessionId session, UserGroupInformation ugi) throws Exception {
+ assertFalse(cache.contains(session));
+ verify(provider, times(1)).destroy(ugi);
+ }
+
+ private void assertCacheSize(int expectedSize) {
+ assertEquals(expectedSize, cache.size());
+ assertEquals(expectedSize, cache.allQueuesSize());
+ }
+}
diff --git a/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilterTest.java b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilterTest.java
new file mode 100644
index 0000000..c4b2c27
--- /dev/null
+++ b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilterTest.java
@@ -0,0 +1,114 @@
+package org.apache.hawq.pxf.service.servlet;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hawq.pxf.service.SessionId;
+import org.apache.hawq.pxf.service.UGICache;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import javax.servlet.FilterChain;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletRequest;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class SecurityServletFilterTest {
+
+ private static final String PROPERTY_KEY_USER_IMPERSONATION = "pxf.service.user.impersonation.enabled";
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+ private HttpServletRequest servletRequest;
+
+ @Before
+ public void setup() {
+ System.setProperty(PROPERTY_KEY_USER_IMPERSONATION, "true");
+ servletRequest = mock(HttpServletRequest.class);
+ }
+
+ @Test
+ public void throwsWhenRequiredHeaderIsEmpty() throws Exception {
+
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage("Header X-GP-USER is empty in the request");
+
+ when(servletRequest.getHeader("X-GP-USER")).thenReturn(" ");
+
+ SecurityServletFilter securityServletFilter = new SecurityServletFilter();
+ securityServletFilter.doFilter(servletRequest, mock(ServletResponse.class), mock(FilterChain.class));
+
+ }
+
+ @Test
+ public void throwsWhenRequiredHeaderIsMissing() throws Exception {
+
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage("Header X-GP-USER is missing in the request");
+
+ SecurityServletFilter securityServletFilter = new SecurityServletFilter();
+ securityServletFilter.doFilter(servletRequest, mock(ServletResponse.class), mock(FilterChain.class));
+ }
+
+ @Test
+ public void doesNotCleanTheUGICacheOnNonLastCalls() throws Exception {
+
+ when(servletRequest.getHeader("X-GP-USER")).thenReturn("gpadmin");
+ when(servletRequest.getHeader("X-GP-XID")).thenReturn("0");
+ when(servletRequest.getHeader("X-GP-SEGMENT-ID")).thenReturn("1");
+
+ SecurityServletFilter securityServletFilter = new SecurityServletFilter();
+ securityServletFilter.proxyUGICache = mock(UGICache.class);
+
+ when(securityServletFilter.proxyUGICache.getUserGroupInformation(any(SessionId.class))).thenReturn(mock(UserGroupInformation.class));
+
+ securityServletFilter.doFilter(servletRequest, mock(ServletResponse.class), mock(FilterChain.class));
+
+ verify(securityServletFilter.proxyUGICache).release(any(SessionId.class), eq(false));
+ }
+
+ @Test
+ public void tellsTheUGICacheToCleanItselfOnTheLastCallForASegment() throws Exception {
+
+ System.setProperty(PROPERTY_KEY_USER_IMPERSONATION, "true");
+ HttpServletRequest servletRequest = mock(HttpServletRequest.class);
+
+ when(servletRequest.getHeader("X-GP-USER")).thenReturn("gpadmin");
+ when(servletRequest.getHeader("X-GP-XID")).thenReturn("0");
+ when(servletRequest.getHeader("X-GP-SEGMENT-ID")).thenReturn("1");
+ when(servletRequest.getHeader("X-GP-LAST-FRAGMENT")).thenReturn("true");
+
+ SecurityServletFilter securityServletFilter = new SecurityServletFilter();
+ securityServletFilter.proxyUGICache = mock(UGICache.class);
+
+ when(securityServletFilter.proxyUGICache.getUserGroupInformation(any(SessionId.class))).thenReturn(mock(UserGroupInformation.class));
+
+ securityServletFilter.doFilter(servletRequest, mock(ServletResponse.class), mock(FilterChain.class));
+
+ verify(securityServletFilter.proxyUGICache).release(any(SessionId.class), eq(true));
+ }
+}