SOLR-6874: There is a race around SocketProxy binding to it's port the way we setup JettySolrRunner and SocketProxy.
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1649154 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 6e576b2..f0b8378 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -381,6 +381,9 @@
* SOLR-6779: fix /browse for schemaless example (ehatcher)
+* SOLR-6874: There is a race around SocketProxy binding to it's port the way we setup
+ JettySolrRunner and SocketProxy. (Mark Miller, Timothy Potter)
+
Optimizations
----------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ReplicationFactorTest.java b/solr/core/src/test/org/apache/solr/cloud/ReplicationFactorTest.java
index 76d43dd..965f35b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ReplicationFactorTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ReplicationFactorTest.java
@@ -55,9 +55,7 @@
private static final transient Logger log =
LoggerFactory.getLogger(ReplicationFactorTest.class);
-
- private Map<URI,SocketProxy> proxies = new HashMap<URI,SocketProxy>();
-
+
public ReplicationFactorTest() {
super();
sliceCount = 3;
@@ -103,25 +101,8 @@
public JettySolrRunner createJetty(File solrHome, String dataDir,
String shardList, String solrConfigOverride, String schemaOverride)
throws Exception {
-
- JettySolrRunner jetty = new JettySolrRunner(solrHome.getPath(), context,
- 0, solrConfigOverride, schemaOverride, false,
- getExtraServlets(), sslConfig, getExtraRequestFilters());
- jetty.setShards(shardList);
- jetty.setDataDir(getDataDir(dataDir));
-
- // setup to proxy Http requests to this server unless it is the control
- // server
- int proxyPort = getNextAvailablePort();
- jetty.setProxyPort(proxyPort);
-
- jetty.start();
-
- // create a socket proxy for the jetty server ...
- SocketProxy proxy = new SocketProxy(proxyPort, jetty.getBaseUrl().toURI());
- proxies.put(proxy.getUrl(), proxy);
-
- return jetty;
+
+ return createProxiedJetty(solrHome, dataDir, shardList, solrConfigOverride, schemaOverride);
}
protected int getNextAvailablePort() throws Exception {
@@ -320,21 +301,7 @@
Thread.sleep(2000);
ensureAllReplicasAreActive(testCollectionName, shardId, numShards, replicationFactor, 30);
}
-
- protected SocketProxy getProxyForReplica(Replica replica) throws Exception {
- String replicaBaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
- assertNotNull(replicaBaseUrl);
- URL baseUrl = new URL(replicaBaseUrl);
-
- SocketProxy proxy = proxies.get(baseUrl.toURI());
- if (proxy == null && !baseUrl.toExternalForm().endsWith("/")) {
- baseUrl = new URL(baseUrl.toExternalForm() + "/");
- proxy = proxies.get(baseUrl.toURI());
- }
- assertNotNull("No proxy found for " + baseUrl + "!", proxy);
- return proxy;
- }
-
+
protected int sendDoc(int docId, int minRf) throws Exception {
UpdateRequest up = new UpdateRequest();
up.setParam(UpdateRequest.MIN_REPFACT, String.valueOf(minRf));
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
index 35a3d39..aba22bd 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
@@ -536,16 +536,11 @@
jetty.setShards(shardList);
jetty.setDataDir(getDataDir(dataDir));
- // setup to proxy Http requests to this server unless it is the control
- // server
- int proxyPort = getNextAvailablePort();
- jetty.setProxyPort(proxyPort);
+ SocketProxy proxy = new SocketProxy(0, sslConfig == null ? false : sslConfig.isSSLMode());
+ jetty.setProxyPort(proxy.getListenPort());
jetty.start();
-
- // create a socket proxy for the jetty server ...
- SocketProxy proxy = new SocketProxy(proxyPort, jetty.getBaseUrl().toURI());
+ proxy.open(jetty.getBaseUrl().toURI());
proxies.put(proxy.getUrl(), proxy);
-
return jetty;
}
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/SocketProxy.java b/solr/test-framework/src/java/org/apache/solr/cloud/SocketProxy.java
index 500cd9a..77bfdb0 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/SocketProxy.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/SocketProxy.java
@@ -59,24 +59,40 @@
public List<Bridge> connections = new LinkedList<Bridge>();
- private int listenPort = 0;
+ private final int listenPort;
private int receiveBufferSize = -1;
private boolean pauseAtStart = false;
private int acceptBacklog = 50;
-
- public SocketProxy() throws Exception {}
-
- public SocketProxy(URI uri) throws Exception {
- this(0, uri);
+
+ private boolean usesSSL;
+
+ public SocketProxy() throws Exception {
+ this(0, false);
}
- public SocketProxy(int port, URI uri) throws Exception {
- listenPort = port;
+ public SocketProxy( boolean useSSL) throws Exception {
+ this(0, useSSL);
+ }
+
+ public SocketProxy(int port, boolean useSSL) throws Exception {
+ int listenPort = port;
+ this.usesSSL = useSSL;
+ serverSocket = createServerSocket(useSSL);
+ serverSocket.setReuseAddress(true);
+ if (receiveBufferSize > 0) {
+ serverSocket.setReceiveBufferSize(receiveBufferSize);
+ }
+ serverSocket.bind(new InetSocketAddress(listenPort), acceptBacklog);
+ this.listenPort = serverSocket.getLocalPort();
+ }
+
+ public void open(URI uri) throws Exception {
target = uri;
- open();
+ proxyUrl = urlFromSocket(target, serverSocket);
+ doOpen();
}
public String toString() {
@@ -91,18 +107,8 @@
target = tcpBrokerUri;
}
- public void open() throws Exception {
- serverSocket = createServerSocket(target);
- serverSocket.setReuseAddress(true);
- if (receiveBufferSize > 0) {
- serverSocket.setReceiveBufferSize(receiveBufferSize);
- }
- if (proxyUrl == null) {
- serverSocket.bind(new InetSocketAddress(listenPort), acceptBacklog);
- proxyUrl = urlFromSocket(target, serverSocket);
- } else {
- serverSocket.bind(new InetSocketAddress(proxyUrl.getPort()));
- }
+ private void doOpen() throws Exception {
+
acceptor = new Acceptor(serverSocket, target);
if (pauseAtStart) {
acceptor.pause();
@@ -112,19 +118,19 @@
closed = new CountDownLatch(1);
}
- private boolean isSsl(URI target) {
- return "ssl".equals(target.getScheme());
+ public int getListenPort() {
+ return listenPort;
}
- private ServerSocket createServerSocket(URI target) throws Exception {
- if (isSsl(target)) {
+ private ServerSocket createServerSocket(boolean useSSL) throws Exception {
+ if (useSSL) {
return SSLServerSocketFactory.getDefault().createServerSocket();
}
return new ServerSocket();
}
- private Socket createSocket(URI target) throws Exception {
- if (isSsl(target)) {
+ private Socket createSocket(boolean useSSL) throws Exception {
+ if (useSSL) {
return SSLSocketFactory.getDefault().createSocket();
}
return new Socket();
@@ -175,7 +181,16 @@
public void reopen() {
log.info("Re-opening connectivity to "+getUrl());
try {
- open();
+ if (proxyUrl == null) {
+ throw new IllegalStateException("Can not call open before open(URI uri).");
+ }
+ serverSocket = createServerSocket(usesSSL);
+ serverSocket.setReuseAddress(true);
+ if (receiveBufferSize > 0) {
+ serverSocket.setReceiveBufferSize(receiveBufferSize);
+ }
+ serverSocket.bind(new InetSocketAddress(proxyUrl.getPort()));
+ doOpen();
} catch (Exception e) {
log.debug("exception on reopen url:" + getUrl(), e);
}
@@ -257,7 +272,7 @@
public Bridge(Socket socket, URI target) throws Exception {
receiveSocket = socket;
- sendSocket = createSocket(target);
+ sendSocket = createSocket(usesSSL);
if (receiveBufferSize > 0) {
sendSocket.setReceiveBufferSize(receiveBufferSize);
}
@@ -291,9 +306,9 @@
}
private void linkWithThreads(Socket source, Socket dest) {
- requestThread = new Pump(source, dest);
+ requestThread = new Pump("Request", source, dest);
requestThread.start();
- responseThread = new Pump(dest, source);
+ responseThread = new Pump("Response", dest, source);
responseThread.start();
}
@@ -303,8 +318,8 @@
private Socket destination;
private AtomicReference<CountDownLatch> pause = new AtomicReference<CountDownLatch>();
- public Pump(Socket source, Socket dest) {
- super("SocketProxy-DataTransfer-" + source.getPort() + ":"
+ public Pump(String kind, Socket source, Socket dest) {
+ super("SocketProxy-"+kind+"-" + source.getPort() + ":"
+ dest.getPort());
src = source;
destination = dest;
@@ -321,17 +336,34 @@
public void run() {
byte[] buf = new byte[1024];
+
try {
- InputStream in = src.getInputStream();
- OutputStream out = destination.getOutputStream();
+ src.setSoTimeout(10 * 1000);
+ } catch (SocketException e) {
+ log.error("Failed to set socket timeout on "+src+" due to: "+e);
+ throw new RuntimeException(e);
+ }
+
+ InputStream in = null;
+ OutputStream out = null;
+ try {
+ in = src.getInputStream();
+ out = destination.getOutputStream();
while (true) {
- int len = in.read(buf);
+ int len = -1;
+ try {
+ len = in.read(buf);
+ } catch (SocketTimeoutException ste) {
+ log.warn(ste+" when reading from "+src);
+ }
+
if (len == -1) {
log.debug("read eof from:" + src);
break;
}
pause.get().await();
- out.write(buf, 0, len);
+ if (len > 0)
+ out.write(buf, 0, len);
}
} catch (Exception e) {
log.debug("read/write failed, reason: " + e.getLocalizedMessage());
@@ -342,6 +374,21 @@
close();
}
} catch (Exception ignore) {}
+ } finally {
+ if (in != null) {
+ try {
+ in.close();
+ } catch (Exception exc) {
+ log.debug(exc+" when closing InputStream on socket: "+src);
+ }
+ }
+ if (out != null) {
+ try {
+ out.close();
+ } catch (Exception exc) {
+ log.debug(exc+" when closing OutputStream on socket: "+destination);
+ }
+ }
}
}
}