blob: febb3ed9e4b9eb5e41624d59e7e97c51b375e2a8 [file] [log] [blame]
package org.apache.hawq.pxf.service;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.base.Ticker;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
/**
* Stores UserGroupInformation instances for each active session. The UGIs are cleaned up if they
* have not been accessed for UGI_CACHE_EXPIRY milliseconds.
* <p>
* The motivation for caching is that destroying UGIs is slow. The alternative, creating and
* destroying a UGI per-request, is wasteful.
*/
public class UGICache {
private static final Log LOG = LogFactory.getLog(UGICache.class);
private final Map<SessionId, Entry> cache = new ConcurrentHashMap<>();
// There is a separate DelayQueue for each segment (also being used for locking)
private final Map<Integer, DelayQueue<Entry>> expirationQueueMap = new HashMap<>();
private final UGIProvider ugiProvider;
private final Ticker ticker;
final static long UGI_CACHE_EXPIRY = 15 * 60 * 1000L; // 15 Minutes
/**
* Create a UGICache with the given {@link Ticker} and {@link UGIProvider}. Intended for use by
* tests which need to mock UGI creation/destruction and the current time.
*/
UGICache(UGIProvider provider, Ticker ticker) {
this.ticker = ticker;
this.ugiProvider = provider;
}
/**
* Create a UGICache. Automatically creates a {@link UGIProvider} that this cache will use to
* create and destroy UserGroupInformation instances.
*/
public UGICache() {
this(new UGIProvider(), Ticker.systemTicker());
}
/**
* If a UGI for the given session exists in the cache, returns it. Otherwise, creates a new
* proxy UGI. In either case this method increments the reference count of the UGI. This method
* also destroys expired, unreferenced UGIs for the same segmentId as the given session.
*
* @param session The user from the session is impersonated by the proxy UGI.
* @return the proxy UGI for the given session.
*/
public UserGroupInformation getUserGroupInformation(SessionId session) throws IOException {
Integer segmentId = session.getSegmentId();
String user = session.getUser();
DelayQueue<Entry> delayQueue = getExpirationQueue(segmentId);
synchronized (delayQueue) {
// Use the opportunity to cleanup any expired entries
cleanup(delayQueue);
Entry entry = cache.get(session);
if (entry == null) {
if (LOG.isDebugEnabled()) {
LOG.debug(session + " Creating proxy user = " + user);
}
entry = new Entry(ticker, ugiProvider.createProxyUGI(user), session);
delayQueue.offer(entry);
cache.put(session, entry);
}
entry.incrementRefCount();
return entry.getUGI();
}
}
/**
* Decrement reference count for the given session's UGI. Resets the time at which the UGI will
* expire to UGI_CACHE_EXPIRY milliseconds in the future.
*
* @param session the session for which we want to release the UGI.
* @param cleanImmediatelyIfNoRefs if true, destroys the UGI for the given session (only if it
* is now unreferenced).
*/
public void release(SessionId session, boolean cleanImmediatelyIfNoRefs) {
Entry entry = cache.get(session);
if (entry == null) {
throw new IllegalStateException("Cannot release UGI for this session; it is not cached: " + session);
}
DelayQueue<Entry> expirationQueue = getExpirationQueue(session.getSegmentId());
synchronized (expirationQueue) {
entry.decrementRefCount();
expirationQueue.remove(entry);
if (cleanImmediatelyIfNoRefs && entry.isNotInUse()) {
closeUGI(entry);
} else {
// Reset expiration time and put it back in the queue
// only when we don't close the UGI
entry.resetTime();
expirationQueue.offer(entry);
}
}
}
/**
* @return the size of the cache
*/
int size() {
return cache.size();
}
/**
* This method is not thread-safe, and is intended to be called in tests.
*
* @return the sum of the sizes of the internal queues
*/
int allQueuesSize() {
int count = 0;
for (DelayQueue queue : expirationQueueMap.values()) {
count += queue.size();
}
return count;
}
/**
* This method is O(n) in the number of cache entries and should only be called in tests.
*
* @param session
* @return determine whether the session is in the internal cache
*/
boolean contains(SessionId session) {
DelayQueue<Entry> expirationQueue = getExpirationQueue(session.getSegmentId());
synchronized (expirationQueue) {
Entry entry = cache.get(session);
return entry != null && expirationQueue.contains(entry);
}
}
/**
* Get the queue of cache entries associated with a segment, creating it if it doesn't yet
* exist. This lets us lazily populate the expirationQueueMap.
*
* @param segmentId
* @return the {@link DelayQueue} associated to the segment.
*/
private DelayQueue<Entry> getExpirationQueue(Integer segmentId) {
DelayQueue<Entry> queue = expirationQueueMap.get(segmentId);
if (queue == null) {
synchronized (expirationQueueMap) {
queue = expirationQueueMap.get(segmentId);
if (queue == null) {
queue = new DelayQueue<>();
expirationQueueMap.put(segmentId, queue);
}
}
}
return queue;
}
/**
* Iterate through all the entries in the queue and close expired {@link UserGroupInformation},
* otherwise it resets the timer for every non-expired entry.
*
* @param expirationQueue
*/
private void cleanup(DelayQueue<Entry> expirationQueue) {
Entry expiredUGI;
while ((expiredUGI = expirationQueue.poll()) != null) {
if (expiredUGI.isNotInUse()) {
closeUGI(expiredUGI);
} else {
// The UGI object is still being used by another thread
String fsMsg = "FileSystem for proxy user = " + expiredUGI.getSession().getUser();
if (LOG.isDebugEnabled()) {
LOG.debug(expiredUGI.getSession().toString() + " Skipping close of " + fsMsg);
}
// Place it back in the queue if still in use and was not closed
expiredUGI.resetTime();
expirationQueue.offer(expiredUGI);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Delay Queue Size for segment " +
expiredUGI.getSession().getSegmentId() + " = " + expirationQueue.size());
}
}
}
/**
* This method must be called from a synchronized block for the delayQueue for the given
* session.getSegmentId(). Removes the cachedUGI from the internal cache and then passes it to
* {@link UGIProvider} to destroy the UGI.
*
* @param expiredUGI
*/
private void closeUGI(Entry expiredUGI) {
SessionId session = expiredUGI.getSession();
String fsMsg = "FileSystem for proxy user = " + session.getUser();
if (LOG.isDebugEnabled()) {
LOG.debug(session.toString() + " Closing " + fsMsg +
" (Cache Size = " + cache.size() + ")");
}
try {
// Remove it from cache, as cache now has an
// expired entry which is not in progress
cache.remove(session);
ugiProvider.destroy(expiredUGI.getUGI());
} catch (Throwable t) {
LOG.warn(session.toString() + " Error closing " + fsMsg, t);
}
}
/**
* Stores a {@link UserGroupInformation}, and determines when to expire the UGI.
*/
private static class Entry implements Delayed {
private volatile long startTime;
private final SessionId session;
private final UserGroupInformation proxyUGI;
private final AtomicInteger referenceCount = new AtomicInteger();
private final Ticker ticker;
/**
* Creates a new UGICache Entry.
*
* @param ticker
* @param proxyUGI
* @param session
*/
Entry(Ticker ticker, UserGroupInformation proxyUGI, SessionId session) {
this.ticker = ticker;
this.proxyUGI = proxyUGI;
this.session = session;
}
/**
* @return the Cached {@link UserGroupInformation}.
*/
public UserGroupInformation getUGI() {
return proxyUGI;
}
/**
* @return the session associated to the {@link UserGroupInformation}.
*/
public SessionId getSession() {
return session;
}
/**
* @return true if the UGI is being referenced by a session, false otherwise
*/
private boolean isNotInUse() {
return referenceCount.get() <= 0;
}
/**
* Increments the number of references accessing the {@link UserGroupInformation}.
*/
void incrementRefCount() {
referenceCount.incrementAndGet();
}
/**
* Decrements the number of references accessing the {@link UserGroupInformation}.
*/
void decrementRefCount() {
int count = referenceCount.decrementAndGet();
if (count < 0) {
throw new IllegalStateException("UGICache.Entry referenceCount may not be decremented past 0.");
}
}
/**
* Resets the timer for removing this Entry from the cache.
*/
void resetTime() {
startTime = currentTimeMillis();
}
/**
* {@inheritDoc}
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(getDelayMillis(), TimeUnit.MILLISECONDS);
}
/**
* Compare the expiry time of this cache entry to another cache entry's expiry time.
*
* @param other a UGICache.Entry (passing any other kind of Delayed produces an error)
* @see java.lang.Comparable<>#compareTo(java.lang.Comparable<>)
*/
@Override
public int compareTo(Delayed other) {
if (!(other instanceof Entry)) return 1;
Entry that = (Entry) other;
return Long.compare(this.getDelayMillis(), that.getDelayMillis());
}
/**
* @return the number of milliseconds remaining before this cache entry expires.
*/
private long getDelayMillis() {
return (startTime + UGI_CACHE_EXPIRY) - currentTimeMillis();
}
/**
* @return the current Unix timestamp in milliseconds (equivalent to {@link
* System}.currentTimeMillis)
*/
private long currentTimeMillis() {
return ticker.read() / 1000;
}
}
}