blob: 0e8737a8f683c0a6ce190298e76c054bdc18b714 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.websocket;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.processor.Processor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Routes WebSocket events to the single {@link Processor} registered with this router and keeps track of
 * the sessions that have been captured by it.
 *
 * <p>Sessions are held in a {@link ConcurrentHashMap} and the processor reference is {@code volatile};
 * {@link #registerProcessor(Processor)} and {@link #deregisterProcessor(Processor)} are {@code synchronized}.
 * Event callbacks read the processor reference once into a local variable so that a concurrent
 * deregistration cannot null it between the type check and the dispatch.</p>
 */
public class WebSocketMessageRouter {
    private static final Logger logger = LoggerFactory.getLogger(WebSocketMessageRouter.class);

    private final String endpointId;
    /** Captured sessions, keyed by {@link WebSocketSession#getSessionId()}. */
    private final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<>();
    /** The processor events are dispatched to; {@code null} while none is registered. */
    private volatile Processor processor;

    /**
     * Creates a router with no registered processor and no sessions.
     *
     * @param endpointId identifier of the endpoint this router was created for
     */
    public WebSocketMessageRouter(final String endpointId) {
        this.endpointId = endpointId;
    }

    /**
     * Assigns the processor that will receive events from this router.
     *
     * @param processor the processor to assign
     * @throws WebSocketConfigurationException if a processor is already assigned to this router
     */
    public synchronized void registerProcessor(final Processor processor) throws WebSocketConfigurationException {
        final Processor current = this.processor;
        if (current != null) {
            throw new WebSocketConfigurationException("Processor " + current + " is already assigned to this router.");
        }
        this.processor = processor;
    }

    /**
     * Tells whether the given processor is the one currently assigned to this router.
     * Processors are compared by {@link Processor#getIdentifier()}.
     *
     * @param processor the processor to check
     * @return {@code true} if a processor is assigned and its identifier equals that of {@code processor}
     */
    public boolean isProcessorRegistered(final Processor processor) {
        // Read the volatile field once: this method is not synchronized, so a concurrent deregistration
        // could otherwise null the field between the null check and the getIdentifier() call.
        final Processor current = this.processor;
        return current != null && current.getIdentifier().equals(processor.getIdentifier());
    }

    /**
     * Removes the given processor from this router and closes every captured session.
     * Does nothing (other than logging) if the given processor is not the one currently assigned.
     * A session whose close fails is logged and dropped; the remaining sessions are still closed.
     *
     * @param processor the processor to remove
     */
    public synchronized void deregisterProcessor(final Processor processor) {
        if (!isProcessorRegistered(processor)) {
            final Processor current = this.processor;
            if (current == null) {
                logger.info("Deregister processor {}, do nothing because this router doesn't have registered processor",
                        processor);
            } else {
                logger.info("Deregister processor {}, do nothing because this router is assigned to different processor {}",
                        processor, current);
            }
            return;
        }

        this.processor = null;

        // Iterate over entries rather than looking each key up again: a session removed concurrently by
        // onWebSocketClose would otherwise raise IllegalStateException and leave the remaining sessions open.
        for (final Map.Entry<String, WebSocketSession> entry : sessions.entrySet()) {
            final String sessionId = entry.getKey();
            try {
                closeAndRemove(sessionId, entry.getValue(), "Processing has stopped.");
            } catch (IOException e) {
                logger.warn("Failed to disconnect session {} due to {}", sessionId, e, e);
            }
        }
    }

    /**
     * Starts tracking the given session and, if the assigned processor is a {@link ConnectedListener},
     * notifies it of the new connection.
     *
     * @param session the newly connected session
     */
    public void captureSession(final WebSocketSession session) {
        final String sessionId = session.getSessionId();
        sessions.put(sessionId, session);

        final Processor current = processor;
        if (current instanceof ConnectedListener) {
            ((ConnectedListener) current).connected(session);
        }
    }

    /**
     * Stops tracking the session with the given id. The status code and reason are not used by this router.
     *
     * @param sessionId id of the closed session
     * @param statusCode close status code
     * @param reason close reason
     */
    public void onWebSocketClose(final String sessionId, final int statusCode, final String reason) {
        sessions.remove(sessionId);
    }

    /**
     * Passes a text message to the assigned processor if it is a {@link TextMessageConsumer};
     * otherwise the message is ignored.
     *
     * @param sessionId id of the session the message arrived on
     * @param message the text message
     * @throws IllegalStateException if a consumer is assigned but the session is not tracked by this router
     */
    public void onWebSocketText(final String sessionId, final String message) {
        final Processor current = processor;
        if (current instanceof TextMessageConsumer) {
            ((TextMessageConsumer) current).consume(getSessionOrFail(sessionId), message);
        }
    }

    /**
     * Passes a binary message to the assigned processor if it is a {@link BinaryMessageConsumer};
     * otherwise the message is ignored.
     *
     * @param sessionId id of the session the message arrived on
     * @param payload buffer holding the message
     * @param offset offset argument forwarded to the consumer
     * @param length length argument forwarded to the consumer
     * @throws IllegalStateException if a consumer is assigned but the session is not tracked by this router
     */
    public void onWebSocketBinary(final String sessionId, final byte[] payload, final int offset, final int length) {
        final Processor current = processor;
        if (current instanceof BinaryMessageConsumer) {
            ((BinaryMessageConsumer) current).consume(getSessionOrFail(sessionId), payload, offset, length);
        }
    }

    /**
     * Looks up a tracked session.
     *
     * @param sessionId id of the session to find
     * @return the tracked session, never {@code null}
     * @throws IllegalStateException if no session with that id is tracked
     */
    private WebSocketSession getSessionOrFail(final String sessionId) {
        final WebSocketSession session = sessions.get(sessionId);
        if (session == null) {
            throw new IllegalStateException("Session was not found for the sessionId: " + sessionId);
        }
        return session;
    }

    /**
     * Sends a message to one session, or to every tracked session when no session id is given.
     *
     * <p>When broadcasting, a failure on one session is logged and does not prevent delivery to the others.</p>
     *
     * @param sessionId id of the target session; {@code null} or empty to broadcast
     * @param sendMessage callback that performs the actual send on a session
     * @throws IOException if sending to the explicitly specified session fails
     * @throws IllegalStateException if a session id is given but no such session is tracked
     */
    public void sendMessage(final String sessionId, final SendMessage sendMessage) throws IOException {
        if (!StringUtils.isEmpty(sessionId)) {
            sendMessage.send(getSessionOrFail(sessionId));
            return;
        }

        // The sessionId is not specified so broadcast the message to all connected client sessions.
        // Entries are used directly so that a session closing mid-broadcast cannot abort the loop
        // with an IllegalStateException from a failed re-lookup.
        for (final Map.Entry<String, WebSocketSession> entry : sessions.entrySet()) {
            try {
                sendMessage.send(entry.getValue());
            } catch (IOException e) {
                logger.warn("Failed to send message to session {} due to {}", entry.getKey(), e, e);
            }
        }
    }

    /**
     * Closes the session with the given id and stops tracking it. The session is dropped from this
     * router even when closing it fails.
     *
     * @param sessionId id of the session to close
     * @param reason reason passed to {@link WebSocketSession#close(String)}
     * @throws IOException if closing the session fails
     * @throws IllegalStateException if no session with that id is tracked
     */
    public void disconnect(final String sessionId, final String reason) throws IOException {
        closeAndRemove(sessionId, getSessionOrFail(sessionId), reason);
    }

    /**
     * Tells whether a session with the given id is currently tracked.
     *
     * @param sessionId id of the session to look for
     * @return {@code true} if the session is tracked by this router
     */
    public boolean containsSession(final String sessionId) {
        return sessions.containsKey(sessionId);
    }

    /** Closes the session and always removes it from the session map, so a failed close cannot leave a stale entry. */
    private void closeAndRemove(final String sessionId, final WebSocketSession session, final String reason) throws IOException {
        try {
            session.close(reason);
        } finally {
            sessions.remove(sessionId);
        }
    }
}