[MINOR] Federated retry on connection error
This commit adds a small retry block for federated requests that
fail due to connection issues. In practice, if a request does not get
through, we retry it with an increasing wait time: 200 ms before the
first retry, growing by 200 ms per retry, and we give up after 5 attempts.
In general, this made the tests run more smoothly for me: instead of
failing because the workers have not been started yet, we now retry long
enough for the workers to start.
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java
index 0e0f837..3b332d5 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java
@@ -20,6 +20,7 @@
package org.apache.sysds.runtime.controlprogram.federated;
import java.io.Serializable;
+import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashSet;
@@ -143,9 +144,8 @@
if(!_dataType.isMatrix() && !_dataType.isFrame())
throw new DMLRuntimeException("Federated datatype \"" + _dataType.toString() + "\" is not supported.");
_varID = id;
- FederatedRequest request = (mtd != null) ?
- new FederatedRequest(RequestType.READ_VAR, id, mtd) :
- new FederatedRequest(RequestType.READ_VAR, id);
+ FederatedRequest request = (mtd != null) ? new FederatedRequest(RequestType.READ_VAR, id,
+ mtd) : new FederatedRequest(RequestType.READ_VAR, id);
request.appendParam(_filepath);
request.appendParam(_dataType.name());
return executeFederatedOperation(request);
@@ -175,7 +175,20 @@
* @param request the requested operation
* @return the response
*/
- public synchronized static Future<FederatedResponse> executeFederatedOperation(InetSocketAddress address,
+ public static Future<FederatedResponse> executeFederatedOperation(InetSocketAddress address,
+ FederatedRequest... request) {
+ return executeFederatedOperation(address, 1, request);
+ }
+
+ /**
+ * Executes a federated operation on a federated worker.
+ *
+ * @param address socket address (incl host and port)
+ * @param retry the 1-based number of the current attempt; connection failures are retried while this is below 5
+ * @param request the requested operation
+ * @return the response
+ */
+ public synchronized static Future<FederatedResponse> executeFederatedOperation(InetSocketAddress address, int retry,
FederatedRequest... request) {
try {
final Bootstrap b = new Bootstrap();
@@ -196,11 +209,28 @@
return handler.getProm();
}
catch(Exception e) {
+ if(e instanceof ConnectException) {
+
+ if(retry < 5) {
+ try {
+ // Increasing retry timeout
+ Thread.sleep(200 * retry);
+ }
+ catch(Exception e2) {
+ throw new DMLRuntimeException(e);
+ }
+ return executeFederatedOperation(address, retry + 1, request);
+ }
+ else {
+ throw new DMLRuntimeException(e);
+ }
+ }
throw new DMLRuntimeException("Failed sending federated operation", e);
}
}
- private static ChannelInitializer<SocketChannel> createChannel(InetSocketAddress address, DataRequestHandler handler){
+ private static ChannelInitializer<SocketChannel> createChannel(InetSocketAddress address,
+ DataRequestHandler handler) {
final int timeout = ConfigurationManager.getFederatedTimeout();
final boolean ssl = ConfigurationManager.isFederatedSSL();
@@ -240,9 +270,8 @@
}
}
- private static SslHandler createSSLHandler(SocketChannel ch, InetSocketAddress address){
- return SslConstructor().context.newHandler(ch.alloc(), address.getAddress().getHostAddress(),
- address.getPort());
+ private static SslHandler createSSLHandler(SocketChannel ch, InetSocketAddress address) {
+ return SslConstructor().context.newHandler(ch.alloc(), address.getAddress().getHostAddress(), address.getPort());
}
public static void resetFederatedSites() {
@@ -313,19 +342,20 @@
public static class FederatedRequestEncoder extends ObjectEncoder {
@Override
- protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, Serializable msg,
- boolean preferDirect) throws Exception {
+ protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, Serializable msg, boolean preferDirect)
+ throws Exception {
int initCapacity = 256; // default initial capacity
if(msg instanceof FederatedRequest[]) {
initCapacity = 0;
try {
- for(FederatedRequest fr : (FederatedRequest[])msg) {
+ for(FederatedRequest fr : (FederatedRequest[]) msg) {
int frSize = Math.toIntExact(fr.estimateSerializationBufferSize());
if(Integer.MAX_VALUE - initCapacity < frSize) // summed sizes exceed integer limits
throw new ArithmeticException("Overflow.");
initCapacity += frSize;
}
- } catch(ArithmeticException ae) { // size of federated request exceeds integer limits
+ }
+ catch(ArithmeticException ae) { // size of federated request exceeds integer limits
initCapacity = Integer.MAX_VALUE;
}
}