HAWQ-1622. Cache UGI objects and clean them periodically

Co-authored-by: Lav Jain <ljain@pivotal.io>
Co-authored-by: Ben Christel <bchristel@pivotal.io>
Co-authored-by: Alex Denissov <adenissov@pivotal.io>
Co-authored-by: Shivram Mani <smani@pivotal.io>
Co-authored-by: Francisco Guerrero <aguerrero@pivotal.io>
Co-authored-by: Divya Bhargov <dbhargov@pivotal.io>
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/SessionId.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/SessionId.java
new file mode 100644
index 0000000..172014b
--- /dev/null
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/SessionId.java
@@ -0,0 +1,91 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * For the purposes of pxf-server, a session is the set of requests processed on a specific segment
+ * on behalf of a particular user and transaction. Grouping requests together into a session allows
+ * us to re-use the UserGroupInformation object (which is expensive to destroy) for each session.
+ * <p>
+ * SessionId is used as the cache key to look up the UserGroupInformation for a request. See {@link
+ * UGICache}.
+ */
+public class SessionId {
+
+    private final String user;
+    private final Integer segmentId;
+    private final String sessionId;
+
+    /**
+     * Create a sessionId
+     *
+     * @param segmentId     the calling segment
+     * @param transactionId the identifier for the transaction
+     * @param gpdbUser      the GPDB username
+     */
+    public SessionId(Integer segmentId, String transactionId, String gpdbUser) {
+        this.segmentId = segmentId;
+        this.user = gpdbUser;
+        this.sessionId = gpdbUser + ":" + transactionId + ":" + segmentId;
+    }
+
+    /**
+     * @return the segment id
+     */
+    public Integer getSegmentId() {
+        return segmentId;
+    }
+
+    /**
+     * @return the gpdb user name
+     */
+    public String getUser() {
+        return user;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public int hashCode() {
+        return sessionId.hashCode();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == null) return false;
+        if (obj == this) return true;
+        if (obj.getClass() != getClass()) return false;
+
+        SessionId that = (SessionId) obj;
+        return this.sessionId.equals(that.sessionId);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public String toString() {
+        return "Session = " + sessionId;
+    }
+}
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java
new file mode 100644
index 0000000..febb3ed
--- /dev/null
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java
@@ -0,0 +1,349 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Ticker;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Stores UserGroupInformation instances for each active session. The UGIs are cleaned up if they
+ * have not been accessed for UGI_CACHE_EXPIRY milliseconds.
+ * <p>
+ * The motivation for caching is that destroying UGIs is slow. The alternative, creating and
+ * destroying a UGI per-request, is wasteful.
+ */
+public class UGICache {
+
+    private static final Log LOG = LogFactory.getLog(UGICache.class);
+    private final Map<SessionId, Entry> cache = new ConcurrentHashMap<>();
+    // There is a separate DelayQueue for each segment (also being used for locking)
+    private final Map<Integer, DelayQueue<Entry>> expirationQueueMap = new HashMap<>();
+    private final UGIProvider ugiProvider;
+    private final Ticker ticker;
+    final static long UGI_CACHE_EXPIRY = 15 * 60 * 1000L; // 15 Minutes
+
+    /**
+     * Create a UGICache with the given {@link Ticker} and {@link UGIProvider}. Intended for use by
+     * tests which need to mock UGI creation/destruction and the current time.
+     */
+    UGICache(UGIProvider provider, Ticker ticker) {
+        this.ticker = ticker;
+        this.ugiProvider = provider;
+    }
+
+    /**
+     * Create a UGICache. Automatically creates a {@link UGIProvider} that this cache will use to
+     * create and destroy UserGroupInformation instances.
+     */
+    public UGICache() {
+        this(new UGIProvider(), Ticker.systemTicker());
+    }
+
+    /**
+     * If a UGI for the given session exists in the cache, returns it. Otherwise, creates a new
+     * proxy UGI. In either case this method increments the reference count of the UGI. This method
+     * also destroys expired, unreferenced UGIs for the same segmentId as the given session.
+     *
+     * @param session The user from the session is impersonated by the proxy UGI.
+     * @return the proxy UGI for the given session.
+     */
+    public UserGroupInformation getUserGroupInformation(SessionId session) throws IOException {
+        Integer segmentId = session.getSegmentId();
+        String user = session.getUser();
+        DelayQueue<Entry> delayQueue = getExpirationQueue(segmentId);
+        synchronized (delayQueue) {
+            // Use the opportunity to cleanup any expired entries
+            cleanup(delayQueue);
+            Entry entry = cache.get(session);
+            if (entry == null) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(session + " Creating proxy user = " + user);
+                }
+                entry = new Entry(ticker, ugiProvider.createProxyUGI(user), session);
+                delayQueue.offer(entry);
+                cache.put(session, entry);
+            }
+            entry.incrementRefCount();
+            return entry.getUGI();
+        }
+    }
+
+    /**
+     * Decrement reference count for the given session's UGI. Resets the time at which the UGI will
+     * expire to UGI_CACHE_EXPIRY milliseconds in the future.
+     *
+     * @param session                  the session for which we want to release the UGI.
+     * @param cleanImmediatelyIfNoRefs if true, destroys the UGI for the given session (only if it
+     *                                 is now unreferenced).
+     */
+    public void release(SessionId session, boolean cleanImmediatelyIfNoRefs) {
+
+        Entry entry = cache.get(session);
+
+        if (entry == null) {
+            throw new IllegalStateException("Cannot release UGI for this session; it is not cached: " + session);
+        }
+
+        DelayQueue<Entry> expirationQueue = getExpirationQueue(session.getSegmentId());
+
+        synchronized (expirationQueue) {
+            entry.decrementRefCount();
+            expirationQueue.remove(entry);
+            if (cleanImmediatelyIfNoRefs && entry.isNotInUse()) {
+                closeUGI(entry);
+            } else {
+                // Reset expiration time and put it back in the queue
+                // only when we don't close the UGI
+                entry.resetTime();
+                expirationQueue.offer(entry);
+            }
+        }
+    }
+
+    /**
+     * @return the size of the cache
+     */
+    int size() {
+        return cache.size();
+    }
+
+    /**
+     * This method is not thread-safe, and is intended to be called in tests.
+     *
+     * @return the sum of the sizes of the internal queues
+     */
+    int allQueuesSize() {
+        int count = 0;
+        for (DelayQueue queue : expirationQueueMap.values()) {
+            count += queue.size();
+        }
+        return count;
+    }
+
+    /**
+     * This method is O(n) in the number of cache entries and should only be called in tests.
+     *
+     * @param session
+     * @return determine whether the session is in the internal cache
+     */
+    boolean contains(SessionId session) {
+        DelayQueue<Entry> expirationQueue = getExpirationQueue(session.getSegmentId());
+        synchronized (expirationQueue) {
+            Entry entry = cache.get(session);
+            return entry != null && expirationQueue.contains(entry);
+        }
+    }
+
+    /**
+     * Get the queue of cache entries associated with a segment, creating it if it doesn't yet
+     * exist. This lets us lazily populate the expirationQueueMap.
+     *
+     * @param segmentId
+     * @return the {@link DelayQueue} associated to the segment.
+     */
+    private DelayQueue<Entry> getExpirationQueue(Integer segmentId) {
+        DelayQueue<Entry> queue = expirationQueueMap.get(segmentId);
+        if (queue == null) {
+            synchronized (expirationQueueMap) {
+                queue = expirationQueueMap.get(segmentId);
+                if (queue == null) {
+                    queue = new DelayQueue<>();
+                    expirationQueueMap.put(segmentId, queue);
+                }
+            }
+        }
+        return queue;
+    }
+
+    /**
+     * Iterate through all the entries in the queue and close expired {@link UserGroupInformation},
+     * otherwise it resets the timer for every non-expired entry.
+     *
+     * @param expirationQueue
+     */
+    private void cleanup(DelayQueue<Entry> expirationQueue) {
+
+        Entry expiredUGI;
+        while ((expiredUGI = expirationQueue.poll()) != null) {
+            if (expiredUGI.isNotInUse()) {
+                closeUGI(expiredUGI);
+            } else {
+                // The UGI object is still being used by another thread
+                String fsMsg = "FileSystem for proxy user = " + expiredUGI.getSession().getUser();
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(expiredUGI.getSession().toString() + " Skipping close of " + fsMsg);
+                }
+                // Place it back in the queue if still in use and was not closed
+                expiredUGI.resetTime();
+                expirationQueue.offer(expiredUGI);
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Delay Queue Size for segment " +
+                        expiredUGI.getSession().getSegmentId() + " = " + expirationQueue.size());
+            }
+        }
+    }
+
+    /**
+     * This method must be called from a synchronized block for the delayQueue for the given
+     * session.getSegmentId(). Removes the cachedUGI from the internal cache and then passes it to
+     * {@link UGIProvider} to destroy the UGI.
+     *
+     * @param expiredUGI
+     */
+    private void closeUGI(Entry expiredUGI) {
+        SessionId session = expiredUGI.getSession();
+        String fsMsg = "FileSystem for proxy user = " + session.getUser();
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(session.toString() + " Closing " + fsMsg +
+                    " (Cache Size = " + cache.size() + ")");
+        }
+
+        try {
+            // Remove it from cache, as cache now has an
+            // expired entry which is not in progress
+            cache.remove(session);
+            ugiProvider.destroy(expiredUGI.getUGI());
+
+        } catch (Throwable t) {
+            LOG.warn(session.toString() + " Error closing " + fsMsg, t);
+        }
+    }
+
+    /**
+     * Stores a {@link UserGroupInformation}, and determines when to expire the UGI.
+     */
+    private static class Entry implements Delayed {
+
+        private volatile long startTime;
+        private final SessionId session;
+        private final UserGroupInformation proxyUGI;
+        private final AtomicInteger referenceCount = new AtomicInteger();
+        private final Ticker ticker;
+
+        /**
+         * Creates a new UGICache Entry.
+         *
+         * @param ticker
+         * @param proxyUGI
+         * @param session
+         */
+        Entry(Ticker ticker, UserGroupInformation proxyUGI, SessionId session) {
+            this.ticker = ticker;
+            this.proxyUGI = proxyUGI;
+            this.session = session;
+        }
+
+        /**
+         * @return the Cached {@link UserGroupInformation}.
+         */
+        public UserGroupInformation getUGI() {
+            return proxyUGI;
+        }
+
+
+        /**
+         * @return the session associated to the {@link UserGroupInformation}.
+         */
+        public SessionId getSession() {
+            return session;
+        }
+
+        /**
+         * @return true if the UGI is being referenced by a session, false otherwise
+         */
+        private boolean isNotInUse() {
+            return referenceCount.get() <= 0;
+        }
+
+        /**
+         * Increments the number of references accessing the {@link UserGroupInformation}.
+         */
+        void incrementRefCount() {
+            referenceCount.incrementAndGet();
+        }
+
+        /**
+         * Decrements the number of references accessing the {@link UserGroupInformation}.
+         */
+        void decrementRefCount() {
+            int count = referenceCount.decrementAndGet();
+            if (count < 0) {
+                throw new IllegalStateException("UGICache.Entry referenceCount may not be decremented past 0.");
+            }
+        }
+
+        /**
+         * Resets the timer for removing this Entry from the cache.
+         */
+        void resetTime() {
+            startTime = currentTimeMillis();
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override
+        public long getDelay(TimeUnit unit) {
+            return unit.convert(getDelayMillis(), TimeUnit.MILLISECONDS);
+        }
+
+        /**
+         * Compare the expiry time of this cache entry to another cache entry's expiry time.
+         *
+         * @param other a UGICache.Entry (passing any other kind of Delayed produces an error)
+         * @see java.lang.Comparable<>#compareTo(java.lang.Comparable<>)
+         */
+        @Override
+        public int compareTo(Delayed other) {
+            if (!(other instanceof Entry)) return 1;
+
+            Entry that = (Entry) other;
+            return Long.compare(this.getDelayMillis(), that.getDelayMillis());
+        }
+
+        /**
+         * @return the number of milliseconds remaining before this cache entry expires.
+         */
+        private long getDelayMillis() {
+            return (startTime + UGI_CACHE_EXPIRY) - currentTimeMillis();
+        }
+
+        /**
+         * @return the current Unix timestamp in milliseconds (equivalent to {@link
+         * System}.currentTimeMillis)
+         */
+        private long currentTimeMillis() {
+            return ticker.read() / 1000;
+        }
+    }
+}
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGIProvider.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGIProvider.java
new file mode 100644
index 0000000..38c60a0
--- /dev/null
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGIProvider.java
@@ -0,0 +1,54 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+
+/**
+ * Thin wrapper around {@link UserGroupInformation} create and destroy methods. We mock this class
+ * in tests to be able to detect when a UGI is created/destroyed, and to isolate our tests from
+ * creating/destroying real UGI instances.
+ */
+class UGIProvider {
+
+    /**
+     * Wrapper for {@link UserGroupInformation} creation
+     *
+     * @param effectiveUser the name of the user that we want to impersonate
+     * @return a {@link UserGroupInformation} for impersonation.
+     * @throws IOException
+     */
+    UserGroupInformation createProxyUGI(String effectiveUser) throws IOException {
+        return UserGroupInformation.createProxyUser(
+                effectiveUser, UserGroupInformation.getLoginUser());
+    }
+
+    /**
+     * Wrapper for {@link FileSystem}.closeAllForUGI method.
+     * @param ugi the {@link UserGroupInformation} whose filesystem resources we want to free.
+     * @throws IOException
+     */
+    void destroy(UserGroupInformation ugi) throws IOException {
+        FileSystem.closeAllForUGI(ugi);
+    }
+}
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilter.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilter.java
index 4cf3d03..0c48cca 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilter.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilter.java
@@ -19,22 +19,25 @@
  * under the License.
  */
 
-
 import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.security.PrivilegedExceptionAction;
 
-import javax.servlet.*;
+import javax.servlet.Filter;
+import javax.servlet.FilterChain;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
 import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hawq.pxf.service.SessionId;
+import org.apache.hawq.pxf.service.UGICache;
 import org.apache.hawq.pxf.service.utilities.SecureLogin;
 
-
 /**
  * Listener on lifecycle events of our webapp
  */
@@ -42,8 +45,12 @@
 
     private static final Log LOG = LogFactory.getLog(SecurityServletFilter.class);
     private static final String USER_HEADER = "X-GP-USER";
-    private static final String MISSING_HEADER_ERROR = String.format("Header %s is missing in the request", USER_HEADER);
-    private static final String EMPTY_HEADER_ERROR = String.format("Header %s is empty in the request", USER_HEADER);
+    private static final String SEGMENT_ID_HEADER = "X-GP-SEGMENT-ID";
+    private static final String TRANSACTION_ID_HEADER = "X-GP-XID";
+    private static final String LAST_FRAGMENT_HEADER = "X-GP-LAST-FRAGMENT";
+    private static final String MISSING_HEADER_ERROR = "Header %s is missing in the request";
+    private static final String EMPTY_HEADER_ERROR = "Header %s is empty in the request";
+    UGICache proxyUGICache;
 
     /**
      * Initializes the filter.
@@ -51,7 +58,8 @@
      * @param filterConfig filter configuration
      */
     @Override
-    public void init(FilterConfig filterConfig) throws ServletException {
+    public void init(FilterConfig filterConfig) {
+        proxyUGICache = new UGICache();
     }
 
     /**
@@ -59,54 +67,62 @@
      * and create a proxy user to execute further request chain. Responds with an HTTP error if the header is missing
      * or the chain processing throws an exception.
      *
-     * @param request http request
+     * @param request  http request
      * @param response http response
-     * @param chain filter chain
+     * @param chain    filter chain
      */
     @Override
-    public void doFilter(final ServletRequest request, final ServletResponse response, final FilterChain chain) throws IOException, ServletException {
+    public void doFilter(final ServletRequest request, final ServletResponse response, final FilterChain chain)
+            throws IOException, ServletException {
 
         if (SecureLogin.isUserImpersonationEnabled()) {
 
             // retrieve user header and make sure header is present and is not empty
-            final String user = ((HttpServletRequest) request).getHeader(USER_HEADER);
-            if (user == null) {
-                throw new IllegalArgumentException(MISSING_HEADER_ERROR);
-            } else if (user.trim().isEmpty()) {
-                throw new IllegalArgumentException(EMPTY_HEADER_ERROR);
-            }
+            final String gpdbUser = getHeaderValue(request, USER_HEADER, true);
+            final String transactionId = getHeaderValue(request, TRANSACTION_ID_HEADER, true);
+            final Integer segmentId = getHeaderValueInt(request, SEGMENT_ID_HEADER, true);
+            final boolean lastCallForSegment = getHeaderValueBoolean(request, LAST_FRAGMENT_HEADER, false);
+
+            SessionId session = new SessionId(segmentId, transactionId, gpdbUser);
 
             // TODO refresh Kerberos token when security is enabled
 
-            // prepare pivileged action to run on behalf of proxy user
+            // prepare privileged action to run on behalf of proxy user
             PrivilegedExceptionAction<Boolean> action = new PrivilegedExceptionAction<Boolean>() {
                 @Override
                 public Boolean run() throws IOException, ServletException {
-                    LOG.debug("Performing request chain call for proxy user = " + user);
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Performing request chain call for proxy user = " + gpdbUser);
+                    }
                     chain.doFilter(request, response);
                     return true;
                 }
             };
 
-            // create proxy user UGI from the UGI of the logged in user and execute the servlet chain as that user
-            UserGroupInformation proxyUGI = null;
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Retrieving proxy user for session: " + session);
+            }
             try {
-                LOG.debug("Creating proxy user = " + user);
-                proxyUGI = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
-                proxyUGI.doAs(action);
+                // Retrieve proxy user UGI from the UGI of the logged in user
+                // and execute the servlet chain as that user
+                proxyUGICache
+                        .getUserGroupInformation(session)
+                        .doAs(action);
             } catch (UndeclaredThrowableException ute) {
                 // unwrap the real exception thrown by the action
                 throw new ServletException(ute.getCause());
             } catch (InterruptedException ie) {
                 throw new ServletException(ie);
             } finally {
+                // Optimization to cleanup the cache if it is the last fragment
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Releasing proxy user for session: " + session +
+                            (lastCallForSegment ? " Last fragment call." : ""));
+                }
                 try {
-                    if (proxyUGI != null) {
-                        LOG.debug("Closing FileSystem for proxy user = " + proxyUGI.getUserName());
-                        FileSystem.closeAllForUGI(proxyUGI);
-                    }
+                    proxyUGICache.release(session, lastCallForSegment);
                 } catch (Throwable t) {
-                    LOG.warn("Error closing FileSystem for proxy user = " + proxyUGI.getUserName());
+                    LOG.error("Error releasing UGICache for session: " + session, t);
                 }
             }
         } else {
@@ -122,4 +138,25 @@
     public void destroy() {
     }
 
+    private Integer getHeaderValueInt(ServletRequest request, String headerKey, boolean required)
+            throws IllegalArgumentException {
+        String value = getHeaderValue(request, headerKey, required);
+        return value != null ? Integer.valueOf(value) : null;
+    }
+
+    private String getHeaderValue(ServletRequest request, String headerKey, boolean required)
+            throws IllegalArgumentException {
+        String value = ((HttpServletRequest) request).getHeader(headerKey);
+        if (required && value == null) {
+            throw new IllegalArgumentException(String.format(MISSING_HEADER_ERROR, headerKey));
+        } else if (required && value.trim().isEmpty()) {
+            throw new IllegalArgumentException(String.format(EMPTY_HEADER_ERROR, headerKey));
+        }
+        return value;
+    }
+
+    private boolean getHeaderValueBoolean(ServletRequest request, String headerKey, boolean required) {
+        return StringUtils.equals("true", getHeaderValue(request, headerKey, required));
+    }
+
 }
diff --git a/pxf/pxf-service/src/main/resources/pxf-log4j.properties b/pxf/pxf-service/src/main/resources/pxf-log4j.properties
index f6bb0ea..3b64e8a 100644
--- a/pxf/pxf-service/src/main/resources/pxf-log4j.properties
+++ b/pxf/pxf-service/src/main/resources/pxf-log4j.properties
@@ -5,9 +5,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
diff --git a/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/UGICacheMultiThreadTest.java b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/UGICacheMultiThreadTest.java
new file mode 100644
index 0000000..ea76b17
--- /dev/null
+++ b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/UGICacheMultiThreadTest.java
@@ -0,0 +1,131 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import io.netty.util.internal.ConcurrentSet;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.security.SecureRandom;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+public class UGICacheMultiThreadTest {
+    private FakeUgiProvider provider = null;
+    private static final int numberOfSegments = 3;
+    private static final int numberOfUsers = 3;
+    private static final int numberOfTxns = 3;
+    private SessionId[] sessions = new SessionId[numberOfSegments * numberOfUsers * numberOfTxns];
+    private UGICache cache = null;
+    private UGICacheTest.FakeTicker fakeTicker;
+
+    class FakeUgiProvider extends UGIProvider {
+        Set<UserGroupInformation> ugis = new ConcurrentSet<>();
+
+        @Override
+        UserGroupInformation createProxyUGI(String effectiveUser) {
+            UserGroupInformation ugi = mock(UserGroupInformation.class);
+            ugis.add(ugi);
+            return ugi;
+        }
+
+        @Override
+        void destroy(UserGroupInformation ugi) {
+            if (!ugis.remove(ugi)) {
+                throw new IllegalStateException("Tried to destroy UGI that does not exist");
+            }
+        }
+
+        int countUgisInUse() {
+            return ugis.size();
+        }
+    }
+
+    @Before
+    public void setUp() {
+        provider = new FakeUgiProvider();
+
+        int l = 0;
+        for (int i = 0; i < numberOfSegments; i++) {
+            for (int j = 0; j < numberOfUsers; j++) {
+                for (int k = 0; k < numberOfTxns; k++) {
+                    sessions[l++] = new SessionId(i, "txn-id-" + k, "the-user-" + j);
+                }
+            }
+        }
+        fakeTicker = new UGICacheTest.FakeTicker();
+        cache = new UGICache(provider, fakeTicker);
+    }
+
+    @Test
+    public void multiThreadedTest() throws Exception {
+        final Random rnd = new SecureRandom();
+        final AtomicInteger finishedCount = new AtomicInteger();
+
+        int threadCount = 500;
+        Thread[] threads = new Thread[threadCount];
+
+        for (int i = 0; i < threads.length; i++) {
+            threads[i] = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        for (int i = 0; i < 100; i++) {
+                            for (SessionId session : sessions) {
+                                cache.getUserGroupInformation(session);
+                            }
+                            Thread.sleep(0, rnd.nextInt(1000));
+                            for (SessionId session : sessions) {
+                                cache.release(session, false);
+                            }
+                        }
+
+                        for (SessionId session : sessions) {
+                            cache.getUserGroupInformation(session);
+                            cache.release(session, true);
+                        }
+
+                        finishedCount.incrementAndGet();
+                    } catch (IOException | InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                }
+            });
+            threads[i].start();
+        }
+
+        for (Thread thread : threads) {
+            thread.join();
+        }
+
+        assertEquals(threadCount, finishedCount.intValue());
+        assertEquals(0, provider.countUgisInUse());
+        // after the test has completed, the internal cache
+        // should be 0
+        assertEquals(0, cache.size());
+        assertEquals(0, cache.allQueuesSize());
+    }
+}
diff --git a/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/UGICacheTest.java b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/UGICacheTest.java
new file mode 100644
index 0000000..44c3f9a
--- /dev/null
+++ b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/UGICacheTest.java
@@ -0,0 +1,330 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.base.Ticker;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+public class UGICacheTest {
+    private static final long MINUTES = 60 * 1000L;
+    private UGIProvider provider = null;
+    private SessionId session = null;
+    private UGICache cache = null;
+    private FakeTicker fakeTicker;
+
+    static class FakeTicker extends Ticker {
+        private final AtomicLong nanos = new AtomicLong();
+
+        @Override
+        public long read() {
+            return nanos.get();
+        }
+
+        long advanceTime(long milliseconds) {
+            return nanos.addAndGet(milliseconds * 1000) / 1000;
+        }
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        provider = mock(UGIProvider.class);
+        when(provider.createProxyUGI(any(String.class))).thenAnswer(new Answer<UserGroupInformation>() {
+            @Override
+            public UserGroupInformation answer(InvocationOnMock invocation) {
+                return mock(UserGroupInformation.class);
+            }
+        });
+
+        session = new SessionId(0, "txn-id", "the-user");
+        fakeTicker = new FakeTicker();
+        cache = new UGICache(provider, fakeTicker);
+    }
+
+    @Test
+    public void getUGIFromEmptyCache() throws Exception {
+        UserGroupInformation ugi = cache.getUserGroupInformation(session);
+        assertNotNull(ugi);
+        verify(provider).createProxyUGI("the-user");
+    }
+
+    @Test
+    public void getSameUGITwiceUsesCache() throws Exception {
+        UserGroupInformation ugi1 = cache.getUserGroupInformation(session);
+        UserGroupInformation ugi2 = cache.getUserGroupInformation(session);
+        assertEquals(ugi1, ugi2);
+        verify(provider, times(1)).createProxyUGI("the-user");
+        assertCacheSize(1);
+    }
+
+    @Test
+    public void getUGIWithEquivalentSessionsReturnsTheSameInstance() throws Exception {
+        SessionId session2 = new SessionId(0, "txn-id", "the-user");
+        UserGroupInformation ugi1 = cache.getUserGroupInformation(session);
+        UserGroupInformation ugi2 = cache.getUserGroupInformation(session2);
+        assertEquals(ugi1, ugi2);
+    }
+
+    @Test
+    public void getTwoUGIsWithDifferentTransactionsForSameUser() throws Exception {
+        SessionId otherSession = new SessionId(0, "txn-id-2", "the-user");
+        UserGroupInformation proxyUGI1 = cache.getUserGroupInformation(session);
+        UserGroupInformation proxyUGI2 = cache.getUserGroupInformation(otherSession);
+        assertNotEquals(proxyUGI1, proxyUGI2);
+        verify(provider, times(2)).createProxyUGI("the-user");
+        assertCacheSize(2);
+        // getting a new UGI instance for each transaction ID is not strictly necessary, but allows
+        // us to expire UGIs for transactions that have finished. If we reused one UGI per user,
+        // it might never get to expire from the cache, and eventually Kerberos might invalidate
+        // the UGI on its end.
+    }
+
+    @Test
+    public void getTwoUGIsWithDifferentUsers() throws Exception {
+        SessionId otherSession = new SessionId(0, "txn-id", "different-user");
+        UserGroupInformation proxyUGI1 = cache.getUserGroupInformation(session);
+        UserGroupInformation proxyUGI2 = cache.getUserGroupInformation(otherSession);
+        assertNotEquals(proxyUGI1, proxyUGI2);
+        verify(provider, times(1)).createProxyUGI("the-user");
+        verify(provider, times(1)).createProxyUGI("different-user");
+        assertCacheSize(2);
+        assertStillInCache(session, proxyUGI1);
+        assertStillInCache(otherSession, proxyUGI2);
+    }
+
+    @Test
+    public void anySegmentIdIsValid() throws Exception {
+        int crazySegId = Integer.MAX_VALUE;
+        session = new SessionId(crazySegId, "txn-id", "the-user");
+        UserGroupInformation proxyUGI1 = cache.getUserGroupInformation(session);
+        assertNotNull(proxyUGI1);
+        assertStillInCache(session, proxyUGI1);
+    }
+
+    @Test
+    public void ensureCleanUpAfterExpiration() throws Exception {
+        UserGroupInformation ugi1 = cache.getUserGroupInformation(session);
+        cache.release(session, false);
+        fakeTicker.advanceTime(UGICache.UGI_CACHE_EXPIRY + 1000);
+
+        SessionId session2 = new SessionId(0, "txn-id", "the-user-2");
+        cache.getUserGroupInformation(session2); // this triggers cleanup of ugi1
+        assertNoLongerInCache(session, ugi1);
+        cache.release(session2, true);
+        assertCacheSize(0);
+    }
+
+    @Test
+    public void ensureExpiredUGIIsNotCleanedUpIfItIsStillReferenced() throws Exception {
+        SessionId session2 = new SessionId(0, "txn-id", "the-user-2");
+        UserGroupInformation stillInUse = cache.getUserGroupInformation(session);
+        fakeTicker.advanceTime(UGICache.UGI_CACHE_EXPIRY + 1000);
+
+        // at this point, stillInUse is expired but still in use
+        cache.getUserGroupInformation(session2); // trigger cleanup
+        assertStillInCache(session, stillInUse);
+        cache.release(session, false);
+        fakeTicker.advanceTime(UGICache.UGI_CACHE_EXPIRY + 1000);
+
+        cache.getUserGroupInformation(session2);
+
+        verify(provider, times(1)).destroy(stillInUse);
+    }
+
+    @Test
+    public void putsItemsBackInTheQueueWhenResettingExpirationDate() throws Exception {
+        SessionId session2 = new SessionId(0, "txn-id", "the-user-2");
+        SessionId session3 = new SessionId(0, "txn-id", "the-user-3");
+
+        UserGroupInformation ugi1 = cache.getUserGroupInformation(session);
+        fakeTicker.advanceTime(UGICache.UGI_CACHE_EXPIRY - 1000);
+        UserGroupInformation ugi2 = cache.getUserGroupInformation(session2);
+        cache.release(session2, false);
+        fakeTicker.advanceTime(UGICache.UGI_CACHE_EXPIRY - 1000);
+        cache.release(session, false);
+        fakeTicker.advanceTime(2 * MINUTES);
+        cache.getUserGroupInformation(session3);
+
+        assertStillInCache(session, ugi1);
+        assertNoLongerInCache(session2, ugi2);
+    }
+
+    @Test
+    public void releaseWithoutImmediateCleanup() throws Exception {
+        UserGroupInformation ugi1 = cache.getUserGroupInformation(session);
+
+        cache.release(session, false);
+        assertStillInCache(session, ugi1);
+    }
+
+    @Test
+    public void releaseWithImmediateCleanup() throws Exception {
+        UserGroupInformation ugi1 = cache.getUserGroupInformation(session);
+
+        cache.release(session, true);
+        assertNoLongerInCache(session, ugi1);
+    }
+
+    @Test
+    public void releaseWithImmediateCleanupOnlyCleansUGIsForThatSegment() throws Exception {
+        UserGroupInformation ugi1 = cache.getUserGroupInformation(session);
+
+        SessionId differentSeg = new SessionId(999, "txn-id", "user");
+        UserGroupInformation ugi2 = cache.getUserGroupInformation(differentSeg);
+
+        cache.release(differentSeg, false); // ugi2 is now unreferenced
+        cache.release(session, true);
+        assertNoLongerInCache(session, ugi1);
+        assertStillInCache(differentSeg, ugi2);
+        assertCacheSize(1);
+    }
+
+    @Test
+    public void releaseResetsTheExpirationTime() throws Exception {
+        UserGroupInformation reference1 = cache.getUserGroupInformation(session);
+        cache.getUserGroupInformation(session);
+
+        cache.release(session, true);
+        fakeTicker.advanceTime(UGICache.UGI_CACHE_EXPIRY + 1000);
+        cache.release(session, false);
+        fakeTicker.advanceTime(UGICache.UGI_CACHE_EXPIRY - 1000);
+
+        assertStillInCache(session, reference1);
+    }
+
+    @Test
+    public void releaseAnExpiredUGIResetsTheTimer() throws Exception {
+        UserGroupInformation ugi1 = cache.getUserGroupInformation(session);
+        fakeTicker.advanceTime(UGICache.UGI_CACHE_EXPIRY + 1000);
+        cache.release(session, false);
+
+        assertStillInCache(session, ugi1);
+
+        fakeTicker.advanceTime(UGICache.UGI_CACHE_EXPIRY - 1000);
+        SessionId session2 = new SessionId(0, "txn-id", "the-user-2");
+        cache.getUserGroupInformation(session2); // triggers cleanup
+        assertStillInCache(session, ugi1);
+    }
+
+    @Test
+    public void releaseAndReacquireDoesNotFreeResources() throws Exception {
+        cache.getUserGroupInformation(session);
+        cache.release(session, false);
+        UserGroupInformation ugi2 = cache.getUserGroupInformation(session);
+        fakeTicker.advanceTime(UGICache.UGI_CACHE_EXPIRY + 1000);
+        UserGroupInformation ugi3 = cache.getUserGroupInformation(session);
+        // this does not clean up any UGIs because our ugi is still in use.
+        assertEquals(ugi3, ugi2);
+        verify(provider, times(1)).createProxyUGI("the-user");
+        verify(provider, never()).destroy(any(UserGroupInformation.class));
+        assertStillInCache(session, ugi2);
+    }
+
+    @Test
+    public void releaseAndAcquireAfterTimeoutFreesResources() throws Exception {
+        UserGroupInformation ugi1 = cache.getUserGroupInformation(session);
+
+        cache.release(session, false);
+        fakeTicker.advanceTime(UGICache.UGI_CACHE_EXPIRY + 1000);
+        assertStillInCache(session, ugi1);
+        UserGroupInformation ugi2 = cache.getUserGroupInformation(session);
+        verify(provider).destroy(ugi1);
+        assertNotEquals(ugi2, ugi1);
+        assertStillInCache(session, ugi2);
+    }
+
+    @Test
+    public void releaseDoesNotFreeResourcesIfUGIIsUsedElsewhere() throws Exception {
+        UserGroupInformation ugi1 = cache.getUserGroupInformation(session);
+        cache.getUserGroupInformation(session); // increments ref count to 2
+
+        cache.release(session, true);
+        fakeTicker.advanceTime(60 * MINUTES);
+        // UGI was not cleaned up because we are still holding a reference
+        assertStillInCache(session, ugi1);
+    }
+
+    @Test
+    public void releasingAllReferencesFreesResources() throws Exception {
+        UserGroupInformation ugi1 = cache.getUserGroupInformation(session);
+        UserGroupInformation ugi2 = cache.getUserGroupInformation(session);
+
+        assertEquals(ugi1, ugi2);
+
+        cache.release(session, true);
+        assertStillInCache(session, ugi1);
+        cache.release(session, true);
+        // at this point, the initial UGI has been freed.
+        assertNoLongerInCache(session, ugi1);
+    }
+
+    @Test(expected = IOException.class)
+    public void errorsThrownByCreatingAUgiAreNotCaught() throws Exception {
+        when(provider.createProxyUGI("the-user")).thenThrow(new IOException("test exception"));
+        cache.getUserGroupInformation(session);
+    }
+
+    @Test
+    public void errorsThrownByDestroyingAUgiAreCaught() throws Exception {
+        UserGroupInformation ugi1 = cache.getUserGroupInformation(session);
+        doThrow(new IOException("test exception")).when(provider).destroy(ugi1);
+        cache.release(session, true); // does not throw
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void releaseAnEntryNotInTheCache() {
+        // this could happen if some caller of the cache calls release twice.
+        cache.release(session, false);
+    }
+
+    private void assertStillInCache(SessionId session, UserGroupInformation ugi) throws Exception {
+        assertTrue(cache.contains(session));
+        verify(provider, never()).destroy(ugi);
+    }
+
+    private void assertNoLongerInCache(SessionId session, UserGroupInformation ugi) throws Exception {
+        assertFalse(cache.contains(session));
+        verify(provider, times(1)).destroy(ugi);
+    }
+
+    private void assertCacheSize(int expectedSize) {
+        assertEquals(expectedSize, cache.size());
+        assertEquals(expectedSize, cache.allQueuesSize());
+    }
+}
diff --git a/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilterTest.java b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilterTest.java
new file mode 100644
index 0000000..c4b2c27
--- /dev/null
+++ b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilterTest.java
@@ -0,0 +1,114 @@
+package org.apache.hawq.pxf.service.servlet;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hawq.pxf.service.SessionId;
+import org.apache.hawq.pxf.service.UGICache;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import javax.servlet.FilterChain;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletRequest;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class SecurityServletFilterTest {
+
+    private static final String PROPERTY_KEY_USER_IMPERSONATION = "pxf.service.user.impersonation.enabled";
+
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+    private HttpServletRequest servletRequest;
+
+    @Before
+    public void setup() {
+        System.setProperty(PROPERTY_KEY_USER_IMPERSONATION, "true");
+        servletRequest = mock(HttpServletRequest.class);
+    }
+
+    @Test
+    public void throwsWhenRequiredHeaderIsEmpty() throws Exception {
+
+        expectedException.expect(IllegalArgumentException.class);
+        expectedException.expectMessage("Header X-GP-USER is empty in the request");
+
+        when(servletRequest.getHeader("X-GP-USER")).thenReturn("  ");
+
+        SecurityServletFilter securityServletFilter = new SecurityServletFilter();
+        securityServletFilter.doFilter(servletRequest, mock(ServletResponse.class), mock(FilterChain.class));
+
+    }
+
+    @Test
+    public void throwsWhenRequiredHeaderIsMissing() throws Exception {
+
+        expectedException.expect(IllegalArgumentException.class);
+        expectedException.expectMessage("Header X-GP-USER is missing in the request");
+
+        SecurityServletFilter securityServletFilter = new SecurityServletFilter();
+        securityServletFilter.doFilter(servletRequest, mock(ServletResponse.class), mock(FilterChain.class));
+    }
+
+    @Test
+    public void doesNotCleanTheUGICacheOnNonLastCalls() throws Exception {
+
+        when(servletRequest.getHeader("X-GP-USER")).thenReturn("gpadmin");
+        when(servletRequest.getHeader("X-GP-XID")).thenReturn("0");
+        when(servletRequest.getHeader("X-GP-SEGMENT-ID")).thenReturn("1");
+
+        SecurityServletFilter securityServletFilter = new SecurityServletFilter();
+        securityServletFilter.proxyUGICache = mock(UGICache.class);
+
+        when(securityServletFilter.proxyUGICache.getUserGroupInformation(any(SessionId.class))).thenReturn(mock(UserGroupInformation.class));
+
+        securityServletFilter.doFilter(servletRequest, mock(ServletResponse.class), mock(FilterChain.class));
+
+        verify(securityServletFilter.proxyUGICache).release(any(SessionId.class), eq(false));
+    }
+
+    @Test
+    public void tellsTheUGICacheToCleanItselfOnTheLastCallForASegment() throws Exception {
+
+        System.setProperty(PROPERTY_KEY_USER_IMPERSONATION, "true");
+        HttpServletRequest servletRequest = mock(HttpServletRequest.class);
+
+        when(servletRequest.getHeader("X-GP-USER")).thenReturn("gpadmin");
+        when(servletRequest.getHeader("X-GP-XID")).thenReturn("0");
+        when(servletRequest.getHeader("X-GP-SEGMENT-ID")).thenReturn("1");
+        when(servletRequest.getHeader("X-GP-LAST-FRAGMENT")).thenReturn("true");
+
+        SecurityServletFilter securityServletFilter = new SecurityServletFilter();
+        securityServletFilter.proxyUGICache = mock(UGICache.class);
+
+        when(securityServletFilter.proxyUGICache.getUserGroupInformation(any(SessionId.class))).thenReturn(mock(UserGroupInformation.class));
+
+        securityServletFilter.doFilter(servletRequest, mock(ServletResponse.class), mock(FilterChain.class));
+
+        verify(securityServletFilter.proxyUGICache).release(any(SessionId.class), eq(true));
+    }
+}