blob: c2c3e942d77c737e4fca7f0878bc69fad2a21427 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License") + you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openmeetings.core.remote;
import static java.util.UUID.randomUUID;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import org.apache.directory.api.util.Strings;
import org.apache.openmeetings.core.util.WebSocketHelper;
import org.apache.openmeetings.db.dao.record.RecordingChunkDao;
import org.apache.openmeetings.db.dao.room.RoomDao;
import org.apache.openmeetings.db.entity.basic.Client;
import org.apache.openmeetings.db.entity.basic.Client.Activity;
import org.apache.openmeetings.db.entity.basic.Client.StreamDesc;
import org.apache.openmeetings.db.entity.basic.IWsClient;
import org.apache.openmeetings.db.entity.room.Room;
import org.apache.openmeetings.db.entity.room.Room.Right;
import org.apache.openmeetings.db.entity.user.User;
import org.apache.openmeetings.db.manager.IClientManager;
import org.apache.openmeetings.db.util.ws.RoomMessage;
import org.apache.openmeetings.db.util.ws.TextRoomMessage;
import org.kurento.client.Endpoint;
import org.kurento.client.EventListener;
import org.kurento.client.KurentoClient;
import org.kurento.client.MediaObject;
import org.kurento.client.MediaPipeline;
import org.kurento.client.ObjectCreatedEvent;
import org.kurento.client.PlayerEndpoint;
import org.kurento.client.RecorderEndpoint;
import org.kurento.client.Tag;
import org.kurento.client.Transaction;
import org.kurento.client.WebRtcEndpoint;
import org.kurento.jsonrpc.client.JsonRpcClientNettyWebSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import com.github.openjson.JSONArray;
import com.github.openjson.JSONObject;
public class KurentoHandler {
private static final Logger log = LoggerFactory.getLogger(KurentoHandler.class);
public static final String PARAM_ICE = "iceServers";
public static final String PARAM_CANDIDATE = "candidate";
private static final String WARN_NO_KURENTO = "Media Server is not accessible";
public static final String MODE_TEST = "test";
public static final String TAG_KUID = "kuid";
public static final String TAG_MODE = "mode";
public static final String TAG_ROOM = "roomId";
private static final String HMAC_SHA1_ALGORITHM = "HmacSHA1";
private final ScheduledExecutorService kmsRecheckScheduler = Executors.newScheduledThreadPool(1);
public static final String KURENTO_TYPE = "kurento";
private static int FLOWOUT_TIMEOUT_SEC = 5;
private long checkTimeout = 120000; //ms
private long objCheckTimeout = 200; //ms
private int watchThreadCount = 10;
private String kurentoWsUrl;
private String turnUrl;
private String turnUser;
private String turnSecret;
private String turnMode;
private int turnTtl = 60; //minutes
private KurentoClient client;
private final AtomicBoolean connected = new AtomicBoolean(false);
private String kuid;
private final Map<Long, KRoom> rooms = new ConcurrentHashMap<>();
private Runnable check;
@Autowired
private IClientManager cm;
@Autowired
private RoomDao roomDao;
@Autowired
private RecordingChunkDao chunkDao;
@Autowired
private TestStreamProcessor testProcessor;
@Autowired
private StreamProcessor streamProcessor;
boolean isConnected() {
boolean connctd = connected.get() && client != null && !client.isClosed();
if (!connctd) {
log.warn(WARN_NO_KURENTO);
}
return connctd;
}
@PostConstruct
public void init() {
check = () -> {
try {
if (client != null) {
return;
}
log.debug("Reconnecting KMS");
kuid = randomUUID().toString();
client = KurentoClient.createFromJsonRpcClient(new JsonRpcClientNettyWebSocket(kurentoWsUrl) {
{
setTryReconnectingMaxTime(0);
}
private void notifyRooms(boolean connected) {
WebSocketHelper.sendServer(new TextRoomMessage(null, new User(), RoomMessage.Type.KURENTO_STATUS, new JSONObject().put("connected", connected).toString()));
}
private void onDisconnect() {
log.info("!!! Kurento disconnected");
connected.set(false);
notifyRooms(false);
clean();
}
@Override
protected void handleReconnectDisconnection(final int statusCode, final String closeReason) {
if (!isClosedByUser()) {
log.debug("{}JsonRpcWsClient disconnected from {} because {}.", label, uri, closeReason);
onDisconnect();
} else {
super.handleReconnectDisconnection(statusCode, closeReason);
onDisconnect();
}
}
@Override
protected void fireConnected() {
log.info("!!! Kurento connected");
connected.set(true);
notifyRooms(true);
}
});
client.getServerManager().addObjectCreatedListener(new KWatchDogCreate());
client.getServerManager().addObjectDestroyedListener(event ->
log.debug("Kurento::ObjectDestroyedEvent objectId {}, tags {}, source {}", event.getObjectId(), event.getTags(), event.getSource())
);
} catch (Exception e) {
connected.set(false);
clean();
log.warn("Fail to create Kurento client, will re-try in {} ms", checkTimeout);
}
};
kmsRecheckScheduler.scheduleAtFixedRate(check, 0L, checkTimeout, MILLISECONDS);
}
@PreDestroy
public void destroy() {
clean();
kmsRecheckScheduler.shutdownNow();
}
private void clean() {
if (client != null) {
try {
KurentoClient copy = client;
client = null;
if (copy != null && !copy.isClosed()) {
log.debug("Client will destroyed ...");
copy.destroy();
log.debug(".... Client is destroyed");
}
testProcessor.destroy();
streamProcessor.destroy();
for (Entry<Long, KRoom> e : rooms.entrySet()) {
e.getValue().close();
}
rooms.clear();
} catch (Exception e) {
log.error("Unexpected error while clean-up", e);
}
}
}
private static Map<String, String> tagsAsMap(MediaObject pipe) {
Map<String, String> map = new HashMap<>();
for (Tag t : pipe.getTags()) {
map.put(t.getKey(), t.getValue());
}
return map;
}
Transaction beginTransaction() {
return client.beginTransaction();
}
public void onMessage(IWsClient inClient, JSONObject msg) {
if (!isConnected()) {
sendError(inClient, "Multimedia server is inaccessible");
return;
}
final String cmdId = msg.getString("id");
if (MODE_TEST.equals(msg.optString(TAG_MODE))) {
testProcessor.onMessage(inClient, cmdId, msg);
} else {
final Client c = (Client)inClient;
if (c == null || c.getRoomId() == null) {
log.warn("Incoming message from invalid user");
return;
}
streamProcessor.onMessage(c, cmdId, msg);
}
}
public JSONObject getRecordingUser(Long roomId) {
if (!isConnected()) {
return new JSONObject();
}
return getRoom(roomId).getRecordingUser();
}
public void leaveRoom(Client c) {
remove(c);
WebSocketHelper.sendAll(newKurentoMsg()
.put("id", "clientLeave")
.put("uid", c.getUid())
.toString()
);
}
void sendShareUpdated(StreamDesc sd) {
sendClient(sd.getSid(), newKurentoMsg()
.put("id", "shareUpdated")
.put("stream", sd.toJson())
);
}
public void sendClient(String sid, JSONObject msg) {
WebSocketHelper.sendClient(cm.getBySid(sid), msg);
}
public static void sendError(IWsClient c, String msg) {
WebSocketHelper.sendClient(c, newKurentoMsg()
.put("id", "error")
.put("message", msg));
}
public void remove(IWsClient c) {
if (!isConnected() || c == null) {
return;
}
if (!(c instanceof Client)) {
testProcessor.remove(c);
return;
}
streamProcessor.remove((Client)c);
}
KRoom getRoom(Long roomId) {
log.debug("Searching for room {}", roomId);
KRoom room = rooms.get(roomId);
if (room == null) {
log.debug("Room {} does not exist. Will create now!", roomId);
Room r = roomDao.get(roomId);
Transaction t = beginTransaction();
MediaPipeline pipe = client.createMediaPipeline(t);
pipe.addTag(t, TAG_KUID, kuid);
pipe.addTag(t, TAG_ROOM, String.valueOf(roomId));
t.commit();
room = new KRoom(streamProcessor, r, pipe, chunkDao);
rooms.put(roomId, room);
}
log.debug("Room {} found!", roomId);
return room;
}
public Collection<KRoom> getRooms() {
return rooms.values();
}
static JSONObject newKurentoMsg() {
return new JSONObject().put("type", KURENTO_TYPE);
}
public static boolean activityAllowed(Client c, Activity a, Room room) {
boolean r = false;
switch (a) {
case AUDIO:
r = c.hasRight(Right.AUDIO);
break;
case VIDEO:
r = !room.isAudioOnly() && c.hasRight(Right.VIDEO);
break;
case AUDIO_VIDEO:
r = !room.isAudioOnly() && c.hasRight(Right.AUDIO) && c.hasRight(Right.VIDEO);
break;
default:
break;
}
return r;
}
public JSONArray getTurnServers() {
return getTurnServers(false);
}
JSONArray getTurnServers(final boolean test) {
JSONArray arr = new JSONArray();
if (!Strings.isEmpty(turnUrl)) {
try {
JSONObject turn = new JSONObject();
if ("rest".equalsIgnoreCase(turnMode)) {
Mac mac = Mac.getInstance(HMAC_SHA1_ALGORITHM);
mac.init(new SecretKeySpec(turnSecret.getBytes(), HMAC_SHA1_ALGORITHM));
StringBuilder user = new StringBuilder()
.append((test ? 60 : turnTtl * 60) + System.currentTimeMillis() / 1000L);
if (!Strings.isEmpty(turnUser)) {
user.append(':').append(turnUser);
}
turn.put("username", user)
.put("credential", Base64.getEncoder().encodeToString(mac.doFinal(user.toString().getBytes())));
} else {
turn.put("username", turnUser)
.put("credential", turnSecret);
}
final String fturnUrl = "turn:" + turnUrl;
turn.put("url", fturnUrl); // old-school
turn.put("urls", fturnUrl);
arr.put(turn);
} catch (NoSuchAlgorithmException|InvalidKeyException e) {
log.error("Unexpected error while creating turn", e);
}
}
return arr;
}
KurentoClient getClient() {
return client;
}
String getKuid() {
return kuid;
}
public void setCheckTimeout(long checkTimeout) {
this.checkTimeout = checkTimeout;
}
public void setObjCheckTimeout(long objCheckTimeout) {
this.objCheckTimeout = objCheckTimeout;
}
public void setWatchThreadCount(int watchThreadCount) {
this.watchThreadCount = watchThreadCount;
}
public void setKurentoWsUrl(String kurentoWsUrl) {
this.kurentoWsUrl = kurentoWsUrl;
}
public void setTurnUrl(String turnUrl) {
this.turnUrl = turnUrl;
}
public void setTurnUser(String turnUser) {
this.turnUser = turnUser;
}
public void setTurnSecret(String turnSecret) {
this.turnSecret = turnSecret;
}
public void setTurnMode(String turnMode) {
this.turnMode = turnMode;
}
public void setTurnTtl(int turnTtl) {
this.turnTtl = turnTtl;
}
public void setFlowoutTimeout(int timeout) {
FLOWOUT_TIMEOUT_SEC = timeout;
}
static int getFlowoutTimeout() {
return FLOWOUT_TIMEOUT_SEC;
}
private class KWatchDogCreate implements EventListener<ObjectCreatedEvent> {
private ScheduledExecutorService scheduler;
public KWatchDogCreate() {
scheduler = Executors.newScheduledThreadPool(watchThreadCount);
}
@Override
public void onEvent(ObjectCreatedEvent evt) {
log.debug("Kurento::ObjectCreated -> {}, source {}", evt.getObject(), evt.getSource());
if (evt.getObject() instanceof MediaPipeline) {
// room created
final String roid = evt.getObject().getId();
scheduler.schedule(() -> {
if (client == null) {
return;
}
// still alive
MediaPipeline pipe = client.getById(roid, MediaPipeline.class);
Map<String, String> tags = tagsAsMap(pipe);
if (validTestPipeline(tags)) {
return;
}
if (kuid.equals(tags.get(TAG_KUID))) {
KRoom r = rooms.get(Long.valueOf(tags.get(TAG_ROOM)));
if (r.getPipeline().getId().equals(pipe.getId())) {
return;
} else if (r != null) {
rooms.remove(r.getRoomId());
r.close();
}
}
log.warn("Invalid MediaPipeline {} detected, will be dropped, tags: {}", pipe.getId(), tags);
pipe.release();
}, objCheckTimeout, MILLISECONDS);
} else if (evt.getObject() instanceof Endpoint) {
// endpoint created
Endpoint curPoint = (Endpoint)evt.getObject();
final String eoid = curPoint.getId();
Class<? extends Endpoint> clazz = null;
if (curPoint instanceof WebRtcEndpoint) {
clazz = WebRtcEndpoint.class;
} else if (curPoint instanceof RecorderEndpoint) {
clazz = RecorderEndpoint.class;
} else if (curPoint instanceof PlayerEndpoint) {
clazz = PlayerEndpoint.class;
}
final Class<? extends Endpoint> fClazz = clazz;
scheduler.schedule(() -> {
if (client == null || fClazz == null) {
return;
}
// still alive
Endpoint point = client.getById(eoid, fClazz);
if (validTestPipeline(point.getMediaPipeline())) {
return;
}
Map<String, String> tags = tagsAsMap(point);
KStream stream = streamProcessor.getByUid(tags.get("outUid"));
log.debug("New Endpoint {} detected, tags: {}, kStream: {}", point.getId(), tags, stream);
if (stream != null && stream.contains(tags.get("uid"))) {
return;
}
log.warn("Invalid Endpoint {} detected, will be dropped, tags: {}", point.getId(), tags);
point.release();
}, objCheckTimeout, MILLISECONDS);
}
}
private boolean validTestPipeline(MediaPipeline pipeline) {
return validTestPipeline(tagsAsMap(pipeline));
}
private boolean validTestPipeline(Map<String, String> tags) {
return kuid.equals(tags.get(TAG_KUID)) && MODE_TEST.equals(tags.get(TAG_MODE)) && MODE_TEST.equals(tags.get(TAG_ROOM));
}
}
}