blob: 447209c9ccf6e7ec5f0e43ea1a3a851c177cda67 [file] [log] [blame]
/*
* (C) Copyright 2014 Kurento (http://kurento.org/)
*/
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License") + you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openmeetings.mediaserver.remote;
import static java.util.UUID.randomUUID;
import static java.util.concurrent.CompletableFuture.delayedExecutor;
import static org.apache.openmeetings.mediaserver.remote.KurentoHandler.PARAM_CANDIDATE;
import static org.apache.openmeetings.mediaserver.remote.KurentoHandler.PARAM_ICE;
import static org.apache.openmeetings.mediaserver.remote.KurentoHandler.TAG_ROOM;
import static org.apache.openmeetings.mediaserver.remote.KurentoHandler.TAG_STREAM_UID;
import static org.apache.openmeetings.mediaserver.remote.KurentoHandler.getFlowoutTimeout;
import static org.apache.openmeetings.mediaserver.remote.KurentoHandler.newKurentoMsg;
import static org.apache.openmeetings.util.OmFileHelper.getRecUri;
import static org.apache.openmeetings.util.OmFileHelper.getRecordingChunk;
import java.util.Date;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.inject.Inject;
import org.apache.openmeetings.core.sip.ISipCallbacks;
import org.apache.openmeetings.core.sip.SipManager;
import org.apache.openmeetings.core.sip.SipStackProcessor;
import org.apache.openmeetings.core.util.WebSocketHelper;
import org.apache.openmeetings.db.dao.record.RecordingChunkDao;
import org.apache.openmeetings.db.entity.basic.Client;
import org.apache.openmeetings.db.entity.basic.Client.Activity;
import org.apache.openmeetings.db.entity.basic.Client.StreamDesc;
import org.apache.openmeetings.db.entity.basic.Client.StreamType;
import org.apache.openmeetings.db.entity.record.RecordingChunk.Type;
import org.apache.openmeetings.db.util.ws.RoomMessage;
import org.apache.openmeetings.db.util.ws.TextRoomMessage;
import org.apache.openmeetings.util.OmFileHelper;
import org.apache.wicket.injection.Injector;
import org.kurento.client.BaseRtpEndpoint;
import org.kurento.client.Continuation;
import org.kurento.client.IceCandidate;
import org.kurento.client.ListenerSubscription;
import org.kurento.client.MediaFlowState;
import org.kurento.client.MediaObject;
import org.kurento.client.MediaPipeline;
import org.kurento.client.MediaProfileSpecType;
import org.kurento.client.MediaType;
import org.kurento.client.RecorderEndpoint;
import org.kurento.client.RtpEndpoint;
import org.kurento.client.WebRtcEndpoint;
import org.kurento.jsonrpc.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.openjson.JSONObject;
public class KStream extends AbstractStream implements ISipCallbacks {
private static final Logger log = LoggerFactory.getLogger(KStream.class);
@Inject
private KurentoHandler kHandler;
@Inject
private StreamProcessor processor;
@Inject
private RecordingChunkDao chunkDao;
@Inject
private SipManager sipManager;
private final KRoom kRoom;
private final Date connectedSince;
private final StreamType streamType;
private MediaProfileSpecType profile;
private MediaPipeline pipeline;
private RecorderEndpoint recorder;
private BaseRtpEndpoint outgoingMedia = null;
private Queue<IceCandidate> candidatesQueue = new ConcurrentLinkedQueue<>();
private RtpEndpoint rtpEndpoint;
private Optional<SipStackProcessor> sipProcessor = Optional.empty();
private final Map<String, WebRtcEndpoint> listeners = new ConcurrentHashMap<>();
private Optional<CompletableFuture<Object>> flowoutFuture = Optional.empty();
private ListenerSubscription flowoutSubscription;
private Long chunkId;
private Type type;
private boolean hasAudio;
private boolean hasVideo;
private boolean hasScreen;
private boolean sipClient;
public KStream(final StreamDesc sd, KRoom kRoom) {
super(sd.getSid(), sd.getUid());
this.kRoom = kRoom;
streamType = sd.getType();
this.connectedSince = new Date();
Injector.get().inject(this);
//TODO Min/MaxVideoSendBandwidth
//TODO Min/Max Audio/Video RecvBandwidth
}
public void startBroadcast(final StreamDesc sd, final String sdpOffer, Runnable then) {
if (outgoingMedia != null) {
release(false);
}
hasAudio = sd.hasActivity(Activity.AUDIO);
hasVideo = sd.hasActivity(Activity.VIDEO);
hasScreen = sd.hasActivity(Activity.SCREEN);
sipClient = OmFileHelper.SIP_USER_ID.equals(sd.getClient().getUserId());
if ((sdpOffer.indexOf("m=audio") > -1 && !hasAudio)
|| (sdpOffer.indexOf("m=video") > -1 && !hasVideo && StreamType.SCREEN != streamType))
{
log.warn("Broadcast started without enough rights, sid {}, uid {}", sid, uid);
return;
}
if (StreamType.SCREEN == streamType) {
type = Type.SCREEN;
} else {
if (hasAudio && hasVideo) {
type = Type.AUDIO_VIDEO;
} else if (hasVideo) {
type = Type.VIDEO_ONLY;
} else {
type = Type.AUDIO_ONLY;
}
}
switch (type) {
case AUDIO_VIDEO:
profile = MediaProfileSpecType.WEBM;
break;
case AUDIO_ONLY:
profile = MediaProfileSpecType.WEBM_AUDIO_ONLY;
break;
case SCREEN, VIDEO_ONLY:
default:
profile = MediaProfileSpecType.WEBM_VIDEO_ONLY;
break;
}
pipeline = kHandler.createPipiline(Map.of(TAG_ROOM, String.valueOf(getRoomId()), TAG_STREAM_UID, sd.getUid()), new Continuation<Void>() {
@Override
public void onSuccess(Void result) throws Exception {
if (sipClient) {
addSipProcessor(1);
} else {
outgoingMedia = createEndpoint(sd.getSid(), sd.getUid(), true);
internalStartBroadcast(sd, sdpOffer);
notifyOnNewStream(sd);
}
then.run();
}
@Override
public void onError(Throwable cause) throws Exception {
log.warn("Unable to create pipeline {}", KStream.this.uid, cause);
}
});
}
/**
* Invoked in case stream stops to decide on if this stream is worth stopping.
*
* Stop broadcast in case:
* - If mediaType is anything other then MediaType.AUDIO
* - If type Audio stop in case it has no Video attached
*
* @param mediaType the MediaType that stopped flowing
* @return true in case this stream should be dropped
*/
protected boolean checkFlowOutEventForStopping(MediaType mediaType) {
return MediaType.AUDIO != mediaType || !hasVideo;
}
private void internalStartBroadcast(final StreamDesc sd, final String sdpOffer) {
outgoingMedia.addMediaSessionTerminatedListener(evt -> log.warn("Media stream terminated {}", sd));
flowoutSubscription = outgoingMedia.addMediaFlowOutStateChangeListener(evt -> {
log.info("Media Flow OUT STATE :: {}, mediaType {}, source {}, sid {}, uid {}"
, evt.getState(), evt.getMediaType(), evt.getSource(), sid, uid);
if (MediaFlowState.NOT_FLOWING == evt.getState()
&& checkFlowOutEventForStopping(evt.getMediaType())) {
log.warn("FlowOut Future is created, sid {}, uid {}", sid, uid);
flowoutFuture = Optional.of(new CompletableFuture<>().completeAsync(() -> {
log.warn("KStream will be dropped {}, sid {}, uid {}", sd, sid, uid);
if (StreamType.SCREEN == streamType) {
processor.doStopSharing(sid, uid);
}
stopBroadcast();
return null;
}, delayedExecutor(getFlowoutTimeout(), TimeUnit.SECONDS)));
} else {
dropFlowoutFuture();
}
});
outgoingMedia.addMediaFlowInStateChangeListener(evt -> log.warn("Media Flow IN :: {}, {}, {}, sid {}, uid {}"
, evt.getState(), evt.getMediaType(), evt.getSource(), sid, uid));
if (!sipClient) {
addListener(sd.getSid(), sd.getUid(), sdpOffer);
addSipProcessor(kRoom.getSipCount());
}
if (kRoom.isRecording()) {
startRecord();
}
}
private void notifyOnNewStream(final StreamDesc sd) {
Client c = sd.getClient();
WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.RIGHT_UPDATED, c.getUid()));
if (hasAudio || hasVideo || hasScreen) {
WebSocketHelper.sendRoomOthers(getRoomId(), c.getUid(), newKurentoMsg()
.put("id", "newStream")
.put(PARAM_ICE, kHandler.getTurnServers(c))
.put("stream", sd.toJson()));
}
}
public void broadcastRestarted() {
if (outgoingMedia != null && flowoutSubscription != null) {
outgoingMedia.removeMediaFlowOutStateChangeListener(flowoutSubscription);
}
dropFlowoutFuture();
}
private void dropFlowoutFuture() {
flowoutFuture.ifPresent(f -> {
log.warn("FlowOut Future is canceled");
f.cancel(true);
flowoutFuture = Optional.empty();
});
}
public void addListener(String sid, String uid, String sdpOffer) {
final boolean self = uid.equals(this.uid);
log.info("USER: have started, sid {}, uid {}, mode {} in kRoom {}", sid, uid, self ? "broadcasting" : "receiving", getRoomId());
log.trace("USER {}: SdpOffer is {}", uid, sdpOffer);
if (!self && outgoingMedia == null) {
log.warn("Trying to add listener too early, sid {}, uid {}", sid, uid);
return;
}
final BaseRtpEndpoint endpoint = getEndpointForUser(sid, uid);
final String sdpAnswer = endpoint.processOffer(sdpOffer);
if (endpoint instanceof WebRtcEndpoint rtcEndpoint) {
log.debug("gather candidates, sid {}, uid {}", sid, uid);
rtcEndpoint.gatherCandidates(); // this one might throw Exception
}
log.trace("USER {}: SdpAnswer is {}", this.uid, sdpAnswer);
kHandler.sendClient(sid, newKurentoMsg()
.put("id", "videoResponse")
.put("uid", this.uid)
.put("sdpAnswer", sdpAnswer));
}
private BaseRtpEndpoint getEndpointForUser(String sid, String uid) {
if (uid.equals(this.uid)) {
log.debug("PARTICIPANT {}: configuring loopback", this.uid);
return outgoingMedia;
}
log.debug("PARTICIPANT {}: receiving video from {}", uid, this.uid);
WebRtcEndpoint listener = listeners.remove(uid);
if (listener != null) {
log.debug("PARTICIPANT {}: re-started video receiving, will drop previous endpoint", uid);
listener.release();
}
log.debug("PARTICIPANT {}: creating new endpoint for {}", uid, this.uid);
listener = createEndpoint(sid, uid, false);
listeners.put(uid, listener);
log.debug("PARTICIPANT {}: obtained endpoint for {}", uid, this.uid);
Client cur = processor.getBySid(this.sid);
if (cur == null) {
log.warn("Client for endpoint dooesn't exists");
} else {
StreamDesc sd = cur.getStream(this.uid);
if (sd == null) {
log.warn("Stream for endpoint dooesn't exists");
} else {
if (sd.hasActivity(Activity.AUDIO)) {
outgoingMedia.connect(listener, MediaType.AUDIO);
}
if (StreamType.SCREEN == streamType || sd.hasActivity(Activity.VIDEO)) {
outgoingMedia.connect(listener, MediaType.VIDEO);
}
}
}
return listener;
}
private void setTags(MediaObject endpoint, String uid) {
endpoint.addTag("outUid", this.uid);
endpoint.addTag("uid", uid);
}
private RtpEndpoint getRtpEndpoint(MediaPipeline pipeline) {
RtpEndpoint endpoint = new RtpEndpoint.Builder(pipeline).build();
setTags(endpoint, uid);
return endpoint;
}
private WebRtcEndpoint createEndpoint(String sid, String uid, boolean recv) {
WebRtcEndpoint endpoint = createWebRtcEndpoint(pipeline, recv, kHandler.getCertificateType());
setTags(endpoint, uid);
reApplyIceCandiates(endpoint, recv);
endpoint.addIceCandidateFoundListener(evt -> kHandler.sendClient(sid
, newKurentoMsg()
.put("id", "iceCandidate")
.put("uid", KStream.this.uid)
.put(PARAM_CANDIDATE, convert(JsonUtils.toJsonObject(evt.getCandidate()))))
);
return endpoint;
}
private void reApplyIceCandiates(WebRtcEndpoint endpoint, boolean recv) {
// sender candidates
if (recv && !candidatesQueue.isEmpty()) {
log.trace("addIceCandidate iceCandidate reply from not ready, uid: {}", uid);
candidatesQueue.stream().forEach(endpoint::addIceCandidate);
candidatesQueue.clear();
}
}
public void startRecord() {
log.debug("startRecord outMedia OK ? {}", outgoingMedia != null);
if (outgoingMedia == null) {
release(true);
return;
}
final String chunkUid = "rec_" + kRoom.getRecordingId() + "_" + randomUUID();
recorder = createRecorderEndpoint(pipeline, getRecUri(getRecordingChunk(getRoomId(), chunkUid)), profile);
setTags(recorder, uid);
recorder.addRecordingListener(evt -> chunkId = chunkDao.start(kRoom.getRecordingId(), type, chunkUid, sid));
recorder.addStoppedListener(evt -> {
chunkDao.stop(chunkId);
chunkId = null;
});
switch (profile) {
case WEBM:
outgoingMedia.connect(recorder, MediaType.AUDIO);
outgoingMedia.connect(recorder, MediaType.VIDEO);
break;
case WEBM_VIDEO_ONLY:
outgoingMedia.connect(recorder, MediaType.VIDEO);
break;
case WEBM_AUDIO_ONLY:
default:
outgoingMedia.connect(recorder, MediaType.AUDIO);
break;
}
recorder.record(new Continuation<Void>() {
@Override
public void onSuccess(Void result) throws Exception {
log.info("Recording started successfully");
}
@Override
public void onError(Throwable cause) throws Exception {
log.error("Failed to start recording", cause);
}
});
}
public void stopRecord() {
stopRecorder(true, () -> {});
}
public void remove(final Client c) {
WebRtcEndpoint point = listeners.remove(c.getUid());
if (point != null) {
point.release();
}
}
public void stopBroadcast() {
kRoom.onStopBroadcast(this);
}
public void pauseSharing() {
releaseListeners();
}
private void releaseListeners() {
log.debug("PARTICIPANT {}: Releasing listeners", uid);
for (Entry<String, WebRtcEndpoint> entry : listeners.entrySet()) {
final String inUid = entry.getKey();
log.trace("PARTICIPANT {}: Released incoming EP for {}", uid, inUid);
final WebRtcEndpoint ep = entry.getValue();
outgoingMedia.disconnect(ep, new Continuation<Void>() {
@Override
public void onSuccess(Void result) throws Exception {
log.trace("PARTICIPANT {}: Disconnected successfully incoming EP for {}", KStream.this.uid, inUid);
}
@Override
public void onError(Throwable cause) throws Exception {
log.warn("PARTICIPANT {}: Could not disconnect incoming EP for {}", KStream.this.uid, inUid);
}
});
ep.release(new Continuation<Void>() {
@Override
public void onSuccess(Void result) throws Exception {
log.trace("PARTICIPANT {}: Released successfully incoming EP for {}", KStream.this.uid, inUid);
}
@Override
public void onError(Throwable cause) throws Exception {
log.warn("PARTICIPANT {}: Could not release incoming EP for {}", KStream.this.uid, inUid);
}
});
}
listeners.clear();
}
@Override
public void release(boolean remove) {
if (outgoingMedia != null) {
releaseListeners();
stopRecorder(false, () -> {
releaseRtp();
outgoingMedia.release(new Continuation<Void>() {
@Override
public void onSuccess(Void result) throws Exception {
log.trace("PARTICIPANT {}: Released successfully", KStream.this.uid);
}
@Override
public void onError(Throwable cause) throws Exception {
log.warn("PARTICIPANT {}: Could not release", KStream.this.uid, cause);
}
});
pipeline.release(new Continuation<Void>() {
@Override
public void onSuccess(Void result) throws Exception {
log.trace("PARTICIPANT {}: Released Pipeline", KStream.this.uid);
}
@Override
public void onError(Throwable cause) throws Exception {
log.warn("PARTICIPANT {}: Could not release Pipeline", KStream.this.uid, cause);
}
});
outgoingMedia = null;
doRemove(remove);
});
} else {
doRemove(remove);
}
}
private void doRemove(boolean remove) {
if (remove) {
processor.release(this, false);
}
}
private void releaseRecorder(Runnable then) {
outgoingMedia.disconnect(recorder, new Continuation<Void>() {
@Override
public void onSuccess(Void result) throws Exception {
log.trace("PARTICIPANT {}: Recorder disconnected successfully", KStream.this.uid);
}
@Override
public void onError(Throwable cause) throws Exception {
log.warn("PARTICIPANT {}: Could not disconnect recorder", KStream.this.uid, cause);
}
});
recorder.release(new Continuation<Void>() {
@Override
public void onSuccess(Void result) throws Exception {
log.trace("PARTICIPANT {}: Recorder released successfully", KStream.this.uid);
}
@Override
public void onError(Throwable cause) throws Exception {
log.warn("PARTICIPANT {}: Could not release recorder", KStream.this.uid, cause);
}
});
recorder = null;
then.run();
}
private void stopRecorder(boolean wait, Runnable then) {
if (recorder != null) {
final Continuation<Void> stop = new Continuation<>() {
@Override
public void onSuccess(Void result) throws Exception {
log.trace("PARTICIPANT {}: Recording stopped", KStream.this.uid);
releaseRecorder(then);
}
@Override
public void onError(Throwable cause) throws Exception {
log.warn("PARTICIPANT {}: Could not stop recording", KStream.this.uid, cause);
releaseRecorder(then);
}
};
if (wait) {
recorder.stopAndWait(stop);
} else {
recorder.stop(stop);
}
} else {
then.run();
}
}
private void releaseRtp() {
if (rtpEndpoint != null) {
rtpEndpoint.release(new Continuation<Void>() {
@Override
public void onSuccess(Void result) throws Exception {
log.trace("PARTICIPANT {}: RtpEndpoint released successfully", KStream.this.uid);
}
@Override
public void onError(Throwable cause) throws Exception {
log.warn("PARTICIPANT {}: Could not release RtpEndpoint", KStream.this.uid, cause);
}
});
rtpEndpoint = null;
}
sipProcessor.ifPresent(SipStackProcessor::destroy);
sipProcessor = Optional.empty();
}
public void addIceCandidate(IceCandidate candidate, String uid) {
if (this.uid.equals(uid)) {
if (!(outgoingMedia instanceof WebRtcEndpoint)) {
if (!sipClient) {
log.info("addIceCandidate iceCandidate while not ready yet, uid: {}, candidate: {}", uid, candidate.getCandidate());
candidatesQueue.add(candidate);
}
return;
}
((WebRtcEndpoint)outgoingMedia).addIceCandidate(candidate);
} else {
WebRtcEndpoint endpoint = listeners.get(uid);
log.debug("Add candidate for {}, listener found ? {}", uid, endpoint != null);
if (endpoint != null) {
endpoint.addIceCandidate(candidate);
} else {
log.warn("addIceCandidate iceCandidate could not find endpoint, uid: {}, candidate: {}", uid, candidate.getCandidate());
}
}
}
private static JSONObject convert(com.google.gson.JsonObject o) {
return new JSONObject(o.toString());
}
public Date getConnectedSince() {
return connectedSince;
}
public Long getRoomId() {
return kRoom.getRoom().getId();
}
MediaPipeline getPipeline() {
return pipeline;
}
public StreamType getStreamType() {
return streamType;
}
public MediaProfileSpecType getProfile() {
return profile;
}
public RecorderEndpoint getRecorder() {
return recorder;
}
public Long getChunkId() {
return chunkId;
}
public Type getType() {
return type;
}
public boolean contains(String uid) {
return this.uid.equals(uid) || listeners.containsKey(uid);
}
@Override
public String toString() {
return "KStream [kRoom=" + kRoom + ", streamType=" + streamType + ", profile=" + profile + ", recorder="
+ recorder + ", outgoingMedia=" + outgoingMedia + ", listeners=" + listeners + ", flowoutFuture="
+ flowoutFuture + ", chunkId=" + chunkId + ", type=" + type + ", sid=" + sid + ", uid=" + uid + "]";
}
void addSipProcessor(long count) {
if (count > 0) {
if (sipProcessor.isEmpty()) {
try {
sipProcessor = sipManager.createSipStackProcessor(
randomUUID().toString()
, kRoom.getRoom()
, this);
sipProcessor.ifPresent(SipStackProcessor::register);
} catch (Exception e) {
log.error("Unexpected error while creating SipProcessor", e);
}
}
} else {
if (sipClient) {
release();
} else {
releaseRtp();
}
}
}
@Override
public void onRegisterOk() {
rtpEndpoint = getRtpEndpoint(pipeline);
if (!sipClient) {
if (hasAudio) {
outgoingMedia.connect(rtpEndpoint, MediaType.AUDIO);
}
if (hasVideo) {
outgoingMedia.connect(rtpEndpoint, MediaType.VIDEO);
}
}
sipProcessor.get().invite(kRoom.getRoom(), null);
}
@Override
public void onInviteOk(String sdp, Consumer<String> answerConsumer) {
String answer = rtpEndpoint.processOffer(sdp.replace("a=sendrecv", sipClient ? "a=sendonly" : "a=recvonly"));
answerConsumer.accept(answer);
log.debug(answer);
if (sipClient) {
StreamDesc sd = processor.getBySid(sid).getStream(uid);
try {
outgoingMedia = rtpEndpoint;
internalStartBroadcast(sd, sdp);
notifyOnNewStream(sd);
} catch (Exception e) {
log.error("Unexpected error");
}
}
}
}