| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hive.service.cli.session; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.lang.reflect.Constructor; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Date; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.LongAdder; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Predicate; |
| import com.google.common.collect.Iterables; |
| |
| import org.apache.commons.io.FileUtils; |
| import org.apache.hadoop.hive.common.metrics.common.Metrics; |
| import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; |
| import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; |
| import org.apache.hadoop.hive.common.metrics.common.MetricsVariable; |
| import org.apache.hadoop.hive.conf.HiveConf; |
| import org.apache.hadoop.hive.conf.HiveConf.ConfVars; |
| import org.apache.hadoop.hive.ql.cleanup.SyncCleanupService; |
| import org.apache.hadoop.hive.ql.hooks.HookContext; |
| import org.apache.hadoop.hive.ql.cleanup.CleanupService; |
| import org.apache.hadoop.hive.ql.cleanup.EventualCleanupService; |
| import org.apache.hadoop.hive.ql.hooks.HookUtils; |
| import org.apache.hive.service.CompositeService; |
| import org.apache.hive.service.cli.HiveSQLException; |
| import org.apache.hive.service.cli.SessionHandle; |
| import org.apache.hive.service.cli.operation.Operation; |
| import org.apache.hive.service.cli.operation.OperationLogManager; |
| import org.apache.hive.service.cli.operation.OperationManager; |
| import org.apache.hive.service.rpc.thrift.TOpenSessionReq; |
| import org.apache.hive.service.rpc.thrift.TProtocolVersion; |
| import org.apache.hive.service.server.HiveServer2; |
| import org.apache.hive.service.server.KillQueryZookeeperManager; |
| import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
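/**
 * SessionManager.
 *
 * Service that opens, tracks and closes HiveServer2 client sessions. It also enforces the
 * configured per-user / per-ipaddress connection limits, closes sessions that exceed the idle
 * timeout, and owns the thread pool used to run background operations.
 */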
| public class SessionManager extends CompositeService { |
| |
| public static final String INACTIVE_ERROR_MESSAGE = |
| "Cannot open sessions on an inactive HS2 instance, " + |
| "or the HS2 server leader is not ready; please use service discovery to " + |
| "connect the server leader again"; |
| private static final String FAIL_CLOSE_ERROR_MESSAGE="Cannot close the session opened " + |
| "during the HA state change time"; |
| public static final String HIVERCFILE = ".hiverc"; |
| private static final Logger LOG = LoggerFactory.getLogger(CompositeService.class); |
| private HiveConf hiveConf; |
| /** The lock that synchronizes the allowSessions flag and handleToSession map. |
| Active-passive HA first disables the connections, then closes existing one, making sure |
| there are no races between these two processes. */ |
| private final Object sessionAddLock = new Object(); |
| private boolean allowSessions; |
| private final Map<SessionHandle, HiveSession> handleToSession = |
| new ConcurrentHashMap<SessionHandle, HiveSession>(); |
| private final Map<String, LongAdder> connectionsCount = new ConcurrentHashMap<>(); |
| private int userLimit; |
| private int ipAddressLimit; |
| private int userIpAddressLimit; |
| private final OperationManager operationManager = new OperationManager(); |
| private KillQueryZookeeperManager killQueryZookeeperManager; |
| private Optional<OperationLogManager> logManager = Optional.empty(); |
| private ThreadPoolExecutor backgroundOperationPool; |
| private boolean isOperationLogEnabled; |
| private File operationLogRootDir; |
| |
| private long checkInterval; |
| private long sessionTimeout; |
| private boolean checkOperation; |
| |
| private volatile boolean shutdown; |
| // The HiveServer2 instance running this service |
| private final HiveServer2 hiveServer2; |
| private String sessionImplWithUGIclassName; |
| private String sessionImplclassName; |
| private CleanupService cleanupService; |
| |
/**
 * Creates the session manager service.
 *
 * @param hiveServer2 the HiveServer2 instance running this service
 * @param allowSessions initial value of the flag that gates the opening of new sessions
 */
public SessionManager(HiveServer2 hiveServer2, boolean allowSessions) {
  super(SessionManager.class.getSimpleName());
  this.allowSessions = allowSessions;
  this.hiveServer2 = hiveServer2;
}
| |
| @Override |
| public synchronized void init(HiveConf hiveConf) { |
| this.hiveConf = hiveConf; |
| //Create operation log root directory, if operation logging is enabled |
| if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) { |
| initOperationLogRootDir(); |
| } |
| createBackgroundOperationPool(); |
| addService(operationManager); |
| if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY) && |
| !hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE) && |
| hiveConf.getBoolVar(ConfVars.HIVE_ZOOKEEPER_KILLQUERY_ENABLE)) { |
| killQueryZookeeperManager = new KillQueryZookeeperManager(operationManager, hiveServer2); |
| addService(killQueryZookeeperManager); |
| } |
| initSessionImplClassName(); |
| Metrics metrics = MetricsFactory.getInstance(); |
| if(metrics != null){ |
| registerOpenSesssionMetrics(metrics); |
| registerActiveSesssionMetrics(metrics); |
| } |
| |
| userLimit = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER); |
| ipAddressLimit = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS); |
| userIpAddressLimit = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS); |
| LOG.info("Connections limit are user: {} ipaddress: {} user-ipaddress: {}", userLimit, ipAddressLimit, |
| userIpAddressLimit); |
| |
| int cleanupThreadCount = hiveConf.getIntVar(ConfVars.HIVE_ASYNC_CLEANUP_SERVICE_THREAD_COUNT); |
| int cleanupQueueSize = hiveConf.getIntVar(ConfVars.HIVE_ASYNC_CLEANUP_SERVICE_QUEUE_SIZE); |
| if (cleanupThreadCount > 0) { |
| cleanupService = new EventualCleanupService(cleanupThreadCount, cleanupQueueSize); |
| } else { |
| cleanupService = SyncCleanupService.INSTANCE; |
| } |
| cleanupService.start(); |
| super.init(hiveConf); |
| } |
| |
| private void registerOpenSesssionMetrics(Metrics metrics) { |
| MetricsVariable<Integer> openSessionCnt = new MetricsVariable<Integer>() { |
| @Override |
| public Integer getValue() { |
| return getSessions().size(); |
| } |
| }; |
| MetricsVariable<Integer> openSessionTime = new MetricsVariable<Integer>() { |
| @Override |
| public Integer getValue() { |
| long sum = 0; |
| long currentTime = System.currentTimeMillis(); |
| for (HiveSession s : getSessions()) { |
| sum += currentTime - s.getCreationTime(); |
| } |
| // in case of an overflow return -1 |
| return (int) sum != sum ? -1 : (int) sum; |
| } |
| }; |
| metrics.addGauge(MetricsConstant.HS2_OPEN_SESSIONS, openSessionCnt); |
| metrics.addRatio(MetricsConstant.HS2_AVG_OPEN_SESSION_TIME, openSessionTime, openSessionCnt); |
| } |
| |
| private void registerActiveSesssionMetrics(Metrics metrics) { |
| MetricsVariable<Integer> activeSessionCnt = new MetricsVariable<Integer>() { |
| @Override |
| public Integer getValue() { |
| Iterable<HiveSession> filtered = Iterables.filter(getSessions(), new Predicate<HiveSession>() { |
| @Override |
| public boolean apply(HiveSession hiveSession) { |
| return hiveSession.getNoOperationTime() == 0L; |
| } |
| }); |
| return Iterables.size(filtered); |
| } |
| }; |
| MetricsVariable<Integer> activeSessionTime = new MetricsVariable<Integer>() { |
| @Override |
| public Integer getValue() { |
| long sum = 0; |
| long currentTime = System.currentTimeMillis(); |
| for (HiveSession s : getSessions()) { |
| if (s.getNoOperationTime() == 0L) { |
| sum += currentTime - s.getLastAccessTime(); |
| } |
| } |
| // in case of an overflow return -1 |
| return (int) sum != sum ? -1 : (int) sum; |
| } |
| }; |
| metrics.addGauge(MetricsConstant.HS2_ACTIVE_SESSIONS, activeSessionCnt); |
| metrics.addRatio(MetricsConstant.HS2_AVG_ACTIVE_SESSION_TIME, activeSessionTime, activeSessionCnt); |
| } |
| |
| private void initSessionImplClassName() { |
| this.sessionImplclassName = hiveConf.getVar(ConfVars.HIVE_SESSION_IMPL_CLASSNAME); |
| this.sessionImplWithUGIclassName = hiveConf.getVar(ConfVars.HIVE_SESSION_IMPL_WITH_UGI_CLASSNAME); |
| } |
| |
| private void createBackgroundOperationPool() { |
| int poolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS); |
| LOG.info("HiveServer2: Background operation thread pool size: " + poolSize); |
| int poolQueueSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE); |
| LOG.info("HiveServer2: Background operation thread wait queue size: " + poolQueueSize); |
| long keepAliveTime = HiveConf.getTimeVar( |
| hiveConf, ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME, TimeUnit.SECONDS); |
| LOG.info( |
| "HiveServer2: Background operation thread keepalive time: " + keepAliveTime + " seconds"); |
| |
| // Create a thread pool with #poolSize threads |
| // Threads terminate when they are idle for more than the keepAliveTime |
| // A bounded blocking queue is used to queue incoming operations, if #operations > poolSize |
| String threadPoolName = "HiveServer2-Background-Pool"; |
| final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(poolQueueSize); |
| backgroundOperationPool = new ThreadPoolExecutor(poolSize, poolSize, |
| keepAliveTime, TimeUnit.SECONDS, queue, |
| new ThreadFactoryWithGarbageCleanup(threadPoolName)); |
| backgroundOperationPool.allowCoreThreadTimeOut(true); |
| |
| checkInterval = HiveConf.getTimeVar( |
| hiveConf, ConfVars.HIVE_SERVER2_SESSION_CHECK_INTERVAL, TimeUnit.MILLISECONDS); |
| sessionTimeout = HiveConf.getTimeVar( |
| hiveConf, ConfVars.HIVE_SERVER2_IDLE_SESSION_TIMEOUT, TimeUnit.MILLISECONDS); |
| checkOperation = HiveConf.getBoolVar(hiveConf, |
| ConfVars.HIVE_SERVER2_IDLE_SESSION_CHECK_OPERATION); |
| |
| Metrics m = MetricsFactory.getInstance(); |
| if (m != null) { |
| m.addGauge(MetricsConstant.EXEC_ASYNC_QUEUE_SIZE, new MetricsVariable() { |
| @Override |
| public Object getValue() { |
| return queue.size(); |
| } |
| }); |
| m.addGauge(MetricsConstant.EXEC_ASYNC_POOL_SIZE, new MetricsVariable() { |
| @Override |
| public Object getValue() { |
| return backgroundOperationPool.getPoolSize(); |
| } |
| }); |
| } |
| } |
| |
| private void initOperationLogRootDir() { |
| operationLogRootDir = new File( |
| hiveConf.getVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION)); |
| isOperationLogEnabled = true; |
| |
| if (operationLogRootDir.exists() && !operationLogRootDir.isDirectory()) { |
| LOG.warn("The operation log root directory exists, but it is not a directory: " + |
| operationLogRootDir.getAbsolutePath()); |
| isOperationLogEnabled = false; |
| } |
| |
| if (!operationLogRootDir.exists()) { |
| if (!operationLogRootDir.mkdirs()) { |
| LOG.warn("Unable to create operation log root directory: " + |
| operationLogRootDir.getAbsolutePath()); |
| isOperationLogEnabled = false; |
| } |
| } |
| |
| if (isOperationLogEnabled) { |
| LOG.info("Operation log root directory is created: " + operationLogRootDir.getAbsolutePath()); |
| try { |
| FileUtils.forceDeleteOnExit(operationLogRootDir); |
| } catch (IOException e) { |
| LOG.warn("Failed to schedule cleanup HS2 operation logging root dir: " + |
| operationLogRootDir.getAbsolutePath(), e); |
| } |
| logManager = Optional.of(new OperationLogManager(this, hiveConf)); |
| } |
| } |
| |
| @Override |
| public synchronized void start() { |
| super.start(); |
| if (checkInterval > 0) { |
| startTimeoutChecker(); |
| } |
| } |
| |
| public CleanupService getCleanupService() { |
| return cleanupService; |
| } |
| |
| private final Object timeoutCheckerLock = new Object(); |
| |
| private void startTimeoutChecker() { |
| final long interval = Math.max(checkInterval, 3000l); // minimum 3 seconds |
| final Runnable timeoutChecker = new Runnable() { |
| @Override |
| public void run() { |
| sleepFor(interval); |
| while (!shutdown) { |
| long current = System.currentTimeMillis(); |
| for (HiveSession session : new ArrayList<HiveSession>(handleToSession.values())) { |
| if (shutdown) { |
| break; |
| } |
| if (sessionTimeout > 0 && session.getLastAccessTime() + sessionTimeout <= current |
| && (!checkOperation || session.getNoOperationTime() > sessionTimeout)) { |
| SessionHandle handle = session.getSessionHandle(); |
| LOG.warn("Session " + handle + " is Timed-out (last access : " + |
| new Date(session.getLastAccessTime()) + ") and will be closed"); |
| try { |
| closeSession(handle); |
| } catch (HiveSQLException e) { |
| LOG.warn("Exception is thrown closing session " + handle, e); |
| } finally { |
| Metrics metrics = MetricsFactory.getInstance(); |
| if (metrics != null) { |
| metrics.incrementCounter(MetricsConstant.HS2_ABANDONED_SESSIONS); |
| } |
| } |
| } else { |
| session.closeExpiredOperations(); |
| } |
| } |
| sleepFor(interval); |
| } |
| } |
| |
| private void sleepFor(long interval) { |
| synchronized (timeoutCheckerLock) { |
| try { |
| timeoutCheckerLock.wait(interval); |
| } catch (InterruptedException e) { |
| // Ignore, and break. |
| } |
| } |
| } |
| }; |
| backgroundOperationPool.execute(timeoutChecker); |
| } |
| |
| private void shutdownTimeoutChecker() { |
| shutdown = true; |
| synchronized (timeoutCheckerLock) { |
| timeoutCheckerLock.notify(); |
| } |
| } |
| |
| @Override |
| public synchronized void decommission() { |
| allowSessions(false); |
| super.decommission(); |
| } |
| |
| @Override |
| public synchronized void stop() { |
| super.stop(); |
| shutdownTimeoutChecker(); |
| if (backgroundOperationPool != null) { |
| backgroundOperationPool.shutdown(); |
| cleanupService.shutdown(); |
| long timeout = hiveConf.getTimeVar( |
| ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS); |
| try { |
| backgroundOperationPool.awaitTermination(timeout, TimeUnit.SECONDS); |
| } catch (InterruptedException e) { |
| LOG.warn("HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT = " + timeout + |
| " seconds has been exceeded. RUNNING background operations will be shut down", e); |
| } |
| backgroundOperationPool.shutdownNow(); |
| backgroundOperationPool = null; |
| } |
| cleanupLoggingRootDir(); |
| logManager.ifPresent(lm -> lm.stop()); |
| } |
| |
| private void cleanupLoggingRootDir() { |
| if (isOperationLogEnabled) { |
| try { |
| FileUtils.forceDelete(operationLogRootDir); |
| } catch (Exception e) { |
| LOG.warn("Failed to cleanup root dir of HS2 logging: " + operationLogRootDir |
| .getAbsolutePath(), e); |
| } |
| } |
| } |
| |
| public SessionHandle openSession(TProtocolVersion protocol, String username, String password, String ipAddress, |
| Map<String, String> sessionConf) throws HiveSQLException { |
| return openSession(protocol, username, password, ipAddress, sessionConf, false, null); |
| } |
| |
| /** |
| * Opens a new session and creates a session handle. |
| * The username passed to this method is the effective username. |
| * If withImpersonation is true (==doAs true) we wrap all the calls in HiveSession |
| * within a UGI.doAs, where UGI corresponds to the effective user. |
| * @see org.apache.hive.service.cli.thrift.ThriftCLIService#getUserName(TOpenSessionReq) |
| * |
| * @param protocol |
| * @param username |
| * @param password |
| * @param ipAddress |
| * @param sessionConf |
| * @param withImpersonation |
| * @param delegationToken |
| * @return |
| * @throws HiveSQLException |
| */ |
| public SessionHandle openSession(TProtocolVersion protocol, String username, String password, String ipAddress, |
| Map<String, String> sessionConf, boolean withImpersonation, String delegationToken) |
| throws HiveSQLException { |
| return createSession(null, protocol, username, password, ipAddress, sessionConf, |
| withImpersonation, delegationToken).getSessionHandle(); |
| } |
| |
| public HiveSession createSession(SessionHandle sessionHandle, TProtocolVersion protocol, String username, |
| String password, String ipAddress, Map<String, String> sessionConf, boolean withImpersonation, |
| String delegationToken) |
| throws HiveSQLException { |
| // Check the flag opportunistically. |
| synchronized (sessionAddLock) { |
| if (!allowSessions) { |
| throw new HiveSQLException(INACTIVE_ERROR_MESSAGE); |
| } |
| } |
| // Do the expensive operations outside of any locks; we'll recheck the flag again at the end. |
| |
| // if client proxies connection, use forwarded ip-addresses instead of just the gateway |
| final List<String> forwardedAddresses = getForwardedAddresses(); |
| incrementConnections(username, ipAddress, forwardedAddresses); |
| |
| HiveSession session; |
| // If doAs is set to true for HiveServer2, we will create a proxy object for the session impl. |
| // Within the proxy object, we wrap the method call in a UserGroupInformation#doAs |
| if (withImpersonation) { |
| HiveSessionImplwithUGI hiveSessionUgi; |
| if (sessionImplWithUGIclassName == null) { |
| hiveSessionUgi = new HiveSessionImplwithUGI(sessionHandle, protocol, username, password, |
| hiveConf, ipAddress, delegationToken, forwardedAddresses); |
| } else { |
| try { |
| Class<?> clazz = Class.forName(sessionImplWithUGIclassName); |
| Constructor<?> constructor = clazz.getConstructor(SessionHandle.class, TProtocolVersion.class, String.class, |
| String.class, HiveConf.class, String.class, String.class, List.class); |
| hiveSessionUgi = (HiveSessionImplwithUGI) constructor.newInstance(sessionHandle, |
| protocol, username, password, hiveConf, ipAddress, delegationToken, forwardedAddresses); |
| } catch (Exception e) { |
| throw new HiveSQLException("Cannot initialize session class:" + sessionImplWithUGIclassName); |
| } |
| } |
| session = HiveSessionProxy.getProxy(hiveSessionUgi, hiveSessionUgi.getSessionUgi()); |
| hiveSessionUgi.setProxySession(session); |
| } else { |
| if (sessionImplclassName == null) { |
| session = new HiveSessionImpl(sessionHandle, protocol, username, password, hiveConf, |
| ipAddress, forwardedAddresses); |
| } else { |
| try { |
| Class<?> clazz = Class.forName(sessionImplclassName); |
| Constructor<?> constructor = clazz.getConstructor(SessionHandle.class, TProtocolVersion.class, |
| String.class, String.class, HiveConf.class, String.class, List.class); |
| session = (HiveSession) constructor.newInstance(sessionHandle, protocol, username, password, |
| hiveConf, ipAddress, forwardedAddresses); |
| } catch (Exception e) { |
| throw new HiveSQLException("Cannot initialize session class:" + sessionImplclassName, e); |
| } |
| } |
| } |
| session.setSessionManager(this); |
| session.setOperationManager(operationManager); |
| try { |
| session.open(sessionConf); |
| } catch (Exception e) { |
| LOG.warn("Failed to open session", e); |
| try { |
| session.close(); |
| } catch (Throwable t) { |
| LOG.warn("Error closing session", t); |
| } |
| session = null; |
| throw new HiveSQLException("Failed to open new session: " + e.getMessage(), e); |
| } |
| if (isOperationLogEnabled) { |
| session.setOperationLogSessionDir(operationLogRootDir); |
| } |
| try { |
| executeSessionHooks(session); |
| } catch (Exception e) { |
| LOG.warn("Failed to execute session hooks", e); |
| try { |
| session.close(); |
| } catch (Throwable t) { |
| LOG.warn("Error closing session", t); |
| } |
| session = null; |
| throw new HiveSQLException("Failed to execute session hooks: " + e.getMessage(), e); |
| } |
| boolean isAdded = false; |
| synchronized (sessionAddLock) { |
| if (allowSessions) { |
| handleToSession.put(session.getSessionHandle(), session); |
| isAdded = true; |
| } |
| } |
| if (!isAdded) { |
| try { |
| closeSessionInternal(session); |
| } catch (Exception e) { |
| LOG.warn("Failed to close the session opened during an HA state change; ignoring", e); |
| } |
| throw new HiveSQLException(FAIL_CLOSE_ERROR_MESSAGE); |
| } |
| LOG.info("Session opened, " + session.getSessionHandle() |
| + ", current sessions:" + getOpenSessionCount()); |
| return session; |
| } |
| |
| private void incrementConnections(final String username, final String ipAddress, |
| final List<String> forwardedAddresses) throws HiveSQLException { |
| final String clientIpAddress = getOriginClientIpAddress(ipAddress, forwardedAddresses); |
| |
| String violation = anyViolations(username, clientIpAddress); |
| // increment the counters only when there are no violations |
| if (violation == null) { |
| if (trackConnectionsPerUser(username)) { |
| connectionsCount.computeIfAbsent(username, k -> new LongAdder()).increment(); |
| } |
| |
| if (trackConnectionsPerIpAddress(clientIpAddress)) { |
| connectionsCount.computeIfAbsent(clientIpAddress, k -> new LongAdder()).increment(); |
| } |
| |
| if (trackConnectionsPerUserIpAddress(username, clientIpAddress)) { |
| connectionsCount.computeIfAbsent(username + ":" + clientIpAddress, k -> new LongAdder()).increment(); |
| } |
| } else { |
| LOG.error(violation); |
| throw new HiveSQLException(violation); |
| } |
| } |
| |
| private String getOriginClientIpAddress(final String ipAddress, final List<String> forwardedAddresses) { |
| if (forwardedAddresses == null || forwardedAddresses.isEmpty()) { |
| return ipAddress; |
| } |
| // order of forwarded ips per X-Forwarded-For http spec (client, proxy1, proxy2) |
| return forwardedAddresses.get(0); |
| } |
| |
| private void decrementConnections(final HiveSession session) { |
| final String username = session.getUserName(); |
| final String clientIpAddress = getOriginClientIpAddress(session.getIpAddress(), session.getForwardedAddresses()); |
| if (trackConnectionsPerUser(username)) { |
| connectionsCount.computeIfPresent(username, (k, v) -> v).decrement(); |
| } |
| |
| if (trackConnectionsPerIpAddress(clientIpAddress)) { |
| connectionsCount.computeIfPresent(clientIpAddress, (k, v) -> v).decrement(); |
| } |
| |
| if (trackConnectionsPerUserIpAddress(username, clientIpAddress)) { |
| connectionsCount.computeIfPresent(username + ":" + clientIpAddress, (k, v) -> v).decrement(); |
| } |
| } |
| |
| private String anyViolations(final String username, final String ipAddress) { |
| if (trackConnectionsPerUser(username) && !withinLimits(username, userLimit)) { |
| return "Connection limit per user reached (user: " + username + " limit: " + userLimit + ")"; |
| } |
| |
| if (trackConnectionsPerIpAddress(ipAddress) && !withinLimits(ipAddress, ipAddressLimit)) { |
| return "Connection limit per ipaddress reached (ipaddress: " + ipAddress + " limit: " + ipAddressLimit + ")"; |
| } |
| |
| if (trackConnectionsPerUserIpAddress(username, ipAddress) && |
| !withinLimits(username + ":" + ipAddress, userIpAddressLimit)) { |
| return "Connection limit per user:ipaddress reached (user:ipaddress: " + username + ":" + ipAddress + " limit: " + |
| userIpAddressLimit + ")"; |
| } |
| |
| return null; |
| } |
| |
| private boolean trackConnectionsPerUserIpAddress(final String username, final String ipAddress) { |
| return userIpAddressLimit > 0 && username != null && !username.isEmpty() && ipAddress != null && |
| !ipAddress.isEmpty(); |
| } |
| |
| private boolean trackConnectionsPerIpAddress(final String ipAddress) { |
| return ipAddressLimit > 0 && ipAddress != null && !ipAddress.isEmpty(); |
| } |
| |
| private boolean trackConnectionsPerUser(final String username) { |
| return userLimit > 0 && username != null && !username.isEmpty(); |
| } |
| |
| private boolean withinLimits(final String track, final int limit) { |
| if (connectionsCount.containsKey(track)) { |
| final int connectionCount = connectionsCount.get(track).intValue(); |
| if (connectionCount >= limit) { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| public void closeSession(SessionHandle sessionHandle) throws HiveSQLException { |
| final HiveSession session; |
| synchronized(sessionAddLock) { |
| session = handleToSession.remove(sessionHandle); |
| if (session == null) { |
| throw new HiveSQLException("Session does not exist: " + sessionHandle); |
| } |
| LOG.info("Session closed, " + sessionHandle + ", current sessions:" + getOpenSessionCount()); |
| } |
| closeSessionInternal(session); |
| } |
| |
| private void closeSessionInternal(HiveSession session) throws HiveSQLException { |
| try { |
| session.close(); |
| } finally { |
| decrementConnections(session); |
| // Shutdown HiveServer2 if it has been deregistered from ZooKeeper and has no active sessions |
| if (!(hiveServer2 == null) && (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) |
| && (hiveServer2.isDeregisteredWithZooKeeper())) { |
| // Asynchronously shutdown this instance of HiveServer2, |
| // if there are no active client sessions |
| if (getOpenSessionCount() == 0) { |
| LOG.info("This instance of HiveServer2 has been removed from the list of server " |
| + "instances available for dynamic service discovery. " |
| + "The last client session has ended - will shutdown now."); |
| Thread shutdownThread = new Thread() { |
| @Override |
| public void run() { |
| hiveServer2.stop(); |
| } |
| }; |
| shutdownThread.start(); |
| } |
| } |
| } |
| } |
| |
| public HiveSession getSession(SessionHandle sessionHandle) throws HiveSQLException { |
| HiveSession session = handleToSession.get(sessionHandle); |
| if (session == null) { |
| throw new HiveSQLException("Invalid SessionHandle: " + sessionHandle); |
| } |
| return session; |
| } |
| |
| public OperationManager getOperationManager() { |
| return operationManager; |
| } |
| |
| @VisibleForTesting |
| public Optional<OperationLogManager> getLogManager() { |
| return logManager; |
| } |
| |
| public KillQueryZookeeperManager getKillQueryZookeeperManager() { |
| return killQueryZookeeperManager; |
| } |
| |
| private static ThreadLocal<String> threadLocalIpAddress = new ThreadLocal<String>(); |
| |
| public static void setIpAddress(String ipAddress) { |
| threadLocalIpAddress.set(ipAddress); |
| } |
| |
| public static void clearIpAddress() { |
| threadLocalIpAddress.remove(); |
| } |
| |
| public static String getIpAddress() { |
| return threadLocalIpAddress.get(); |
| } |
| |
| private static ThreadLocal<List<String>> threadLocalForwardedAddresses = new ThreadLocal<List<String>>(); |
| |
| public static void setForwardedAddresses(List<String> ipAddress) { |
| threadLocalForwardedAddresses.set(ipAddress); |
| } |
| |
| public static void clearForwardedAddresses() { |
| threadLocalForwardedAddresses.remove(); |
| } |
| |
| public static List<String> getForwardedAddresses() { |
| return threadLocalForwardedAddresses.get(); |
| } |
| |
| private static ThreadLocal<String> threadLocalUserName = new ThreadLocal<String>(){ |
| @Override |
| protected String initialValue() { |
| return null; |
| } |
| }; |
| |
| public static void setUserName(String userName) { |
| threadLocalUserName.set(userName); |
| } |
| |
| public static void clearUserName() { |
| threadLocalUserName.remove(); |
| } |
| |
| public static String getUserName() { |
| return threadLocalUserName.get(); |
| } |
| |
| private static ThreadLocal<String> threadLocalProxyUserName = new ThreadLocal<String>(){ |
| @Override |
| protected String initialValue() { |
| return null; |
| } |
| }; |
| |
| public static void setProxyUserName(String userName) { |
| LOG.debug("setting proxy user name based on query param to: " + userName); |
| threadLocalProxyUserName.set(userName); |
| } |
| |
| public static String getProxyUserName() { |
| return threadLocalProxyUserName.get(); |
| } |
| |
| public static void clearProxyUserName() { |
| threadLocalProxyUserName.remove(); |
| } |
| |
| // execute session hooks |
| private void executeSessionHooks(HiveSession session) throws Exception { |
| List<HiveSessionHook> sessionHooks = |
| HookUtils.readHooksFromConf(hiveConf, HookContext.HookType.HIVE_SERVER2_SESSION_HOOK); |
| for (HiveSessionHook sessionHook : sessionHooks) { |
| sessionHook.run(new HiveSessionHookContextImpl(session)); |
| } |
| } |
| |
| public Future<?> submitBackgroundOperation(Runnable r) { |
| return backgroundOperationPool.submit(r); |
| } |
| |
| public Collection<Operation> getOperations() { |
| return operationManager.getOperations(); |
| } |
| |
| public Collection<HiveSession> getSessions() { |
| return Collections.unmodifiableCollection(handleToSession.values()); |
| } |
| |
| public int getOpenSessionCount() { |
| return handleToSession.size(); |
| } |
| |
| public String getHiveServer2HostName() throws Exception { |
| if (hiveServer2 == null) { |
| return null; |
| } |
| return hiveServer2.getServerHost(); |
| } |
| |
| public void allowSessions(boolean b) { |
| synchronized (sessionAddLock) { |
| this.allowSessions = b; |
| } |
| } |
| } |
| |