Minor cleanup of port search utility code and test (#1815)
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java b/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java
index 890f817..78cf45b 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java
@@ -35,10 +35,10 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.Predicate;
+import java.util.stream.IntStream;
import org.apache.accumulo.core.conf.PropertyType.PortRange;
import org.apache.accumulo.core.spi.scan.SimpleScanDispatcher;
-import org.apache.accumulo.core.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -95,12 +95,10 @@
* Given a property and a deprecated property determine which one to use base on which one is set.
*/
public Property resolve(Property property, Property deprecatedProperty) {
- if (isPropertySet(property, true)) {
+ if (isPropertySet(property, true) || !isPropertySet(deprecatedProperty, true)) {
return property;
- } else if (isPropertySet(deprecatedProperty, true)) {
- return deprecatedProperty;
} else {
- return property;
+ return deprecatedProperty;
}
}
@@ -277,7 +275,8 @@
/**
* Gets a property of type {@link PropertyType#PORT}, interpreting the value properly (as an
- * integer within the range of non-privileged ports).
+ * integer within the range of non-privileged ports). Consider using
+ * {@link #getPortStream(Property)}, if an array is not needed.
*
* @param property
* property to get
@@ -286,38 +285,32 @@
* if the property is of the wrong type
*/
public int[] getPort(Property property) {
+ return getPortStream(property).toArray();
+ }
+
+ /**
+ * Same as {@link #getPort(Property)}, but as an {@link IntStream}.
+ */
+ public IntStream getPortStream(Property property) {
checkType(property, PropertyType.PORT);
String portString = get(property);
- int[] ports = null;
try {
- Pair<Integer,Integer> portRange = PortRange.parse(portString);
- int low = portRange.getFirst();
- int high = portRange.getSecond();
- ports = new int[high - low + 1];
- for (int i = 0, j = low; j <= high; i++, j++) {
- ports[i] = j;
- }
+ return PortRange.parse(portString);
} catch (IllegalArgumentException e) {
- ports = new int[1];
try {
int port = Integer.parseInt(portString);
- if (port == 0) {
- ports[0] = port;
+ if (port == 0 || PortRange.VALID_RANGE.contains(port)) {
+ return IntStream.of(port);
} else {
- if (port < 1024 || port > 65535) {
- log.error("Invalid port number {}; Using default {}", port, property.getDefaultValue());
- ports[0] = Integer.parseInt(property.getDefaultValue());
- } else {
- ports[0] = port;
- }
+ log.error("Invalid port number {}; Using default {}", port, property.getDefaultValue());
+ return IntStream.of(Integer.parseInt(property.getDefaultValue()));
}
} catch (NumberFormatException e1) {
throw new IllegalArgumentException("Invalid port syntax. Must be a single positive "
+ "integers or a range (M-N) of positive integers");
}
}
- return ports;
}
/**
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java b/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java
index 0b607ce..6b4de25 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java
@@ -26,9 +26,9 @@
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import java.util.stream.IntStream;
import java.util.stream.Stream;
-import org.apache.accumulo.core.util.Pair;
import org.apache.commons.lang3.Range;
import org.apache.hadoop.fs.Path;
@@ -310,7 +310,7 @@
public static class PortRange extends Matches {
- private static final Range<Integer> VALID_RANGE = Range.between(1024, 65535);
+ public static final Range<Integer> VALID_RANGE = Range.between(1024, 65535);
public PortRange(final String pattern) {
super(pattern);
@@ -330,7 +330,7 @@
}
}
- public static Pair<Integer,Integer> parse(String portRange) {
+ public static IntStream parse(String portRange) {
int idx = portRange.indexOf('-');
if (idx != -1) {
int low = Integer.parseInt(portRange.substring(0, idx));
@@ -339,7 +339,7 @@
throw new IllegalArgumentException(
"Invalid port range specified, only 1024 to 65535 supported.");
}
- return new Pair<>(low, high);
+ return IntStream.rangeClosed(low, high);
}
throw new IllegalArgumentException(
"Invalid port range specification, must use M-N notation.");
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
index c8ecc36..da816bf 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@ -34,12 +34,14 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import javax.net.ssl.SSLServerSocket;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.conf.PropertyType;
+import org.apache.accumulo.core.conf.PropertyType.PortRange;
import org.apache.accumulo.core.rpc.SslConnectionParams;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.rpc.UGIAssumingTransportFactory;
@@ -90,12 +92,9 @@
* array of ports
* @return array of HostAndPort objects
*/
- public static HostAndPort[] getHostAndPorts(String hostname, int[] ports) {
- HostAndPort[] addresses = new HostAndPort[ports.length];
- for (int i = 0; i < ports.length; i++) {
- addresses[i] = HostAndPort.fromParts(hostname, ports[i]);
- }
- return addresses;
+ public static HostAndPort[] getHostAndPorts(String hostname, IntStream ports) {
+ return ports.mapToObj(port -> HostAndPort.fromParts(hostname, port))
+ .toArray(HostAndPort[]::new);
}
/**
@@ -108,9 +107,8 @@
public static Map<Integer,Property> getReservedPorts(AccumuloConfiguration config) {
return EnumSet.allOf(Property.class).stream()
.filter(p -> p.getType() == PropertyType.PORT && p != Property.TSERV_CLIENTPORT)
- .flatMap(rp -> {
- return Arrays.stream(config.getPort(rp)).mapToObj(portNum -> new Pair<>(portNum, rp));
- }).collect(Collectors.toMap(Pair::getFirst, Pair::getSecond));
+ .flatMap(rp -> config.getPortStream(rp).mapToObj(portNum -> new Pair<>(portNum, rp)))
+ .collect(Collectors.toMap(Pair::getFirst, Pair::getSecond));
}
/**
@@ -145,7 +143,7 @@
Property maxMessageSizeProperty) throws UnknownHostException {
final AccumuloConfiguration config = service.getConfiguration();
- final int[] portHint = config.getPort(portHintProperty);
+ final IntStream portHint = config.getPortStream(portHintProperty);
int minThreads = 2;
if (minThreadProperty != null) {
@@ -208,7 +206,7 @@
continue;
}
- if (port > 65535) {
+ if (PortRange.VALID_RANGE.isBefore(port)) {
break;
}
try {
@@ -632,7 +630,7 @@
// would require changes in how the transports
// work at the Thrift layer to ensure that both the SSL and SASL handshakes function. SASL's
// quality of protection addresses privacy issues.
- checkArgument(!(sslParams != null && saslParams != null),
+ checkArgument(sslParams == null || saslParams == null,
"Cannot start a Thrift server using both SSL and SASL");
ServerAddress serverAddress = null;
diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
index ad08b52..c6ee2a1 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
@@ -24,6 +24,7 @@
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -37,15 +38,12 @@
import org.apache.accumulo.core.clientImpl.thrift.ClientService.Iface;
import org.apache.accumulo.core.clientImpl.thrift.ClientService.Processor;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.client.ClientServiceHandler;
-import org.apache.accumulo.server.conf.ServerConfigurationFactory;
import org.apache.accumulo.server.metrics.Metrics;
import org.apache.accumulo.server.rpc.ServerAddress;
import org.apache.accumulo.server.rpc.TServerUtils;
@@ -54,28 +52,13 @@
import org.apache.thrift.transport.TServerSocket;
import org.easymock.EasyMock;
import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
public class TServerUtilsTest {
- protected static class TestServerConfigurationFactory extends ServerConfigurationFactory {
-
- private ConfigurationCopy conf = null;
-
- public TestServerConfigurationFactory(ServerContext context) {
- super(context, SiteConfiguration.auto());
- conf = new ConfigurationCopy(DefaultConfiguration.getInstance());
- }
-
- @Override
- public synchronized AccumuloConfiguration getSystemConfiguration() {
- return conf;
- }
-
- }
-
private static class TServerWithoutES extends TServer {
boolean stopCalled;
@@ -107,18 +90,21 @@
@Test
public void testStopTServer_ES() {
TServerSocket socket = createNiceMock(TServerSocket.class);
+ replay(socket);
TServerWithES s = new TServerWithES(socket);
TServerUtils.stopTServer(s);
assertTrue(s.stopCalled);
- verify(s.executorService_);
+ verify(socket, s.executorService_);
}
@Test
public void testStopTServer_NoES() {
TServerSocket socket = createNiceMock(TServerSocket.class);
+ replay(socket);
TServerWithoutES s = new TServerWithoutES(socket);
TServerUtils.stopTServer(s);
assertTrue(s.stopCalled);
+ verify(socket);
}
@Test
@@ -127,42 +113,35 @@
// not dying is enough
}
- private static AccumuloConfiguration config =
- new ConfigurationCopy(DefaultConfiguration.getInstance());
+ private ServerContext ctx;
+ private final ConfigurationCopy conf = new ConfigurationCopy(DefaultConfiguration.getInstance());
- private static ServerContext createMockContext() {
- ServerContext context = EasyMock.createMock(ServerContext.class);
- expect(context.getZooReaderWriter()).andReturn(null);
- expect(context.getProperties()).andReturn(new Properties()).anyTimes();
- expect(context.getZooKeepers()).andReturn("").anyTimes();
- expect(context.getInstanceName()).andReturn("instance").anyTimes();
- expect(context.getZooKeepersSessionTimeOut()).andReturn(1).anyTimes();
- expect(context.getInstanceID()).andReturn("11111").anyTimes();
- expect(context.getConfiguration()).andReturn(config).anyTimes();
- return context;
+ @Before
+ public void createMockServerContext() {
+ ctx = EasyMock.createMock(ServerContext.class);
+ expect(ctx.getZooReaderWriter()).andReturn(null).anyTimes();
+ expect(ctx.getProperties()).andReturn(new Properties()).anyTimes();
+ expect(ctx.getZooKeepers()).andReturn("").anyTimes();
+ expect(ctx.getInstanceName()).andReturn("instance").anyTimes();
+ expect(ctx.getZooKeepersSessionTimeOut()).andReturn(1).anyTimes();
+ expect(ctx.getInstanceID()).andReturn("11111").anyTimes();
+ expect(ctx.getConfiguration()).andReturn(conf).anyTimes();
+ expect(ctx.getThriftServerType()).andReturn(ThriftServerType.THREADPOOL).anyTimes();
+ expect(ctx.getServerSslParams()).andReturn(null).anyTimes();
+ expect(ctx.getSaslParams()).andReturn(null).anyTimes();
+ expect(ctx.getClientTimeoutInMillis()).andReturn((long) 1000).anyTimes();
+ replay(ctx);
}
- private static ServerContext createReplayMockInfo() {
- ServerContext context = createMockContext();
- replay(context);
- return context;
- }
-
- private static final TestServerConfigurationFactory factory =
- new TestServerConfigurationFactory(createReplayMockInfo());
-
@After
- public void resetProperty() {
- ((ConfigurationCopy) factory.getSystemConfiguration()).set(Property.TSERV_CLIENTPORT,
- Property.TSERV_CLIENTPORT.getDefaultValue());
- ((ConfigurationCopy) factory.getSystemConfiguration()).set(Property.TSERV_PORTSEARCH,
- Property.TSERV_PORTSEARCH.getDefaultValue());
+ public void verifyMockServerContext() {
+ verify(ctx);
}
@Test
public void testStartServerZeroPort() throws Exception {
TServer server = null;
- ((ConfigurationCopy) factory.getSystemConfiguration()).set(Property.TSERV_CLIENTPORT, "0");
+ conf.set(Property.TSERV_CLIENTPORT, "0");
try {
ServerAddress address = startServer();
assertNotNull(address);
@@ -180,8 +159,7 @@
public void testStartServerFreePort() throws Exception {
TServer server = null;
int port = getFreePort(1024);
- ((ConfigurationCopy) factory.getSystemConfiguration()).set(Property.TSERV_CLIENTPORT,
- Integer.toString(port));
+ conf.set(Property.TSERV_CLIENTPORT, Integer.toString(port));
try {
ServerAddress address = startServer();
assertNotNull(address);
@@ -201,8 +179,7 @@
int port = getFreePort(1024);
InetAddress addr = InetAddress.getByName("localhost");
// Bind to the port
- ((ConfigurationCopy) factory.getSystemConfiguration()).set(Property.TSERV_CLIENTPORT,
- Integer.toString(port));
+ conf.set(Property.TSERV_CLIENTPORT, Integer.toString(port));
try (ServerSocket s = new ServerSocket(port, 50, addr)) {
assertNotNull(s);
startServer();
@@ -216,9 +193,8 @@
int[] port = findTwoFreeSequentialPorts(1024);
// Bind to the port
InetAddress addr = InetAddress.getByName("localhost");
- ((ConfigurationCopy) factory.getSystemConfiguration()).set(Property.TSERV_CLIENTPORT,
- Integer.toString(port[0]));
- ((ConfigurationCopy) factory.getSystemConfiguration()).set(Property.TSERV_PORTSEARCH, "true");
+ conf.set(Property.TSERV_CLIENTPORT, Integer.toString(port[0]));
+ conf.set(Property.TSERV_PORTSEARCH, "true");
try (ServerSocket s = new ServerSocket(port[0], 50, addr)) {
assertNotNull(s);
ServerAddress address = startServer();
@@ -249,32 +225,26 @@
// 5. One free port - this is the one that we expect the TServer to finally use
int[] ports = findTwoFreeSequentialPorts(1024);
int tserverDefaultPort = ports[0];
- ((ConfigurationCopy) factory.getSystemConfiguration()).set(Property.TSERV_CLIENTPORT,
- Integer.toString(tserverDefaultPort));
+ conf.set(Property.TSERV_CLIENTPORT, Integer.toString(tserverDefaultPort));
int gcPort = ports[1];
- ((ConfigurationCopy) factory.getSystemConfiguration()).set(Property.GC_PORT,
- Integer.toString(gcPort));
+ conf.set(Property.GC_PORT, Integer.toString(gcPort));
ports = findTwoFreeSequentialPorts(gcPort + 1);
int masterPort = ports[0];
- ((ConfigurationCopy) factory.getSystemConfiguration()).set(Property.MASTER_CLIENTPORT,
- Integer.toString(masterPort));
+ conf.set(Property.MASTER_CLIENTPORT, Integer.toString(masterPort));
int monitorPort = ports[1];
- ((ConfigurationCopy) factory.getSystemConfiguration()).set(Property.MONITOR_PORT,
- Integer.toString(monitorPort));
+ conf.set(Property.MONITOR_PORT, Integer.toString(monitorPort));
ports = findTwoFreeSequentialPorts(monitorPort + 1);
int masterReplCoordPort = ports[0];
- ((ConfigurationCopy) factory.getSystemConfiguration())
- .set(Property.MASTER_REPLICATION_COORDINATOR_PORT, Integer.toString(masterReplCoordPort));
+ conf.set(Property.MASTER_REPLICATION_COORDINATOR_PORT, Integer.toString(masterReplCoordPort));
int tserverFinalPort = ports[1];
- ((ConfigurationCopy) factory.getSystemConfiguration()).set(Property.TSERV_PORTSEARCH, "true");
+ conf.set(Property.TSERV_PORTSEARCH, "true");
// Ensure that the TServer client port we set above is NOT in the reserved ports
- Map<Integer,Property> reservedPorts =
- TServerUtils.getReservedPorts(factory.getSystemConfiguration());
- assertTrue(!reservedPorts.containsKey(tserverDefaultPort));
+ Map<Integer,Property> reservedPorts = TServerUtils.getReservedPorts(conf);
+ assertFalse(reservedPorts.containsKey(tserverDefaultPort));
// Ensure that all the ports we assigned (GC, Master, Monitor) are included in the reserved
// ports as returned by TServerUtils
@@ -305,8 +275,7 @@
TServer server = null;
int[] port = findTwoFreeSequentialPorts(1024);
String portRange = Integer.toString(port[0]) + "-" + Integer.toString(port[1]);
- ((ConfigurationCopy) factory.getSystemConfiguration()).set(Property.TSERV_CLIENTPORT,
- portRange);
+ conf.set(Property.TSERV_CLIENTPORT, portRange);
try {
ServerAddress address = startServer();
assertNotNull(address);
@@ -329,8 +298,7 @@
int[] port = findTwoFreeSequentialPorts(1024);
String portRange = Integer.toString(port[0]) + "-" + Integer.toString(port[1]);
// Bind to the port
- ((ConfigurationCopy) factory.getSystemConfiguration()).set(Property.TSERV_CLIENTPORT,
- portRange);
+ conf.set(Property.TSERV_CLIENTPORT, portRange);
try (ServerSocket s = new ServerSocket(port[0], 50, addr)) {
assertNotNull(s);
ServerAddress address = startServer();
@@ -374,15 +342,6 @@
}
private ServerAddress startServer() throws Exception {
- ServerContext ctx = createMock(ServerContext.class);
- expect(ctx.getZooReaderWriter()).andReturn(null).anyTimes();
- expect(ctx.getInstanceID()).andReturn("instance").anyTimes();
- expect(ctx.getConfiguration()).andReturn(factory.getSystemConfiguration()).anyTimes();
- expect(ctx.getThriftServerType()).andReturn(ThriftServerType.THREADPOOL);
- expect(ctx.getServerSslParams()).andReturn(null).anyTimes();
- expect(ctx.getSaslParams()).andReturn(null).anyTimes();
- expect(ctx.getClientTimeoutInMillis()).andReturn((long) 1000).anyTimes();
- replay(ctx);
ClientServiceHandler clientHandler = new ClientServiceHandler(ctx, null);
Iface rpcProxy = TraceUtil.wrapService(clientHandler);
Processor<Iface> processor = new Processor<>(rpcProxy);
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 1ca98f0..4776583 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -36,6 +36,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.accumulo.core.Constants;
@@ -654,7 +655,7 @@
} else {
processor = new Processor<>(rpcProxy);
}
- int[] port = getConfiguration().getPort(Property.GC_PORT);
+ IntStream port = getConfiguration().getPortStream(Property.GC_PORT);
HostAndPort[] addresses = TServerUtils.getHostAndPorts(getHostname(), port);
long maxMessageSize = getConfiguration().getAsBytes(Property.GENERAL_MAX_MESSAGE_SIZE);
try {