SENTRY-1869: Try to use pool with idle connections first (Alex Kolbasov, reviewed by Vamsee Yarlagadda)
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportPool.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportPool.java
index 04a515a..80a8165 100644
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportPool.java
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportPool.java
@@ -145,6 +145,15 @@
poolConfig);
}
+ /**
+ * Get an open transport instance.
+ * The instance can be connected to any of the available servers.
+ * We are trying to randomly load-balance between servers (unless it is
+ * disabled in configuration).
+ *
+ * @return connected transport
+ * @throws Exception if connection to both servers fails
+ */
public TTransportWrapper getTransport() throws Exception {
List<HostAndPort> servers;
// If we are doing load balancing and there is more then one server,
@@ -156,29 +165,40 @@
servers = endpoints;
}
- // Try to get a connection from one of the pools
+ // Try to get a connection from one of the pools.
Exception failure = null;
- for(HostAndPort addr: servers) {
- try {
- TTransportWrapper transport =
- isPoolEnabled ?
- pool.borrowObject(addr) :
- transportFactory.getTransport(addr);
- LOGGER.debug("[{}] obtained transport {}", id, transport);
- if (LOGGER.isDebugEnabled() && isPoolEnabled) {
- LOGGER.debug("Currently {} active connections, {} idle connections",
- pool.getNumActive(), pool.getNumIdle());
+ boolean ignoreEmptyPool = true;
+ for (int attempt = 0; attempt < 2; attempt++) {
+ // First, only attempt to borrow from pools which have some idle connections.
+ // If this fails, try again with all pools.
+ for (HostAndPort addr : servers) {
+ if (isPoolEnabled && ignoreEmptyPool && (pool.getNumIdle(addr) == 0)) {
+ LOGGER.debug("Ignoring empty pool {}", addr);
+ ignoreEmptyPool = false;
+ continue;
}
- return transport;
- } catch (IllegalStateException e) {
- // Should not happen
- LOGGER.error("Unexpected error from pool {}", id, e);
- failure = e;
- } catch (Exception e) {
- LOGGER.error("Failed to obtain transport for {}: {}",
- addr, e.getMessage());
- failure = e;
+ try {
+ TTransportWrapper transport =
+ isPoolEnabled ?
+ pool.borrowObject(addr) :
+ transportFactory.getTransport(addr);
+ LOGGER.debug("[{}] obtained transport {}", id, transport);
+ if (LOGGER.isDebugEnabled() && isPoolEnabled) {
+ LOGGER.debug("Currently {} active connections, {} idle connections",
+ pool.getNumActive(), pool.getNumIdle());
+ }
+ return transport;
+ } catch (IllegalStateException e) {
+ // Should not happen
+ LOGGER.error("Unexpected error from pool {}", id, e);
+ failure = e;
+ } catch (Exception e) {
+ LOGGER.error("Failed to obtain transport for {}: {}",
+ addr, e.getMessage());
+ failure = e;
+ }
}
+ ignoreEmptyPool = false;
}
// Failed to borrow connect to any endpoint
assert failure != null;