blob: 8d1e2d7487246a3d40572624b4911cf33c7ff5a9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License") + you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openmeetings.mediaserver.remote;
import static org.apache.openmeetings.mediaserver.remote.KurentoHandler.PARAM_ICE;
import static org.apache.openmeetings.mediaserver.remote.KurentoHandler.activityAllowed;
import static org.apache.openmeetings.mediaserver.remote.KurentoHandler.newKurentoMsg;
import static org.apache.openmeetings.util.OpenmeetingsVariables.isRecordingsEnabled;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;
import org.apache.openmeetings.core.converter.IRecordingConverter;
import org.apache.openmeetings.core.converter.InterviewConverter;
import org.apache.openmeetings.core.converter.RecordingConverter;
import org.apache.openmeetings.core.util.WebSocketHelper;
import org.apache.openmeetings.db.dao.record.RecordingDao;
import org.apache.openmeetings.db.entity.basic.Client;
import org.apache.openmeetings.db.entity.basic.Client.Activity;
import org.apache.openmeetings.db.entity.basic.Client.StreamDesc;
import org.apache.openmeetings.db.entity.basic.Client.StreamType;
import org.apache.openmeetings.db.entity.record.Recording;
import org.apache.openmeetings.db.entity.room.Room;
import org.apache.openmeetings.db.entity.room.Room.Right;
import org.apache.openmeetings.db.entity.room.Room.RoomElement;
import org.apache.openmeetings.db.manager.IClientManager;
import org.apache.openmeetings.db.util.ws.RoomMessage;
import org.apache.openmeetings.db.util.ws.TextRoomMessage;
import org.apache.openmeetings.util.logging.TimedApplication;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Component;
import com.github.openjson.JSONArray;
import com.github.openjson.JSONObject;
@Component
public class StreamProcessor implements IStreamProcessor {
private static final Logger log = LoggerFactory.getLogger(StreamProcessor.class);
/**
* Holds a reference to the current streams available on the server instance
*/
private final Map<String, KStream> streamByUid = new ConcurrentHashMap<>();
@Autowired
private IClientManager cm;
@Autowired
private RecordingDao recDao;
@Autowired
private KurentoHandler kHandler;
@Autowired
private TaskExecutor taskExecutor;
@Autowired
private RecordingConverter recordingConverter;
@Autowired
private InterviewConverter interviewConverter;
@Autowired
private StreamProcessorActions streamProcessorActions;
@TimedApplication
void onMessage(Client c, final String cmdId, JSONObject msg) {
final String uid = msg.optString("uid");
Optional<StreamDesc> osd;
log.debug("Incoming message from user with ID '{}': {}", c.getUserId(), msg);
switch (cmdId) {
case "devicesAltered":
onDeviceAltered(c, uid, msg);
break;
case "toggleActivity":
onToggleActivity(c, Activity.valueOf(msg.getString("activity")));
break;
case "broadcastStarted":
streamProcessorActions.handleBroadcastStarted(c, uid, msg);
break;
case "broadcastRestarted":
streamProcessorActions.handleBroadcastRestarted(c, uid);
break;
case "onIceCandidate":
streamProcessorActions.addIceCandidate(msg);
break;
case "addListener":
streamProcessorActions.addListener(c, msg);
break;
case "wannaShare":
osd = c.getScreenStream();
if (screenShareAllowed(c) || (osd.isPresent() && !osd.get().hasActivity(Activity.SCREEN))) {
startSharing(c, osd, msg, Activity.SCREEN);
}
break;
case "wannaRecord":
onWannaRecord(c, msg);
break;
case "pauseSharing":
pauseSharing(c, uid);
break;
case "stopRecord":
stopRecording(c);
break;
case "errorSharing":
errorSharing(c);
break;
default:
// no-op
break;
}
}
private void onDeviceAltered(Client c, String uid, JSONObject msg) {
StreamDesc sd = c.getStream(uid);
if (sd != null) {
if (!msg.getBoolean("audio") && c.hasActivity(Activity.AUDIO)) {
c.remove(Activity.AUDIO);
}
if (!msg.getBoolean("video") && c.hasActivity(Activity.VIDEO)) {
c.remove(Activity.VIDEO);
}
sd.setActivities();
WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), cm.update(c), RoomMessage.Type.RIGHT_UPDATED, c.getUid()));
}
}
private void onWannaRecord(Client c, JSONObject msg) {
Optional<StreamDesc> osd = c.getScreenStream();
if (recordingAllowed(c)) {
Room r = c.getRoom();
if (Room.Type.INTERVIEW == r.getType()) {
log.warn("This shouldn't be called for interview room");
return;
}
boolean sharing = isSharing(r.getId());
startSharing(c, osd, msg, Activity.RECORD);
if (sharing) {
startRecording(c);
}
}
}
/**
* Method to start broadcasting. Externalised for mocking purpose to be able to
* prevent calling webRTC methods.
*
* @param stream Stream to start
* @param sd StreamDesc to start
* @param sdpOffer the sdpOffer
* @param then steps need to be done after broadcast is started
* @return the current KStream
*/
void startBroadcast(KStream stream, StreamDesc sd, String sdpOffer, Runnable then) {
stream.startBroadcast(sd, sdpOffer, then);
}
private static boolean isBroadcasting(final Client c) {
return c.hasAnyActivity(Activity.AUDIO, Activity.VIDEO);
}
private Set<String> cleanWebCams(Client c, List<StreamDesc> streams) {
Set<String> closed = new HashSet<>();
streams.stream()
.filter(lsd -> StreamType.WEBCAM == lsd.getType())
.forEach(lsd -> {
KStream s = getByUid(lsd.getUid());
if (s != null) {
s.stopBroadcast();
}
c.removeStream(lsd.getUid());
closed.add(lsd.getUid());
});
return closed;
}
@TimedApplication
public void onToggleActivity(Client c, Activity a) {
log.info("PARTICIPANT {}: trying to toggle activity {}", c, a);
if (!kHandler.isConnected()) {
return;
}
if (activityAllowed(c, a, c.getRoom())) {
boolean wasBroadcasting = isBroadcasting(c);
if (a == Activity.AUDIO && !c.isMicEnabled()) {
return;
}
if (a == Activity.VIDEO && !c.isCamEnabled()) {
return;
}
if (a == Activity.AUDIO_VIDEO && !c.isMicEnabled() && !c.isCamEnabled()) {
return;
}
c.toggle(a);
List<StreamDesc> streams = c.getStreams();
if (!isBroadcasting(c)) {
Set<String> closed = cleanWebCams(c, streams);
if (!closed.isEmpty()) {
cm.update(c);
checkStreams(c.getRoomId());
WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.RIGHT_UPDATED, c.getUid()));
}
} else {
StreamDesc sd = c.addStream(StreamType.WEBCAM);
Set<String> closed = wasBroadcasting ? cleanWebCams(c, streams) : Set.of();
cm.update(c.restoreActivities(sd));
log.debug("User {}: has started broadcast", sd.getUid());
kHandler.sendClient(sd.getSid(), newKurentoMsg()
.put("id", "broadcast")
.put("stream", sd.toJson(true))
.put("cleanup", new JSONArray(closed))
.put(PARAM_ICE, kHandler.getTurnServers(c, false)));
}
}
}
private void constraintsChanged(Client c) {
//constraints were changed
c.getStreams().stream()
.filter(sd -> StreamType.WEBCAM == sd.getType())
.findFirst()
.ifPresent(sd -> {
sd.setActivities();
cm.update(c);
});
}
public void rightsUpdated(Client c) {
Optional<StreamDesc> osd = c.getScreenStream();
if (osd.isPresent() && !hasRightsToShare(c)) {
stopSharing(c, osd.get().getUid());
}
if (isBroadcasting(c)) {
constraintsChanged(c);
} else {
c.getStreams().stream()
.filter(sd -> StreamType.WEBCAM == sd.getType())
.forEach(sd -> {
KStream stream = streamByUid.get(sd.getUid());
if (stream != null) {
KRoom room = kHandler.getRoom(c.getRoomId());
room.onStopBroadcast(stream);
}
});
}
WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.RIGHT_UPDATED, c.getUid()));
}
private void checkStreams(Long roomId) {
if (!kHandler.isConnected()) {
return;
}
KRoom kRoom = kHandler.getRoom(roomId);
if (kRoom.isSharing() && cm.streamByRoom(roomId)
.flatMap(c -> c.getStreams().stream())
.filter(sd -> StreamType.SCREEN == sd.getType())
.findAny()
.isEmpty())
{
log.info("No more screen streams in the room, stopping sharing");
kRoom.stopSharing();
if (Room.Type.INTERVIEW != kRoom.getRoom().getType() && kRoom.isRecording()) {
log.info("No more screen streams in the non-interview room, stopping recording");
kRoom.stopRecording(null);
}
}
if (kRoom.isRecording() && cm.streamByRoom(roomId)
.flatMap(c -> c.getStreams().stream())
.findAny()
.isEmpty())
{
log.info("No more streams in the room, stopping recording");
kRoom.stopRecording(null);
}
}
// Sharing
public boolean hasRightsToShare(Client c) {
if (!kHandler.isConnected()) {
return false;
}
Room r = c.getRoom();
return r != null && Room.Type.INTERVIEW != r.getType()
&& !r.isHidden(RoomElement.SCREEN_SHARING)
&& c.hasRight(Right.SHARE);
}
public boolean screenShareAllowed(Client c) {
Room r = c.getRoom();
return hasRightsToShare(c) && !isSharing(r.getId());
}
private void errorSharing(Client c) {
if (!kHandler.isConnected()) {
return;
}
KRoom room = kHandler.getRoom(c.getRoomId());
if (!room.isSharing() || !c.getSid().equals(room.getSharingUser().getString("sid"))) {
return;
}
Optional<StreamDesc> osd = c.getScreenStream();
if (osd.isPresent()) {
stopSharing(c, osd.get().getUid());
} else {
room.stopSharing();
}
stopRecording(c);
}
private void startSharing(Client c, Optional<StreamDesc> osd, JSONObject msg, Activity a) {
if (kHandler.isConnected() && c.getRoomId() != null) {
kHandler.getRoom(c.getRoomId()).startSharing(c, osd, msg, a);
}
}
/**
* Execute Pausing of sharing.
*
* Invoked and overwritten by Mock, hance package private.
*
* @param c client
* @param uid the uid
*/
void pauseSharing(Client c, String uid) {
if (!hasRightsToShare(c)) {
return;
}
if (!isSharing(c.getRoomId())) {
return;
}
if (isRecording(c.getRoomId())) {
StreamDesc sd = c.getStream(uid);
sd.removeActivity(Activity.SCREEN);
cm.update(c);
KStream sender = getByUid(uid);
sender.pauseSharing();
kHandler.sendShareUpdated(sd);
WebSocketHelper.sendRoomOthers(c.getRoomId(), c.getUid(), newStoppedMsg(sd));
} else {
stopSharing(c, uid);
}
}
private void stopSharing(Client c, String uid) {
KStream sender = getByUid(uid);
StreamDesc sd = doStopSharing(c.getSid(), uid);
if (sender != null && sd != null) {
sender.stopBroadcast();
} else {
log.warn("Could not stop broadcast - could be a KStream leak and lead to ghost KStream, client: {}, uid: {} ", c, uid);
}
}
StreamDesc doStopSharing(String sid, String uid) {
return doStopSharing(getBySid(sid), uid);
}
private StreamDesc doStopSharing(Client c, String uid) {
StreamDesc sd = null;
if (c.getRoomId() != null) {
sd = c.getStream(uid);
if (sd != null && StreamType.SCREEN == sd.getType()) {
c.removeStream(uid);
cm.update(c);
checkStreams(c.getRoomId());
WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.RIGHT_UPDATED, c.getUid()));
kHandler.sendShareUpdated(sd
.removeActivity(Activity.SCREEN)
.removeActivity(Activity.RECORD));
}
}
return sd;
}
public boolean isSharing(Long roomId) {
if (!kHandler.isConnected()) {
return false;
}
return kHandler.getRoom(roomId).isSharing();
}
// Recording
public boolean hasRightsToRecord(Client c) {
Room r = c.getRoom();
return isRecordingsEnabled() && r != null && r.isAllowRecording() && c.hasRight(Right.MODERATOR);
}
public boolean recordingAllowed(Client c) {
if (!kHandler.isConnected() || !isRecordingsEnabled()) {
return false;
}
Room r = c.getRoom();
return hasRightsToRecord(c) && !isRecording(r.getId());
}
public void startRecording(Client c) {
if (!kHandler.isConnected() || !hasRightsToRecord(c)) {
return;
}
kHandler.getRoom(c.getRoomId()).startRecording(c);
}
public void stopRecording(Client c) {
if (!kHandler.isConnected() || !hasRightsToRecord(c)) {
return;
}
kHandler.getRoom(c.getRoomId()).stopRecording(c);
// In case this user wasn't shareing his screen we also need to close that one
c.getScreenStream().ifPresent(sd -> {
if (!sd.hasActivity(Activity.SCREEN)) {
pauseSharing(c, sd.getUid());
}
});
}
/**
* Used for mocking. Requires a return value in order to be mocked.
*
* @param rec
* @return
*/
boolean startConvertion(Recording rec) {
IRecordingConverter conv = rec.isInterview() ? interviewConverter : recordingConverter;
taskExecutor.execute(() -> conv.startConversion(rec));
return true;
}
public boolean isRecording(Long roomId) {
if (!kHandler.isConnected()) {
return false;
}
return kHandler.getRoom(roomId).isRecording();
}
void remove(Client c) {
for (StreamDesc sd : c.getStreams()) {
AbstractStream s = getByUid(sd.getUid());
if (s != null) {
s.release();
WebSocketHelper.sendRoomOthers(c.getRoomId(), c.getUid(), newStoppedMsg(sd));
}
}
if (c.getRoomId() != null) {
getByRoom(c.getRoomId()).forEach(stream -> stream.remove(c)); // listeners of existing streams should be cleaned-up
checkStreams(c.getRoomId());
}
}
void addStream(KStream stream) {
streamByUid.put(stream.getUid(), stream);
}
public Collection<KStream> getStreams() {
return streamByUid.values();
}
Stream<KStream> getByRoom(Long roomId) {
return streamByUid.values().stream()
.filter(stream -> stream.getRoomId().equals(roomId));
}
Client getBySid(String sid) {
return cm.getBySid(sid);
}
public boolean hasStream(String uid) {
return streamByUid.get(uid) != null;
}
KStream getByUid(String uid) {
return uid == null ? null : streamByUid.get(uid);
}
KurentoHandler getHandler() {
return kHandler;
}
IClientManager getClientManager() {
return cm;
}
RecordingDao getRecordingDao() {
return recDao;
}
@Override
public void release(AbstractStream stream, boolean releaseStream) {
final String uid = stream.getUid();
if (releaseStream) {
stream.release();
}
Client c = cm.getBySid(stream.getSid());
if (c != null) {
StreamDesc sd = c.getStream(uid);
if (sd != null) {
c.removeStream(uid);
if (StreamType.WEBCAM == sd.getType()) {
for (Activity a : sd.getActivities()) {
c.remove(a);
}
}
cm.update(c);
WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.RIGHT_UPDATED, c.getUid()));
}
}
streamByUid.remove(uid);
}
@Override
public void destroy() {
for (Map.Entry<String, KStream> e : streamByUid.entrySet()) {
release(e.getValue(), true);
}
}
protected static JSONObject newStoppedMsg(StreamDesc sd) {
return newKurentoMsg()
.put("id", "broadcastStopped")
.put("uid", sd.getUid());
}
}