blob: a9e10ec01541c0e44224505ae9c31a9bb933a480 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openmeetings.core.remote;
import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_CANDIDATE;
import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_ICE;
import static org.apache.openmeetings.core.remote.KurentoHandler.activityAllowed;
import static org.apache.openmeetings.core.remote.KurentoHandler.newKurentoMsg;
import static org.apache.openmeetings.core.remote.KurentoHandler.sendError;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.openmeetings.core.converter.IRecordingConverter;
import org.apache.openmeetings.core.converter.InterviewConverter;
import org.apache.openmeetings.core.converter.RecordingConverter;
import org.apache.openmeetings.core.util.WebSocketHelper;
import org.apache.openmeetings.db.dao.record.RecordingDao;
import org.apache.openmeetings.db.entity.basic.Client;
import org.apache.openmeetings.db.entity.basic.Client.Activity;
import org.apache.openmeetings.db.entity.basic.Client.StreamDesc;
import org.apache.openmeetings.db.entity.basic.Client.StreamType;
import org.apache.openmeetings.db.entity.record.Recording;
import org.apache.openmeetings.db.entity.room.Room;
import org.apache.openmeetings.db.entity.room.Room.Right;
import org.apache.openmeetings.db.entity.room.Room.RoomElement;
import org.apache.openmeetings.db.manager.IClientManager;
import org.apache.openmeetings.db.util.ws.RoomMessage;
import org.apache.openmeetings.db.util.ws.TextRoomMessage;
import org.apache.wicket.util.string.Strings;
import org.kurento.client.IceCandidate;
import org.kurento.client.internal.server.KurentoServerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Component;
import com.github.openjson.JSONObject;
/**
 * Spring component that dispatches stream-related client commands (see
 * {@code onMessage}) and keeps a registry of {@link KStream} instances keyed by
 * stream UID.
 */
@Component
public class StreamProcessor implements IStreamProcessor {
private static final Logger log = LoggerFactory.getLogger(StreamProcessor.class);
// Registry of known streams keyed by stream UID; filled by addStream(), read by
// getByUid(), emptied by release()/destroy().
private final Map<String, KStream> streamByUid = new ConcurrentHashMap<>();
// Client registry: used here to look clients up by SID, list them per room and
// push client changes through update().
@Autowired
private IClientManager cm;
// Only handed out through getRecordingDao(); not used directly by this class.
@Autowired
private RecordingDao recDao;
// Entry point to Kurento: connection state, KRoom lookup and client messaging.
@Autowired
private KurentoHandler kHandler;
// Runs recording conversion jobs (see startConvertion()).
@Autowired
private TaskExecutor taskExecutor;
// Converter picked by startConvertion() for recordings that are not interviews.
@Autowired
private RecordingConverter recordingConverter;
// Converter picked by startConvertion() for interview recordings.
@Autowired
private InterviewConverter interviewConverter;
/**
 * Dispatches a single stream-related command sent by a client.
 * <p>
 * Supported command ids: {@code devicesAltered}, {@code toggleActivity},
 * {@code broadcastStarted}, {@code onIceCandidate}, {@code addListener},
 * {@code wannaShare}, {@code wannaRecord}, {@code pauseSharing},
 * {@code stopRecord} and {@code errorSharing}. Any other id is ignored.
 * <p>
 * Stream and client ids inside {@code msg} are supplied by the remote side, so
 * every lookup based on them is checked for {@code null} before use; a command
 * that references an unknown stream is logged and dropped.
 *
 * @param c the client the message was received from
 * @param cmdId the command id selecting the action
 * @param msg the message payload; the optional {@code uid} entry names the stream
 */
void onMessage(Client c, final String cmdId, JSONObject msg) {
	final String uid = msg.optString("uid");
	KStream sender;
	StreamDesc sd;
	Optional<StreamDesc> osd;
	log.debug("Incoming message from user with ID '{}': {}", c.getUserId(), msg);
	switch (cmdId) {
		case "devicesAltered":
			// resolve the stream first so nothing is mutated for a stale/unknown uid
			sd = c.getStream(uid);
			if (sd == null) {
				log.warn("devicesAltered: client has no stream with uid '{}', ignoring", uid);
				break;
			}
			if (!msg.getBoolean("audio") && c.hasActivity(Activity.AUDIO)) {
				c.remove(Activity.AUDIO);
			}
			if (!msg.getBoolean("video") && c.hasActivity(Activity.VIDEO)) {
				c.remove(Activity.VIDEO);
			}
			sd.setActivities();
			WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), cm.update(c), RoomMessage.Type.rightUpdated, c.getUid()));
			break;
		case "toggleActivity":
			toggleActivity(c, Activity.valueOf(msg.getString("activity")));
			break;
		case "broadcastStarted":
			handleBroadcastStarted(c, uid, msg);
			break;
		case "onIceCandidate":
			sender = getByUid(uid);
			if (sender != null) {
				JSONObject candidate = msg.getJSONObject(PARAM_CANDIDATE);
				String candStr = candidate.getString(PARAM_CANDIDATE);
				// candidates with an empty candidate string are skipped
				if (!Strings.isEmpty(candStr)) {
					IceCandidate cand = new IceCandidate(
							candStr
							, candidate.getString("sdpMid")
							, candidate.getInt("sdpMLineIndex"));
					sender.addCandidate(cand, msg.getString("luid"));
				}
			}
			break;
		case "addListener":
			sender = getByUid(msg.getString("sender"));
			if (sender != null) {
				// the sending client or its stream descriptor may already be gone
				Client sendClient = cm.getBySid(sender.getSid());
				sd = sendClient == null ? null : sendClient.getStream(sender.getUid());
				if (sd == null) {
					log.warn("addListener: no stream descriptor found for sender stream '{}', ignoring", sender.getUid());
					break;
				}
				// a screen stream that is recorded but not shared gets no listeners
				if (StreamType.SCREEN == sd.getType() && sd.hasActivity(Activity.RECORD) && !sd.hasActivity(Activity.SCREEN)) {
					break;
				}
				sender.addListener(this, c.getSid(), c.getUid(), msg.getString("sdpOffer"));
			}
			break;
		case "wannaShare":
			osd = c.getScreenStream();
			// allowed either as a fresh share, or when the client already has a
			// screen stream that currently lacks the SCREEN activity
			if (screenShareAllowed(c) || (osd.isPresent() && !osd.get().hasActivity(Activity.SCREEN))) {
				startSharing(c, osd, msg, Activity.SCREEN);
			}
			break;
		case "wannaRecord":
			osd = c.getScreenStream();
			if (recordingAllowed(c)) {
				Room r = c.getRoom();
				if (Room.Type.interview == r.getType()) {
					log.warn("This shouldn't be called for interview room");
					break;
				}
				// sharing state is sampled BEFORE startSharing() is invoked
				boolean sharing = isSharing(r.getId());
				startSharing(c, osd, msg, Activity.RECORD);
				if (sharing) {
					startRecording(c);
				}
			}
			break;
		case "pauseSharing":
			pauseSharing(c, uid);
			break;
		case "stopRecord":
			stopRecording(c);
			break;
		case "errorSharing":
			errorSharing(c);
			break;
		default:
			// no-op
			break;
	}
}
/**
 * Handles the {@code broadcastStarted} command: starts the broadcast of the
 * client's stream {@code uid}, joining the Kurento room first when no
 * {@link KStream} is registered for that uid yet. When the stream is a screen
 * stream carrying the RECORD activity and the room is not recording yet,
 * recording is started as well.
 * <p>
 * On {@link KurentoServerException} the stream (if one was obtained) is
 * released, the client is told the broadcast stopped and an error is sent.
 *
 * @param c the broadcasting client
 * @param uid uid of the stream to broadcast, as sent by the client
 * @param msg the message payload; must contain {@code sdpOffer}
 */
private void handleBroadcastStarted(Client c, final String uid, JSONObject msg) {
	StreamDesc sd = c.getStream(uid);
	if (sd == null) {
		// uid comes from the remote side and may reference a stream that is gone
		log.warn("broadcastStarted: client has no stream with uid '{}', ignoring", uid);
		return;
	}
	KStream sender = getByUid(uid);
	try {
		if (sender == null) {
			KRoom room = kHandler.getRoom(c.getRoomId());
			sender = room.join(sd);
		}
		sender.startBroadcast(this, sd, msg.getString("sdpOffer"));
		if (StreamType.SCREEN == sd.getType() && sd.hasActivity(Activity.RECORD) && !isRecording(c.getRoomId())) {
			startRecording(c);
		}
	} catch (KurentoServerException e) {
		// the failure may have happened before a stream was obtained
		if (sender != null) {
			sender.release(this);
		}
		WebSocketHelper.sendClient(c, newKurentoMsg()
				.put("id", "broadcastStopped")
				.put("uid", sd.getUid())
				);
		sendError(c, "Failed to start broadcast: " + e.getMessage());
		log.error("Failed to start broadcast", e);
	}
}
/**
 * Asks the client whether it has any of the AUDIO / VIDEO activities.
 *
 * @param c the client to inspect
 * @return the result of {@code c.hasAnyActivity(AUDIO, VIDEO)}
 */
private static boolean isBroadcasting(final Client c) {
	final boolean audioOrVideo = c.hasAnyActivity(Activity.AUDIO, Activity.VIDEO);
	return audioOrVideo;
}
/**
 * Toggles activity {@code a} for client {@code c} and brings its webcam
 * stream(s) in line with the new state.
 * <p>
 * If the activity is not allowed yet, the matching audio/video right is granted
 * first (video only when the room is not audio-only). The toggle is skipped
 * when the device(s) the activity needs are not enabled. Afterwards:
 * <ul>
 * <li>client no longer broadcasts: every WEBCAM stream is stopped and removed;</li>
 * <li>client has just started broadcasting: a WEBCAM stream is added and the
 * client is sent a {@code broadcast} message;</li>
 * <li>otherwise only the constraints of the existing stream are refreshed.</li>
 * </ul>
 *
 * @param c the client whose activity is toggled
 * @param a the activity to toggle
 */
public void toggleActivity(Client c, Activity a) {
	log.info("PARTICIPANT {}: trying to toggle activity {}", c, a);
	// NOTE(review): the right is granted to the very client being toggled; confirm
	// every caller is entitled to do that.
	if (!activityAllowed(c, a, c.getRoom())) {
		if (a == Activity.AUDIO || a == Activity.AUDIO_VIDEO) {
			c.allow(Room.Right.audio);
		}
		if (!c.getRoom().isAudioOnly() && (a == Activity.VIDEO || a == Activity.AUDIO_VIDEO)) {
			c.allow(Room.Right.video);
		}
	}
	if (!activityAllowed(c, a, c.getRoom())) {
		return;
	}
	final boolean wasBroadcasting = isBroadcasting(c);
	// nothing to toggle when the required device(s) are disabled
	if (a == Activity.AUDIO && !c.isMicEnabled()) {
		return;
	}
	if (a == Activity.VIDEO && !c.isCamEnabled()) {
		return;
	}
	if (a == Activity.AUDIO_VIDEO && !c.isMicEnabled() && !c.isCamEnabled()) {
		return;
	}
	c.toggle(a);
	if (!isBroadcasting(c)) {
		//close
		// Snapshot the webcam streams BEFORE removing any of them, so the client's
		// stream collection is never modified while it is being traversed.
		final List<StreamDesc> webcams = c.getStreams().stream()
				.filter(sd -> StreamType.WEBCAM == sd.getType())
				.collect(Collectors.toList());
		for (StreamDesc sd : webcams) {
			KStream s = getByUid(sd.getUid());
			if (s != null) {
				s.stopBroadcast(this);
			}
			c.removeStream(sd.getUid());
		}
		if (!webcams.isEmpty()) {
			cm.update(c);
			checkStreams(c.getRoomId());
		}
		WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.rightUpdated, c.getUid()));
	} else if (!wasBroadcasting) {
		//join
		StreamDesc sd = c.addStream(StreamType.WEBCAM);
		cm.update(c);
		log.debug("User {}: has started broadcast", sd.getUid());
		kHandler.sendClient(sd.getSid(), newKurentoMsg()
				.put("id", "broadcast")
				.put("stream", sd.toJson())
				.put(PARAM_ICE, kHandler.getTurnServers(false)));
	} else {
		constraintsChanged(c);
		WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.rightUpdated, c.getUid()));
	}
}
/**
 * Refreshes the activities of the client's first WEBCAM stream (if any) and
 * pushes the client through the client manager. Does nothing when the client
 * has no WEBCAM stream.
 *
 * @param c the client whose constraints were changed
 */
private void constraintsChanged(Client c) {
	for (StreamDesc candidate : c.getStreams()) {
		if (StreamType.WEBCAM != candidate.getType()) {
			continue;
		}
		// only the first webcam stream is touched
		candidate.setActivities();
		cm.update(c);
		break;
	}
}
/**
 * Re-evaluates the client's streams after its rights were changed.
 * <p>
 * A present screen stream is stopped when the client may no longer share. If
 * the client is broadcasting its constraints are refreshed; otherwise the room
 * is notified to stop the broadcast of every registered WEBCAM stream of the
 * client. Finally a {@code rightUpdated} message is sent to the room.
 *
 * @param c the client whose rights were updated
 */
public void rightsUpdated(Client c) {
	c.getScreenStream()
			.filter(screen -> !hasRightsToShare(c))
			.ifPresent(screen -> stopSharing(c, screen.getUid()));
	if (!isBroadcasting(c)) {
		c.getStreams().stream()
				.filter(desc -> StreamType.WEBCAM == desc.getType())
				.map(desc -> streamByUid.get(desc.getUid()))
				.filter(kStream -> kStream != null)
				// the room is looked up per stream, only when there is one to stop
				.forEach(kStream -> kHandler.getRoom(c.getRoomId()).onStopBroadcast(kStream, this));
	} else {
		constraintsChanged(c);
	}
	final TextRoomMessage notification = new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.rightUpdated, c.getUid());
	WebSocketHelper.sendRoom(notification);
}
/**
 * Stops sharing and/or recording in the given room once the streams they depend
 * on are gone. Does nothing when Kurento is not connected.
 * <ul>
 * <li>Room is sharing but no client has a SCREEN stream: sharing is stopped,
 * and in a non-interview room a running recording is stopped too.</li>
 * <li>Room is recording but no client has any stream: recording is stopped.</li>
 * </ul>
 * The client list is small and the predicates are trivial, so plain sequential
 * streams with short-circuiting {@code anyMatch} are used.
 *
 * @param roomId id of the room to check
 */
private void checkStreams(Long roomId) {
	if (!kHandler.isConnected()) {
		return;
	}
	KRoom room = kHandler.getRoom(roomId);
	if (room.isSharing()) {
		final boolean hasScreenStreams = cm.listByRoom(roomId).stream()
				.flatMap(c -> c.getStreams().stream())
				.anyMatch(sd -> StreamType.SCREEN == sd.getType());
		if (!hasScreenStreams) {
			log.info("No more screen streams in the room, stopping sharing");
			room.stopSharing();
			if (Room.Type.interview != room.getType() && room.isRecording()) {
				log.info("No more screen streams in the non-interview room, stopping recording");
				room.stopRecording(this, null);
			}
		}
	}
	// re-read the recording state: it may have just been stopped above
	if (room.isRecording()) {
		final boolean hasStreams = cm.listByRoom(roomId).stream()
				.anyMatch(c -> !c.getStreams().isEmpty());
		if (!hasStreams) {
			log.info("No more streams in the room, stopping recording");
			room.stopRecording(this, null);
		}
	}
}
// Sharing
/**
 * Checks the room- and client-level preconditions for screen sharing.
 *
 * @param c the client to check
 * @return {@code true} only if the client is in a room that is not an interview
 *         room, does not hide the screen-sharing element and allows recording,
 *         and the client holds the {@code share} right
 */
public boolean hasRightsToShare(Client c) {
	final Room room = c.getRoom();
	if (room == null || Room.Type.interview == room.getType()) {
		return false;
	}
	if (room.isHidden(RoomElement.ScreenSharing) || !room.isAllowRecording()) {
		return false;
	}
	return c.hasRight(Right.share);
}
/**
 * Tells whether the client may start a new screen share right now.
 *
 * @param c the client to check
 * @return {@code true} if Kurento is connected, the client has the rights to
 *         share and its room is not sharing already
 */
public boolean screenShareAllowed(Client c) {
	if (!kHandler.isConnected() || !hasRightsToShare(c)) {
		return false;
	}
	// hasRightsToShare() has verified the room is not null
	return !isSharing(c.getRoom().getId());
}
/**
 * Handles the {@code errorSharing} command. Acts only when Kurento is
 * connected, the client's room is sharing and the client's SID matches the
 * {@code sid} of the room's sharing user. In that case the client's screen
 * stream is stopped (or, when it has none, sharing is stopped on the room) and
 * recording is stopped afterwards.
 *
 * @param c the client reporting the sharing error
 */
private void errorSharing(Client c) {
	if (!kHandler.isConnected()) {
		return;
	}
	final KRoom kRoom = kHandler.getRoom(c.getRoomId());
	if (!kRoom.isSharing()) {
		return;
	}
	if (!c.getSid().equals(kRoom.getSharingUser().getString("sid"))) {
		return;
	}
	final Optional<StreamDesc> screen = c.getScreenStream();
	if (!screen.isPresent()) {
		kRoom.stopSharing();
	} else {
		stopSharing(c, screen.get().getUid());
	}
	stopRecording(c);
}
/**
 * Delegates the start of sharing/recording to the client's Kurento room.
 * Nothing happens when Kurento is not connected or the client has no room id.
 *
 * @param c the requesting client
 * @param osd the client's current screen stream, if any
 * @param msg the original request message
 * @param a the activity being started
 */
private void startSharing(Client c, Optional<StreamDesc> osd, JSONObject msg, Activity a) {
	if (!kHandler.isConnected() || c.getRoomId() == null) {
		return;
	}
	final KRoom kRoom = kHandler.getRoom(c.getRoomId());
	kRoom.startSharing(this, cm, c, osd, msg, a);
}
/**
 * Handles the {@code pauseSharing} command for stream {@code uid}.
 * <p>
 * Ignored unless the client has the rights to share and its room is sharing.
 * While the room is recording the stream is kept: only its SCREEN activity is
 * removed, the stream is paused and the other room members are told the
 * broadcast stopped. When the room is not recording, sharing is stopped
 * completely.
 *
 * @param c the client pausing its share
 * @param uid uid of the screen stream, as sent by the client
 */
private void pauseSharing(Client c, String uid) {
	if (!hasRightsToShare(c) || !isSharing(c.getRoomId())) {
		return;
	}
	if (!isRecording(c.getRoomId())) {
		stopSharing(c, uid);
		return;
	}
	// Resolve both sides before touching any state: uid is client-supplied and
	// may reference a stream that no longer exists.
	StreamDesc sd = c.getStream(uid);
	KStream sender = getByUid(uid);
	if (sd == null || sender == null) {
		log.warn("pauseSharing: no stream found for uid '{}', ignoring", uid);
		return;
	}
	sd.removeActivity(Activity.SCREEN);
	cm.update(c);
	sender.pauseSharing();
	kHandler.sendShareUpdated(sd);
	WebSocketHelper.sendRoomOthers(c.getRoomId(), c.getUid(), newKurentoMsg()
			.put("id", "broadcastStopped")
			.put("uid", sd.getUid())
			);
}
/**
 * Stops sharing of stream {@code uid}: runs {@code doStopSharing} for the
 * client and, when both a registered stream and a stream descriptor were found,
 * stops the broadcast of that stream.
 *
 * @param c the sharing client
 * @param uid uid of the stream to stop
 */
private void stopSharing(Client c, String uid) {
	// the stream is looked up BEFORE doStopSharing() runs
	final KStream stream = getByUid(uid);
	final StreamDesc stopped = doStopSharing(c.getSid(), uid);
	if (stream == null || stopped == null) {
		return;
	}
	stream.stopBroadcast(this);
}
/**
 * Looks the client up by SID and delegates to the client-based overload.
 *
 * @param sid SID of the sharing client
 * @param uid uid of the stream to stop
 * @return whatever the client-based overload returns
 */
StreamDesc doStopSharing(String sid, String uid) {
	final Client owner = getBySid(sid);
	return doStopSharing(owner, uid);
}
/**
 * Removes the SCREEN stream {@code uid} from the client and notifies the room.
 * <p>
 * The removal and the notifications happen only when the client is in a room
 * and the stream exists and is of type SCREEN. In that case the stream
 * descriptor is removed, the room's streams are re-checked, a
 * {@code rightUpdated} message is sent and a share update is sent for the
 * descriptor with its SCREEN and RECORD activities removed.
 *
 * @param c the sharing client; may be {@code null} when the SID lookup in the
 *          calling overload found nobody
 * @param uid uid of the stream to stop
 * @return the stream descriptor found for {@code uid} (of any type), or
 *         {@code null} if the client is unknown, is not in a room or has no
 *         such stream
 */
private StreamDesc doStopSharing(Client c, String uid) {
	if (c == null || c.getRoomId() == null) {
		return null;
	}
	StreamDesc sd = c.getStream(uid);
	if (sd != null && StreamType.SCREEN == sd.getType()) {
		c.removeStream(uid);
		cm.update(c);
		checkStreams(c.getRoomId());
		WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.rightUpdated, c.getUid()));
		kHandler.sendShareUpdated(sd
				.removeActivity(Activity.SCREEN)
				.removeActivity(Activity.RECORD));
	}
	return sd;
}
/**
 * Tells whether the given room is currently sharing.
 *
 * @param roomId id of the room to check
 * @return {@code false} when Kurento is not connected, otherwise the sharing
 *         state reported by the Kurento room
 */
public boolean isSharing(Long roomId) {
	return kHandler.isConnected() && kHandler.getRoom(roomId).isSharing();
}
// Recording
/**
 * Checks the room- and client-level preconditions for recording.
 *
 * @param c the client to check
 * @return {@code true} only if the client is in a room that allows recording
 *         and the client holds the {@code moderator} right
 */
public boolean hasRightsToRecord(Client c) {
	final Room room = c.getRoom();
	if (room == null || !room.isAllowRecording()) {
		return false;
	}
	return c.hasRight(Right.moderator);
}
/**
 * Tells whether the client may start a new recording right now.
 *
 * @param c the client to check
 * @return {@code true} if Kurento is connected, the client has the rights to
 *         record and its room is not recording already
 */
public boolean recordingAllowed(Client c) {
	if (!kHandler.isConnected() || !hasRightsToRecord(c)) {
		return false;
	}
	// hasRightsToRecord() has verified the room is not null
	return !isRecording(c.getRoom().getId());
}
/**
 * Asks the client's Kurento room to start recording. Silently ignored when
 * Kurento is not connected or the client lacks the rights to record.
 *
 * @param c the client starting the recording
 */
public void startRecording(Client c) {
	if (kHandler.isConnected() && hasRightsToRecord(c)) {
		kHandler.getRoom(c.getRoomId()).startRecording(this, c);
	}
}
/**
 * Asks the client's Kurento room to stop recording. Silently ignored when
 * Kurento is not connected or the client lacks the rights to record.
 *
 * @param c the client stopping the recording
 */
public void stopRecording(Client c) {
	if (kHandler.isConnected() && hasRightsToRecord(c)) {
		kHandler.getRoom(c.getRoomId()).stopRecording(this, c);
	}
}
/**
 * Submits the conversion of a recording to the task executor, using the
 * interview converter for interview recordings and the regular recording
 * converter otherwise.
 *
 * @param rec the recording to convert
 */
void startConvertion(Recording rec) {
	final IRecordingConverter converter;
	if (rec.isInterview()) {
		converter = interviewConverter;
	} else {
		converter = recordingConverter;
	}
	taskExecutor.execute(() -> converter.startConversion(rec));
}
/**
 * Tells whether the given room is currently recording.
 *
 * @param roomId id of the room to check
 * @return {@code false} when Kurento is not connected, otherwise the recording
 *         state reported by the Kurento room
 */
public boolean isRecording(Long roomId) {
	return kHandler.isConnected() && kHandler.getRoom(roomId).isRecording();
}
/**
 * Releases every registered stream of the client, telling the other room
 * members that each of those broadcasts stopped. If the client is in a room it
 * then leaves the Kurento room and the room's streams are re-checked.
 *
 * @param c the client being removed
 */
void remove(Client c) {
	for (StreamDesc desc : c.getStreams()) {
		final AbstractStream registered = getByUid(desc.getUid());
		if (registered == null) {
			continue;
		}
		registered.release(this);
		final JSONObject stopped = newKurentoMsg()
				.put("id", "broadcastStopped")
				.put("uid", desc.getUid());
		WebSocketHelper.sendRoomOthers(c.getRoomId(), c.getUid(), stopped);
	}
	if (c.getRoomId() == null) {
		return;
	}
	final KRoom kRoom = kHandler.getRoom(c.getRoomId());
	kRoom.leave(this, c);
	// room id is read again on purpose, after leave() has run
	checkStreams(c.getRoomId());
}
/**
 * Registers the stream under its UID, replacing any previous entry for that UID.
 *
 * @param stream the stream to register
 */
void addStream(KStream stream) {
	final String key = stream.getUid();
	streamByUid.put(key, stream);
}
/**
 * Looks a client up by SID through the client manager.
 *
 * @param sid the SID to look up
 * @return whatever the client manager returns for that SID
 */
Client getBySid(String sid) {
	final Client found = cm.getBySid(sid);
	return found;
}
/**
 * Null-safe lookup of a registered stream.
 *
 * @param uid the stream UID, may be {@code null}
 * @return the registered stream, or {@code null} if {@code uid} is {@code null}
 *         or no stream is registered under it
 */
KStream getByUid(String uid) {
	if (uid == null) {
		return null;
	}
	return streamByUid.get(uid);
}
/**
 * @return the injected Kurento handler
 */
KurentoHandler getHandler() {
	return this.kHandler;
}
/**
 * @return the injected client manager
 */
IClientManager getClientManager() {
	return this.cm;
}
/**
 * @return the injected recording DAO
 */
RecordingDao getRecordingDao() {
	return this.recDao;
}
/**
 * Forgets a stream: removes its descriptor from the owning client (if the
 * client and the descriptor still exist) and drops it from the registry.
 * <p>
 * For a WEBCAM stream the activities carried by the descriptor are removed from
 * the client as well. When a descriptor was removed the client is pushed
 * through the client manager and a {@code rightUpdated} message is sent to the
 * room. The registry entry is dropped in every case, so a missing client or
 * descriptor cannot leave a stale stream behind.
 *
 * @param stream the stream being released
 */
@Override
public void release(AbstractStream stream) {
	final String uid = stream.getUid();
	Client c = cm.getBySid(stream.getSid());
	if (c != null) {
		StreamDesc sd = c.getStream(uid);
		// the descriptor may already have been removed from the client
		if (sd != null) {
			c.removeStream(uid);
			if (StreamType.WEBCAM == sd.getType()) {
				for (Activity a : sd.getActivities()) {
					c.remove(a);
				}
			}
			cm.update(c);
			WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.rightUpdated, c.getUid()));
		}
	}
	streamByUid.remove(uid);
}
/**
 * Empties the stream registry.
 */
@Override
public void destroy() {
	this.streamByUid.clear();
}
}