| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License") + you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.openmeetings.web.app; |
| |
| import static org.apache.openmeetings.core.util.WebSocketHelper.sendRoom; |
| import static org.apache.openmeetings.web.app.WebSession.getUserId; |
| import static org.apache.openmeetings.web.pages.auth.SignInPage.TOKEN_PARAM; |
| |
| import java.io.Serializable; |
| import java.util.Collection; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.function.Function; |
| import java.util.stream.Stream; |
| |
| import org.apache.openmeetings.db.dao.log.ConferenceLogDao; |
| import org.apache.openmeetings.db.entity.basic.Client; |
| import org.apache.openmeetings.db.entity.log.ConferenceLog; |
| import org.apache.openmeetings.db.entity.room.Room; |
| import org.apache.openmeetings.db.manager.IClientManager; |
| import org.apache.openmeetings.db.util.ws.RoomMessage; |
| import org.apache.openmeetings.db.util.ws.TextRoomMessage; |
| import org.apache.openmeetings.mediaserver.remote.KurentoHandler; |
| import org.apache.openmeetings.web.pages.auth.SignInPage; |
| import org.apache.wicket.request.mapper.parameter.PageParameters; |
| import org.apache.wicket.util.string.StringValue; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.springframework.beans.factory.annotation.Autowired; |
| import org.springframework.stereotype.Component; |
| |
| import com.hazelcast.core.EntryEvent; |
| import com.hazelcast.map.IMap; |
| import com.hazelcast.map.listener.EntryAddedListener; |
| import com.hazelcast.map.listener.EntryRemovedListener; |
| import com.hazelcast.map.listener.EntryUpdatedListener; |
| import com.hazelcast.query.Predicates; |
| |
| @Component |
| public class ClientManager implements IClientManager { |
| private static final Logger log = LoggerFactory.getLogger(ClientManager.class); |
| private static final String ROOMS_KEY = "ROOMS_KEY"; |
| private static final String ONLINE_USERS_KEY = "ONLINE_USERS_KEY"; |
| private static final String SERVERS_KEY = "SERVERS_KEY"; |
| private static final String INSTANT_TOKENS_KEY = "INSTANT_TOKENS_KEY"; |
| private static final String UID_BY_SID_KEY = "UID_BY_SID_KEY"; |
| private final Map<String, Client> onlineClients = new ConcurrentHashMap<>(); |
| private final Map<Long, Set<String>> onlineRooms = new ConcurrentHashMap<>(); |
| private final Map<String, ServerInfo> onlineServers = new ConcurrentHashMap<>(); |
| |
| @Autowired |
| private ConferenceLogDao confLogDao; |
| @Autowired |
| private Application app; |
| @Autowired |
| private KurentoHandler kHandler; |
| @Autowired |
| private TimerService timerService; |
| |
| private IMap<String, Client> map() { |
| return app.hazelcast.getMap(ONLINE_USERS_KEY); |
| } |
| |
| private Map<String, String> mapBySid() { |
| return app.hazelcast.getMap(UID_BY_SID_KEY); |
| } |
| |
| private IMap<Long, Set<String>> rooms() { |
| return app.hazelcast.getMap(ROOMS_KEY); |
| } |
| |
| private IMap<String, ServerInfo> servers() { |
| return app.hazelcast.getMap(SERVERS_KEY); |
| } |
| |
| private IMap<String, InstantToken> tokens() { |
| return app.hazelcast.getMap(INSTANT_TOKENS_KEY); |
| } |
| |
| void init() { |
| log.debug("Cluster:: PostConstruct"); |
| onlineClients.putAll(map()); |
| onlineRooms.putAll(rooms()); |
| onlineServers.putAll(servers()); |
| map().addEntryListener(new ClientListener(), true); |
| rooms().addEntryListener(new RoomListener(), true); |
| servers().addEntryListener((EntryUpdatedListener<String, ServerInfo>)(event -> { |
| log.debug("Cluster:: Server was updated {} -> {}", event.getKey(), event.getValue()); |
| onlineServers.put(event.getKey(), event.getValue()); |
| }), true); |
| } |
| |
| public void add(Client c) { |
| confLogDao.add( |
| ConferenceLog.Type.CLIENT_CONNECT |
| , c.getUserId(), "0", null |
| , c.getRemoteAddress() |
| , ""); |
| log.debug("Adding online client: {}, room: {}", c.getUid(), c.getRoom()); |
| c.setServerId(Application.get().getServerId()); |
| map().put(c.getUid(), c); |
| onlineClients.put(c.getUid(), c); |
| mapBySid().put(c.getSid(), c.getUid()); |
| } |
| |
| @Override |
| public Client update(Client c) { |
| map().put(c.getUid(), c); |
| synchronized (onlineClients) { |
| onlineClients.get(c.getUid()).merge(c); |
| } |
| return c; |
| } |
| |
| @Override |
| public Client get(String uid) { |
| return uid == null ? null : onlineClients.get(uid); |
| } |
| |
| @Override |
| public Client getBySid(String sid) { |
| if (sid == null) { |
| return null; |
| } |
| String uid = mapBySid().get(sid); |
| return uid == null ? null : get(uid); |
| } |
| |
| @Override |
| public String uidBySid(String sid) { |
| if (sid == null) { |
| return null; |
| } |
| return mapBySid().get(sid); |
| } |
| |
| public void exitRoom(Client c) { |
| exitRoom(c, true); |
| } |
| |
| public void exitRoom(Client c, boolean update) { |
| Long roomId = c.getRoomId(); |
| log.debug("Removing online room client: {}, room: {}", c.getUid(), roomId); |
| if (roomId != null) { |
| IMap<Long, Set<String>> rooms = rooms(); |
| rooms.lock(roomId); |
| Set<String> clients = rooms.getOrDefault(roomId, ConcurrentHashMap.newKeySet()); |
| clients.remove(c.getUid()); |
| rooms.put(roomId, clients); |
| onlineRooms.put(roomId, clients); |
| rooms.unlock(roomId); |
| if (clients.isEmpty()) { |
| String serverId = c.getServerId(); |
| IMap<String, ServerInfo> servers = servers(); |
| servers.lock(serverId); |
| ServerInfo si = servers.get(serverId); |
| si.remove(c.getRoom()); |
| servers.put(serverId, si); |
| onlineServers.put(serverId, si); |
| servers.unlock(serverId); |
| } |
| kHandler.leaveRoom(c); |
| c.setRoom(null); |
| c.clear(); |
| if (update) { |
| update(c); |
| } |
| |
| sendRoom(new TextRoomMessage(roomId, c, RoomMessage.Type.ROOM_EXIT, c.getUid())); |
| confLogDao.add( |
| ConferenceLog.Type.ROOM_LEAVE |
| , c.getUserId(), "0", roomId |
| , c.getRemoteAddress() |
| , String.valueOf(roomId)); |
| } |
| } |
| |
| @Override |
| public void exit(Client c) { |
| if (c != null) { |
| confLogDao.add( |
| ConferenceLog.Type.CLIENT_DISCONNECT |
| , c.getUserId(), "0", null |
| , c.getRemoteAddress() |
| , ""); |
| exitRoom(c, false); |
| kHandler.remove(c); |
| log.debug("Removing online client: {}, roomId: {}", c.getUid(), c.getRoomId()); |
| map().remove(c.getUid()); |
| onlineClients.remove(c.getUid()); |
| mapBySid().remove(c.getSid()); |
| } |
| } |
| |
| public void serverAdded(String serverId, String url) { |
| onlineServers.computeIfAbsent(serverId, id -> { |
| ServerInfo si = new ServerInfo(url); |
| servers().put(id, si); |
| log.debug("Cluster:: server with id '{}' was added", id); |
| return si; |
| }); |
| } |
| |
| public void serverRemoved(String serverId) { |
| Map<String, Client> clients = map(); |
| for (Map.Entry<String, Client> e : clients.entrySet()) { |
| if (serverId.equals(e.getValue().getServerId())) { |
| exit(e.getValue()); |
| } |
| } |
| log.debug("Cluster:: server with id '{}' was removed", serverId); |
| servers().remove(serverId); |
| onlineServers.remove(serverId); |
| } |
| |
| /** |
| * This method will return count of users in room _after_ adding |
| * |
| * @param c - client to be added to the room |
| * @return count of users in room _after_ adding |
| */ |
| public int addToRoom(Client c) { |
| Room r = c.getRoom(); |
| Long roomId = r.getId(); |
| confLogDao.add( |
| ConferenceLog.Type.ROOM_ENTER |
| , c.getUserId(), "0", roomId |
| , c.getRemoteAddress() |
| , String.valueOf(roomId)); |
| log.debug("Adding online room client: {}, room: {}", c.getUid(), roomId); |
| IMap<Long, Set<String>> rooms = rooms(); |
| rooms.lock(roomId); |
| Set<String> set = rooms.getOrDefault(roomId, ConcurrentHashMap.newKeySet()); |
| set.add(c.getUid()); |
| final int count = set.size(); |
| rooms.put(roomId, set); |
| onlineRooms.put(roomId, set); |
| rooms.unlock(roomId); |
| String serverId = c.getServerId(); |
| addRoomToServer(serverId, r); |
| update(c); |
| timerService.scheduleSipCheck(r); |
| return count; |
| } |
| |
| private void addRoomToServer(String serverId, Room r) { |
| if (!onlineServers.get(serverId).getRooms().contains(r.getId())) { |
| log.debug("Cluster:: room {} was not found for server '{}', adding ...", r.getId(), serverId); |
| IMap<String, ServerInfo> servers = servers(); |
| servers.lock(serverId); |
| ServerInfo si = servers.get(serverId); |
| si.add(r); |
| servers.put(serverId, si); |
| onlineServers.put(serverId, si); |
| servers.unlock(serverId); |
| } |
| } |
| |
| public boolean isOnline(Long userId) { |
| boolean isUserOnline = false; |
| for (Map.Entry<String, Client> e : map().entrySet()) { |
| if (e.getValue().sameUserId(userId)) { |
| isUserOnline = true; |
| break; |
| } |
| } |
| return isUserOnline; |
| } |
| |
| @Override |
| public Stream<Client> stream() { |
| return map().values().stream(); |
| } |
| |
| @Override |
| public Collection<Client> listByUser(Long userId) { |
| return map().values(Predicates.equal("userId", userId)); |
| } |
| |
| @Override |
| public Stream<Client> streamByRoom(Long roomId) { |
| return Optional.ofNullable(roomId) |
| .map(id -> onlineRooms.getOrDefault(id, Set.of())) |
| .stream() |
| .flatMap(Set::stream) |
| .map(this::get) |
| .filter(Objects::nonNull); |
| } |
| |
| public boolean isInRoom(long roomId, long userId) { |
| return Optional.of(roomId) |
| .map(id -> onlineRooms.getOrDefault(id, Set.of())) |
| .stream() |
| .flatMap(Set::stream) |
| .map(this::get) |
| .anyMatch(c -> c != null && c.sameUserId(userId)); |
| } |
| |
| private List<Client> getByKeys(Long userId, String sessionId) { |
| return map().values().stream() |
| .filter(c -> c.sameUserId(userId) && c.getSessionId().equals(sessionId)) |
| .toList(); |
| } |
| |
| public void invalidate(Long userId, String sessionId) { |
| for (Client c : getByKeys(userId, sessionId)) { |
| Map<String, String> invalid = Application.get().getInvalidSessions(); |
| invalid.putIfAbsent(sessionId, c.getUid()); |
| exit(c); |
| } |
| } |
| |
| private String getServerUrl(Map.Entry<String, ServerInfo> e, Room r, Function<String, String> generator) { |
| final String curServerId = app.getServerId(); |
| String serverId = e.getKey(); |
| if (!curServerId.equals(serverId)) { |
| addRoomToServer(serverId, r); |
| return generator.apply(e.getValue().getUrl()); |
| } |
| return null; |
| } |
| |
| public String getServerUrl(Room r, Function<String, String> inGenerator) { |
| if (onlineServers.size() == 1) { |
| log.debug("Cluster:: The only server found"); |
| return null; |
| } |
| Function<String, String> generator = inGenerator == null ? baseUrl -> { |
| String uuid = UUID.randomUUID().toString(); |
| tokens().put(uuid, new InstantToken(getUserId(), r.getId())); |
| return Application.urlForPage(SignInPage.class, new PageParameters().add(TOKEN_PARAM, uuid), baseUrl); |
| } : inGenerator; |
| Optional<Map.Entry<String, ServerInfo>> existing = onlineServers.entrySet().stream() |
| .filter(e -> e.getValue().getRooms().contains(r.getId())) |
| .findFirst(); |
| if (existing.isPresent()) { |
| return getServerUrl(existing.get(), r, generator); |
| } |
| Optional<Map.Entry<String, ServerInfo>> min = onlineServers.entrySet().stream() |
| .min((e1, e2) -> e1.getValue().getCapacity() - e2.getValue().getCapacity()); |
| return getServerUrl(min.get(), r, generator); |
| } |
| |
| Optional<InstantToken> getToken(StringValue uuid) { |
| log.debug("Cluster:: Checking token {}, full list: {}", uuid, tokens().entrySet()); |
| return uuid.isEmpty() ? Optional.empty() : Optional.ofNullable(tokens().remove(uuid.toString())); |
| } |
| |
| public class ClientListener implements |
| EntryAddedListener<String, Client> |
| , EntryUpdatedListener<String, Client> |
| , EntryRemovedListener<String, Client> |
| { |
| private void process(EntryEvent<String, Client> event, boolean shouldAdd) { |
| if (event.getMember().localMember()) { |
| return; |
| } |
| final String uid = event.getKey(); |
| synchronized (onlineClients) { |
| if (onlineClients.containsKey(uid)) { |
| onlineClients.get(uid).merge(event.getValue()); |
| } else if (shouldAdd) { |
| onlineClients.put(uid, event.getValue()); |
| } |
| } |
| } |
| |
| @Override |
| public void entryAdded(EntryEvent<String, Client> event) { |
| process(event, true); |
| } |
| |
| @Override |
| public void entryUpdated(EntryEvent<String, Client> event) { |
| process(event, false); |
| } |
| |
| @Override |
| public void entryRemoved(EntryEvent<String, Client> event) { |
| log.trace("ClientListener::Remove"); |
| onlineClients.remove(event.getKey()); |
| } |
| } |
| |
| public class RoomListener implements |
| EntryAddedListener<Long, Set<String>> |
| , EntryUpdatedListener<Long, Set<String>> |
| , EntryRemovedListener<Long, Set<String>> |
| { |
| @Override |
| public void entryAdded(EntryEvent<Long, Set<String>> event) { |
| log.trace("RoomListener::Add"); |
| onlineRooms.put(event.getKey(), event.getValue()); |
| } |
| |
| @Override |
| public void entryUpdated(EntryEvent<Long, Set<String>> event) { |
| log.trace("RoomListener::Update"); |
| onlineRooms.put(event.getKey(), event.getValue()); |
| } |
| |
| @Override |
| public void entryRemoved(EntryEvent<Long, Set<String>> event) { |
| log.trace("RoomListener::Remove"); |
| onlineRooms.remove(event.getKey(), event.getValue()); |
| } |
| } |
| |
| private static class ServerInfo implements Serializable { |
| private static final long serialVersionUID = 1L; |
| private int capacity = 0; |
| private final String url; |
| private final Set<Long> rooms = new HashSet<>(); |
| |
| public ServerInfo(String url) { |
| this.url = url; |
| } |
| |
| public void add(Room r) { |
| if (rooms.add(r.getId())) { |
| log.debug("Cluster:: room {} is added to server, whole list {}", r.getId(), rooms); |
| capacity += r.getCapacity(); |
| } |
| } |
| |
| public void remove(Room r) { |
| if (rooms.remove(r.getId())) { |
| log.debug("Cluster:: room {} is removed from server, whole list {}", r.getId(), rooms); |
| capacity -= r.getCapacity(); |
| } |
| } |
| |
| public String getUrl() { |
| return url; |
| } |
| |
| public int getCapacity() { |
| return capacity; |
| } |
| |
| public Set<Long> getRooms() { |
| return rooms; |
| } |
| |
| @Override |
| public String toString() { |
| return "ServerInfo[rooms: " + rooms + "]"; |
| } |
| } |
| |
| public static class InstantToken implements Serializable { |
| private static final long serialVersionUID = 1L; |
| private final long userId; |
| private final long roomId; |
| private final long created; |
| |
| InstantToken(long userId, long roomId) { |
| this.userId = userId; |
| this.roomId = roomId; |
| created = System.currentTimeMillis(); |
| } |
| |
| public long getUserId() { |
| return userId; |
| } |
| |
| public long getRoomId() { |
| return roomId; |
| } |
| } |
| } |