| /* |
| |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License") + you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.openmeetings.mediaserver.remote; |
| |
| import static java.util.concurrent.TimeUnit.MILLISECONDS; |
| |
| import java.security.InvalidKeyException; |
| import java.security.NoSuchAlgorithmException; |
| import java.util.Base64; |
| import java.util.Collection; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.stream.Collectors; |
| |
| import javax.annotation.PostConstruct; |
| import javax.annotation.PreDestroy; |
| import javax.crypto.Mac; |
| import javax.crypto.spec.SecretKeySpec; |
| |
| import org.apache.openmeetings.core.util.WebSocketHelper; |
| import org.apache.openmeetings.db.dao.room.RoomDao; |
| import org.apache.openmeetings.db.entity.basic.Client; |
| import org.apache.openmeetings.db.entity.basic.Client.Activity; |
| import org.apache.openmeetings.db.entity.basic.Client.StreamDesc; |
| import org.apache.openmeetings.db.entity.basic.IWsClient; |
| import org.apache.openmeetings.db.entity.room.Room; |
| import org.apache.openmeetings.db.entity.room.Room.Right; |
| import org.apache.openmeetings.db.entity.user.User; |
| import org.apache.openmeetings.db.manager.IClientManager; |
| import org.apache.openmeetings.db.util.ws.RoomMessage; |
| import org.apache.openmeetings.db.util.ws.TextRoomMessage; |
| import org.apache.wicket.util.string.Strings; |
| import org.kurento.client.CertificateKeyType; |
| import org.kurento.client.Continuation; |
| import org.kurento.client.Endpoint; |
| import org.kurento.client.EventListener; |
| import org.kurento.client.KurentoClient; |
| import org.kurento.client.MediaObject; |
| import org.kurento.client.MediaPipeline; |
| import org.kurento.client.ObjectCreatedEvent; |
| import org.kurento.client.PlayerEndpoint; |
| import org.kurento.client.RecorderEndpoint; |
| import org.kurento.client.RtpEndpoint; |
| import org.kurento.client.Tag; |
| import org.kurento.client.Transaction; |
| import org.kurento.client.WebRtcEndpoint; |
| import org.kurento.jsonrpc.client.JsonRpcClientNettyWebSocket; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.springframework.beans.factory.annotation.Autowired; |
| import org.springframework.beans.factory.annotation.Value; |
| import org.springframework.stereotype.Component; |
| |
| import com.github.openjson.JSONArray; |
| import com.github.openjson.JSONObject; |
| |
| @Component |
| public class KurentoHandler { |
| private static final Logger log = LoggerFactory.getLogger(KurentoHandler.class); |
| public static final String PARAM_ICE = "iceServers"; |
| public static final String PARAM_CANDIDATE = "candidate"; |
| private static final String WARN_NO_KURENTO = "Media Server is not accessible"; |
| public static final String MODE_TEST = "test"; |
| public static final String TAG_KUID = "kuid"; |
| public static final String TAG_MODE = "mode"; |
| public static final String TAG_ROOM = "roomId"; |
| public static final String TAG_STREAM_UID = "streamUid"; |
| private static final String HMAC_SHA1_ALGORITHM = "HmacSHA1"; |
| private final ScheduledExecutorService kmsRecheckScheduler = Executors.newScheduledThreadPool(1); |
| public static final String KURENTO_TYPE = "kurento"; |
| private static int flowoutTimeout = 5; |
| @Value("${kurento.ws.url}") |
| private String kurentoWsUrl; |
| @Value("${kurento.turn.url}") |
| private String turnUrl; |
| @Value("${kurento.turn.user}") |
| private String turnUser; |
| @Value("${kurento.turn.secret}") |
| private String turnSecret; |
| @Value("${kurento.turn.mode}") |
| private String turnMode; |
| @Value("${kurento.turn.ttl}") |
| private int turnTtl = 60; //minutes |
| @Value("${kurento.check.timeout}") |
| private long checkTimeout = 120000; //ms |
| @Value("${kurento.object.check.timeout}") |
| private long objCheckTimeout = 200; //ms |
| @Value("${kurento.watch.thread.count}") |
| private int watchThreadCount = 10; |
| @Value("${kurento.kuid}") |
| private String kuid; |
| private CertificateKeyType certificateType; |
| private KurentoClient client; |
| private final AtomicBoolean connected = new AtomicBoolean(false); |
| private final Map<Long, KRoom> rooms = new ConcurrentHashMap<>(); |
| private final Set<String> ignoredKuids = new HashSet<>(); |
| |
| @Autowired |
| private IClientManager cm; |
| @Autowired |
| private RoomDao roomDao; |
| @Autowired |
| private TestStreamProcessor testProcessor; |
| @Autowired |
| private StreamProcessor streamProcessor; |
| |
| boolean isConnected() { |
| boolean connctd = connected.get() && client != null && !client.isClosed(); |
| if (!connctd) { |
| log.warn(WARN_NO_KURENTO); |
| } |
| return connctd; |
| } |
| |
| @PostConstruct |
| public void init() { |
| Runnable check = () -> { |
| try { |
| if (client != null) { |
| return; |
| } |
| log.debug("Reconnecting KMS"); |
| client = KurentoClient.createFromJsonRpcClient(new JsonRpcClientNettyWebSocket(kurentoWsUrl) { |
| { |
| setTryReconnectingMaxTime(0); |
| } |
| |
| private void notifyRooms(boolean connected) { |
| WebSocketHelper.sendServer(new TextRoomMessage(null, new User(), RoomMessage.Type.KURENTO_STATUS, new JSONObject().put("connected", connected).toString())); |
| } |
| |
| private void onDisconnect() { |
| log.info("!!! Kurento disconnected"); |
| connected.set(false); |
| notifyRooms(false); |
| clean(); |
| } |
| |
| @Override |
| protected void handleReconnectDisconnection(final int statusCode, final String closeReason) { |
| if (!isClosedByUser()) { |
| log.debug("{}JsonRpcWsClient disconnected from {} because {}.", label, uri, closeReason); |
| onDisconnect(); |
| } else { |
| super.handleReconnectDisconnection(statusCode, closeReason); |
| onDisconnect(); |
| } |
| } |
| |
| @Override |
| protected void fireConnected() { |
| log.info("!!! Kurento connected"); |
| connected.set(true); |
| notifyRooms(true); |
| } |
| }); |
| client.getServerManager().addObjectCreatedListener(new KWatchDogCreate()); |
| client.getServerManager().addObjectDestroyedListener(event -> |
| log.debug("Kurento::ObjectDestroyedEvent objectId {}, tags {}, source {}", event.getObjectId(), event.getTags(), event.getSource()) |
| ); |
| } catch (Exception e) { |
| connected.set(false); |
| clean(); |
| log.warn("Fail to create Kurento client, will re-try in {} ms", checkTimeout, e); |
| } |
| }; |
| kmsRecheckScheduler.scheduleAtFixedRate(check, 0L, checkTimeout, MILLISECONDS); |
| } |
| |
| @PreDestroy |
| public void destroy() { |
| clean(); |
| kmsRecheckScheduler.shutdownNow(); |
| } |
| |
| private void clean() { |
| if (client != null) { |
| try { |
| KurentoClient copy = client; |
| client = null; |
| if (!copy.isClosed()) { |
| log.debug("Client will be destroyed ..."); |
| copy.destroy(); |
| log.debug(".... Client is destroyed"); |
| } |
| testProcessor.destroy(); |
| streamProcessor.destroy(); |
| for (Entry<Long, KRoom> e : rooms.entrySet()) { |
| e.getValue().close(); |
| } |
| rooms.clear(); |
| } catch (Exception e) { |
| log.error("Unexpected error while clean-up", e); |
| } |
| } |
| } |
| |
| private static Map<String, String> tagsAsMap(MediaObject pipe) { |
| return pipe.getTags().stream() |
| .collect(Collectors.toMap(Tag::getKey, Tag::getValue)); |
| } |
| |
| Transaction beginTransaction() { |
| return client.beginTransaction(); |
| } |
| |
| public void onMessage(IWsClient inClient, JSONObject msg) { |
| if (!isConnected()) { |
| sendError(inClient, "Multimedia server is inaccessible"); |
| return; |
| } |
| final String cmdId = msg.getString("id"); |
| if (MODE_TEST.equals(msg.optString(TAG_MODE))) { |
| testProcessor.onMessage(inClient, cmdId, msg); |
| } else { |
| final Client c = (Client)inClient; |
| |
| if (c == null || c.getRoomId() == null) { |
| log.warn("Incoming message from invalid user"); |
| return; |
| } |
| streamProcessor.onMessage(c, cmdId, msg); |
| } |
| } |
| |
| public JSONObject getRecordingUser(Long roomId) { |
| if (!isConnected()) { |
| return new JSONObject(); |
| } |
| return getRoom(roomId).getRecordingUser(); |
| } |
| |
| public void leaveRoom(Client c) { |
| remove(c); |
| WebSocketHelper.sendAll(newKurentoMsg() |
| .put("id", "clientLeave") |
| .put("uid", c.getUid()) |
| .toString() |
| ); |
| } |
| |
| void sendShareUpdated(StreamDesc sd) { |
| sendClient(sd.getSid(), newKurentoMsg() |
| .put("id", "shareUpdated") |
| .put("stream", sd.toJson()) |
| ); |
| } |
| |
| public void sendClient(String sid, JSONObject msg) { |
| WebSocketHelper.sendClient(cm.getBySid(sid), msg); |
| } |
| |
| public static void sendError(IWsClient c, String msg) { |
| WebSocketHelper.sendClient(c, newKurentoMsg() |
| .put("id", "error") |
| .put("message", msg)); |
| } |
| |
| public void remove(IWsClient c) { |
| if (!isConnected() || c == null) { |
| return; |
| } |
| if (!(c instanceof Client)) { |
| testProcessor.remove(c); |
| return; |
| } |
| streamProcessor.remove((Client)c); |
| } |
| |
| MediaPipeline createPipiline(Map<String, String> tags, Continuation<Void> continuation) { |
| Transaction t = beginTransaction(); |
| MediaPipeline pipe = client.createMediaPipeline(t); |
| pipe.addTag(t, TAG_KUID, kuid); |
| tags.forEach((key, value) -> pipe.addTag(t, key, value)); |
| t.commit(continuation); |
| return pipe; |
| } |
| |
| KRoom getRoom(Long roomId) { |
| return rooms.computeIfAbsent(roomId, k -> { |
| log.debug("Room {} does not exist. Will create now!", roomId); |
| Room r = roomDao.get(roomId); |
| return new KRoom(r); |
| }); |
| } |
| |
| public Collection<KRoom> getRooms() { |
| return rooms.values(); |
| } |
| |
| public void updateSipCount(Room r, long count) { |
| getRoom(r.getId()).updateSipCount(count); |
| } |
| |
| static JSONObject newKurentoMsg() { |
| return new JSONObject().put("type", KURENTO_TYPE); |
| } |
| |
| public static boolean activityAllowed(Client c, Activity a, Room room) { |
| boolean r = false; |
| switch (a) { |
| case AUDIO: |
| r = c.hasRight(Right.AUDIO); |
| break; |
| case VIDEO: |
| r = !room.isAudioOnly() && c.hasRight(Right.VIDEO); |
| break; |
| case AUDIO_VIDEO: |
| r = !room.isAudioOnly() && c.hasRight(Right.AUDIO) && c.hasRight(Right.VIDEO); |
| break; |
| default: |
| break; |
| } |
| return r; |
| } |
| |
| public JSONArray getTurnServers(Client c) { |
| return getTurnServers(c, false); |
| } |
| |
| JSONArray getTurnServers(Client c, final boolean test) { |
| JSONArray arr = new JSONArray(); |
| if (!Strings.isEmpty(turnUrl)) { |
| try { |
| JSONObject turn = new JSONObject(); |
| if ("rest".equalsIgnoreCase(turnMode)) { |
| Mac mac = Mac.getInstance(HMAC_SHA1_ALGORITHM); |
| mac.init(new SecretKeySpec(turnSecret.getBytes(), HMAC_SHA1_ALGORITHM)); |
| StringBuilder user = new StringBuilder() |
| .append((test ? 60 : turnTtl * 60) + System.currentTimeMillis() / 1000L); |
| final String uid = c == null ? null : c.getUid(); |
| if (!Strings.isEmpty(uid)) { |
| user.append(':').append(uid); |
| } else if (!Strings.isEmpty(turnUser)) { |
| user.append(':').append(turnUser); |
| } |
| turn.put("username", user) |
| .put("credential", Base64.getEncoder().encodeToString(mac.doFinal(user.toString().getBytes()))); |
| } else { |
| turn.put("username", turnUser) |
| .put("credential", turnSecret); |
| } |
| |
| JSONArray urls = new JSONArray(); |
| final String[] turnUrls = turnUrl.split(","); |
| for (String url : turnUrls) { |
| if (url.startsWith("stun:") || url.startsWith("stuns:") || url.startsWith("turn:") || url.startsWith("turns:")) { |
| urls.put(url); |
| } else { |
| urls.put("turn:" + url); |
| } |
| } |
| turn.put("urls", urls); |
| |
| arr.put(turn); |
| } catch (NoSuchAlgorithmException|InvalidKeyException e) { |
| log.error("Unexpected error while creating turn", e); |
| } |
| } |
| return arr; |
| } |
| |
| KurentoClient getClient() { |
| return client; |
| } |
| |
| String getKuid() { |
| return kuid; |
| } |
| |
| @Value("${kurento.certificateType}") |
| public void setCertificateType(String certificateType) { |
| if (certificateType.isEmpty()) { |
| return; |
| } |
| this.certificateType = CertificateKeyType.valueOf(certificateType); |
| } |
| |
| public CertificateKeyType getCertificateType() { |
| return certificateType; |
| } |
| |
| static int getFlowoutTimeout() { |
| return flowoutTimeout; |
| } |
| |
| @Value("${kurento.flowout.timeout}") |
| private void setFlowoutTimeout(int timeout) { |
| flowoutTimeout = timeout; |
| } |
| |
| @Value("${kurento.ignored.kuids}") |
| private void setIgnoredKuids(String ignoredKuids) { |
| if (!Strings.isEmpty(ignoredKuids)) { |
| this.ignoredKuids.addAll(List.of(ignoredKuids.split("[, ]"))); |
| } |
| } |
| |
| private class KWatchDogCreate implements EventListener<ObjectCreatedEvent> { |
| private ScheduledExecutorService scheduler; |
| |
| public KWatchDogCreate() { |
| scheduler = Executors.newScheduledThreadPool(watchThreadCount); |
| } |
| |
| private void checkPipeline(String roomOid) { |
| scheduler.schedule(() -> { |
| if (client == null) { |
| return; |
| } |
| // still alive |
| MediaPipeline pipe = client.getById(roomOid, MediaPipeline.class); |
| Map<String, String> tags = tagsAsMap(pipe); |
| try { |
| final String inKuid = tags.get(TAG_KUID); |
| if (inKuid != null && ignoredKuids.contains(inKuid)) { |
| return; |
| } |
| if (validTestPipeline(tags)) { |
| return; |
| } |
| if (kuid.equals(inKuid)) { |
| KStream stream = streamProcessor.getByUid(tags.get(TAG_STREAM_UID)); |
| if (stream != null) { |
| if (stream.getRoomId().equals(Long.valueOf(tags.get(TAG_ROOM))) |
| && stream.getPipeline().getId().equals(pipe.getId())) |
| { |
| return; |
| } else { |
| stream.release(); |
| } |
| } |
| } |
| } catch (Exception e) { |
| log.warn("Unexpected error while checking MediaPipeline {}, tags: {}", pipe.getId(), tags, e); |
| } |
| log.warn("Invalid MediaPipeline {} detected, will be dropped, tags: {}", pipe.getId(), tags); |
| pipe.release(); |
| }, objCheckTimeout, MILLISECONDS); |
| } |
| |
| private Class<? extends Endpoint> getEndpointClass(Endpoint curPoint) { |
| Class<? extends Endpoint> clazz = null; |
| if (curPoint instanceof WebRtcEndpoint) { |
| clazz = WebRtcEndpoint.class; |
| } else if (curPoint instanceof RecorderEndpoint) { |
| clazz = RecorderEndpoint.class; |
| } else if (curPoint instanceof PlayerEndpoint) { |
| clazz = PlayerEndpoint.class; |
| } else if (curPoint instanceof RtpEndpoint) { |
| clazz = RtpEndpoint.class; |
| } |
| return clazz; |
| } |
| |
| private void checkEndpoint(String endpointOid, Class<? extends Endpoint> clazz) { |
| scheduler.schedule(() -> { |
| if (client == null || clazz == null) { |
| return; |
| } |
| // still alive |
| Endpoint point = client.getById(endpointOid, clazz); |
| Map<String, String> tags = tagsAsMap(point); |
| try { |
| Map<String, String> pipeTags = tagsAsMap(point.getMediaPipeline()); |
| final String inKuid = pipeTags.get(TAG_KUID); |
| if (ignoredKuids.contains(inKuid)) { |
| return; |
| } |
| if (validTestPipeline(pipeTags)) { |
| return; |
| } |
| KStream stream = streamProcessor.getByUid(tags.get("outUid")); |
| log.debug("Kurento::ObjectCreated -> New Endpoint {} detected, tags: {}, kStream: {}", point.getId(), tags, stream); |
| if (stream != null && stream.contains(tags.get("uid"))) { |
| return; |
| } |
| } catch (Exception e) { |
| log.warn("Unexpected error while checking Endpoint {}, tags: {}", point.getId(), tags, e); |
| } |
| log.warn("Invalid Endpoint {} detected, will be dropped, tags: {}", point.getId(), tags); |
| point.release(); |
| }, objCheckTimeout, MILLISECONDS); |
| } |
| |
| @Override |
| public void onEvent(ObjectCreatedEvent evt) { |
| MediaObject obj = evt.getObject(); |
| log.debug("Kurento::ObjectCreated -> {}, source {}", obj, evt.getSource()); |
| if (obj instanceof MediaPipeline) { |
| // room created |
| final String roid = obj.getId(); |
| |
| checkPipeline(roid); |
| } else if (obj instanceof Endpoint curPoint) { |
| // endpoint created |
| final String eoid = curPoint.getId(); |
| final Class<? extends Endpoint> clazz = getEndpointClass(curPoint); |
| |
| checkEndpoint(eoid, clazz); |
| } |
| } |
| |
| private boolean validTestPipeline(Map<String, String> tags) { |
| return kuid.equals(tags.get(TAG_KUID)) |
| && MODE_TEST.equals(tags.get(TAG_MODE)) |
| && MODE_TEST.equals(tags.get(TAG_ROOM)); |
| } |
| } |
| } |