| /* |
| |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License") + you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.openmeetings.core.remote; |
| |
| import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_CANDIDATE; |
| import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_ICE; |
| import static org.apache.openmeetings.core.remote.KurentoHandler.activityAllowed; |
| import static org.apache.openmeetings.core.remote.KurentoHandler.newKurentoMsg; |
| import static org.apache.openmeetings.core.remote.KurentoHandler.sendError; |
| |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.stream.Collectors; |
| |
| import org.apache.openmeetings.core.converter.IRecordingConverter; |
| import org.apache.openmeetings.core.converter.InterviewConverter; |
| import org.apache.openmeetings.core.converter.RecordingConverter; |
| import org.apache.openmeetings.core.util.WebSocketHelper; |
| import org.apache.openmeetings.db.dao.record.RecordingDao; |
| import org.apache.openmeetings.db.entity.basic.Client; |
| import org.apache.openmeetings.db.entity.basic.Client.Activity; |
| import org.apache.openmeetings.db.entity.basic.Client.StreamDesc; |
| import org.apache.openmeetings.db.entity.basic.Client.StreamType; |
| import org.apache.openmeetings.db.entity.record.Recording; |
| import org.apache.openmeetings.db.entity.room.Room; |
| import org.apache.openmeetings.db.entity.room.Room.Right; |
| import org.apache.openmeetings.db.entity.room.Room.RoomElement; |
| import org.apache.openmeetings.db.manager.IClientManager; |
| import org.apache.openmeetings.db.util.ws.RoomMessage; |
| import org.apache.openmeetings.db.util.ws.TextRoomMessage; |
| import org.apache.wicket.util.string.Strings; |
| import org.kurento.client.IceCandidate; |
| import org.kurento.client.internal.server.KurentoServerException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.springframework.beans.factory.annotation.Autowired; |
| import org.springframework.core.task.TaskExecutor; |
| import org.springframework.stereotype.Component; |
| |
| import com.github.openjson.JSONObject; |
| |
| @Component |
| public class StreamProcessor implements IStreamProcessor { |
| private static final Logger log = LoggerFactory.getLogger(StreamProcessor.class); |
| private final Map<String, KStream> streamByUid = new ConcurrentHashMap<>(); |
| |
| @Autowired |
| private IClientManager cm; |
| @Autowired |
| private RecordingDao recDao; |
| @Autowired |
| private KurentoHandler kHandler; |
| @Autowired |
| private TaskExecutor taskExecutor; |
| @Autowired |
| private RecordingConverter recordingConverter; |
| @Autowired |
| private InterviewConverter interviewConverter; |
| |
| void onMessage(Client c, final String cmdId, JSONObject msg) { |
| final String uid = msg.optString("uid"); |
| KStream sender; |
| StreamDesc sd; |
| Optional<StreamDesc> osd; |
| log.debug("Incoming message from user with ID '{}': {}", c.getUserId(), msg); |
| switch (cmdId) { |
| case "devicesAltered": |
| if (!msg.getBoolean("audio") && c.hasActivity(Activity.AUDIO)) { |
| c.remove(Activity.AUDIO); |
| } |
| if (!msg.getBoolean("video") && c.hasActivity(Activity.VIDEO)) { |
| c.remove(Activity.VIDEO); |
| } |
| c.getStream(uid).setActivities(); |
| WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), cm.update(c), RoomMessage.Type.rightUpdated, c.getUid())); |
| break; |
| case "toggleActivity": |
| toggleActivity(c, Activity.valueOf(msg.getString("activity"))); |
| break; |
| case "broadcastStarted": |
| handleBroadcastStarted(c, uid, msg); |
| break; |
| case "onIceCandidate": |
| sender = getByUid(uid); |
| if (sender != null) { |
| JSONObject candidate = msg.getJSONObject(PARAM_CANDIDATE); |
| String candStr = candidate.getString(PARAM_CANDIDATE); |
| if (!Strings.isEmpty(candStr)) { |
| IceCandidate cand = new IceCandidate( |
| candStr |
| , candidate.getString("sdpMid") |
| , candidate.getInt("sdpMLineIndex")); |
| sender.addCandidate(cand, msg.getString("luid")); |
| } |
| } |
| break; |
| case "addListener": |
| sender = getByUid(msg.getString("sender")); |
| if (sender != null) { |
| Client sendClient = cm.getBySid(sender.getSid()); |
| sd = sendClient.getStream(sender.getUid()); |
| if (StreamType.SCREEN == sd.getType() && sd.hasActivity(Activity.RECORD) && !sd.hasActivity(Activity.SCREEN)) { |
| break; |
| } |
| sender.addListener(this, c.getSid(), c.getUid(), msg.getString("sdpOffer")); |
| } |
| break; |
| case "wannaShare": |
| osd = c.getScreenStream(); |
| if (screenShareAllowed(c) || (osd.isPresent() && !osd.get().hasActivity(Activity.SCREEN))) { |
| startSharing(c, osd, msg, Activity.SCREEN); |
| } |
| break; |
| case "wannaRecord": |
| osd = c.getScreenStream(); |
| if (recordingAllowed(c)) { |
| Room r = c.getRoom(); |
| if (Room.Type.interview == r.getType()) { |
| log.warn("This shouldn't be called for interview room"); |
| break; |
| } |
| boolean sharing = isSharing(r.getId()); |
| startSharing(c, osd, msg, Activity.RECORD); |
| if (sharing) { |
| startRecording(c); |
| } |
| } |
| break; |
| case "pauseSharing": |
| pauseSharing(c, uid); |
| break; |
| case "stopRecord": |
| stopRecording(c); |
| break; |
| case "errorSharing": |
| errorSharing(c); |
| break; |
| default: |
| // no-op |
| break; |
| } |
| } |
| |
| private void handleBroadcastStarted(Client c, final String uid, JSONObject msg) { |
| StreamDesc sd = c.getStream(uid); |
| KStream sender= getByUid(uid); |
| try { |
| if (sender == null) { |
| KRoom room = kHandler.getRoom(c.getRoomId()); |
| sender = room.join(sd); |
| } |
| sender.startBroadcast(this, sd, msg.getString("sdpOffer")); |
| if (StreamType.SCREEN == sd.getType() && sd.hasActivity(Activity.RECORD) && !isRecording(c.getRoomId())) { |
| startRecording(c); |
| } |
| } catch (KurentoServerException e) { |
| sender.release(this); |
| WebSocketHelper.sendClient(c, newKurentoMsg() |
| .put("id", "broadcastStopped") |
| .put("uid", sd.getUid()) |
| ); |
| sendError(c, "Failed to start broadcast: " + e.getMessage()); |
| log.error("Failed to start broadcast", e); |
| } |
| } |
| |
| private static boolean isBroadcasting(final Client c) { |
| return c.hasAnyActivity(Activity.AUDIO, Activity.VIDEO); |
| } |
| |
| public void toggleActivity(Client c, Activity a) { |
| log.info("PARTICIPANT {}: trying to toggle activity {}", c, a); |
| |
| if (!activityAllowed(c, a, c.getRoom())) { |
| if (a == Activity.AUDIO || a == Activity.AUDIO_VIDEO) { |
| c.allow(Room.Right.audio); |
| } |
| if (!c.getRoom().isAudioOnly() && (a == Activity.VIDEO || a == Activity.AUDIO_VIDEO)) { |
| c.allow(Room.Right.video); |
| } |
| } |
| if (activityAllowed(c, a, c.getRoom())) { |
| boolean wasBroadcasting = isBroadcasting(c); |
| if (a == Activity.AUDIO && !c.isMicEnabled()) { |
| return; |
| } |
| if (a == Activity.VIDEO && !c.isCamEnabled()) { |
| return; |
| } |
| if (a == Activity.AUDIO_VIDEO && !c.isMicEnabled() && !c.isCamEnabled()) { |
| return; |
| } |
| c.toggle(a); |
| if (!isBroadcasting(c)) { |
| //close |
| AtomicBoolean changed = new AtomicBoolean(false); |
| c.getStreams().stream() |
| .filter(sd -> StreamType.WEBCAM == sd.getType()) |
| .forEach(sd -> { |
| KStream s = getByUid(sd.getUid()); |
| if (s != null) { |
| s.stopBroadcast(this); |
| } |
| c.removeStream(sd.getUid()); |
| changed.set(true); |
| }); |
| if (changed.get()) { |
| cm.update(c); |
| checkStreams(c.getRoomId()); |
| } |
| WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.rightUpdated, c.getUid())); |
| } else if (!wasBroadcasting) { |
| //join |
| StreamDesc sd = c.addStream(StreamType.WEBCAM); |
| cm.update(c); |
| log.debug("User {}: has started broadcast", sd.getUid()); |
| kHandler.sendClient(sd.getSid(), newKurentoMsg() |
| .put("id", "broadcast") |
| .put("stream", sd.toJson()) |
| .put(PARAM_ICE, kHandler.getTurnServers(false))); |
| } else { |
| constraintsChanged(c); |
| WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.rightUpdated, c.getUid())); |
| } |
| } |
| } |
| |
| private void constraintsChanged(Client c) { |
| //constraints were changed |
| c.getStreams().stream() |
| .filter(sd -> StreamType.WEBCAM == sd.getType()) |
| .findFirst() |
| .ifPresent(sd -> { |
| sd.setActivities(); |
| cm.update(c); |
| }); |
| } |
| |
| public void rightsUpdated(Client c) { |
| Optional<StreamDesc> osd = c.getScreenStream(); |
| if (osd.isPresent() && !hasRightsToShare(c)) { |
| stopSharing(c, osd.get().getUid()); |
| } |
| if (isBroadcasting(c)) { |
| constraintsChanged(c); |
| } else { |
| c.getStreams().stream() |
| .filter(sd -> StreamType.WEBCAM == sd.getType()) |
| .forEach(sd -> { |
| KStream stream = streamByUid.get(sd.getUid()); |
| if (stream != null) { |
| KRoom room = kHandler.getRoom(c.getRoomId()); |
| room.onStopBroadcast(stream, this); |
| } |
| }); |
| } |
| WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.rightUpdated, c.getUid())); |
| } |
| |
| private void checkStreams(Long roomId) { |
| if (!kHandler.isConnected()) { |
| return; |
| } |
| KRoom room = kHandler.getRoom(roomId); |
| if (room.isSharing()) { |
| List<StreamDesc> streams = cm.listByRoom(roomId).parallelStream() |
| .flatMap(c -> c.getStreams().stream()) |
| .filter(sd -> StreamType.SCREEN == sd.getType()).collect(Collectors.toList()); |
| if (streams.isEmpty()) { |
| log.info("No more screen streams in the room, stopping sharing"); |
| room.stopSharing(); |
| if (Room.Type.interview != room.getType() && room.isRecording()) { |
| log.info("No more screen streams in the non-interview room, stopping recording"); |
| room.stopRecording(this, null); |
| } |
| } |
| } |
| if (room.isRecording()) { |
| List<StreamDesc> streams = cm.listByRoom(roomId).parallelStream() |
| .flatMap(c -> c.getStreams().stream()) |
| .collect(Collectors.toList()); |
| if (streams.isEmpty()) { |
| log.info("No more streams in the room, stopping recording"); |
| room.stopRecording(this, null); |
| } |
| } |
| } |
| |
| // Sharing |
| public boolean hasRightsToShare(Client c) { |
| Room r = c.getRoom(); |
| return r != null && Room.Type.interview != r.getType() |
| && !r.isHidden(RoomElement.ScreenSharing) |
| && r.isAllowRecording() |
| && c.hasRight(Right.share); |
| } |
| |
| public boolean screenShareAllowed(Client c) { |
| if (!kHandler.isConnected()) { |
| return false; |
| } |
| Room r = c.getRoom(); |
| return hasRightsToShare(c) && !isSharing(r.getId()); |
| } |
| |
| private void errorSharing(Client c) { |
| if (!kHandler.isConnected()) { |
| return; |
| } |
| KRoom room = kHandler.getRoom(c.getRoomId()); |
| if (!room.isSharing() || !c.getSid().equals(room.getSharingUser().getString("sid"))) { |
| return; |
| } |
| Optional<StreamDesc> osd = c.getScreenStream(); |
| if (osd.isPresent()) { |
| stopSharing(c, osd.get().getUid()); |
| } else { |
| room.stopSharing(); |
| } |
| stopRecording(c); |
| } |
| |
| private void startSharing(Client c, Optional<StreamDesc> osd, JSONObject msg, Activity a) { |
| if (kHandler.isConnected() && c.getRoomId() != null) { |
| kHandler.getRoom(c.getRoomId()).startSharing(this, cm, c, osd, msg, a); |
| } |
| } |
| |
| private void pauseSharing(Client c, String uid) { |
| if (!hasRightsToShare(c)) { |
| return; |
| } |
| if (!isSharing(c.getRoomId())) { |
| return; |
| } |
| if (isRecording(c.getRoomId())) { |
| StreamDesc sd = c.getStream(uid); |
| sd.removeActivity(Activity.SCREEN); |
| cm.update(c); |
| KStream sender = getByUid(uid); |
| sender.pauseSharing(); |
| kHandler.sendShareUpdated(sd); |
| WebSocketHelper.sendRoomOthers(c.getRoomId(), c.getUid(), newKurentoMsg() |
| .put("id", "broadcastStopped") |
| .put("uid", sd.getUid()) |
| ); |
| } else { |
| stopSharing(c, uid); |
| } |
| } |
| |
| private void stopSharing(Client c, String uid) { |
| KStream sender = getByUid(uid); |
| StreamDesc sd = doStopSharing(c.getSid(), uid); |
| if (sender != null && sd != null) { |
| sender.stopBroadcast(this); |
| } |
| } |
| |
| StreamDesc doStopSharing(String sid, String uid) { |
| return doStopSharing(getBySid(sid), uid); |
| } |
| |
| private StreamDesc doStopSharing(Client c, String uid) { |
| StreamDesc sd = null; |
| if (c.getRoomId() != null) { |
| sd = c.getStream(uid); |
| if (sd != null && StreamType.SCREEN == sd.getType()) { |
| c.removeStream(uid); |
| cm.update(c); |
| checkStreams(c.getRoomId()); |
| WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.rightUpdated, c.getUid())); |
| kHandler.sendShareUpdated(sd |
| .removeActivity(Activity.SCREEN) |
| .removeActivity(Activity.RECORD)); |
| } |
| } |
| return sd; |
| } |
| |
| public boolean isSharing(Long roomId) { |
| if (!kHandler.isConnected()) { |
| return false; |
| } |
| return kHandler.getRoom(roomId).isSharing(); |
| } |
| |
| // Recording |
| |
| public boolean hasRightsToRecord(Client c) { |
| Room r = c.getRoom(); |
| return r != null && r.isAllowRecording() && c.hasRight(Right.moderator); |
| } |
| |
| public boolean recordingAllowed(Client c) { |
| if (!kHandler.isConnected()) { |
| return false; |
| } |
| Room r = c.getRoom(); |
| return hasRightsToRecord(c) && !isRecording(r.getId()); |
| } |
| |
| public void startRecording(Client c) { |
| if (!kHandler.isConnected() || !hasRightsToRecord(c)) { |
| return; |
| } |
| kHandler.getRoom(c.getRoomId()).startRecording(this, c); |
| } |
| |
| public void stopRecording(Client c) { |
| if (!kHandler.isConnected() || !hasRightsToRecord(c)) { |
| return; |
| } |
| kHandler.getRoom(c.getRoomId()).stopRecording(this, c); |
| } |
| |
| void startConvertion(Recording rec) { |
| IRecordingConverter conv = rec.isInterview() ? interviewConverter : recordingConverter; |
| taskExecutor.execute(() -> conv.startConversion(rec)); |
| } |
| |
| public boolean isRecording(Long roomId) { |
| if (!kHandler.isConnected()) { |
| return false; |
| } |
| return kHandler.getRoom(roomId).isRecording(); |
| } |
| |
| void remove(Client c) { |
| for (StreamDesc sd : c.getStreams()) { |
| AbstractStream s = getByUid(sd.getUid()); |
| if (s != null) { |
| s.release(this); |
| WebSocketHelper.sendRoomOthers(c.getRoomId(), c.getUid(), newKurentoMsg() |
| .put("id", "broadcastStopped") |
| .put("uid", sd.getUid())); |
| } |
| } |
| if (c.getRoomId() != null) { |
| KRoom room = kHandler.getRoom(c.getRoomId()); |
| room.leave(this, c); |
| checkStreams(c.getRoomId()); |
| } |
| } |
| |
| void addStream(KStream stream) { |
| streamByUid.put(stream.getUid(), stream); |
| } |
| |
| Client getBySid(String sid) { |
| return cm.getBySid(sid); |
| } |
| |
| KStream getByUid(String uid) { |
| return uid == null ? null : streamByUid.get(uid); |
| } |
| |
| KurentoHandler getHandler() { |
| return kHandler; |
| } |
| |
| IClientManager getClientManager() { |
| return cm; |
| } |
| |
| RecordingDao getRecordingDao() { |
| return recDao; |
| } |
| |
| @Override |
| public void release(AbstractStream stream) { |
| final String uid = stream.getUid(); |
| Client c = cm.getBySid(stream.getSid()); |
| if (c != null) { |
| StreamDesc sd = c.getStream(uid); |
| c.removeStream(uid); |
| if (StreamType.WEBCAM == sd.getType()) { |
| for (Activity a : sd.getActivities()) { |
| c.remove(a); |
| } |
| } |
| cm.update(c); |
| WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.rightUpdated, c.getUid())); |
| } |
| streamByUid.remove(uid); |
| } |
| |
| @Override |
| public void destroy() { |
| streamByUid.clear(); |
| } |
| } |