[OPENMEETINGS-2331] re-connection should be fixed
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
index fd4fa65..a7b2fab 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
@@ -270,7 +270,7 @@
}
public void stopRecord() {
- releaseRecorder();
+ releaseRecorder(true);
chunkId = null;
}
@@ -296,6 +296,17 @@
log.trace("PARTICIPANT {}: Released incoming EP for {}", uid, inUid);
final WebRtcEndpoint ep = entry.getValue();
+ outgoingMedia.disconnect(ep, new Continuation<Void>() {
+ @Override
+ public void onSuccess(Void result) throws Exception {
+ log.trace("PARTICIPANT {}: Disconnected successfully incoming EP for {}", KStream.this.uid, inUid);
+ }
+
+ @Override
+ public void onError(Throwable cause) throws Exception {
+ log.warn("PARTICIPANT {}: Could not disconnect incoming EP for {}", KStream.this.uid, inUid);
+ }
+ });
ep.release(new Continuation<Void>() {
@Override
public void onSuccess(Void result) throws Exception {
@@ -315,19 +326,64 @@
public void release(IStreamProcessor processor, boolean remove) {
if (outgoingMedia != null) {
releaseListeners();
- outgoingMedia.release();
+ outgoingMedia.release(new Continuation<Void>() {
+ @Override
+ public void onSuccess(Void result) throws Exception {
+ log.trace("PARTICIPANT {}: Released successfully", KStream.this.uid);
+ }
+
+ @Override
+ public void onError(Throwable cause) throws Exception {
+ log.warn("PARTICIPANT {}: Could not release", KStream.this.uid, cause);
+ }
+ });
+ releaseRecorder(false);
outgoingMedia = null;
}
- releaseRecorder();
if (remove) {
processor.release(this, false);
}
}
- private void releaseRecorder() {
+ private void releaseRecorder(boolean wait) {
if (recorder != null) {
- recorder.stopAndWait();
- recorder.release();
+ if (wait) {
+ recorder.stopAndWait();
+ } else {
+ recorder.stop(new Continuation<Void>() {
+ @Override
+ public void onSuccess(Void result) throws Exception {
+ log.trace("PARTICIPANT {}: Recording stopped", KStream.this.uid);
+ }
+
+ @Override
+ public void onError(Throwable cause) throws Exception {
+ log.warn("PARTICIPANT {}: Could not stop recording", KStream.this.uid, cause);
+ }
+ });
+ }
+ outgoingMedia.disconnect(recorder, new Continuation<Void>() {
+ @Override
+ public void onSuccess(Void result) throws Exception {
+ log.trace("PARTICIPANT {}: Recorder disconnected successfully", KStream.this.uid);
+ }
+
+ @Override
+ public void onError(Throwable cause) throws Exception {
+ log.warn("PARTICIPANT {}: Could not disconnect recorder", KStream.this.uid, cause);
+ }
+ });
+ recorder.release(new Continuation<Void>() {
+ @Override
+ public void onSuccess(Void result) throws Exception {
+ log.trace("PARTICIPANT {}: Recorder released successfully", KStream.this.uid);
+ }
+
+ @Override
+ public void onError(Throwable cause) throws Exception {
+ log.warn("PARTICIPANT {}: Could not release recorder", KStream.this.uid, cause);
+ }
+ });
recorder = null;
}
}
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
index 0276767..c2c3e94 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
@@ -32,6 +32,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@@ -55,7 +56,6 @@
import org.kurento.client.Endpoint;
import org.kurento.client.EventListener;
import org.kurento.client.KurentoClient;
-import org.kurento.client.KurentoConnectionListener;
import org.kurento.client.MediaObject;
import org.kurento.client.MediaPipeline;
import org.kurento.client.ObjectCreatedEvent;
@@ -64,6 +64,7 @@
import org.kurento.client.Tag;
import org.kurento.client.Transaction;
import org.kurento.client.WebRtcEndpoint;
+import org.kurento.jsonrpc.client.JsonRpcClientNettyWebSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -94,7 +95,7 @@
private String turnMode;
private int turnTtl = 60; //minutes
private KurentoClient client;
- private boolean connected = false;
+ private final AtomicBoolean connected = new AtomicBoolean(false);
private String kuid;
private final Map<Long, KRoom> rooms = new ConcurrentHashMap<>();
private Runnable check;
@@ -111,7 +112,7 @@
private StreamProcessor streamProcessor;
boolean isConnected() {
- boolean connctd = client != null && !client.isClosed() && connected;
+ boolean connctd = connected.get() && client != null && !client.isClosed();
if (!connctd) {
log.warn(WARN_NO_KURENTO);
}
@@ -127,13 +128,46 @@
}
log.debug("Reconnecting KMS");
kuid = randomUUID().toString();
- client = KurentoClient.create(kurentoWsUrl, new KConnectionListener());
+ client = KurentoClient.createFromJsonRpcClient(new JsonRpcClientNettyWebSocket(kurentoWsUrl) {
+ {
+ setTryReconnectingMaxTime(0);
+ }
+
+ private void notifyRooms(boolean connected) {
+ WebSocketHelper.sendServer(new TextRoomMessage(null, new User(), RoomMessage.Type.KURENTO_STATUS, new JSONObject().put("connected", connected).toString()));
+ }
+
+ private void onDisconnect() {
+ log.info("!!! Kurento disconnected");
+ connected.set(false);
+ notifyRooms(false);
+ clean();
+ }
+
+ @Override
+ protected void handleReconnectDisconnection(final int statusCode, final String closeReason) {
+ if (!isClosedByUser()) {
+ log.debug("{}JsonRpcWsClient disconnected from {} because {}.", label, uri, closeReason);
+ onDisconnect();
+ } else {
+ super.handleReconnectDisconnection(statusCode, closeReason);
+ onDisconnect();
+ }
+ }
+
+ @Override
+ protected void fireConnected() {
+ log.info("!!! Kurento connected");
+ connected.set(true);
+ notifyRooms(true);
+ }
+ });
client.getServerManager().addObjectCreatedListener(new KWatchDogCreate());
client.getServerManager().addObjectDestroyedListener(event ->
log.debug("Kurento::ObjectDestroyedEvent objectId {}, tags {}, source {}", event.getObjectId(), event.getTags(), event.getSource())
);
} catch (Exception e) {
- connected = false;
+ connected.set(false);
clean();
log.warn("Fail to create Kurento client, will re-try in {} ms", checkTimeout);
}
@@ -149,18 +183,22 @@
private void clean() {
if (client != null) {
- KurentoClient copy = client;
- client = null;
- testProcessor.destroy();
- streamProcessor.destroy();
- for (Entry<Long, KRoom> e : rooms.entrySet()) {
- e.getValue().close();
- }
- rooms.clear();
- if (copy != null && !copy.isClosed()) {
- log.debug("Client will destroyed ...");
- copy.destroy();
- log.debug(".... Client is destroyed");
+ try {
+ KurentoClient copy = client;
+ client = null;
+ if (copy != null && !copy.isClosed()) {
+ log.debug("Client will destroyed ...");
+ copy.destroy();
+ log.debug(".... Client is destroyed");
+ }
+ testProcessor.destroy();
+ streamProcessor.destroy();
+ for (Entry<Long, KRoom> e : rooms.entrySet()) {
+ e.getValue().close();
+ }
+ rooms.clear();
+ } catch (Exception e) {
+ log.error("Unexpected error while clean-up", e);
}
}
}
@@ -371,38 +409,6 @@
return FLOWOUT_TIMEOUT_SEC;
}
- private class KConnectionListener implements KurentoConnectionListener {
- private void notifyRooms() {
- WebSocketHelper.sendServer(new TextRoomMessage(null, new User(), RoomMessage.Type.KURENTO_STATUS, new JSONObject().put("connected", isConnected()).toString()));
- }
-
- @Override
- public void reconnected(boolean sameServer) {
- log.error("Kurento reconnected ? {}, this shouldn't happen", sameServer);
- }
-
- @Override
- public void disconnected() {
- log.info("Kurento disconnected");
- connected = false;
- notifyRooms();
- clean();
- }
-
- @Override
- public void connectionFailed() {
- log.info("Kurento connection failed");
- // this handled seems to be called multiple times
- }
-
- @Override
- public void connected() {
- log.info("Kurento connected");
- connected = true;
- notifyRooms();
- }
- }
-
private class KWatchDogCreate implements EventListener<ObjectCreatedEvent> {
private ScheduledExecutorService scheduler;