blob: 5409e243fb5ce99463e099fbd1185a215ddfb652 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License") + you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openmeetings.mediaserver.remote;
import static java.util.UUID.randomUUID;
import static org.apache.openmeetings.db.util.ApplicationHelper.ensureApplication;
import static org.apache.openmeetings.mediaserver.remote.KurentoHandler.PARAM_ICE;
import static org.apache.openmeetings.mediaserver.remote.KurentoHandler.newKurentoMsg;
import java.util.Date;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.inject.Inject;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.openmeetings.IApplication;
import org.apache.openmeetings.core.util.WebSocketHelper;
import org.apache.openmeetings.db.dao.record.RecordingDao;
import org.apache.openmeetings.db.entity.basic.Client;
import org.apache.openmeetings.db.entity.basic.Client.Activity;
import org.apache.openmeetings.db.entity.basic.Client.StreamDesc;
import org.apache.openmeetings.db.entity.basic.Client.StreamType;
import org.apache.openmeetings.db.entity.file.BaseFileItem;
import org.apache.openmeetings.db.entity.record.Recording;
import org.apache.openmeetings.db.entity.room.Room;
import org.apache.openmeetings.db.entity.user.User;
import org.apache.openmeetings.db.manager.IClientManager;
import org.apache.openmeetings.db.util.FormatHelper;
import org.apache.openmeetings.db.util.ws.RoomMessage;
import org.apache.openmeetings.db.util.ws.TextRoomMessage;
import org.apache.wicket.injection.Injector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.openjson.JSONObject;
/**
* Dynamically created object representing a conference room on the MediaServer
*
*/
public class KRoom {
private static final Logger log = LoggerFactory.getLogger(KRoom.class);
@Inject
private KurentoHandler kHandler;
@Inject
private StreamProcessor processor;
@Inject
private RecordingDao recDao;
@Inject
private IClientManager cm;
private final Room room;
private final AtomicBoolean recordingStarted = new AtomicBoolean(false);
private final AtomicBoolean sharingStarted = new AtomicBoolean(false);
private Long recordingId = null;
private long sipCount = 0;
private JSONObject recordingUser = new JSONObject();
private JSONObject sharingUser = new JSONObject();
public KRoom(Room r) {
this.room = r;
Injector.get().inject(this);
log.info("ROOM {} has been created", room.getId());
}
public Room getRoom() {
return room;
}
public Long getRecordingId() {
return recordingId;
}
public KStream join(final StreamDesc sd) {
log.info("ROOM {}: join client {}, stream: {}", room.getId(), sd.getClient(), sd.getUid());
final KStream stream = new KStream(sd, this);
processor.addStream(stream);
return stream;
}
public void onStopBroadcast(KStream stream) {
processor.release(stream, true);
WebSocketHelper.sendAll(newKurentoMsg()
.put("id", "broadcastStopped")
.put("uid", stream.getUid())
.toString()
);
}
public boolean isRecording() {
return recordingStarted.get();
}
public JSONObject getRecordingUser() {
return new JSONObject(recordingUser.toString());
}
public void startRecording(Client c) {
if (recordingStarted.compareAndSet(false, true)) {
IApplication app = ensureApplication(c.getUser().getLanguageId());
log.debug("##REC:: recording in room {} is starting ::", room.getId());
Room r = c.getRoom();
boolean interview = Room.Type.INTERVIEW == r.getType();
Date now = new Date();
Recording rec = new Recording();
rec.setHash(randomUUID().toString());
final FastDateFormat fdf = FormatHelper.getDateTimeFormat(c.getUser());
rec.setName(app.getOmString(interview ? "file.name.interview" : "file.name.recording", c.getUser().getLanguageId())
+ fdf.format(new Date()));
User u = c.getUser();
recordingUser.put("login", u.getLogin());
recordingUser.put("firstName", u.getFirstname());
recordingUser.put("lastName", u.getLastname());
recordingUser.put("started", now.getTime());
Long ownerId = User.Type.CONTACT == u.getType() ? u.getOwnerId() : u.getId();
rec.setInsertedBy(ownerId);
rec.setType(BaseFileItem.Type.RECORDING);
rec.setInterview(interview);
rec.setRoomId(room.getId());
rec.setRecordStart(now);
rec.setOwnerId(ownerId);
rec.setStatus(Recording.Status.RECORDING);
log.debug("##REC:: recording created by USER: {}", ownerId);
Optional<StreamDesc> osd = c.getScreenStream();
if (osd.isPresent()) {
osd.get().addActivity(Activity.RECORD);
cm.update(c);
rec.setWidth(osd.get().getWidth());
rec.setHeight(osd.get().getHeight());
}
rec = recDao.update(rec);
// Receive recordingId
recordingId = rec.getId();
processor.getByRoom(room.getId()).forEach(KStream::startRecord);
// Send notification to all users that the recording has been started
WebSocketHelper.sendRoom(new RoomMessage(room.getId(), u, RoomMessage.Type.RECORDING_TOGGLED));
log.debug("##REC:: recording in room {} is started {} ::", room.getId(), recordingId);
}
}
public void stopRecording(Client c) {
if (recordingStarted.compareAndSet(true, false)) {
log.debug("##REC:: recording in room {} is stopping {} ::", room.getId(), recordingId);
processor.getByRoom(room.getId()).forEach(KStream::stopRecord);
Recording rec = recDao.get(recordingId);
rec.setRecordEnd(new Date());
rec = recDao.update(rec);
recordingUser = new JSONObject();
recordingId = null;
processor.startConvertion(rec);
User u;
if (c == null) {
u = new User();
} else {
u = c.getUser();
Optional<StreamDesc> osd = c.getScreenStream();
if (osd.isPresent()) {
osd.get().removeActivity(Activity.RECORD);
cm.update(c);
kHandler.sendShareUpdated(osd.get());
}
}
// Send notification to all users that the recording has been started
WebSocketHelper.sendRoom(new RoomMessage(room.getId(), u, RoomMessage.Type.RECORDING_TOGGLED));
log.debug("##REC:: recording in room {} is stopped ::", room.getId());
}
}
/**
* This method will return true, even if the sharing is not enabled. But just recording.
* Cause in order to record you need to have a Screensharing enabled. Doesn't mean that other
* users see that screenshare yet (permissions have not been granted).
*
* @return
*/
public boolean isSharing() {
return sharingStarted.get();
}
public JSONObject getSharingUser() {
return new JSONObject(sharingUser.toString());
}
public void startSharing(Client c, Optional<StreamDesc> osd, JSONObject msg, Activity a) {
StreamDesc sd;
if (sharingStarted.compareAndSet(false, true)) {
sharingUser.put("sid", c.getSid());
sd = c.addStream(StreamType.SCREEN, a);
cm.update(c);
log.debug("Stream.UID {}: sharing has been started, activity: {}", sd.getUid(), a);
kHandler.sendClient(sd.getSid(), newKurentoMsg()
.put("id", "broadcast")
.put("stream", sd.toJson()
.put("shareType", msg.getString("shareType"))
.put("fps", msg.getString("fps")))
.put(PARAM_ICE, kHandler.getTurnServers(c)));
} else if (osd.isPresent() && !osd.get().hasActivity(a)) {
sd = osd.get();
sd.addActivity(a);
cm.update(c);
kHandler.sendShareUpdated(sd);
WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.RIGHT_UPDATED, c.getUid()));
WebSocketHelper.sendRoomOthers(room.getId(), c.getUid(), newKurentoMsg()
.put("id", "newStream")
.put(PARAM_ICE, kHandler.getTurnServers(c))
.put("stream", sd.toJson()));
}
}
public void stopSharing() {
if (sharingStarted.compareAndSet(true, false)) {
sharingUser = new JSONObject();
}
}
public void close() {
processor.getByRoom(room.getId()).forEach(KStream::release);
log.debug("Room {} closed", room.getId());
}
public void updateSipCount(final long count) {
if (count != sipCount) {
processor.getByRoom(room.getId()).forEach(stream -> stream.addSipProcessor(count));
if (sipCount == 0) {
cm.streamByRoom(room.getId())
.filter(Client::isSip)
.findAny()
.ifPresent(c -> {
StreamDesc sd = c.addStream(StreamType.WEBCAM, Activity.AUDIO);
sd.setWidth(120).setHeight(90);
c.restoreActivities(sd);
KStream stream = join(sd);
stream.startBroadcast(sd, "", () -> {});
cm.update(c);
});
}
sipCount = count;
}
}
public long getSipCount() {
return sipCount;
}
}