// blob: 0a62446122270c578342422762d49ae8b1b8ff5d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.atmosphere.websocket;
import java.io.InputStream;
import java.io.Reader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.impl.DefaultProducer;
import org.atmosphere.websocket.WebSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Producer that writes the body of an exchange's IN message to one or more
 * websocket connections looked up in the endpoint's WebSocket store.
 * <p>
 * The target connections are selected in this order:
 * <ol>
 * <li>all known connections, when the endpoint is configured with {@code sendToAll};</li>
 * <li>the connections named by the {@link WebsocketConstants#CONNECTION_KEY_LIST} header;</li>
 * <li>the single connection named by the {@link WebsocketConstants#CONNECTION_KEY} header.</li>
 * </ol>
 * Writes are handed to a single worker thread shared by every instance of this
 * class, so {@link #process(Exchange)} returns before the data is written and
 * write failures are logged instead of being propagated to the exchange.
 * <p>
 * This class keeps no per-exchange state in instance fields.
 */
public class WebsocketProducer extends DefaultProducer {
    private static final Logger LOG = LoggerFactory.getLogger(WebsocketProducer.class);

    /** Single worker thread, shared by all producers, that performs the actual websocket writes. */
    private static final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor();

    public WebsocketProducer(WebsocketEndpoint endpoint) {
        super(endpoint);
    }

    @Override
    public WebsocketEndpoint getEndpoint() {
        return (WebsocketEndpoint) super.getEndpoint();
    }

    /**
     * Sends the IN message body to the selected websocket connection(s).
     * <p>
     * A {@code null} body is ignored. {@code String} and {@code byte[]} bodies are sent
     * as they are; a {@code Reader} body is converted to {@code String}, an
     * {@code InputStream} body to {@code byte[]}, and any other type to {@code String}.
     *
     * @param exchange the exchange whose IN message carries the payload and the routing headers
     * @throws IllegalArgumentException if the endpoint is not in send-to-all mode and the
     *         connection key (or any key in the connection key list) is {@code null}
     * @throws Exception if the body conversion fails
     */
    @Override
    public void process(Exchange exchange) throws Exception {
        Message in = exchange.getIn();
        //TODO support binary data
        Object message = in.getBody();
        if (message == null) {
            LOG.debug("Ignoring a null message");
            return;
        }

        if (!(message instanceof String || message instanceof byte[]
            || message instanceof Reader || message instanceof InputStream)) {
            // fallback to use String
            LOG.info("Using String for unexpected message type {} ", message.getClass());
            message = in.getBody(String.class);
        }

        // REVISIT Reader and InputStream handling at Producer
        // special conversion for Reader and InputStream for now
        if (message instanceof Reader) {
            message = in.getBody(String.class);
        } else if (message instanceof InputStream) {
            message = in.getBody(byte[].class);
        }

        LOG.debug("Sending to {}", message);
        if (getEndpoint().isSendToAll()) {
            LOG.debug("Sending to all -> {}", message);
            //TODO consider using atmosphere's broadcast or a more configurable async send
            for (final WebSocket websocket : getEndpoint().getWebSocketStore().getAllWebSockets()) {
                sendMessage(websocket, message);
            }
        } else if (in.getHeader(WebsocketConstants.CONNECTION_KEY_LIST) != null) {
            // the header API only offers a raw List.class lookup; the elements are expected to be Strings
            @SuppressWarnings("unchecked")
            List<String> connectionKeyList = in.getHeader(WebsocketConstants.CONNECTION_KEY_LIST, List.class);
            messageDistributor(connectionKeyList, message);
        } else {
            String connectionKey = in.getHeader(WebsocketConstants.CONNECTION_KEY, String.class);
            // a missing header yields a one-element list holding null, which getWebSocket rejects
            messageDistributor(Arrays.asList(connectionKey), message);
        }
    }

    /**
     * Sends the message to every connection named in the list and reports the keys
     * that could not be resolved to the endpoint's consumer.
     *
     * @param connectionKeyList keys of the target connections
     * @param message the payload to send
     * @throws IllegalArgumentException if the list, or any key in it, is {@code null}
     */
    private void messageDistributor(final List<String> connectionKeyList, final Object message) {
        if (connectionKeyList == null) {
            throw new IllegalArgumentException("Failed to send message to multiple connections; connetion key list is not set.");
        }

        // kept local to this call so that concurrent exchanges cannot reset or pollute each other's list
        final List<String> notValidConnectionKeys = new ArrayList<>();

        for (final String connectionKey : connectionKeyList) {
            LOG.debug("Sending to connection key {} -> {}", connectionKey, message);
            sendMessage(getWebSocket(connectionKey, notValidConnectionKeys), message);
        }

        if (!notValidConnectionKeys.isEmpty()) {
            LOG.debug("Some connections have not received the message {}", message);
            getEndpoint().getWebsocketConsumer().sendNotDeliveredMessage(notValidConnectionKeys, message);
        }
    }

    /**
     * Queues an asynchronous write of the message to the given websocket.
     * Nothing is queued when either argument is {@code null}.
     *
     * @param websocket the target connection, may be {@code null}
     * @param message the payload; only {@code String} and {@code byte[]} are written
     */
    private void sendMessage(final WebSocket websocket, final Object message) {
        if (websocket == null || message == null) {
            return;
        }
        EXECUTOR.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    if (message instanceof String) {
                        websocket.write((String) message);
                    } else if (message instanceof byte[]) {
                        byte[] data = (byte[]) message;
                        websocket.write(data, 0, data.length);
                    } else {
                        // this should not happen unless one of the supported types is missing above.
                        LOG.error("unexpected message type {}", message.getClass());
                    }
                } catch (Exception e) {
                    // runs on the worker thread, so there is no caller to propagate to
                    LOG.error("Error when writing to websocket", e);
                }
            }
        });
    }

    /**
     * Looks up the websocket registered under the given key.
     *
     * @param connectionKey key of the connection to look up
     * @param notValidConnectionKeys collector that receives the key when no websocket is found for it
     * @return the websocket, or {@code null} if the store has none for this key
     * @throws IllegalArgumentException if {@code connectionKey} is {@code null}
     */
    private WebSocket getWebSocket(final String connectionKey, final List<String> notValidConnectionKeys) {
        if (connectionKey == null) {
            throw new IllegalArgumentException("Failed to send message to single connection; connetion key is not set.");
        }

        WebSocket websocket = getEndpoint().getWebSocketStore().getWebSocket(connectionKey);
        if (websocket == null) {
            // collect for the callback that handles undelivered message(s) to guarantee delivery
            notValidConnectionKeys.add(connectionKey);
            LOG.debug("Failed to send message to single connection; connetion key is not valid. {}", connectionKey);
        }
        return websocket;
    }
}