blob: 764ef5061621d77d89bb2afc29492cf2beb62384 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.ahc.ws;
import java.util.HashSet;
import java.util.Set;
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.component.ahc.AhcEndpoint;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.AsyncHttpClientConfig;
import org.asynchttpclient.DefaultAsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;
import org.asynchttpclient.ws.WebSocket;
import org.asynchttpclient.ws.WebSocketListener;
import org.asynchttpclient.ws.WebSocketUpgradeHandler;
/**
 * To exchange data with external Websocket servers using <a href="http://github.com/sonatype/async-http-client">Async Http Client</a>.
 */
@UriEndpoint(firstVersion = "2.14.0", scheme = "ahc-ws,ahc-wss", extendsScheme = "ahc,ahc", title = "AHC Websocket,AHC Secure Websocket",
        syntax = "ahc-ws:httpUri", label = "websocket")
public class WsEndpoint extends AhcEndpoint {
    // Registered consumers. Every access goes through synchronized (consumers): the set is
    // changed by connect(WsConsumer)/disconnect(WsConsumer) and read by the listener callbacks,
    // which may run on a different thread (NOTE(review): callback threading is decided by the
    // HTTP client - confirm).
    private final Set<WsConsumer> consumers = new HashSet<>();
    private final WsListener listener = new WsListener();
    // volatile: assigned in connect()/doStop()/setWebSocket() and read by the listener callbacks
    // and by getWebSocket() outside of any common lock.
    private transient volatile WebSocket websocket;
    @UriParam(label = "producer")
    private boolean useStreaming;
    @UriParam(label = "consumer")
    private boolean sendMessageOnError;

    public WsEndpoint(String endpointUri, WsComponent component) {
        super(endpointUri, component, null);
    }

    @Override
    public WsComponent getComponent() {
        return (WsComponent) super.getComponent();
    }

    @Override
    public Producer createProducer() throws Exception {
        return new WsProducer(this);
    }

    @Override
    public Consumer createConsumer(Processor processor) throws Exception {
        WsConsumer consumer = new WsConsumer(this, processor);
        configureConsumer(consumer);
        return consumer;
    }

    /**
     * Returns the current websocket, (re)connecting first if there is none or it is not open.
     *
     * @throws Exception if the connection attempt fails
     */
    WebSocket getWebSocket() throws Exception {
        synchronized (this) {
            // ensure we are connected
            reConnect();
        }
        return websocket;
    }

    void setWebSocket(WebSocket websocket) {
        this.websocket = websocket;
    }

    public boolean isUseStreaming() {
        return useStreaming;
    }

    /**
     * To enable streaming to send data as multiple text fragments.
     */
    public void setUseStreaming(boolean useStreaming) {
        this.useStreaming = useStreaming;
    }

    public boolean isSendMessageOnError() {
        return sendMessageOnError;
    }

    /**
     * Whether to send a message if the web-socket listener received an error.
     */
    public void setSendMessageOnError(boolean sendMessageOnError) {
        this.sendMessageOnError = sendMessageOnError;
    }

    /**
     * Creates the HTTP client, using a default configuration when none is supplied.
     *
     * @param config the client configuration, or {@code null} to use the defaults
     */
    @Override
    protected AsyncHttpClient createClient(AsyncHttpClientConfig config) {
        AsyncHttpClientConfig effectiveConfig = config != null
                ? config
                : new DefaultAsyncHttpClientConfig.Builder().build();
        return new DefaultAsyncHttpClient(effectiveConfig);
    }

    /**
     * Opens a websocket to the endpoint's HTTP URI, registers the shared listener on it and
     * blocks until the upgrade has completed.
     *
     * @throws Exception if the upgrade request fails or waiting for it is interrupted
     */
    public void connect() throws Exception {
        String uri = getHttpUri().toASCIIString();
        log.debug("Connecting to {}", uri);
        websocket = getClient().prepareGet(uri).execute(
                new WebSocketUpgradeHandler.Builder()
                        .addWebSocketListener(listener).build()).get();
    }

    /**
     * Closes the websocket if it is open and then stops the parent endpoint. The parent is
     * stopped even when closing the websocket fails.
     */
    @Override
    protected void doStop() throws Exception {
        WebSocket current = websocket;
        try {
            if (current != null && current.isOpen()) {
                if (log.isDebugEnabled()) {
                    log.debug("Disconnecting from {}", getHttpUri().toASCIIString());
                }
                // Detach the listener before closing; presumably this keeps onClose from
                // kicking off a reconnect while we are shutting down.
                current.removeWebSocketListener(listener);
                current.sendCloseFrame();
                websocket = null;
            }
        } finally {
            // always run the parent shutdown, even if closing the websocket threw
            super.doStop();
        }
    }

    /**
     * Registers the consumer for incoming messages and makes sure the websocket is connected.
     *
     * @throws Exception if the connection attempt fails
     */
    void connect(WsConsumer wsConsumer) throws Exception {
        synchronized (consumers) {
            consumers.add(wsConsumer);
        }
        // connect outside the lock so message dispatch is not blocked while we wait
        reConnect();
    }

    /**
     * Unregisters the consumer; the websocket itself is left untouched.
     */
    void disconnect(WsConsumer wsConsumer) {
        synchronized (consumers) {
            consumers.remove(wsConsumer);
        }
    }

    /**
     * Connects when there is no websocket or the current one is not open; otherwise does nothing.
     *
     * @throws Exception if the connection attempt fails
     */
    void reConnect() throws Exception {
        // read the field once so the null check and isOpen() see the same instance
        WebSocket current = websocket;
        if (current == null || !current.isOpen()) {
            String uri = getHttpUri().toASCIIString();
            log.info("Reconnecting websocket: {}", uri);
            connect();
        }
    }

    /**
     * Returns a point-in-time copy of the registered consumers so that callers can iterate
     * without holding the lock and without being affected by concurrent (un)registration.
     */
    private WsConsumer[] snapshotConsumers() {
        synchronized (consumers) {
            return consumers.toArray(new WsConsumer[0]);
        }
    }

    /**
     * Listener shared by every websocket this endpoint opens; forwards frames (and optionally
     * errors) to all registered consumers and reconnects when the socket is closed.
     */
    class WsListener implements WebSocketListener {
        @Override
        public void onOpen(WebSocket websocket) {
            log.debug("Websocket opened");
        }

        @Override
        public void onClose(WebSocket websocket, int code, String reason) {
            log.debug("websocket closed - reconnecting");
            try {
                reConnect();
            } catch (Exception e) {
                log.warn("Error re-connecting to websocket", e);
                ExceptionHandler exceptionHandler = getExceptionHandler();
                if (exceptionHandler != null) {
                    exceptionHandler.handleException("Error re-connecting to websocket", e);
                }
            }
        }

        @Override
        public void onError(Throwable t) {
            log.debug("websocket on error", t);
            if (isSendMessageOnError()) {
                for (WsConsumer consumer : snapshotConsumers()) {
                    consumer.sendMessage(t);
                }
            }
        }

        @Override
        public void onBinaryFrame(byte[] message, boolean finalFragment, int rsv) {
            log.debug("Received message --> {}", message);
            for (WsConsumer consumer : snapshotConsumers()) {
                consumer.sendMessage(message);
            }
        }

        @Override
        public void onTextFrame(String message, boolean finalFragment, int rsv) {
            log.debug("Received message --> {}", message);
            for (WsConsumer consumer : snapshotConsumers()) {
                consumer.sendMessage(message);
            }
        }

        @Override
        public void onPingFrame(byte[] payload) {
            log.debug("Received ping --> {}", payload);
            // The endpoint field is only assigned once connect() returns and is cleared in
            // doStop(), so it can be null when a ping arrives; skip the pong instead of
            // failing with a NullPointerException.
            WebSocket current = WsEndpoint.this.websocket;
            if (current != null) {
                current.sendPongFrame(payload);
            } else {
                log.debug("No websocket available to answer ping");
            }
        }
    }
}