blob: 7d9199cf7c8f59321a9bd2686227bfd367052660 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.edgent.connectors.wsclient.javax.websocket.runtime;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.Properties;
import javax.websocket.ClientEndpoint;
import javax.websocket.ContainerProvider;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
import org.apache.edgent.connectors.runtime.Connector;
import org.apache.edgent.function.Supplier;
import org.apache.edgent.javax.websocket.EdgentSslContainerProvider;
//import org.eclipse.jetty.util.component.LifeCycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ClientEndpoint
public class WebSocketClientConnector extends Connector<Session> implements Serializable {
private static final long serialVersionUID = 1L;
private static final Logger logger = LoggerFactory.getLogger(WebSocketClientConnector.class);
private final Properties config;
private volatile String id;
private volatile String sid;
private WebSocketClientReceiver<?> msgReceiver;
private volatile WebSocketContainer container;
private final Supplier<WebSocketContainer> containerFn;
public WebSocketClientConnector(Properties config, Supplier<WebSocketContainer> containerFn) {
Objects.requireNonNull(config, "config");
this.config = config;
checkConfig();
this.containerFn = containerFn!=null
? containerFn
: () -> getWebSocketContainer();
}
private void checkConfig() {
requireConfig("ws.uri");
URI uri = getEndpointURI();
if (!("ws".equals(uri.getScheme()) || "wss".equals(uri.getScheme())))
throw new IllegalArgumentException("ws.uri");
if (optionalConfig("ws.trustStore"))
requireConfig("ws.trustStorePassword");
if (optionalConfig("ws.keyStore"))
requireConfig("ws.keyStorePassword");
}
void setReceiver(WebSocketClientReceiver<?> msgReceiver) {
this.msgReceiver = msgReceiver;
}
@Override
public Logger getLogger() {
return logger;
}
@Override
protected Session doConnect(Session session) throws Exception {
if (session == null || !session.isOpen()) {
if (session != null)
doClose(session);
if (container == null)
container = containerFn.get();
URI uri = getEndpointURI();
getLogger().info("{} connecting uri={}", id(), uri);
session = container.connectToServer(this, uri);
updateId(session);
getLogger().info("{} connected uri={}", id(), uri);
}
return session;
}
private WebSocketContainer getWebSocketContainer() throws RuntimeException {
// Ugh. Turns out there are some serious issues w/JSR356
// as well as Jetty client impl of it wrt SSL and
// trust and key store configurations.
//
// "wss" is OK unless: you need **programatic** trustStore
// OR need clientAuth at all.
//
// https://github.com/eclipse/jetty.project/issues/155
URI uri = getEndpointURI();
// Use the std code for the non-problematic cases
if ("ws".equals(uri.getScheme())
|| (config.getProperty("ws.trustStore") == null
&& config.getProperty("ws.keyStore") == null
&& System.getProperty("javax.net.ssl.keyStore") == null))
{
return ContainerProvider.getWebSocketContainer();
}
else {
getLogger().info("##### Using ContainerProvider.getWebSocketContainer() workaround for SSL #####");
return EdgentSslContainerProvider.getSslWebSocketContainer(config);
}
}
private URI getEndpointURI() throws RuntimeException {
String uriStr = config.getProperty("ws.uri");
try {
return new URI(uriStr);
} catch (URISyntaxException e) {
throw new IllegalArgumentException("ws.uri", e);
}
}
private void requireConfig(String id) {
if (config.getProperty(id) == null)
throw new IllegalArgumentException(id);
}
private boolean optionalConfig(String id) {
return config.getProperty(id) != null;
}
@Override
protected void doDisconnect(Session session) throws Exception {
// no disconnect from javax.websocket.Session
doClose(session);
}
@Override
protected void doClose(Session session) throws Exception {
getLogger().debug("{} doClose {}", id(), session);
try {
session.close();
}
finally {
// // Force lifecycle stop when done with container.
// // This is to free up threads and resources that the
// // JSR-356 container allocates. But unfortunately
// // the JSR-356 spec does not handle lifecycles (yet)
// ((LifeCycle)container).stop();
}
}
private void updateId(Session session) {
sid = session.getId();
id = null;
}
@Override
protected String id() {
if (id == null) {
// include our short object Id
id = "WSCLIENT " + toString().substring(toString().indexOf('@') + 1)
+ " sid=" + sid;
}
return id;
}
@OnError
public void onError(Session client, Throwable t) {
getLogger().error("{} onError {}", id(), t);
}
@OnMessage
public void onTextMessage(String message) {
getLogger().trace("{} onTextMessage {}", id(), message);
if (msgReceiver != null) {
msgReceiver.onTextMessage(message);
}
}
@OnMessage
public void onBinaryMessage(byte[] message) {
getLogger().trace("{} onBinaryMessage {} bytes.", id(), message.length);
if (msgReceiver != null) {
msgReceiver.onBinaryMessage(message);
}
}
void sendBinary(byte[] bytes) {
while (true) {
Session session = getConnectedSession();
try {
session.getBasicRemote().sendBinary(ByteBuffer.wrap(bytes));
getLogger().trace("{} sendBinary {} bytes.", id(), bytes.length);
return;
}
catch (IOException e) {
if (!session.isOpen()) {
connectionLost(e); // logs error
getLogger().error("{} sendBinary {} bytes failed. Retrying following connection lost", id(), bytes.length);
// retry
}
else {
getLogger().error("{} sendBinary {} bytes failed", id(), bytes.length, e);
throw new RuntimeException(e);
}
}
}
}
void sendText(String msg) {
while (true) {
Session session = getConnectedSession();
try {
session.getBasicRemote().sendText(msg);
getLogger().trace("{} sendText {}", id(), msg);
return;
}
catch (IOException e) {
if (!session.isOpen()) {
connectionLost(e); // logs error
getLogger().error("{} sendText {} chars failed. Retrying following connection lost", id(), msg.length());
// retry
}
else {
getLogger().error("{} sendText {} chars failed", id(), msg.length(), e);
throw new RuntimeException(e);
}
}
}
}
private Session getConnectedSession() {
try {
return client();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted", e);
}
catch (RuntimeException e) {
throw e;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
}