blob: b8f5bfea94e53709d8f0433eda5bdbbcae186f78 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.stram.util;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.apex.shaded.ning19.com.ning.http.client.AsyncCompletionHandler;
import org.apache.apex.shaded.ning19.com.ning.http.client.AsyncHttpClient;
import org.apache.apex.shaded.ning19.com.ning.http.client.AsyncHttpClient.BoundRequestBuilder;
import org.apache.apex.shaded.ning19.com.ning.http.client.AsyncHttpClientConfigBean;
import org.apache.apex.shaded.ning19.com.ning.http.client.Response;
import org.apache.apex.shaded.ning19.com.ning.http.client.cookie.Cookie;
import org.apache.apex.shaded.ning19.com.ning.http.client.ws.WebSocket;
import org.apache.apex.shaded.ning19.com.ning.http.client.ws.WebSocketTextListener;
import org.apache.apex.shaded.ning19.com.ning.http.client.ws.WebSocketUpgradeHandler;
import com.google.common.base.Throwables;
import com.datatorrent.api.Component;
import com.datatorrent.api.Context;
import com.datatorrent.common.util.JacksonObjectMapperProvider;
import com.datatorrent.common.util.NameableThreadFactory;
import com.datatorrent.common.util.PubSubMessage;
import com.datatorrent.common.util.PubSubMessageCodec;
/**
* <p>Abstract PubSubWebSocketClient class.</p>
*
* @since 0.3.2
*/
public abstract class PubSubWebSocketClient implements Component<Context>
{
private final AsyncHttpClient client;
private WebSocket connection;
private final ObjectMapper mapper;
private final PubSubMessageCodec<Object> codec;
private URI uri;
private int ioThreadMultiplier;
private String loginUrl;
private String userName;
private String password;
private final AtomicReference<Throwable> throwable;
private class PubSubWebSocket implements WebSocketTextListener
{
@Override
public void onMessage(String message)
{
PubSubMessage<Object> pubSubMessage;
try {
pubSubMessage = codec.parseMessage(message);
PubSubWebSocketClient.this.onMessage(pubSubMessage.getType().getIdentifier(), pubSubMessage.getTopic(), pubSubMessage.getData());
} catch (JsonParseException jpe) {
logger.warn("Ignoring unparseable JSON message: {}", message, jpe);
} catch (JsonMappingException jme) {
logger.warn("Ignoring JSON mapping in message: {}", message, jme);
} catch (IOException ex) {
onError(ex);
}
}
@Override
public void onOpen(WebSocket ws)
{
PubSubWebSocketClient.this.onOpen(ws);
}
@Override
public void onClose(WebSocket ws)
{
PubSubWebSocketClient.this.onClose(ws);
}
@Override
public void onError(Throwable t)
{
PubSubWebSocketClient.this.onError(t);
}
}
/**
* <p>Constructor for PubSubWebSocketClient.</p>
*/
public PubSubWebSocketClient()
{
throwable = new AtomicReference<>();
ioThreadMultiplier = 1;
mapper = (new JacksonObjectMapperProvider()).getContext(null);
codec = new PubSubMessageCodec<>(mapper);
AsyncHttpClientConfigBean config = new AsyncHttpClientConfigBean();
config.setIoThreadMultiplier(ioThreadMultiplier);
config.setApplicationThreadPool(Executors.newCachedThreadPool(new NameableThreadFactory("AsyncHttpClient")));
client = new AsyncHttpClient(config);
}
/**
* <p>Setter for the field
* <code>uri</code>.</p>
*
* @param uri
*/
public void setUri(URI uri)
{
this.uri = uri;
}
public URI getUri()
{
return uri;
}
public void setIoThreadMultiplier(int ioThreadMultiplier)
{
this.ioThreadMultiplier = ioThreadMultiplier;
}
public void setLoginUrl(String url)
{
this.loginUrl = url;
}
public void setUserName(String userName)
{
this.userName = userName;
}
public void setPassword(String password)
{
this.password = password;
}
/**
* <p>openConnection.</p>
*
* @param timeoutMillis
* @throws IOException
* @throws ExecutionException
* @throws InterruptedException
* @throws TimeoutException
*/
public void openConnection(long timeoutMillis) throws IOException, ExecutionException, InterruptedException, TimeoutException
{
throwable.set(null);
List<Cookie> cookies = null;
if (loginUrl != null && userName != null && password != null) {
// get the session key first before attempting web socket
JSONObject json = new JSONObject();
try {
json.put("userName", userName);
json.put("password", password);
} catch (JSONException ex) {
throw new RuntimeException(ex);
}
Response response = client.preparePost(loginUrl).setHeader("Content-Type", "application/json").setBody(json.toString()).execute().get();
cookies = response.getCookies();
}
BoundRequestBuilder brb = client.prepareGet(uri.toString());
if (cookies != null) {
for (Cookie cookie : cookies) {
brb.addCookie(cookie);
}
}
connection = brb.execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener(new PubSubWebSocket()).build()).get(timeoutMillis, TimeUnit.MILLISECONDS);
}
public void openConnectionAsync() throws IOException
{
throwable.set(null);
if (loginUrl != null && userName != null && password != null) {
// get the session key first before attempting web socket
JSONObject json = new JSONObject();
try {
json.put("userName", userName);
json.put("password", password);
} catch (JSONException ex) {
throw new RuntimeException(ex);
}
client.preparePost(loginUrl).setHeader("Content-Type", "application/json").setBody(json.toString()).execute(new AsyncCompletionHandler<Response>()
{
@Override
public Response onCompleted(Response response) throws Exception
{
List<Cookie> cookies = response.getCookies();
BoundRequestBuilder brb = client.prepareGet(uri.toString());
if (cookies != null) {
for (Cookie cookie : cookies) {
brb.addCookie(cookie);
}
}
connection = brb.execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener(new PubSubWebSocket()).build()).get();
return response;
}
});
} else {
final PubSubWebSocket webSocket = new PubSubWebSocket()
{
@Override
public void onOpen(WebSocket ws)
{
connection = ws;
super.onOpen(ws);
}
};
client.prepareGet(uri.toString()).execute(
new WebSocketUpgradeHandler.Builder().addWebSocketListener(webSocket).build());
}
}
protected boolean isConnectionSetup()
{
return (connection != null);
}
/**
*
* @return true if the connection is open; false otherwise.
*/
public boolean isConnectionOpen()
{
return isConnectionSetup() ? connection.isOpen() : false;
}
/**
* Before the websocket is used, it's recommended to call this method to ensure that
* any exceptions caught while processing the messages received are acknowledged.
* @throws IOException The reason because of which connection cannot be used.
*/
public void assertUsable() throws IOException
{
if (throwable.get() == null) {
if (connection == null) {
throw new IOException("Connection is not open");
}
return;
}
Throwable t = throwable.get();
if (t instanceof IOException) {
throw (IOException)t;
} else {
throw Throwables.propagate(t);
}
}
/**
* <p>publish.</p>
*
* @param topic
* @param data
* @throws IOException
*/
public void publish(String topic, Object data) throws IOException
{
assertUsable();
connection.sendMessage(PubSubMessageCodec.constructPublishMessage(topic, data, codec));
}
/**
* <p>subscribe.</p>
*
* @param topic
* @throws IOException
*/
public void subscribe(String topic) throws IOException
{
assertUsable();
connection.sendMessage(PubSubMessageCodec.constructSubscribeMessage(topic, codec));
}
/**
* <p>unsubscribe.</p>
*
* @param topic
* @throws IOException
*/
public void unsubscribe(String topic) throws IOException
{
assertUsable();
connection.sendMessage(PubSubMessageCodec.constructUnsubscribeMessage(topic, codec));
}
/**
* <p>subscribeNumSubscribers.</p>
*
* @param topic
* @throws IOException
*/
public void subscribeNumSubscribers(String topic) throws IOException
{
assertUsable();
connection.sendMessage(PubSubMessageCodec.constructSubscribeNumSubscribersMessage(topic, codec));
}
/**
* <p>unsubscribeNumSubscribers.</p>
*
* @param topic
* @throws IOException
*/
public void unsubscribeNumSubscribers(String topic) throws IOException
{
assertUsable();
connection.sendMessage(PubSubMessageCodec.constructUnsubscribeNumSubscribersMessage(topic, codec));
}
/**
* <p>onOpen.</p>
*
* @param ws
*/
public abstract void onOpen(WebSocket ws);
/**
* <p>onMessage.</p>
*
* @param type
* @param topic
* @param data
*/
public abstract void onMessage(String type, String topic, Object data);
/**
* <p>onClose.</p>
*
* @param ws
*/
public abstract void onClose(WebSocket ws);
@Override
public void setup(Context context)
{
}
@Override
public void teardown()
{
if (connection != null) {
connection.close();
}
if (client != null) {
client.close();
}
throwable.set(null);
}
private void onError(Throwable t)
{
throwable.set(t);
}
private static final Logger logger = LoggerFactory.getLogger(PubSubWebSocketClient.class);
}