[OPENMEETINGS-2331] re-connection should be fixed
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
index fd4fa65..a7b2fab 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
@@ -270,7 +270,7 @@
 	}
 
 	public void stopRecord() {
-		releaseRecorder();
+		releaseRecorder(true);
 		chunkId = null;
 	}
 
@@ -296,6 +296,17 @@
 			log.trace("PARTICIPANT {}: Released incoming EP for {}", uid, inUid);
 
 			final WebRtcEndpoint ep = entry.getValue();
+			outgoingMedia.disconnect(ep, new Continuation<Void>() {
+				@Override
+				public void onSuccess(Void result) throws Exception {
+					log.trace("PARTICIPANT {}: Disconnected successfully incoming EP for {}", KStream.this.uid, inUid);
+				}
+
+				@Override
+				public void onError(Throwable cause) throws Exception {
+					log.warn("PARTICIPANT {}: Could not disconnect incoming EP for {}", KStream.this.uid, inUid);
+				}
+			});
 			ep.release(new Continuation<Void>() {
 				@Override
 				public void onSuccess(Void result) throws Exception {
@@ -315,19 +326,64 @@
 	public void release(IStreamProcessor processor, boolean remove) {
 		if (outgoingMedia != null) {
 			releaseListeners();
-			outgoingMedia.release();
+			outgoingMedia.release(new Continuation<Void>() {
+				@Override
+				public void onSuccess(Void result) throws Exception {
+					log.trace("PARTICIPANT {}: Released successfully", KStream.this.uid);
+				}
+
+				@Override
+				public void onError(Throwable cause) throws Exception {
+					log.warn("PARTICIPANT {}: Could not release", KStream.this.uid, cause);
+				}
+			});
+			releaseRecorder(false);
 			outgoingMedia = null;
 		}
-		releaseRecorder();
 		if (remove) {
 			processor.release(this, false);
 		}
 	}
 
-	private void releaseRecorder() {
+	private void releaseRecorder(boolean wait) {
 		if (recorder != null) {
-			recorder.stopAndWait();
-			recorder.release();
+			if (wait) {
+				recorder.stopAndWait();
+			} else {
+				recorder.stop(new Continuation<Void>() {
+					@Override
+					public void onSuccess(Void result) throws Exception {
+						log.trace("PARTICIPANT {}: Recording stopped", KStream.this.uid);
+					}
+
+					@Override
+					public void onError(Throwable cause) throws Exception {
+						log.warn("PARTICIPANT {}: Could not stop recording", KStream.this.uid, cause);
+					}
+				});
+			}
+			outgoingMedia.disconnect(recorder, new Continuation<Void>() {
+				@Override
+				public void onSuccess(Void result) throws Exception {
+					log.trace("PARTICIPANT {}: Recorder disconnected successfully", KStream.this.uid);
+				}
+
+				@Override
+				public void onError(Throwable cause) throws Exception {
+					log.warn("PARTICIPANT {}: Could not disconnect recorder", KStream.this.uid, cause);
+				}
+			});
+			recorder.release(new Continuation<Void>() {
+				@Override
+				public void onSuccess(Void result) throws Exception {
+					log.trace("PARTICIPANT {}: Recorder released successfully", KStream.this.uid);
+				}
+
+				@Override
+				public void onError(Throwable cause) throws Exception {
+					log.warn("PARTICIPANT {}: Could not release recorder", KStream.this.uid, cause);
+				}
+			});
 			recorder = null;
 		}
 	}
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
index 0276767..c2c3e94 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
@@ -32,6 +32,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
@@ -55,7 +56,6 @@
 import org.kurento.client.Endpoint;
 import org.kurento.client.EventListener;
 import org.kurento.client.KurentoClient;
-import org.kurento.client.KurentoConnectionListener;
 import org.kurento.client.MediaObject;
 import org.kurento.client.MediaPipeline;
 import org.kurento.client.ObjectCreatedEvent;
@@ -64,6 +64,7 @@
 import org.kurento.client.Tag;
 import org.kurento.client.Transaction;
 import org.kurento.client.WebRtcEndpoint;
+import org.kurento.jsonrpc.client.JsonRpcClientNettyWebSocket;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -94,7 +95,7 @@
 	private String turnMode;
 	private int turnTtl = 60; //minutes
 	private KurentoClient client;
-	private boolean connected = false;
+	private final AtomicBoolean connected = new AtomicBoolean(false);
 	private String kuid;
 	private final Map<Long, KRoom> rooms = new ConcurrentHashMap<>();
 	private Runnable check;
@@ -111,7 +112,7 @@
 	private StreamProcessor streamProcessor;
 
 	boolean isConnected() {
-		boolean connctd = client != null && !client.isClosed() && connected;
+		boolean connctd = connected.get() && client != null && !client.isClosed();
 		if (!connctd) {
 			log.warn(WARN_NO_KURENTO);
 		}
@@ -127,13 +128,46 @@
 				}
 				log.debug("Reconnecting KMS");
 				kuid = randomUUID().toString();
-				client = KurentoClient.create(kurentoWsUrl, new KConnectionListener());
+				client = KurentoClient.createFromJsonRpcClient(new JsonRpcClientNettyWebSocket(kurentoWsUrl) {
+						{
+							setTryReconnectingMaxTime(0);
+						}
+
+						private void notifyRooms(boolean connected) {
+							WebSocketHelper.sendServer(new TextRoomMessage(null, new User(), RoomMessage.Type.KURENTO_STATUS, new JSONObject().put("connected", connected).toString()));
+						}
+
+						private void onDisconnect() {
+							log.info("!!! Kurento disconnected");
+							connected.set(false);
+							notifyRooms(false);
+							clean();
+						}
+
+						@Override
+						protected void handleReconnectDisconnection(final int statusCode, final String closeReason) {
+							if (!isClosedByUser()) {
+								log.debug("{}JsonRpcWsClient disconnected from {} because {}.", label, uri, closeReason);
+								onDisconnect();
+							} else {
+								super.handleReconnectDisconnection(statusCode, closeReason);
+								onDisconnect();
+							}
+						}
+
+						@Override
+						protected void fireConnected() {
+							log.info("!!! Kurento connected");
+							connected.set(true);
+							notifyRooms(true);
+						}
+					});
 				client.getServerManager().addObjectCreatedListener(new KWatchDogCreate());
 				client.getServerManager().addObjectDestroyedListener(event ->
 					log.debug("Kurento::ObjectDestroyedEvent objectId {}, tags {}, source {}", event.getObjectId(), event.getTags(), event.getSource())
 				);
 			} catch (Exception e) {
-				connected = false;
+				connected.set(false);
 				clean();
 				log.warn("Fail to create Kurento client, will re-try in {} ms", checkTimeout);
 			}
@@ -149,18 +183,22 @@
 
 	private void clean() {
 		if (client != null) {
-			KurentoClient copy = client;
-			client = null;
-			testProcessor.destroy();
-			streamProcessor.destroy();
-			for (Entry<Long, KRoom> e : rooms.entrySet()) {
-				e.getValue().close();
-			}
-			rooms.clear();
-			if (copy != null && !copy.isClosed()) {
-				log.debug("Client will destroyed ...");
-				copy.destroy();
-				log.debug(".... Client is destroyed");
+			try {
+				KurentoClient copy = client;
+				client = null;
+				if (copy != null && !copy.isClosed()) {
+					log.debug("Client will destroyed ...");
+					copy.destroy();
+					log.debug(".... Client is destroyed");
+				}
+				testProcessor.destroy();
+				streamProcessor.destroy();
+				for (Entry<Long, KRoom> e : rooms.entrySet()) {
+					e.getValue().close();
+				}
+				rooms.clear();
+			} catch (Exception e) {
+				log.error("Unexpected error while clean-up", e);
 			}
 		}
 	}
@@ -371,38 +409,6 @@
 		return FLOWOUT_TIMEOUT_SEC;
 	}
 
-	private class KConnectionListener implements KurentoConnectionListener {
-		private void notifyRooms() {
-			WebSocketHelper.sendServer(new TextRoomMessage(null, new User(), RoomMessage.Type.KURENTO_STATUS, new JSONObject().put("connected", isConnected()).toString()));
-		}
-
-		@Override
-		public void reconnected(boolean sameServer) {
-			log.error("Kurento reconnected ? {}, this shouldn't happen", sameServer);
-		}
-
-		@Override
-		public void disconnected() {
-			log.info("Kurento disconnected");
-			connected = false;
-			notifyRooms();
-			clean();
-		}
-
-		@Override
-		public void connectionFailed() {
-			log.info("Kurento connection failed");
-			// this handled seems to be called multiple times
-		}
-
-		@Override
-		public void connected() {
-			log.info("Kurento connected");
-			connected = true;
-			notifyRooms();
-		}
-	}
-
 	private class KWatchDogCreate implements EventListener<ObjectCreatedEvent> {
 		private ScheduledExecutorService scheduler;